Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add driver submission address to the autopilot #3065

Open
wants to merge 15 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 130 additions & 6 deletions crates/autopilot/src/arguments.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
use {
crate::{domain::fee::FeeFactor, infra},
anyhow::Context,
anyhow::{anyhow, ensure, Context},
clap::ValueEnum,
primitive_types::H160,
primitive_types::{H160, U256},
shared::{
arguments::{display_list, display_option, ExternalSolver},
arguments::{display_list, display_option},
bad_token::token_owner_finder,
http_client,
price_estimation::{self, NativePriceEstimators},
},
std::{net::SocketAddr, num::NonZeroUsize, str::FromStr, time::Duration},
std::{
fmt,
fmt::{Display, Formatter},
net::SocketAddr,
num::NonZeroUsize,
str::FromStr,
time::Duration,
},
url::Url,
};

Expand Down Expand Up @@ -137,7 +144,8 @@ pub struct Arguments {
)]
pub trusted_tokens_update_interval: Duration,

/// A list of drivers in the following format: `<NAME>|<URL>,<NAME>|<URL>`
/// A list of drivers in the following format:
/// `<NAME>|<URL>|<SUBMISSION_ADDRESS>|<FAIRNESS_THRESHOLD>`
#[clap(long, env, use_value_delimiter = true)]
pub drivers: Vec<ExternalSolver>,

Expand Down Expand Up @@ -372,6 +380,77 @@ impl std::fmt::Display for Arguments {
}
}

