Skip to content

Commit

Permalink
Update scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-snx committed Dec 18, 2024
1 parent 1474d4d commit c0bf476
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 14 deletions.
6 changes: 3 additions & 3 deletions indexers/scripts/clean_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def clean_parquet_files(network_name: str, protocol_name: str):

clean_path.mkdir(parents=True, exist_ok=True)

for block_range_dir in raw_path.iterdir():
for block_range_dir in sorted(raw_path.iterdir()):
if not block_range_dir.is_dir():
continue
block_range = block_range_dir.name
Expand All @@ -37,8 +37,8 @@ def clean_parquet_files(network_name: str, protocol_name: str):
df.to_parquet(output_file, index=False)
written_files += 1
print(
f"Processed {network_name}.{protocol_name}.{block_range}: "
f"\t empty raw files {empty_files}, "
f"Processed {network_name}.{protocol_name}.{block_range}:\n"
f"\t empty raw files {empty_files}\n"
f"\t written files {written_files}"
)

Expand Down
11 changes: 7 additions & 4 deletions indexers/scripts/import_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
import os
from pathlib import Path
import clickhouse_connect
from .utils import create_table_from_path
from utils import create_table_from_schema, insert_data_from_path

CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean"
CLEAN_DATA_PATH = "/parquet-data/indexers/clean"
SCHEMAS_PATH = "/parquet-data/indexers/schemas"


def import_parquet_files(client, network_name: str, protocol_name: str):
Expand All @@ -18,9 +19,11 @@ def import_parquet_files(client, network_name: str, protocol_name: str):
event_name = event_name.name
table_name = f"{protocol_name}_{event_name}"
file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{network_name}/{protocol_name}/{event_name}/*.parquet"
schema_path = f"{SCHEMAS_PATH}/{network_name}/{protocol_name}/{event_name}.sql"

client.command(f"drop table if exists {db_name}.{table_name}")
create_table_from_path(client, db_name, table_name, file_path)
create_table_from_schema(client, schema_path)
insert_data_from_path(client, db_name, table_name, file_path)


if __name__ == "__main__":
Expand All @@ -32,13 +35,13 @@ def import_parquet_files(client, network_name: str, protocol_name: str):
network_name = os.getenv("NETWORK_NAME") or args.network_name
protocol_name = os.getenv("PROTOCOL_NAME") or args.protocol_name

print(f"Cleaning {network_name} {protocol_name}")
print(f"Importing {network_name} {protocol_name} to clickhouse")

if network_name is None or protocol_name is None:
raise ValueError("Network and protocol must be provided")

client = clickhouse_connect.get_client(
host="localhost", port=8123, username="default"
host="clickhouse", port=8123, username="default"
)

import_parquet_files(client, network_name, protocol_name)
15 changes: 8 additions & 7 deletions indexers/scripts/utils.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
from clickhouse_connect.driver.client import Client


def create_table_from_path(client: Client, db_name: str, table_name: str, path: str):
query = (
f"create table if not exists {db_name}.{table_name} "
f"engine = MergeTree order by tuple() as "
f"select * from file('{path}', 'Parquet')"
)
def create_table_from_schema(client: Client, path: str) -> None:
    """Create a ClickHouse table by executing the DDL stored in a schema file.

    Reads the SQL statement from ``path`` and runs it via ``client.command``.
    Both failure modes are best-effort by design: a missing schema file or a
    DDL error is reported to stdout and the function returns without raising,
    so a batch import over many event types keeps going.

    Args:
        client: Connected clickhouse-connect client used to run the DDL.
        path: Filesystem path to a ``.sql`` file containing the
            ``CREATE TABLE`` statement (expected to use
            ``IF NOT EXISTS`` semantics — TODO confirm against the schema
            files under the schemas directory).

    Returns:
        None. Errors are printed, not raised.
    """
    try:
        with open(path, "r") as file:
            query = file.read()
    except FileNotFoundError:
        # Schema may not have been generated for this event type yet;
        # skip quietly rather than aborting the whole import run.
        print(f"Schema file {path} not found")
        return
    try:
        client.command(query)
    except Exception as e:
        # Removed stray reference to db_name/table_name (not in scope here);
        # the schema path uniquely identifies the failing table.
        print(f"Error creating table from schema {path}: {e}")


def insert_data_from_path(client: Client, db_name: str, table_name: str, path: str):
Expand Down

0 comments on commit c0bf476

Please sign in to comment.