
Commit

Add extra_streaming_tables_have_latest.sql verification (#196)
medvedev1088 authored Nov 20, 2023
1 parent 5787c70 commit 74b276f
Showing 3 changed files with 33 additions and 3 deletions.
22 changes: 19 additions & 3 deletions airflow/dags/polygonetl_airflow/build_verify_streaming_dag.py
@@ -18,6 +18,7 @@ def build_verify_streaming_dag(
        destination_dataset_project_id,
        chain='polygon',
        notification_emails=None,
        extra_streaming_tables=None,
        start_date=datetime(2018, 7, 1),
        schedule='*/10 * * * *',
        max_lag_in_minutes=15):
@@ -51,16 +52,24 @@ def build_verify_streaming_dag(

    dags_folder = os.environ.get('DAGS_FOLDER', '/home/airflow/gcs/dags')

    def add_verify_tasks(task, dependencies=None):
    def add_verify_tasks(task, dependencies=None, params=None):
        # The queries in verify/sqls will fail when the condition is not met
        # Have to use this trick since the Python 2 version of BigQueryCheckOperator doesn't support standard SQL
        # and legacy SQL can't be used to query partitioned tables.
        sql_path = os.path.join(dags_folder, 'resources/stages/verify_streaming/sqls/{task}.sql'.format(task=task))
        sql = read_file(sql_path)

        combined_params = environment.copy()
        task_id = 'verify_{task}'.format(task=task)
        if params:
            combined_params.update(params)
            serialized_params = '_'.join(params.values()).replace('.', '_')
            task_id = task_id + '_' + serialized_params

        verify_task = BigQueryInsertJobOperator(
            task_id=f"verify_{task}",
            task_id=task_id,
            configuration={"query": {"query": sql, "useLegacySql": False}},
            params=environment,
            params=combined_params,
            dag=dag,
        )
        if dependencies is not None and len(dependencies) > 0:
@@ -78,6 +87,13 @@ def add_verify_tasks(task, dependencies=None):

    add_verify_tasks('transactions_count')

    # Use this to verify the lag of a streaming job https://github.com/blockchain-etl/blockchain-etl-streaming by piping a Pub/Sub topic to a BigQuery Table
    # https://cloud.google.com/blog/products/data-analytics/pub-sub-launches-direct-path-to-bigquery-for-streaming-analytics
    if extra_streaming_tables is not None and len(extra_streaming_tables) > 0:
        streaming_table_list = [table.strip() for table in extra_streaming_tables.split(',')]
        for streaming_table in streaming_table_list:
            add_verify_tasks('extra_streaming_tables_have_latest', params={'streaming_table': streaming_table})

    return dag


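For reference, a minimal sketch (not part of the commit) of how the new params handling derives a unique task id per extra streaming table; the table name below is a hypothetical example:

# Sketch of the task-id derivation added to add_verify_tasks above.
# The table name is hypothetical and used only for illustration.
task = 'extra_streaming_tables_have_latest'
params = {'streaming_table': 'myproject.streamers.blocks1'}

task_id = 'verify_{task}'.format(task=task)
if params:
    serialized_params = '_'.join(params.values()).replace('.', '_')
    task_id = task_id + '_' + serialized_params

print(task_id)  # verify_extra_streaming_tables_have_latest_myproject_streamers_blocks1
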
5 changes: 5 additions & 0 deletions airflow/dags/polygonetl_airflow/variables.py
@@ -112,6 +112,11 @@ def read_parse_dag_vars(var_prefix, **kwargs):
def read_verify_streaming_dag_vars(var_prefix, **kwargs):
    vars = {
        'destination_dataset_project_id': read_var('destination_dataset_project_id', var_prefix, True, **kwargs),
        # Comma separated list of fully qualified BigQuery table names e.g. myproject.streamers.blocks1,myproject.streamers.blocks2
        # Each table must have a column data containing a JSON object with field timestamp containing unix seconds
        # Used in combination with https://cloud.google.com/blog/products/data-analytics/pub-sub-launches-direct-path-to-bigquery-for-streaming-analytics
        # to verify lag of streaming jobs that output to Pub/Sub
        'extra_streaming_tables': read_var('extra_streaming_tables', var_prefix, False, **kwargs),
        'notification_emails': read_var('notification_emails', None, False, **kwargs),
    }

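To illustrate the expected variable format, a minimal sketch (using the example table names from the comment above) of how the DAG splits the comma separated value into per-table verification tasks:

# Sketch of how the comma separated extra_streaming_tables value is parsed;
# the table names are the hypothetical examples from the comment in variables.py.
extra_streaming_tables = 'myproject.streamers.blocks1,myproject.streamers.blocks2'

streaming_table_list = [table.strip() for table in extra_streaming_tables.split(',')]
print(streaming_table_list)
# ['myproject.streamers.blocks1', 'myproject.streamers.blocks2']
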
9 changes: 9 additions & 0 deletions airflow/dags/resources/stages/verify_streaming/sqls/extra_streaming_tables_have_latest.sql
@@ -0,0 +1,9 @@
select if(
  (
    select timestamp_diff(
      current_timestamp(),
      (select max(timestamp_seconds(cast(json_extract(data, '$.timestamp') AS INTEGER)))
        from `{{params.streaming_table}}`),
      MINUTE)
  ) < {{params.max_lag_in_minutes}}, 1,
  cast((select 'Streaming table {{params.streaming_table}} is lagging by more than {{params.max_lag_in_minutes}} minutes') as INT64))

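A rough Python approximation of the check this query encodes, assuming a hypothetical row whose data column holds a JSON payload with a unix-seconds timestamp field; when the lag exceeds the threshold, the real query forces a failing CAST of the error message to INT64 so the BigQuery job, and hence the Airflow task, fails:

# Python approximation of the SQL freshness check; values are hypothetical.
import json
import time

row_data = json.dumps({'timestamp': 1700000000})         # what json_extract(data, '$.timestamp') reads
latest_seconds = int(json.loads(row_data)['timestamp'])   # cast(... AS INTEGER) of the latest row
lag_minutes = (time.time() - latest_seconds) / 60         # timestamp_diff(current_timestamp(), ..., MINUTE)

max_lag_in_minutes = 15
if lag_minutes < max_lag_in_minutes:
    print(1)  # the SQL returns 1 when the table is fresh
else:
    # the SQL instead tries to cast this message to INT64, which errors out
    print('Streaming table is lagging by more than {} minutes'.format(max_lag_in_minutes))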