Skip to content
This repository has been archived by the owner on Nov 27, 2024. It is now read-only.

Geth support for inspect #112

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ Example:
export RPC_URL="http://111.111.111.111:8546"
```

**Note**: mev-inspect-py currently requires an RPC of a full archive node with support for Erigon traces and receipts. Geth additions have been added to translate geth traces and receipts to Erigon ones and can be accessed using `--geth` flag.

Next, start all services with:

Expand All @@ -65,6 +66,7 @@ On first startup, you'll need to apply database migrations with:
### Inspect a single block

Inspecting block [12914944](https://twitter.com/mevalphaleak/status/1420416437575901185):
**Note**: Add `--geth` at the end if RPC_URL points to a geth / geth like node.

```
./mev inspect 12914944
Expand All @@ -73,6 +75,7 @@ Inspecting block [12914944](https://twitter.com/mevalphaleak/status/142041643757
### Inspect many blocks

Inspecting blocks 12914944 to 12914954:
**Note**: Add `--geth` at the end if RPC_URL points to a geth / geth like node.

```
./mev inspect-many 12914944 12914954
Expand Down
12 changes: 8 additions & 4 deletions cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ def cli():
@cli.command()
@click.argument("block_number", type=int)
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
@click.option("--geth/--no-geth", default=False)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a way for us to derive this from the RPC directly?

would be great to plug in any RPC and have it "just work"

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just talked to @sragss who had some ideas for doing that in supporting OpenEthereum

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it seems to be possible to check whether the first RPC query failed or not and decide which method to use based on that. Then, if RPC endpoint client changes and request fails again, we could do the same thing just checking the response code every time.

OpenEthereum also supports block tracing btw: https://openethereum.github.io/JSONRPC-trace-module#trace_block

@coro
async def inspect_block_command(block_number: int, rpc: str):
async def inspect_block_command(block_number: int, rpc: str, geth: bool):
supragya marked this conversation as resolved.
Show resolved Hide resolved
print("geth", geth)
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()

inspector = MEVInspector(rpc, inspect_db_session, trace_db_session)
inspector = MEVInspector(rpc, inspect_db_session, trace_db_session, geth)
await inspector.inspect_single_block(block=block_number)


Expand All @@ -38,7 +40,7 @@ async def fetch_block_command(block_number: int, rpc: str):
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()

inspector = MEVInspector(rpc, inspect_db_session, trace_db_session)
inspector = MEVInspector(rpc, inspect_db_session, trace_db_session, False)
supragya marked this conversation as resolved.
Show resolved Hide resolved
block = await inspector.create_from_block(block_number=block_number)
print(block.json())

Expand All @@ -47,6 +49,7 @@ async def fetch_block_command(block_number: int, rpc: str):
@click.argument("after_block", type=int)
@click.argument("before_block", type=int)
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
@click.option("--geth/--no-geth", default=False)
@click.option(
"--max-concurrency",
type=int,
Expand All @@ -63,14 +66,15 @@ async def inspect_many_blocks_command(
rpc: str,
max_concurrency: int,
request_timeout: int,
geth: bool,
):
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()

inspector = MEVInspector(
rpc,
inspect_db_session,
trace_db_session,
geth,
max_concurrency=max_concurrency,
request_timeout=request_timeout,
)
Expand Down
225 changes: 193 additions & 32 deletions mev_inspect/block.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import logging
from typing import List, Optional
import json
import aiohttp

from sqlalchemy import orm
from web3 import Web3
Expand All @@ -27,6 +29,7 @@ async def get_latest_block_number(base_provider) -> int:
async def create_from_block_number(
base_provider,
w3: Web3,
geth: bool,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> Block:
Expand All @@ -36,43 +39,64 @@ async def create_from_block_number(
block = _find_block(trace_db_session, block_number)

if block is None:
block = await _fetch_block(w3, base_provider, block_number)
return block
else:
block = await _fetch_block(w3, base_provider, geth, block_number)
return block
return block


async def _fetch_block(w3, base_provider, block_number: int, retries: int = 0) -> Block:
block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather(
w3.eth.get_block(block_number),
base_provider.make_request("eth_getBlockReceipts", [block_number]),
base_provider.make_request("trace_block", [block_number]),
fetch_base_fee_per_gas(w3, block_number),
)

try:
receipts: List[Receipt] = [
Receipt(**receipt) for receipt in receipts_json["result"]
]
traces = [Trace(**trace_json) for trace_json in traces_json["result"]]
except KeyError as e:
logger.warning(
f"Failed to create objects from block: {block_number}: {e}, retrying: {retries + 1} / 3"
async def _fetch_block(
w3, base_provider, geth: bool, block_number: int, retries: int = 0
) -> Block:
if not geth:
supragya marked this conversation as resolved.
Show resolved Hide resolved
block_json, receipts_json, traces_json, base_fee_per_gas = await asyncio.gather(
w3.eth.get_block(block_number),
base_provider.make_request("eth_getBlockReceipts", [block_number]),
base_provider.make_request("trace_block", [block_number]),
fetch_base_fee_per_gas(w3, block_number),
)
if retries < 3:
await asyncio.sleep(5)
return await _fetch_block(w3, base_provider, block_number, retries)
else:
raise

return Block(
block_number=block_number,
block_timestamp=block_json["timestamp"],
miner=block_json["miner"],
base_fee_per_gas=base_fee_per_gas,
traces=traces,
receipts=receipts,
)
try:
receipts: List[Receipt] = [
Receipt(**receipt) for receipt in receipts_json["result"]
]
traces = [Trace(**trace_json) for trace_json in traces_json["result"]]
return Block(
block_number=block_number,
block_timestamp=block_json["timestamp"],
miner=block_json["miner"],
base_fee_per_gas=base_fee_per_gas,
traces=traces,
receipts=receipts,
)
except KeyError as e:
logger.warning(
f"Failed to create objects from block: {block_number}: {e}, retrying: {retries + 1} / 3"
)
if retries < 3:
await asyncio.sleep(5)
return await _fetch_block(
w3, base_provider, geth, block_number, retries
)
else:
raise
else:
# print(block_number)
block_json = await asyncio.gather(w3.eth.get_block(block_number))
supragya marked this conversation as resolved.
Show resolved Hide resolved
traces = await geth_get_tx_traces_parity_format(base_provider, block_json[0])
geth_tx_receipts = await geth_get_tx_receipts_async(
base_provider.endpoint_uri, block_json[0]["transactions"]
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these can be grouped with a gather to make in parallel

Copy link
Author

@supragya supragya Dec 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have seen extra load while tracing is being done hamper tracer in polygon. Sometimes the tracer runs out of time (internal timeout) and errors. Better to keep separate.

receipts = geth_receipts_translator(block_json[0], geth_tx_receipts)
base_fee_per_gas = 0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there no way to fetch base fee for geth?

Copy link
Author

@supragya supragya Dec 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be, however, we don't use in polygon. Could be taken up in a separate PR/GH issue since it will need to be tested for the concerned chain before merge.


return Block(
block_number=block_number,
block_timestamp=block_json[0]["timestamp"],
miner=block_json[0]["miner"],
base_fee_per_gas=base_fee_per_gas,
traces=traces,
receipts=receipts,
)


def _find_block(
Expand Down Expand Up @@ -191,3 +215,140 @@ def get_transaction_hashes(calls: List[Trace]) -> List[str]:
result.append(call.transaction_hash)

return result


# Geth specific additions


async def geth_get_tx_traces_parity_format(base_provider, block_json: dict):
# print(block_json['hash'].hex())
block_hash = block_json["hash"]
block_trace = await geth_get_tx_traces(base_provider, block_hash)
# print(block_trace)
parity_traces = []
for idx, trace in enumerate(block_trace["result"]):
if "result" in trace:
parity_traces.extend(
unwrap_tx_trace_for_parity(block_json, idx, trace["result"])
)
return parity_traces


async def geth_get_tx_traces(base_provider, block_hash):
block_trace = await base_provider.make_request(
"debug_traceBlockByHash", [block_hash.hex(), {"tracer": "callTracer"}]
)
return block_trace


def unwrap_tx_trace_for_parity(
block_json, tx_pos_in_block, tx_trace, position=[]
) -> List[Trace]:
response_list = []
_calltype_mapping = {
supragya marked this conversation as resolved.
Show resolved Hide resolved
"CALL": "call",
"DELEGATECALL": "delegateCall",
"CREATE": "create",
"SUICIDE": "suicide",
"REWARD": "reward",
}
try:
if tx_trace["type"] == "STATICCALL":
return []
action_dict = dict()
action_dict["callType"] = _calltype_mapping[tx_trace["type"]]
if action_dict["callType"] == "call":
action_dict["value"] = tx_trace["value"]
for key in ["from", "to", "gas", "input"]:
action_dict[key] = tx_trace[key]

result_dict = dict()
for key in ["gasUsed", "output"]:
result_dict[key] = tx_trace[key]

response_list.append(
Trace(
action=action_dict,
block_hash=str(block_json["hash"]),
block_number=int(block_json["number"]),
result=result_dict,
subtraces=len(tx_trace["calls"]) if "calls" in tx_trace.keys() else 0,
trace_address=position,
transaction_hash=block_json["transactions"][tx_pos_in_block].hex(),
transaction_position=tx_pos_in_block,
type=TraceType(_calltype_mapping[tx_trace["type"]]),
)
)
except Exception:
return []
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what causes this to get hit

Copy link
Author

@supragya supragya Dec 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from my experience, CREATE2. Not used in analysis

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that's the case, want to move this to the same as you have for STATICCALL where it checks for it explicitly?

It's better on our side to have it just blow up with an exception for now. We'd rather know if we're missing cases.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Added create2
  2. Added logger warning.
    Haven't yet made it to throw exception when stuff happens since it helps us back-fill in "best effort" form. Yet, person who wants to work on this further for translation to be "exact" can remove this. Can add a comment around here though. What do you think? @lukevs

Copy link
Collaborator

@lukevs lukevs Dec 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For 1, should there be a line like this?

        if tx_trace["type"] == "CREATE2":
            return []

For 2, is there anything that still makes it throw now that create2 is supported?

I'm a little hesitant to blanket accept all Exception - if there are known failures I'm ok with skipping those blocks for now, but I think we should at least limit the scope of exceptions to what we know

Otherwise for geth users they could be unknowingly missing blocks


if "calls" in tx_trace.keys():
for idx, subcall in enumerate(tx_trace["calls"]):
response_list.extend(
unwrap_tx_trace_for_parity(
block_json, tx_pos_in_block, subcall, position + [idx]
)
)
return response_list


async def geth_get_tx_receipts_task(session, endpoint_uri, tx):
data = {
"jsonrpc": "2.0",
"id": "0",
"method": "eth_getTransactionReceipt",
"params": [tx.hex()],
}
supragya marked this conversation as resolved.
Show resolved Hide resolved
async with session.post(endpoint_uri, json=data) as response:
if response.status != 200:
response.raise_for_status()
return await response.text()


async def geth_get_tx_receipts_async(endpoint_uri, transactions):
geth_tx_receipts = []
async with aiohttp.ClientSession() as session:
tasks = [
asyncio.create_task(geth_get_tx_receipts_task(session, endpoint_uri, tx))
for tx in transactions
]
geth_tx_receipts = await asyncio.gather(*tasks)
supragya marked this conversation as resolved.
Show resolved Hide resolved
return [json.loads(tx_receipts) for tx_receipts in geth_tx_receipts]


def geth_receipts_translator(block_json, geth_tx_receipts) -> List[Receipt]:
json_decoded_receipts = [
tx_receipt["result"]
if tx_receipt != None and ("result" in tx_receipt.keys())
else None
for tx_receipt in geth_tx_receipts
]
results = []
for idx, tx_receipt in enumerate(json_decoded_receipts):
if tx_receipt != None:
results.append(unwrap_tx_receipt_for_parity(block_json, idx, tx_receipt))
return results


def unwrap_tx_receipt_for_parity(block_json, tx_pos_in_block, tx_receipt) -> Receipt:
try:
if tx_pos_in_block != int(tx_receipt["transactionIndex"], 16):
print(
supragya marked this conversation as resolved.
Show resolved Hide resolved
"Alert the position of transaction in block is mismatched ",
tx_pos_in_block,
tx_receipt["transactionIndex"],
)
return Receipt(
block_number=block_json["number"],
transaction_hash=tx_receipt["transactionHash"],
transaction_index=tx_pos_in_block,
gas_used=tx_receipt["gasUsed"],
effective_gas_price=tx_receipt["effectiveGasPrice"],
cumulative_gas_used=tx_receipt["cumulativeGasUsed"],
to=tx_receipt["to"],
)

except Exception as e:
print("error while decoding receipt", tx_receipt, e)
supragya marked this conversation as resolved.
Show resolved Hide resolved

return Receipt()
supragya marked this conversation as resolved.
Show resolved Hide resolved
Loading