From 413cd242d17f92c04cdfebf70595421c96cbbae4 Mon Sep 17 00:00:00 2001 From: Chalmer Lowe Date: Fri, 22 Nov 2024 07:47:55 -0500 Subject: [PATCH] Feat: Update partitioning by DATE, DATETIME, TIMESTAMP, _PARTITIONDATE (#1113) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * adds additional functionality to cover more partitioning capability * Updates the partitioning algorithm and tests * Updates special case and tests * Updates test in possible effort to increase coverage. * Tweaks the conditionals in time partitioning process * Updates linting * chore(deps): update all dependencies (#1136) * chore(deps): update all dependencies * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Update protobuf to 5.28.3 * pin google-crc32c for python 3.7/3.8 * Pin mako for python 3.8 * Pin markupsafe for Python 3.8 * Pin pyparsing for python 3.8 * Pin pyparsing for Python 3.8 --------- Co-authored-by: Owl Bot Co-authored-by: Anthonios Partheniou * chore(deps): update all dependencies (#1140) * chore(deps): update all dependencies * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot * Removes duplicate test --------- Co-authored-by: Mend Renovate Co-authored-by: Owl Bot Co-authored-by: Anthonios Partheniou --- sqlalchemy_bigquery/base.py | 97 +++++++++--- tests/system/test_sqlalchemy_bigquery.py | 11 +- tests/unit/test_table_options.py | 193 +++++++++++++++-------- 3 files changed, 210 insertions(+), 91 deletions(-) diff --git a/sqlalchemy_bigquery/base.py b/sqlalchemy_bigquery/base.py index c531c102..879bcb40 100644 --- a/sqlalchemy_bigquery/base.py +++ b/sqlalchemy_bigquery/base.py @@ -812,42 +812,99 @@ def _raise_for_type(self, option, value, expected_type): ) def _process_time_partitioning( - self, table: Table, time_partitioning: TimePartitioning + self, + table: Table, + time_partitioning: TimePartitioning, ): """ - Generates a SQL 'PARTITION BY' clause for partitioning a table by a date or timestamp. + Generates a SQL 'PARTITION BY' clause for partitioning a table, Args: - - table (Table): The SQLAlchemy table object representing the BigQuery table to be partitioned. + - table (Table): The SQLAlchemy table object representing the BigQuery + table to be partitioned. - time_partitioning (TimePartitioning): The time partitioning details, including the field to be used for partitioning. Returns: - - str: A SQL 'PARTITION BY' clause that uses either TIMESTAMP_TRUNC or DATE_TRUNC to - partition data on the specified field. + - str: A SQL 'PARTITION BY' clause. Example: - - Given a table with a TIMESTAMP type column 'event_timestamp' and setting - 'time_partitioning.field' to 'event_timestamp', the function returns + - Given a table with an 'event_timestamp' and setting time_partitioning.type + as DAY and by setting 'time_partitioning.field' as 'event_timestamp', the + function returns: "PARTITION BY TIMESTAMP_TRUNC(event_timestamp, DAY)". + + Current inputs allowed by BQ and covered by this function include: + * _PARTITIONDATE + * DATETIME_TRUNC(, DAY/HOUR/MONTH/YEAR) + * TIMESTAMP_TRUNC(, DAY/HOUR/MONTH/YEAR) + * DATE_TRUNC(, MONTH/YEAR) + + Additional options allowed by BQ but not explicitly covered by this + function include: + * DATE(_PARTITIONTIME) + * DATE() + * DATE() + * DATE column """ - field = "_PARTITIONDATE" - trunc_fn = "DATE_TRUNC" + sqltypes = { + "_PARTITIONDATE": ("_PARTITIONDATE", None), + "TIMESTAMP": ("TIMESTAMP_TRUNC", {"DAY", "HOUR", "MONTH", "YEAR"}), + "DATETIME": ("DATETIME_TRUNC", {"DAY", "HOUR", "MONTH", "YEAR"}), + "DATE": ("DATE_TRUNC", {"MONTH", "YEAR"}), + } + + # Extract field (i.e or _PARTITIONDATE) + # AND extract the name of the column_type (i.e. "TIMESTAMP", "DATE", + # "DATETIME", "_PARTITIONDATE") if time_partitioning.field is not None: field = time_partitioning.field - if isinstance( - table.columns[time_partitioning.field].type, - sqlalchemy.sql.sqltypes.DATE, - ): - return f"PARTITION BY {field}" - elif isinstance( - table.columns[time_partitioning.field].type, - sqlalchemy.sql.sqltypes.TIMESTAMP, - ): - trunc_fn = "TIMESTAMP_TRUNC" + column_type = table.columns[field].type.__visit_name__.upper() + + else: + field = "_PARTITIONDATE" + column_type = "_PARTITIONDATE" + + # Extract time_partitioning.type_ (DAY, HOUR, MONTH, YEAR) + # i.e. generates one partition per type (1/DAY, 1/HOUR) + # NOTE: if time_partitioning.type_ == None, it gets + # immediately overwritten by python-bigquery to a default of DAY. + partitioning_period = time_partitioning.type_ + + # Extract the truncation_function (i.e. DATE_TRUNC) + # and the set of allowable partition_periods + # that can be used in that function + trunc_fn, allowed_partitions = sqltypes[column_type] + + # Create output: + # Special Case: _PARTITIONDATE does NOT use a function or partitioning_period + if trunc_fn == "_PARTITIONDATE": + return f"PARTITION BY {field}" + + # Special Case: BigQuery will not accept DAY as partitioning_period for + # DATE_TRUNC. + # However, the default argument in python-bigquery for TimePartioning + # is DAY. This case overwrites that to avoid making a breaking change in + # python-bigquery. + # https://github.com/googleapis/python-bigquery/blob/a4d9534a900f13ae7355904cda05097d781f27e3/google/cloud/bigquery/table.py#L2916 + if trunc_fn == "DATE_TRUNC" and partitioning_period == "DAY": + raise ValueError( + "The TimePartitioning.type_ must be one of: " + f"{allowed_partitions}, received {partitioning_period}." + "NOTE: the `default` value for TimePartioning.type_ as set in " + "python-bigquery is 'DAY', if you wish to use 'DATE_TRUNC' " + "ensure that you overwrite the default TimePartitioning.type_. " + ) + + # Generic Case + if partitioning_period not in allowed_partitions: + raise ValueError( + "The TimePartitioning.type_ must be one of: " + f"{allowed_partitions}, received {partitioning_period}." + ) - return f"PARTITION BY {trunc_fn}({field}, {time_partitioning.type_})" + return f"PARTITION BY {trunc_fn}({field}, {partitioning_period})" def _process_range_partitioning( self, table: Table, range_partitioning: RangePartitioning diff --git a/tests/system/test_sqlalchemy_bigquery.py b/tests/system/test_sqlalchemy_bigquery.py index 7ea4ccc6..b3ebcc25 100644 --- a/tests/system/test_sqlalchemy_bigquery.py +++ b/tests/system/test_sqlalchemy_bigquery.py @@ -561,7 +561,16 @@ def test_dml(engine, session, table_dml): assert len(result) == 0 -@pytest.mark.parametrize("time_partitioning_field", ["timestamp_c", "date_c"]) +@pytest.mark.parametrize( + "time_partitioning_field", + [ + ("timestamp_c"), + ("datetime_c"), + # Fails because python-bigquery TimePartitioning.type_ defaults to "DAY", but + # the DATE_TRUNC() function only allows "MONTH"/"YEAR" + pytest.param("date_c", marks=[pytest.mark.xfail]), + ], +) def test_create_table(engine, bigquery_dataset, time_partitioning_field): meta = MetaData() Table( diff --git a/tests/unit/test_table_options.py b/tests/unit/test_table_options.py index 2b757e04..ea3322f9 100644 --- a/tests/unit/test_table_options.py +++ b/tests/unit/test_table_options.py @@ -104,111 +104,161 @@ def test_table_clustering_fields_dialect_option_type_error(faux_conn): ) -def test_table_time_partitioning_dialect_option(faux_conn): - # expect table creation to fail as SQLite does not support partitioned tables - with pytest.raises(sqlite3.OperationalError): - setup_table( - faux_conn, - "some_table", - sqlalchemy.Column("id", sqlalchemy.Integer), - sqlalchemy.Column("createdAt", sqlalchemy.DateTime), - bigquery_time_partitioning=TimePartitioning(), - ) +@pytest.mark.parametrize( + "column_dtype,time_partitioning_type,func_name", + [ + # DATE dtype + pytest.param( + sqlalchemy.DATE, + TimePartitioningType.HOUR, # Only MONTH/YEAR are permitted in BigQuery + "DATE_TRUNC", + marks=pytest.mark.xfail, + ), + pytest.param( + sqlalchemy.DATE, + TimePartitioningType.DAY, # Only MONTH/YEAR are permitted in BigQuery + "DATE_TRUNC", + marks=pytest.mark.xfail, + ), + (sqlalchemy.DATE, TimePartitioningType.MONTH, "DATE_TRUNC"), + (sqlalchemy.DATE, TimePartitioningType.YEAR, "DATE_TRUNC"), + # TIMESTAMP dtype + (sqlalchemy.TIMESTAMP, TimePartitioningType.HOUR, "TIMESTAMP_TRUNC"), + (sqlalchemy.TIMESTAMP, TimePartitioningType.DAY, "TIMESTAMP_TRUNC"), + (sqlalchemy.TIMESTAMP, TimePartitioningType.MONTH, "TIMESTAMP_TRUNC"), + (sqlalchemy.TIMESTAMP, TimePartitioningType.YEAR, "TIMESTAMP_TRUNC"), + # DATETIME dtype + (sqlalchemy.DATETIME, TimePartitioningType.HOUR, "DATETIME_TRUNC"), + (sqlalchemy.DATETIME, TimePartitioningType.DAY, "DATETIME_TRUNC"), + (sqlalchemy.DATETIME, TimePartitioningType.MONTH, "DATETIME_TRUNC"), + (sqlalchemy.DATETIME, TimePartitioningType.YEAR, "DATETIME_TRUNC"), + # TimePartitioning.type_ == None + (sqlalchemy.DATETIME, None, "DATETIME_TRUNC"), + ], +) +def test_table_time_partitioning_given_field_and_type__dialect_options( + faux_conn, column_dtype, time_partitioning_type, func_name +): + """NOTE: Expect table creation to fail as SQLite does not support + partitioned tables, despite that, we are still able to test the generation + of SQL statements. - assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( - "CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )" - " PARTITION BY DATE_TRUNC(_PARTITIONDATE, DAY)" - ) + Each parametrization ensures that the appropriate function is generated + depending on whether the column datatype is DATE, TIMESTAMP, DATETIME and + whether the TimePartitioningType is HOUR, DAY, MONTH, YEAR. + `DATE_TRUNC` only returns a result if TimePartitioningType is DAY, MONTH, + YEAR. BigQuery cannot partition on DATE by HOUR, so that is expected to + xfail. -def test_table_require_partition_filter_dialect_option(faux_conn): - # expect table creation to fail as SQLite does not support partitioned tables - with pytest.raises(sqlite3.OperationalError): - setup_table( - faux_conn, - "some_table", - sqlalchemy.Column("createdAt", sqlalchemy.DateTime), - bigquery_time_partitioning=TimePartitioning(field="createdAt"), - bigquery_require_partition_filter=True, - ) + A distinguishing characteristic of this test is we provide an argument to + the TimePartitioning class for both field and type_. - assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( - "CREATE TABLE `some_table` ( `createdAt` DATETIME )" - " PARTITION BY DATE_TRUNC(createdAt, DAY)" - " OPTIONS(require_partition_filter=true)" - ) + Special case: IF time_partitioning_type is None, the __init__() in the + TimePartitioning class will overwrite it with TimePartitioningType.DAY as + the default. + """ + if time_partitioning_type is None: + time_partitioning_type = TimePartitioningType.DAY -def test_table_time_partitioning_with_field_dialect_option(faux_conn): - # expect table creation to fail as SQLite does not support partitioned tables with pytest.raises(sqlite3.OperationalError): setup_table( faux_conn, "some_table", sqlalchemy.Column("id", sqlalchemy.Integer), - sqlalchemy.Column("createdAt", sqlalchemy.DateTime), - bigquery_time_partitioning=TimePartitioning(field="createdAt"), + sqlalchemy.Column("createdAt", column_dtype), + bigquery_time_partitioning=TimePartitioning( + field="createdAt", type_=time_partitioning_type + ), ) - assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( - "CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )" - " PARTITION BY DATE_TRUNC(createdAt, DAY)" + result = " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) + expected = ( + f"CREATE TABLE `some_table` ( `id` INT64, `createdAt` {column_dtype.__visit_name__} )" + f" PARTITION BY {func_name}(createdAt, {time_partitioning_type})" ) + assert result == expected -def test_table_time_partitioning_by_month_dialect_option(faux_conn): - # expect table creation to fail as SQLite does not support partitioned tables - with pytest.raises(sqlite3.OperationalError): - setup_table( - faux_conn, - "some_table", - sqlalchemy.Column("id", sqlalchemy.Integer), - sqlalchemy.Column("createdAt", sqlalchemy.DateTime), - bigquery_time_partitioning=TimePartitioning( - field="createdAt", - type_=TimePartitioningType.MONTH, - ), - ) +def test_table_time_partitioning_given_field_but_no_type__dialect_option(faux_conn): + """Expect table creation to fail as SQLite does not support partitioned tables - assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( - "CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )" - " PARTITION BY DATE_TRUNC(createdAt, MONTH)" - ) + Confirms that if the column datatype is DATETIME but no TimePartitioning.type_ + has been supplied, the system will default to DAY. + A distinguishing characteristic of this test is we provide an argument to + the TimePartitioning class for field but not type_. + """ -def test_table_time_partitioning_with_timestamp_dialect_option(faux_conn): - # expect table creation to fail as SQLite does not support partitioned tables with pytest.raises(sqlite3.OperationalError): setup_table( faux_conn, "some_table", sqlalchemy.Column("id", sqlalchemy.Integer), - sqlalchemy.Column("createdAt", sqlalchemy.TIMESTAMP), + sqlalchemy.Column("createdAt", sqlalchemy.DateTime), bigquery_time_partitioning=TimePartitioning(field="createdAt"), ) - - assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( - "CREATE TABLE `some_table` ( `id` INT64, `createdAt` TIMESTAMP )" - " PARTITION BY TIMESTAMP_TRUNC(createdAt, DAY)" + result = " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) + expected = ( + "CREATE TABLE `some_table` ( `id` INT64, `createdAt` DATETIME )" + " PARTITION BY DATETIME_TRUNC(createdAt, DAY)" ) + assert result == expected -def test_table_time_partitioning_with_date_dialect_option(faux_conn): - # expect table creation to fail as SQLite does not support partitioned tables +@pytest.mark.parametrize( + "column_dtype,time_partitioning_type", + [ + pytest.param( + sqlalchemy.DATE, + TimePartitioningType.HOUR, + marks=pytest.mark.xfail, + ), + (sqlalchemy.DATE, TimePartitioningType.DAY), + (sqlalchemy.DATE, TimePartitioningType.MONTH), + (sqlalchemy.DATE, TimePartitioningType.YEAR), + ], +) +def test_table_time_partitioning_given_type__but_no_field_dialect_option( + faux_conn, + column_dtype, + time_partitioning_type, +): + """NOTE: Expect table creation to fail as SQLite does not support + partitioned tables, despite that, we are still able to test the generation + of SQL statements + + If the `field` argument to TimePartitioning() is not provided, it defaults to + None. That causes the pseudocolumn "_PARTITIONDATE" to be used by default as + the column to partition by. + + _PARTITIONTIME only returns a result if TimePartitioningType is DAY, MONTH, + YEAR. BigQuery cannot partition on _PARTITIONDATE by HOUR, so that is + expected to xfail. + + A distinguishing characteristic of this test is we provide an argument to + the TimePartitioning class for type_ but not field. + """ + with pytest.raises(sqlite3.OperationalError): setup_table( faux_conn, "some_table_2", sqlalchemy.Column("id", sqlalchemy.Integer), - sqlalchemy.Column("createdAt", sqlalchemy.DATE), - bigquery_time_partitioning=TimePartitioning(field="createdAt"), + sqlalchemy.Column("createdAt", column_dtype), + bigquery_time_partitioning=TimePartitioning(type_=time_partitioning_type), ) # confirm that the following code creates the correct SQL string - assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( - "CREATE TABLE `some_table_2` ( `id` INT64, `createdAt` DATE )" - " PARTITION BY createdAt" + result = " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) + + # We need two versions of expected depending on whether we use _PARTITIONDATE + expected = ( + f"CREATE TABLE `some_table_2` ( `id` INT64, `createdAt` {column_dtype.__visit_name__} )" + f" PARTITION BY _PARTITIONDATE" ) + assert result == expected def test_table_time_partitioning_dialect_option_partition_expiration_days(faux_conn): @@ -227,7 +277,7 @@ def test_table_time_partitioning_dialect_option_partition_expiration_days(faux_c assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( "CREATE TABLE `some_table` ( `createdAt` DATETIME )" - " PARTITION BY DATE_TRUNC(createdAt, DAY)" + " PARTITION BY DATETIME_TRUNC(createdAt, DAY)" " OPTIONS(partition_expiration_days=0.25)" ) @@ -400,13 +450,16 @@ def test_table_all_dialect_option(faux_conn): ), ) - assert " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) == ( + result = " ".join(faux_conn.test_data["execute"][-1][0].strip().split()) + expected = ( "CREATE TABLE `some_table` ( `id` INT64, `country` STRING, `town` STRING, `createdAt` DATETIME )" - " PARTITION BY DATE_TRUNC(createdAt, DAY)" + " PARTITION BY DATETIME_TRUNC(createdAt, DAY)" " CLUSTER BY country, town" " OPTIONS(partition_expiration_days=30.0, expiration_timestamp=TIMESTAMP '2038-01-01 00:00:00+00:00', require_partition_filter=true, default_rounding_mode='ROUND_HALF_EVEN')" ) + assert result == expected + def test_validate_friendly_name_value_type(ddl_compiler): # expect option value to be transformed as a string expression