Skip to content

Commit

Permalink
Merge pull request #147 from Synthetixio/extractors-v2
Browse files Browse the repository at this point in the history
Extractors Upgrade
  • Loading branch information
Tburm authored Dec 10, 2024
2 parents 8af713c + 5ae56d6 commit 9008789
Show file tree
Hide file tree
Showing 15 changed files with 1,257 additions and 62 deletions.
10 changes: 5 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ build:
docker compose build transformer

extract:
docker compose run extractors python main.py configs/eth_mainnet.yaml
docker compose run extractors python main.py configs/base_mainnet.yaml
docker compose run extractors python main.py configs/base_sepolia.yaml
docker compose run extractors python main.py configs/arbitrum_mainnet.yaml
docker compose run extractors python main.py configs/arbitrum_sepolia.yaml
docker compose run extractors uv run python main.py configs/eth_mainnet.yaml
docker compose run extractors uv run python main.py configs/base_mainnet.yaml
docker compose run extractors uv run python main.py configs/base_sepolia.yaml
docker compose run extractors uv run python main.py configs/arbitrum_mainnet.yaml
docker compose run extractors uv run python main.py configs/arbitrum_sepolia.yaml

index:
docker compose run indexer --network_name base_mainnet --protocol_name synthetix
Expand Down
1 change: 1 addition & 0 deletions extractors/.python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.12
35 changes: 15 additions & 20 deletions extractors/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,30 +1,25 @@
# build rust package
FROM rust:1.79.0 as builder
# include rust
FROM rust:1.79.0 AS builder

RUN apt-get update && apt-get install -y python3-pip python3-venv

RUN python3 -m venv venv
ENV PATH="/venv/bin:$PATH"
# install wheel package and run
FROM python:3.12
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/

WORKDIR /usr/src
RUN git clone https://github.com/paradigmxyz/cryo
WORKDIR /usr/src/cryo/crates/python
# Copy Rust and Cargo binaries from the builder stage
COPY --from=builder /usr/local/cargo /usr/local/cargo
COPY --from=builder /usr/local/rustup /usr/local/rustup
COPY --from=builder /usr/local/bin /usr/local/bin

RUN pip install maturin
RUN maturin build --release
# Ensure the copied binaries are in the PATH
ENV PATH="/usr/local/cargo/bin:/usr/local/rustup/bin:${PATH}"

# install wheel package and run
FROM python:3.11-slim as runtime
RUN rustup default 1.79.0

WORKDIR /app

