Skip to content

Commit

Permalink
feat: pool server abstraction and local pool server
Browse files Browse the repository at this point in the history
  • Loading branch information
dancoombs committed Jul 28, 2023
1 parent 77a9dea commit 962879c
Show file tree
Hide file tree
Showing 36 changed files with 1,916 additions and 1,256 deletions.
62 changes: 52 additions & 10 deletions proto/op_pool.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,58 +65,95 @@ message AddOpRequest {
message AddOpResponse {
oneof result {
AddOpSuccess success = 1;
AddOpFailure failure = 2;
MempoolError failure = 2;
}
}

message AddOpSuccess {
bytes hash = 1;
}

message AddOpFailure {
MempoolError error = 1;
}

message GetOpsRequest {
bytes entry_point = 1;
uint64 max_ops = 2;
}
message GetOpsResponse {
oneof result {
GetOpsSuccess success = 1;
MempoolError failure = 2;
}
}
message GetOpsSuccess {
repeated MempoolOp ops = 1;
}

message RemoveOpsRequest {
bytes entry_point = 1;
repeated bytes hashes = 2;
}
message RemoveOpsResponse {}
message RemoveOpsResponse {
oneof result {
RemoveOpsSuccess success = 1;
MempoolError failure = 2;
}
}
message RemoveOpsSuccess {}

message RemoveEntitiesRequest {
bytes entry_point = 1;
repeated Entity entities = 2;
}
message RemoveEntitiesResponse {}
message RemoveEntitiesResponse {
oneof result {
RemoveEntitiesSuccess success = 1;
MempoolError failure = 2;
}
}
message RemoveEntitiesSuccess {}

message DebugClearStateRequest {}
message DebugClearStateResponse {}
message DebugClearStateResponse {
oneof result {
DebugClearStateSuccess success = 1;
MempoolError failure = 2;
}
}
message DebugClearStateSuccess {}

message DebugDumpMempoolRequest {
bytes entry_point = 1;
}
message DebugDumpMempoolResponse {
oneof result {
DebugDumpMempoolSuccess success = 1;
MempoolError failure = 2;
}
}
message DebugDumpMempoolSuccess {
repeated MempoolOp ops = 1;
}

message DebugSetReputationRequest {
bytes entry_point = 1;
repeated Reputation reputations = 2;
}
message DebugSetReputationResponse {}
message DebugSetReputationResponse {
oneof result {
DebugSetReputationSuccess success = 1;
MempoolError failure = 2;
}
}
message DebugSetReputationSuccess {}

message DebugDumpReputationRequest {
bytes entry_point = 1;
}
message DebugDumpReputationResponse {
oneof result {
DebugDumpReputationSuccess success = 1;
MempoolError failure = 2;
}
}
message DebugDumpReputationSuccess {
repeated Reputation reputations = 1;
}

Expand Down Expand Up @@ -145,9 +182,14 @@ message MempoolError {
SimulationViolationError simulation_violation = 7;
InvalidSignatureError invalid_signature = 8;
UnsupportedAggregatorError unsupported_aggregator = 9;
UnknownEntryPointError unknown_entry_point = 10;
}
}

message UnknownEntryPointError {
bytes entry_point = 1;
}

