Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extractors: Update ingestion #158

Merged
merged 18 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ deployments
logs

# local data
clickhouse_data
postgres-data
parquet-data
data
Expand Down
4 changes: 3 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ services:
soft: 262144
hard: 262144
volumes:
- clickhouse_data:/var/lib/clickhouse
- ./clickhouse_data:/var/lib/clickhouse
- ./parquet-data:/var/lib/clickhouse/user_files/parquet-data
ports:
- 8123:8123
Expand Down Expand Up @@ -85,6 +85,8 @@ services:
image: ghcr.io/synthetixio/data/extractors:${VERSION}
build:
context: ./extractors
networks:
- data
env_file:
- .env
volumes:
Expand Down
1 change: 1 addition & 0 deletions extractors/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"clickhouse-connect>=0.8.9",
"cryo==0.3.0",
"duckdb>=1.1.3",
"maturin>=1.7.7",
Expand Down
17 changes: 13 additions & 4 deletions extractors/src/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from eth_utils import decode_hex
import polars as pl
import duckdb
from .insert import insert_data


def fix_labels(labels):
Expand Down Expand Up @@ -58,7 +59,7 @@ def decode_output(contract, function_name, call):


def camel_to_snake(name):
    """Convert a camelCase/PascalCase identifier to snake_case.

    An underscore is inserted before every capital letter that is not at
    the start of the string, then the whole result is lower-cased.
    """
    underscored = re.sub(r"(?<!^)(?=[A-Z])", "_", name)
    return underscored.lower()


Expand All @@ -75,7 +76,12 @@ def get_labels(contract, function_name):
return input_names, output_names


def clean_data(chain_name, protocol, contract, function_name, write=True):
def clean_data(
chain_name, protocol, contract, contract_name, function_name, write=True
):
function_name_snake = camel_to_snake(function_name)
contract_name_snake = camel_to_snake(contract_name)
directory_name = f"{contract_name_snake}_function_{function_name_snake}"
input_labels, output_labels = get_labels(contract, function_name)

# fix labels
Expand All @@ -86,7 +92,7 @@ def clean_data(chain_name, protocol, contract, function_name, write=True):
df = duckdb.sql(
f"""
SELECT DISTINCT *
FROM '../parquet-data/extractors/raw/{chain_name}/{protocol}/{function_name}/*.parquet'
FROM '../parquet-data/extractors/raw/{chain_name}/{protocol}/{directory_name}/*.parquet'
WHERE
call_data IS NOT NULL
AND output_data IS NOT NULL
Expand Down Expand Up @@ -127,7 +133,7 @@ def clean_data(chain_name, protocol, contract, function_name, write=True):

# write the data
if write:
file_path = f"../parquet-data/extractors/clean/{chain_name}/{protocol}/{function_name}.parquet"
file_path = f"../parquet-data/extractors/clean/{chain_name}/{protocol}/{directory_name}/{directory_name}.parquet"

ensure_directory_exists(file_path)
# write the data
Expand All @@ -137,6 +143,9 @@ def clean_data(chain_name, protocol, contract, function_name, write=True):
"""
)

# insert to clickhouse
insert_data(chain_name, protocol, contract_name_snake, function_name_snake)

return df


Expand Down
10 changes: 7 additions & 3 deletions extractors/src/extract.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import cryo
from synthetix import Synthetix
from .constants import CHAIN_CONFIGS
from .clean import clean_data, clean_blocks
from .clean import clean_data, clean_blocks, camel_to_snake


def get_synthetix(chain_config):
Expand Down Expand Up @@ -38,7 +38,9 @@ def extract_data(
# get synthetix
chain_config = CHAIN_CONFIGS[network_id]
snx = get_synthetix(chain_config)
output_dir = f"/parquet-data/extractors/raw/{chain_config['name']}/{protocol}/{function_name}"
function_name_snake = camel_to_snake(function_name)
contract_name_snake = camel_to_snake(contract_name)
output_dir = f"/parquet-data/extractors/raw/{chain_config['name']}/{protocol}/{contract_name_snake}_function_{function_name_snake}"

# encode the call data
contract = snx.contracts[package_name][contract_name]["contract"]
Expand All @@ -61,7 +63,9 @@ def extract_data(
)

if clean:
df_clean = clean_data(chain_config["name"], protocol, contract, function_name)
df_clean = clean_data(
chain_config["name"], protocol, contract, contract_name, function_name
)
return df_clean


Expand Down
25 changes: 25 additions & 0 deletions extractors/src/insert.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import re
import clickhouse_connect
from clickhouse_connect.driver.client import Client

# Location of the cleaned parquet output as seen from INSIDE the ClickHouse
# container (the clean data directory is volume-mounted under user_files so
# ClickHouse's file() table function can read it).
CLICKHOUSE_INTERNAL_PATH = (
    "/var/lib/clickhouse/user_files/parquet-data/extractors/clean"
)
# Raw/clean parquet locations as seen from the extractors container.
# NOTE(review): neither constant is referenced in this module's visible
# code — confirm external users before removing.
RAW_DATA_PATH = "/parquet-data/extractors/raw"
CLEAN_DATA_PATH = "/parquet-data/extractors/clean"


def convert_case(name):
    """Return *name* rewritten from camelCase/PascalCase to snake_case."""
    with_underscores = re.sub(r"(?<!^)(?=[A-Z])", "_", name)
    return with_underscores.lower()


def insert_data(network: str, protocol: str, contract_name: str, function_name: str):
    """Load cleaned extractor parquet files into the matching ClickHouse table.

    Connects to the ClickHouse server (service host ``clickhouse``) and runs a
    single ``INSERT ... SELECT * FROM file(...)`` over every parquet file in the
    clean-data directory for this network/protocol/contract/function, using the
    path as mounted inside the ClickHouse container.

    Parameters:
        network: chain name used in the path and the ``raw_<network>`` database.
        protocol: protocol namespace used in the table/file naming.
        contract_name: snake_case contract name.
        function_name: snake_case function name.

    Raises: whatever clickhouse_connect raises on connection or query failure.
    """
    client: Client = clickhouse_connect.get_client(
        host="clickhouse", port=8123, user="default"
    )
    try:
        # Identifiers are built from internal configuration, not untrusted
        # input, so f-string interpolation into the SQL text is acceptable.
        table_name = (
            f"raw_{network}.{protocol}_{contract_name}_function_{function_name}"
        )
        file_path = (
            f"{CLICKHOUSE_INTERNAL_PATH}/{network}/{protocol}/"
            f"{contract_name}_function_{function_name}/*.parquet"
        )
        query = f"insert into {table_name} select * from file('{file_path}', 'Parquet')"
        client.command(query)
    finally:
        # Release the connection even when the insert fails (the original
        # leaked the client if client.command raised).
        client.close()
Loading
Loading