diff --git a/.gitignore b/.gitignore index 8970ed2c..30e306d9 100644 --- a/.gitignore +++ b/.gitignore @@ -173,6 +173,7 @@ deployments logs # local data +clickhouse_data postgres-data parquet-data data diff --git a/docker-compose.yml b/docker-compose.yml index b2eb8762..039749e4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,7 +9,7 @@ services: soft: 262144 hard: 262144 volumes: - - clickhouse_data:/var/lib/clickhouse + - ./clickhouse_data:/var/lib/clickhouse - ./parquet-data:/var/lib/clickhouse/user_files/parquet-data ports: - 8123:8123 diff --git a/indexers/scripts/clean.py b/indexers/scripts/clean_parquet.py similarity index 56% rename from indexers/scripts/clean.py rename to indexers/scripts/clean_parquet.py index d456a588..397fd38d 100644 --- a/indexers/scripts/clean.py +++ b/indexers/scripts/clean_parquet.py @@ -2,37 +2,49 @@ from pathlib import Path import pandas as pd import os +from utils import convert_case + +RAW_DATA_PATH = "/parquet-data/indexers/raw" +CLEAN_DATA_PATH = "/parquet-data/indexers/clean" def clean_parquet_files(network_name: str, protocol_name: str): - source_base = f"/parquet-data/indexers/raw/{network_name}/{protocol_name}" - target_base = f"/parquet-data/indexers/clean/{network_name}/{protocol_name}" + raw_path = Path(f"{RAW_DATA_PATH}/{network_name}/{protocol_name}") + clean_path = Path(f"{CLEAN_DATA_PATH}/{network_name}/{protocol_name}") + + if not raw_path.exists(): + raise ValueError(f"Source path {raw_path} does not exist") - protocol_path = Path(source_base) - if not protocol_path.exists(): - raise ValueError(f"Source path {source_base} does not exist") - Path(target_base).mkdir(parents=True, exist_ok=True) + clean_path.mkdir(parents=True, exist_ok=True) - for block_range_dir in protocol_path.iterdir(): + for block_range_dir in sorted(raw_path.iterdir()): if not block_range_dir.is_dir(): continue + if "temp" in block_range_dir.name: + continue block_range = block_range_dir.name + empty_files = 0 + written_files = 0 for parquet_file in block_range_dir.glob("*.parquet"): event_name = parquet_file.stem - event_dir = Path(target_base) / event_name + event_dir = clean_path / event_name output_file = event_dir / f"{event_name}_{block_range}.parquet" - - # Skip if file already exists if output_file.exists(): continue - df = pd.read_parquet(parquet_file) if df.empty: + empty_files += 1 continue + df.columns = [convert_case(col) for col in df.columns] event_dir.mkdir(parents=True, exist_ok=True) df.to_parquet(output_file, index=False) - print(f"Processed {protocol_name} {block_range}") + written_files += 1 + print( + f"Processed {network_name}.{protocol_name}.{block_range}:\n" + f"\t empty raw files {empty_files}\n" + f"\t written files {written_files}" + ) if __name__ == "__main__": diff --git a/indexers/scripts/import_parquet.py b/indexers/scripts/import_parquet.py new file mode 100644 index 00000000..0433bc9d --- /dev/null +++ b/indexers/scripts/import_parquet.py @@ -0,0 +1,62 @@ +import argparse +import os +from pathlib import Path +import clickhouse_connect +from utils import create_table_from_schema, insert_data_from_path + +CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean" +CLEAN_DATA_PATH = "/parquet-data/indexers/clean" +SCHEMAS_PATH = "/parquet-data/indexers/schemas" + + +def init_tables_from_schemas(client, network_name: str, protocol_name: str): + print(f"Initializing tables for {network_name} {protocol_name}") + schema_path = Path(f"{SCHEMAS_PATH}/{network_name}/{protocol_name}") + db_name = f"raw_{network_name}" + + for schema_file in schema_path.glob("*.sql"): + event_name = schema_file.stem + table_name = f"{protocol_name}_{event_name}" + + client.command(f"drop table if exists {db_name}.{table_name}") + create_table_from_schema(client, str(schema_file)) + + +def import_parquet_files(client, network_name: str, protocol_name: str): + print(f"Inserting {network_name} {protocol_name} data into tables") + clean_path = Path(f"{CLEAN_DATA_PATH}/{network_name}/{protocol_name}") + db_name = f"raw_{network_name}" + + for event_name in clean_path.iterdir(): + if not event_name.is_dir(): + continue + event_name = event_name.name + table_name = f"{protocol_name}_{event_name}" + file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{network_name}/{protocol_name}/{event_name}/*.parquet" + + insert_data_from_path(client, db_name, table_name, file_path) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--network_name", type=str) + parser.add_argument("--protocol_name", type=str) + args = parser.parse_args() + + network_name = os.getenv("NETWORK_NAME") or args.network_name + protocol_name = os.getenv("PROTOCOL_NAME") or args.protocol_name + + if network_name is None or protocol_name is None: + raise ValueError("Network and protocol must be provided") + + client = clickhouse_connect.get_client( + host="clickhouse", + port=8123, + username="default", + ) + + db_name = f"raw_{network_name}" + client.command(f"CREATE DATABASE IF NOT EXISTS {db_name}") + + init_tables_from_schemas(client, network_name, protocol_name) + import_parquet_files(client, network_name, protocol_name) diff --git a/indexers/scripts/listener.py b/indexers/scripts/listener.py index 90e8d405..54fc09bc 100644 --- a/indexers/scripts/listener.py +++ b/indexers/scripts/listener.py @@ -1,24 +1,16 @@ -import argparse -import os from pathlib import Path import time -import re import pandas as pd from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler import clickhouse_connect -from clickhouse_connect.driver.client import Client +from utils import create_table_from_path, insert_data_from_path, convert_case CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean" RAW_DATA_PATH = "/parquet-data/indexers/raw" CLEAN_DATA_PATH = "/parquet-data/indexers/clean" -def convert_case(name): - snake_case = re.sub(r"(?