Add 'optimize_after' option (#107)
* verbose errors for missing params

* connection settings as separate params added

* README updated

* make linter happy

* add template to table_path

* README updated

* make linter happy

* add 'optimize_after' option
akurdyukov authored Feb 6, 2024
1 parent f70e4a7 commit 89b67ec
Showing 7 changed files with 247 additions and 55 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -134,3 +134,6 @@ dmypy.json

 # Pyre type checker
 .pyre/
+
+# JetBrains IDE config
+.idea/
37 changes: 24 additions & 13 deletions README.md
Expand Up @@ -38,19 +38,30 @@ target-clickhouse --about --format=markdown
```
-->

| Setting | Required | Default | Description |
|:---------------------|:--------:|:-------:|:------------|
| sqlalchemy_url | False | None | SQLAlchemy connection string |
| table_name | False | None | The name of the table to write to. |
| engine_type | False | MergeTree | The engine type to use for the table. This must be one of the following engine types: MergeTree, ReplacingMergeTree, SummingMergeTree, AggregatingMergeTree, ReplicatedMergeTree, ReplicatedReplacingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree. |
| table_path | False | None | The table path for replicated tables. This is required when using any of the replication engines. Check out the [documentation](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication#replicatedmergetree-parameters) for more information |
| replica_name | False | None | The `replica_name` for replicated tables. This is required when using any of the replication engines. |
| cluster_name | False | None | The cluster to create tables in. This is passed as the `clickhouse_cluster` argument when creating a table. [Documentation](https://clickhouse.com/docs/en/sql-reference/distributed-ddl) can be found here. |
| default_target_schema| False | None | The default target database schema name to use for all streams. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth | False | None | The max depth to flatten schemas. |
| Setting | Required | Default | Description |
|:---------------------|:--------:|:-------:|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| sqlalchemy_url | False | None | The SQLAlchemy connection string for the ClickHouse database. Used if set, otherwise separate settings are used |
| driver | False | http | Driver type |
| username | False | default | Database user |
| password | False | None | Username password |
| host | False | localhost | Database host |
| port | False | 8123 | Database connection port |
| database | False | default | Database name |
| secure | False | 0 | Should the connection be secure |
| verify | False | 1 | Should secure connection need to verify SSL/TLS |
| engine_type | False | None | The engine type to use for the table. |
| table_name | False | None | The name of the table to write to. Defaults to stream name. |
| table_path | False | None | The table path for replicated tables. This is required when using any of the replication engines. Check out the [documentation](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replication#replicatedmergetree-parameters) for more information. Use `$table_name` to substitute the table name. |
| replica_name | False | None | The `replica_name` for replicated tables. This is required when using any of the replication engines. |
| cluster_name | False | None | The cluster to create tables in. This is passed as the `clickhouse_cluster` argument when creating a table. [Documentation](https://clickhouse.com/docs/en/sql-reference/distributed-ddl) can be found here. |
| default_target_schema| False | None | The default target database schema name to use for all streams. |
| optimize_after | False | 0 | Run 'OPTIMIZE TABLE' after data insert |
| optimize_after | False | 0 | Run 'OPTIMIZE TABLE' after data insert. Useful whentable engine removes duplicate rows. |
| load_method | False | TargetLoadMethods.APPEND_ONLY | The method to use when loading data into the destination. `append-only` will always write all input records whether that records already exists or not. `upsert` will update existing records and insert new records. `overwrite` will delete all existing records and insert all input records. |
| stream_maps | False | None | Config object for stream maps capability. For more information check out [Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html). |
| stream_map_config | False | None | User-defined config values to be used within map expressions. |
| flattening_enabled | False | None | 'True' to enable schema flattening and automatically expand nested properties. |
| flattening_max_depth | False | None | The max depth to flatten schemas. |

A full list of supported settings and capabilities is available by running: `target-clickhouse --about`

Expand Down
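As an illustration of how the separate connection settings in the README table might be combined with their documented defaults, here is a minimal sketch. The `CONNECTION_DEFAULTS` dict and the `resolve_connection_settings` helper are hypothetical names and are not part of target-clickhouse; only the setting names and default values are taken from the table.

```python
from __future__ import annotations

# Defaults copied from the README table. The README lists `secure` as 0 and
# `verify` as 1; they are written as booleans here (an assumption).
CONNECTION_DEFAULTS = {
    "driver": "http",
    "username": "default",
    "password": None,
    "host": "localhost",
    "port": 8123,
    "database": "default",
    "secure": False,
    "verify": True,
}


def resolve_connection_settings(config: dict) -> dict:
    """Return connection settings, preferring user values over defaults.

    A non-empty `sqlalchemy_url` wins, mirroring the README note that the
    separate settings are only used when no URL is set.
    """
    if config.get("sqlalchemy_url"):
        return {"sqlalchemy_url": config["sqlalchemy_url"]}
    return {
        key: config.get(key, default)
        for key, default in CONNECTION_DEFAULTS.items()
    }
```

Calling `resolve_connection_settings({"host": "ch.internal"})` would keep every default except `host`.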
72 changes: 52 additions & 20 deletions target_clickhouse/connectors.py
Expand Up @@ -21,6 +21,7 @@
 if TYPE_CHECKING:
     from sqlalchemy.engine import Engine

+
 class ClickhouseConnector(SQLConnector):
     """Clickhouse Meltano Connector.
Expand All @@ -40,12 +41,41 @@ def get_sqlalchemy_url(self, config: dict) -> str:
         Args:
             config: The configuration for the connector.
         """
-        return super().get_sqlalchemy_url(config)
+        if config.get("sqlalchemy_url"):
+            return super().get_sqlalchemy_url(config)
+
+        if config["driver"] == "http":
+            if config["secure"]:
+                secure_options = f"protocol=https&verify={config['verify']}"
+
+                if not config["verify"]:
+                    # disable urllib3 warning
+                    import urllib3
+                    urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+            else:
+                secure_options = "protocol=http"
+        else:
+            secure_options = f"secure={config['secure']}&verify={config['verify']}"
+        return (
+            f"clickhouse+{config['driver']}://{config['username']}:{config['password']}@"
+            f"{config['host']}:{config['port']}/"
+            f"{config['database']}?{secure_options}"
+        )

     def create_engine(self) -> Engine:
         """Create a SQLAlchemy engine for clickhouse."""
         return create_engine(self.get_sqlalchemy_url(self.config))

+    @contextlib.contextmanager
+    def _connect(self) -> typing.Iterator[sqlalchemy.engine.Connection]:
+        # patch to overcome error in sqlalchemy-clickhouse driver
+        if self.config.get("driver") == "native":
+            kwargs = {"stream_results": True, "max_row_buffer": 1000}
+        else:
+            kwargs = {"stream_results": True}
+        with self._engine.connect().execution_options(**kwargs) as conn:
+            yield conn
+
     def to_sql_type(self, jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:
         """Return a JSON Schema representation of the provided type.
Expand Down Expand Up @@ -74,12 +104,12 @@ def to_sql_type(self, jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:
return sql_type

def create_empty_table(
self,
full_table_name: str,
schema: dict,
primary_keys: list[str] | None = None,
partition_keys: list[str] | None = None,
as_temp_table: bool = False, # noqa: FBT001, FBT002
self,
full_table_name: str,
schema: dict,
primary_keys: list[str] | None = None,
partition_keys: list[str] | None = None,
as_temp_table: bool = False, # noqa: FBT001, FBT002
) -> None:
"""Create an empty target table, using Clickhouse Engine.
Expand Down Expand Up @@ -141,7 +171,10 @@ def create_empty_table(
)

         table_engine = create_engine_wrapper(
-            engine_type=engine_type, primary_keys=primary_keys, config=self.config,
+            engine_type=engine_type,
+            primary_keys=primary_keys,
+            table_name=table_name,
+            config=self.config,
         )

table_args = {}
Expand All @@ -162,10 +195,10 @@ def prepare_schema(self, _: str) -> None:
return

def prepare_column(
self,
full_table_name: str,
column_name: str,
sql_type: sqlalchemy.types.TypeEngine,
self,
full_table_name: str,
column_name: str,
sql_type: sqlalchemy.types.TypeEngine,
) -> None:
"""Adapt target table to provided schema if possible.
Expand All @@ -189,12 +222,11 @@ def prepare_column(
sql_type=sql_type,
)


@staticmethod
def get_column_add_ddl(
table_name: str,
column_name: str,
column_type: sqlalchemy.types.TypeEngine,
table_name: str,
column_name: str,
column_type: sqlalchemy.types.TypeEngine,
) -> sqlalchemy.DDL:
"""Get the create column DDL statement.
Expand Down Expand Up @@ -226,10 +258,10 @@ def get_column_add_ddl(
)

def get_column_alter_ddl(
self,
table_name: str,
column_name: str,
column_type: sqlalchemy.types.TypeEngine,
self,
table_name: str,
column_name: str,
column_type: sqlalchemy.types.TypeEngine,
) -> sqlalchemy.DDL:
"""Get the alter column DDL statement.
Expand Down
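The new `get_sqlalchemy_url` branch interpolates `username` and `password` directly into the URL. The standalone sketch below mirrors that assembly with one deliberate difference that is not in the commit: credentials are percent-encoded with `urllib.parse.quote_plus`, so characters such as `@` or `/` in a password do not break URL parsing. The function name `build_clickhouse_url` is hypothetical, and treating a missing password as an empty string is also an assumption.

```python
from urllib.parse import quote_plus


def build_clickhouse_url(config: dict) -> str:
    """Assemble a connection URL from separate settings (illustrative only)."""
    if config["driver"] == "http":
        if config["secure"]:
            options = f"protocol=https&verify={config['verify']}"
        else:
            options = "protocol=http"
    else:
        options = f"secure={config['secure']}&verify={config['verify']}"

    # Assumption: encode credentials and map a missing password to "".
    # The commit interpolates both values unchanged.
    user = quote_plus(config["username"])
    password = quote_plus(config["password"] or "")
    return (
        f"clickhouse+{config['driver']}://{user}:{password}@"
        f"{config['host']}:{config['port']}/{config['database']}?{options}"
    )


example = {
    "driver": "http",
    "username": "default",
    "password": "p@ss/word",
    "host": "localhost",
    "port": 8123,
    "database": "default",
    "secure": False,
    "verify": True,
}
print(build_clickhouse_url(example))
# clickhouse+http://default:p%40ss%2Fword@localhost:8123/default?protocol=http
```

Whether encoding belongs in the connector or should be left to the user supplying `sqlalchemy_url` is a design choice this sketch does not settle.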
10 changes: 10 additions & 0 deletions target_clickhouse/engine_class.py
@@ -1,4 +1,5 @@
 from enum import Enum
+from string import Template
 from typing import List, Optional

 from clickhouse_sqlalchemy import engines
Expand Down Expand Up @@ -41,6 +42,7 @@ def get_engine_class(engine_type):
 def create_engine_wrapper(
     engine_type,
     primary_keys: List[str],
+    table_name: str,
     config: Optional[dict] = None,
 ):
     # check if engine type is in supported engines
Expand All @@ -65,10 +67,18 @@ def create_engine_wrapper(
     ):
         table_path: Optional[str] = config.get("table_path")
         if table_path is not None:
+            if "$" in table_path:
+                table_path = Template(table_path).substitute(table_name=table_name)
             engine_args["table_path"] = table_path
+        else:
+            msg = "Table path (table_path) is not defined."
+            raise ValueError(msg)
         replica_name: Optional[str] = config.get("replica_name")
         if replica_name is not None:
             engine_args["replica_name"] = replica_name
+        else:
+            msg = "Replica name (replica_name) is not defined."
+            raise ValueError(msg)

     engine_class = get_engine_class(engine_type)

Expand Down
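The `table_path` templating added here relies on `string.Template` from the standard library. A minimal sketch of that substitution in isolation follows; `render_table_path` is a hypothetical helper name, and the example path is made up for illustration.

```python
from string import Template


def render_table_path(table_path: str, table_name: str) -> str:
    """Substitute `$table_name` in a table path, as the diff above does."""
    if "$" in table_path:
        # substitute() raises KeyError for any other $placeholder and
        # ValueError for a stray "$"; a literal dollar is written "$$".
        return Template(table_path).substitute(table_name=table_name)
    return table_path


# Braces without a leading "$" are left untouched by Template.
print(render_table_path("/clickhouse/tables/{shard}/$table_name", "events"))
# /clickhouse/tables/{shard}/events
```

Because `substitute` is strict, a path containing an unrelated `$name` would fail at table creation time rather than being passed through unchanged; `Template.safe_substitute` would be the lenient alternative.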
10 changes: 9 additions & 1 deletion target_clickhouse/sinks.py
Expand Up @@ -86,7 +86,15 @@ def bulk_insert_records(
                 if isinstance(value, (dict, list)):
                     record[key] = json.dumps(value)

-        return super().bulk_insert_records(full_table_name, schema, records)
+        res = super().bulk_insert_records(full_table_name, schema, records)
+
+        if self.config.get("optimize_after", False):
+            with self.connector._connect() as conn, conn.begin():  # noqa: SLF001
+                self.logger.info("Optimizing table: %s", self.full_table_name)
+                conn.execute(sqlalchemy.text(
+                    f"OPTIMIZE TABLE {self.full_table_name}"))
+
+        return res

     def activate_version(self, new_version: int) -> None:
         """Bump the active version of the target table.
Expand Down
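The `optimize_after` hook builds its statement with an f-string around `self.full_table_name`. A minimal sketch of that statement construction on its own is shown below. `build_optimize_statement`, its identifier quoting, and the `final` flag are hypothetical and not part of the commit. `FINAL` is an existing ClickHouse option for `OPTIMIZE TABLE`, but whether it is appropriate depends on the workload, since it can be expensive on large tables.

```python
def build_optimize_statement(full_table_name: str, final: bool = False) -> str:
    """Build an OPTIMIZE TABLE statement for a possibly qualified table name."""
    # Backtick-quote each dotted part; reject empty parts and embedded backticks.
    parts = full_table_name.split(".")
    if any(not part or "`" in part for part in parts):
        msg = f"Unsupported table name: {full_table_name!r}"
        raise ValueError(msg)
    quoted = ".".join(f"`{part}`" for part in parts)
    statement = f"OPTIMIZE TABLE {quoted}"
    return f"{statement} FINAL" if final else statement


print(build_optimize_statement("analytics.events"))
# OPTIMIZE TABLE `analytics`.`events`
```

The resulting string could be passed to `sqlalchemy.text(...)` in the same way the commit passes its f-string.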