Commit
Merge branch 'feat/indexer-scripts' into extractors-directory
Tburm committed Dec 19, 2024
2 parents 929bf49 + 3cac5c0 commit 06c5515
Showing 6 changed files with 28 additions and 14 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -173,6 +173,7 @@ deployments
logs

# local data
clickhouse_data
postgres-data
parquet-data
data
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -9,7 +9,7 @@ services:
soft: 262144
hard: 262144
volumes:
- clickhouse_data:/var/lib/clickhouse
- ./clickhouse_data:/var/lib/clickhouse
- ./parquet-data:/var/lib/clickhouse/user_files/parquet-data
ports:
- 8123:8123
4 changes: 4 additions & 0 deletions indexers/scripts/clean_parquet.py
@@ -2,6 +2,7 @@
from pathlib import Path
import pandas as pd
import os
from utils import convert_case

RAW_DATA_PATH = "/parquet-data/indexers/raw"
CLEAN_DATA_PATH = "/parquet-data/indexers/clean"
@@ -19,6 +20,8 @@ def clean_parquet_files(network_name: str, protocol_name: str):
for block_range_dir in sorted(raw_path.iterdir()):
if not block_range_dir.is_dir():
continue
if "temp" in block_range_dir.name:
continue
block_range = block_range_dir.name

empty_files = 0
Expand All @@ -33,6 +36,7 @@ def clean_parquet_files(network_name: str, protocol_name: str):
if df.empty:
empty_files += 1
continue
df.columns = [convert_case(col) for col in df.columns]
event_dir.mkdir(parents=True, exist_ok=True)
df.to_parquet(output_file, index=False)
written_files += 1
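The new convert_case call normalizes camelCase column names in the raw parquet files to snake_case before the cleaned files are written. A minimal illustration of the transformation, using hypothetical column names and the same regex this commit adds to utils.py:

import re

import pandas as pd


def convert_case(name):
    # Insert an underscore before each interior capital letter, then lowercase.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()


# Hypothetical raw columns as an indexer might emit them.
df = pd.DataFrame(columns=["blockNumber", "transactionHash", "accountId"])
df.columns = [convert_case(col) for col in df.columns]
print(list(df.columns))  # ['block_number', 'transaction_hash', 'account_id']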
21 changes: 15 additions & 6 deletions indexers/scripts/import_parquet.py
@@ -9,7 +9,21 @@
SCHEMAS_PATH = "/parquet-data/indexers/schemas"


def init_tables_from_schemas(client, network_name: str, protocol_name: str):
print(f"Initializing tables for {network_name} {protocol_name}")
schema_path = Path(f"{SCHEMAS_PATH}/{network_name}/{protocol_name}")
db_name = f"raw_{network_name}"

for schema_file in schema_path.glob("*.sql"):
event_name = schema_file.stem
table_name = f"{protocol_name}_{event_name}"

client.command(f"drop table if exists {db_name}.{table_name}")
create_table_from_schema(client, str(schema_file))


def import_parquet_files(client, network_name: str, protocol_name: str):
print(f"Inserting {network_name} {protocol_name} data into tables")
clean_path = Path(f"{CLEAN_DATA_PATH}/{network_name}/{protocol_name}")
db_name = f"raw_{network_name}"

@@ -19,10 +33,7 @@ def import_parquet_files(client, network_name: str, protocol_name: str):
event_name = event_name.name
table_name = f"{protocol_name}_{event_name}"
file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{network_name}/{protocol_name}/{event_name}/*.parquet"
schema_path = f"{SCHEMAS_PATH}/{network_name}/{protocol_name}/{event_name}.sql"

client.command(f"drop table if exists {db_name}.{table_name}")
create_table_from_schema(client, schema_path)
insert_data_from_path(client, db_name, table_name, file_path)


@@ -35,19 +46,17 @@ def import_parquet_files(client, network_name: str, protocol_name: str):
network_name = os.getenv("NETWORK_NAME") or args.network_name
protocol_name = os.getenv("PROTOCOL_NAME") or args.protocol_name

print(f"Importing {network_name} {protocol_name} to clickhouse")

if network_name is None or protocol_name is None:
raise ValueError("Network and protocol must be provided")

client = clickhouse_connect.get_client(
host="clickhouse",
port=8123,
username="default",
# settings={"allow_experimental_json_type": 1},
)

db_name = f"raw_{network_name}"
client.command(f"CREATE DATABASE IF NOT EXISTS {db_name}")

init_tables_from_schemas(client, network_name, protocol_name)
import_parquet_files(client, network_name, protocol_name)
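create_table_from_schema is defined in utils.py and its body is not part of this diff; judging from the call sites above (a client plus the path to an event's .sql schema file), a plausible sketch of what it does is the following, offered as an assumption for illustration rather than the actual implementation:

from clickhouse_connect.driver.client import Client


def create_table_from_schema(client: Client, schema_path: str):
    # Assumed behavior: read the CREATE TABLE statement from the schema
    # file and execute it against ClickHouse.
    with open(schema_path) as f:
        client.command(f.read())

With init_tables_from_schemas in place, the drop/create step runs once per schema file before any data is imported, instead of once per event directory inside the import loop.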
8 changes: 1 addition & 7 deletions indexers/scripts/listener.py
@@ -1,22 +1,16 @@
from pathlib import Path
import time
import re
import pandas as pd
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import clickhouse_connect
from utils import create_table_from_path, insert_data_from_path
from utils import create_table_from_path, insert_data_from_path, convert_case

CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean"
RAW_DATA_PATH = "/parquet-data/indexers/raw"
CLEAN_DATA_PATH = "/parquet-data/indexers/clean"


def convert_case(name):
snake_case = re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()
return snake_case


class FolderEventHandler(FileSystemEventHandler):
def __init__(self):
super().__init__()
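The rest of listener.py falls outside the visible hunk. For context, a FileSystemEventHandler like FolderEventHandler is typically attached to a watchdog Observer along the lines of the sketch below; the watched path, recursion flag, and polling loop are assumptions for illustration and reuse the imports already present at the top of the file:

# Hypothetical wiring; the actual setup is outside the visible hunk.
observer = Observer()
observer.schedule(FolderEventHandler(), RAW_DATA_PATH, recursive=True)
observer.start()
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    observer.stop()
observer.join()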
6 changes: 6 additions & 0 deletions indexers/scripts/utils.py
@@ -1,3 +1,4 @@
import re
from clickhouse_connect.driver.client import Client


@@ -22,3 +23,8 @@ def insert_data_from_path(client: Client, db_name: str, table_name: str, path: s
client.command(query)
except Exception as e:
print(f"Error inserting data into {db_name}.{table_name}: {e}")


def convert_case(name):
snake_case = re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()
return snake_case