/// External solver driver configuration
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: does it make sense to move this type into it's own arguments module?

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ExternalSolver {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

naming nit (there is no such thing as an internal solver anymore)

Suggested change
pub struct ExternalSolver {
pub struct Solver {

pub name: String,
pub url: Url,
pub submission_account: Account,
pub fairness_threshold: Option<U256>,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum Account {
/// AWS KMS is used to retrieve the solver public key
Kms(Arn),
/// Solver public key
Address(H160),
}

// Wrapper type for AWS ARN identifiers
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Arn(pub String);

impl FromStr for Arn {
type Err = anyhow::Error;

fn from_str(s: &str) -> Result<Self, Self::Err> {
// Could be more strict here, but this should suffice to catch unintended
// configuration mistakes
if s.starts_with("arn:aws:kms:") {
Ok(Self(s.to_string()))
} else {
Err(anyhow!("Invalid ARN identifier: {}", s))
}
}
}

impl Display for ExternalSolver {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
write!(f, "{}({})", self.name, self.url)
}
}

impl FromStr for ExternalSolver {
type Err = anyhow::Error;

fn from_str(solver: &str) -> anyhow::Result<Self> {
let parts: Vec<&str> = solver.split('|').collect();
ensure!(parts.len() >= 3, "not enough arguments for external solver");
let (name, url) = (parts[0], parts[1]);
let url: Url = url.parse()?;
let submission_account = if let Ok(value) = Arn::from_str(parts[2]) {
Account::Kms(value)
} else {
Account::Address(H160::from_str(parts[2]).context("failed to parse submission")?)
};

let fairness_threshold = match parts.get(3) {
Some(value) => {
Some(U256::from_dec_str(value).context("failed to parse fairness threshold")?)
}
None => None,
};

Ok(Self {
name: name.to_owned(),
url,
fairness_threshold,
submission_account,
})
}
}

/// A fee policy to be used for orders base on it's class.
/// Examples:
/// - Surplus with a high enough cap for limit orders: surplus:0.5:0.9:limit
Expand Down Expand Up @@ -530,7 +609,7 @@ impl FromStr for CowAmmConfig {

#[cfg(test)]
mod test {
use super::*;
use {super::*, hex_literal::hex};

#[test]
fn test_fee_factor_limits() {
Expand All @@ -555,4 +634,49 @@ mod test {
.contains("Factor must be in the range [0, 1)"),)
}
}

#[test]
fn parse_driver_submission_account_address() {
let argument = "name1|http://localhost:8080|0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2";
let driver = ExternalSolver::from_str(argument).unwrap();
let expected = ExternalSolver {
name: "name1".into(),
url: Url::parse("http://localhost:8080").unwrap(),
fairness_threshold: None,
submission_account: Account::Address(H160::from_slice(&hex!(
"C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
))),
};
assert_eq!(driver, expected);
}

#[test]
fn parse_driver_submission_account_arn() {
let argument = "name1|http://localhost:8080|arn:aws:kms:supersecretstuff";
let driver = ExternalSolver::from_str(argument).unwrap();
let expected = ExternalSolver {
name: "name1".into(),
url: Url::parse("http://localhost:8080").unwrap(),
fairness_threshold: None,
submission_account: Account::Kms(
Arn::from_str("arn:aws:kms:supersecretstuff").unwrap(),
),
};
assert_eq!(driver, expected);
}

#[test]
fn parse_driver_with_threshold() {
let argument = "name1|http://localhost:8080|0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2|1000000000000000000";
let driver = ExternalSolver::from_str(argument).unwrap();
let expected = ExternalSolver {
name: "name1".into(),
url: Url::parse("http://localhost:8080").unwrap(),
submission_account: Account::Address(H160::from_slice(&hex!(
"C02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2"
))),
fairness_threshold: Some(U256::exp10(18)),
};
assert_eq!(driver, expected);
}
}
37 changes: 33 additions & 4 deletions crates/autopilot/src/infra/solvers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use {
self::dto::{reveal, settle, solve},
crate::{domain::eth, util},
crate::{arguments::Account, domain::eth, util},
anyhow::{anyhow, Context, Result},
reqwest::{Client, StatusCode},
std::time::Duration,
thiserror::Error,
url::Url,
};

Expand All @@ -19,20 +20,48 @@ pub struct Driver {
// winning solution should be discarded if it contains at least one order, which
// another driver solved with surplus exceeding this driver's surplus by `threshold`
pub fairness_threshold: Option<eth::Ether>,
pub submission_address: eth::Address,
client: Client,
}

#[derive(Error, Debug)]
pub enum Error {
#[error("unable to load KMS account")]
UnableToLoadKmsAccount,
}

impl Driver {
pub fn new(url: Url, name: String, fairness_threshold: Option<eth::Ether>) -> Self {
Self {
pub async fn new(
url: Url,
name: String,
fairness_threshold: Option<eth::Ether>,
submission_account: Account,
) -> Result<Self, Error> {
let submission_address = match submission_account {
Account::Kms(key_id) => {
let config = ethcontract::aws_config::load_from_env().await;
let account =
ethcontract::transaction::kms::Account::new((&config).into(), &key_id.0)
.await
.map_err(|_| {
tracing::error!(?name, ?key_id, "Unable to load KMS account");
Error::UnableToLoadKmsAccount
})?;
account.public_address()
}
Account::Address(address) => address,
};

Ok(Self {
name,
url,
fairness_threshold,
client: Client::builder()
.timeout(RESPONSE_TIME_LIMIT)
.build()
.unwrap(),
}
submission_address: submission_address.into(),
})
}

pub async fn solve(&self, request: &solve::Request) -> Result<solve::Response> {
Expand Down
65 changes: 48 additions & 17 deletions crates/autopilot/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use {
contracts::{BalancerV2Vault, IUniswapV3Factory},
ethcontract::{dyns::DynWeb3, errors::DeployError, BlockNumber},
ethrpc::block_stream::block_number_to_block_number_hash,
futures::StreamExt,
futures::stream::StreamExt,
model::DomainSeparator,
observe::metrics::LivenessChecking,
shared::{
Expand Down Expand Up @@ -347,7 +347,12 @@ pub async fn run(args: Arguments) {

let price_estimator = price_estimator_factory
.price_estimator(
&args.order_quoting.price_estimation_drivers,
&args
.order_quoting
.price_estimation_drivers
.iter()
.map(|price_estimator_driver| price_estimator_driver.clone().into())
.collect::<Vec<_>>(),
native_price_estimator.clone(),
gas_price_estimator.clone(),
)
Expand Down Expand Up @@ -529,21 +534,35 @@ pub async fn run(args: Arguments) {
max_winners_per_auction: args.max_winners_per_auction,
max_solutions_per_solver: args.max_solutions_per_solver,
};
let drivers_futures = args
.drivers
.into_iter()
.map(|driver| async move {
match infra::Driver::new(
driver.url,
driver.name.clone(),
driver.fairness_threshold.map(Into::into),
driver.submission_account,
)
.await
{
Ok(driver) => Some(Arc::new(driver)),
Err(_) => None,
Comment on lines +549 to +550
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to introduce a new error when it is completely ignored on the caller's side? Either an Option should be returned or the error needs to be logged here instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introducing an error is good for the function itself, that we don't use it now doesn't mean it won't be used. The logging is done within the function instead of the caller, mostly to avoid code repetition since this is instantiated in multiple times.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that we don't use it now doesn't mean it won't be used.

IMO, then it should only be introduced then. Currently, it is redundant.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree. If we design a function that errors, then it has to emit an error. How the error is treated is responsibility of the caller.

}
})
.collect::<Vec<_>>();

let drivers = futures::future::join_all(drivers_futures)
.await
.into_iter()
.flatten()
.collect();

let run = RunLoop::new(
run_loop_config,
eth,
persistence.clone(),
args.drivers
.into_iter()
.map(|driver| {
Arc::new(infra::Driver::new(
driver.url,
driver.name,
driver.fairness_threshold.map(Into::into),
))
})
.collect(),
drivers,
solvable_orders_cache,
trusted_tokens,
liveness.clone(),
Expand All @@ -560,16 +579,28 @@ async fn shadow_mode(args: Arguments) -> ! {
args.shadow.expect("missing shadow mode configuration"),
);

let drivers = args
let drivers_futures = args
.drivers
.into_iter()
.map(|driver| {
Arc::new(infra::Driver::new(
.map(|driver| async move {
match infra::Driver::new(
driver.url,
driver.name,
driver.name.clone(),
driver.fairness_threshold.map(Into::into),
))
driver.submission_account,
)
.await
{
Ok(driver) => Some(Arc::new(driver)),
Err(_) => None,
}
})
.collect::<Vec<_>>();

let drivers = futures::future::join_all(drivers_futures)
.await
.into_iter()
.flatten()
.collect();

let trusted_tokens = {
Expand Down
Loading
Loading