Skip to content

Commit

Permalink
feat: pool server abstraction and local pool server
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Aug 23, 2023
1 parent 0eaa5d1 commit ac87557
Show file tree
Hide file tree
Showing 37 changed files with 1,950 additions and 1,273 deletions.
63 changes: 55 additions & 8 deletions proto/op_pool/op_pool.proto
Original file line number Diff line number Diff line change
Expand Up @@ -136,16 +136,13 @@ message AddOpRequest {
message AddOpResponse {
oneof result {
AddOpSuccess success = 1;
AddOpFailure failure = 2;
MempoolError failure = 2;
}
}
message AddOpSuccess {
// The serialized UserOperation hash
bytes hash = 1;
}
message AddOpFailure {
MempoolError error = 1;
}

message GetOpsRequest {
// The serialized entry point address
Expand All @@ -154,6 +151,12 @@ message GetOpsRequest {
uint64 max_ops = 2;
}
message GetOpsResponse {
oneof result {
GetOpsSuccess success = 1;
MempoolError failure = 2;
}
}
message GetOpsSuccess {
repeated MempoolOp ops = 1;
}

Expand All @@ -163,7 +166,13 @@ message RemoveOpsRequest {
// The serialized UserOperation hashes to remove
repeated bytes hashes = 2;
}
message RemoveOpsResponse {}
message RemoveOpsResponse {
oneof result {
RemoveOpsSuccess success = 1;
MempoolError failure = 2;
}
}
message RemoveOpsSuccess {}

message RemoveEntitiesRequest {
// The serilaized entry point address
Expand All @@ -172,15 +181,33 @@ message RemoveEntitiesRequest {
// should be removed from the mempool
repeated Entity entities = 2;
}
message RemoveEntitiesResponse {}
message RemoveEntitiesResponse {
oneof result {
RemoveEntitiesSuccess success = 1;
MempoolError failure = 2;
}
}
message RemoveEntitiesSuccess {}

message DebugClearStateRequest {}
message DebugClearStateResponse {}
message DebugClearStateResponse {
oneof result {
DebugClearStateSuccess success = 1;
MempoolError failure = 2;
}
}
message DebugClearStateSuccess {}

message DebugDumpMempoolRequest {
bytes entry_point = 1;
}
message DebugDumpMempoolResponse {
oneof result {
DebugDumpMempoolSuccess success = 1;
MempoolError failure = 2;
}
}
message DebugDumpMempoolSuccess {
repeated MempoolOp ops = 1;
}

Expand All @@ -190,13 +217,25 @@ message DebugSetReputationRequest {
// An array of reputation entries to add/replace
repeated Reputation reputations = 2;
}
message DebugSetReputationResponse {}
message DebugSetReputationResponse {
oneof result {
DebugSetReputationSuccess success = 1;
MempoolError failure = 2;
}
}
message DebugSetReputationSuccess {}

message DebugDumpReputationRequest {
// The serialized entry point address.
bytes entry_point = 1;
}
message DebugDumpReputationResponse {
oneof result {
DebugDumpReputationSuccess success = 1;
MempoolError failure = 2;
}
}
message DebugDumpReputationSuccess {
repeated Reputation reputations = 1;
}

Expand Down Expand Up @@ -229,9 +268,15 @@ message MempoolError {
PrecheckViolationError precheck_violation = 6;
SimulationViolationError simulation_violation = 7;
UnsupportedAggregatorError unsupported_aggregator = 8;
InvalidSignatureError invalid_signature = 9;
UnknownEntryPointError unknown_entry_point = 10;
}
}

message UnknownEntryPointError {
bytes entry_point = 1;
}

message ReplacementUnderpricedError {
bytes current_fee = 1;
bytes current_priority_fee = 2;
Expand All @@ -252,6 +297,8 @@ message UnsupportedAggregatorError {
bytes aggregator_address = 1;
}

message InvalidSignatureError {}

// PRECHECK VIOLATIONS
message PrecheckViolationError {
oneof violation {
Expand Down
106 changes: 39 additions & 67 deletions src/builder/bundle_proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use linked_hash_map::LinkedHashMap;
#[cfg(test)]
use mockall::automock;
use tokio::{sync::broadcast, try_join};
use tonic::{async_trait, transport::Channel};
use tonic::async_trait;
use tracing::{error, info};

use crate::{
Expand All @@ -22,16 +22,13 @@ use crate::{
emit::WithEntryPoint,
gas::{FeeEstimator, GasFees, PriorityFeeMode},
math,
protos::{
self,
op_pool::{op_pool_client::OpPoolClient, GetOpsRequest, MempoolOp},
},
simulation::{SimulationError, SimulationSuccess, Simulator},
types::{
Entity, EntityType, EntryPointLike, ExpectedStorage, HandleOpsOut, ProviderLike,
Timestamp, UserOperation,
},
},
op_pool::{PoolClient, PoolOperation},
};

/// A user op must be valid for at least this long into the future to be included.
Expand Down Expand Up @@ -74,13 +71,14 @@ pub trait BundleProposer: Send + Sync + 'static {
}

#[derive(Debug)]
pub struct BundleProposerImpl<S, E, P>
pub struct BundleProposerImpl<S, E, P, C>
where
S: Simulator,
E: EntryPointLike,
P: ProviderLike,
C: PoolClient,
{
op_pool: OpPoolClient<Channel>,
op_pool: C,
simulator: S,
entry_point: E,
provider: Arc<P>,
Expand All @@ -101,11 +99,12 @@ pub struct Settings {
}

#[async_trait]
impl<S, E, P> BundleProposer for BundleProposerImpl<S, E, P>
impl<S, E, P, C> BundleProposer for BundleProposerImpl<S, E, P, C>
where
S: Simulator,
E: EntryPointLike,
P: ProviderLike,
C: PoolClient,
{
async fn make_bundle(&self, required_fees: Option<GasFees>) -> anyhow::Result<Bundle> {
let (ops, block_hash, bundle_fees) = try_join!(
Expand All @@ -123,14 +122,14 @@ where
let simulation_futures = ops
.iter()
.filter(|op| {
op.op.max_fee_per_gas >= required_op_fees.max_fee_per_gas
&& op.op.max_priority_fee_per_gas >= required_op_fees.max_priority_fee_per_gas
op.uo.max_fee_per_gas >= required_op_fees.max_fee_per_gas
&& op.uo.max_priority_fee_per_gas >= required_op_fees.max_priority_fee_per_gas
})
.cloned()
.map(|op| self.simulate_validation(op, block_hash));

let ops_with_simulations_future = future::join_all(simulation_futures);
let all_paymaster_addresses = ops.iter().filter_map(|op| op.op.paymaster());
let all_paymaster_addresses = ops.iter().filter_map(|op| op.uo.paymaster());
let balances_by_paymaster_future =
self.get_balances_by_paymaster(all_paymaster_addresses, block_hash);
let (ops_with_simulations, balances_by_paymaster) =
Expand Down Expand Up @@ -176,14 +175,15 @@ where
}
}

impl<S, E, P> BundleProposerImpl<S, E, P>
impl<S, E, P, C> BundleProposerImpl<S, E, P, C>
where
S: Simulator,
E: EntryPointLike,
P: ProviderLike,
C: PoolClient,
{
pub fn new(
op_pool: OpPoolClient<Channel>,
op_pool: C,
simulator: S,
entry_point: E,
provider: Arc<P>,
Expand Down Expand Up @@ -211,17 +211,17 @@ where

async fn simulate_validation(
&self,
op: OpFromPool,
op: PoolOperation,
block_hash: H256,
) -> anyhow::Result<(UserOperation, Result<SimulationSuccess, SimulationError>)> {
let result = self
.simulator
.simulate_validation(op.op.clone(), Some(block_hash), Some(op.expected_code_hash))
.simulate_validation(op.uo.clone(), Some(block_hash), Some(op.expected_code_hash))
.await;
match result {
Ok(success) => Ok((op.op, Ok(success))),
Ok(success) => Ok((op.uo, Ok(success))),
Err(error) => match error {
SimulationError::Violations(_) => Ok((op.op, Err(error))),
SimulationError::Violations(_) => Ok((op.uo, Err(error))),
SimulationError::Other(error) => Err(error),
},
}
Expand Down Expand Up @@ -397,20 +397,11 @@ where
}
}

async fn get_ops_from_pool(&self) -> anyhow::Result<Vec<OpFromPool>> {
async fn get_ops_from_pool(&self) -> anyhow::Result<Vec<PoolOperation>> {
self.op_pool
.clone()
.get_ops(GetOpsRequest {
entry_point: self.entry_point.address().as_bytes().to_vec(),
max_ops: self.settings.max_bundle_size,
})
.get_ops(self.entry_point.address(), self.settings.max_bundle_size)
.await
.context("should get ops from op pool to bundle")?
.into_inner()
.ops
.into_iter()
.map(OpFromPool::try_from)
.collect()
.context("should get ops from pool")
}

async fn get_balances_by_paymaster(
Expand Down Expand Up @@ -482,11 +473,11 @@ where
Ok(())
}

fn limit_gas_in_bundle(&self, ops: Vec<OpFromPool>) -> Vec<OpFromPool> {
fn limit_gas_in_bundle(&self, ops: Vec<PoolOperation>) -> Vec<PoolOperation> {
let mut gas_left = U256::from(MAX_BUNDLE_GAS_LIMIT);
let mut ops_in_bundle = Vec::new();
for op in ops {
let gas = op.op.total_execution_gas_limit(self.chain_id);
let gas = op.uo.total_execution_gas_limit(self.chain_id);
if gas_left < gas {
break;
}
Expand All @@ -508,26 +499,6 @@ where
}
}

#[derive(Clone, Debug)]
struct OpFromPool {
op: UserOperation,
expected_code_hash: H256,
}

impl TryFrom<MempoolOp> for OpFromPool {
type Error = anyhow::Error;

fn try_from(value: MempoolOp) -> Result<Self, Self::Error> {
Ok(Self {
op: value
.uo
.context("mempool op should contain user operation")?
.try_into()?,
expected_code_hash: protos::from_bytes(&value.expected_code_hash)?,
})
}
}

#[derive(Debug)]
struct OpWithSimulation {
op: UserOperation,
Expand Down Expand Up @@ -717,17 +688,17 @@ impl ProposalContext {
mod tests {
use anyhow::anyhow;
use ethers::{types::H160, utils::parse_units};
use tonic::Response;

use super::*;
use crate::common::{
grpc::mocks::{self, MockOpPool},
protos::op_pool::GetOpsResponse,
simulation::{
AggregatorSimOut, MockSimulator, SimulationError, SimulationSuccess,
SimulationViolation,
use crate::{
common::{
simulation::{
AggregatorSimOut, MockSimulator, SimulationError, SimulationSuccess,
SimulationViolation,
},
types::{MockEntryPointLike, MockProviderLike, ValidTimeRange},
},
types::{MockEntryPointLike, MockProviderLike, ValidTimeRange},
op_pool::MockPoolClient,
};

#[tokio::test]
Expand Down Expand Up @@ -1194,19 +1165,20 @@ mod tests {
let current_block_hash = hash(125);
let expected_code_hash = hash(126);
let max_bundle_size = mock_ops.len() as u64;
let mut op_pool = MockOpPool::new();
let ops: Vec<_> = mock_ops
.iter()
.map(|MockOp { op, .. }| MempoolOp {
uo: Some(op.into()),
expected_code_hash: expected_code_hash.as_bytes().to_vec(),
.map(|MockOp { op, .. }| PoolOperation {
uo: op.clone(),
expected_code_hash,
..Default::default()
})
.collect();
op_pool

let mut pool_client = MockPoolClient::new();
pool_client
.expect_get_ops()
.return_once(|_| Ok(Response::new(GetOpsResponse { ops })));
let op_pool_handle = mocks::mock_op_pool_client(op_pool).await;
.returning(move |_, _| Ok(ops.clone()));

let simulations_by_op: HashMap<_, _> = mock_ops
.into_iter()
.map(|op| (op.op.op_hash(entry_point_address, 0), op.simulation_result))
Expand Down Expand Up @@ -1255,7 +1227,7 @@ mod tests {
.returning(move |address, _| signatures_by_aggregator[&address]());
let (event_sender, _) = broadcast::channel(16);
let proposer = BundleProposerImpl::new(
op_pool_handle.client.clone(),
pool_client,
simulator,
entry_point,
Arc::new(provider),
Expand Down
Loading

0 comments on commit ac87557

Please sign in to comment.