diff --git a/.gitignore b/.gitignore index 8970ed2c..30e306d9 100644 --- a/.gitignore +++ b/.gitignore @@ -173,6 +173,7 @@ deployments logs # local data +clickhouse_data postgres-data parquet-data data diff --git a/docker-compose.yml b/docker-compose.yml index 2ed3748a..7a470d9a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,7 +9,7 @@ services: soft: 262144 hard: 262144 volumes: - - clickhouse_data:/var/lib/clickhouse + - ./clickhouse_data:/var/lib/clickhouse - ./parquet-data:/var/lib/clickhouse/user_files/parquet-data ports: - 8123:8123 diff --git a/indexers/scripts/clean_parquet.py b/indexers/scripts/clean_parquet.py index 5f9343cc..397fd38d 100644 --- a/indexers/scripts/clean_parquet.py +++ b/indexers/scripts/clean_parquet.py @@ -2,6 +2,7 @@ from pathlib import Path import pandas as pd import os +from utils import convert_case RAW_DATA_PATH = "/parquet-data/indexers/raw" CLEAN_DATA_PATH = "/parquet-data/indexers/clean" @@ -19,6 +20,8 @@ def clean_parquet_files(network_name: str, protocol_name: str): for block_range_dir in sorted(raw_path.iterdir()): if not block_range_dir.is_dir(): continue + if "temp" in block_range_dir.name: + continue block_range = block_range_dir.name empty_files = 0 @@ -33,6 +36,7 @@ def clean_parquet_files(network_name: str, protocol_name: str): if df.empty: empty_files += 1 continue + df.columns = [convert_case(col) for col in df.columns] event_dir.mkdir(parents=True, exist_ok=True) df.to_parquet(output_file, index=False) written_files += 1 diff --git a/indexers/scripts/import_parquet.py b/indexers/scripts/import_parquet.py index 7808fdcf..0433bc9d 100644 --- a/indexers/scripts/import_parquet.py +++ b/indexers/scripts/import_parquet.py @@ -9,7 +9,21 @@ SCHEMAS_PATH = "/parquet-data/indexers/schemas" +def init_tables_from_schemas(client, network_name: str, protocol_name: str): + print(f"Initializing tables for {network_name} {protocol_name}") + schema_path = Path(f"{SCHEMAS_PATH}/{network_name}/{protocol_name}") + db_name = f"raw_{network_name}" + + for schema_file in schema_path.glob("*.sql"): + event_name = schema_file.stem + table_name = f"{protocol_name}_{event_name}" + + client.command(f"drop table if exists {db_name}.{table_name}") + create_table_from_schema(client, str(schema_file)) + + def import_parquet_files(client, network_name: str, protocol_name: str): + print(f"Inserting {network_name} {protocol_name} data into tables") clean_path = Path(f"{CLEAN_DATA_PATH}/{network_name}/{protocol_name}") db_name = f"raw_{network_name}" @@ -19,10 +33,7 @@ def import_parquet_files(client, network_name: str, protocol_name: str): event_name = event_name.name table_name = f"{protocol_name}_{event_name}" file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{network_name}/{protocol_name}/{event_name}/*.parquet" - schema_path = f"{SCHEMAS_PATH}/{network_name}/{protocol_name}/{event_name}.sql" - client.command(f"drop table if exists {db_name}.{table_name}") - create_table_from_schema(client, schema_path) insert_data_from_path(client, db_name, table_name, file_path) @@ -35,8 +46,6 @@ def import_parquet_files(client, network_name: str, protocol_name: str): network_name = os.getenv("NETWORK_NAME") or args.network_name protocol_name = os.getenv("PROTOCOL_NAME") or args.protocol_name - print(f"Importing {network_name} {protocol_name} to clickhouse") - if network_name is None or protocol_name is None: raise ValueError("Network and protocol must be provided") @@ -44,10 +53,10 @@ def import_parquet_files(client, network_name: str, protocol_name: str): host="clickhouse", port=8123, username="default", - # settings={"allow_experimental_json_type": 1}, ) db_name = f"raw_{network_name}" client.command(f"CREATE DATABASE IF NOT EXISTS {db_name}") + init_tables_from_schemas(client, network_name, protocol_name) import_parquet_files(client, network_name, protocol_name) diff --git a/indexers/scripts/listener.py b/indexers/scripts/listener.py index 74bcb6b9..54fc09bc 100644 --- a/indexers/scripts/listener.py +++ b/indexers/scripts/listener.py @@ -1,22 +1,16 @@ from pathlib import Path import time -import re import pandas as pd from watchdog.observers import Observer from watchdog.events import FileSystemEventHandler import clickhouse_connect -from utils import create_table_from_path, insert_data_from_path +from utils import create_table_from_path, insert_data_from_path, convert_case CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean" RAW_DATA_PATH = "/parquet-data/indexers/raw" CLEAN_DATA_PATH = "/parquet-data/indexers/clean" -def convert_case(name): - snake_case = re.sub(r"(?