Merge branch 'feat/clickhouse-raw-models' into extractors-directory
Tburm committed Dec 18, 2024
2 parents c9bd0c1 + 9a536c8 commit 8120ba1
Showing 525 changed files with 1,817 additions and 33,174 deletions.
3 changes: 2 additions & 1 deletion indexers/Dockerfile
@@ -6,7 +6,8 @@ WORKDIR /app
COPY package*.json ./
COPY patches/ ./patches/

RUN apt-get update && apt-get install -y build-essential clang && npm ci
RUN apt-get update && apt-get install -y build-essential clang
RUN npm ci

COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-dev
38 changes: 18 additions & 20 deletions indexers/main.py
@@ -121,19 +121,6 @@ def load_network_config(path):
# Load custom config
custom_config = config_file["configs"][protocol_name]

# Initialize Synthetix SDK (with optional Cannon config)
if "cannon_config" in custom_config:
snx = Synthetix(
provider_rpc=rpc_endpoint,
network_id=network_id,
cannon_config=custom_config["cannon_config"],
)
else:
snx = Synthetix(
provider_rpc=rpc_endpoint,
network_id=network_id,
)

# Set block range based on config.
block_range = {}
if args.block_from is not None:
@@ -152,12 +139,25 @@ def load_network_config(path):
user="default",
settings={"allow_experimental_json_type": 1},
)
client.command(f"CREATE DATABASE IF NOT EXISTS {network_name}")
client.command(f"CREATE DATABASE IF NOT EXISTS raw_{network_name}")

# Get contracts from SDK or ABI files
contracts = []
schemas_path = f"{SCHEMAS_BASE_PATH}/{network_name}/{protocol_name}"

if "contracts_from_sdk" in custom_config:
# Initialize Synthetix SDK (with optional Cannon config)
if "cannon_config" in custom_config:
snx = Synthetix(
provider_rpc=rpc_endpoint,
network_id=network_id,
cannon_config=custom_config["cannon_config"],
)
else:
snx = Synthetix(
provider_rpc=rpc_endpoint,
network_id=network_id,
)
contracts_from_sdk = custom_config["contracts_from_sdk"]
for contract in contracts_from_sdk:
name = contract["name"]
@@ -175,7 +175,7 @@ def load_network_config(path):
client=client,
)
contracts.append({"name": name, "address": address})
elif "contracts_from_abi" in custom_config:
if "contracts_from_abi" in custom_config:
contracts_from_abi = custom_config["contracts_from_abi"]
for contract in contracts_from_abi:
name = contract["name"]
@@ -192,8 +192,8 @@ def load_network_config(path):
client=client,
)
contracts.append({"name": name, "address": contract["address"]})
else:
message = "No contracts found in network config"
if not contracts:
message = "No contracts found"
raise Exception(message)

# Create squidgen generator config
@@ -209,6 +209,4 @@ def load_network_config(path):
)
write_yaml(squidgen_config, "squidgen.yaml")

snx.logger.info(
f"squidgen.yaml and ABI files have been generated for {args.network_name}"
)
print(f"squidgen.yaml and ABI files have been generated for {args.network_name}")
6 changes: 5 additions & 1 deletion indexers/networks/arbitrum_mainnet/network_config.yaml
@@ -18,11 +18,15 @@ configs:
package: perpsFactory
- name: PerpsAccountProxy
package: perpsFactory
cannon_config:
package: "synthetix-omnibus"
version: "latest"
preset: "main"

curve:
range:
from: 236000000
contracts_from_abi:
- name: CurveUsdx
address: '0x096A8865367686290639bc50bF8D85C0110d9Fea'
address: "0x096A8865367686290639bc50bF8D85C0110d9Fea"
abi: ./abi/CurvePool.json
58 changes: 55 additions & 3 deletions indexers/patches/@subsquid+squid-gen-evm+2.0.0.patch
@@ -35,11 +35,43 @@ index 2839b6f..d1e7826 100644
this.out.line(`})`);
return this.out.write();
}
diff --git a/node_modules/@subsquid/squid-gen-evm/lib/handlers.d.ts b/node_modules/@subsquid/squid-gen-evm/lib/handlers.d.ts
index 236fa01..28058ed 100644
--- a/node_modules/@subsquid/squid-gen-evm/lib/handlers.d.ts
+++ b/node_modules/@subsquid/squid-gen-evm/lib/handlers.d.ts
@@ -10,6 +10,8 @@ export declare class HandlersCodegen {
constructor(outDir: OutDir, options: {
contract: SquidContract;
dataTarget: DataTarget;
+ events: boolean;
+ functions: boolean;
});
generate(): void;
private printImports;
diff --git a/node_modules/@subsquid/squid-gen-evm/lib/handlers.js b/node_modules/@subsquid/squid-gen-evm/lib/handlers.js
index 59ace5e..3c5a187 100644
index 59ace5e..64b8493 100644
--- a/node_modules/@subsquid/squid-gen-evm/lib/handlers.js
+++ b/node_modules/@subsquid/squid-gen-evm/lib/handlers.js
@@ -64,7 +64,7 @@ class HandlersCodegen {
@@ -41,12 +41,15 @@ class HandlersCodegen {
out.line(`import {toJSON} from '@subsquid/util-internal-json'`);
}
out.line(`import type {Store} from '${(0, squid_gen_utils_1.resolveModule)(out.file, path_1.default.resolve(`src`, `db`))}'`);
- out.line(`import {${handlerType}} from '${(0, squid_gen_utils_1.resolveModule)(out.file, path_1.default.resolve(`src`, `abi`, this.options.contract.spec))}'`);
- if (handlerType === 'functions') {
+ const hasFunctions = Object.keys(this.options.contract.functions || {}).length > 0;
+ const hasEvents = Object.keys(this.options.contract.events || {}).length > 0;
+ if (hasFunctions && handlerType === 'functions') {
+ out.line(`import {${handlerType}} from '${(0, squid_gen_utils_1.resolveModule)(out.file, path_1.default.resolve(`src`, `abi`, this.options.contract.spec))}'`);
this.getFunctionTargetPrinter().printImports();
out.line(`import {Transaction} from '${(0, squid_gen_utils_1.resolveModule)(out.file, path_1.default.resolve(`src`, `processor`))}'`);
}
- else {
+ else if (hasEvents && handlerType === 'events') {
+ out.line(`import {${handlerType}} from '${(0, squid_gen_utils_1.resolveModule)(out.file, path_1.default.resolve(`src`, `abi`, this.options.contract.spec))}'`);
this.getEventTargetPrinter().printImports();
out.line(`import {Log} from '${(0, squid_gen_utils_1.resolveModule)(out.file, path_1.default.resolve(`src`, `processor`))}'`);
}
@@ -64,7 +67,7 @@ class HandlersCodegen {
`log.id`,
`log.block.height`,
`new Date(log.block.timestamp)`,
@@ -48,7 +80,7 @@ index 59ace5e..3c5a187 100644
`log.address`,
`'${e}'`,
...fragment.params.slice(6).map((p) => {
@@ -72,6 +72,9 @@ class HandlersCodegen {
@@ -72,6 +75,9 @@ class HandlersCodegen {
this.useJSON();
return `toJSON(e.${p.originalName ?? p.name})`;
}
@@ -58,6 +90,26 @@ index 59ace5e..3c5a187 100644
else {
return `e.${p.originalName ?? p.name}`;
}
diff --git a/node_modules/@subsquid/squid-gen-evm/lib/mappings.js b/node_modules/@subsquid/squid-gen-evm/lib/mappings.js
index 2c135a7..9105868 100644
--- a/node_modules/@subsquid/squid-gen-evm/lib/mappings.js
+++ b/node_modules/@subsquid/squid-gen-evm/lib/mappings.js
@@ -45,7 +45,14 @@ class MappingCodegen {
}
this.out.line(`import {Store} from '${(0, squid_gen_utils_1.resolveModule)(this.out.file, path_1.default.resolve(`src`, `db`))}'`);
this.getTargetPrinter().printImports();
- this.out.line(`import {functions, events} from '${(0, squid_gen_utils_1.resolveModule)(this.out.file, path_1.default.resolve(`src`, `abi`, this.options.contract.spec))}'`);
+ const hasFunctions = Object.keys(this.options.contract.functions || {}).length > 0;
+ const hasEvents = Object.keys(this.options.contract.events || {}).length > 0;
+ if (hasFunctions) {
+ this.out.line(`import {functions} from '${(0, squid_gen_utils_1.resolveModule)(this.out.file, path_1.default.resolve(`src`, `abi`, this.options.contract.spec))}'`);
+ }
+ if (hasEvents) {
+ this.out.line(`import {events} from '${(0, squid_gen_utils_1.resolveModule)(this.out.file, path_1.default.resolve(`src`, `abi`, this.options.contract.spec))}'`);
+ }
this.out.line(`import * as eventHandlers from '${(0, squid_gen_utils_1.resolveModule)(this.out.file, path_1.default.resolve(`src`, `handlers`, `${this.options.contract.name}_events`))}'`);
this.out.line(`import * as functionHandlers from '${(0, squid_gen_utils_1.resolveModule)(this.out.file, path_1.default.resolve(`src`, `handlers`, `${this.options.contract.name}_functions`))}'`);
if (this.util.size > 0) {
diff --git a/node_modules/@subsquid/squid-gen-evm/lib/processor.js b/node_modules/@subsquid/squid-gen-evm/lib/processor.js
index f6c72a1..b8c16a2 100644
--- a/node_modules/@subsquid/squid-gen-evm/lib/processor.js
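The patch to @subsquid/squid-gen-evm makes the generated handlers and mappings tolerate contracts that expose only events or only functions: imports of {functions} or {events} from the ABI module are emitted only when the contract spec actually has entries of that kind, and HandlersCodegen gains explicit events/functions flags in its options type. A rough Python analogue of that guard, using a hypothetical emit_import_lines helper to stand in for the patched JavaScript codegen:

def emit_import_lines(contract: dict) -> list[str]:
    """Only emit an import for a handler kind when the contract actually
    declares members of that kind (hypothetical helper; the real change
    lives in the patched handlers.js and mappings.js)."""
    lines = []
    if contract.get("functions"):
        lines.append(f"import {{functions}} from './abi/{contract['spec']}'")
    if contract.get("events"):
        lines.append(f"import {{events}} from './abi/{contract['spec']}'")
    return lines


# A contract with events but no decoded function calls yields a single import.
print(emit_import_lines({"spec": "PerpsMarketProxy", "events": {"OrderSettled": {}}, "functions": {}}))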
2 changes: 1 addition & 1 deletion indexers/pyproject.toml
@@ -10,7 +10,7 @@ dependencies = [
"pandas>=2.2.3",
"python-dotenv>=1.0.1",
"pyyaml>=6.0.2",
"synthetix>=0.1.21",
"synthetix>=0.1.22",
]

[tool.uv]
53 changes: 37 additions & 16 deletions indexers/scripts/listener.py
@@ -19,20 +19,12 @@ def convert_case(name):
return snake_case


def insert_data(
client: Client, network: str, protocol: str, event: str, block_range: str
):
table_name = f"{network}.{protocol}_{convert_case(event)}"
file_path = f"{CLICKHOUSE_INTERNAL_PATH}/{network}/{protocol}/{event}/{event}_{block_range}.parquet"
query = f"insert into {table_name} select * from file('{file_path}', 'Parquet')"
client.command(query)


class FolderEventHandler(FileSystemEventHandler):
def __init__(self):
super().__init__()
self.source_path = Path(f"{RAW_DATA_PATH}")
self.target_path = Path(f"{CLEAN_DATA_PATH}")
self.clickhouse_path = Path(f"{CLICKHOUSE_INTERNAL_PATH}")
if not self.source_path.exists():
print(f"Creating source path {self.source_path}")
self.source_path.mkdir(parents=True, exist_ok=True)
@@ -54,10 +46,12 @@ def _clean_parquet(self, path: str):
block_range = path.name
protocol_name = path.parent.name
network_name = path.parent.parent.name
db_name = f"raw_{network_name}"

# Initialize counters
empty_files = 0
written_files = 0
tables_created = 0
data_insertions = 0

for parquet_file in path.glob("*.parquet"):
@@ -66,23 +60,50 @@ def _clean_parquet(self, path: str):
self.target_path / f"{network_name}" / f"{protocol_name}" / event_name
)
output_file = event_dir / f"{event_name}_{block_range}.parquet"
output_file.parent.mkdir(parents=True, exist_ok=True)

# Move file to target directory
df = pd.read_parquet(parquet_file)
if df.empty:
empty_files += 1
continue
output_file.parent.mkdir(parents=True, exist_ok=True)
df.columns = [convert_case(col) for col in df.columns]
df.to_parquet(output_file, index=False)
written_files += 1

# import data into clickhouse
insert_data(
self.client, network_name, protocol_name, event_name, block_range
)
data_insertions += 1
# Import data into clickhouse
table_name = f"{protocol_name}_{event_name}"
clickhouse_file_path = f"{self.clickhouse_path}/{network_name}/{protocol_name}/{event_name}/{event_name}_{block_range}.parquet"
if not self.client.command(f"exists {db_name}.{table_name}"):
self._create_table_from_file(db_name, table_name, clickhouse_file_path)
tables_created += 1
else:
self._insert_data_from_file(db_name, table_name, clickhouse_file_path)
data_insertions += 1

print(
f"Processed {network_name}.{protocol_name}.{block_range}: empty files {empty_files}, written files {written_files}, data insertions {data_insertions}"
f"Processed {network_name}.{protocol_name}.{block_range}: "
f"empty files {empty_files}, written files {written_files}, "
f"tables created {tables_created}, data insertions {data_insertions}"
)

def _create_table_from_file(self, db_name: str, table_name: str, file_path: str):
query = (
f"create table if not exists {db_name}.{table_name} "
f"engine = MergeTree order by tuple() as "
f"select * from file('{file_path}', 'Parquet')"
)
try:
self.client.command(query)
except Exception as e:
print(f"Error creating table {db_name}.{table_name}: {e}")

def _insert_data_from_file(self, db_name: str, table_name: str, file_path: str):
query = f"insert into {db_name}.{table_name} select * from file('{file_path}', 'Parquet')"
try:
self.client.command(query)
except Exception as e:
print(f"Error inserting data into {db_name}.{table_name}: {e}")


def main():
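With the module-level insert_data helper removed, listener.py now creates the raw_{network} tables lazily: the first non-empty parquet batch for an event creates its table with CREATE TABLE ... AS SELECT * FROM file(...), letting ClickHouse infer the schema from the Parquet file itself, and later batches are appended with a plain INSERT. Inferring the schema from the first batch keeps the listener from having to track the squid's DDL, at the cost of typing the table after whatever the first file contains. A condensed sketch of the create-or-insert step (connection details and the example arguments are assumptions, not taken from the commit):

import clickhouse_connect


def load_parquet(client, db_name: str, table_name: str, file_path: str) -> str:
    """Create-or-insert step of FolderEventHandler._clean_parquet, condensed.
    file_path must be visible to the ClickHouse server, i.e. live under its
    user_files directory (CLICKHOUSE_INTERNAL_PATH in the listener)."""
    if not client.command(f"exists {db_name}.{table_name}"):
        # First batch for this event: let ClickHouse infer the schema from
        # the Parquet file and materialize it as a MergeTree table.
        client.command(
            f"create table if not exists {db_name}.{table_name} "
            f"engine = MergeTree order by tuple() as "
            f"select * from file('{file_path}', 'Parquet')"
        )
        return "created"
    # Subsequent batches are appended to the existing table.
    client.command(
        f"insert into {db_name}.{table_name} "
        f"select * from file('{file_path}', 'Parquet')"
    )
    return "inserted"


# Illustrative call; host, table and file names are hypothetical.
client = clickhouse_connect.get_client(host="localhost", username="default")
load_parquet(
    client,
    "raw_arbitrum_mainnet",
    "curve_token_exchange",
    "/var/lib/clickhouse/user_files/arbitrum_mainnet/curve/TokenExchange/TokenExchange_0-100.parquet",
)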
21 changes: 16 additions & 5 deletions indexers/utils/clickhouse_schema.py
@@ -56,11 +56,21 @@ def map_to_clickhouse_type(sol_type):

def generate_clickhouse_schema(event_name, fields, network_name, protocol_name):
query = [
f"CREATE TABLE IF NOT EXISTS {network_name}.{protocol_name}_{event_name} ("
f"CREATE TABLE IF NOT EXISTS raw_{network_name}.{protocol_name}_{event_name} (",
" `id` String,",
" `block_number` UInt64,",
" `block_timestamp` DateTime64(3, 'UTC'),",
" `transaction_hash` String,",
" `contract` String,",
" `event_name` String,",
]
for field_name, field_type in fields:
clickhouse_type = map_to_clickhouse_type(field_type)
query.append(f" `{to_snake(field_name)}` {clickhouse_type},")
if field_name == "id":
clickhouse_type = "String"
query.append(f" `param_id` String,")
else:
clickhouse_type = map_to_clickhouse_type(field_type)
query.append(f" `{to_snake(field_name)}` {clickhouse_type},")
query[-1] = query[-1][:-1]
query.append(") ENGINE = MergeTree() ORDER BY tuple();")
return "\n".join(query)
@@ -89,7 +99,7 @@ def process_abi_schemas(client, abi, path, contract_name, network_name, protocol
for event in events:
event_name = to_snake(event["name"])
contract_name = to_snake(contract_name)
event_name = f"{contract_name}_{event_name}"
event_name = f"{contract_name}_event_{event_name}"

input_names = get_abi_input_names(event)
input_types = get_abi_input_types(event)
@@ -101,6 +111,7 @@ def process_abi_schemas(client, abi, path, contract_name, network_name, protocol
network_name=network_name,
protocol_name=protocol_name,
)
print(schema)
client.command(schema)
save_clickhouse_schema(path=path, event_name=event_name, schema=schema)

@@ -140,7 +151,7 @@ def process_abi_schemas(client, abi, path, contract_name, network_name, protocol

# do the blocks
block_schema = (
f"CREATE TABLE IF NOT EXISTS {network_name}.{protocol_name}_block (\n"
f"CREATE TABLE IF NOT EXISTS raw_{network_name}.{protocol_name}_block (\n"
" `number` UInt64,\n"
" `timestamp` DateTime64(3, 'UTC')\n"
") ENGINE = MergeTree() ORDER BY tuple();"
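clickhouse_schema.py now writes event tables into the raw_{network_name} database, prefixes every table with the indexer's standard metadata columns (id, block_number, block_timestamp, transaction_hash, contract, event_name), renames any ABI field literally called id to param_id so it cannot collide with the row id, and names tables {protocol}_{contract}_event_{event}. Under those rules, a hypothetical Transfer(address sender, uint256 value, bytes32 id) event on a Token contract in a perps protocol on base_mainnet would be expected to yield DDL along these lines (a sketch; the exact column types depend on map_to_clickhouse_type):

# Expected output of generate_clickhouse_schema for the hypothetical event
# described above; the uint256 -> UInt256 and address/bytes32 -> String
# mappings are assumptions about map_to_clickhouse_type.
expected_schema = """\
CREATE TABLE IF NOT EXISTS raw_base_mainnet.perps_token_event_transfer (
    `id` String,
    `block_number` UInt64,
    `block_timestamp` DateTime64(3, 'UTC'),
    `transaction_hash` String,
    `contract` String,
    `event_name` String,
    `sender` String,
    `value` UInt256,
    `param_id` String
) ENGINE = MergeTree() ORDER BY tuple();"""
print(expected_schema)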
2 changes: 1 addition & 1 deletion transformers/requirements.txt
@@ -1,6 +1,6 @@
pandas
pyarrow
psycopg2-binary
dbt-postgres
dbt-clickhouse
sqlfluff
synthetix==0.1.21
3 changes: 3 additions & 0 deletions transformers/synthetix/.sqlfluff
@@ -28,6 +28,9 @@ allow_scalar = False
[sqlfluff:rules:capitalisation.identifiers]
extended_capitalisation_policy = lower

[sqlfluff:rules:capitalisation.types]
extended_capitalisation_policy = pascal

[sqlfluff:rules:capitalisation.functions]
capitalisation_policy = lower
