Skip to content
This repository has been archived by the owner on Nov 27, 2024. It is now read-only.

Geth support for inspect #112

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
34 changes: 26 additions & 8 deletions cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from mev_inspect.concurrency import coro
from mev_inspect.db import get_inspect_session, get_trace_session
from mev_inspect.inspector import MEVInspector
from mev_inspect.utils import RPCType

RPC_URL_ENV = "RPC_URL"

Expand All @@ -21,17 +22,29 @@ def cli():
@cli.command()
@click.argument("block_number", type=int)
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
@click.option("--geth/--no-geth", default=False)
@click.option(
"--type",
type=click.Choice(list(map(lambda x: x.name, RPCType)), case_sensitive=False),
default=RPCType.parity.name,
)
@coro
async def inspect_block_command(block_number: int, rpc: str, geth: bool):
print("geth", geth)
async def inspect_block_command(block_number: int, rpc: str, type: str):
type_e = convert_str_to_enum(type)
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()

inspector = MEVInspector(rpc, inspect_db_session, trace_db_session, geth)
inspector = MEVInspector(rpc, inspect_db_session, trace_db_session, type_e)
await inspector.inspect_single_block(block=block_number)


def convert_str_to_enum(type: str) -> RPCType:
if type == "parity":
return RPCType.parity
elif type == "geth":
return RPCType.geth
raise ValueError


@cli.command()
@click.argument("block_number", type=int)
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
Expand All @@ -40,7 +53,7 @@ async def fetch_block_command(block_number: int, rpc: str):
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()

inspector = MEVInspector(rpc, inspect_db_session, trace_db_session, False)
inspector = MEVInspector(rpc, inspect_db_session, trace_db_session, RPCType.parity)
block = await inspector.create_from_block(block_number=block_number)
print(block.json())

