Skip to content

Commit

Permalink
update the paths
Browse files Browse the repository at this point in the history
  • Loading branch information
Tburm committed Dec 6, 2024
1 parent 1b888cb commit 64404d2
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 27 deletions.
3 changes: 2 additions & 1 deletion extractors/configs/arbitrum_mainnet.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
network_id: 42161
+protocol: "synthetix"

blocks:
-min_block: "232500000"
+min_block: "218M"
requests_per_second: 25
block_increment: 4000
chunk_size: 80
Expand Down
1 change: 1 addition & 0 deletions extractors/configs/arbitrum_sepolia.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
network_id: 421614
+protocol: "synthetix"

blocks:
min_block: "41M"
Expand Down
7 changes: 4 additions & 3 deletions extractors/configs/base_mainnet.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
network_id: 8453
+protocol: "synthetix"

blocks:
min_block: "7.5M"
-requests_per_second: 25
+requests_per_second: 1000
block_increment: 500
chunk_size: 80

Expand All @@ -14,7 +15,7 @@ eth_calls:
- [1, "0xC74eA762cF06c9151cE074E6a569a5945b6302E7"]
- [1, "0x729Ef31D86d31440ecBF49f27F7cD7c16c6616d2"]
min_block: "7.5M"
-requests_per_second: 25
+requests_per_second: 1000
block_increment: 500
chunk_size: 80

Expand All @@ -25,6 +26,6 @@ eth_calls:
- [1, "0xC74eA762cF06c9151cE074E6a569a5945b6302E7"]
- [1, "0x729Ef31D86d31440ecBF49f27F7cD7c16c6616d2"]
min_block: "7.5M"
-requests_per_second: 25
+requests_per_second: 1000
block_increment: 500
chunk_size: 80
7 changes: 4 additions & 3 deletions extractors/configs/base_sepolia.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
network_id: 84532
+protocol: "synthetix"

blocks:
min_block: "8M"
-requests_per_second: 25
+requests_per_second: 1000
block_increment: 500
chunk_size: 80

Expand All @@ -13,7 +14,7 @@ eth_calls:
inputs:
- [1, "0x8069c44244e72443722cfb22DcE5492cba239d39"]
min_block: "8M"
-requests_per_second: 25
+requests_per_second: 1000
block_increment: 500
chunk_size: 80

Expand All @@ -23,6 +24,6 @@ eth_calls:
inputs:
- [1, "0x8069c44244e72443722cfb22DcE5492cba239d39"]
min_block: "8M"
-requests_per_second: 25
+requests_per_second: 1000
block_increment: 500
chunk_size: 80
7 changes: 4 additions & 3 deletions extractors/configs/eth_mainnet.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
network_id: 1
+protocol: "synthetix"

blocks:
min_block: "20000000"
-requests_per_second: 25
+requests_per_second: 1000
block_increment: 150
chunk_size: 50

Expand All @@ -13,7 +14,7 @@ eth_calls:
inputs:
- [1, "0xC011a73ee8576Fb46F5E1c5751cA3B9Fe0af2a6F"]
min_block: "20000000"
-requests_per_second: 25
+requests_per_second: 1000
block_increment: 150
chunk_size: 50

Expand All @@ -23,6 +24,6 @@ eth_calls:
inputs:
- [1, "0xC011a73ee8576Fb46F5E1c5751cA3B9Fe0af2a6F"]
min_block: "20000000"
-requests_per_second: 25
+requests_per_second: 1000
block_increment: 150
chunk_size: 50
16 changes: 9 additions & 7 deletions extractors/main.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
-import os
+"""Main script to extract data from EVM nodes based on a provided configuration."""
+
import yaml
import argparse
from dotenv import load_dotenv
Expand All @@ -8,8 +9,8 @@
load_dotenv()

# parse command-line arguments
-parser = argparse.ArgumentParser(description="Extract data from Ethereum nodes.")
-parser.add_argument("config", help="Path to the YAML configuration file")
+parser = argparse.ArgumentParser(description="Extract data from EVM nodes.")
+parser.add_argument("config", help="Path to a YAML configuration file")
parser.add_argument("--name", help="Name of the configuration to use (optional)")
args = parser.parse_args()

Expand All @@ -19,34 +20,35 @@

network_id = config.get("network_id")
block_config = config.get("blocks")
+protocol = config.get("protocol")
eth_call_configs = config.get("eth_calls", [])

