ClickHouse DBT Arbitrum & Base Core Models #152

Merged (22 commits) on Dec 18, 2024
3 changes: 2 additions & 1 deletion indexers/Dockerfile
@@ -6,7 +6,8 @@ WORKDIR /app
 COPY package*.json ./
 COPY patches/ ./patches/
 
-RUN apt-get update && apt-get install -y build-essential && npm ci && apt-get remove -y build-essential
+RUN apt-get update && apt-get install -y build-essential clang
+RUN npm ci
 
 COPY pyproject.toml uv.lock ./
 RUN uv sync --frozen --no-dev
38 changes: 18 additions & 20 deletions indexers/main.py
@@ -121,19 +121,6 @@ def load_network_config(path):
 # Load custom config
 custom_config = config_file["configs"][protocol_name]
 
-# Initialize Synthetix SDK (with optional Cannon config)
-if "cannon_config" in custom_config:
-    snx = Synthetix(
-        provider_rpc=rpc_endpoint,
-        network_id=network_id,
-        cannon_config=custom_config["cannon_config"],
-    )
-else:
-    snx = Synthetix(
-        provider_rpc=rpc_endpoint,
-        network_id=network_id,
-    )
-
 # Set block range based on config.
 block_range = {}
 if args.block_from is not None:
@@ -152,12 +139,25 @@ def load_network_config(path):
     user="default",
     settings={"allow_experimental_json_type": 1},
 )
-client.command(f"CREATE DATABASE IF NOT EXISTS {network_name}")
+client.command(f"CREATE DATABASE IF NOT EXISTS raw_{network_name}")
 
+# Get contracts from SDK or ABI files
 contracts = []
 schemas_path = f"{SCHEMAS_BASE_PATH}/{network_name}/{protocol_name}"
 
 if "contracts_from_sdk" in custom_config:
+    # Initialize Synthetix SDK (with optional Cannon config)
+    if "cannon_config" in custom_config:
+        snx = Synthetix(
+            provider_rpc=rpc_endpoint,
+            network_id=network_id,
+            cannon_config=custom_config["cannon_config"],
+        )
+    else:
+        snx = Synthetix(
+            provider_rpc=rpc_endpoint,
+            network_id=network_id,
+        )
    contracts_from_sdk = custom_config["contracts_from_sdk"]
    for contract in contracts_from_sdk:
        name = contract["name"]
@@ -175,7 +175,7 @@ def load_network_config(path):
             client=client,
         )
         contracts.append({"name": name, "address": address})
-elif "contracts_from_abi" in custom_config:
+if "contracts_from_abi" in custom_config:
     contracts_from_abi = custom_config["contracts_from_abi"]
     for contract in contracts_from_abi:
         name = contract["name"]
@@ -192,8 +192,8 @@ def load_network_config(path):
             client=client,
         )
         contracts.append({"name": name, "address": contract["address"]})
-else:
-    message = "No contracts found in network config"
+if not contracts:
+    message = "No contracts found"
     raise Exception(message)
 
 # Create squidgen generator config
@@ -209,6 +209,4 @@ def load_network_config(path):
 )
 write_yaml(squidgen_config, "squidgen.yaml")
 
-snx.logger.info(
-    f"squidgen.yaml and ABI files have been generated for {args.network_name}"
-)
+print(f"squidgen.yaml and ABI files have been generated for {args.network_name}")
6 changes: 5 additions & 1 deletion indexers/networks/arbitrum_mainnet/network_config.yaml
@@ -18,11 +18,15 @@ configs:
       package: perpsFactory
     - name: PerpsAccountProxy
       package: perpsFactory
+    cannon_config:
+      package: "synthetix-omnibus"
+      version: "latest"
+      preset: "main"
 
   curve:
     range:
       from: 236000000
     contracts_from_abi:
     - name: CurveUsdx
-      address: '0x096A8865367686290639bc50bF8D85C0110d9Fea'
+      address: "0x096A8865367686290639bc50bF8D85C0110d9Fea"
       abi: ./abi/CurvePool.json
58 changes: 55 additions & 3 deletions indexers/patches/@subsquid+squid-gen-evm+2.0.0.patch
@@ -35,11 +35,43 @@ index 2839b6f..d1e7826 100644
          this.out.line(`})`);
          return this.out.write();
      }
+diff --git a/node_modules/@subsquid/squid-gen-evm/lib/handlers.d.ts b/node_modules/@subsquid/squid-gen-evm/lib/handlers.d.ts
+index 236fa01..28058ed 100644
+--- a/node_modules/@subsquid/squid-gen-evm/lib/handlers.d.ts
++++ b/node_modules/@subsquid/squid-gen-evm/lib/handlers.d.ts
+@@ -10,6 +10,8 @@ export declare class HandlersCodegen {
+     constructor(outDir: OutDir, options: {
+         contract: SquidContract;
+         dataTarget: DataTarget;
++        events: boolean;
++        functions: boolean;
+     });
+     generate(): void;
+     private printImports;
 diff --git a/node_modules/@subsquid/squid-gen-evm/lib/handlers.js b/node_modules/@subsquid/squid-gen-evm/lib/handlers.js
-index 59ace5e..3c5a187 100644
+index 59ace5e..64b8493 100644
 --- a/node_modules/@subsquid/squid-gen-evm/lib/handlers.js
 +++ b/node_modules/@subsquid/squid-gen-evm/lib/handlers.js
-@@ -64,7 +64,7 @@ class HandlersCodegen {
+@@ -41,12 +41,15 @@ class HandlersCodegen {
+             out.line(`import {toJSON} from '@subsquid/util-internal-json'`);
+         }
+         out.line(`import type {Store} from '${(0, squid_gen_utils_1.resolveModule)(out.file, path_1.default.resolve(`src`, `db`))}'`);
+-        out.line(`import {${handlerType}} from '${(0, squid_gen_utils_1.resolveModule)(out.file, path_1.default.resolve(`src`, `abi`, this.options.contract.spec))}'`);
+-        if (handlerType === 'functions') {
++        const hasFunctions = Object.keys(this.options.contract.functions || {}).length > 0;
++        const hasEvents = Object.keys(this.options.contract.events || {}).length > 0;
++        if (hasFunctions && handlerType === 'functions') {
++            out.line(`import {${handlerType}} from '${(0, squid_gen_utils_1.resolveModule)(out.file, path_1.default.resolve(`src`, `abi`, this.options.contract.spec))}'`);
+             this.getFunctionTargetPrinter().printImports();
+             out.line(`import {Transaction} from '${(0, squid_gen_utils_1.resolveModule)(out.file, path_1.default.resolve(`src`, `processor`))}'`);
+         }
+-        else {
++        else if (hasEvents && handlerType === 'events') {
++            out.line(`import {${handlerType}} from '${(0, squid_gen_utils_1.resolveModule)(out.file, path_1.default.resolve(`src`, `abi`, this.options.contract.spec))}'`);
+             this.getEventTargetPrinter().printImports();
+             out.line(`import {Log} from '${(0, squid_gen_utils_1.resolveModule)(out.file, path_1.default.resolve(`src`, `processor`))}'`);
+         }
+@@ -64,7 +67,7 @@ class HandlersCodegen {
              `log.id`,
              `log.block.height`,
              `new Date(log.block.timestamp)`,
@@ -48,7 +80,7 @@ index 59ace5e..3c5a187 100644
              `log.address`,
              `'${e}'`,
              ...fragment.params.slice(6).map((p) => {
-@@ -72,6 +72,9 @@ class HandlersCodegen {
+@@ -72,6 +75,9 @@ class HandlersCodegen {
                  this.useJSON();
                  return `toJSON(e.${p.originalName ?? p.name})`;
              }
@@ -58,6 +90,26 @@ index 59ace5e..3c5a187 100644
              else {
                  return `e.${p.originalName ?? p.name}`;
              }
+diff --git a/node_modules/@subsquid/squid-gen-evm/lib/mappings.js b/node_modules/@subsquid/squid-gen-evm/lib/mappings.js
+index 2c135a7..9105868 100644
+--- a/node_modules/@subsquid/squid-gen-evm/lib/mappings.js
++++ b/node_modules/@subsquid/squid-gen-evm/lib/mappings.js
+@@ -45,7 +45,14 @@ class MappingCodegen {
+     }
+     this.out.line(`import {Store} from '${(0, squid_gen_utils_1.resolveModule)(this.out.file, path_1.default.resolve(`src`, `db`))}'`);
+     this.getTargetPrinter().printImports();
+-    this.out.line(`import {functions, events} from '${(0, squid_gen_utils_1.resolveModule)(this.out.file, path_1.default.resolve(`src`, `abi`, this.options.contract.spec))}'`);
++    const hasFunctions = Object.keys(this.options.contract.functions || {}).length > 0;
++    const hasEvents = Object.keys(this.options.contract.events || {}).length > 0;
++    if (hasFunctions) {
++        this.out.line(`import {functions} from '${(0, squid_gen_utils_1.resolveModule)(this.out.file, path_1.default.resolve(`src`, `abi`, this.options.contract.spec))}'`);
++    }
++    if (hasEvents) {
++        this.out.line(`import {events} from '${(0, squid_gen_utils_1.resolveModule)(this.out.file, path_1.default.resolve(`src`, `abi`, this.options.contract.spec))}'`);
++    }
+     this.out.line(`import * as eventHandlers from '${(0, squid_gen_utils_1.resolveModule)(this.out.file, path_1.default.resolve(`src`, `handlers`, `${this.options.contract.name}_events`))}'`);
+     this.out.line(`import * as functionHandlers from '${(0, squid_gen_utils_1.resolveModule)(this.out.file, path_1.default.resolve(`src`, `handlers`, `${this.options.contract.name}_functions`))}'`);
+     if (this.util.size > 0) {
 diff --git a/node_modules/@subsquid/squid-gen-evm/lib/processor.js b/node_modules/@subsquid/squid-gen-evm/lib/processor.js
 index f6c72a1..b8c16a2 100644
 --- a/node_modules/@subsquid/squid-gen-evm/lib/processor.js
2 changes: 1 addition & 1 deletion indexers/pyproject.toml
@@ -10,7 +10,7 @@ dependencies = [
     "pandas>=2.2.3",
     "python-dotenv>=1.0.1",
     "pyyaml>=6.0.2",
-    "synthetix>=0.1.21",
+    "synthetix>=0.1.22",
 ]
 
 [tool.uv]
53 changes: 37 additions & 16 deletions indexers/scripts/listener.py
@@ -19,20 +19,12 @@ def convert_case(name):
     return snake_case
 
 
-def insert_data(
-    client: Client, network: str, protocol: str, event: str, block_range: str
-):
-    table_name = f"{network}.{protocol}_{convert_case(event)}"
-    file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{network}/{protocol}/{event}/{event}_{block_range}.parquet"
-    query = f"insert into {table_name} select * from file('{file_path}', 'Parquet')"
-    client.command(query)
-
-
 class FolderEventHandler(FileSystemEventHandler):
     def __init__(self):
         super().__init__()
         self.source_path = Path(f"{RAW_DATA_PATH}")
         self.target_path = Path(f"{CLEAN_DATA_PATH}")
+        self.clickhouse_path = Path(f"{CLICKHOUSE_INTERNAL_PATH}")
         if not self.source_path.exists():
             print(f"Creating source path {self.source_path}")
             self.source_path.mkdir(parents=True, exist_ok=True)
@@ -54,10 +46,12 @@ def _clean_parquet(self, path: str):
         block_range = path.name
         protocol_name = path.parent.name
         network_name = path.parent.parent.name
+        db_name = f"raw_{network_name}"
 
         # Initialize counters
         empty_files = 0
         written_files = 0
+        tables_created = 0
         data_insertions = 0
 
         for parquet_file in path.glob("*.parquet"):
@@ -66,23 +60,50 @@ def _clean_parquet(self, path: str):
                 self.target_path / f"{network_name}" / f"{protocol_name}" / event_name
             )
             output_file = event_dir / f"{event_name}_{block_range}.parquet"
-            output_file.parent.mkdir(parents=True, exist_ok=True)
 
             # Move file to target directory
             df = pd.read_parquet(parquet_file)
             if df.empty:
                 empty_files += 1
                 continue
+            output_file.parent.mkdir(parents=True, exist_ok=True)
             df.columns = [convert_case(col) for col in df.columns]
             df.to_parquet(output_file, index=False)
             written_files += 1
 
-            # import data into clickhouse
-            insert_data(
-                self.client, network_name, protocol_name, event_name, block_range
-            )
-            data_insertions += 1
+            # Import data into clickhouse
+            table_name = f"{protocol_name}_{event_name}"
+            clickhouse_file_path = f"{self.clickhouse_path}/{network_name}/{protocol_name}/{event_name}/{event_name}_{block_range}.parquet"
+            if not self.client.command(f"exists {db_name}.{table_name}"):
+                self._create_table_from_file(db_name, table_name, clickhouse_file_path)
+                tables_created += 1
+            else:
+                self._insert_data_from_file(db_name, table_name, clickhouse_file_path)
+                data_insertions += 1
 
         print(
-            f"Processed {network_name}.{protocol_name}.{block_range}: empty files {empty_files}, written files {written_files}, data insertions {data_insertions}"
+            f"Processed {network_name}.{protocol_name}.{block_range}: "
+            f"empty files {empty_files}, written files {written_files}, "
+            f"tables created {tables_created}, data insertions {data_insertions}"
         )
+
+    def _create_table_from_file(self, db_name: str, table_name: str, file_path: str):
+        query = (
+            f"create table if not exists {db_name}.{table_name} "
+            f"engine = MergeTree order by tuple() as "
+            f"select * from file('{file_path}', 'Parquet')"
+        )
+        try:
+            self.client.command(query)
+        except Exception as e:
+            print(f"Error creating table {db_name}.{table_name}: {e}")
+
+    def _insert_data_from_file(self, db_name: str, table_name: str, file_path: str):
+        query = f"insert into {db_name}.{table_name} select * from file('{file_path}', 'Parquet')"
+        try:
+            self.client.command(query)
+        except Exception as e:
+            print(f"Error inserting data into {db_name}.{table_name}: {e}")
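With this change the listener no longer depends on a pre-created table for every event: the first Parquet batch creates the table by letting ClickHouse infer the schema from the file, and later batches append to it. A standalone sketch of that create-or-insert flow, assuming clickhouse-connect (consistent with the `client.command` calls in the diff); the database, table, and file names below are illustrative:

```python
import clickhouse_connect

client = clickhouse_connect.get_client(host="localhost", user="default")

db_name = "raw_arbitrum_mainnet"
table_name = "curve_curve_usdx_event_token_exchange"
file_path = "/var/lib/clickhouse/user_files/example_0-1000.parquet"

client.command(f"CREATE DATABASE IF NOT EXISTS {db_name}")

# `exists` returns 1 if the table is already there, 0 otherwise.
if not client.command(f"exists {db_name}.{table_name}"):
    # First batch: ClickHouse infers the column types from the Parquet file.
    client.command(
        f"create table if not exists {db_name}.{table_name} "
        f"engine = MergeTree order by tuple() as "
        f"select * from file('{file_path}', 'Parquet')"
    )
else:
    # Later batches: append to the existing table.
    client.command(
        f"insert into {db_name}.{table_name} "
        f"select * from file('{file_path}', 'Parquet')"
    )
```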
21 changes: 16 additions & 5 deletions indexers/utils/clickhouse_schema.py
@@ -52,11 +52,21 @@ def map_to_clickhouse_type(sol_type):
 
 def generate_clickhouse_schema(event_name, fields, network_name, protocol_name):
     query = [
-        f"CREATE TABLE IF NOT EXISTS {network_name}.{protocol_name}_{event_name} ("
+        f"CREATE TABLE IF NOT EXISTS raw_{network_name}.{protocol_name}_{event_name} (",
+        " `id` String,",
+        " `block_number` UInt64,",
+        " `block_timestamp` DateTime64(3, 'UTC'),",
+        " `transaction_hash` String,",
+        " `contract` String,",
+        " `event_name` String,",
     ]
     for field_name, field_type in fields:
-        clickhouse_type = map_to_clickhouse_type(field_type)
-        query.append(f" `{to_snake(field_name)}` {clickhouse_type},")
+        if field_name == "id":
+            clickhouse_type = "String"
+            query.append(f" `param_id` String,")
+        else:
+            clickhouse_type = map_to_clickhouse_type(field_type)
+            query.append(f" `{to_snake(field_name)}` {clickhouse_type},")
     query[-1] = query[-1][:-1]
     query.append(") ENGINE = MergeTree() ORDER BY tuple();")
     return "\n".join(query)
@@ -84,7 +94,7 @@ def process_abi_schemas(client, abi, path, contract_name, network_name, protocol
     for event in events:
         event_name = to_snake(event["name"])
         contract_name = to_snake(contract_name)
-        event_name = f"{contract_name}_{event_name}"
+        event_name = f"{contract_name}_event_{event_name}"
 
         input_names = get_abi_input_names(event)
         input_types = get_abi_input_types(event)
@@ -96,11 +106,12 @@ def process_abi_schemas(client, abi, path, contract_name, network_name, protocol
             network_name=network_name,
             protocol_name=protocol_name,
         )
+        print(schema)
         client.command(schema)
         save_clickhouse_schema(path=path, event_name=event_name, schema=schema)
 
     block_schema = (
-        f"CREATE TABLE IF NOT EXISTS {network_name}.{protocol_name}_block (\n"
+        f"CREATE TABLE IF NOT EXISTS raw_{network_name}.{protocol_name}_block (\n"
         " `number` UInt64,\n"
         " `timestamp` DateTime64(3, 'UTC')\n"
         ") ENGINE = MergeTree() ORDER BY tuple();"
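After this change every generated table lands in a `raw_<network>` database, carries a fixed set of metadata columns, and renames any ABI field called `id` to `param_id` so it cannot collide with the metadata `id` column. A sketch of calling the patched generator (hypothetical event and fields; the import path is an assumption, and the types for `amount` and `sender` depend on `map_to_clickhouse_type`, whose body is not shown in this diff):

```python
# Assumes the function is importable from indexers/utils/clickhouse_schema.py.
from utils.clickhouse_schema import generate_clickhouse_schema

# Hypothetical ABI fields; the ("id", ...) pair triggers the param_id rename.
fields = [("id", "bytes32"), ("amount", "uint256"), ("sender", "address")]

schema = generate_clickhouse_schema(
    event_name="curve_usdx_event_token_exchange",  # <contract>_event_<event> form
    fields=fields,
    network_name="arbitrum_mainnet",
    protocol_name="curve",
)
print(schema)
# Expected shape of the output (amount/sender types are a guess here):
# CREATE TABLE IF NOT EXISTS raw_arbitrum_mainnet.curve_curve_usdx_event_token_exchange (
#  `id` String,
#  `block_number` UInt64,
#  `block_timestamp` DateTime64(3, 'UTC'),
#  `transaction_hash` String,
#  `contract` String,
#  `event_name` String,
#  `param_id` String,
#  `amount` UInt256,
#  `sender` String
# ) ENGINE = MergeTree() ORDER BY tuple();
```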
2 changes: 1 addition & 1 deletion transformers/requirements.txt
@@ -1,6 +1,6 @@
 pandas
 pyarrow
 psycopg2-binary
-dbt-postgres
+dbt-clickhouse
 sqlfluff
 synthetix==0.1.21
3 changes: 3 additions & 0 deletions transformers/synthetix/.sqlfluff
@@ -28,6 +28,9 @@ allow_scalar = False
 [sqlfluff:rules:capitalisation.identifiers]
 extended_capitalisation_policy = lower
 
+[sqlfluff:rules:capitalisation.types]
+extended_capitalisation_policy = pascal
+
 [sqlfluff:rules:capitalisation.functions]
 capitalisation_policy = lower
 