Skip to content
This repository has been archived by the owner on Nov 27, 2024. It is now read-only.

Geth support for inspect #112

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ Example:
export RPC_URL="http://111.111.111.111:8546"
```

**Note**: mev-inspect-py currently requires an RPC of a full archive node with support for Erigon traces and receipts (not geth 😔).
**Note**: mev-inspect-py currently requires an RPC of a full archive node with support for Erigon traces and receipts. Geth nodes are also supported: pass the `--geth` flag to translate geth traces and receipts into their Erigon equivalents.

Next, start all services with:

Expand All @@ -62,6 +62,7 @@ kubectl exec deploy/mev-inspect -- alembic upgrade head
### Inspect a single block

Inspecting block [12914944](https://twitter.com/mevalphaleak/status/1420416437575901185):
**Note**: Append `--geth` to the command if RPC_URL points to a geth or geth-like node.

```
kubectl exec deploy/mev-inspect -- poetry run inspect-block 12914944
Expand All @@ -70,6 +71,7 @@ kubectl exec deploy/mev-inspect -- poetry run inspect-block 12914944
### Inspect many blocks

Inspecting blocks 12914944 to 12914954:
**Note**: Append `--geth` to the command if RPC_URL points to a geth or geth-like node.

```
kubectl exec deploy/mev-inspect -- poetry run inspect-many-blocks 12914944 12914954
Expand Down
13 changes: 11 additions & 2 deletions cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import click
from web3 import Web3
from web3.middleware import geth_poa_middleware

from mev_inspect.classifiers.trace import TraceClassifier
from mev_inspect.db import get_inspect_session, get_trace_session
Expand All @@ -26,12 +27,15 @@ def cli():
@click.argument("block_number", type=int)
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
@click.option("--cache/--no-cache", default=True)
def inspect_block_command(block_number: int, rpc: str, cache: bool):
@click.option("--geth/--no-geth", default=False)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a way for us to derive this from the RPC directly?

would be great to plug in any RPC and have it "just work"

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just talked to @sragss who had some ideas for doing that in supporting OpenEthereum

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it seems to be possible to check whether the first RPC query failed or not and decide which method to use based on that. Then, if RPC endpoint client changes and request fails again, we could do the same thing just checking the response code every time.

OpenEthereum also supports block tracing btw: https://openethereum.github.io/JSONRPC-trace-module#trace_block

def inspect_block_command(block_number: int, rpc: str, cache: bool, geth: bool):
inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()

base_provider = get_base_provider(rpc)
w3 = Web3(base_provider)
if geth:
w3.middleware_onion.inject(geth_poa_middleware, layer=0)
trace_classifier = TraceClassifier()

if not cache:
Expand All @@ -41,6 +45,7 @@ def inspect_block_command(block_number: int, rpc: str, cache: bool):
inspect_db_session,
base_provider,
w3,
geth,
trace_classifier,
block_number,
trace_db_session=trace_db_session,
Expand All @@ -52,15 +57,18 @@ def inspect_block_command(block_number: int, rpc: str, cache: bool):
@click.argument("before_block", type=int)
@click.option("--rpc", default=lambda: os.environ.get(RPC_URL_ENV, ""))
@click.option("--cache/--no-cache", default=True)
@click.option("--geth/--no-geth", default=False)
def inspect_many_blocks_command(
after_block: int, before_block: int, rpc: str, cache: bool
after_block: int, before_block: int, rpc: str, cache: bool, geth: bool
):

inspect_db_session = get_inspect_session()
trace_db_session = get_trace_session()

base_provider = get_base_provider(rpc)
w3 = Web3(base_provider)
if geth:
w3.middleware_onion.inject(geth_poa_middleware, layer=0)
supragya marked this conversation as resolved.
Show resolved Hide resolved
trace_classifier = TraceClassifier()

if not cache:
Expand All @@ -79,6 +87,7 @@ def inspect_many_blocks_command(
inspect_db_session,
base_provider,
w3,
geth,
trace_classifier,
block_number,
trace_db_session=trace_db_session,
Expand Down
173 changes: 165 additions & 8 deletions mev_inspect/block.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
from pathlib import Path
from typing import List, Optional
import json
import asyncio
import aiohttp

from sqlalchemy import orm
from web3 import Web3
Expand All @@ -19,6 +22,7 @@ def get_latest_block_number(w3: Web3) -> int:
def create_from_block_number(
base_provider,
w3: Web3,
geth: bool,
block_number: int,
trace_db_session: Optional[orm.Session],
) -> Block:
Expand All @@ -28,25 +32,37 @@ def create_from_block_number(
block = _find_block(trace_db_session, block_number)

if block is None:
return _fetch_block(w3, base_provider, block_number)
return _fetch_block(w3, base_provider, geth, block_number)
else:
return block


def _fetch_block(
w3,
base_provider,
geth,
block_number: int,
) -> Block:
block_json = w3.eth.get_block(block_number)
receipts_json = base_provider.make_request("eth_getBlockReceipts", [block_number])
traces_json = w3.parity.trace_block(block_number)

receipts: List[Receipt] = [
Receipt(**receipt) for receipt in receipts_json["result"]
]
traces = [Trace(**trace_json) for trace_json in traces_json]
base_fee_per_gas = fetch_base_fee_per_gas(w3, block_number)
if not geth:
supragya marked this conversation as resolved.
Show resolved Hide resolved
receipts_json = base_provider.make_request(
"eth_getBlockReceipts", [block_number]
)
traces_json = w3.parity.trace_block(block_number)

receipts: List[Receipt] = [
Receipt(**receipt) for receipt in receipts_json["result"]
]
traces = [Trace(**trace_json) for trace_json in traces_json]
base_fee_per_gas = fetch_base_fee_per_gas(w3, block_number)
else:
traces = geth_get_tx_traces_parity_format(base_provider, block_json)
geth_tx_receipts = geth_get_tx_receipts(
base_provider, block_json["transactions"]
)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these can be grouped with a gather to make in parallel

Copy link
Author

@supragya supragya Dec 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have seen extra load during tracing hamper the tracer on Polygon: the tracer sometimes hits its internal timeout and errors out. It is better to keep these requests separate.

receipts = geth_receipts_translator(block_json, geth_tx_receipts)
base_fee_per_gas = 0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there no way to fetch base fee for geth?

Copy link
Author

@supragya supragya Dec 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be; however, we don't use it on Polygon. It could be taken up in a separate PR/GH issue, since it will need to be tested on the chain concerned before merging.


return Block(
block_number=block_number,
Expand Down Expand Up @@ -164,3 +180,144 @@ def cache_block(cache_path: Path, block: Block):
def _get_cache_path(block_number: int) -> Path:
cache_directory_path = Path(cache_directory)
return cache_directory_path / f"{block_number}.json"


# Geth specific additions


def geth_get_tx_traces_parity_format(base_provider, block_json):
    """Fetch geth call traces for a block and flatten them parity-style.

    The block is traced via ``geth_get_tx_traces`` using ``block_json["hash"]``.
    Each per-transaction entry of the response is then expanded with
    ``unwrap_tx_trace_for_parity`` and the pieces are concatenated in
    transaction order.

    Args:
        base_provider: Provider handed through to ``geth_get_tx_traces``.
        block_json: Block data; ``block_json["hash"]`` selects the block to
            trace and the whole object is forwarded to the unwrapper.

    Returns:
        A flat list with the unwrapped traces of every transaction.
    """
    tracer_response = geth_get_tx_traces(base_provider, block_json["hash"])

    flattened = []
    for tx_index, tx_entry in enumerate(tracer_response["result"]):
        # Entries without a "result" key are skipped
        # (presumably transactions the tracer could not trace - TODO confirm).
        if "result" not in tx_entry:
            continue
        flattened += unwrap_tx_trace_for_parity(
            block_json, tx_index, tx_entry["result"]
        )
    return flattened


def geth_get_tx_traces(base_provider, block_hash):
    """Ask the node for call-tracer traces of every transaction in a block.

    Args:
        base_provider: Object exposing ``make_request(method, params)``.
        block_hash: Block hash; its ``.hex()`` form is sent to the node.

    Returns:
        Whatever ``make_request`` returns for ``debug_traceBlockByHash``.
    """
    tracer_options = {"tracer": "callTracer"}
    request_params = [block_hash.hex(), tracer_options]
    return base_provider.make_request("debug_traceBlockByHash", request_params)


def unwrap_tx_trace_for_parity(
block_json, tx_pos_in_block, tx_trace, position=[]
) -> List[Trace]:
response_list = []
_calltype_mapping = {
supragya marked this conversation as resolved.
Show resolved Hide resolved
"CALL": "call",
"DELEGATECALL": "delegateCall",
"CREATE": "create",
"SUICIDE": "suicide",
"REWARD": "reward",
}
try:
if tx_trace["type"] == "STATICCALL":
return []
action_dict = dict()
action_dict["callType"] = _calltype_mapping[tx_trace["type"]]
if action_dict["callType"] == "call":
action_dict["value"] = tx_trace["value"]
for key in ["from", "to", "gas", "input"]:
action_dict[key] = tx_trace[key]

result_dict = dict()
for key in ["gasUsed", "output"]:
result_dict[key] = tx_trace[key]

response_list.append(
Trace(
action=action_dict,
block_hash=str(block_json["hash"]),
block_number=int(block_json["number"]),
result=result_dict,
subtraces=len(tx_trace["calls"]) if "calls" in tx_trace.keys() else 0,
trace_address=position,
transaction_hash=block_json["transactions"][tx_pos_in_block].hex(),
transaction_position=tx_pos_in_block,
type=TraceType(_calltype_mapping[tx_trace["type"]]),
)
)
except Exception:
return []
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what causes this to get hit

Copy link
Author

@supragya supragya Dec 7, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from my experience, CREATE2. Not used in analysis

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that's the case, want to move this to the same as you have for STATICCALL where it checks for it explicitly?

It's better on our side to have it just blow up with an exception for now. We'd rather know if we're missing cases.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Added create2
  2. Added logger warning.
    I haven't yet made it throw an exception when something goes wrong, since swallowing errors lets us back-fill on a "best effort" basis. Anyone who later wants the translation to be "exact" can remove this. I can add a comment around here, though. What do you think? @lukevs

Copy link
Collaborator

@lukevs lukevs Dec 13, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For 1, should there be a line like this?

        if tx_trace["type"] == "CREATE2":
            return []

For 2, is there anything that still makes it throw now that create2 is supported?

I'm a little hesitant to blanket accept all Exception - if there are known failures I'm ok with skipping those blocks for now, but I think we should at least limit the scope of exceptions to what we know

Otherwise for geth users they could be unknowingly missing blocks


if "calls" in tx_trace.keys():
for idx, subcall in enumerate(tx_trace["calls"]):
response_list.extend(
unwrap_tx_trace_for_parity(
block_json, tx_pos_in_block, subcall, position + [idx]
)
)
return response_list


async def geth_get_tx_receipts_task(session, endpoint_uri, tx):
    """POST a single ``eth_getTransactionReceipt`` JSON-RPC request.

    Args:
        session: HTTP session whose ``post(url, json=...)`` returns an async
            context manager yielding the response.
        endpoint_uri: URL of the JSON-RPC endpoint.
        tx: Transaction hash; its ``.hex()`` form is sent as the only param.

    Returns:
        The raw response body as text (not yet JSON-decoded).
    """
    payload = {
        "jsonrpc": "2.0",
        "id": "0",
        "method": "eth_getTransactionReceipt",
        "params": [tx.hex()],
    }
    async with session.post(endpoint_uri, json=payload) as reply:
        status_ok = reply.status == 200
        if not status_ok:
            # Lets the response object raise for error statuses.
            reply.raise_for_status()
        body = await reply.text()
        return body


async def geth_get_tx_receipts_async(endpoint_uri, transactions):
    """Fetch the receipts of all given transactions concurrently.

    One ``geth_get_tx_receipts_task`` is scheduled per transaction on a
    shared ``aiohttp.ClientSession``; the tasks are awaited together with
    ``asyncio.gather``, so results keep the order of ``transactions``.

    Args:
        endpoint_uri: URL of the JSON-RPC endpoint.
        transactions: Iterable of transaction hashes.

    Returns:
        A list with the JSON-decoded response of each request.
    """
    async with aiohttp.ClientSession() as http_session:
        pending = []
        for tx_hash in transactions:
            request = geth_get_tx_receipts_task(http_session, endpoint_uri, tx_hash)
            pending.append(asyncio.create_task(request))
        raw_bodies = await asyncio.gather(*pending)
    return list(map(json.loads, raw_bodies))


def geth_get_tx_receipts(base_provider, transactions):
    """Synchronous entry point for fetching transaction receipts from geth.

    Runs ``geth_get_tx_receipts_async`` to completion with ``asyncio.run``
    against ``base_provider.endpoint_uri``.

    Args:
        base_provider: Provider exposing the node URL as ``endpoint_uri``.
        transactions: Iterable of transaction hashes.

    Returns:
        The list produced by ``geth_get_tx_receipts_async``.
    """
    fetch_all = geth_get_tx_receipts_async(base_provider.endpoint_uri, transactions)
    return asyncio.run(fetch_all)


def geth_receipts_translator(block_json, geth_tx_receipts) -> List[Receipt]:
    """Translate raw geth receipt responses into ``Receipt`` objects.

    Args:
        block_json: Block the receipts belong to; forwarded unchanged to
            ``unwrap_tx_receipt_for_parity``.
        geth_tx_receipts: JSON-decoded ``eth_getTransactionReceipt``
            responses, one per transaction. Entries may be ``None``.

    Returns:
        One ``Receipt`` per response carrying a non-null ``"result"``.
        Responses that are ``None``, lack a ``"result"`` key, or whose
        ``"result"`` is ``None`` are skipped.
    """
    receipts = []
    # The index is the position in the *input* list, so skipped entries still
    # advance it and later receipts keep their original position.
    for idx, response in enumerate(geth_tx_receipts):
        payload = None if response is None else response.get("result")
        if payload is None:
            continue
        receipts.append(unwrap_tx_receipt_for_parity(block_json, idx, payload))
    return receipts


def unwrap_tx_receipt_for_parity(block_json, tx_pos_in_block, tx_receipt) -> Receipt:
    """Convert one geth transaction receipt into a ``Receipt``.

    Args:
        block_json: Block the receipt belongs to; only
            ``block_json["number"]`` is read.
        tx_pos_in_block: Position of the receipt in the fetched list; stored
            as ``transaction_index`` of the result.
        tx_receipt: The ``"result"`` object of an
            ``eth_getTransactionReceipt`` response.

    Returns:
        The translated ``Receipt``. When the receipt cannot be decoded the
        failure is logged and an empty ``Receipt()`` is returned instead
        (best-effort translation).
    """
    # Function-scope import keeps this block self-contained.
    import logging

    logger = logging.getLogger(__name__)

    try:
        # geth reports the index as a hex string.
        reported_index = int(tx_receipt["transactionIndex"], 16)
        if tx_pos_in_block != reported_index:
            logger.warning(
                "Alert the position of transaction in block is mismatched %s %s",
                tx_pos_in_block,
                tx_receipt["transactionIndex"],
            )
        return Receipt(
            block_number=block_json["number"],
            transaction_hash=tx_receipt["transactionHash"],
            transaction_index=tx_pos_in_block,
            gas_used=tx_receipt["gasUsed"],
            effective_gas_price=tx_receipt["effectiveGasPrice"],
            cumulative_gas_used=tx_receipt["cumulativeGasUsed"],
            to=tx_receipt["to"],
        )
    except (KeyError, TypeError, ValueError):
        # Known decode failures only: missing field, wrong type, bad hex.
        # NOTE(review): assumes Receipt validation errors derive from
        # ValueError (true for pydantic v1) - confirm against the schema.
        logger.exception("error while decoding receipt %s", tx_receipt)

    # NOTE(review): relies on Receipt() being constructible without
    # arguments - confirm against the schema.
    return Receipt()
supragya marked this conversation as resolved.
Show resolved Hide resolved
2 changes: 2 additions & 0 deletions mev_inspect/inspect_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def inspect_block(
inspect_db_session: orm.Session,
base_provider,
w3: Web3,
geth: bool,
trace_clasifier: TraceClassifier,
block_number: int,
trace_db_session: Optional[orm.Session],
Expand All @@ -47,6 +48,7 @@ def inspect_block(
block = create_from_block_number(
base_provider,
w3,
geth,
block_number,
trace_db_session,
)
Expand Down
16 changes: 15 additions & 1 deletion poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ pydantic = "^1.8.2"
hexbytes = "^0.2.1"
click = "^8.0.1"
psycopg2 = "^2.9.1"
aiohttp = "^3.7.4"
asyncio = "^3.4.3"

[tool.poetry.dev-dependencies]
pre-commit = "^2.13.0"
Expand Down