From 89b67ec544718001f23405bdde1a4f97029704b1 Mon Sep 17 00:00:00 2001 From: Alik Kurdyukov Date: Wed, 7 Feb 2024 00:16:06 +0300 Subject: [PATCH] Add 'optimize_after' option (#107) * verbose errors for missing params * connection settings as separate params added * README updated * make linter happy * add template to table_path * README updated * make linter happy * all 'optimize_after' option --- .gitignore | 3 + README.md | 37 ++++++---- target_clickhouse/connectors.py | 72 +++++++++++++------ target_clickhouse/engine_class.py | 10 +++ target_clickhouse/sinks.py | 10 ++- target_clickhouse/target.py | 116 +++++++++++++++++++++++++++++- tests/test_core.py | 54 ++++++++------ 7 files changed, 247 insertions(+), 55 deletions(-) diff --git a/.gitignore b/.gitignore index 5353f53..4a2ef70 100644 --- a/.gitignore +++ b/.gitignore @@ -134,3 +134,6 @@ dmypy.json # Pyre type checker .pyre/ + +# JetBrains IDE config +.idea/ diff --git a/README.md b/README.md index 57aa325..8328631 100644 --- a/README.md +++ b/README.md @@ -38,19 +38,30 @@ target-clickhouse --about --format=markdown ``` --> -| Setting | Required | Default | Description | -|:---------------------|:--------:|:-------:|:------------| -| sqlalchemy_url | False | None | SQLAlchemy connection string | -| table_name | False | None | The name of the table to write to. | -| engine_type | False | MergeTree | The engine type to use for the table. This must be one of the following engine types: MergeTree, ReplacingMergeTree, SummingMergeTree, AggregatingMergeTree, ReplicatedMergeTree, ReplicatedReplacingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree. | -| table_path | False | None | The table path for replicated tables. This is required when using any of the replication engines. 
Check out the [documentation](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication#replicatedmergetree-parameters) for more information | -| replica_name | False | None | The `replica_name` for replicated tables. This is required when using any of the replication engines. | -| cluster_name | False | None | The cluster to create tables in. This is passed as the `clickhouse_cluster` argument when creating a table. [Documentation](https://clickhouse.com/docs/en/sql-reference/distributed-ddl) can be found here. | -| default_target_schema| False | None | The default target database schema name to use for all streams. | -| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). | -| stream_map_config | False | None | User-defined config values to be used within map expressions. | -| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. | -| flattening_max_depth | False | None | The max depth to flatten schemas. | +| Setting | Required | Default | Description | +|:---------------------|:--------:|:-------:|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| sqlalchemy_url | False | None | The SQLAlchemy connection string for the ClickHouse database. 
Used if set, otherwise separate settings are used | +| driver | False | http | Driver type | +| username | False | default | Database user | +| password | False | None | Username password | +| host | False | localhost | Database host | +| port | False | 8123 | Database connection port | +| database | False | default | Database name | +| secure | False | 0 | Should the connection be secure | +| verify | False | 1 | Should secure connection need to verify SSL/TLS | +| engine_type | False | None | The engine type to use for the table. | +| table_name | False | None | The name of the table to write to. Defaults to stream name. | +| table_path | False | None | The table path for replicated tables. This is required when using any of the replication engines. Check out the [documentation](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication#replicatedmergetree-parameters) for more information. Use `$table_name` to substitute the table name. | +| replica_name | False | None | The `replica_name` for replicated tables. This is required when using any of the replication engines. | +| cluster_name | False | None | The cluster to create tables in. This is passed as the `clickhouse_cluster` argument when creating a table. [Documentation](https://clickhouse.com/docs/en/sql-reference/distributed-ddl) can be found here. | +| default_target_schema| False | None | The default target database schema name to use for all streams. | +| optimize_after | False | 0 | Run 'OPTIMIZE TABLE' after data insert | +| optimize_after | False | 0 | Run 'OPTIMIZE TABLE' after data insert. Useful when table engine removes duplicate rows. | +| load_method | False | TargetLoadMethods.APPEND_ONLY | The method to use when loading data into the destination. `append-only` will always write all input records whether that records already exists or not. `upsert` will update existing records and insert new records.
`overwrite` will delete all existing records and insert all input records. | +| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). | +| stream_map_config | False | None | User-defined config values to be used within map expressions. | +| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. | +| flattening_max_depth | False | None | The max depth to flatten schemas. | A full list of supported settings and capabilities is available by running: `target-clickhouse --about` diff --git a/target_clickhouse/connectors.py b/target_clickhouse/connectors.py index b8ec54b..4655af5 100644 --- a/target_clickhouse/connectors.py +++ b/target_clickhouse/connectors.py @@ -21,6 +21,7 @@ if TYPE_CHECKING: from sqlalchemy.engine import Engine + class ClickhouseConnector(SQLConnector): """Clickhouse Meltano Connector. @@ -40,12 +41,41 @@ def get_sqlalchemy_url(self, config: dict) -> str: Args: config: The configuration for the connector. 
""" - return super().get_sqlalchemy_url(config) + if config.get("sqlalchemy_url"): + return super().get_sqlalchemy_url(config) + + if config["driver"] == "http": + if config["secure"]: + secure_options = f"protocol=https&verify={config['verify']}" + + if not config["verify"]: + # disable urllib3 warning + import urllib3 + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + else: + secure_options = "protocol=http" + else: + secure_options = f"secure={config['secure']}&verify={config['verify']}" + return ( + f"clickhouse+{config['driver']}://{config['username']}:{config['password']}@" + f"{config['host']}:{config['port']}/" + f"{config['database']}?{secure_options}" + ) def create_engine(self) -> Engine: """Create a SQLAlchemy engine for clickhouse.""" return create_engine(self.get_sqlalchemy_url(self.config)) + @contextlib.contextmanager + def _connect(self) -> typing.Iterator[sqlalchemy.engine.Connection]: + # patch to overcome error in sqlalchemy-clickhouse driver + if self.config.get("driver") == "native": + kwargs = {"stream_results": True, "max_row_buffer": 1000} + else: + kwargs = {"stream_results": True} + with self._engine.connect().execution_options(**kwargs) as conn: + yield conn + def to_sql_type(self, jsonschema_type: dict) -> sqlalchemy.types.TypeEngine: """Return a JSON Schema representation of the provided type. @@ -74,12 +104,12 @@ def to_sql_type(self, jsonschema_type: dict) -> sqlalchemy.types.TypeEngine: return sql_type def create_empty_table( - self, - full_table_name: str, - schema: dict, - primary_keys: list[str] | None = None, - partition_keys: list[str] | None = None, - as_temp_table: bool = False, # noqa: FBT001, FBT002 + self, + full_table_name: str, + schema: dict, + primary_keys: list[str] | None = None, + partition_keys: list[str] | None = None, + as_temp_table: bool = False, # noqa: FBT001, FBT002 ) -> None: """Create an empty target table, using Clickhouse Engine. 
@@ -141,7 +171,10 @@ def create_empty_table( ) table_engine = create_engine_wrapper( - engine_type=engine_type, primary_keys=primary_keys, config=self.config, + engine_type=engine_type, + primary_keys=primary_keys, + table_name=table_name, + config=self.config, ) table_args = {} @@ -162,10 +195,10 @@ def prepare_schema(self, _: str) -> None: return def prepare_column( - self, - full_table_name: str, - column_name: str, - sql_type: sqlalchemy.types.TypeEngine, + self, + full_table_name: str, + column_name: str, + sql_type: sqlalchemy.types.TypeEngine, ) -> None: """Adapt target table to provided schema if possible. @@ -189,12 +222,11 @@ def prepare_column( sql_type=sql_type, ) - @staticmethod def get_column_add_ddl( - table_name: str, - column_name: str, - column_type: sqlalchemy.types.TypeEngine, + table_name: str, + column_name: str, + column_type: sqlalchemy.types.TypeEngine, ) -> sqlalchemy.DDL: """Get the create column DDL statement. @@ -226,10 +258,10 @@ def get_column_add_ddl( ) def get_column_alter_ddl( - self, - table_name: str, - column_name: str, - column_type: sqlalchemy.types.TypeEngine, + self, + table_name: str, + column_name: str, + column_type: sqlalchemy.types.TypeEngine, ) -> sqlalchemy.DDL: """Get the alter column DDL statement. 
diff --git a/target_clickhouse/engine_class.py b/target_clickhouse/engine_class.py index d60f6ab..49f6faa 100644 --- a/target_clickhouse/engine_class.py +++ b/target_clickhouse/engine_class.py @@ -1,4 +1,5 @@ from enum import Enum +from string import Template from typing import List, Optional from clickhouse_sqlalchemy import engines @@ -41,6 +42,7 @@ def get_engine_class(engine_type): def create_engine_wrapper( engine_type, primary_keys: List[str], + table_name: str, config: Optional[dict] = None, ): # check if engine type is in supported engines @@ -65,10 +67,18 @@ def create_engine_wrapper( ): table_path: Optional[str] = config.get("table_path") if table_path is not None: + if "$" in table_path: + table_path = Template(table_path).substitute(table_name=table_name) engine_args["table_path"] = table_path + else: + msg = "Table path (table_path) is not defined." + raise ValueError(msg) replica_name: Optional[str] = config.get("replica_name") if replica_name is not None: engine_args["replica_name"] = replica_name + else: + msg = "Replica name (replica_name) is not defined." + raise ValueError(msg) engine_class = get_engine_class(engine_type) diff --git a/target_clickhouse/sinks.py b/target_clickhouse/sinks.py index 50d4508..f6aed66 100644 --- a/target_clickhouse/sinks.py +++ b/target_clickhouse/sinks.py @@ -86,7 +86,15 @@ def bulk_insert_records( if isinstance(value, (dict, list)): record[key] = json.dumps(value) - return super().bulk_insert_records(full_table_name, schema, records) + res = super().bulk_insert_records(full_table_name, schema, records) + + if self.config.get("optimize_after", False): + with self.connector._connect() as conn, conn.begin(): # noqa: SLF001 + self.logger.info("Optimizing table: %s", self.full_table_name) + conn.execute(sqlalchemy.text( + f"OPTIMIZE TABLE {self.full_table_name}")) + + return res def activate_version(self, new_version: int) -> None: """Bump the active version of the target table. 
diff --git a/target_clickhouse/target.py b/target_clickhouse/target.py index a369cee..dc628a0 100644 --- a/target_clickhouse/target.py +++ b/target_clickhouse/target.py @@ -3,6 +3,7 @@ from singer_sdk import typing as th from singer_sdk.target_base import SQLTarget +from target_clickhouse.engine_class import SupportedEngines from target_clickhouse.sinks import ( ClickhouseSink, ) @@ -14,20 +15,133 @@ class TargetClickhouse(SQLTarget): name = "target-clickhouse" config_jsonschema = th.PropertiesList( + # connection properties th.Property( "sqlalchemy_url", th.StringType, secret=True, # Flag config as protected. - description="The SQLAlchemy connection string for the ClickHouse database", + description="The SQLAlchemy connection string for the ClickHouse database. " + "Used if set, otherwise separate settings are used", + ), + th.Property( + "driver", + th.StringType, + required=False, + description="Driver type", + default="http", + allowed_values=["http", "native", "asynch"], + ), + th.Property( + "username", + th.StringType, + required=False, + description="Database user", + default="default", + ), + th.Property( + "password", + th.StringType, + required=False, + description="Username password", + secret=True, + ), + th.Property( + "host", + th.StringType, + required=False, + description="Database host", + default="localhost", + ), + th.Property( + "port", + th.IntegerType, + required=False, + description="Database connection port", + default=8123, + ), + th.Property( + "database", + th.StringType, + required=False, + description="Database name", + default="default", + ), + th.Property( + "secure", + th.BooleanType, + required=False, + description="Should the connection be secure", + default=False, + ), + th.Property( + "verify", + th.BooleanType, + description="Should secure connection need to verify SSL/TLS", + default=True, + ), + + # other settings + th.Property( + "engine_type", + th.StringType, + required=False, + description="The engine type to use for the 
table.", + allowed_values=[e.value for e in SupportedEngines], ), th.Property( "table_name", th.StringType, + required=False, description="The name of the table to write to. Defaults to stream name.", ), + th.Property( + "table_path", + th.StringType, + required=False, + description="The table path for replicated tables. This is required when " + "using any of the replication engines. Check out the " + "[documentation](https://clickhouse.com/docs/en/engines/table-engines/" + "mergetree-family/replication#replicatedmergetree-parameters) " + "for more information. Use `$table_name` to substitute the " + "table name.", + ), + th.Property( + "replica_name", + th.StringType, + required=False, + description="The `replica_name` for replicated tables. This is required " + "when using any of the replication engines.", + ), + th.Property( + "cluster_name", + th.StringType, + required=False, + description="The cluster to create tables in. This is passed as the " + "`clickhouse_cluster` argument when creating a table. " + "[Documentation]" + "(https://clickhouse.com/docs/en/" + "sql-reference/distributed-ddl) " + "can be found here.", + ), + th.Property( + "default_target_schema", + th.StringType, + required=False, + description="The default target database schema name to use for " + "all streams.", + ), + th.Property( + "optimize_after", + th.BooleanType, + required=False, + default=False, + description="Run 'OPTIMIZE TABLE' after data insert. 
Useful when " + "table engine removes duplicate rows.", + ), ).to_dict() default_sink_class = ClickhouseSink + if __name__ == "__main__": TargetClickhouse.cli() diff --git a/tests/test_core.py b/tests/test_core.py index 51028a8..c892993 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -4,8 +4,6 @@ import typing as t -import pytest -from clickhouse_driver import Client from singer_sdk.testing import get_target_test_class from target_clickhouse.target import TargetClickhouse @@ -14,29 +12,45 @@ "sqlalchemy_url": "clickhouse+http://default:@localhost:18123", } +TEST_CONFIG_SPREAD: dict[str, t.Any] = { + "driver": "http", + "host": "localhost", + "port": 18123, + "username": "default", + "password": "", + "database": "default", + "secure": False, + "verify": False, + "optimize_after": True, +} + +TEST_CONFIG_NATIVE: dict[str, t.Any] = { + "driver": "native", + "host": "localhost", + "port": 19000, + "username": "default", + "password": "", + "database": "default", + "secure": False, + "verify": False, +} + # Run standard built-in target tests from the SDK: StandardTargetTests = get_target_test_class( target_class=TargetClickhouse, config=TEST_CONFIG, ) -class TestTargetClickhouse(StandardTargetTests): # type: ignore[misc, valid-type] + +class TestStandardTargetClickhouse(StandardTargetTests): # type: ignore[misc, valid-type] """Standard Target Tests.""" - @pytest.fixture(autouse=True) - def _resource(self) -> None: - """Generic external resource. - - This fixture is useful for setup and teardown of external resources, - such output folders, tables, buckets etc. for use during testing.
- - Example usage can be found in the SDK samples test suite: - https://github.com/meltano/sdk/tree/main/tests/samples - """ - _ = Client( - host="localhost", - port=19000, - user="default", - password="", - database="default", - ) + +SpreadTargetTests = get_target_test_class( + target_class=TargetClickhouse, + config=TEST_CONFIG_SPREAD, +) + + +class TestSpreadTargetClickhouse(SpreadTargetTests): # type: ignore[misc, valid-type] + """Standard Target Tests."""