Skip to content

Commit

Permalink
Add clickhouse schemas generation schema
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-snx committed Dec 6, 2024
1 parent a38b43d commit 10804f3
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 0 deletions.
15 changes: 15 additions & 0 deletions indexers/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
import yaml
import clickhouse_connect
from synthetix import Synthetix
from utils.clickhouse_schema import process_abi_schemas

# load environment variables
load_dotenv()

RAW_DATA_PATH = "/parquet-data/indexers/raw"
SCHEMAS_BASE_PATH = "/parquet-data/indexers/schemas"


def save_abi(abi, contract_name):
Expand Down Expand Up @@ -145,6 +147,7 @@ def load_network_config(path):

# Get contracts from SDK or ABI files
contracts = []
schemas_path = f"{SCHEMAS_BASE_PATH}/{network_name}/{protocol_name}"
if "contracts_from_sdk" in custom_config:
contracts_from_sdk = custom_config["contracts_from_sdk"]
for contract in contracts_from_sdk:
Expand All @@ -154,6 +157,12 @@ def load_network_config(path):
abi = contract_data["abi"]
address = contract_data["address"]
save_abi(abi, name)
process_abi_schemas(
abi=abi,
path=schemas_path,
contract_name=name,
network_name=network_name,
)
contracts.append({"name": name, "address": address})
elif "contracts_from_abi" in custom_config:
contracts_from_abi = custom_config["contracts_from_abi"]
Expand All @@ -163,6 +172,12 @@ def load_network_config(path):
with open(f"{path}/{abi_name}", "r") as file:
abi = json.load(file)
save_abi(abi, name)
process_abi_schemas(
abi=abi,
path=schemas_path,
contract_name=name,
network_name=network_name,
)
contracts.append({"name": name, "address": contract["address"]})
else:
message = "No contracts found in network config"
Expand Down
92 changes: 92 additions & 0 deletions indexers/utils/clickhouse_schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import re
from pathlib import Path
from web3._utils.abi import get_abi_input_names, get_abi_input_types


def to_snake(name):
    """Convert a CamelCase / mixedCase identifier to lower snake_case.

    An underscore is inserted in front of every ASCII uppercase letter that
    is not the first character, then the whole string is lowercased.
    Consecutive capitals are each split, e.g. ``"USD"`` -> ``"u_s_d"``.
    """
    pieces = []
    for index, char in enumerate(name):
        # Only ASCII A-Z trigger a word break, and never at position 0.
        if index > 0 and "A" <= char <= "Z":
            pieces.append("_")
        pieces.append(char)
    return "".join(pieces).lower()


def map_to_clickhouse_type(sol_type):
    """Map a Solidity ABI type string to a ClickHouse column type.

    Mapping rules, checked in this order:

    * ``address``, ``string``, ``bytes`` and ``bytesN`` -> ``String``
    * anything containing a parenthesised tuple (including arrays of
      tuples) -> ``JSON``
    * ``bool`` -> ``Bool``
    * ``uintN`` / ``intN`` -> the narrowest ``UIntW`` / ``IntW`` with
      ``W`` in 8, 16, 32, 64, 128, 256 that can hold ``N`` bits
    * ``T[]`` and ``T[k]`` -> ``Array(<mapping of T>)``, applied recursively
      so nested arrays become nested ``Array(...)``

    Args:
        sol_type: Canonical ABI type string, e.g. ``"uint256"`` or
            ``"address[]"``.

    Returns:
        The ClickHouse type name as a string.

    Raises:
        ValueError: If the type is not covered by any rule above (for
            example an integer wider than 256 bits).
    """
    # Dynamic and fixed-size byte strings are stored the same way as bytes32.
    if sol_type in ("address", "string") or re.fullmatch(r"bytes\d*", sol_type):
        return "String"

    # NOTE(review): the `\[\[$` alternative is kept from the original
    # pattern; no canonical ABI type ends in "[[", so it looks unreachable.
    if re.search(r"\(.*\)|\[\[$", sol_type):
        return "JSON"

    if sol_type == "bool":
        return "Bool"

    int_match = re.match(r"(u?)int(\d+)$", sol_type)
    if int_match:
        prefix = "UInt" if int_match.group(1) else "Int"
        bit_size = int(int_match.group(2))
        for width in (8, 16, 32, 64, 128, 256):
            if bit_size <= width:
                return f"{prefix}{width}"
        # Wider than 256 bits: fall through to the ValueError below.

    # Strip one trailing "[]" or "[k]" and map the element type recursively.
    array_match = re.fullmatch(r"(.*)\[\d*\]", sol_type)
    if array_match:
        return f"Array({map_to_clickhouse_type(array_match.group(1))})"

    raise ValueError(f"Type {sol_type} not mapped")


def generate_clickhouse_schema(event_name, fields, network_name):
    """Build a ``CREATE TABLE IF NOT EXISTS`` statement for one event.

    Args:
        event_name: Table name, used verbatim after ``network_name.``.
        fields: Iterable of ``(field_name, solidity_type)`` pairs. Each
            name is converted with ``to_snake`` and each type with
            ``map_to_clickhouse_type``.
        network_name: Database name the table is created in.

    Returns:
        The full SQL statement as a newline-joined string. The table uses
        ``MergeTree()`` with ``ORDER BY tuple()``.

    Raises:
        ValueError: Propagated from ``map_to_clickhouse_type`` when a field
            type has no mapping.
    """
    columns = [
        f" `{to_snake(field_name)}` {map_to_clickhouse_type(field_type)}"
        for field_name, field_type in fields
    ]

    lines = [f"CREATE TABLE IF NOT EXISTS {network_name}.{event_name} ("]
    # Join columns with commas instead of trimming a trailing one: with no
    # fields, trimming would chop the "(" off the header line.
    if columns:
        lines.append(",\n".join(columns))
    lines.append(") ENGINE = MergeTree() ORDER BY tuple();")
    return "\n".join(lines)


def save_clickhouse_schema(path, event_name, schema):
    """Write ``schema`` to ``<path>/<event_name>.sql``.

    The directory ``path`` is created together with any missing parents.
    An existing file with the same name is overwritten.
    """
    target_dir = Path(path)
    target_dir.mkdir(parents=True, exist_ok=True)
    target_dir.joinpath(f"{event_name}.sql").write_text(schema)


def process_abi_schemas(abi, path, contract_name, network_name):
    """
    Process an ABI to generate ClickHouse schemas for all events.

    For every ABI entry whose ``type`` is ``"event"``, a table named
    ``<snake contract name>_<snake event name>`` is generated and written
    to ``<path>/<table name>.sql``.

    Args:
        abi: The contract ABI (list of ABI entry dicts)
        path: Directory where schema files will be saved
        contract_name: Name of the contract (used as table-name prefix)
        network_name: Database name used to qualify each table
    """
    # Entries without a "type" key are ignored rather than raising KeyError.
    events = [item for item in abi if item.get("type") == "event"]
    if not events:
        return

    # The prefix is the same for every event, so compute it once.
    table_prefix = to_snake(contract_name)

    for event in events:
        table_name = f"{table_prefix}_{to_snake(event['name'])}"

        # Pair each event input name with its ABI type string.
        input_names = get_abi_input_names(event)
        input_types = get_abi_input_types(event)
        fields = list(zip(input_names, input_types))

        schema = generate_clickhouse_schema(table_name, fields, network_name)
        save_clickhouse_schema(path, table_name, schema)

0 comments on commit 10804f3

Please sign in to comment.