From 1474d4d182886c3e2038bbc1534bfa6399fb6946 Mon Sep 17 00:00:00 2001 From: marcus-snx Date: Tue, 17 Dec 2024 17:56:14 +0200 Subject: [PATCH 1/3] Organize parquet scripts for indexer --- .../scripts/{clean.py => clean_parquet.py} | 32 +++++++++----- indexers/scripts/import_parquet.py | 44 +++++++++++++++++++ indexers/scripts/listener.py | 30 +++---------- indexers/scripts/utils.py | 23 ++++++++++ 4 files changed, 94 insertions(+), 35 deletions(-) rename indexers/scripts/{clean.py => clean_parquet.py} (61%) create mode 100644 indexers/scripts/import_parquet.py create mode 100644 indexers/scripts/utils.py diff --git a/indexers/scripts/clean.py b/indexers/scripts/clean_parquet.py similarity index 61% rename from indexers/scripts/clean.py rename to indexers/scripts/clean_parquet.py index d456a588..2e1c0248 100644 --- a/indexers/scripts/clean.py +++ b/indexers/scripts/clean_parquet.py @@ -3,36 +3,44 @@ import pandas as pd import os +RAW_DATA_PATH = "/parquet-data/indexers/raw" +CLEAN_DATA_PATH = "/parquet-data/indexers/clean" + def clean_parquet_files(network_name: str, protocol_name: str): - source_base = f"/parquet-data/indexers/raw/{network_name}/{protocol_name}" - target_base = f"/parquet-data/indexers/clean/{network_name}/{protocol_name}" + raw_path = Path(f"{RAW_DATA_PATH}/{network_name}/{protocol_name}") + clean_path = Path(f"{CLEAN_DATA_PATH}/{network_name}/{protocol_name}") + + if not raw_path.exists(): + raise ValueError(f"Source path {raw_path} does not exist") - protocol_path = Path(source_base) - if not protocol_path.exists(): - raise ValueError(f"Source path {source_base} does not exist") - Path(target_base).mkdir(parents=True, exist_ok=True) + clean_path.mkdir(parents=True, exist_ok=True) - for block_range_dir in protocol_path.iterdir(): + for block_range_dir in raw_path.iterdir(): if not block_range_dir.is_dir(): continue block_range = block_range_dir.name + empty_files = 0 + written_files = 0 for parquet_file in block_range_dir.glob("*.parquet"): event_name = parquet_file.stem - event_dir = Path(target_base) / event_name + event_dir = clean_path / event_name output_file = event_dir / f"{event_name}_{block_range}.parquet" - - # Skip if file already exists if output_file.exists(): continue - df = pd.read_parquet(parquet_file) if df.empty: + empty_files += 1 continue event_dir.mkdir(parents=True, exist_ok=True) df.to_parquet(output_file, index=False) - print(f"Processed {protocol_name} {block_range}") + written_files += 1 + print( + f"Processed {network_name}.{protocol_name}.{block_range}: " + f"\t empty raw files {empty_files}, " + f"\t written files {written_files}" + ) if __name__ == "__main__": diff --git a/indexers/scripts/import_parquet.py b/indexers/scripts/import_parquet.py new file mode 100644 index 00000000..fd729a35 --- /dev/null +++ b/indexers/scripts/import_parquet.py @@ -0,0 +1,44 @@ +import argparse +import os +from pathlib import Path +import clickhouse_connect +from .utils import create_table_from_path + +CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean" +CLEAN_DATA_PATH = "/parquet-data/indexers/clean" + + +def import_parquet_files(client, network_name: str, protocol_name: str): + clean_path = Path(f"{CLEAN_DATA_PATH}/{network_name}/{protocol_name}") + db_name = f"raw_{network_name}" + + for event_name in clean_path.iterdir(): + if not event_name.is_dir(): + continue + event_name = event_name.name + table_name = f"{protocol_name}_{event_name}" + file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{network_name}/{protocol_name}/{event_name}/*.parquet" + + client.command(f"drop table if exists {db_name}.{table_name}") + create_table_from_path(client, db_name, table_name, file_path) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--network_name", type=str) + parser.add_argument("--protocol_name", type=str) + args = parser.parse_args() + + network_name = os.getenv("NETWORK_NAME") or args.network_name + protocol_name = os.getenv("PROTOCOL_NAME") or args.protocol_name + + print(f"Cleaning {network_name} {protocol_name}") + + if network_name is None or protocol_name is None: + raise ValueError("Network and protocol must be provided") + + client = clickhouse_connect.get_client( + host="localhost", port=8123, username="default" + ) + + import_parquet_files(client, network_name, protocol_name) diff --git a/indexers/scripts/listener.py b/indexers/scripts/listener.py index 90e8d405..74bcb6b9 100644 --- a/indexers/scripts/listener.py +++ b/indexers/scripts/listener.py @@ -1,5 +1,3 @@ -import argparse -import os from pathlib import Path import time import re @@ -7,7 +5,7 @@ from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler import clickhouse_connect -from clickhouse_connect.driver.client import Client +from utils import create_table_from_path, insert_data_from_path CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean" RAW_DATA_PATH = "/parquet-data/indexers/raw" @@ -75,10 +73,14 @@ def _clean_parquet(self, path: str): table_name = f"{protocol_name}_{event_name}" clickhouse_file_path = f"{self.clickhouse_path}/{network_name}/{protocol_name}/{event_name}/{event_name}_{block_range}.parquet" if not self.client.command(f"exists {db_name}.{table_name}"): - self._create_table_from_file(db_name, table_name, clickhouse_file_path) + create_table_from_path( + self.client, db_name, table_name, clickhouse_file_path + ) tables_created += 1 else: - self._insert_data_from_file(db_name, table_name, clickhouse_file_path) + insert_data_from_path( + self.client, db_name, table_name, clickhouse_file_path + ) data_insertions += 1 print( @@ -87,24 +89,6 @@ def _clean_parquet(self, path: str): f"tables created {tables_created}, data insertions {data_insertions}" ) - def _create_table_from_file(self, db_name: str, table_name: str, file_path: str): - query = ( - f"create table if not exists {db_name}.{table_name} " - f"engine = MergeTree order by tuple() as " - f"select * from file('{file_path}', 'Parquet')" - ) - try: - self.client.command(query) - except Exception as e: - print(f"Error creating table {db_name}.{table_name}: {e}") - - def _insert_data_from_file(self, db_name: str, table_name: str, file_path: str): - query = f"insert into {db_name}.{table_name} select * from file('{file_path}', 'Parquet')" - try: - self.client.command(query) - except Exception as e: - print(f"Error inserting data into {db_name}.{table_name}: {e}") - def main(): event_handler = FolderEventHandler() diff --git a/indexers/scripts/utils.py b/indexers/scripts/utils.py new file mode 100644 index 00000000..b0100643 --- /dev/null +++ b/indexers/scripts/utils.py @@ -0,0 +1,23 @@ +from clickhouse_connect.driver.client import Client + + +def create_table_from_path(client: Client, db_name: str, table_name: str, path: str): + query = ( + f"create table if not exists {db_name}.{table_name} " + f"engine = MergeTree order by tuple() as " + f"select * from file('{path}', 'Parquet')" + ) + try: + client.command(query) + except Exception as e: + print(f"Error creating table {db_name}.{table_name}: {e}") + + +def insert_data_from_path(client: Client, db_name: str, table_name: str, path: str): + query = ( + f"insert into {db_name}.{table_name} select * from file('{path}', 'Parquet')" + ) + try: + client.command(query) + except Exception as e: + print(f"Error inserting data into {db_name}.{table_name}: {e}") From c0bf476e31f8298255cc60d2d49f6cfedefda40c Mon Sep 17 00:00:00 2001 From: marcus-snx Date: Wed, 18 Dec 2024 19:55:23 +0200 Subject: [PATCH 2/3] Update scripts --- indexers/scripts/clean_parquet.py | 6 +++--- indexers/scripts/import_parquet.py | 11 +++++++---- indexers/scripts/utils.py | 15 ++++++++------- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/indexers/scripts/clean_parquet.py b/indexers/scripts/clean_parquet.py index 2e1c0248..5f9343cc 100644 --- a/indexers/scripts/clean_parquet.py +++ b/indexers/scripts/clean_parquet.py @@ -16,7 +16,7 @@ def clean_parquet_files(network_name: str, protocol_name: str): clean_path.mkdir(parents=True, exist_ok=True) - for block_range_dir in raw_path.iterdir(): + for block_range_dir in sorted(raw_path.iterdir()): if not block_range_dir.is_dir(): continue block_range = block_range_dir.name @@ -37,8 +37,8 @@ def clean_parquet_files(network_name: str, protocol_name: str): df.to_parquet(output_file, index=False) written_files += 1 print( - f"Processed {network_name}.{protocol_name}.{block_range}: " - f"\t empty raw files {empty_files}, " + f"Processed {network_name}.{protocol_name}.{block_range}:\n" + f"\t empty raw files {empty_files}\n" f"\t written files {written_files}" ) diff --git a/indexers/scripts/import_parquet.py b/indexers/scripts/import_parquet.py index fd729a35..9e7ba907 100644 --- a/indexers/scripts/import_parquet.py +++ b/indexers/scripts/import_parquet.py @@ -2,10 +2,11 @@ import os from pathlib import Path import clickhouse_connect -from .utils import create_table_from_path +from utils import create_table_from_schema, insert_data_from_path CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean" CLEAN_DATA_PATH = "/parquet-data/indexers/clean" +SCHEMAS_PATH = "/parquet-data/indexers/schemas" def import_parquet_files(client, network_name: str, protocol_name: str): @@ -18,9 +19,11 @@ def import_parquet_files(client, network_name: str, protocol_name: str): event_name = event_name.name table_name = f"{protocol_name}_{event_name}" file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{network_name}/{protocol_name}/{event_name}/*.parquet" + schema_path = f"{SCHEMAS_PATH}/{network_name}/{protocol_name}/{event_name}.sql" client.command(f"drop table if exists {db_name}.{table_name}") - create_table_from_path(client, db_name, table_name, file_path) + create_table_from_schema(client, schema_path) + insert_data_from_path(client, db_name, table_name, file_path) if __name__ == "__main__": @@ -32,13 +35,13 @@ def import_parquet_files(client, network_name: str, protocol_name: str): network_name = os.getenv("NETWORK_NAME") or args.network_name protocol_name = os.getenv("PROTOCOL_NAME") or args.protocol_name - print(f"Cleaning {network_name} {protocol_name}") + print(f"Importing {network_name} {protocol_name} to clickhouse") if network_name is None or protocol_name is None: raise ValueError("Network and protocol must be provided") client = clickhouse_connect.get_client( - host="localhost", port=8123, username="default" + host="clickhouse", port=8123, username="default" ) import_parquet_files(client, network_name, protocol_name) diff --git a/indexers/scripts/utils.py b/indexers/scripts/utils.py index b0100643..91f064b6 100644 --- a/indexers/scripts/utils.py +++ b/indexers/scripts/utils.py @@ -1,16 +1,17 @@ from clickhouse_connect.driver.client import Client -def create_table_from_path(client: Client, db_name: str, table_name: str, path: str): - query = ( - f"create table if not exists {db_name}.{table_name} " - f"engine = MergeTree order by tuple() as " - f"select * from file('{path}', 'Parquet')" - ) +def create_table_from_schema(client: Client, path: str): + try: + with open(path, "r") as file: + query = file.read() + except FileNotFoundError: + print(f"Schema file {path} not found") + return try: client.command(query) except Exception as e: - print(f"Error creating table {db_name}.{table_name}: {e}") + print(f"Error creating table from schema {path}: {e}") def insert_data_from_path(client: Client, db_name: str, table_name: str, path: str): From e7204262962ad8dc72428c655c39f92ba6c9ccb2 Mon Sep 17 00:00:00 2001 From: marcus-snx Date: Wed, 18 Dec 2024 20:22:24 +0200 Subject: [PATCH 3/3] Store objects and arrays as string --- indexers/scripts/import_parquet.py | 8 +++++++- indexers/utils/clickhouse_schema.py | 9 +++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/indexers/scripts/import_parquet.py b/indexers/scripts/import_parquet.py index 9e7ba907..7808fdcf 100644 --- a/indexers/scripts/import_parquet.py +++ b/indexers/scripts/import_parquet.py @@ -41,7 +41,13 @@ def import_parquet_files(client, network_name: str, protocol_name: str): raise ValueError("Network and protocol must be provided") client = clickhouse_connect.get_client( - host="clickhouse", port=8123, username="default" + host="clickhouse", + port=8123, + username="default", + # settings={"allow_experimental_json_type": 1}, ) + db_name = f"raw_{network_name}" + client.command(f"CREATE DATABASE IF NOT EXISTS {db_name}") + import_parquet_files(client, network_name, protocol_name) diff --git a/indexers/utils/clickhouse_schema.py b/indexers/utils/clickhouse_schema.py index b1e7edd8..fee006eb 100644 --- a/indexers/utils/clickhouse_schema.py +++ b/indexers/utils/clickhouse_schema.py @@ -11,8 +11,6 @@ def to_snake(name): def map_to_clickhouse_type(sol_type): if sol_type in ["bytes32", "address", "string"]: return "String" - elif re.search(r"\(.*\)|\[\[$", sol_type): - return "JSON" elif re.match(r"uint\d+$", sol_type): bit_size = int(re.search(r"\d+", sol_type).group()) if bit_size <= 8: @@ -43,10 +41,13 @@ def map_to_clickhouse_type(sol_type): return "Int256" elif sol_type == "bool": return "Bool" + elif re.search(r"\(.*\)|\[\[$", sol_type): + return "String" elif sol_type.endswith("[]"): base_type = sol_type[:-2] - clickhouse_type = f"Array({map_to_clickhouse_type(base_type)})" - return clickhouse_type + # clickhouse_type = f"Array({map_to_clickhouse_type(base_type)})" + # return clickhouse_type + return "String" raise ValueError(f"Type {sol_type} not mapped")