diff --git a/macros/edr/dbt_artifacts/upload_source_freshness.sql b/macros/edr/dbt_artifacts/upload_source_freshness.sql index d29123f70..9acc3fdbe 100644 --- a/macros/edr/dbt_artifacts/upload_source_freshness.sql +++ b/macros/edr/dbt_artifacts/upload_source_freshness.sql @@ -8,8 +8,8 @@ {% if not sources_json_path.exists() %} {% do exceptions.raise_compiler_error('Source freshness artifact (sources.json) does not exist, please run `dbt source freshness`.') %} {% endif %} - {% set source_freshess_results_dicts = fromjson(sources_json_path.read_text())['results'] %} - {% do elementary.upload_artifacts_to_table(source_freshness_results_relation, source_freshess_results_dicts, elementary.flatten_source_freshness, append=True, should_commit=True) %} + {% set source_freshness_results_dicts = fromjson(sources_json_path.read_text())['results'] %} + {% do elementary.upload_artifacts_to_table(source_freshness_results_relation, source_freshness_results_dicts, elementary.flatten_source_freshness, append=True, should_commit=True) %} {% endmacro %} {% macro flatten_source_freshness(node_dict) %} @@ -22,8 +22,10 @@ {% do execute_timing.update(timing) %} {% endif %} {% endfor %} + {% set metadata_dict = elementary.safe_get_with_default(node_dict, 'metadata', {}) %} + {% set source_freshness_invocation_id = metadata_dict.get('invocation_id', invocation_id) %} {% set flatten_source_freshness_dict = { - 'source_freshness_execution_id': [invocation_id, node_dict.get('unique_id')] | join('.'), + 'source_freshness_execution_id': [source_freshness_invocation_id, node_dict.get('unique_id')] | join('.'), 'unique_id': node_dict.get('unique_id'), 'max_loaded_at': node_dict.get('max_loaded_at'), 'snapshotted_at': node_dict.get('snapshotted_at'), @@ -31,7 +33,7 @@ 'status': node_dict.get('status'), 'error': node_dict.get('error'), 'generated_at': elementary.datetime_now_utc_as_string(), - 'invocation_id': invocation_id, + 'invocation_id': source_freshness_invocation_id, 'compile_started_at': compile_timing.get('started_at'), 'compile_completed_at': compile_timing.get('completed_at'), 'execute_started_at': execute_timing.get('started_at'),