add functional tests for partitioned runs
nicor88 committed Aug 22, 2023
1 parent 0e5cf45 commit 39e96d3
Showing 2 changed files with 134 additions and 1 deletion.
9 changes: 8 additions & 1 deletion dbt/adapters/athena/connections.py
@@ -249,13 +249,20 @@ def get_response(cls, cursor: AthenaCursor) -> AthenaAdapterResponse:

     @staticmethod
     def process_query_stats(cursor: AthenaCursor) -> Tuple[int, int]:
+        """
+        Helper function to parse query statistics from SELECT statements.
+        The function looks for statements that contain rowcount or data_scanned_in_bytes,
+        then strips the SELECT statement and picks the value between curly brackets.
+        """
         if all(map(cursor.query.__contains__, ["rowcount", "data_scanned_in_bytes"])):
             try:
                 query_split = cursor.query.lower().split("select")[-1]
+                # query statistics are in the format {"rowcount": 1, "data_scanned_in_bytes": 3}
+                # the following statement extracts the content between { and }
                 query_stats = re.search("{(.*)}", query_split)
                 if query_stats:
                     stats = json.loads("{" + query_stats.group(1) + "}")
-                    return stats["rowcount"], stats["data_scanned_in_bytes"]
+                    return stats.get("rowcount", -1), stats.get("data_scanned_in_bytes", 0)
             except Exception as err:
                 logger.debug(f"There was an error parsing query stats {err}")
         return -1, 0
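For reference, the parsing logic above can be exercised in isolation. The sketch below is hypothetical (the real method reads the query from an AthenaCursor, and parse_query_stats is an illustrative name, not part of the adapter):

import json
import re
from typing import Tuple

def parse_query_stats(query: str) -> Tuple[int, int]:
    # Same extraction logic as process_query_stats, taking the query string directly
    if all(key in query for key in ("rowcount", "data_scanned_in_bytes")):
        try:
            query_split = query.lower().split("select")[-1]
            query_stats = re.search("{(.*)}", query_split)
            if query_stats:
                stats = json.loads("{" + query_stats.group(1) + "}")
                return stats.get("rowcount", -1), stats.get("data_scanned_in_bytes", 0)
        except Exception:
            pass
    return -1, 0

print(parse_query_stats('select {"rowcount": 212, "data_scanned_in_bytes": 1024}'))  # (212, 1024)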
126 changes: 126 additions & 0 deletions tests/functional/adapter/test_partitions.py
@@ -0,0 +1,126 @@
import pytest

from dbt.contracts.results import RunStatus
from dbt.tests.util import run_dbt

test_partitions_model_sql = """
select
random() as rnd,
cast(date_column as date) as date_column,
doy(date_column) as doy
from (
values (
sequence(from_iso8601_date('2023-01-01'), from_iso8601_date('2023-07-31'), interval '1' day)
)
) as t1(date_array)
cross join unnest(date_array) as t2(date_column)
"""


class TestHiveTablePartitions:
@pytest.fixture(scope="class")
def project_config_update(self):
return {"models": {"+table_type": "hive", "+materialized": "table", "+partitioned_by": ["date_column", "doy"]}}

@pytest.fixture(scope="class")
def models(self):
return {
"test_hive_partitions.sql": test_partitions_model_sql,
}

    def test__check_run_with_partitions(self, project):
        relation_name = "test_hive_partitions"
        model_run_result_row_count_query = f"select count(*) as records from {project.test_schema}.{relation_name}"

first_model_run = run_dbt(["run", "--select", relation_name])
first_model_run_result = first_model_run.results[0]

        # check that the model ran successfully
assert first_model_run_result.status == RunStatus.Success

records_count_first_run = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]

assert records_count_first_run == 212


class TestIcebergTablePartitions:
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {
"+table_type": "iceberg",
"+materialized": "table",
"+partitioned_by": ["DAY(date_column)", "doy"],
}
}

@pytest.fixture(scope="class")
def models(self):
return {
"test_iceberg_partitions.sql": test_partitions_model_sql,
}

    def test__check_run_with_partitions(self, project):
relation_name = "test_iceberg_partitions"
model_run_result_row_count_query = f"select count(*) as records from {project.test_schema}.{relation_name}"

first_model_run = run_dbt(["run", "--select", relation_name])
first_model_run_result = first_model_run.results[0]

        # check that the model ran successfully
assert first_model_run_result.status == RunStatus.Success

records_count_first_run = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]

assert records_count_first_run == 212


class TestIcebergIncrementalPartitions:
@pytest.fixture(scope="class")
def project_config_update(self):
return {
"models": {
"+table_type": "iceberg",
"+materialized": "incremental",
"+incremental_strategy": "merge",
"+unique_key": "doy",
"+partitioned_by": ["DAY(date_column)", "doy"],
}
}

@pytest.fixture(scope="class")
def models(self):
return {
"test_iceberg_partitions_incremental.sql": test_partitions_model_sql,
}

def test__check_incremental_run_with_partitions(self, project):
"""
Check that the incremental run works with iceberg and partitioned datasets
"""

relation_name = "test_iceberg_partitions_incremental"
model_run_result_row_count_query = f"select count(*) as records from {project.test_schema}.{relation_name}"

first_model_run = run_dbt(["run", "--select", relation_name, "--full-refresh"])
first_model_run_result = first_model_run.results[0]

        # check that the model ran successfully
assert first_model_run_result.status == RunStatus.Success

records_count_first_run = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]

assert records_count_first_run == 212

incremental_model_run = run_dbt(["run", "--select", relation_name])

incremental_model_run_result = incremental_model_run.results[0]

        # check that the model ran successfully after the incremental run
assert incremental_model_run_result.status == RunStatus.Success

incremental_records_count = project.run_sql(model_run_result_row_count_query, fetch="all")[0][0]

assert incremental_records_count == 212
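
For intuition on why the count stays at 212 after the second run: with the merge strategy and doy as the unique key, rows whose doy already exists are updated in place rather than appended. A minimal plain-Python sketch of these semantics (not the adapter's actual merge implementation):

# doy (day of year) is unique per row here: 2023-01-01 -> 1 ... 2023-07-31 -> 212
first_run = {doy: ("row", doy) for doy in range(1, 213)}     # 212 rows keyed by doy
second_run = {doy: ("row-v2", doy) for doy in range(1, 213)}

merged = dict(first_run)
merged.update(second_run)  # upsert: matching keys are overwritten, nothing is appended

assert len(merged) == 212  # row count unchanged after the incremental run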