Expand All @@ -49,7 +62,11 @@ async def fetch_block_command(block_number: int, rpc: str):
@click.argument("after_block", type=int)
@click.argument("before_block", type=int)
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
@click.option("--geth/--no-geth", default=False)
@click.option(
"--type",
type=click.Choice(list(map(lambda x: x.name, RPCType)), case_sensitive=False),
default=RPCType.parity.name,
)
@click.option(
"--max-concurrency",
type=int,
Expand All @@ -66,15 +83,16 @@ async def inspect_many_blocks_command(
rpc: str,
max_concurrency: int,
request_timeout: int,
geth: bool,
type: str,
):
type_e = convert_str_to_enum(type)
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()
inspector = MEVInspector(
rpc,
inspect_db_session,
trace_db_session,
geth,
type_e,
max_concurrency=max_concurrency,
request_timeout=request_timeout,
)
Expand Down
182 changes: 92 additions & 90 deletions mev_inspect/block.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import asyncio
import logging
from typing import List, Optional
import json
import aiohttp

from sqlalchemy import orm
from web3 import Web3
Expand All @@ -11,10 +9,17 @@
from mev_inspect.schemas.blocks import Block
from mev_inspect.schemas.receipts import Receipt
from mev_inspect.schemas.traces import Trace, TraceType
from mev_inspect.utils import hex_to_int
from mev_inspect.utils import RPCType, hex_to_int


logger = logging.getLogger(__name__)
_calltype_mapping = {
"CALL": "call",
"DELEGATECALL": "delegateCall",
"CREATE": "create",
"SUICIDE": "suicide",
"REWARD": "reward",
}


async def get_latest_block_number(base_provider) -> int:
Expand All @@ -29,7 +34,7 @@ async def get_latest_block_number(base_provider) -> int:
async def create_from_block_number(
base_provider,
w3: Web3,
geth: bool,
type: RPCType,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> Block:
Expand All @@ -39,55 +44,63 @@ async def create_from_block_number(
block = _find_block(trace_db_session, block_number)

if block is None:
block = await _fetch_block(w3, base_provider, geth, block_number)
return block
if type is RPCType.parity:
block = await _fetch_block_parity(w3, base_provider, block_number)
elif type is RPCType.geth:
block = await _fetch_block_geth(w3, base_provider, block_number)
else:
logger.error(f"RPCType not known - {type}")
raise ValueError
return block


async def _fetch_block(
w3, base_provider, geth: bool, block_number: int, retries: int = 0
async def _fetch_block_parity(
w3, base_provider, block_number: int, retries: int = 0
) -> Block:
if not geth:
block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather(
w3.eth.get_block(block_number),
base_provider.make_request("eth_getBlockReceipts", [block_number]),
base_provider.make_request("trace_block", [block_number]),
fetch_base_fee_per_gas(w3, block_number),
block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather(
w3.eth.get_block(block_number),
base_provider.make_request("eth_getBlockReceipts", [block_number]),
base_provider.make_request("trace_block", [block_number]),
fetch_base_fee_per_gas(w3, block_number),
)

try:
receipts: List[Receipt] = [
Receipt(**receipt) for receipt in receipts_json["result"]
]
traces = [Trace(**trace_json) for trace_json in traces_json["result"]]
return Block(
block_number=block_number,
block_timestamp=block_json["timestamp"],
miner=block_json["miner"],
base_fee_per_gas=base_fee_per_gas,
traces=traces,
receipts=receipts,
)
except KeyError as e:
logger.warning(
f"Failed to create objects from block: {block_number}: {e}, retrying: {retries + 1} / 3"
)
if retries < 3:
await asyncio.sleep(5)
return await _fetch_block_parity(w3, base_provider, block_number, retries)
else:
raise

try:
receipts: List[Receipt] = [
Receipt(**receipt) for receipt in receipts_json["result"]
]
traces = [Trace(**trace_json) for trace_json in traces_json["result"]]
return Block(
block_number=block_number,
block_timestamp=block_json["timestamp"],
miner=block_json["miner"],
base_fee_per_gas=base_fee_per_gas,
traces=traces,
receipts=receipts,
)
except KeyError as e:
logger.warning(
f"Failed to create objects from block: {block_number}: {e}, retrying: {retries + 1} / 3"
)
if retries < 3:
await asyncio.sleep(5)
return await _fetch_block(
w3, base_provider, geth, block_number, retries
)
else:
raise
else:
# print(block_number)
block_json = await asyncio.gather(w3.eth.get_block(block_number))

async def _fetch_block_geth(
w3, base_provider, block_number: int, retries: int = 0
) -> Block:
block_json = await asyncio.gather(w3.eth.get_block(block_number))

try:
# Separate calls to help with load during block tracing
traces = await geth_get_tx_traces_parity_format(base_provider, block_json[0])
geth_tx_receipts = await geth_get_tx_receipts_async(
base_provider.endpoint_uri, block_json[0]["transactions"]
base_provider, block_json[0]["transactions"]
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these can be grouped with a gather to make in parallel

Copy link
Author

@supragya supragya Dec 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have seen extra load while tracing is being done hamper tracer in polygon. Sometimes the tracer runs out of time (internal timeout) and errors. Better to keep separate.

receipts = geth_receipts_translator(block_json[0], geth_tx_receipts)
base_fee_per_gas = 0
base_fee_per_gas = 0 # Polygon specific, TODO for other chains

return Block(
block_number=block_number,
Expand All @@ -97,6 +110,15 @@ async def _fetch_block(
traces=traces,
receipts=receipts,
)
except KeyError as e:
logger.warning(
f"Failed to create objects from block: {block_number}: {e}, retrying: {retries + 1} / 3"
)
if retries < 3:
await asyncio.sleep(5)
return await _fetch_block_geth(w3, base_provider, block_number, retries)
else:
raise


def _find_block(
Expand Down Expand Up @@ -245,13 +267,6 @@ def unwrap_tx_trace_for_parity(
block_json, tx_pos_in_block, tx_trace, position=[]
) -> List[Trace]:
response_list = []
_calltype_mapping = {
"CALL": "call",
"DELEGATECALL": "delegateCall",
"CREATE": "create",
"SUICIDE": "suicide",
"REWARD": "reward",
}
try:
if tx_trace["type"] == "STATICCALL":
return []
Expand Down Expand Up @@ -279,7 +294,8 @@ def unwrap_tx_trace_for_parity(
type=TraceType(_calltype_mapping[tx_trace["type"]]),
)
)
except Exception:
except Exception as e:
logger.warn(f"error while unwraping tx trace for parity {e}")
return []
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what causes this to get hit

Copy link
Author

@supragya supragya Dec 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from my experience, CREATE2. Not used in analysis

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that's the case, want to move this to the same as you have for STATICCALL where it checks for it explicitly?

It's better on our side to have it just blow up with an exception for now. We'd rather know if we're missing cases.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Added create2
  2. Added logger warning.
    Haven't yet made it to throw exception when stuff happens since it helps us back-fill in "best effort" form. Yet, person who wants to work on this further for translation to be "exact" can remove this. Can add a comment around here though. What do you think? @lukevs

Copy link
Collaborator

@lukevs lukevs Dec 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For 1, should there be a line like this?

        if tx_trace["type"] == "CREATE2":
            return []

For 2, is there anything that still makes it throw now that create2 is supported?

I'm a little hesitant to blanket accept all Exception - if there are known failures I'm ok with skipping those blocks for now, but I think we should at least limit the scope of exceptions to what we know

Otherwise for geth users they could be unknowingly missing blocks


if "calls" in tx_trace.keys():
Expand All @@ -292,28 +308,20 @@ def unwrap_tx_trace_for_parity(
return response_list


async def geth_get_tx_receipts_task(session, endpoint_uri, tx):
data = {
"jsonrpc": "2.0",
"id": "0",
"method": "eth_getTransactionReceipt",
"params": [tx.hex()],
}
async with session.post(endpoint_uri, json=data) as response:
if response.status != 200:
response.raise_for_status()
return await response.text()
async def geth_get_tx_receipts_task(base_provider, tx):
receipt = await base_provider.make_request("eth_getTransactionReceipt", [tx.hex()])
return receipt


async def geth_get_tx_receipts_async(endpoint_uri, transactions):
async def geth_get_tx_receipts_async(base_provider, transactions):
geth_tx_receipts = []
async with aiohttp.ClientSession() as session:
tasks = [
asyncio.create_task(geth_get_tx_receipts_task(session, endpoint_uri, tx))
for tx in transactions
]
geth_tx_receipts = await asyncio.gather(*tasks)
return [json.loads(tx_receipts) for tx_receipts in geth_tx_receipts]
tasks = [
asyncio.create_task(geth_get_tx_receipts_task(base_provider, tx))
for tx in transactions
]
geth_tx_receipts = await asyncio.gather(*tasks)
# return [json.loads(tx_receipts) for tx_receipts in geth_tx_receipts]
return geth_tx_receipts


def geth_receipts_translator(block_json, geth_tx_receipts) -> List[Receipt]:
Expand All @@ -331,24 +339,18 @@ def geth_receipts_translator(block_json, geth_tx_receipts) -> List[Receipt]:


def unwrap_tx_receipt_for_parity(block_json, tx_pos_in_block, tx_receipt) -> Receipt:
try:
if tx_pos_in_block != int(tx_receipt["transactionIndex"], 16):
print(
"Alert the position of transaction in block is mismatched ",
tx_pos_in_block,
tx_receipt["transactionIndex"],
)
return Receipt(
block_number=block_json["number"],
transaction_hash=tx_receipt["transactionHash"],
transaction_index=tx_pos_in_block,
gas_used=tx_receipt["gasUsed"],
effective_gas_price=tx_receipt["effectiveGasPrice"],
cumulative_gas_used=tx_receipt["cumulativeGasUsed"],
to=tx_receipt["to"],
if tx_pos_in_block != int(tx_receipt["transactionIndex"], 16):
logger.info(
"Alert the position of transaction in block is mismatched ",
tx_pos_in_block,
tx_receipt["transactionIndex"],
)

except Exception as e:
print("error while decoding receipt", tx_receipt, e)

return Receipt()
return Receipt(
block_number=block_json["number"],
transaction_hash=tx_receipt["transactionHash"],
transaction_index=tx_pos_in_block,
gas_used=tx_receipt["gasUsed"],
effective_gas_price=tx_receipt["effectiveGasPrice"],
cumulative_gas_used=tx_receipt["cumulativeGasUsed"],
to=tx_receipt["to"],
)
4 changes: 4 additions & 0 deletions mev_inspect/geth_poa_middleware.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
"""
Modified asynchronous geth_poa_middleware which mirrors functionality of
https://github.com/ethereum/web3.py/blob/master/web3/middleware/geth_poa.py
"""
from typing import (
supragya marked this conversation as resolved.
Show resolved Hide resolved
Any,
Callable,
Expand Down
5 changes: 3 additions & 2 deletions mev_inspect/inspect_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
from mev_inspect.swaps import get_swaps
from mev_inspect.transfers import get_transfers
from mev_inspect.liquidations import get_liquidations
from mev_inspect.utils import RPCType


logger = logging.getLogger(__name__)
Expand All @@ -43,7 +44,7 @@ async def inspect_block(
inspect_db_session: orm.Session,
base_provider,
w3: Web3,
geth: bool,
type: RPCType,
trace_classifier: TraceClassifier,
block_number: int,
trace_db_session: Optional[orm.Session],
Expand All @@ -52,7 +53,7 @@ async def inspect_block(
block = await create_from_block_number(
base_provider,
w3,
geth,
type,
block_number,
trace_db_session,
)
Expand Down
Loading