Skip to content

Commit

Permalink
Fix clean data column naming
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-snx committed Dec 19, 2024
1 parent a4534cf commit 89d63ea
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 7 deletions.
2 changes: 2 additions & 0 deletions indexers/scripts/clean_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from pathlib import Path
import pandas as pd
import os
from utils import convert_case

RAW_DATA_PATH = "/parquet-data/indexers/raw"
CLEAN_DATA_PATH = "/parquet-data/indexers/clean"
Expand Down Expand Up @@ -33,6 +34,7 @@ def clean_parquet_files(network_name: str, protocol_name: str):
if df.empty:
empty_files += 1
continue
df.columns = [convert_case(col) for col in df.columns]
event_dir.mkdir(parents=True, exist_ok=True)
df.to_parquet(output_file, index=False)
written_files += 1
Expand Down
8 changes: 1 addition & 7 deletions indexers/scripts/listener.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,16 @@
from pathlib import Path
import time
import re
import pandas as pd
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import clickhouse_connect
from utils import create_table_from_path, insert_data_from_path
from utils import create_table_from_path, insert_data_from_path, convert_case

CLICKHOUSE_INTERNAL_PATH = "/var/lib/clickhouse/user_files/parquet-data/indexers/clean"
RAW_DATA_PATH = "/parquet-data/indexers/raw"
CLEAN_DATA_PATH = "/parquet-data/indexers/clean"


def convert_case(name):
snake_case = re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()
return snake_case


class FolderEventHandler(FileSystemEventHandler):
def __init__(self):
super().__init__()
Expand Down
6 changes: 6 additions & 0 deletions indexers/scripts/utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import re
from clickhouse_connect.driver.client import Client


Expand All @@ -22,3 +23,8 @@ def insert_data_from_path(client: Client, db_name: str, table_name: str, path: s
client.command(query)
except Exception as e:
print(f"Error inserting data into {db_name}.{table_name}: {e}")


def convert_case(name):
snake_case = re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()
return snake_case

0 comments on commit 89d63ea

Please sign in to comment.