# determine the flow based on the --name argument
if args.name:
if args.name == "blocks":
# run blocks only
-extract_blocks(network_id=network_id, **block_config)
+extract_blocks(network_id=network_id, protocol=protocol, **block_config)
else:
# run the specified eth_call only
eth_call_config = next(
(ec for ec in eth_call_configs if ec["function_name"] == args.name), None
)
if eth_call_config:
-extract_data(network_id=network_id, **eth_call_config)
+extract_data(network_id=network_id, protocol=protocol, **eth_call_config)
else:
print(f"No configuration found with name {args.name}")
else:
# run everything
exceptions = []
try:
-extract_blocks(network_id=network_id, **block_config)
+extract_blocks(network_id=network_id, protocol=protocol, **block_config)
except Exception as e:
exceptions.append(e)
print(f"Error extracting blocks: {e}")

for eth_call_config in eth_call_configs:
try:
-extract_data(network_id=network_id, **eth_call_config)
+extract_data(network_id=network_id, protocol=protocol, **eth_call_config)
except Exception as e:
exceptions.append(e)
print(f"Error extracting eth_call {eth_call_config.get('name')}: {e}")
Expand Down
14 changes: 8 additions & 6 deletions extractors/src/clean.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def get_labels(contract, function_name):
return input_names, output_names


-def clean_data(chain_name, contract, function_name, write=True):
+def clean_data(chain_name, protocol, contract, function_name, write=True):
input_labels, output_labels = get_labels(contract, function_name)

# fix labels
Expand All @@ -86,7 +86,7 @@ def clean_data(chain_name, contract, function_name, write=True):
df = duckdb.sql(
f"""
SELECT DISTINCT *
-FROM '../parquet-data/raw/{chain_name}/{function_name}/*.parquet'
+FROM '../parquet-data/extractors/raw/{chain_name}/{protocol}/{function_name}/*.parquet'
WHERE
call_data IS NOT NULL
AND output_data IS NOT NULL
Expand Down Expand Up @@ -127,7 +127,7 @@ def clean_data(chain_name, contract, function_name, write=True):

# write the data
if write:
-file_path = f"../parquet-data/clean/{chain_name}/{function_name}.parquet"
+file_path = f"../parquet-data/extractors/clean/{chain_name}/{protocol}/{function_name}.parquet"

ensure_directory_exists(file_path)
# write the data
Expand All @@ -140,21 +140,23 @@ def clean_data(chain_name, contract, function_name, write=True):
return df


-def clean_blocks(chain_name, write=True):
+def clean_blocks(chain_name, protocol, write=True):
# read the data
df = duckdb.sql(
f"""
SELECT DISTINCT
CAST(timestamp as BIGINT) as timestamp,
CAST(block_number as BIGINT) as block_number
-FROM '/parquet-data/raw/{chain_name}/blocks/*.parquet'
+FROM '/parquet-data/extractors/raw/{chain_name}/{protocol}/blocks/*.parquet'
ORDER BY block_number
"""
)

# write the data
if write:
-file_path = f"/parquet-data/clean/{chain_name}/blocks.parquet"
+file_path = (
+f"/parquet-data/extractors/clean/{chain_name}/{protocol}/blocks.parquet"
+)

ensure_directory_exists(file_path)

Expand Down
14 changes: 10 additions & 4 deletions extractors/src/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def get_synthetix(chain_config):
# generalize a function
def extract_data(
network_id,
+protocol,
contract_name,
package_name,
function_name,
Expand All @@ -37,7 +38,7 @@ def extract_data(
# get synthetix
chain_config = CHAIN_CONFIGS[network_id]
snx = get_synthetix(chain_config)
-output_dir = f"/parquet-data/raw/{chain_config['name']}/{function_name}"
+output_dir = f"/parquet-data/extractors/raw/{chain_config['name']}/{protocol}/{function_name}"

# encode the call data
contract = snx.contracts[package_name][contract_name]["contract"]
Expand All @@ -60,11 +61,13 @@ def extract_data(
)

if clean:
-df_clean = clean_data(chain_config["name"], contract, function_name)
+df_clean = clean_data(chain_config["name"], protocol, contract, function_name)
+return df_clean


def extract_blocks(
network_id,
+protocol,
clean=True,
min_block=0,
requests_per_second=25,
Expand All @@ -79,7 +82,9 @@ def extract_blocks(
snx = get_synthetix(chain_config)

# try reading and looking for latest block
-output_dir = f"/parquet-data/raw/{chain_config['name']}/blocks"
+output_dir = (
+f"/parquet-data/extractors/raw/{chain_config['name']}/{protocol}/blocks"
+)

cryo.freeze(
"blocks",
Expand All @@ -93,4 +98,5 @@ def extract_blocks(
)

if clean:
-df_clean = clean_blocks(chain_config["name"])
+df_clean = clean_blocks(chain_config["name"], protocol)
+return df_clean

0 comments on commit 64404d2

Please sign in to comment.