diff --git a/indexers/scripts/clean.py b/indexers/scripts/clean_parquet.py
similarity index 61%
rename from indexers/scripts/clean.py
rename to indexers/scripts/clean_parquet.py
index d456a588..5f9343cc 100644
--- a/indexers/scripts/clean.py
+++ b/indexers/scripts/clean_parquet.py
@@ -3,36 +3,44 @@
 import pandas as pd
 import os
 
+RAW_DATA_PATH = "/parquet-data/indexers/raw"
+CLEAN_DATA_PATH = "/parquet-data/indexers/clean"
+
 
 def clean_parquet_files(network_name: str, protocol_name: str):
-    source_base = f"/parquet-data/indexers/raw/{network_name}/{protocol_name}"
-    target_base = f"/parquet-data/indexers/clean/{network_name}/{protocol_name}"
+    raw_path = Path(f"{RAW_DATA_PATH}/{network_name}/{protocol_name}")
+    clean_path = Path(f"{CLEAN_DATA_PATH}/{network_name}/{protocol_name}")
+
+    if not raw_path.exists():
+        raise ValueError(f"Source path {raw_path} does not exist")
 
-    protocol_path = Path(source_base)
-    if not protocol_path.exists():
-        raise ValueError(f"Source path {source_base} does not exist")
-    Path(target_base).mkdir(parents=True, exist_ok=True)
+    clean_path.mkdir(parents=True, exist_ok=True)
 
-    for block_range_dir in protocol_path.iterdir():
+    for block_range_dir in sorted(raw_path.iterdir()):
         if not block_range_dir.is_dir():
             continue
         block_range = block_range_dir.name
+        empty_files = 0
+        written_files = 0
         for parquet_file in block_range_dir.glob("*.parquet"):
             event_name = parquet_file.stem
-            event_dir = Path(target_base) / event_name
+            event_dir = clean_path / event_name
             output_file = event_dir / f"{event_name}_{block_range}.parquet"
-
-            # Skip if file already exists
             if output_file.exists():
                 continue
-
             df = pd.read_parquet(parquet_file)
             if df.empty:
+                empty_files += 1
                 continue
             event_dir.mkdir(parents=True, exist_ok=True)
             df.to_parquet(output_file, index=False)
-        print(f"Processed {protocol_name} {block_range}")
+            written_files += 1
+        print(
+            f"Processed {network_name}.{protocol_name}.{block_range}:\n"
+            f"\t empty raw files {empty_files}\n"
+            f"\t written files {written_files}"
+        )
 
 
 if __name__ == "__main__":
