diff --git a/integration_tests/tests/test_long_strings.py b/integration_tests/tests/test_long_strings.py new file mode 100644 index 000000000..6094ac292 --- /dev/null +++ b/integration_tests/tests/test_long_strings.py @@ -0,0 +1,46 @@ +from dbt_project import DbtProject + +SAFE_QUERY_SIZE = 10000 # Query length used by test_query_size_safe, which expects compiled_code to be stored at exactly this length. + + +def generate_query(query_size: int) -> str: # Build "SELECT '<A...A>' as col" padded with "A" to a total of query_size characters (assumes query_size is at least the combined length of the fixed prefix and suffix). + query_start = "SELECT '" + query_end = "' as col" + query_mid = "A" * (query_size - len(query_start) - len(query_end)) + return query_start + query_mid + query_end + + +def read_run_result(dbt_project, test_id): # Return the first dbt_run_results row whose unique_id is model.elementary_tests.<test_id>. + return dbt_project.read_table( + "dbt_run_results", + where=f"unique_id = 'model.elementary_tests.{test_id}'", + )[0] + + +def test_query_size_exceed(test_id: str, dbt_project: DbtProject): # A model whose SQL is as long as the query_max_size config var must end up with a shorter stored compiled_code. + dbt_project.dbt_runner.vars["disable_run_results"] = False + max_query_size = int( + dbt_project.dbt_runner.run_operation( + "elementary.get_config_var", macro_args={"var_name": "query_max_size"} + )[0] + ) + + query = generate_query(max_query_size) + with dbt_project.create_temp_model_for_existing_table( + test_id, raw_code=query + ) as model_path: + dbt_project.dbt_runner.run(select=str(model_path)) + result = read_run_result(dbt_project, test_id) + # Expect truncation. 
+ assert len(result["compiled_code"]) < max_query_size + + +def test_query_size_safe(test_id: str, dbt_project: DbtProject): # A SAFE_QUERY_SIZE-character model query must be stored in compiled_code at its full length. + dbt_project.dbt_runner.vars["disable_run_results"] = False + query = generate_query(SAFE_QUERY_SIZE) + with dbt_project.create_temp_model_for_existing_table( + test_id, raw_code=query + ) as model_path: + dbt_project.dbt_runner.run(select=str(model_path)) + result = read_run_result(dbt_project, test_id) + assert len(result["compiled_code"]) == SAFE_QUERY_SIZE diff --git a/macros/edr/dbt_artifacts/upload_artifacts_to_table.sql b/macros/edr/dbt_artifacts/upload_artifacts_to_table.sql index e1d5423bb..e172377d8 100644 --- a/macros/edr/dbt_artifacts/upload_artifacts_to_table.sql +++ b/macros/edr/dbt_artifacts/upload_artifacts_to_table.sql @@ -1,4 +1,4 @@ -{% macro upload_artifacts_to_table(table_relation, artifacts, flatten_artifact_callback, append=False, should_commit=False, metadata_hashes=None) %} +{# on_query_exceed is handed to elementary.insert_rows in the append branch. #} {% macro upload_artifacts_to_table(table_relation, artifacts, flatten_artifact_callback, append=False, should_commit=False, metadata_hashes=None, on_query_exceed=none) %} {% set flatten_artifact_dicts = [] %} {% do elementary.file_log("[{}] Flattening the artifacts.".format(table_relation.identifier)) %} {% for artifact in artifacts %} @@ -11,7 +11,7 @@ {% if append %} {# In append mode, just insert, and no need to be atomic #} - {% do elementary.insert_rows(table_relation, flatten_artifact_dicts, should_commit, elementary.get_config_var('dbt_artifacts_chunk_size')) %} + {% do elementary.insert_rows(table_relation, flatten_artifact_dicts, should_commit, elementary.get_config_var('dbt_artifacts_chunk_size'), on_query_exceed) %} {% else %} {% if metadata_hashes is not none and elementary.get_config_var("cache_artifacts") %} {% do elementary.file_log("[{}] Comparing the artifacts state.".format(table_relation.identifier)) %} diff --git a/macros/edr/dbt_artifacts/upload_run_results.sql b/macros/edr/dbt_artifacts/upload_run_results.sql index 
43b7a5b29..5e3594d59 100644 --- a/macros/edr/dbt_artifacts/upload_run_results.sql +++ b/macros/edr/dbt_artifacts/upload_run_results.sql @@ -2,7 +2,7 @@ {% set relation = elementary.get_elementary_relation('dbt_run_results') %} {% if execute and relation %} {{ elementary.file_log("Uploading run results.") }} - {% do elementary.upload_artifacts_to_table(relation, results, elementary.flatten_run_result, append=True, should_commit=True) %} + {# elementary.on_run_result_query_exceed is the callback handed down for rows that exceed the insert query size limit. #} {% do elementary.upload_artifacts_to_table(relation, results, elementary.flatten_run_result, append=True, should_commit=True, on_query_exceed=elementary.on_run_result_query_exceed) %} {{ elementary.file_log("Uploaded run results successfully.") }} {% endif %} {{ return ('') }} @@ -56,13 +56,13 @@ 'compile_started_at': none, 'compile_completed_at': none, 'full_refresh': flags.FULL_REFRESH, - 'compiled_code': elementary.get_compiled_model_code_text(node), + 'compiled_code': elementary.get_compiled_code(node, as_column_value=true), 'failures': run_result_dict.get('failures'), 'query_id': run_result_dict.get('adapter_response', {}).get('query_id'), 'thread_id': run_result_dict.get('thread_id'), 'materialization': config_dict.get('materialized'), 'adapter_response': run_result_dict.get('adapter_response', {}) - }%} + } %} {% set timings = elementary.safe_get_with_default(run_result_dict, 'timing', []) %} {% if timings %} @@ -77,4 +77,8 @@ {% endfor %} {% endif %} {{ return(flatten_run_result_dict) }} -{% endmacro %} \ No newline at end of file +{% endmacro %} + +{# Overwrites flattened_node's compiled_code in place with elementary.get_compiled_code_too_long_err_msg(). #} {% macro on_run_result_query_exceed(flattened_node) %} + {% do flattened_node.update({"compiled_code": elementary.get_compiled_code_too_long_err_msg()}) %} +{% endmacro %} diff --git a/macros/edr/system/system_utils/get_compiled_model_code_text.sql b/macros/edr/system/system_utils/get_compiled_model_code_text.sql deleted file mode 100644 index 7625cea56..000000000 --- a/macros/edr/system/system_utils/get_compiled_model_code_text.sql +++ /dev/null @@ -1,22 +0,0 @@ -{% macro 
get_compiled_model_code_text(node) %} - {% set should_collect_model_sql = elementary.get_config_var('collect_model_sql') %} - {% if not should_collect_model_sql %} - {{ return(none) }} - {% endif %} - - {% set model_sql_max_size = elementary.get_config_var('model_sql_max_size') %} - {% set long_string_size = elementary.get_config_var('long_string_size') %} - {% set model_sql_size_limit = [model_sql_max_size, long_string_size] | min %} - {% set model_code = elementary.get_compiled_code(node) %} - - {# Seeds do not have compiled code. #} - {% if not model_code %} - {{ return(none) }} - {% endif %} - - {% if model_sql_size_limit < model_code | length %} - {{ return('Model code is too long - over ' ~ model_sql_size_limit ~ ' bytes') }} - {% else %} - {{ return(model_code) }} - {% endif %} -{% endmacro %} diff --git a/macros/edr/system/system_utils/get_config_var.sql b/macros/edr/system/system_utils/get_config_var.sql index 5b872f199..c07ae75bb 100644 --- a/macros/edr/system/system_utils/get_config_var.sql +++ b/macros/edr/system/system_utils/get_config_var.sql @@ -42,8 +42,7 @@ 'edr_monitors': elementary.get_default_monitors(), 'long_string_size': 65535, 'collect_model_sql': true, - 'model_sql_max_size': 10240, - 'query_max_size': 500000, + 'query_max_size': 1000000, 'insert_rows_method': 'max_query_size', 'upload_artifacts_method': 'diff', 'project_name': none, diff --git a/macros/utils/data_types/data_size.sql b/macros/utils/data_types/data_size.sql index 0cf807a63..03585e5d0 100644 --- a/macros/utils/data_types/data_size.sql +++ b/macros/utils/data_types/data_size.sql @@ -19,5 +19,5 @@ {% endmacro %} {% macro redshift__get_column_size() %} - {{ return(elementary.default__get_column_size()) }} + {# NOTE(review): 65535 equals the 'long_string_size' default; presumably Redshift's maximum VARCHAR length - confirm. #} {{ return(65535) }} {% endmacro %} diff --git a/macros/utils/graph/get_compiled_code.sql b/macros/utils/graph/get_compiled_code.sql index 5998a2465..97a05fe78 100644 --- a/macros/utils/graph/get_compiled_code.sql +++ b/macros/utils/graph/get_compiled_code.sql @@ -1,5 +1,12 @@ 
-{% macro get_compiled_code(node) %} - {% do return(adapter.dispatch("get_compiled_code", "elementary")(node)) %} +{# Returns the node's compiled code; with as_column_value=true, code longer than elementary.get_column_size() is replaced by the too-long message. #} {% macro get_compiled_code(node, as_column_value=false) %} + {% set compiled_code = adapter.dispatch("get_compiled_code", "elementary")(node) %} + {# NOTE(review): the removed get_compiled_model_code_text returned none when var collect_model_sql was false; nothing here reads that var - confirm the opt-out is handled elsewhere. #} + {% set max_column_size = elementary.get_column_size() %} + {# compiled_code may be none (redshift__get_compiled_code returns none when the node has no code), so check it before applying the length filter. #} {% if as_column_value and max_column_size and compiled_code and compiled_code | length > max_column_size %} + {% do return(elementary.get_compiled_code_too_long_err_msg()) %} + {% endif %} + + {% do return(compiled_code) %} {% endmacro %} {% macro default__get_compiled_code(node) %} @@ -7,10 +14,14 @@ {% endmacro %} {% macro redshift__get_compiled_code(node) %} - {% set compilde_code = node.get('compiled_code') or node.get('compiled_sql') %} - {% if not compilde_code %} + {% set compiled_code = node.get('compiled_code') or node.get('compiled_sql') %} + {% if not compiled_code %} {% do return(none) %} {% else %} - {% do return(compilde_code.replace("%", "%%")) %} + {% do return(compiled_code.replace("%", "%%")) %} {% endif %} {% endmacro %} + +{# Placeholder text stored instead of compiled code that is too long. #} {% macro get_compiled_code_too_long_err_msg() %} + {% do return("Compiled code is too long.") %} +{% endmacro %} diff --git a/macros/utils/table_operations/insert_rows.sql b/macros/utils/table_operations/insert_rows.sql index a9c8939a1..c8ee20ee2 100644 --- a/macros/utils/table_operations/insert_rows.sql +++ b/macros/utils/table_operations/insert_rows.sql @@ -1,4 +1,4 @@ -{% macro insert_rows(table_relation, rows, should_commit=false, chunk_size=5000) %} +{% macro insert_rows(table_relation, rows, should_commit=false, chunk_size=5000, on_query_exceed=none) %} {% if not rows %} {{ return(none) }} {% endif %} @@ -18,7 +18,7 @@ {{ elementary.file_log('Inserting {} rows to table {}'.format(rows | length, table_relation)) }} {% set insert_rows_method = elementary.get_config_var('insert_rows_method') %} {% if insert_rows_method == 'max_query_size' %} - {% set insert_rows_queries = elementary.get_insert_rows_queries(table_relation, 
columns, rows) %} + {% set insert_rows_queries = elementary.get_insert_rows_queries(table_relation, columns, rows, on_query_exceed=on_query_exceed) %} {% set queries_len = insert_rows_queries | length %} {% for insert_query in insert_rows_queries %} {% do elementary.file_log("[{}/{}] Running insert query.".format(loop.index, queries_len)) %} @@ -39,39 +39,45 @@ {% endif %} {% endmacro %} -{% macro get_insert_rows_queries(table_relation, columns, rows, query_max_size=elementary.get_config_var('query_max_size')) -%} +{# Splits rows into insert statements of at most query_max_size characters; on_query_exceed(row), when given, is called for a row that cannot fit in a statement by itself, and the row is then re-rendered. #} {% macro get_insert_rows_queries(table_relation, columns, rows, query_max_size=none, on_query_exceed=none) -%} + {# Fall back to the configured limit when no explicit size is passed. #} {% if not query_max_size %} + {% set query_max_size = elementary.get_config_var('query_max_size') %} + {% endif %} + {% set insert_queries = [] %} - {% set insert_query %} + {% set base_insert_query %} insert into {{ table_relation }} ({%- for column in columns -%} {{- column.name -}} {{- "," if not loop.last else "" -}} {%- endfor -%}) values {% endset %} - {% set current_query = namespace(data=insert_query) %} + {% set current_query = namespace(data=base_insert_query) %} {% for row in rows %} - {% set rendered_column_values = [] %} - {% for column in columns %} - {% if column.name.lower() == "created_at" %} - {% set column_value = elementary.edr_current_timestamp() %} - {% do rendered_column_values.append(column_value) %} - {% else %} - {% set column_value = elementary.insensitive_get_dict_value(row, column.name) %} - {% do rendered_column_values.append(elementary.render_value(column_value)) %} - {% endif %} - {% endfor %} - {% set row_sql = "({})".format(rendered_column_values | join(",")) %} + {% set row_sql = elementary.render_row_to_sql(row, columns) %} {% set query_with_row = current_query.data + ("," if not loop.first else "") + row_sql %} {% if query_with_row | length > query_max_size %} - {% set new_insert_query = insert_query + row_sql %} + {% set new_insert_query = base_insert_query + row_sql %} + {# Check if row is too large to 
fit into an insert query. #} {% if new_insert_query | length > query_max_size %} - {% do elementary.file_log("Oversized row for insert_rows: {}".format(query_with_row)) %} - {% do exceptions.raise_compiler_error("Row to be inserted exceeds var('query_max_size'). Consider increasing its value.") %} + {# Let the callback modify the row, then render it again before deciding to fail. #} {% if on_query_exceed %} + {% do on_query_exceed(row) %} + {% set row_sql = elementary.render_row_to_sql(row, columns) %} + {% set new_insert_query = base_insert_query + row_sql %} + {% endif %} + + {% if new_insert_query | length > query_max_size %} + {% do elementary.file_log("Oversized row for insert_rows: {}".format(query_with_row)) %} + {% do exceptions.raise_compiler_error("Row to be inserted exceeds var('query_max_size'). Consider increasing its value.") %} + {% endif %} {% endif %} - {% do insert_queries.append(current_query.data) %} - {% set current_query.data = new_insert_query %} + {# Any overflow, not only an oversized row, flushes the pending query and starts a new one with this row; otherwise the row would be dropped. #} + {% if current_query.data != base_insert_query %} + {% do insert_queries.append(current_query.data) %} + {% endif %} + {% set current_query.data = new_insert_query %} {% else %} {% set current_query.data = query_with_row %} {% endif %} @@ -83,6 +89,21 @@ {{ return(insert_queries) }} {%- endmacro %} +{# Renders one row as a parenthesised, comma-separated SQL values tuple; a column named created_at always gets elementary.edr_current_timestamp(). #} {% macro render_row_to_sql(row, columns) %} + {% set rendered_column_values = [] %} + {% for column in columns %} + {% if column.name.lower() == "created_at" %} + {% set column_value = elementary.edr_current_timestamp() %} + {% do rendered_column_values.append(column_value) %} + {% else %} + {% set column_value = elementary.insensitive_get_dict_value(row, column.name) %} + {% do rendered_column_values.append(elementary.render_value(column_value)) %} + {% endif %} + {% endfor %} + {% set row_sql = "({})".format(rendered_column_values | join(",")) %} + {% do return(row_sql) %} +{% endmacro %} + {% macro get_chunk_insert_query(table_relation, columns, rows) -%} {% set insert_rows_query %} insert into {{ table_relation }}