Skip to content

Commit

Permalink
Organize parquet scripts for indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-snx committed Dec 17, 2024
1 parent 9f8b1da commit 1474d4d
Show file tree
Hide file tree
Showing 4 changed files with 94 additions and 35 deletions.
32 changes: 20 additions & 12 deletions indexers/scripts/clean.py → indexers/scripts/clean_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,44 @@
import pandas as pd
import os

RAW_DATA_PATH = "/parquet-data/indexers/raw"
CLEAN_DATA_PATH = "/parquet-data/indexers/clean"


def clean_parquet_files(network_name: str, protocol_name: str):
    """Copy non-empty raw parquet files into the cleaned directory layout.

    Raw layout:   RAW_DATA_PATH/<network>/<protocol>/<block_range>/<event>.parquet
    Clean layout: CLEAN_DATA_PATH/<network>/<protocol>/<event>/<event>_<block_range>.parquet

    Args:
        network_name: Network directory name under the raw root.
        protocol_name: Protocol directory name under the network.

    Raises:
        ValueError: If the raw source directory does not exist.
    """
    raw_path = Path(f"{RAW_DATA_PATH}/{network_name}/{protocol_name}")
    clean_path = Path(f"{CLEAN_DATA_PATH}/{network_name}/{protocol_name}")

    if not raw_path.exists():
        raise ValueError(f"Source path {raw_path} does not exist")

    clean_path.mkdir(parents=True, exist_ok=True)

    for block_range_dir in raw_path.iterdir():
        if not block_range_dir.is_dir():
            continue
        block_range = block_range_dir.name

        empty_files = 0
        written_files = 0
        for parquet_file in block_range_dir.glob("*.parquet"):
            event_name = parquet_file.stem
            event_dir = clean_path / event_name
            output_file = event_dir / f"{event_name}_{block_range}.parquet"

            # Idempotency: skip files already cleaned by a previous run.
            if output_file.exists():
                continue

            df = pd.read_parquet(parquet_file)
            if df.empty:
                # Empty raw files are counted for the summary but not copied.
                empty_files += 1
                continue
            event_dir.mkdir(parents=True, exist_ok=True)
            df.to_parquet(output_file, index=False)
            written_files += 1
        print(
            f"Processed {network_name}.{protocol_name}.{block_range}: "
            f"\t empty raw files {empty_files}, "
            f"\t written files {written_files}"
        )


if __name__ == "__main__":
Expand Down
44 changes: 44 additions & 0 deletions indexers/scripts/import_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import argparse
import os
from pathlib import Path
import clickhouse_connect
from .utils import create_table_from_path

CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean"
CLEAN_DATA_PATH = "/parquet-data/indexers/clean"


def import_parquet_files(client, network_name: str, protocol_name: str):
    """Recreate one ClickHouse table per cleaned event directory.

    For each event subdirectory under the clean data path, drops any existing
    raw_<network>.<protocol>_<event> table and rebuilds it from that event's
    parquet files via create_table_from_path.
    """
    db_name = f"raw_{network_name}"
    clean_path = Path(f"{CLEAN_DATA_PATH}/{network_name}/{protocol_name}")

    for entry in clean_path.iterdir():
        if not entry.is_dir():
            continue
        event_name = entry.name
        table_name = f"{protocol_name}_{event_name}"
        file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{network_name}/{protocol_name}/{event_name}/*.parquet"

        # Full refresh: drop first, then recreate from the parquet glob.
        client.command(f"drop table if exists {db_name}.{table_name}")
        create_table_from_path(client, db_name, table_name, file_path)


if __name__ == "__main__":
    # CLI/env entry point: environment variables take precedence over flags.
    parser = argparse.ArgumentParser()
    parser.add_argument("--network_name", type=str)
    parser.add_argument("--protocol_name", type=str)
    args = parser.parse_args()

    network_name = os.getenv("NETWORK_NAME") or args.network_name
    protocol_name = os.getenv("PROTOCOL_NAME") or args.protocol_name

    # Validate before using the values, so a missing argument surfaces
    # as the explicit error rather than a confusing "None None" message.
    if network_name is None or protocol_name is None:
        raise ValueError("Network and protocol must be provided")

    # Fixed copy-paste message: this script imports, it does not clean.
    print(f"Importing {network_name} {protocol_name}")

    client = clickhouse_connect.get_client(
        host="localhost", port=8123, username="default"
    )

    import_parquet_files(client, network_name, protocol_name)
30 changes: 7 additions & 23 deletions indexers/scripts/listener.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import argparse
import os
from pathlib import Path
import time
import re
import pandas as pd
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import clickhouse_connect
from clickhouse_connect.driver.client import Client
from utils import create_table_from_path, insert_data_from_path

CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean"
RAW_DATA_PATH = "/parquet-data/indexers/raw"
Expand Down Expand Up @@ -75,10 +73,14 @@ def _clean_parquet(self, path: str):
table_name = f"{protocol_name}_{event_name}"
clickhouse_file_path = f"{self.clickhouse_path}/{network_name}/{protocol_name}/{event_name}/{event_name}_{block_range}.parquet"
if not self.client.command(f"exists {db_name}.{table_name}"):
self._create_table_from_file(db_name, table_name, clickhouse_file_path)
create_table_from_path(
self.client, db_name, table_name, clickhouse_file_path
)
tables_created += 1
else:
self._insert_data_from_file(db_name, table_name, clickhouse_file_path)
insert_data_from_path(
self.client, db_name, table_name, clickhouse_file_path
)
data_insertions += 1

print(
Expand All @@ -87,24 +89,6 @@ def _clean_parquet(self, path: str):
f"tables created {tables_created}, data insertions {data_insertions}"
)

def _create_table_from_file(self, db_name: str, table_name: str, file_path: str):
query = (
f"create table if not exists {db_name}.{table_name} "
f"engine = MergeTree order by tuple() as "
f"select * from file('{file_path}', 'Parquet')"
)
try:
self.client.command(query)
except Exception as e:
print(f"Error creating table {db_name}.{table_name}: {e}")

def _insert_data_from_file(self, db_name: str, table_name: str, file_path: str):
query = f"insert into {db_name}.{table_name} select * from file('{file_path}', 'Parquet')"
try:
self.client.command(query)
except Exception as e:
print(f"Error inserting data into {db_name}.{table_name}: {e}")


def main():
event_handler = FolderEventHandler()
Expand Down
23 changes: 23 additions & 0 deletions indexers/scripts/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from clickhouse_connect.driver.client import Client


def create_table_from_path(client: Client, db_name: str, table_name: str, path: str):
    """Create db_name.table_name (MergeTree, no ordering key) from parquet at path.

    ClickHouse errors are printed and swallowed so batch imports keep going.
    """
    try:
        client.command(
            f"create table if not exists {db_name}.{table_name} "
            f"engine = MergeTree order by tuple() as "
            f"select * from file('{path}', 'Parquet')"
        )
    except Exception as exc:
        print(f"Error creating table {db_name}.{table_name}: {exc}")


def insert_data_from_path(client: Client, db_name: str, table_name: str, path: str):
    """Insert rows from parquet at path into db_name.table_name; log failures."""
    try:
        client.command(
            f"insert into {db_name}.{table_name} select * from file('{path}', 'Parquet')"
        )
    except Exception as exc:
        print(f"Error inserting data into {db_name}.{table_name}: {exc}")

0 comments on commit 1474d4d

Please sign in to comment.