diff --git a/indexers/scripts/import_parquet.py b/indexers/scripts/import_parquet.py
new file mode 100644
index 00000000..7808fdcf
--- /dev/null
+++ b/indexers/scripts/import_parquet.py
@@ -0,0 +1,53 @@
+import argparse
+import os
+from pathlib import Path
+import clickhouse_connect
+from utils import create_table_from_schema, insert_data_from_path
+
+CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean"
+CLEAN_DATA_PATH = "/parquet-data/indexers/clean"
+SCHEMAS_PATH = "/parquet-data/indexers/schemas"
+
+
+def import_parquet_files(client, network_name: str, protocol_name: str):
+    clean_path = Path(f"{CLEAN_DATA_PATH}/{network_name}/{protocol_name}")
+    db_name = f"raw_{network_name}"
+
+    for event_name in clean_path.iterdir():
+        if not event_name.is_dir():
+            continue
+        event_name = event_name.name
+        table_name = f"{protocol_name}_{event_name}"
+        file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{network_name}/{protocol_name}/{event_name}/*.parquet"
+        schema_path = f"{SCHEMAS_PATH}/{network_name}/{protocol_name}/{event_name}.sql"
+
+        client.command(f"drop table if exists {db_name}.{table_name}")
+        create_table_from_schema(client, schema_path)
+        insert_data_from_path(client, db_name, table_name, file_path)
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--network_name", type=str)
+    parser.add_argument("--protocol_name", type=str)
+    args = parser.parse_args()
+
+    network_name = os.getenv("NETWORK_NAME") or args.network_name
+    protocol_name = os.getenv("PROTOCOL_NAME") or args.protocol_name
+
+    print(f"Importing {network_name} {protocol_name} to clickhouse")
+
+    if network_name is None or protocol_name is None:
+        raise ValueError("Network and protocol must be provided")
+
+    client = clickhouse_connect.get_client(
+        host="clickhouse",
+        port=8123,
+        username="default",
+        # settings={"allow_experimental_json_type": 1},
+    )
+
+    db_name = f"raw_{network_name}"
+    client.command(f"CREATE DATABASE IF NOT EXISTS {db_name}")
+
+    import_parquet_files(client, network_name, protocol_name)
diff --git a/indexers/scripts/listener.py b/indexers/scripts/listener.py
index 90e8d405..74bcb6b9 100644
--- a/indexers/scripts/listener.py
+++ b/indexers/scripts/listener.py
@@ -1,5 +1,3 @@
-import argparse
-import os
 from pathlib import Path
 import time
 import re
@@ -7,7 +5,7 @@
 from watchdog.observers import Observer
 from watchdog.events import FileSystemEventHandler
 import clickhouse_connect
-from clickhouse_connect.driver.client import Client
+from utils import create_table_from_path, insert_data_from_path
 
 CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean"
 RAW_DATA_PATH = "/parquet-data/indexers/raw"
@@ -75,10 +73,14 @@ def _clean_parquet(self, path: str):
         table_name = f"{protocol_name}_{event_name}"
         clickhouse_file_path = f"{self.clickhouse_path}/{network_name}/{protocol_name}/{event_name}/{event_name}_{block_range}.parquet"
         if not self.client.command(f"exists {db_name}.{table_name}"):
-            self._create_table_from_file(db_name, table_name, clickhouse_file_path)
+            create_table_from_path(
+                self.client, db_name, table_name, clickhouse_file_path
+            )
             tables_created += 1
         else:
-            self._insert_data_from_file(db_name, table_name, clickhouse_file_path)
+            insert_data_from_path(
+                self.client, db_name, table_name, clickhouse_file_path
+            )
             data_insertions += 1
 
         print(
@@ -87,24 +89,6 @@ def _clean_parquet(self, path: str):
             f"tables created {tables_created}, data insertions {data_insertions}"
         )
 
-    def _create_table_from_file(self, db_name: str, table_name: str, file_path: str):
-        query = (
-            f"create table if not exists {db_name}.{table_name} "
-            f"engine = MergeTree order by tuple() as "
-            f"select * from file('{file_path}', 'Parquet')"
-        )
-        try:
-            self.client.command(query)
-        except Exception as e:
-            print(f"Error creating table {db_name}.{table_name}: {e}")
-
-    def _insert_data_from_file(self, db_name: str, table_name: str, file_path: str):
-        query = f"insert into {db_name}.{table_name} select * from file('{file_path}', 'Parquet')"
-        try:
-            self.client.command(query)
-        except Exception as e:
-            print(f"Error inserting data into {db_name}.{table_name}: {e}")
-
 
 def main():
     event_handler = FolderEventHandler()
diff --git a/indexers/scripts/utils.py b/indexers/scripts/utils.py
new file mode 100644
index 00000000..91f064b6
--- /dev/null
+++ b/indexers/scripts/utils.py
@@ -0,0 +1,36 @@
+from clickhouse_connect.driver.client import Client
+
+
+def create_table_from_schema(client: Client, path: str):
+    try:
+        with open(path, "r") as file:
+            query = file.read()
+    except FileNotFoundError:
+        print(f"Schema file {path} not found")
+        return
+    try:
+        client.command(query)
+    except Exception as e:
+        print(f"Error creating table from schema {path}: {e}")
+
+
+def create_table_from_path(client: Client, db_name: str, table_name: str, path: str):
+    query = (
+        f"create table if not exists {db_name}.{table_name} "
+        f"engine = MergeTree order by tuple() as "
+        f"select * from file('{path}', 'Parquet')"
+    )
+    try:
+        client.command(query)
+    except Exception as e:
+        print(f"Error creating table {db_name}.{table_name}: {e}")
+
+
+def insert_data_from_path(client: Client, db_name: str, table_name: str, path: str):
+    query = (
+        f"insert into {db_name}.{table_name} select * from file('{path}', 'Parquet')"
+    )
+    try:
+        client.command(query)
+    except Exception as e:
+        print(f"Error inserting data into {db_name}.{table_name}: {e}")
diff --git a/indexers/utils/clickhouse_schema.py b/indexers/utils/clickhouse_schema.py
index aec4af59..9a4fddb1 100644
--- a/indexers/utils/clickhouse_schema.py
+++ b/indexers/utils/clickhouse_schema.py
@@ -15,8 +15,6 @@ def to_snake(name):
 def map_to_clickhouse_type(sol_type):
     if sol_type in ["address", "string"] or sol_type.startswith("bytes"):
         return "String"
-    elif re.search(r"\(.*\)|\[\[$", sol_type):
-        return "JSON"
     elif re.match(r"uint\d+$", sol_type):
         bit_size = int(re.search(r"\d+", sol_type).group())
         if bit_size <= 8:
@@ -47,10 +45,13 @@ def map_to_clickhouse_type(sol_type):
         return "Int256"
     elif sol_type == "bool":
         return "Bool"
+    elif re.search(r"\(.*\)|\[\[$", sol_type):
+        return "String"
     elif sol_type.endswith("[]"):
         base_type = sol_type[:-2]
-        clickhouse_type = f"Array({map_to_clickhouse_type(base_type)})"
-        return clickhouse_type
+        # clickhouse_type = f"Array({map_to_clickhouse_type(base_type)})"
+        # return clickhouse_type
+        return "String"
     raise ValueError(f"Type {sol_type} not mapped")