diff --git a/indexers/scripts/clean_parquet.py b/indexers/scripts/clean_parquet.py
index 2e1c0248..5f9343cc 100644
--- a/indexers/scripts/clean_parquet.py
+++ b/indexers/scripts/clean_parquet.py
@@ -16,7 +16,7 @@ def clean_parquet_files(network_name: str, protocol_name: str):

     clean_path.mkdir(parents=True, exist_ok=True)

-    for block_range_dir in raw_path.iterdir():
+    for block_range_dir in sorted(raw_path.iterdir()):
         if not block_range_dir.is_dir():
             continue
         block_range = block_range_dir.name
@@ -37,8 +37,8 @@ def clean_parquet_files(network_name: str, protocol_name: str):
             df.to_parquet(output_file, index=False)
             written_files += 1
         print(
-            f"Processed {network_name}.{protocol_name}.{block_range}: "
-            f"\t empty raw files {empty_files}, "
+            f"Processed {network_name}.{protocol_name}.{block_range}:\n"
+            f"\t empty raw files {empty_files}\n"
             f"\t written files {written_files}"
         )

diff --git a/indexers/scripts/import_parquet.py b/indexers/scripts/import_parquet.py
index fd729a35..9e7ba907 100644
--- a/indexers/scripts/import_parquet.py
+++ b/indexers/scripts/import_parquet.py
@@ -2,10 +2,11 @@ import os
 from pathlib import Path

 import clickhouse_connect
-from .utils import create_table_from_path
+from utils import create_table_from_schema, insert_data_from_path

 CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean"
 CLEAN_DATA_PATH = "/parquet-data/indexers/clean"
+SCHEMAS_PATH = "/parquet-data/indexers/schemas"


 def import_parquet_files(client, network_name: str, protocol_name: str):
@@ -18,9 +19,11 @@ def import_parquet_files(client, network_name: str, protocol_name: str):

             event_name = event_name.name
             table_name = f"{protocol_name}_{event_name}"
             file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{network_name}/{protocol_name}/{event_name}/*.parquet"
+            schema_path = f"{SCHEMAS_PATH}/{network_name}/{protocol_name}/{event_name}.sql"
             client.command(f"drop table if exists {db_name}.{table_name}")
-            create_table_from_path(client, db_name, table_name, file_path)
+            create_table_from_schema(client, schema_path)
+            insert_data_from_path(client, db_name, table_name, file_path)


 if __name__ == "__main__":
@@ -32,13 +35,13 @@ def import_parquet_files(client, network_name: str, protocol_name: str):
     network_name = os.getenv("NETWORK_NAME") or args.network_name
     protocol_name = os.getenv("PROTOCOL_NAME") or args.protocol_name

-    print(f"Cleaning {network_name} {protocol_name}")
+    print(f"Importing {network_name} {protocol_name} to clickhouse")

     if network_name is None or protocol_name is None:
         raise ValueError("Network and protocol must be provided")

     client = clickhouse_connect.get_client(
-        host="localhost", port=8123, username="default"
+        host="clickhouse", port=8123, username="default"
     )

     import_parquet_files(client, network_name, protocol_name)
diff --git a/indexers/scripts/utils.py b/indexers/scripts/utils.py
index b0100643..91f064b6 100644
--- a/indexers/scripts/utils.py
+++ b/indexers/scripts/utils.py
@@ -1,16 +1,17 @@
 from clickhouse_connect.driver.client import Client


-def create_table_from_path(client: Client, db_name: str, table_name: str, path: str):
-    query = (
-        f"create table if not exists {db_name}.{table_name} "
-        f"engine = MergeTree order by tuple() as "
-        f"select * from file('{path}', 'Parquet')"
-    )
+def create_table_from_schema(client: Client, path: str):
+    try:
+        with open(path, "r") as file:
+            query = file.read()
+    except FileNotFoundError:
+        print(f"Schema file {path} not found")
+        return
     try:
         client.command(query)
     except Exception as e:
-        print(f"Error creating table {db_name}.{table_name}: {e}")
+        print(f"Error creating table from schema {path}: {e}")


 def insert_data_from_path(client: Client, db_name: str, table_name: str, path: str):