message ReplacementUnderpricedError {
bytes current_fee = 1;
bytes current_priority_fee = 2;
Expand Down
116 changes: 45 additions & 71 deletions src/builder/bundle_proposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,21 @@ use linked_hash_map::LinkedHashMap;
#[cfg(test)]
use mockall::automock;
use tokio::try_join;
use tonic::{async_trait, transport::Channel};
use tonic::async_trait;
use tracing::{error, info};

use crate::common::{
contracts::entry_point::UserOpsPerAggregator,
gas::{FeeEstimator, GasFees, PriorityFeeMode},
math,
protos::{
self,
op_pool::{op_pool_client::OpPoolClient, GetOpsRequest, MempoolOp},
},
simulation::{SimulationError, SimulationSuccess, Simulator},
types::{
Entity, EntityType, EntryPointLike, ExpectedStorage, HandleOpsOut, ProviderLike, Timestamp,
UserOperation,
use crate::{
common::{
contracts::entry_point::UserOpsPerAggregator,
gas::{FeeEstimator, GasFees, PriorityFeeMode},
math,
simulation::{SimulationError, SimulationSuccess, Simulator},
types::{
Entity, EntityType, EntryPointLike, ExpectedStorage, HandleOpsOut, ProviderLike,
Timestamp, UserOperation,
},
},
op_pool::{PoolClient, PoolOperation},
};

/// A user op must be valid for at least this long into the future to be included.
Expand Down Expand Up @@ -66,13 +65,14 @@ pub trait BundleProposer: Send + Sync + 'static {
}

#[derive(Debug)]
pub struct BundleProposerImpl<S, E, P>
pub struct BundleProposerImpl<S, E, P, C>
where
S: Simulator,
E: EntryPointLike,
P: ProviderLike,
C: PoolClient,
{
op_pool: OpPoolClient<Channel>,
op_pool: C,
simulator: S,
entry_point: E,
provider: Arc<P>,
Expand All @@ -91,11 +91,12 @@ pub struct Settings {
}

#[async_trait]
impl<S, E, P> BundleProposer for BundleProposerImpl<S, E, P>
impl<S, E, P, C> BundleProposer for BundleProposerImpl<S, E, P, C>
where
S: Simulator,
E: EntryPointLike,
P: ProviderLike,
C: PoolClient,
{
async fn make_bundle(&self, required_fees: Option<GasFees>) -> anyhow::Result<Bundle> {
let (ops, block_hash, bundle_fees) = try_join!(
Expand All @@ -110,14 +111,14 @@ where
let simulation_futures = ops
.iter()
.filter(|op| {
op.op.max_fee_per_gas >= required_op_fees.max_fee_per_gas
&& op.op.max_priority_fee_per_gas >= required_op_fees.max_priority_fee_per_gas
op.uo.max_fee_per_gas >= required_op_fees.max_fee_per_gas
&& op.uo.max_priority_fee_per_gas >= required_op_fees.max_priority_fee_per_gas
})
.cloned()
.map(|op| self.simulate_validation(op, block_hash));

let ops_with_simulations_future = future::join_all(simulation_futures);
let all_paymaster_addresses = ops.iter().filter_map(|op| op.op.paymaster());
let all_paymaster_addresses = ops.iter().filter_map(|op| op.uo.paymaster());
let balances_by_paymaster_future =
self.get_balances_by_paymaster(all_paymaster_addresses, block_hash);
let (ops_with_simulations, balances_by_paymaster) =
Expand Down Expand Up @@ -163,14 +164,15 @@ where
}
}

impl<S, E, P> BundleProposerImpl<S, E, P>
impl<S, E, P, C> BundleProposerImpl<S, E, P, C>
where
S: Simulator,
E: EntryPointLike,
P: ProviderLike,
C: PoolClient,
{
pub fn new(
op_pool: OpPoolClient<Channel>,
op_pool: C,
simulator: S,
entry_point: E,
provider: Arc<P>,
Expand All @@ -196,16 +198,16 @@ where

async fn simulate_validation(
&self,
op: OpFromPool,
op: PoolOperation,
block_hash: H256,
) -> anyhow::Result<(UserOperation, Option<SimulationSuccess>)> {
let result = self
.simulator
.simulate_validation(op.op.clone(), Some(block_hash), Some(op.expected_code_hash))
.simulate_validation(op.uo.clone(), Some(block_hash), Some(op.expected_code_hash))
.await;
match result {
Ok(success) => Ok((
op.op,
op.uo,
Some(success).filter(|success| {
!success.signature_failed
&& success
Expand All @@ -214,7 +216,7 @@ where
}),
)),
Err(error) => match error {
SimulationError::Violations(_) => Ok((op.op, None)),
SimulationError::Violations(_) => Ok((op.uo, None)),
SimulationError::Other(error) => Err(error),
},
}
Expand Down Expand Up @@ -357,20 +359,11 @@ where
}
}

async fn get_ops_from_pool(&self) -> anyhow::Result<Vec<OpFromPool>> {
async fn get_ops_from_pool(&self) -> anyhow::Result<Vec<PoolOperation>> {
self.op_pool
.clone()
.get_ops(GetOpsRequest {
entry_point: self.entry_point.address().as_bytes().to_vec(),
max_ops: self.settings.max_bundle_size,
})
.get_ops(self.entry_point.address(), self.settings.max_bundle_size)
.await
.context("should get ops from op pool to bundle")?
.into_inner()
.ops
.into_iter()
.map(OpFromPool::try_from)
.collect()
.context("should get ops from pool")
}

async fn get_balances_by_paymaster(
Expand Down Expand Up @@ -443,26 +436,6 @@ where
}
}

#[derive(Clone, Debug)]
struct OpFromPool {
op: UserOperation,
expected_code_hash: H256,
}

impl TryFrom<MempoolOp> for OpFromPool {
type Error = anyhow::Error;

fn try_from(value: MempoolOp) -> Result<Self, Self::Error> {
Ok(Self {
op: value
.uo
.context("mempool op should contain user operation")?
.try_into()?,
expected_code_hash: protos::from_bytes(&value.expected_code_hash)?,
})
}
}

#[derive(Debug)]
struct OpWithSimulation {
op: UserOperation,
Expand Down Expand Up @@ -665,14 +638,14 @@ impl ProposalContext {
mod tests {
use anyhow::anyhow;
use ethers::{types::H160, utils::parse_units};
use tonic::Response;

use super::*;
use crate::common::{
grpc::mocks::{self, MockOpPool},
protos::op_pool::GetOpsResponse,
simulation::{AggregatorSimOut, MockSimulator, SimulationError, SimulationSuccess},
types::{MockEntryPointLike, MockProviderLike, ValidTimeRange},
use crate::{
common::{
simulation::{AggregatorSimOut, MockSimulator, SimulationError, SimulationSuccess},
types::{MockEntryPointLike, MockProviderLike, ValidTimeRange},
},
op_pool::MockPoolClient,
};

#[tokio::test]
Expand Down Expand Up @@ -1095,19 +1068,20 @@ mod tests {
let current_block_hash = hash(125);
let expected_code_hash = hash(126);
let max_bundle_size = mock_ops.len() as u64;
let mut op_pool = MockOpPool::new();
let ops: Vec<_> = mock_ops
.iter()
.map(|MockOp { op, .. }| MempoolOp {
uo: Some(op.into()),
expected_code_hash: expected_code_hash.as_bytes().to_vec(),
.map(|MockOp { op, .. }| PoolOperation {
uo: op.clone(),
expected_code_hash,
..Default::default()
})
.collect();
op_pool

let mut pool_client = MockPoolClient::new();
pool_client
.expect_get_ops()
.return_once(|_| Ok(Response::new(GetOpsResponse { ops })));
let op_pool_handle = mocks::mock_op_pool_client(op_pool).await;
.returning(move |_, _| Ok(ops.clone()));

let simulations_by_op: HashMap<_, _> = mock_ops
.into_iter()
.map(|op| (op.op.op_hash(entry_point_address, 0), op.simulation_result))
Expand Down Expand Up @@ -1155,7 +1129,7 @@ mod tests {
.expect_aggregate_signatures()
.returning(move |address, _| signatures_by_aggregator[&address]());
let proposer = BundleProposerImpl::new(
op_pool_handle.client.clone(),
pool_client,
simulator,
entry_point,
Arc::new(provider),
Expand Down
Loading

0 comments on commit 962879c

Please sign in to comment.