Parquet Indexer Data Scripts #157

Merged
merged 7 commits into from Dec 20, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -173,6 +173,7 @@ deployments
logs

# local data
+clickhouse_data
postgres-data
parquet-data
data
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -9,7 +9,7 @@ services:
        soft: 262144
        hard: 262144
    volumes:
-      - clickhouse_data:/var/lib/clickhouse
+      - ./clickhouse_data:/var/lib/clickhouse
      - ./parquet-data:/var/lib/clickhouse/user_files/parquet-data
    ports:
      - 8123:8123
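The named clickhouse_data volume becomes a ./clickhouse_data bind mount (gitignored above), and ./parquet-data is mounted under ClickHouse's user_files directory, which is what lets the scripts below read parquet files through the file() table function. A minimal sketch of querying the mounted data by hand, assuming default credentials and placeholder network/protocol names:

import clickhouse_connect

# file() resolves relative paths inside /var/lib/clickhouse/user_files,
# where ./parquet-data is bind-mounted by this compose file.
client = clickhouse_connect.get_client(host="localhost", port=8123, username="default")
print(client.command(
    "select count() from file("
    "'parquet-data/indexers/clean/base_mainnet/synthetix/*/*.parquet', 'Parquet')"
))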
36 changes: 24 additions & 12 deletions indexers/scripts/clean.py → indexers/scripts/clean_parquet.py
@@ -2,37 +2,49 @@
from pathlib import Path
import pandas as pd
import os
+from utils import convert_case

+RAW_DATA_PATH = "/parquet-data/indexers/raw"
+CLEAN_DATA_PATH = "/parquet-data/indexers/clean"


def clean_parquet_files(network_name: str, protocol_name: str):
-    source_base = f"/parquet-data/indexers/raw/{network_name}/{protocol_name}"
-    target_base = f"/parquet-data/indexers/clean/{network_name}/{protocol_name}"
+    raw_path = Path(f"{RAW_DATA_PATH}/{network_name}/{protocol_name}")
+    clean_path = Path(f"{CLEAN_DATA_PATH}/{network_name}/{protocol_name}")

+    if not raw_path.exists():
+        raise ValueError(f"Source path {raw_path} does not exist")

-    protocol_path = Path(source_base)
-    if not protocol_path.exists():
-        raise ValueError(f"Source path {source_base} does not exist")
-    Path(target_base).mkdir(parents=True, exist_ok=True)
+    clean_path.mkdir(parents=True, exist_ok=True)

-    for block_range_dir in protocol_path.iterdir():
+    for block_range_dir in sorted(raw_path.iterdir()):
        if not block_range_dir.is_dir():
            continue
+        if "temp" in block_range_dir.name:
+            continue
        block_range = block_range_dir.name

+        empty_files = 0
+        written_files = 0
        for parquet_file in block_range_dir.glob("*.parquet"):
            event_name = parquet_file.stem
-            event_dir = Path(target_base) / event_name
+            event_dir = clean_path / event_name
            output_file = event_dir / f"{event_name}_{block_range}.parquet"

            # Skip if file already exists
            if output_file.exists():
                continue

            df = pd.read_parquet(parquet_file)
+            if df.empty:
+                empty_files += 1
+                continue
+            df.columns = [convert_case(col) for col in df.columns]
            event_dir.mkdir(parents=True, exist_ok=True)
            df.to_parquet(output_file, index=False)
-        print(f"Processed {protocol_name} {block_range}")
+            written_files += 1
+        print(
+            f"Processed {network_name}.{protocol_name}.{block_range}:\n"
+            f"\t empty raw files {empty_files}\n"
+            f"\t written files {written_files}"
+        )


if __name__ == "__main__":
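The __main__ block is truncated here; assuming it resolves network_name/protocol_name the same way import_parquet.py below does, the script boils down to a single call (the network/protocol names are placeholders):

# Hypothetical direct call mirroring the CLI entry point:
from clean_parquet import clean_parquet_files

# Reads  /parquet-data/indexers/raw/base_mainnet/synthetix/<block_range>/<event>.parquet,
# skips empty raw files and already-written outputs, snake_cases columns, and writes
# /parquet-data/indexers/clean/base_mainnet/synthetix/<event>/<event>_<block_range>.parquet
clean_parquet_files("base_mainnet", "synthetix")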
62 changes: 62 additions & 0 deletions indexers/scripts/import_parquet.py
@@ -0,0 +1,62 @@
import argparse
import os
from pathlib import Path
import clickhouse_connect
from utils import create_table_from_schema, insert_data_from_path

CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean"
CLEAN_DATA_PATH = "/parquet-data/indexers/clean"
SCHEMAS_PATH = "/parquet-data/indexers/schemas"


def init_tables_from_schemas(client, network_name: str, protocol_name: str):
    print(f"Initializing tables for {network_name} {protocol_name}")
    schema_path = Path(f"{SCHEMAS_PATH}/{network_name}/{protocol_name}")
    db_name = f"raw_{network_name}"

    for schema_file in schema_path.glob("*.sql"):
        event_name = schema_file.stem
        table_name = f"{protocol_name}_{event_name}"

        client.command(f"drop table if exists {db_name}.{table_name}")
        create_table_from_schema(client, str(schema_file))


def import_parquet_files(client, network_name: str, protocol_name: str):
    print(f"Inserting {network_name} {protocol_name} data into tables")
    clean_path = Path(f"{CLEAN_DATA_PATH}/{network_name}/{protocol_name}")
    db_name = f"raw_{network_name}"

    for event_name in clean_path.iterdir():
        if not event_name.is_dir():
            continue
        event_name = event_name.name
        table_name = f"{protocol_name}_{event_name}"
        file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{network_name}/{protocol_name}/{event_name}/*.parquet"

        insert_data_from_path(client, db_name, table_name, file_path)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--network_name", type=str)
    parser.add_argument("--protocol_name", type=str)
    args = parser.parse_args()

    network_name = os.getenv("NETWORK_NAME") or args.network_name
    protocol_name = os.getenv("PROTOCOL_NAME") or args.protocol_name

    if network_name is None or protocol_name is None:
        raise ValueError("Network and protocol must be provided")

    client = clickhouse_connect.get_client(
        host="clickhouse",
        port=8123,
        username="default",
    )

    db_name = f"raw_{network_name}"
    client.command(f"CREATE DATABASE IF NOT EXISTS {db_name}")

    init_tables_from_schemas(client, network_name, protocol_name)
    import_parquet_files(client, network_name, protocol_name)
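Each table is dropped and recreated from its .sql file on every run, so the schema files are the single source of truth for table shape. A hypothetical example of what such a file might contain — the column layout below is illustrative, not taken from this PR:

# Hypothetical contents of /parquet-data/indexers/schemas/base_mainnet/synthetix/example_event.sql,
# shown as the query string create_table_from_schema would read and execute.
# Only the raw_<network>.<protocol>_<event> naming pattern comes from this PR.
example_schema = """
create table if not exists raw_base_mainnet.synthetix_example_event (
    block_number UInt64,
    block_timestamp DateTime64(3),
    transaction_hash String
) engine = MergeTree order by tuple()
"""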
36 changes: 7 additions & 29 deletions indexers/scripts/listener.py
@@ -1,24 +1,16 @@
import argparse
import os
from pathlib import Path
import time
-import re
-import pandas as pd
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import clickhouse_connect
from clickhouse_connect.driver.client import Client
+from utils import create_table_from_path, insert_data_from_path, convert_case

CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean"
RAW_DATA_PATH = "/parquet-data/indexers/raw"
CLEAN_DATA_PATH = "/parquet-data/indexers/clean"


-def convert_case(name):
-    snake_case = re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()
-    return snake_case
-
-
class FolderEventHandler(FileSystemEventHandler):
    def __init__(self):
        super().__init__()
@@ -75,10 +67,14 @@ def _clean_parquet(self, path: str):
        table_name = f"{protocol_name}_{event_name}"
        clickhouse_file_path = f"{self.clickhouse_path}/{network_name}/{protocol_name}/{event_name}/{event_name}_{block_range}.parquet"
        if not self.client.command(f"exists {db_name}.{table_name}"):
-            self._create_table_from_file(db_name, table_name, clickhouse_file_path)
+            create_table_from_path(
+                self.client, db_name, table_name, clickhouse_file_path
+            )
            tables_created += 1
        else:
-            self._insert_data_from_file(db_name, table_name, clickhouse_file_path)
+            insert_data_from_path(
+                self.client, db_name, table_name, clickhouse_file_path
+            )
            data_insertions += 1

        print(
@@ -87,24 +83,6 @@ def _clean_parquet(self, path: str):
f"tables created {tables_created}, data insertions {data_insertions}"
)

def _create_table_from_file(self, db_name: str, table_name: str, file_path: str):
query = (
f"create table if not exists {db_name}.{table_name} "
f"engine = MergeTree order by tuple() as "
f"select * from file('{file_path}', 'Parquet')"
)
try:
self.client.command(query)
except Exception as e:
print(f"Error creating table {db_name}.{table_name}: {e}")

def _insert_data_from_file(self, db_name: str, table_name: str, file_path: str):
query = f"insert into {db_name}.{table_name} select * from file('{file_path}', 'Parquet')"
try:
self.client.command(query)
except Exception as e:
print(f"Error inserting data into {db_name}.{table_name}: {e}")


def main():
event_handler = FolderEventHandler()
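The rest of main() is truncated here; presumably it wires the handler to a watchdog Observer in the standard way. A minimal sketch of that pattern — the exact body is an assumption, though event_handler and RAW_DATA_PATH come from the code above:

    observer = Observer()
    # Watch the raw parquet drop directory recursively; FolderEventHandler
    # reacts to new files by cleaning them and loading them into ClickHouse.
    observer.schedule(event_handler, RAW_DATA_PATH, recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(1)  # watchdog dispatches events on its own thread
    except KeyboardInterrupt:
        observer.stop()
    observer.join()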
30 changes: 30 additions & 0 deletions indexers/scripts/utils.py
@@ -0,0 +1,30 @@
import re
from clickhouse_connect.driver.client import Client


def create_table_from_schema(client: Client, path: str):
    try:
        with open(path, "r") as file:
            query = file.read()
    except FileNotFoundError:
        print(f"Schema file {path} not found")
        return
    try:
        client.command(query)
    except Exception as e:
        print(f"Error creating table from schema {path}: {e}")


def insert_data_from_path(client: Client, db_name: str, table_name: str, path: str):
    query = (
        f"insert into {db_name}.{table_name} select * from file('{path}', 'Parquet')"
    )
    try:
        client.command(query)
    except Exception as e:
        print(f"Error inserting data into {db_name}.{table_name}: {e}")


def convert_case(name):
    snake_case = re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()
    return snake_case
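Note that listener.py imports create_table_from_path from this module, but only create_table_from_schema appears in the 30 lines added here; presumably it mirrors the _create_table_from_file helper deleted from listener.py. A sketch reconstructed from that deleted helper, not from this file:

def create_table_from_path(client: Client, db_name: str, table_name: str, path: str):
    # Reconstructed from listener.py's removed _create_table_from_file;
    # not part of this diff.
    query = (
        f"create table if not exists {db_name}.{table_name} "
        f"engine = MergeTree order by tuple() as "
        f"select * from file('{path}', 'Parquet')"
    )
    try:
        client.command(query)
    except Exception as e:
        print(f"Error creating table {db_name}.{table_name}: {e}")

For reference, convert_case maps camelCase column names to snake_case, e.g. convert_case("blockNumber") == "block_number" and convert_case("TransactionHash") == "transaction_hash".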
9 changes: 5 additions & 4 deletions indexers/utils/clickhouse_schema.py
@@ -11,8 +11,6 @@ def to_snake(name):
def map_to_clickhouse_type(sol_type):
    if sol_type in ["bytes32", "address", "string"]:
        return "String"
-    elif re.search(r"\(.*\)|\[\[$", sol_type):
-        return "JSON"
    elif re.match(r"uint\d+$", sol_type):
        bit_size = int(re.search(r"\d+", sol_type).group())
        if bit_size <= 8:
@@ -43,10 +41,13 @@ def map_to_clickhouse_type(sol_type):
return "Int256"
elif sol_type == "bool":
return "Bool"
elif re.search(r"\(.*\)|\[\[$", sol_type):
return "String"
elif sol_type.endswith("[]"):
base_type = sol_type[:-2]
clickhouse_type = f"Array({map_to_clickhouse_type(base_type)})"
return clickhouse_type
# clickhouse_type = f"Array({map_to_clickhouse_type(base_type)})"
# return clickhouse_type
return "String"
raise ValueError(f"Type {sol_type} not mapped")


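With this change, tuple-typed and array-typed Solidity values are stored as String columns rather than JSON or Array. Illustrative mappings, where the uint result assumes the elided bit-size branches mirror the visible ones:

# Illustrative behavior of map_to_clickhouse_type after this change:
map_to_clickhouse_type("address")         # "String"
map_to_clickhouse_type("bool")            # "Bool"
map_to_clickhouse_type("uint64")          # "UInt64" (assumed from the bit-size branches)
map_to_clickhouse_type("(uint128,bool)")  # "String" (was "JSON" before this change)
map_to_clickhouse_type("address[]")       # "String" (Array mapping commented out)
map_to_clickhouse_type("bytes")           # raises ValueError: Type bytes not mapped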