Continuous Data Quality Monitoring for AWS Athena
The past few years have seen data teams invest heavily in tooling and infrastructure for ensuring the reliability of business-critical data. Continuous Data Quality Monitoring is a key component of this infrastructure: a set of tools and processes to monitor the quality of rapidly evolving datasets, so that issues can be caught and remediated before they impact data consumers.
While the infrastructure itself can take many different flavours, teams tend to follow the same steps to establish a data quality workflow:
Defining data quality metrics: This involves identifying the data quality metrics that are important for your organization and defining them in a measurable and auditable way. This can include metrics such as completeness, accuracy, consistency, and timeliness. As the lingua franca of databases, SQL is often the best choice of definition language, though notable alternatives exist, such as Amazon’s DQDL, discussed below.
Evaluating metrics: Once the metrics have been defined, the next step is to schedule them to be evaluated at regular intervals. This can be done using tools such as Apache Airflow or Amazon EventBridge. It is important at this stage to configure reasonable evaluation windows and account for any systemic delays in data.
Storing measurements: The measured data quality metrics must then be stored in a data store that’s appropriate for the scale and cardinality of the data, as well as your query patterns. This data can be stored in raw form or aggregated at different levels of granularity. A time-series database may be a prudent choice, but an RDBMS like Postgres or an OLAP store like ClickHouse may also meet your needs.
Visualizing and alerting: Finally, the data quality metrics can be visualized using tools such as Grafana, Tableau, or AWS QuickSight. Alerts can also be set up to notify stakeholders when certain thresholds are exceeded. This allows for proactive data quality management and helps to ensure that data is accurate, complete, and consistent over time.
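The evaluation and storage steps above can be sketched in a few lines of Python. This is a minimal illustration rather than a reference implementation: the one-hour delay allowance, the `measurements` table, and the sample metric are all assumptions made for the example, and SQLite stands in for whichever store you choose.

```python
import sqlite3
from datetime import datetime, timedelta, timezone


def evaluation_window(now, window=timedelta(hours=24), delay=timedelta(hours=1)):
    """Return (start, end) of the window to evaluate, aligned to the hour.

    `delay` is a hypothetical allowance for late-arriving data: the window
    ends `delay` before the current hour so incomplete data is not measured.
    """
    end = now.replace(minute=0, second=0, microsecond=0) - delay
    return end - window, end


def store_measurement(conn, metric, dimension, value, window_end):
    """Append one raw measurement to a hypothetical `measurements` table."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS measurements "
        "(metric TEXT, dimension TEXT, value REAL, window_end TEXT)"
    )
    conn.execute(
        "INSERT INTO measurements VALUES (?, ?, ?, ?)",
        (metric, dimension, value, window_end.isoformat()),
    )
    conn.commit()


# Example run with a fixed "now" and a made-up measurement
now = datetime(2024, 1, 2, 10, 37, tzinfo=timezone.utc)
start, end = evaluation_window(now)
conn = sqlite3.connect(":memory:")
store_measurement(conn, "transaction_count", "country=DE", 1250, end)
```

Keeping the window end in each stored row makes it possible to re-evaluate a window later and compare the results.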
Implementation of the above process can vary depending on the data stores and query engine you use, as well as your requirements for auditing and historical analysis. In this post we’ll dive into how to implement a Continuous Data Quality process for AWS Athena.
A Typical Architecture
AWS Athena is a serverless query engine and analytics service that can query data stored in S3 in a variety of table and file formats.
A typical Athena workflow might look like this:
Write data to an S3 path
Define a Glue crawler to crawl the S3 path and register partitions and tables
Query the tables with Athena, storing results in S3
Process the query results from S3, and serve them out to whomever is interested
The above Athena workflow can be extended to emit data quality metrics at various stages of the pipeline. But at scale, it is important to ensure the right measurements and context are available to the right team, and that any noise resulting from low-impact fluctuations in these measurements is filtered out.
Using AWS Glue Data Quality Rulesets
The AWS-native way of capturing Data Quality metrics for such a workflow is to use Data Quality Rulesets in AWS Glue.
It works by selecting a few common data quality rules, such as Completeness, DataFreshness, and DistinctValuesCount, and applying those rules to columns in your schema.
Once you have figured out a collection of rules for your schema, you can evaluate the ruleset and get PASS/FAIL results for each rule.
These PASS/FAIL results can also be stored in Amazon CloudWatch for future reference. They live under the metric names:
glue.data.quality.rules.passed
glue.data.quality.rules.failed
broken down by CatalogId, Database, Ruleset, and Table.
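Rulesets are written in DQDL (Data Quality Definition Language) as a `Rules = [ ... ]` list of comma-separated rules. As a rough sketch, a ruleset document can be assembled from Python like this; the helper `render_ruleset` and the column names are hypothetical and chosen only for illustration.

```python
def render_ruleset(rules):
    """Join DQDL rule strings into a `Rules = [ ... ]` ruleset document."""
    body = ",\n".join(f"    {rule}" for rule in rules)
    return f"Rules = [\n{body}\n]"


# Column names below are placeholders, not taken from a real schema
ruleset = render_ruleset([
    'IsComplete "transaction_id"',
    'Completeness "country" > 0.95',
    'DistinctValuesCount "SomeColumn" > 3',
])
print(ruleset)
```

The resulting string is what you would attach to a table as its ruleset. Generating it from code makes it easier to keep rules in version control alongside the pipeline.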
The nature of DQDL requires that rules evaluate to a binary output. For example:
DistinctValuesCount "SomeColumn" > 3
While this is great for maintaining a checklist of constraints your dataset should pass, it has a few limitations:
DQDL thresholds must be hardcoded. They cannot, for example, be compared against a dynamic moving average of that number over the last week.
The raw result of a measurement, e.g. DistinctValuesCount, is unavailable to you. You may need it to quantify the scale and severity of a data quality problem: distinct values declining by 5% may require a different resolution workflow than distinct values declining by 100%.
DQDL CustomSQL rules must return a single numeric value, which is compared against the threshold expression. In cases where you want a `GROUP BY`-style measurement, you will need to pre-define the groups and write a separate CustomSQL rule for each.
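To make the first two limitations concrete, here is a minimal sketch of the kind of check a hardcoded threshold cannot express: comparing the latest raw measurement against a moving average and grading the severity of the change. The cut-offs and the sample values are made up for illustration.

```python
from statistics import mean


def percent_change_vs_baseline(history, latest):
    """Percent change of `latest` relative to the mean of `history`."""
    baseline = mean(history)
    if baseline == 0:
        raise ValueError("baseline is zero; percent change is undefined")
    return (latest - baseline) / baseline * 100.0


def classify(change_pct, warn_pct=-5.0, critical_pct=-50.0):
    """Map a percent change to a severity label using hypothetical cut-offs."""
    if change_pct <= critical_pct:
        return "critical"
    if change_pct <= warn_pct:
        return "warning"
    return "ok"


# Made-up daily distinct-value counts for the previous seven days
last_week = [100, 104, 98, 101, 99, 102, 96]
for latest in (95, 0):
    change = percent_change_vs_baseline(last_week, latest)
    print(latest, round(change, 1), classify(change))
```

A check like this needs the raw measurement and its history, which is why storing measurements rather than only PASS/FAIL outcomes matters.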
AWS Glue’s Data Quality rulesets are a great start to measuring data quality, and might be sufficient for many use cases. But in light of the above limitations, let’s explore further what a more sophisticated data quality solution might look like.
Running Data Quality Monitoring Queries on Athena
The equivalent of a custom SQL DQDL check may be run directly on Athena, without any of the constraints imposed by the Glue Data Quality framework. This is a good place to collect grouped measurements.
For example, a query like the one below counts customer transactions on a per-country basis, evaluated only over the last 24 hours of transactions.
SELECT country, COUNT(*) AS transaction_count
FROM customer_transactions
WHERE transaction_timestamp >= DATE_TRUNC('hour', CURRENT_TIMESTAMP AT TIME ZONE 'UTC' - INTERVAL '24' HOUR) AT TIME ZONE 'UTC'
AND transaction_timestamp < DATE_TRUNC('hour', CURRENT_TIMESTAMP AT TIME ZONE 'UTC') AT TIME ZONE 'UTC'
GROUP BY country;
By scheduling these queries on a cadence, you account for new data as it arrives. The timestamp filter in the WHERE clause also helps avoid costly scans over the entire table, most effectively when the table is partitioned on that timestamp.
This is just a regular Athena query, subject to the same pricing as the rest of your Athena workload. Hence a bit of trial and error might be required to arrive at a time interval and grouping that gives you the granularity you need without breaking the bank. You may also want to schedule these queries to run at “quiet” times so that they don’t interfere with your core workloads.
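As a rough sketch of what a scheduled job might do, the function below starts a query, polls until it reaches a terminal state, and reports how many bytes were scanned so you can track cost. It expects a client that behaves like `boto3.client("athena")`. The stub client, database name, bucket path, and price per TB are placeholders so that the example runs without AWS access, and the cost estimate ignores any per-query minimums or rounding.

```python
import time


def run_athena_query(client, sql, database, output_location, poll_seconds=2.0):
    """Run `sql`, wait for a terminal state, return (state, bytes_scanned)."""
    started = client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    query_id = started["QueryExecutionId"]
    while True:
        response = client.get_query_execution(QueryExecutionId=query_id)
        execution = response["QueryExecution"]
        state = execution["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            scanned = execution.get("Statistics", {}).get("DataScannedInBytes", 0)
            return state, scanned
        time.sleep(poll_seconds)


def estimated_cost_usd(bytes_scanned, usd_per_tb):
    """Estimate cost from bytes scanned; `usd_per_tb` is supplied by the caller."""
    return bytes_scanned / 1024**4 * usd_per_tb


class StubAthenaClient:
    """Stand-in for a real Athena client, used only to exercise the sketch."""

    def __init__(self):
        self._polls = 0

    def start_query_execution(self, **kwargs):
        return {"QueryExecutionId": "stub-query-id"}

    def get_query_execution(self, QueryExecutionId):
        self._polls += 1
        state = "RUNNING" if self._polls < 2 else "SUCCEEDED"
        return {
            "QueryExecution": {
                "Status": {"State": state},
                "Statistics": {"DataScannedInBytes": 2 * 1024**3},
            }
        }


state, scanned = run_athena_query(
    StubAthenaClient(), "SELECT 1", "analytics",
    "s3://example-bucket/results/", poll_seconds=0,
)
print(state, scanned, estimated_cost_usd(scanned, usd_per_tb=5.0))
```

Logging the bytes scanned per run gives you the data needed for the trial and error described above: you can see how changing the time interval or grouping affects cost.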
Validating Athena Query Results data using Python and Great Expectations
Athena query results stored in S3 may be pulled into an automated Great Expectations job, and a number of “expectations” may be evaluated against this data. Great Expectations gives you an expressive and flexible DSL for declaring “expectations” on your dataset. A number of pre-built expectations are available in the Great Expectations library, but you can also express specific business logic in your expectations. To do so, you would need to write Python code that defines the expectation logic, and then register it as a custom expectation.
For example, the above Athena query would return a dataframe of transaction_count by country. If we wanted to validate the kurtosis of the distribution of transaction_count across countries, perhaps as a precursor to feeding this data to an ML training job, we might write a custom expectation like so:
import pandas as pd
import scipy.stats as stats
import great_expectations as ge
from great_expectations.dataset import MetaPandasDataset, PandasDataset

# NOTE: this sketch assumes the legacy Dataset API (great_expectations.dataset)
# found in older Great Expectations releases. Newer releases define custom
# expectations differently, so adapt it to the version you run.


class TransactionCountDataset(PandasDataset):
    _data_asset_type = "TransactionCountDataset"

    @MetaPandasDataset.column_aggregate_expectation
    def expect_kurtosis_to_be_within_range(self, column, min_kurtosis, max_kurtosis):
        # Depending on the release, the decorator passes either the column
        # name or the column's values, so handle both.
        values = self[column] if isinstance(column, str) else column
        # The query returns one row per country, so this is the (excess)
        # kurtosis of the transaction counts across countries.
        kurtosis = float(stats.kurtosis(values.dropna()))
        return {
            "success": min_kurtosis <= kurtosis <= max_kurtosis,
            "result": {"observed_value": kurtosis},
        }


# Load the Athena query results from the previous step into a Pandas DataFrame
df = pd.read_csv("path/to/dataset.csv")

# Wrap the DataFrame in the custom dataset class
dataset = ge.from_pandas(df, dataset_class=TransactionCountDataset)

# Evaluate the custom expectation against the transaction_count column
result = dataset.expect_kurtosis_to_be_within_range(
    "transaction_count", min_kurtosis=-1, max_kurtosis=1
)

# Print the validation result
print(result)
The results of these validations could be used in various ways. They could trigger a human-in-the-loop workflow to investigate the data that failed validation. They could also trigger a circuit-breaker to prevent subsequent phases of the data pipeline from operating on what is considered “invalid” data. The ability to express business logic in this way can help engineering teams be more efficient in managing their data pipelines and reduce the incidence of bad data making it to consumers.
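A circuit-breaker of this kind can be very small. The sketch below assumes the validation result is dict-like with a boolean "success" key; `DataQualityError`, `enforce`, and the notification callback are hypothetical names introduced for the example.

```python
class DataQualityError(RuntimeError):
    """Raised to stop a pipeline when validation fails."""


def enforce(validation_result, on_failure=None):
    """Return the result if it succeeded; otherwise notify and raise."""
    if validation_result.get("success"):
        return validation_result
    if on_failure is not None:
        # Hook for a human-in-the-loop workflow, e.g. opening a ticket
        on_failure(validation_result)
    raise DataQualityError(f"validation failed: {validation_result.get('result')}")


notifications = []
enforce({"success": True})  # passes through, downstream steps may run

try:
    enforce(
        {"success": False, "result": {"observed_value": 3.2}},
        on_failure=notifications.append,
    )
except DataQualityError as error:
    print("pipeline halted:", error)
```

Raising an exception lets most schedulers mark the task as failed and skip downstream tasks without any extra wiring.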
Summary
While AWS Glue Data Quality rulesets provide a handy, AWS-native workflow for inspecting data quality on Athena databases, there are limits to the kinds of issues they can discover. For that reason, it’s advisable to inject business logic into your measurements, and maintain this logic in a way that ensures measurements and alerts stay relevant to your team.
Running data quality monitoring queries on Athena allows you to express groupings and filtering that may not be possible using the DQDL syntax in AWS Glue. Validating Athena query results using Python and Great Expectations enables you to perform custom checks that are tailored to the needs of your business.
Finally, by storing and visualizing the results of these checks and measurements, you can perform retroactive analysis and remediate ongoing data issues early, before the consumers of your data know about them.
Depending on your needs, you can also use a Data Quality Monitoring platform that allows you to define your metrics of interest and automates the remaining steps of evaluation, storage and visualization.
If you’re looking for a Continuous Data Quality monitoring platform for AWS Athena and more, check out Lariat Data!