Skip to content

Commit

Permalink
Update import parquet script
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-snx committed Dec 19, 2024
1 parent 9c225ef commit a4534cf
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 7 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ deployments
logs

# local data
clickhouse_data
postgres-data
parquet-data
data
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ services:
soft: 262144
hard: 262144
volumes:
- clickhouse_data:/var/lib/clickhouse
- ./clickhouse_data:/var/lib/clickhouse
- ./parquet-data:/var/lib/clickhouse/user_files/parquet-data
ports:
- 8123:8123
Expand Down
21 changes: 15 additions & 6 deletions indexers/scripts/import_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,21 @@
SCHEMAS_PATH = "/parquet-data/indexers/schemas"


def init_tables_from_schemas(client, network_name: str, protocol_name: str):
    """Recreate the ClickHouse tables for one network/protocol pair.

    Every ``*.sql`` file under ``SCHEMAS_PATH/<network>/<protocol>`` defines
    one event table; the table is named ``<protocol>_<event>`` and lives in
    the ``raw_<network>`` database. Any pre-existing table is dropped before
    being recreated from its schema file.
    """
    print(f"Initializing tables for {network_name} {protocol_name}")
    database = f"raw_{network_name}"
    schema_dir = Path(f"{SCHEMAS_PATH}/{network_name}/{protocol_name}")

    for sql_file in schema_dir.glob("*.sql"):
        # File stem is the event name; table name prefixes it with the protocol.
        table = f"{protocol_name}_{sql_file.stem}"
        client.command(f"drop table if exists {database}.{table}")
        create_table_from_schema(client, str(sql_file))


def import_parquet_files(client, network_name: str, protocol_name: str):
print(f"Inserting {network_name} {protocol_name} data into tables")
clean_path = Path(f"{CLEAN_DATA_PATH}/{network_name}/{protocol_name}")
db_name = f"raw_{network_name}"

Expand All @@ -19,10 +33,7 @@ def import_parquet_files(client, network_name: str, protocol_name: str):
event_name = event_name.name
table_name = f"{protocol_name}_{event_name}"
file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{network_name}/{protocol_name}/{event_name}/*.parquet"
schema_path = f"{SCHEMAS_PATH}/{network_name}/{protocol_name}/{event_name}.sql"

client.command(f"drop table if exists {db_name}.{table_name}")
create_table_from_schema(client, schema_path)
insert_data_from_path(client, db_name, table_name, file_path)


Expand All @@ -35,19 +46,17 @@ def import_parquet_files(client, network_name: str, protocol_name: str):
network_name = os.getenv("NETWORK_NAME") or args.network_name
protocol_name = os.getenv("PROTOCOL_NAME") or args.protocol_name

print(f"Importing {network_name} {protocol_name} to clickhouse")

if network_name is None or protocol_name is None:
raise ValueError("Network and protocol must be provided")

client = clickhouse_connect.get_client(
host="clickhouse",
port=8123,
username="default",
# settings={"allow_experimental_json_type": 1},
)

db_name = f"raw_{network_name}"
client.command(f"CREATE DATABASE IF NOT EXISTS {db_name}")

init_tables_from_schemas(client, network_name, protocol_name)
import_parquet_files(client, network_name, protocol_name)

0 comments on commit a4534cf

Please sign in to comment.