Support for sqlalchemy_url and separate settings #104

Open. Wants to merge 4 commits into base: main
3 changes: 3 additions & 0 deletions .gitignore
@@ -134,3 +134,6 @@ dmypy.json

# Pyre type checker
.pyre/

# JetBrains IDE config
.idea/
36 changes: 23 additions & 13 deletions README.md
@@ -38,19 +38,29 @@ target-clickhouse --about --format=markdown
```
-->

| Setting | Required | Default | Description |
|:---------------------|:--------:|:-------:|:------------|
| sqlalchemy_url | False | None | SQLAlchemy connection string |
| table_name | False | None | The name of the table to write to. |
| engine_type | False | MergeTree | The engine type to use for the table. This must be one of the following engine types: MergeTree, ReplacingMergeTree, SummingMergeTree, AggregatingMergeTree, ReplicatedMergeTree, ReplicatedReplacingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree. |
| table_path | False | None | The table path for replicated tables. This is required when using any of the replication engines. Check out the [documentation](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication#replicatedmergetree-parameters) for more information |
| replica_name | False | None | The `replica_name` for replicated tables. This is required when using any of the replication engines. |
| cluster_name | False | None | The cluster to create tables in. This is passed as the `clickhouse_cluster` argument when creating a table. [Documentation](https://clickhouse.com/docs/en/sql-reference/distributed-ddl) can be found here. |
| default_target_schema| False | None | The default target database schema name to use for all streams. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth | False | None | The max depth to flatten schemas. |
| Setting | Required | Default | Description |
|:---------------------|:--------:|:-------:|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| sqlalchemy_url | False | None | The SQLAlchemy connection string for the ClickHouse database. Used if set, otherwise separate settings are used |
| driver | False | http | Driver type |
| username | False | default | Database user |
| password | False | None | Username password |
| host | False | localhost | Database host |
| port | False | 8123 | Database connection port |
| database | False | default | Database name |
| secure | False | 0 | Should the connection be secure |
| verify | False | 1 | Should secure connection need to verify SSL/TLS |
| engine_type | False | None | The engine type to use for the table. |
| table_name | False | None | The name of the table to write to. Defaults to stream name. |
| table_path | False | None | The table path for replicated tables. This is required when using any of the replication engines. Check out the [documentation](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication#replicatedmergetree-parameters) for more information |
| replica_name | False | None | The `replica_name` for replicated tables. This is required when using any of the replication engines. |
| cluster_name | False | None | The cluster to create tables in. This is passed as the `clickhouse_cluster` argument when creating a table. [Documentation](https://clickhouse.com/docs/en/sql-reference/distributed-ddl) can be found here. |
| default_target_schema| False | None | The default target database schema name to use for all streams. |
| add_record_metadata | False | None | Add metadata to records. |
| load_method | False | TargetLoadMethods.APPEND_ONLY | The method to use when loading data into the destination. `append-only` will always write all input records whether that record already exists or not. `upsert` will update existing records and insert new records. `overwrite` will delete all existing records and insert all input records. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth | False | None | The max depth to flatten schemas. |

A full list of supported settings and capabilities is available by running: `target-clickhouse --about`
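
For illustration, the two ways of configuring the connection described in the table can be sketched as plain Python dictionaries. The host name, password, database name, and the `uses_connection_string` helper are hypothetical and not part of this PR; the precedence rule (a non-empty `sqlalchemy_url` wins) follows the `get_sqlalchemy_url` change in `connectors.py`.

```python
# Option 1: a single connection string. When set, it takes precedence.
config_with_url = {
    "sqlalchemy_url": "clickhouse+http://default:@localhost:8123/default",
}

# Option 2: separate settings. Omitted keys fall back to the documented defaults.
config_with_parts = {
    "driver": "http",
    "username": "default",
    "password": "example-password",  # hypothetical value
    "host": "clickhouse.example.internal",  # hypothetical host
    "port": 8123,
    "database": "analytics",  # hypothetical database
    "secure": False,
}


def uses_connection_string(config: dict) -> bool:
    """Return True when the connection string should be used (hypothetical helper)."""
    return bool(config.get("sqlalchemy_url"))
```

An empty string counts as "not set" here, which matches the truthiness check used in the connector.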

67 changes: 48 additions & 19 deletions target_clickhouse/connectors.py
@@ -21,6 +21,7 @@
if TYPE_CHECKING:
from sqlalchemy.engine import Engine


class ClickhouseConnector(SQLConnector):
"""Clickhouse Meltano Connector.

@@ -40,12 +41,41 @@ def get_sqlalchemy_url(self, config: dict) -> str:
Args:
config: The configuration for the connector.
"""
return super().get_sqlalchemy_url(config)
if config.get("sqlalchemy_url"):
    return super().get_sqlalchemy_url(config)

if config["driver"] == "http":
    if config["secure"]:
        secure_options = f"protocol=https&verify={config['verify']}"

        if not config["verify"]:
            # disable urllib3 warning
            import urllib3
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
    else:
        secure_options = "protocol=http"
else:
    secure_options = f"secure={config['secure']}&verify={config['verify']}"
return (
    f"clickhouse+{config['driver']}://{config['username']}:{config['password']}@"
    f"{config['host']}:{config['port']}/"
    f"{config['database']}?{secure_options}"
)
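
Review note: as written, the URL is assembled by plain string interpolation, so an unset `password` would be rendered as the literal text `None`, and credentials containing characters such as `@` or `/` would produce an ambiguous URL. A minimal sketch of one way to handle both cases follows. It uses only the standard library; the function name `build_clickhouse_url` is hypothetical, the fallback defaults are copied from the README table, and the query-string format is kept exactly as in the PR.

```python
from urllib.parse import quote


def build_clickhouse_url(config: dict) -> str:
    """Assemble a ClickHouse SQLAlchemy URL from separate settings (sketch)."""
    driver = config.get("driver", "http")
    secure = bool(config.get("secure", False))
    verify = bool(config.get("verify", True))

    if driver == "http":
        # Mirrors the PR: the http driver selects TLS via protocol=https.
        query = f"protocol=https&verify={verify}" if secure else "protocol=http"
    else:
        query = f"secure={secure}&verify={verify}"

    # Percent-encode credentials so reserved characters cannot break the URL.
    user = quote(config.get("username", "default"), safe="")
    password = config.get("password")
    if password is None:
        credentials = user  # omit the password part instead of writing "None"
    else:
        credentials = f"{user}:{quote(password, safe='')}"

    host = config.get("host", "localhost")
    port = config.get("port", 8123)
    database = config.get("database", "default")
    return f"clickhouse+{driver}://{credentials}@{host}:{port}/{database}?{query}"
```

Keeping the builder as a pure function of `config` also makes it easy to unit test without a running ClickHouse server.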

def create_engine(self) -> Engine:
"""Create a SQLAlchemy engine for clickhouse."""
return create_engine(self.get_sqlalchemy_url(self.config))

@contextlib.contextmanager
def _connect(self) -> typing.Iterator[sqlalchemy.engine.Connection]:
    # patch to overcome error in sqlalchemy-clickhouse driver
    if self.config.get("driver") == "native":
        kwargs = {"stream_results": True, "max_row_buffer": 1000}
    else:
        kwargs = {"stream_results": True}
    with self._engine.connect().execution_options(**kwargs) as conn:
        yield conn
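
Review note: the driver-specific execution options could be isolated in a small pure function so the branch is testable without an engine. This is only a sketch; `execution_options_for` is a hypothetical name, and the option values are copied from the PR rather than derived from driver documentation.

```python
def execution_options_for(driver):
    """Return the execution options used by _connect for a given driver (sketch)."""
    options = {"stream_results": True}
    if driver == "native":
        # Value taken from the PR; only the native driver receives it.
        options["max_row_buffer"] = 1000
    return options
```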

def to_sql_type(self, jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:
"""Return a JSON Schema representation of the provided type.

@@ -74,12 +104,12 @@ def to_sql_type(self, jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:
return sql_type

def create_empty_table(
self,
full_table_name: str,
schema: dict,
primary_keys: list[str] | None = None,
partition_keys: list[str] | None = None,
as_temp_table: bool = False, # noqa: FBT001, FBT002
self,
full_table_name: str,
schema: dict,
primary_keys: list[str] | None = None,
partition_keys: list[str] | None = None,
as_temp_table: bool = False, # noqa: FBT001, FBT002
) -> None:
"""Create an empty target table, using Clickhouse Engine.

@@ -162,10 +192,10 @@ def prepare_schema(self, _: str) -> None:
return

def prepare_column(
self,
full_table_name: str,
column_name: str,
sql_type: sqlalchemy.types.TypeEngine,
self,
full_table_name: str,
column_name: str,
sql_type: sqlalchemy.types.TypeEngine,
) -> None:
"""Adapt target table to provided schema if possible.

@@ -189,12 +219,11 @@ def prepare_column(
sql_type=sql_type,
)


@staticmethod
def get_column_add_ddl(
table_name: str,
column_name: str,
column_type: sqlalchemy.types.TypeEngine,
table_name: str,
column_name: str,
column_type: sqlalchemy.types.TypeEngine,
) -> sqlalchemy.DDL:
"""Get the create column DDL statement.

@@ -226,10 +255,10 @@ def get_column_add_ddl(
)

def get_column_alter_ddl(
self,
table_name: str,
column_name: str,
column_type: sqlalchemy.types.TypeEngine,
self,
table_name: str,
column_name: str,
column_type: sqlalchemy.types.TypeEngine,
) -> sqlalchemy.DDL:
"""Get the alter column DDL statement.

6 changes: 6 additions & 0 deletions target_clickhouse/engine_class.py
@@ -66,9 +66,15 @@ def create_engine_wrapper(
table_path: Optional[str] = config.get("table_path")
if table_path is not None:
engine_args["table_path"] = table_path
else:
msg = "Table path (table_path) is not defined."
raise ValueError(msg)
replica_name: Optional[str] = config.get("replica_name")
if replica_name is not None:
engine_args["replica_name"] = replica_name
else:
msg = "Replica name (replica_name) is not defined."
raise ValueError(msg)

engine_class = get_engine_class(engine_type)
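
Review note: the two checks above fail one at a time, so a user missing both settings has to run the target twice to see both errors. A possible alternative, sketched under the assumption that this code path only runs for replicated engines, is to collect every missing setting into a single message. The helper name `require_replication_settings` and the message wording are hypothetical.

```python
REPLICATION_SETTINGS = ("table_path", "replica_name")


def require_replication_settings(config: dict) -> dict:
    """Return the replication settings, or raise listing every missing one."""
    missing = [name for name in REPLICATION_SETTINGS if config.get(name) is None]
    if missing:
        msg = f"Missing settings for replicated engine: {', '.join(missing)}"
        raise ValueError(msg)
    return {name: config[name] for name in REPLICATION_SETTINGS}
```

The returned dictionary can be merged into `engine_args` directly, which keeps the calling code free of per-setting branches.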

107 changes: 106 additions & 1 deletion target_clickhouse/target.py
@@ -3,6 +3,7 @@
from singer_sdk import typing as th
from singer_sdk.target_base import SQLTarget

from target_clickhouse.engine_class import SupportedEngines
from target_clickhouse.sinks import (
ClickhouseSink,
)
@@ -14,20 +15,124 @@ class TargetClickhouse(SQLTarget):
name = "target-clickhouse"

config_jsonschema = th.PropertiesList(
# connection properties
th.Property(
"sqlalchemy_url",
th.StringType,
secret=True, # Flag config as protected.
description="The SQLAlchemy connection string for the ClickHouse database",
description="The SQLAlchemy connection string for the ClickHouse database. "
"Used if set, otherwise separate settings are used",
),
th.Property(
"driver",
th.StringType,
required=False,
description="Driver type",
default="http",
allowed_values=["http", "native", "asynch"],
),
th.Property(
"username",
th.StringType,
required=False,
description="Database user",
default="default",
),
th.Property(
"password",
th.StringType,
required=False,
description="Username password",
secret=True,
),
th.Property(
"host",
th.StringType,
required=False,
description="Database host",
default="localhost",
),
th.Property(
"port",
th.IntegerType,
required=False,
description="Database connection port",
default=8123,
),
th.Property(
"database",
th.StringType,
required=False,
description="Database name",
default="default",
),
th.Property(
"secure",
th.BooleanType,
required=False,
description="Should the connection be secure",
default=False,
),
th.Property(
"verify",
th.BooleanType,
description="Should secure connection need to verify SSL/TLS",
default=True,
),

# other settings
th.Property(
"engine_type",
th.StringType,
required=False,
description="The engine type to use for the table.",
allowed_values=[e.value for e in SupportedEngines],
),
th.Property(
"table_name",
th.StringType,
required=False,
description="The name of the table to write to. Defaults to stream name.",
),
th.Property(
"table_path",
th.StringType,
required=False,
description="The table path for replicated tables. This is required when "
"using any of the replication engines. Check out the "
"[documentation](https://clickhouse.com/docs/en/engines/table-engines/"
"mergetree-family/replication#replicatedmergetree-parameters) "
"for more information",
),
th.Property(
"replica_name",
th.StringType,
required=False,
description="The `replica_name` for replicated tables. This is required "
"when using any of the replication engines.",
),
th.Property(
"cluster_name",
th.StringType,
required=False,
description="The cluster to create tables in. This is passed as the "
"`clickhouse_cluster` argument when creating a table. "
"[Documentation]"
"(https://clickhouse.com/docs/en/"
"sql-reference/distributed-ddl) "
"can be found here.",
),
th.Property(
"default_target_schema",
th.StringType,
required=False,
description="The default target database schema name to use for "
"all streams.",
),
).to_dict()
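
Review note: the `port` default of 8123 is ClickHouse's standard HTTP port, while the native protocol conventionally listens on 9000 (9440 with TLS) and HTTPS on 8443. With `driver` set to `native` or `asynch`, users therefore have to override `port` explicitly. One possible follow-up, sketched here as a hypothetical `default_port` helper that is not part of this PR, would derive the default from `driver` and `secure`. Treating `asynch` like `native` is an assumption based on it speaking the native protocol.

```python
# Conventional ClickHouse ports, keyed by (driver, secure).
DEFAULT_PORTS = {
    ("http", False): 8123,
    ("http", True): 8443,
    ("native", False): 9000,
    ("native", True): 9440,
    ("asynch", False): 9000,  # assumption: same ports as the native driver
    ("asynch", True): 9440,
}


def default_port(driver: str, secure: bool = False) -> int:
    """Return the conventional port for a driver, or raise for unknown drivers."""
    try:
        return DEFAULT_PORTS[(driver, bool(secure))]
    except KeyError:
        msg = f"Unsupported driver: {driver!r}"
        raise ValueError(msg) from None
```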

default_sink_class = ClickhouseSink


if __name__ == "__main__":
TargetClickhouse.cli()