From ba17c973c6d01e7411e6eb071b4243aa4042bd19 Mon Sep 17 00:00:00 2001 From: Alik Kurdyukov Date: Mon, 22 Jan 2024 12:20:46 +0400 Subject: [PATCH 1/4] verbose errors for missing params --- .gitignore | 3 +++ target_clickhouse/engine_class.py | 6 ++++++ 2 files changed, 9 insertions(+) diff --git a/.gitignore b/.gitignore index 5353f53..4a2ef70 100644 --- a/.gitignore +++ b/.gitignore @@ -134,3 +134,6 @@ dmypy.json # Pyre type checker .pyre/ + +# JetBrains IDE config +.idea/ diff --git a/target_clickhouse/engine_class.py b/target_clickhouse/engine_class.py index d60f6ab..3e9837c 100644 --- a/target_clickhouse/engine_class.py +++ b/target_clickhouse/engine_class.py @@ -66,9 +66,15 @@ def create_engine_wrapper( table_path: Optional[str] = config.get("table_path") if table_path is not None: engine_args["table_path"] = table_path + else: + msg = f"Table path (table_path) is not defined." + raise ValueError(msg) replica_name: Optional[str] = config.get("replica_name") if replica_name is not None: engine_args["replica_name"] = replica_name + else: + msg = f"Replica name (replica_name) is not defined." + raise ValueError(msg) engine_class = get_engine_class(engine_type) From ee838d06836478c9614340c7c32477aa0634372f Mon Sep 17 00:00:00 2001 From: Alik Kurdyukov Date: Mon, 22 Jan 2024 21:55:13 +0400 Subject: [PATCH 2/4] connection settings as separate params added --- target_clickhouse/connectors.py | 31 +++++++++- target_clickhouse/target.py | 100 ++++++++++++++++++++++++++++++++ tests/test_core.py | 63 ++++++++++++++------ 3 files changed, 174 insertions(+), 20 deletions(-) diff --git a/target_clickhouse/connectors.py b/target_clickhouse/connectors.py index b8ec54b..9dac964 100644 --- a/target_clickhouse/connectors.py +++ b/target_clickhouse/connectors.py @@ -40,12 +40,41 @@ def get_sqlalchemy_url(self, config: dict) -> str: Args: config: The configuration for the connector. """ - return super().get_sqlalchemy_url(config) + if config.get("sqlalchemy_url"): + return super().get_sqlalchemy_url(config) + else: + if config['driver'] == 'http': + if config['secure']: + secure_options = f"protocol=https&verify={config['verify']}" + + if not config['verify']: + # disable urllib3 warning + import urllib3 + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) + else: + secure_options = "protocol=http" + else: + secure_options = f"secure={config['secure']}&verify={config['verify']}" + return ( + f"clickhouse+{config['driver']}://{config['username']}:{config['password']}@" + f"{config['host']}:{config['port']}/" + f"{config['database']}?{secure_options}" + ) def create_engine(self) -> Engine: """Create a SQLAlchemy engine for clickhouse.""" return create_engine(self.get_sqlalchemy_url(self.config)) + @contextlib.contextmanager + def _connect(self) -> typing.Iterator[sqlalchemy.engine.Connection]: + # hack to overcome error in sqlalchemy-clickhouse driver + if self.config.get("driver") == "native": + kwargs = {"stream_results": True, "max_row_buffer": 1000} + else: + kwargs = {"stream_results": True} + with self._engine.connect().execution_options(**kwargs) as conn: + yield conn + def to_sql_type(self, jsonschema_type: dict) -> sqlalchemy.types.TypeEngine: """Return a JSON Schema representation of the provided type. diff --git a/target_clickhouse/target.py b/target_clickhouse/target.py index a369cee..8c543a2 100644 --- a/target_clickhouse/target.py +++ b/target_clickhouse/target.py @@ -6,6 +6,7 @@ from target_clickhouse.sinks import ( ClickhouseSink, ) +from target_clickhouse.engine_class import SupportedEngines class TargetClickhouse(SQLTarget): @@ -14,20 +15,119 @@ class TargetClickhouse(SQLTarget): name = "target-clickhouse" config_jsonschema = th.PropertiesList( + # connection properties th.Property( "sqlalchemy_url", th.StringType, secret=True, # Flag config as protected. description="The SQLAlchemy connection string for the ClickHouse database", ), + th.Property( + "driver", + th.StringType, + required=False, + description="Driver type", + default="http", + allowed_values=["http", "native", "asynch"] + ), + th.Property( + "username", + th.StringType, + required=False, + description="Database user", + default="default", + ), + th.Property( + "password", + th.StringType, + required=False, + description="Username password", + secret=True + ), + th.Property( + "host", + th.StringType, + required=False, + description="Database host", + default="localhost" + ), + th.Property( + "port", + th.IntegerType, + required=False, + description="Database connection port", + default=8123, + ), + th.Property( + "database", + th.StringType, + required=False, + description="Database name", + default="default", + ), + th.Property( + "secure", + th.BooleanType, + required=False, + description="Should the connection be secure", + default=False + ), + th.Property( + "verify", + th.BooleanType, + description="Should secure connection need to verify SSL/TLS", + default=True + ), + + # other settings + th.Property( + "engine_type", + th.StringType, + required=False, + description="The engine type to use for the table.", + allowed_values=[e.value for e in SupportedEngines] + ), th.Property( "table_name", th.StringType, + required=False, description="The name of the table to write to. Defaults to stream name.", ), + th.Property( + "table_path", + th.StringType, + required=False, + description="The table path for replicated tables. This is required when using " + "any of the replication engines. Check out the " + "[documentation](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication#replicatedmergetree-parameters) " + "for more information" + ), + th.Property( + "replica_name", + th.StringType, + required=False, + description="The `replica_name` for replicated tables. This is required when using any of the " + "replication engines." + ), + th.Property( + "cluster_name", + th.StringType, + required=False, + description="The cluster to create tables in. This is passed as the `clickhouse_cluster` " + "argument when creating a table. " + "[Documentation](https://clickhouse.com/docs/en/sql-reference/distributed-ddl) " + "can be found here." + ), + th.Property( + "default_target_schema", + th.StringType, + required=False, + description="The default target database schema name to use for all streams." + ) ).to_dict() default_sink_class = ClickhouseSink + if __name__ == "__main__": TargetClickhouse.cli() diff --git a/tests/test_core.py b/tests/test_core.py index 51028a8..4bde499 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -5,7 +5,6 @@ import typing as t import pytest -from clickhouse_driver import Client from singer_sdk.testing import get_target_test_class from target_clickhouse.target import TargetClickhouse @@ -14,29 +13,55 @@ "sqlalchemy_url": "clickhouse+http://default:@localhost:18123", } +TEST_CONFIG_SPREAD: dict[str, t.Any] = { + "driver": "http", + "host": "localhost", + "port": 18123, + "username": "default", + "password": "", + "database": "default", + "secure": False, + "verify": False, +} + +TEST_CONFIG_NATIVE: dict[str, t.Any] = { + "driver": "native", + "host": "localhost", + "port": 19000, + "username": "default", + "password": "", + "database": "default", + "secure": False, + "verify": False, +} + # Run standard built-in target tests from the SDK: StandardTargetTests = get_target_test_class( target_class=TargetClickhouse, config=TEST_CONFIG, ) -class TestTargetClickhouse(StandardTargetTests): # type: ignore[misc, valid-type] + +class TestStandardTargetClickhouse(StandardTargetTests): # type: ignore[misc, valid-type] """Standard Target Tests.""" - @pytest.fixture(autouse=True) - def _resource(self) -> None: - """Generic external resource. - - This fixture is useful for setup and teardown of external resources, - such output folders, tables, buckets etc. for use during testing. - - Example usage can be found in the SDK samples test suite: - https://github.com/meltano/sdk/tree/main/tests/samples - """ - _ = Client( - host="localhost", - port=19000, - user="default", - password="", - database="default", - ) + +SpreadTargetTests = get_target_test_class( + target_class=TargetClickhouse, + config=TEST_CONFIG_SPREAD, +) + + +class TestSpreadTargetClickhouse(SpreadTargetTests): # type: ignore[misc, valid-type] + """Standard Target Tests.""" + + +# TODO: native driver seems to be broken +# NativeTargetTests = get_target_test_class( +# target_class=TargetClickhouse, +# config=TEST_CONFIG_NATIVE, +# ) +# +# +# class TestNativeTargetClickhouse(NativeTargetTests): # type: ignore[misc, valid-type] +# """Standard Target Tests.""" From cf4a67639fd1f04fd7dea1c5b25e5b5780079192 Mon Sep 17 00:00:00 2001 From: Alik Kurdyukov Date: Mon, 22 Jan 2024 22:29:58 +0400 Subject: [PATCH 3/4] README updated --- README.md | 36 +++++++++++++++++++++++------------- target_clickhouse/target.py | 3 ++- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 57aa325..e094af4 100644 --- a/README.md +++ b/README.md @@ -38,19 +38,29 @@ target-clickhouse --about --format=markdown ``` --> -| Setting | Required | Default | Description | -|:---------------------|:--------:|:-------:|:------------| -| sqlalchemy_url | False | None | SQLAlchemy connection string | -| table_name | False | None | The name of the table to write to. | -| engine_type | False | MergeTree | The engine type to use for the table. This must be one of the following engine types: MergeTree, ReplacingMergeTree, SummingMergeTree, AggregatingMergeTree, ReplicatedMergeTree, ReplicatedReplacingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree. | -| table_path | False | None | The table path for replicated tables. This is required when using any of the replication engines. Check out the [documentation](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication#replicatedmergetree-parameters) for more information | -| replica_name | False | None | The `replica_name` for replicated tables. This is required when using any of the replication engines. | -| cluster_name | False | None | The cluster to create tables in. This is passed as the `clickhouse_cluster` argument when creating a table. [Documentation](https://clickhouse.com/docs/en/sql-reference/distributed-ddl) can be found here. | -| default_target_schema| False | None | The default target database schema name to use for all streams. | -| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). | -| stream_map_config | False | None | User-defined config values to be used within map expressions. | -| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. | -| flattening_max_depth | False | None | The max depth to flatten schemas. | +| Setting | Required | Default | Description | +|:---------------------|:--------:|:-------:|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| sqlalchemy_url | False | None | The SQLAlchemy connection string for the ClickHouse database. Used if set, otherwise separate settings are used | +| driver | False | http | Driver type | +| username | False | default | Database user | +| password | False | None | Username password | +| host | False | localhost | Database host | +| port | False | 8123 | Database connection port | +| database | False | default | Database name | +| secure | False | 0 | Should the connection be secure | +| verify | False | 1 | Should secure connection need to verify SSL/TLS | +| engine_type | False | None | The engine type to use for the table. | +| table_name | False | None | The name of the table to write to. Defaults to stream name. | +| table_path | False | None | The table path for replicated tables. This is required when using any of the replication engines. Check out the [documentation](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication#replicatedmergetree-parameters) for more information | +| replica_name | False | None | The `replica_name` for replicated tables. This is required when using any of the replication engines. | +| cluster_name | False | None | The cluster to create tables in. This is passed as the `clickhouse_cluster` argument when creating a table. [Documentation](https://clickhouse.com/docs/en/sql-reference/distributed-ddl) can be found here. | +| default_target_schema| False | None | The default target database schema name to use for all streams. | +| add_record_metadata | False | None | Add metadata to records. | +| load_method | False | TargetLoadMethods.APPEND_ONLY | The method to use when loading data into the destination. `append-only` will always write all input records whether that records already exists or not. `upsert` will update existing records and insert new records. `overwrite` will delete all existing records and insert all input records. | +| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). | +| stream_map_config | False | None | User-defined config values to be used within map expressions. | +| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. | +| flattening_max_depth | False | None | The max depth to flatten schemas. | A full list of supported settings and capabilities is available by running: `target-clickhouse --about` diff --git a/target_clickhouse/target.py b/target_clickhouse/target.py index 8c543a2..900765d 100644 --- a/target_clickhouse/target.py +++ b/target_clickhouse/target.py @@ -20,7 +20,8 @@ class TargetClickhouse(SQLTarget): "sqlalchemy_url", th.StringType, secret=True, # Flag config as protected. - description="The SQLAlchemy connection string for the ClickHouse database", + description="The SQLAlchemy connection string for the ClickHouse database. Used if set, " + "otherwise separate settings are used", ), th.Property( "driver", From d7b69b2b6080e058c21566622a26021cc27f0053 Mon Sep 17 00:00:00 2001 From: Alik Kurdyukov Date: Tue, 23 Jan 2024 17:21:14 +0400 Subject: [PATCH 4/4] make linter happy --- target_clickhouse/connectors.py | 72 +++++++++++++++---------------- target_clickhouse/engine_class.py | 4 +- target_clickhouse/target.py | 46 +++++++++++--------- tests/test_core.py | 12 ------ 4 files changed, 63 insertions(+), 71 deletions(-) diff --git a/target_clickhouse/connectors.py b/target_clickhouse/connectors.py index 9dac964..c1da69c 100644 --- a/target_clickhouse/connectors.py +++ b/target_clickhouse/connectors.py @@ -21,6 +21,7 @@ if TYPE_CHECKING: from sqlalchemy.engine import Engine + class ClickhouseConnector(SQLConnector): """Clickhouse Meltano Connector. @@ -42,24 +43,24 @@ def get_sqlalchemy_url(self, config: dict) -> str: """ if config.get("sqlalchemy_url"): return super().get_sqlalchemy_url(config) - else: - if config['driver'] == 'http': - if config['secure']: - secure_options = f"protocol=https&verify={config['verify']}" - - if not config['verify']: - # disable urllib3 warning - import urllib3 - urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) - else: - secure_options = "protocol=http" + + if config["driver"] == "http": + if config["secure"]: + secure_options = f"protocol=https&verify={config['verify']}" + + if not config["verify"]: + # disable urllib3 warning + import urllib3 + urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) else: - secure_options = f"secure={config['secure']}&verify={config['verify']}" - return ( - f"clickhouse+{config['driver']}://{config['username']}:{config['password']}@" - f"{config['host']}:{config['port']}/" - f"{config['database']}?{secure_options}" - ) + secure_options = "protocol=http" + else: + secure_options = f"secure={config['secure']}&verify={config['verify']}" + return ( + f"clickhouse+{config['driver']}://{config['username']}:{config['password']}@" + f"{config['host']}:{config['port']}/" + f"{config['database']}?{secure_options}" + ) def create_engine(self) -> Engine: """Create a SQLAlchemy engine for clickhouse.""" @@ -67,7 +68,7 @@ def create_engine(self) -> Engine: @contextlib.contextmanager def _connect(self) -> typing.Iterator[sqlalchemy.engine.Connection]: - # hack to overcome error in sqlalchemy-clickhouse driver + # patch to overcome error in sqlalchemy-clickhouse driver if self.config.get("driver") == "native": kwargs = {"stream_results": True, "max_row_buffer": 1000} else: @@ -103,12 +104,12 @@ def to_sql_type(self, jsonschema_type: dict) -> sqlalchemy.types.TypeEngine: return sql_type def create_empty_table( - self, - full_table_name: str, - schema: dict, - primary_keys: list[str] | None = None, - partition_keys: list[str] | None = None, - as_temp_table: bool = False, # noqa: FBT001, FBT002 + self, + full_table_name: str, + schema: dict, + primary_keys: list[str] | None = None, + partition_keys: list[str] | None = None, + as_temp_table: bool = False, # noqa: FBT001, FBT002 ) -> None: """Create an empty target table, using Clickhouse Engine. @@ -191,10 +192,10 @@ def prepare_schema(self, _: str) -> None: return def prepare_column( - self, - full_table_name: str, - column_name: str, - sql_type: sqlalchemy.types.TypeEngine, + self, + full_table_name: str, + column_name: str, + sql_type: sqlalchemy.types.TypeEngine, ) -> None: """Adapt target table to provided schema if possible. @@ -218,12 +219,11 @@ def prepare_column( sql_type=sql_type, ) - @staticmethod def get_column_add_ddl( - table_name: str, - column_name: str, - column_type: sqlalchemy.types.TypeEngine, + table_name: str, + column_name: str, + column_type: sqlalchemy.types.TypeEngine, ) -> sqlalchemy.DDL: """Get the create column DDL statement. @@ -255,10 +255,10 @@ def get_column_add_ddl( ) def get_column_alter_ddl( - self, - table_name: str, - column_name: str, - column_type: sqlalchemy.types.TypeEngine, + self, + table_name: str, + column_name: str, + column_type: sqlalchemy.types.TypeEngine, ) -> sqlalchemy.DDL: """Get the alter column DDL statement. diff --git a/target_clickhouse/engine_class.py b/target_clickhouse/engine_class.py index 3e9837c..edb93df 100644 --- a/target_clickhouse/engine_class.py +++ b/target_clickhouse/engine_class.py @@ -67,13 +67,13 @@ def create_engine_wrapper( if table_path is not None: engine_args["table_path"] = table_path else: - msg = f"Table path (table_path) is not defined." + msg = "Table path (table_path) is not defined." raise ValueError(msg) replica_name: Optional[str] = config.get("replica_name") if replica_name is not None: engine_args["replica_name"] = replica_name else: - msg = f"Replica name (replica_name) is not defined." + msg = "Replica name (replica_name) is not defined." raise ValueError(msg) engine_class = get_engine_class(engine_type) diff --git a/target_clickhouse/target.py b/target_clickhouse/target.py index 900765d..f9d455f 100644 --- a/target_clickhouse/target.py +++ b/target_clickhouse/target.py @@ -3,10 +3,10 @@ from singer_sdk import typing as th from singer_sdk.target_base import SQLTarget +from target_clickhouse.engine_class import SupportedEngines from target_clickhouse.sinks import ( ClickhouseSink, ) -from target_clickhouse.engine_class import SupportedEngines class TargetClickhouse(SQLTarget): @@ -20,8 +20,8 @@ class TargetClickhouse(SQLTarget): "sqlalchemy_url", th.StringType, secret=True, # Flag config as protected. - description="The SQLAlchemy connection string for the ClickHouse database. Used if set, " - "otherwise separate settings are used", + description="The SQLAlchemy connection string for the ClickHouse database. " + "Used if set, otherwise separate settings are used", ), th.Property( "driver", @@ -29,7 +29,7 @@ class TargetClickhouse(SQLTarget): required=False, description="Driver type", default="http", - allowed_values=["http", "native", "asynch"] + allowed_values=["http", "native", "asynch"], ), th.Property( "username", @@ -43,14 +43,14 @@ class TargetClickhouse(SQLTarget): th.StringType, required=False, description="Username password", - secret=True + secret=True, ), th.Property( "host", th.StringType, required=False, description="Database host", - default="localhost" + default="localhost", ), th.Property( "port", @@ -71,13 +71,13 @@ class TargetClickhouse(SQLTarget): th.BooleanType, required=False, description="Should the connection be secure", - default=False + default=False, ), th.Property( "verify", th.BooleanType, description="Should secure connection need to verify SSL/TLS", - default=True + default=True, ), # other settings @@ -86,7 +86,7 @@ class TargetClickhouse(SQLTarget): th.StringType, required=False, description="The engine type to use for the table.", - allowed_values=[e.value for e in SupportedEngines] + allowed_values=[e.value for e in SupportedEngines], ), th.Property( "table_name", @@ -98,33 +98,37 @@ class TargetClickhouse(SQLTarget): "table_path", th.StringType, required=False, - description="The table path for replicated tables. This is required when using " - "any of the replication engines. Check out the " - "[documentation](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication#replicatedmergetree-parameters) " - "for more information" + description="The table path for replicated tables. This is required when " + "using any of the replication engines. Check out the " + "[documentation](https://clickhouse.com/docs/en/engines/table-engines/" + "mergetree-family/replication#replicatedmergetree-parameters) " + "for more information", ), th.Property( "replica_name", th.StringType, required=False, - description="The `replica_name` for replicated tables. This is required when using any of the " - "replication engines." + description="The `replica_name` for replicated tables. This is required " + "when using any of the replication engines.", ), th.Property( "cluster_name", th.StringType, required=False, - description="The cluster to create tables in. This is passed as the `clickhouse_cluster` " - "argument when creating a table. " - "[Documentation](https://clickhouse.com/docs/en/sql-reference/distributed-ddl) " - "can be found here." + description="The cluster to create tables in. This is passed as the " + "`clickhouse_cluster` argument when creating a table. " + "[Documentation]" + "(https://clickhouse.com/docs/en/" + "sql-reference/distributed-ddl) " + "can be found here.", ), th.Property( "default_target_schema", th.StringType, required=False, - description="The default target database schema name to use for all streams." - ) + description="The default target database schema name to use for " + "all streams.", + ), ).to_dict() default_sink_class = ClickhouseSink diff --git a/tests/test_core.py b/tests/test_core.py index 4bde499..67cd6a5 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -4,7 +4,6 @@ import typing as t -import pytest from singer_sdk.testing import get_target_test_class from target_clickhouse.target import TargetClickhouse @@ -54,14 +53,3 @@ class TestStandardTargetClickhouse(StandardTargetTests): # type: ignore[misc, v class TestSpreadTargetClickhouse(SpreadTargetTests): # type: ignore[misc, valid-type] """Standard Target Tests.""" - - -# TODO: native driver seems to be broken -# NativeTargetTests = get_target_test_class( -# target_class=TargetClickhouse, -# config=TEST_CONFIG_NATIVE, -# ) -# -# -# class TestNativeTargetClickhouse(NativeTargetTests): # type: ignore[misc, valid-type] -# """Standard Target Tests."""