From 0be5d4f4a2478870ed83383ae3f160196e1c8db6 Mon Sep 17 00:00:00 2001
From: jordanwillifordcruise <150287499+jordanwillifordcruise@users.noreply.github.com>
Date: Mon, 26 Feb 2024 21:20:04 +0000
Subject: [PATCH] add support for inserts via GCP cloud function and pub/sub

---
Review notes (text between the "---" line and the first diff is not part of
the commit message and is not applied):
- Line breaks were restored; the patch had been collapsed onto a few lines.
- get_cloud_function_query now declares query_max_size and on_query_exceed
  as optional parameters (it read both without receiving them), and
  insert_rows forwards its on_query_exceed callback to it.
- Topic lookups tolerate an unset insert_rows_topics var, and the
  data_monitoring_metrics path falls back to a plain INSERT when no topic is
  configured for that table, matching the fallback in insert_rows.
- Hunk sizes and the diffstat were recomputed for the added lines; the
  "index" blob ids are the ones from the original submission.

 .../tests/on_run_end/handle_tests_results.sql | 103 +++++++++++------
 macros/utils/table_operations/insert_rows.sql | 108 +++++++++++++++++-
 2 files changed, 176 insertions(+), 35 deletions(-)

diff --git a/macros/edr/tests/on_run_end/handle_tests_results.sql b/macros/edr/tests/on_run_end/handle_tests_results.sql
index 9ab1cfa05..99306b392 100644
--- a/macros/edr/tests/on_run_end/handle_tests_results.sql
+++ b/macros/edr/tests/on_run_end/handle_tests_results.sql
@@ -71,40 +71,75 @@
     {% endif %}
 
     {%- set temp_relation = elementary.make_temp_view_relation(target_relation) -%}
