Merge branch 'feat/indexer-scripts' into extractors-directory
Tburm committed Dec 19, 2024
2 parents 681e569 + 9c225ef · commit 9a7ac97
Showing 5 changed files with 109 additions and 39 deletions.
32 changes: 20 additions & 12 deletions indexers/scripts/clean.py → indexers/scripts/clean_parquet.py
@@ -3,36 +3,44 @@
 import pandas as pd
 import os
 
+RAW_DATA_PATH = "/parquet-data/indexers/raw"
+CLEAN_DATA_PATH = "/parquet-data/indexers/clean"
+
 
 def clean_parquet_files(network_name: str, protocol_name: str):
-    source_base = f"/parquet-data/indexers/raw/{network_name}/{protocol_name}"
-    target_base = f"/parquet-data/indexers/clean/{network_name}/{protocol_name}"
+    raw_path = Path(f"{RAW_DATA_PATH}/{network_name}/{protocol_name}")
+    clean_path = Path(f"{CLEAN_DATA_PATH}/{network_name}/{protocol_name}")
 
-    protocol_path = Path(source_base)
-    if not protocol_path.exists():
-        raise ValueError(f"Source path {source_base} does not exist")
-    Path(target_base).mkdir(parents=True, exist_ok=True)
+    if not raw_path.exists():
+        raise ValueError(f"Source path {raw_path} does not exist")
+
+    clean_path.mkdir(parents=True, exist_ok=True)
 
-    for block_range_dir in protocol_path.iterdir():
+    for block_range_dir in sorted(raw_path.iterdir()):
         if not block_range_dir.is_dir():
             continue
         block_range = block_range_dir.name
 
+        empty_files = 0
+        written_files = 0
         for parquet_file in block_range_dir.glob("*.parquet"):
             event_name = parquet_file.stem
-            event_dir = Path(target_base) / event_name
+            event_dir = clean_path / event_name
             output_file = event_dir / f"{event_name}_{block_range}.parquet"
 
             # Skip if file already exists
             if output_file.exists():
                 continue
 
             df = pd.read_parquet(parquet_file)
             if df.empty:
+                empty_files += 1
                 continue
             event_dir.mkdir(parents=True, exist_ok=True)
             df.to_parquet(output_file, index=False)
-        print(f"Processed {protocol_name} {block_range}")
+            written_files += 1
+        print(
+            f"Processed {network_name}.{protocol_name}.{block_range}:\n"
+            f"\t empty raw files {empty_files}\n"
+            f"\t written files {written_files}"
+        )
 
 
 if __name__ == "__main__":
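For reference, the renamed script pivots the on-disk layout from block-range-first to event-first. A minimal sketch of the effect, with hypothetical network, protocol, event, and block-range names:

# Hypothetical illustration of the layout change performed by clean_parquet_files.
# Raw files arrive grouped by block range:
#   /parquet-data/indexers/raw/base_mainnet/synthetix/0000001-0010000/OrderSettled.parquet
# Clean files are regrouped by event, one file per block range:
#   /parquet-data/indexers/clean/base_mainnet/synthetix/OrderSettled/OrderSettled_0000001-0010000.parquet
clean_parquet_files(network_name="base_mainnet", protocol_name="synthetix")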
53 changes: 53 additions & 0 deletions indexers/scripts/import_parquet.py
@@ -0,0 +1,53 @@
import argparse
import os
from pathlib import Path
import clickhouse_connect
from utils import create_table_from_schema, insert_data_from_path

CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean"
CLEAN_DATA_PATH = "/parquet-data/indexers/clean"
SCHEMAS_PATH = "/parquet-data/indexers/schemas"


def import_parquet_files(client, network_name: str, protocol_name: str):
clean_path = Path(f"{CLEAN_DATA_PATH}/{network_name}/{protocol_name}")
db_name = f"raw_{network_name}"

for event_name in clean_path.iterdir():
if not event_name.is_dir():
continue
event_name = event_name.name
table_name = f"{protocol_name}_{event_name}"
file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{network_name}/{protocol_name}/{event_name}/*.parquet"
schema_path = f"{SCHEMAS_PATH}/{network_name}/{protocol_name}/{event_name}.sql"

client.command(f"drop table if exists {db_name}.{table_name}")
create_table_from_schema(client, schema_path)
insert_data_from_path(client, db_name, table_name, file_path)


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--network_name", type=str)
parser.add_argument("--protocol_name", type=str)
args = parser.parse_args()

network_name = os.getenv("NETWORK_NAME") or args.network_name
protocol_name = os.getenv("PROTOCOL_NAME") or args.protocol_name

print(f"Importing {network_name} {protocol_name} to clickhouse")

if network_name is None or protocol_name is None:
raise ValueError("Network and protocol must be provided")

client = clickhouse_connect.get_client(
host="clickhouse",
port=8123,
username="default",
# settings={"allow_experimental_json_type": 1},
)

db_name = f"raw_{network_name}"
client.command(f"CREATE DATABASE IF NOT EXISTS {db_name}")

import_parquet_files(client, network_name, protocol_name)
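For each event directory, the import boils down to three statements against ClickHouse. A minimal sketch of one iteration under hypothetical names (base_mainnet, synthetix, OrderSettled), using the helpers from utils.py:

# Drop any stale table, recreate it from its committed schema file, then
# bulk-insert every cleaned parquet file via ClickHouse's file() table function.
client.command("drop table if exists raw_base_mainnet.synthetix_OrderSettled")
create_table_from_schema(
    client, "/parquet-data/indexers/schemas/base_mainnet/synthetix/OrderSettled.sql"
)
insert_data_from_path(
    client,
    "raw_base_mainnet",
    "synthetix_OrderSettled",
    "/var/lib/clickhouse/user_files/parquet-data/indexers/clean/base_mainnet/synthetix/OrderSettled/*.parquet",
)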
30 changes: 7 additions & 23 deletions indexers/scripts/listener.py
@@ -1,13 +1,11 @@
 import argparse
 import os
 from pathlib import Path
 import time
 import re
 import pandas as pd
 from watchdog.observers import Observer
 from watchdog.events import FileSystemEventHandler
-import clickhouse_connect
 from clickhouse_connect.driver.client import Client
+from utils import create_table_from_path, insert_data_from_path
 
 CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean"
 RAW_DATA_PATH = "/parquet-data/indexers/raw"
@@ -75,10 +73,14 @@ def _clean_parquet(self, path: str):
         table_name = f"{protocol_name}_{event_name}"
         clickhouse_file_path = f"{self.clickhouse_path}/{network_name}/{protocol_name}/{event_name}/{event_name}_{block_range}.parquet"
         if not self.client.command(f"exists {db_name}.{table_name}"):
-            self._create_table_from_file(db_name, table_name, clickhouse_file_path)
+            create_table_from_path(
+                self.client, db_name, table_name, clickhouse_file_path
+            )
             tables_created += 1
         else:
-            self._insert_data_from_file(db_name, table_name, clickhouse_file_path)
+            insert_data_from_path(
+                self.client, db_name, table_name, clickhouse_file_path
+            )
             data_insertions += 1
 
@@ -87,24 +89,6 @@ def _clean_parquet(self, path: str):
         print(
             f"tables created {tables_created}, data insertions {data_insertions}"
         )
 
-    def _create_table_from_file(self, db_name: str, table_name: str, file_path: str):
-        query = (
-            f"create table if not exists {db_name}.{table_name} "
-            f"engine = MergeTree order by tuple() as "
-            f"select * from file('{file_path}', 'Parquet')"
-        )
-        try:
-            self.client.command(query)
-        except Exception as e:
-            print(f"Error creating table {db_name}.{table_name}: {e}")
-
-    def _insert_data_from_file(self, db_name: str, table_name: str, file_path: str):
-        query = f"insert into {db_name}.{table_name} select * from file('{file_path}', 'Parquet')"
-        try:
-            self.client.command(query)
-        except Exception as e:
-            print(f"Error inserting data into {db_name}.{table_name}: {e}")
 
 
 def main():
     event_handler = FolderEventHandler()
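Note that the new import pulls in create_table_from_path, which is not among the helpers added in utils.py below. A plausible definition, mirroring the deleted _create_table_from_file method (an assumption, not part of this commit's utils.py):

def create_table_from_path(client: Client, db_name: str, table_name: str, path: str):
    # Assumed helper, reconstructed from the removed _create_table_from_file:
    # infer the table schema directly from the parquet file itself.
    query = (
        f"create table if not exists {db_name}.{table_name} "
        f"engine = MergeTree order by tuple() as "
        f"select * from file('{path}', 'Parquet')"
    )
    try:
        client.command(query)
    except Exception as e:
        print(f"Error creating table {db_name}.{table_name}: {e}")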
24 changes: 24 additions & 0 deletions indexers/scripts/utils.py
@@ -0,0 +1,24 @@
from clickhouse_connect.driver.client import Client


def create_table_from_schema(client: Client, path: str):
try:
with open(path, "r") as file:
query = file.read()
except FileNotFoundError:
print(f"Schema file {path} not found")
return
try:
client.command(query)
except Exception as e:
print(f"Error creating table from schema {path}: {e}")


def insert_data_from_path(client: Client, db_name: str, table_name: str, path: str):
query = (
f"insert into {db_name}.{table_name} select * from file('{path}', 'Parquet')"
)
try:
client.command(query)
except Exception as e:
print(f"Error inserting data into {db_name}.{table_name}: {e}")
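create_table_from_schema expects each .sql file to hold a complete CREATE TABLE statement. A hypothetical example of such a file's contents, shown here as a Python string; the column names, types, and engine clause are illustrative only, following the map_to_clickhouse_type rules below:

# Illustrative schema file contents; the real files are generated by
# indexers/utils/clickhouse_schema.py.
EXAMPLE_SCHEMA = """
CREATE TABLE IF NOT EXISTS raw_base_mainnet.synthetix_OrderSettled (
    block_number UInt64,
    transaction_hash String,
    account_id UInt128,
    fill_price UInt256
) ENGINE = MergeTree ORDER BY tuple()
"""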
9 changes: 5 additions & 4 deletions indexers/utils/clickhouse_schema.py
@@ -15,8 +15,6 @@ def to_snake(name):
 def map_to_clickhouse_type(sol_type):
     if sol_type in ["address", "string"] or sol_type.startswith("bytes"):
         return "String"
-    elif re.search(r"\(.*\)|\[\[$", sol_type):
-        return "JSON"
     elif re.match(r"uint\d+$", sol_type):
         bit_size = int(re.search(r"\d+", sol_type).group())
         if bit_size <= 8:
@@ -47,10 +45,13 @@ def map_to_clickhouse_type(sol_type):
         return "Int256"
     elif sol_type == "bool":
         return "Bool"
+    elif re.search(r"\(.*\)|\[\[$", sol_type):
+        return "String"
     elif sol_type.endswith("[]"):
         base_type = sol_type[:-2]
-        clickhouse_type = f"Array({map_to_clickhouse_type(base_type)})"
-        return clickhouse_type
+        # clickhouse_type = f"Array({map_to_clickhouse_type(base_type)})"
+        # return clickhouse_type
+        return "String"
     raise ValueError(f"Type {sol_type} not mapped")
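After this change, composite Solidity types map to plain String columns rather than JSON or Array columns, which avoids ClickHouse's experimental JSON type (note the commented-out allow_experimental_json_type setting in import_parquet.py above). A few illustrative mappings; the scalar branches elided above are assumed to follow the usual width pattern:

# Expected outputs of map_to_clickhouse_type after this commit (illustrative):
assert map_to_clickhouse_type("address") == "String"
assert map_to_clickhouse_type("uint64") == "UInt64"  # assumed from the elided uint branch
assert map_to_clickhouse_type("bool") == "Bool"
assert map_to_clickhouse_type("(uint128,address)") == "String"  # tuples: previously JSON
assert map_to_clickhouse_type("uint64[]") == "String"  # arrays: previously Array(UInt64)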
