Validation
There are many ways to validate an ETL process and its results. dbt, for example, is a great way to validate a pipeline.
However, for simple checks you can use Airflow operators such as BigQueryValueCheckOperator or BigQueryCheckOperator to add a validation task directly to your DAG.
BigQueryValueCheckOperator
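from airflow.providers.google.cloud.operators.bigquery import BigQueryValueCheckOperator

check_value = BigQueryValueCheckOperator(
    task_id="check_value",
    # Path to a .sql file; Airflow renders it as a Jinja template before running it.
    sql="validation_query.sql",
    pass_value=0,  # value the query result must equal for the task to succeed
    gcp_conn_id="google_cloud_conn_id",
    use_legacy_sql=False,
)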
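BigQueryValueCheckOperator runs the query and compares the value in the first returned row with pass_value. If they do not match, the task fails.
For example, suppose you want to validate that no row in the target table has a duration_minutes value less than 100. You can write a query that counts such rows (bigquery_value_check.sql in the full code below) and set pass_value to 0.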
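The snippet below is a minimal local sketch that shows this pass/fail rule without a BigQuery project. It uses Python's built-in sqlite3 as a stand-in for BigQuery and is not Airflow code. `value_check` and `make_table` are hypothetical helpers that imitate the rule described above: each value in the first returned row must equal pass_value. The in-memory `long_trips` table replaces test_dataset.long_trips.

```python
import sqlite3

# The query from bigquery_value_check.sql, pointed at a local SQLite table
# named long_trips (the real DAG queries test_dataset.long_trips in BigQuery).
VALUE_CHECK_SQL = "SELECT count(*) FROM long_trips WHERE duration_minutes < 100"


def value_check(conn: sqlite3.Connection, sql: str, pass_value: float) -> bool:
    """Imitate the rule: every value in the first row must equal pass_value."""
    row = conn.execute(sql).fetchone()
    if row is None:
        return False  # no rows at all counts as a failure
    return all(float(value) == float(pass_value) for value in row)


def make_table(durations: list[int]) -> sqlite3.Connection:
    """Build a hypothetical in-memory long_trips table with the given durations."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE long_trips (duration_minutes INTEGER)")
    conn.executemany("INSERT INTO long_trips VALUES (?)", [(d,) for d in durations])
    return conn


if __name__ == "__main__":
    good = make_table([120, 250, 1500])
    bad = make_table([120, 45, 1500])
    print(value_check(good, VALUE_CHECK_SQL, pass_value=0))  # True: no trip under 100
    print(value_check(bad, VALUE_CHECK_SQL, pass_value=0))  # False: one trip under 100
```

Counting the offending rows and expecting 0 keeps the check to a single number. When the check fails, that number also tells you how many rows are bad.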
BigQueryCheckOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryCheckOperator

check_value = BigQueryCheckOperator(
    task_id="check_value",
    sql="validation_query.sql",
    gcp_conn_id="google_cloud_conn_id",
    use_legacy_sql=False,
)
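BigQueryCheckOperator expects the query to return a single row. The task fails if any value in that row evaluates to false in Python (for example 0, NULL, or FALSE). Otherwise the check passes.
For example, suppose you want to validate that at least one row in the target table has a duration_minutes value greater than 1000. You can write a query that counts those rows. A count of 0 is falsy, so the task fails when no such row exists.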
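Here is a minimal local sketch of this truthiness rule, again using sqlite3 as a stand-in for BigQuery rather than Airflow itself. The post does not show the contents of bigquery_check.sql, so `CHECK_SQL` below is a hypothetical query matching the description above. `bool_check` and `make_table` are also hypothetical helpers.

```python
import sqlite3

# Hypothetical contents of bigquery_check.sql (the file is not shown in the post):
# count the rows whose duration_minutes is greater than 1000.
CHECK_SQL = "SELECT count(*) FROM long_trips WHERE duration_minutes > 1000"


def bool_check(conn: sqlite3.Connection, sql: str) -> bool:
    """Imitate the rule: the query must return a row and every value must be truthy."""
    row = conn.execute(sql).fetchone()
    if row is None:
        return False  # no rows at all counts as a failure
    return all(bool(value) for value in row)


def make_table(durations: list[int]) -> sqlite3.Connection:
    """Build a hypothetical in-memory long_trips table with the given durations."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE long_trips (duration_minutes INTEGER)")
    conn.executemany("INSERT INTO long_trips VALUES (?)", [(d,) for d in durations])
    return conn


if __name__ == "__main__":
    print(bool_check(make_table([120, 250, 1500]), CHECK_SQL))  # True: one trip over 1000
    print(bool_check(make_table([120, 250, 900]), CHECK_SQL))  # False: the count is 0
```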
Full Code
The following DAG runs the ETL process and then two validation tasks.
# tutorial_dag_with_validation.py
from __future__ import annotations

from datetime import datetime

from airflow import models
# Legacy (deprecated) import path; newer Google provider releases recommend
# BigQueryInsertJobOperator for running queries.
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.providers.google.cloud.operators.bigquery import (
    BigQueryCheckOperator,
    BigQueryValueCheckOperator,
)

project_id = "starlit-sum-372013"
destination_table_id = "test_dataset.long_trips"

with models.DAG(
    dag_id="test_etl_dag_with_validation",
    schedule="@once",
    start_date=datetime(2022, 12, 1),
    catchup=False,
    tags=["example", "bigquery"],
) as dag:
    # Run the ETL query and overwrite the destination table with its result.
    run_etl = BigQueryOperator(
        task_id="run_etl",
        sql="query.sql",
        destination_dataset_table=destination_table_id,
        write_disposition="WRITE_TRUNCATE",
        gcp_conn_id="google_cloud_conn_id",
        use_legacy_sql=False,
    )

    # Fail unless the query in bigquery_value_check.sql returns 0.
    validate_1 = BigQueryValueCheckOperator(
        task_id="validate_1",
        sql="bigquery_value_check.sql",
        pass_value=0,
        gcp_conn_id="google_cloud_conn_id",
        use_legacy_sql=False,
    )

    # Fail if any value in the first row returned by bigquery_check.sql is falsy.
    validate_2 = BigQueryCheckOperator(
        task_id="validate_2",
        sql="bigquery_check.sql",
        gcp_conn_id="google_cloud_conn_id",
        use_legacy_sql=False,
    )

    # Run the ETL first, then the two checks in sequence.
    run_etl >> validate_1 >> validate_2
-- bigquery_value_check.sql
SELECT count(*)
FROM test_dataset.long_trips
WHERE duration_minutes < 100;
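Because the DAG chains `run_etl >> validate_1 >> validate_2`, the tasks run one after another. Under Airflow's default trigger rule (all_success), a failed check stops everything downstream of it. The sketch below is a plain-Python imitation of that behavior, not Airflow's scheduler. `run_chain` is a hypothetical helper, each task is a callable that returns True on success, and retries are ignored. The state names mirror Airflow's success, failed, and upstream_failed.

```python
from typing import Callable


def run_chain(tasks: list[tuple[str, Callable[[], bool]]]) -> dict[str, str]:
    """Run tasks in order; once one fails, mark every later task as upstream_failed."""
    states: dict[str, str] = {}
    failed = False
    for task_id, task in tasks:
        if failed:
            # The task is never executed, just like a downstream task in Airflow.
            states[task_id] = "upstream_failed"
            continue
        ok = task()
        states[task_id] = "success" if ok else "failed"
        failed = not ok
    return states
```

For example, `run_chain([("run_etl", lambda: True), ("validate_1", lambda: False), ("validate_2", lambda: True)])` returns `{"run_etl": "success", "validate_1": "failed", "validate_2": "upstream_failed"}`, and validate_2's callable is never invoked.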
For more information, please refer to the GitHub Repository.