-    {% set insert_query %}
-      INSERT INTO {{ target_relation }} (
-        id,
-        full_table_name,
-        column_name,
-        metric_name,
-        metric_value,
-        source_value,
-        bucket_start,
-        bucket_end,
-        bucket_duration_hours,
-        updated_at,
-        dimension,
-        dimension_value,
-        metric_properties,
-        created_at
-      )
-      SELECT
-        id,
-        full_table_name,
-        column_name,
-        metric_name,
-        metric_value,
-        source_value,
-        bucket_start,
-        bucket_end,
-        bucket_duration_hours,
-        updated_at,
-        dimension,
-        dimension_value,
-        metric_properties,
-        {{ elementary.edr_current_timestamp() }} as created_at
-      FROM {{ temp_relation }}
-    {% endset %}
+
+    {# Use the Cloud Function UDF only when a topic is configured for this table; otherwise fall back to a plain INSERT. #}
+    {% set metrics_topic = (elementary.get_config_var("insert_rows_topics") or {}).get("data_monitoring_metrics") %}
+    {% if elementary.get_config_var("insert_rows_method") == "gcp-cloud-function" and metrics_topic %}
+      {{ elementary.file_log("Sending to UDF") }}
+      {% set log_udf_name = "`" + elementary.get_config_var("insert_rows_udf") + "`" %}
+      {% set insert_query %}
+        SELECT {{ log_udf_name }} (
+          '{{ metrics_topic }}',
+          TO_JSON_STRING(
+            STRUCT(
+              id,
+              COALESCE(full_table_name, '') AS full_table_name,
+              COALESCE(column_name, '') AS column_name,
+              COALESCE(metric_name, '') AS metric_name,
+              metric_value,
+              COALESCE(source_value, '') AS source_value,
+              bucket_start,
+              bucket_end,
+              bucket_duration_hours,
+              updated_at,
+              COALESCE(dimension, '') AS dimension,
+              COALESCE(dimension_value, '') AS dimension_value,
+              COALESCE(metric_properties, '') AS metric_properties,
+              {{ elementary.edr_current_timestamp() }} as created_at
+            )
+          ),
+          '{}'
+        )
+        FROM {{ temp_relation }}
+        ;
+      {% endset %}
+
+    {% else %}
+      {% set insert_query %}
+        INSERT INTO {{ target_relation }} (
+          id,
+          full_table_name,
+          column_name,
+          metric_name,
+          metric_value,
+          source_value,
+          bucket_start,
+          bucket_end,
+          bucket_duration_hours,
+          updated_at,
+          dimension,
+          dimension_value,
+          metric_properties,
+          created_at
+        )
+        SELECT
+          id,
+          full_table_name,
+          column_name,
+          metric_name,
+          metric_value,
+          source_value,
+          bucket_start,
+          bucket_end,
+          bucket_duration_hours,
+          updated_at,
+          dimension,
+          dimension_value,
+          metric_properties,
+          {{ elementary.edr_current_timestamp() }} as created_at
+        FROM {{ temp_relation }}
+      {% endset %}
+    {% endif %}
 
     {{ elementary.file_log("Inserting metrics into {}.".format(target_relation)) }}
     {%- do elementary.run_query(dbt.create_table_as(True, temp_relation, test_tables_union_query)) %}
diff --git a/macros/utils/table_operations/insert_rows.sql b/macros/utils/table_operations/insert_rows.sql
index 1b4535809..25c978e21 100644
--- a/macros/utils/table_operations/insert_rows.sql
+++ b/macros/utils/table_operations/insert_rows.sql
@@ -17,7 +17,30 @@
 
     {{ elementary.file_log('Inserting {} rows to table {}'.format(rows | length, table_relation)) }}
     {% set insert_rows_method = elementary.get_config_var('insert_rows_method') %}
-    {% if insert_rows_method == 'max_query_size' %}
+
+    {# If using a GCP Cloud Function, verify a topic is configured for this table (an unset topics var counts as none). #}
+    {% if insert_rows_method == "gcp-cloud-function" %}
+      {% set table_name = "{}".format(table_relation).split('.')[-1].replace('`', '') %}
+      {% if table_name not in (elementary.get_config_var("insert_rows_topics") or {}) %}
+        {# Revert to another method. #}
+        {% set insert_rows_method = 'max_query_size' %}
+      {% endif %}
+    {% endif %}
+
+    {% if insert_rows_method == "gcp-cloud-function" %}
+      {{ elementary.file_log("Sending to UDF") }}
+
+      {% set topic_name = elementary.get_config_var("insert_rows_topics")[table_name] %}
+      {% set insert_rows_queries = elementary.get_cloud_function_query(topic_name, columns, rows, on_query_exceed=on_query_exceed) %}
+
+      {% set queries_len = insert_rows_queries | length %}
+      {% for insert_query in insert_rows_queries %}
+        {% do elementary.file_log("[{}/{}] Sending INSERT to UDF.".format(loop.index, queries_len)) %}
+        {% do elementary.file_log("UDF CALL: {}".format(insert_query)) %}
+        {% do elementary.run_query(insert_query) %}
+      {% endfor %}
+
+    {% elif insert_rows_method == 'max_query_size' %}
       {% set insert_rows_queries = elementary.get_insert_rows_queries(table_relation, columns, rows, on_query_exceed=on_query_exceed) %}
       {% set queries_len = insert_rows_queries | length %}
       {% for insert_query in insert_rows_queries %}
@@ -107,6 +130,22 @@
   {% do return(row_sql) %}
 {% endmacro %}
 
+{# Renders one row as a comma-separated "<value> AS <column>" list; created_at is rendered from elementary.edr_current_timestamp(). #}
+{% macro render_row_with_name_to_sql(row, columns) %}
+  {% set rendered_column_values = [] %}
+  {% for column in columns %}
+    {% if column.name.lower() == "created_at" %}
+      {% set column_value = elementary.edr_current_timestamp() %}
+      {% do rendered_column_values.append("{} AS created_at".format(column_value)) %}
+    {% else %}
+      {% set column_value = elementary.insensitive_get_dict_value(row, column.name) %}
+      {% do rendered_column_values.append("{} AS {}".format(elementary.render_value(column_value), column.name)) %}
+    {% endif %}
+  {% endfor %}
+  {% set row_sql = "{}".format(rendered_column_values | join(",")) %}
+  {% do return(row_sql) %}
+{% endmacro %}
+
 {% macro get_chunk_insert_query(table_relation, columns, rows) -%}
     {% set insert_rows_query %}
         insert into {{ table_relation }}
@@ -124,6 +163,73 @@
     {{ return(insert_rows_query) }}
 {%- endmacro %}
 
+{#
+  Returns a list of "SELECT <udf>('<topic_name>', TO_JSON_STRING(STRUCT(...)), '{}') FROM (...)" queries for `rows`.
+  Rows are combined with UNION ALL; a new query is started when adding a row would exceed `query_max_size`
+  (defaults to the `query_max_size` config var). `on_query_exceed`, when given, is called with a row that does not
+  fit on its own, and the row is rendered again before a compiler error is raised.
+#}
+{% macro get_cloud_function_query(topic_name, columns, rows, query_max_size=none, on_query_exceed=none) -%}
+    {% set udf_name = "`" + elementary.get_config_var("insert_rows_udf") + "`" %}
+
+    {% if not query_max_size %}
+      {% set query_max_size = elementary.get_config_var('query_max_size') %}
+    {% endif %}
+
+    {% set insert_queries = [] %}
+    {% set base_insert_query %}
+       SELECT {{ udf_name }} (
+         '{{ topic_name }}',
+         TO_JSON_STRING(
+           STRUCT(
+             {%- for column in columns -%}
+               {{- column.name -}} {{- "," if not loop.last else "" -}}
+             {%- endfor -%}
+           )
+         ),
+         '{}'
+       )
+       FROM (
+    {% endset %}
+
+    {% set current_query = namespace(data=base_insert_query) %}
+    {% for row in rows %}
+      {% set row_sql = "SELECT " + elementary.render_row_with_name_to_sql(row, columns) %}
+      {% set query_with_row = current_query.data + (" UNION ALL " if not loop.first else "") + row_sql %}
+
+      {% if query_with_row | length > query_max_size %}
+        {% set new_insert_query = base_insert_query + row_sql %}
+
+        {# Check if row is too large to fit into an insert query. #}
+        {% if new_insert_query | length > query_max_size %}
+          {% if on_query_exceed %}
+            {% do on_query_exceed(row) %}
+            {% set row_sql = "SELECT " + elementary.render_row_with_name_to_sql(row, columns) %}
+            {% set new_insert_query = base_insert_query + row_sql %}
+          {% endif %}
+
+          {% if new_insert_query | length > query_max_size %}
+            {% do elementary.file_log("Oversized row for insert_rows: {}".format(query_with_row)) %}
+            {% do exceptions.raise_compiler_error("Row to be inserted exceeds var('query_max_size'). Consider increasing its value.") %}
+          {% endif %}
+        {% endif %}
+
+        {% if current_query.data != base_insert_query %}
+          {% do insert_queries.append(current_query.data + ')') %}
+        {% endif %}
+        {% set current_query.data = new_insert_query %}
+
+      {% else %}
+        {% set current_query.data = query_with_row %}
+      {% endif %}
+
+      {% if loop.last %}
+        {% do insert_queries.append(current_query.data + ')') %}
+      {% endif %}
+    {% endfor %}
+
+    {{ return(insert_queries) }}
+{%- endmacro %}
+
 {% macro escape_special_chars(string_value) %}
     {{ return(adapter.dispatch('escape_special_chars', 'elementary')(string_value)) }}
 {% endmacro %}