COPY --from=builder /usr/src/cryo/target/wheels/*.whl /app/

RUN pip install --no-cache-dir /app/*.whl

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen

COPY . .

CMD ["python3", "main.py"]
CMD ["uv", "run", "python", "main.py"]
3 changes: 2 additions & 1 deletion extractors/configs/arbitrum_mainnet.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
network_id: 42161
protocol: "synthetix"

blocks:
min_block: "232500000"
min_block: "218M"
requests_per_second: 25
block_increment: 4000
chunk_size: 80
Expand Down
1 change: 1 addition & 0 deletions extractors/configs/arbitrum_sepolia.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
network_id: 421614
protocol: "synthetix"

blocks:
min_block: "41M"
Expand Down
7 changes: 4 additions & 3 deletions extractors/configs/base_mainnet.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
network_id: 8453
protocol: "synthetix"

blocks:
min_block: "7.5M"
requests_per_second: 25
requests_per_second: 1000
block_increment: 500
chunk_size: 80

Expand All @@ -14,7 +15,7 @@ eth_calls:
- [1, "0xC74eA762cF06c9151cE074E6a569a5945b6302E7"]
- [1, "0x729Ef31D86d31440ecBF49f27F7cD7c16c6616d2"]
min_block: "7.5M"
requests_per_second: 25
requests_per_second: 1000
block_increment: 500
chunk_size: 80

Expand All @@ -25,6 +26,6 @@ eth_calls:
- [1, "0xC74eA762cF06c9151cE074E6a569a5945b6302E7"]
- [1, "0x729Ef31D86d31440ecBF49f27F7cD7c16c6616d2"]
min_block: "7.5M"
requests_per_second: 25
requests_per_second: 1000
block_increment: 500
chunk_size: 80
7 changes: 4 additions & 3 deletions extractors/configs/base_sepolia.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
network_id: 84532
protocol: "synthetix"

blocks:
min_block: "8M"
requests_per_second: 25
requests_per_second: 1000
block_increment: 500
chunk_size: 80

Expand All @@ -13,7 +14,7 @@ eth_calls:
inputs:
- [1, "0x8069c44244e72443722cfb22DcE5492cba239d39"]
min_block: "8M"
requests_per_second: 25
requests_per_second: 1000
block_increment: 500
chunk_size: 80

Expand All @@ -23,6 +24,6 @@ eth_calls:
inputs:
- [1, "0x8069c44244e72443722cfb22DcE5492cba239d39"]
min_block: "8M"
requests_per_second: 25
requests_per_second: 1000
block_increment: 500
chunk_size: 80
7 changes: 4 additions & 3 deletions extractors/configs/eth_mainnet.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
network_id: 1
protocol: "synthetix"

blocks:
min_block: "20000000"
requests_per_second: 25
requests_per_second: 1000
block_increment: 150
chunk_size: 50

Expand All @@ -13,7 +14,7 @@ eth_calls:
inputs:
- [1, "0xC011a73ee8576Fb46F5E1c5751cA3B9Fe0af2a6F"]
min_block: "20000000"
requests_per_second: 25
requests_per_second: 1000
block_increment: 150
chunk_size: 50

Expand All @@ -23,6 +24,6 @@ eth_calls:
inputs:
- [1, "0xC011a73ee8576Fb46F5E1c5751cA3B9Fe0af2a6F"]
min_block: "20000000"
requests_per_second: 25
requests_per_second: 1000
block_increment: 150
chunk_size: 50
16 changes: 9 additions & 7 deletions extractors/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
"""Main script to extract data from EVM nodes based on a provided configuration."""

import yaml
import argparse
from dotenv import load_dotenv
Expand All @@ -8,8 +9,8 @@
load_dotenv()

# parse command-line arguments
parser = argparse.ArgumentParser(description="Extract data from Ethereum nodes.")
parser.add_argument("config", help="Path to the YAML configuration file")
parser = argparse.ArgumentParser(description="Extract data from EVM nodes.")
parser.add_argument("config", help="Path to a YAML configuration file")
parser.add_argument("--name", help="Name of the configuration to use (optional)")
args = parser.parse_args()

Expand All @@ -19,34 +20,35 @@

network_id = config.get("network_id")
block_config = config.get("blocks")
protocol = config.get("protocol")
eth_call_configs = config.get("eth_calls", [])

# determine the flow based on the --name argument
if args.name:
if args.name == "blocks":
# run blocks only
extract_blocks(network_id=network_id, **block_config)
extract_blocks(network_id=network_id, protocol=protocol, **block_config)
else:
# run the specified eth_call only
eth_call_config = next(
(ec for ec in eth_call_configs if ec["function_name"] == args.name), None
)
if eth_call_config:
extract_data(network_id=network_id, **eth_call_config)
extract_data(network_id=network_id, protocol=protocol, **eth_call_config)
else:
print(f"No configuration found with name {args.name}")
else:
# run everything
exceptions = []
try:
extract_blocks(network_id=network_id, **block_config)
extract_blocks(network_id=network_id, protocol=protocol, **block_config)
except Exception as e:
exceptions.append(e)
print(f"Error extracting blocks: {e}")

for eth_call_config in eth_call_configs:
try:
extract_data(network_id=network_id, **eth_call_config)
extract_data(network_id=network_id, protocol=protocol, **eth_call_config)
except Exception as e:
exceptions.append(e)
print(f"Error extracting eth_call {eth_call_config.get('name')}: {e}")
Expand Down
18 changes: 18 additions & 0 deletions extractors/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[project]
name = "extractors"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"cryo==0.3.0",
"duckdb>=1.1.3",
"maturin>=1.7.7",
"numpy>=2.1.3",
"pandas>=2.2.3",
"polars-lts-cpu>=1.16.0",
"pyarrow>=18.1.0",
"python-dotenv>=1.0.1",
"pyyaml>=6.0.2",
"synthetix>=0.1.21",
]
9 changes: 0 additions & 9 deletions extractors/requirements.txt

This file was deleted.

14 changes: 8 additions & 6 deletions extractors/src/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def get_labels(contract, function_name):
return input_names, output_names


def clean_data(chain_name, contract, function_name, write=True):
def clean_data(chain_name, protocol, contract, function_name, write=True):
input_labels, output_labels = get_labels(contract, function_name)

# fix labels
Expand All @@ -86,7 +86,7 @@ def clean_data(chain_name, contract, function_name, write=True):
df = duckdb.sql(
f"""
SELECT DISTINCT *
FROM '../parquet-data/raw/{chain_name}/{function_name}/*.parquet'
FROM '../parquet-data/extractors/raw/{chain_name}/{protocol}/{function_name}/*.parquet'
WHERE
call_data IS NOT NULL
AND output_data IS NOT NULL
Expand Down Expand Up @@ -127,7 +127,7 @@ def clean_data(chain_name, contract, function_name, write=True):

# write the data
if write:
file_path = f"../parquet-data/clean/{chain_name}/{function_name}.parquet"
file_path = f"../parquet-data/extractors/clean/{chain_name}/{protocol}/{function_name}.parquet"

ensure_directory_exists(file_path)
# write the data
Expand All @@ -140,21 +140,23 @@ def clean_data(chain_name, contract, function_name, write=True):
return df


def clean_blocks(chain_name, write=True):
def clean_blocks(chain_name, protocol, write=True):
# read the data
df = duckdb.sql(
f"""
SELECT DISTINCT
CAST(timestamp as BIGINT) as timestamp,
CAST(block_number as BIGINT) as block_number
FROM '/parquet-data/raw/{chain_name}/blocks/*.parquet'
FROM '/parquet-data/extractors/raw/{chain_name}/{protocol}/blocks/*.parquet'
ORDER BY block_number
"""
)

# write the data
if write:
file_path = f"/parquet-data/clean/{chain_name}/blocks.parquet"
file_path = (
f"/parquet-data/extractors/clean/{chain_name}/{protocol}/blocks.parquet"
)

ensure_directory_exists(file_path)

Expand Down
14 changes: 10 additions & 4 deletions extractors/src/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def get_synthetix(chain_config):
# generalize a function
def extract_data(
network_id,
protocol,
contract_name,
package_name,
function_name,
Expand All @@ -37,7 +38,7 @@ def extract_data(
# get synthetix
chain_config = CHAIN_CONFIGS[network_id]
snx = get_synthetix(chain_config)
output_dir = f"/parquet-data/raw/{chain_config['name']}/{function_name}"
output_dir = f"/parquet-data/extractors/raw/{chain_config['name']}/{protocol}/{function_name}"

# encode the call data
contract = snx.contracts[package_name][contract_name]["contract"]
Expand All @@ -60,11 +61,13 @@ def extract_data(
)

if clean:
df_clean = clean_data(chain_config["name"], contract, function_name)
df_clean = clean_data(chain_config["name"], protocol, contract, function_name)
return df_clean


def extract_blocks(
network_id,
protocol,
clean=True,
min_block=0,
requests_per_second=25,
Expand All @@ -79,7 +82,9 @@ def extract_blocks(
snx = get_synthetix(chain_config)

# try reading and looking for latest block
output_dir = f"/parquet-data/raw/{chain_config['name']}/blocks"
output_dir = (
f"/parquet-data/extractors/raw/{chain_config['name']}/{protocol}/blocks"
)

cryo.freeze(
"blocks",
Expand All @@ -93,4 +98,5 @@ def extract_blocks(
)

if clean:
df_clean = clean_blocks(chain_config["name"])
df_clean = clean_blocks(chain_config["name"], protocol)
return df_clean
Loading

0 comments on commit 9008789

Please sign in to comment.