Merge pull request #580 from elementary-data/ele-1870-correct-max-column-size

Ele 1870 correct max column size
elongl authored Oct 8, 2023
2 parents 6b3b15e + e097878 commit 0922ac1
Showing 8 changed files with 116 additions and 57 deletions.
46 changes: 46 additions & 0 deletions integration_tests/tests/test_long_strings.py
@@ -0,0 +1,46 @@
from dbt_project import DbtProject

SAFE_QUERY_SIZE = 10000


def generate_query(query_size: int) -> str:
query_start = "SELECT '"
query_end = "' as col"
query_mid = "A" * (query_size - len(query_start) - len(query_end))
return query_start + query_mid + query_end


def read_run_result(dbt_project, test_id):
return dbt_project.read_table(
"dbt_run_results",
where=f"unique_id = 'model.elementary_tests.{test_id}'",
)[0]


def test_query_size_exceed(test_id: str, dbt_project: DbtProject):
dbt_project.dbt_runner.vars["disable_run_results"] = False
max_query_size = int(
dbt_project.dbt_runner.run_operation(
"elementary.get_config_var", macro_args={"var_name": "query_max_size"}
)[0]
)

query = generate_query(max_query_size)
with dbt_project.create_temp_model_for_existing_table(
test_id, raw_code=query
) as model_path:
dbt_project.dbt_runner.run(select=str(model_path))
result = read_run_result(dbt_project, test_id)
# Expect truncation.
assert len(result["compiled_code"]) < max_query_size


def test_query_size_safe(test_id: str, dbt_project: DbtProject):
dbt_project.dbt_runner.vars["disable_run_results"] = False
query = generate_query(SAFE_QUERY_SIZE)
with dbt_project.create_temp_model_for_existing_table(
test_id, raw_code=query
) as model_path:
dbt_project.dbt_runner.run(select=str(model_path))
result = read_run_result(dbt_project, test_id)
assert len(result["compiled_code"]) == SAFE_QUERY_SIZE
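
For reference, generate_query pads the string literal so the rendered SELECT is exactly query_size characters long, which is what lets the two tests assert precisely on truncation. A quick sanity check (a sketch, not part of the diff):

# Sketch: generate_query produces a query of exactly the requested length.
assert len(generate_query(10_000)) == 10_000
assert generate_query(20) == "SELECT 'AAAA' as col"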
4 changes: 2 additions & 2 deletions macros/edr/dbt_artifacts/upload_artifacts_to_table.sql
@@ -1,4 +1,4 @@
{% macro upload_artifacts_to_table(table_relation, artifacts, flatten_artifact_callback, append=False, should_commit=False, metadata_hashes=None) %}
{% macro upload_artifacts_to_table(table_relation, artifacts, flatten_artifact_callback, append=False, should_commit=False, metadata_hashes=None, on_query_exceed=none) %}
{% set flatten_artifact_dicts = [] %}
{% do elementary.file_log("[{}] Flattening the artifacts.".format(table_relation.identifier)) %}
{% for artifact in artifacts %}
@@ -11,7 +11,7 @@

{% if append %}
{# In append mode, just insert, and no need to be atomic #}
{% do elementary.insert_rows(table_relation, flatten_artifact_dicts, should_commit, elementary.get_config_var('dbt_artifacts_chunk_size')) %}
{% do elementary.insert_rows(table_relation, flatten_artifact_dicts, should_commit, elementary.get_config_var('dbt_artifacts_chunk_size'), on_query_exceed) %}
{% else %}
{% if metadata_hashes is not none and elementary.get_config_var("cache_artifacts") %}
{% do elementary.file_log("[{}] Comparing the artifacts state.".format(table_relation.identifier)) %}
12 changes: 8 additions & 4 deletions macros/edr/dbt_artifacts/upload_run_results.sql
@@ -2,7 +2,7 @@
{% set relation = elementary.get_elementary_relation('dbt_run_results') %}
{% if execute and relation %}
{{ elementary.file_log("Uploading run results.") }}
{% do elementary.upload_artifacts_to_table(relation, results, elementary.flatten_run_result, append=True, should_commit=True) %}
{% do elementary.upload_artifacts_to_table(relation, results, elementary.flatten_run_result, append=True, should_commit=True, on_query_exceed=elementary.on_run_result_query_exceed) %}
{{ elementary.file_log("Uploaded run results successfully.") }}
{% endif %}
{{ return ('') }}
@@ -56,13 +56,13 @@
'compile_started_at': none,
'compile_completed_at': none,
'full_refresh': flags.FULL_REFRESH,
'compiled_code': elementary.get_compiled_model_code_text(node),
'compiled_code': elementary.get_compiled_code(node, as_column_value=true),
'failures': run_result_dict.get('failures'),
'query_id': run_result_dict.get('adapter_response', {}).get('query_id'),
'thread_id': run_result_dict.get('thread_id'),
'materialization': config_dict.get('materialized'),
'adapter_response': run_result_dict.get('adapter_response', {})
}%}
} %}

{% set timings = elementary.safe_get_with_default(run_result_dict, 'timing', []) %}
{% if timings %}
@@ -77,4 +77,8 @@
{% endfor %}
{% endif %}
{{ return(flatten_run_result_dict) }}
{% endmacro %}
{% endmacro %}

{% macro on_run_result_query_exceed(flattened_node) %}
{% do flattened_node.update({"compiled_code": elementary.get_compiled_code_too_long_err_msg()}) %}
{% endmacro %}
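
The new on_run_result_query_exceed hook is what insert_rows calls (see insert_rows.sql below) when a single run-result row would push the INSERT statement over query_max_size: it swaps the oversized compiled_code for a short error message so the row can be re-rendered within the limit. A rough Python equivalent (a sketch with assumed values, not from the commit):

TOO_LONG_MSG = "Compiled code is too long."  # mirrors get_compiled_code_too_long_err_msg()

def on_run_result_query_exceed(flattened_node: dict) -> None:
    # Keep every other field; only the compiled code is replaced.
    flattened_node["compiled_code"] = TOO_LONG_MSG

row = {"unique_id": "model.demo.my_model",
       "compiled_code": "SELECT '" + "A" * 2_000_000 + "' as col"}
on_run_result_query_exceed(row)
assert row["compiled_code"] == TOO_LONG_MSG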
22 changes: 0 additions & 22 deletions macros/edr/system/system_utils/get_compiled_model_code_text.sql

This file was deleted.

3 changes: 1 addition & 2 deletions macros/edr/system/system_utils/get_config_var.sql
@@ -42,8 +42,7 @@
'edr_monitors': elementary.get_default_monitors(),
'long_string_size': 65535,
'collect_model_sql': true,
'model_sql_max_size': 10240,
'query_max_size': 500000,
'query_max_size': 1000000,
'insert_rows_method': 'max_query_size',
'upload_artifacts_method': 'diff',
'project_name': none,
2 changes: 1 addition & 1 deletion macros/utils/data_types/data_size.sql
@@ -19,5 +19,5 @@
{% endmacro %}

{% macro redshift__get_column_size() %}
{{ return(elementary.default__get_column_size()) }}
{{ return(65535) }}
{% endmacro %}
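
Redshift caps a VARCHAR column at 65,535 bytes, so the override now returns that constant directly instead of deferring to the default implementation, which may not impose a cap at all (note the "and max_column_size" guard added to get_compiled_code below). A tiny illustration with assumed values, not from the commit:

REDSHIFT_MAX_VARCHAR_BYTES = 65535  # the value redshift__get_column_size now returns
compiled_code = "A" * 70_000
fits_in_column = len(compiled_code) <= REDSHIFT_MAX_VARCHAR_BYTES  # False: must be replaced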
21 changes: 16 additions & 5 deletions macros/utils/graph/get_compiled_code.sql
@@ -1,16 +1,27 @@
{% macro get_compiled_code(node) %}
{% do return(adapter.dispatch("get_compiled_code", "elementary")(node)) %}
{% macro get_compiled_code(node, as_column_value=false) %}
{% set compiled_code = adapter.dispatch("get_compiled_code", "elementary")(node) %}

{% set max_column_size = elementary.get_column_size() %}
{% if as_column_value and max_column_size and compiled_code | length > max_column_size %}
{% do return(elementary.get_compiled_code_too_long_err_msg()) %}
{% endif %}

{% do return(compiled_code) %}
{% endmacro %}

{% macro default__get_compiled_code(node) %}
{% do return(node.get('compiled_code') or node.get('compiled_sql')) %}
{% endmacro %}

{% macro redshift__get_compiled_code(node) %}
{% set compilde_code = node.get('compiled_code') or node.get('compiled_sql') %}
{% if not compilde_code %}
{% set compiled_code = node.get('compiled_code') or node.get('compiled_sql') %}
{% if not compiled_code %}
{% do return(none) %}
{% else %}
{% do return(compilde_code.replace("%", "%%")) %}
{% do return(compiled_code.replace("%", "%%")) %}
{% endif %}
{% endmacro %}

{% macro get_compiled_code_too_long_err_msg() %}
{% do return("Compiled code is too long.") %}
{% endmacro %}
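
Put together, the new as_column_value path in get_compiled_code behaves roughly like the following Python sketch (helper names and the 65,535 cap are assumptions for illustration):

def get_compiled_code(node: dict, as_column_value: bool = False):
    # Adapter-specific retrieval; the Redshift variant additionally escapes "%" as "%%".
    compiled_code = node.get("compiled_code") or node.get("compiled_sql")
    max_column_size = 65535  # stand-in for whatever elementary.get_column_size() returns
    if as_column_value and max_column_size and compiled_code and len(compiled_code) > max_column_size:
        return "Compiled code is too long."
    return compiled_code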
63 changes: 42 additions & 21 deletions macros/utils/table_operations/insert_rows.sql
@@ -1,4 +1,4 @@
{% macro insert_rows(table_relation, rows, should_commit=false, chunk_size=5000) %}
{% macro insert_rows(table_relation, rows, should_commit=false, chunk_size=5000, on_query_exceed=none) %}
{% if not rows %}
{{ return(none) }}
{% endif %}
@@ -18,7 +18,7 @@
{{ elementary.file_log('Inserting {} rows to table {}'.format(rows | length, table_relation)) }}
{% set insert_rows_method = elementary.get_config_var('insert_rows_method') %}
{% if insert_rows_method == 'max_query_size' %}
{% set insert_rows_queries = elementary.get_insert_rows_queries(table_relation, columns, rows) %}
{% set insert_rows_queries = elementary.get_insert_rows_queries(table_relation, columns, rows, on_query_exceed=on_query_exceed) %}
{% set queries_len = insert_rows_queries | length %}
{% for insert_query in insert_rows_queries %}
{% do elementary.file_log("[{}/{}] Running insert query.".format(loop.index, queries_len)) %}
@@ -39,39 +39,45 @@
{% endif %}
{% endmacro %}

{% macro get_insert_rows_queries(table_relation, columns, rows, query_max_size=elementary.get_config_var('query_max_size')) -%}
{% macro get_insert_rows_queries(table_relation, columns, rows, query_max_size=none, on_query_exceed=none) -%}
{% if not query_max_size %}
{% set query_max_size = elementary.get_config_var('query_max_size') %}
{% endif %}

{% set insert_queries = [] %}
{% set insert_query %}
{% set base_insert_query %}
insert into {{ table_relation }}
({%- for column in columns -%}
{{- column.name -}} {{- "," if not loop.last else "" -}}
{%- endfor -%}) values
{% endset %}

{% set current_query = namespace(data=insert_query) %}
{% set current_query = namespace(data=base_insert_query) %}
{% for row in rows %}
{% set rendered_column_values = [] %}
{% for column in columns %}
{% if column.name.lower() == "created_at" %}
{% set column_value = elementary.edr_current_timestamp() %}
{% do rendered_column_values.append(column_value) %}
{% else %}
{% set column_value = elementary.insensitive_get_dict_value(row, column.name) %}
{% do rendered_column_values.append(elementary.render_value(column_value)) %}
{% endif %}
{% endfor %}
{% set row_sql = "({})".format(rendered_column_values | join(",")) %}
{% set row_sql = elementary.render_row_to_sql(row, columns) %}
{% set query_with_row = current_query.data + ("," if not loop.first else "") + row_sql %}

{% if query_with_row | length > query_max_size %}
{% set new_insert_query = insert_query + row_sql %}
{% set new_insert_query = base_insert_query + row_sql %}

{# Check if row is too large to fit into an insert query. #}
{% if new_insert_query | length > query_max_size %}
{% do elementary.file_log("Oversized row for insert_rows: {}".format(query_with_row)) %}
{% do exceptions.raise_compiler_error("Row to be inserted exceeds var('query_max_size'). Consider increasing its value.") %}
{% if on_query_exceed %}
{% do on_query_exceed(row) %}
{% set row_sql = elementary.render_row_to_sql(row, columns) %}
{% set new_insert_query = base_insert_query + row_sql %}
{% endif %}

{% if new_insert_query | length > query_max_size %}
{% do elementary.file_log("Oversized row for insert_rows: {}".format(query_with_row)) %}
{% do exceptions.raise_compiler_error("Row to be inserted exceeds var('query_max_size'). Consider increasing its value.") %}
{% endif %}

{% if current_query.data != base_insert_query %}
{% do insert_queries.append(current_query.data) %}
{% endif %}
{% set current_query.data = new_insert_query %}
{% endif %}
{% do insert_queries.append(current_query.data) %}
{% set current_query.data = new_insert_query %}
{% else %}
{% set current_query.data = query_with_row %}
{% endif %}
@@ -83,6 +89,21 @@
{{ return(insert_queries) }}
{%- endmacro %}

{% macro render_row_to_sql(row, columns) %}
{% set rendered_column_values = [] %}
{% for column in columns %}
{% if column.name.lower() == "created_at" %}
{% set column_value = elementary.edr_current_timestamp() %}
{% do rendered_column_values.append(column_value) %}
{% else %}
{% set column_value = elementary.insensitive_get_dict_value(row, column.name) %}
{% do rendered_column_values.append(elementary.render_value(column_value)) %}
{% endif %}
{% endfor %}
{% set row_sql = "({})".format(rendered_column_values | join(",")) %}
{% do return(row_sql) %}
{% endmacro %}
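
In plain terms, get_insert_rows_queries now packs rows into INSERT statements up to query_max_size, gives an oversized row one chance to shrink itself via on_query_exceed, and only errors if the re-rendered row still cannot fit on its own. A Python sketch of that control flow (names assumed; the final flush before returning is implied by the surrounding macro):

def get_insert_rows_queries(base_insert_query, rows, render_row, query_max_size, on_query_exceed=None):
    queries, current = [], base_insert_query
    for i, row in enumerate(rows):
        row_sql = render_row(row)
        candidate = current + ("," if i > 0 else "") + row_sql
        if len(candidate) > query_max_size:
            new_query = base_insert_query + row_sql
            if len(new_query) > query_max_size and on_query_exceed:
                on_query_exceed(row)            # e.g. replace compiled_code with the error message
                row_sql = render_row(row)       # re-render the shrunken row
                new_query = base_insert_query + row_sql
            if len(new_query) > query_max_size:
                raise ValueError("Row to be inserted exceeds var('query_max_size').")
            if current != base_insert_query:
                queries.append(current)         # flush the statement built so far
            current = new_query
        else:
            current = candidate
    if current != base_insert_query:
        queries.append(current)
    return queries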

{% macro get_chunk_insert_query(table_relation, columns, rows) -%}
{% set insert_rows_query %}
insert into {{ table_relation }}
