From c08b2ef88f9e09bede60565c0bd9eb853240fe8a Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Fri, 1 Nov 2024 14:27:59 -0400 Subject: [PATCH] dex: add `DexTester` harness prop Co-authored-by: Lucas Meier --- crates/bin/pd/src/network/generate.rs | 25 +- crates/core/component/dex/Cargo.toml | 4 + .../src/component/router/route_and_fill.rs | 65 ++-- .../core/component/dex/src/swap_execution.rs | 2 + .../dex/tests/integration/harness.rs | 313 ++++++++++++++++++ .../component/dex/tests/integration/mod.rs | 1 + 6 files changed, 374 insertions(+), 36 deletions(-) create mode 100644 crates/core/component/dex/tests/integration/harness.rs create mode 100644 crates/core/component/dex/tests/integration/mod.rs diff --git a/crates/bin/pd/src/network/generate.rs b/crates/bin/pd/src/network/generate.rs index 437c255e3b..cefb71750c 100644 --- a/crates/bin/pd/src/network/generate.rs +++ b/crates/bin/pd/src/network/generate.rs @@ -428,8 +428,8 @@ pub fn network_generate( /// Represents initial allocations to the testnet. #[derive(Debug, Deserialize)] pub struct NetworkAllocation { - #[serde(deserialize_with = "string_u64")] - pub amount: u64, + #[serde(deserialize_with = "string_u128")] + pub amount: u128, pub denom: String, pub address: String, } @@ -670,17 +670,17 @@ impl TryFrom for shielded_pool_genesis::Allocation { } } -fn string_u64<'de, D>(deserializer: D) -> Result +fn string_u128<'de, D>(deserializer: D) -> Result where D: de::Deserializer<'de>, { - struct U64StringVisitor; + struct U128StringVisitor; - impl<'de> de::Visitor<'de> for U64StringVisitor { - type Value = u64; + impl<'de> de::Visitor<'de> for U128StringVisitor { + type Value = u128; fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - formatter.write_str("a string containing a u64 with optional underscores") + formatter.write_str("a string containing a u128 with optional underscores") } fn visit_str(self, v: &str) -> Result @@ -688,10 +688,17 @@ where E: de::Error, { let r = v.replace('_', ""); - r.parse::().map_err(E::custom) + r.parse::().map_err(E::custom) } fn visit_u64(self, v: u64) -> Result + where + E: de::Error, + { + Ok(v as u128) + } + + fn visit_u128(self, v: u128) -> std::prelude::v1::Result where E: de::Error, { @@ -699,7 +706,7 @@ where } } - deserializer.deserialize_any(U64StringVisitor) + deserializer.deserialize_any(U128StringVisitor) } #[cfg(test)] diff --git a/crates/core/component/dex/Cargo.toml b/crates/core/component/dex/Cargo.toml index d4eb923904..4c70f895b7 100644 --- a/crates/core/component/dex/Cargo.toml +++ b/crates/core/component/dex/Cargo.toml @@ -3,6 +3,10 @@ name = "penumbra-dex" version = {workspace = true} edition = {workspace = true} +[[test]] +name = "penumbra-dex-integration-tests" +path = "tests/integration/mod.rs" + [features] component = [ "cnidarium-component", diff --git a/crates/core/component/dex/src/component/router/route_and_fill.rs b/crates/core/component/dex/src/component/router/route_and_fill.rs index f72443de6c..4d7b5476a8 100644 --- a/crates/core/component/dex/src/component/router/route_and_fill.rs +++ b/crates/core/component/dex/src/component/router/route_and_fill.rs @@ -44,11 +44,21 @@ pub trait HandleBatchSwaps: StateWrite + Sized { // executions up to the specified `execution_budget` parameter. let execution_circuit_breaker = ExecutionCircuitBreaker::new(execution_budget); + // We clamp the deltas to the maximum input for batch swaps. + let clamped_delta_1 = delta_1.min(MAX_RESERVE_AMOUNT.into()); + let clamped_delta_2 = delta_2.min(MAX_RESERVE_AMOUNT.into()); + + tracing::debug!( + ?clamped_delta_1, + ?clamped_delta_2, + "clamped deltas to maximum amount" + ); + let swap_execution_1_for_2 = self .route_and_fill( trading_pair.asset_1(), trading_pair.asset_2(), - delta_1, + clamped_delta_1, params.clone(), execution_circuit_breaker.clone(), ) @@ -58,7 +68,7 @@ pub trait HandleBatchSwaps: StateWrite + Sized { .route_and_fill( trading_pair.asset_2(), trading_pair.asset_1(), - delta_2, + clamped_delta_2, params.clone(), execution_circuit_breaker, ) @@ -67,6 +77,7 @@ pub trait HandleBatchSwaps: StateWrite + Sized { let (lambda_2, unfilled_1) = match &swap_execution_1_for_2 { Some(swap_execution) => ( swap_execution.output.amount, + // The unfilled amount of asset 1 is the trade input minus the amount consumed, plus the excess. delta_1 - swap_execution.input.amount, ), None => (0u64.into(), delta_1), @@ -148,9 +159,10 @@ pub trait RouteAndFill: StateWrite + Sized { where Self: 'static, { - tracing::debug!(?input, ?asset_1, ?asset_2, "starting route_and_fill"); + tracing::debug!(?input, ?asset_1, ?asset_2, "prepare to route and fill"); if input == Amount::zero() { + tracing::debug!("no input, short-circuit exit"); return Ok(None); } @@ -162,13 +174,12 @@ pub trait RouteAndFill: StateWrite + Sized { // An ordered list of execution traces that were used to fill the trade. let mut traces: Vec> = Vec::new(); - let max_delta_1: Amount = MAX_RESERVE_AMOUNT.into(); - // Termination conditions: // 1. We have no more `delta_1` remaining // 2. A path can no longer be found // 3. We have reached the `RoutingParams` specified price limit // 4. The execution circuit breaker has been triggered based on the number of path searches and executions + // 5. An unrecoverable error occurred during the execution of the route. loop { // Check if we have exceeded the execution circuit breaker limits. if execution_circuit_breaker.exceeded_limits() { @@ -196,21 +207,20 @@ pub trait RouteAndFill: StateWrite + Sized { break; } - // We split off the entire batch swap into smaller chunks to avoid causing - // a series of overflow in the DEX. + // We prepare the input for this execution round, which is the remaining unfilled amount of asset 1. let delta_1 = Value { - amount: total_unfilled_1.min(max_delta_1), asset_id: asset_1, + amount: total_unfilled_1, }; - tracing::debug!(?path, delta_1 = ?delta_1.amount, "found path, filling up to spill price"); + tracing::debug!(?path, ?delta_1, "found path, filling up to spill price"); - let execution = Arc::get_mut(self) + let execution_result = Arc::get_mut(self) .expect("expected state to have no other refs") .fill_route(delta_1, &path, spill_price) .await; - let execution = match execution { + let swap_execution = match execution_result { Ok(execution) => execution, Err(FillError::ExecutionOverflow(position_id)) => { // We have encountered an overflow during the execution of the route. @@ -233,24 +243,25 @@ pub trait RouteAndFill: StateWrite + Sized { // Immediately track the execution in the state. (total_output_2, total_unfilled_1) = { - let lambda_2 = execution.output; - let unfilled_1 = Value { - amount: total_unfilled_1 - .checked_sub(&execution.input.amount) - .expect("unable to subtract unfilled input from total input"), - asset_id: asset_1, - }; - tracing::debug!(input = ?delta_1.amount, output = ?lambda_2.amount, unfilled = ?unfilled_1.amount, "filled along best path"); + // The exact amount of asset 1 that was consumed in this execution round. + let consumed_input = swap_execution.input; + // The output of this execution round is the amount of asset 2 that was filled. + let produced_output = swap_execution.output; + + tracing::debug!(consumed_input = ?consumed_input.amount, output = ?produced_output.amount, "filled along best path"); - assert_eq!(lambda_2.asset_id, asset_2); - assert_eq!(unfilled_1.asset_id, asset_1); + // Sanity check that the input and output assets are correct. + assert_eq!(produced_output.asset_id, asset_2); + assert_eq!(consumed_input.asset_id, asset_1); // Append the traces from this execution to the outer traces. - traces.append(&mut execution.traces.clone()); + traces.append(&mut swap_execution.traces.clone()); ( - total_output_2 + lambda_2.amount, - total_unfilled_1 - delta_1.amount + unfilled_1.amount, + // The total output of asset 2 is the sum of all outputs. + total_output_2 + produced_output.amount, + // The total unfilled amount of asset 1 is the remaining unfilled amount minus the amount consumed. + total_unfilled_1 - consumed_input.amount, ) }; @@ -260,7 +271,7 @@ pub trait RouteAndFill: StateWrite + Sized { } // Ensure that we've actually executed, or else bail out. - let Some(accurate_max_price) = execution.max_price() else { + let Some(accurate_max_price) = swap_execution.max_price() else { tracing::debug!("no traces in execution, exiting route_and_fill"); break; }; @@ -278,8 +289,7 @@ pub trait RouteAndFill: StateWrite + Sized { } } - // If we didn't execute against any position at all, there - // are no execution records to return. + // If we didn't execute against any position at all, there are no execution records to return. if traces.is_empty() { return Ok(None); } else { @@ -287,6 +297,7 @@ pub trait RouteAndFill: StateWrite + Sized { traces, input: Value { asset_id: asset_1, + // The total amount of asset 1 that was actually consumed across rounds. amount: input - total_unfilled_1, }, output: Value { diff --git a/crates/core/component/dex/src/swap_execution.rs b/crates/core/component/dex/src/swap_execution.rs index c14daa213c..a602ce74c6 100644 --- a/crates/core/component/dex/src/swap_execution.rs +++ b/crates/core/component/dex/src/swap_execution.rs @@ -9,7 +9,9 @@ use serde::{Deserialize, Serialize}; #[serde(try_from = "pb::SwapExecution", into = "pb::SwapExecution")] pub struct SwapExecution { pub traces: Vec>, + /// The input value that was consumed. pub input: Value, + /// The output value that was produced. pub output: Value, } diff --git a/crates/core/component/dex/tests/integration/harness.rs b/crates/core/component/dex/tests/integration/harness.rs new file mode 100644 index 0000000000..a9348fd592 --- /dev/null +++ b/crates/core/component/dex/tests/integration/harness.rs @@ -0,0 +1,313 @@ +//! This module contains a testing harness which allows us to exercise common dex functionality. +//! +//! The basic flow is that you build up a `TestingStrategy` given a `TestingStrategyBuilder`, +//! and then run this strategy on an empty dex, in order to get an outcome. +//! The strategy involves setting up certain positions, and then performing a batch swap +//! with particular swap flows combined together. +//! +//! The outcome consists of the remaining reserves for each position, and the output for each swapper. +use cnidarium::{Snapshot, StateDelta, TempStorage}; +use cnidarium_component::{ActionHandler, Component}; +use penumbra_asset::asset; +use penumbra_dex::{ + component::{Dex, PositionRead, StateReadExt}, + lp::{ + position::{self, Position}, + Reserves, + }, + swap::{self, proof::SwapProof, SwapPlaintext}, + BatchSwapOutputData, DirectedTradingPair, PositionClose, PositionOpen, PositionWithdraw, Swap, + TradingPair, +}; +use penumbra_keys::test_keys; +use penumbra_num::Amount; +use penumbra_proto::core::component::dex::v1::ZkSwapProof; +use penumbra_sct::{ + component::{clock::EpochManager, source::SourceContext}, + epoch::Epoch, +}; +use rand_core::{CryptoRngCore, OsRng}; +use std::{mem, sync::Arc}; +use tendermint::abci::request::EndBlock; + +struct DexTester { + storage: TempStorage, + height: u8, + handle: Option>, +} + +impl DexTester { + async fn init() -> anyhow::Result { + let mut out = Self { + storage: TempStorage::new().await?, + height: 0, + handle: None, + }; + Dex::init_chain(out.handle(), Some(&Default::default())).await; + Ok(out) + } + + fn handle(&mut self) -> &mut StateDelta { + if let None = self.handle { + self.handle = Some(self.consume_handle()); + } + // NO-PANIC: we defined this above. + self.handle.as_mut().unwrap() + } + + fn consume_handle(&mut self) -> StateDelta { + match mem::replace(&mut self.handle, None) { + Some(x) => x, + None => { + let mut out = StateDelta::new(self.storage.latest_snapshot()); + out.put_mock_source(self.height); + out.put_block_height(self.height.into()); + out.put_epoch_by_height( + self.height.into(), + Epoch { + index: 0, + start_height: 0, + }, + ); + out + } + } + } + + async fn position_open(&mut self, tx: PositionOpen) -> anyhow::Result<()> { + let handle = self.handle(); + tx.check_and_execute(handle).await?; + Ok(()) + } + + async fn position_close(&mut self, tx: PositionClose) -> anyhow::Result<()> { + let handle = self.handle(); + tx.check_and_execute(handle).await?; + Ok(()) + } + + async fn position_withdraw(&mut self, tx: PositionWithdraw) -> anyhow::Result<()> { + let handle = self.handle(); + tx.check_and_execute(handle).await?; + Ok(()) + } + + async fn position_by_id(&mut self, id: &position::Id) -> anyhow::Result> { + let handle = self.handle(); + handle.position_by_id(id).await + } + + async fn swap(&mut self, tx: Swap) -> anyhow::Result<()> { + let handle = self.handle(); + tx.check_and_execute(handle).await?; + Ok(()) + } + + async fn end_block(&mut self) -> anyhow::Result<()> { + let handle = self.consume_handle(); + let mut temp_handle = Arc::new(handle); + Dex::end_block( + &mut temp_handle, + &EndBlock { + height: self.height.into(), + }, + ) + .await; + self.storage + .commit(Arc::into_inner(temp_handle).unwrap()) + .await?; + self.height += 1; + Ok(()) + } + + async fn previous_bsod( + &mut self, + trading_pair: TradingPair, + ) -> anyhow::Result> { + assert!( + self.height >= 1, + "did not call `end_block` before calling `previous_bsod`" + ); + let height = u64::from(self.height - 1); + let handle = self.handle(); + let bsod = handle.output_data(height, trading_pair).await?; + Ok(bsod) + } +} + +/// This allows incrementally building up a test strategy. +pub struct TestingStrategyBuilder<'rng> { + rng: &'rng mut dyn CryptoRngCore, + strategy: TestingStrategy, + asset1: asset::Id, + asset2: asset::Id, +} + +impl<'rng> TestingStrategyBuilder<'rng> { + pub fn new(rng: &'rng mut dyn CryptoRngCore, asset1: asset::Id, asset2: asset::Id) -> Self { + Self { + rng, + strategy: TestingStrategy::new(asset1, asset2), + asset1, + asset2, + } + } + + pub fn with_position(mut self, reserves1: Amount, reserves2: Amount) -> Self { + let position = Position::new( + &mut self.rng, + DirectedTradingPair::new(self.asset1, self.asset2), + 0, + 1u64.into(), + 1u64.into(), + Reserves { + r1: reserves1, + r2: reserves2, + }, + ); + self.strategy.positions.push(position); + self + } + + pub fn with_swap(mut self, amount: Amount) -> Self { + let pair = TradingPair::new(self.asset1, self.asset2); + let plaintext = SwapPlaintext::new( + &mut OsRng, + pair, + if pair.asset_1() == self.asset1 { + amount + } else { + 0u64.into() + }, + if pair.asset_2() == self.asset1 { + amount + } else { + 0u64.into() + }, + Default::default(), + test_keys::ADDRESS_0.clone(), + ); + let swap = swap::Body { + trading_pair: plaintext.trading_pair, + delta_1_i: plaintext.delta_1_i, + delta_2_i: plaintext.delta_2_i, + fee_commitment: plaintext.claim_fee.commit(Default::default()), + payload: plaintext.encrypt(test_keys::FULL_VIEWING_KEY.outgoing()), + }; + self.strategy.swaps.push(swap); + self + } + + pub fn build(self) -> TestingStrategy { + self.strategy + } +} + +pub struct TestingStrategy { + asset1: asset::Id, + asset2: asset::Id, + positions: Vec, + swaps: Vec, +} + +impl TestingStrategy { + fn new(asset1: asset::Id, asset2: asset::Id) -> Self { + Self { + asset1, + asset2, + positions: Vec::new(), + swaps: Vec::new(), + } + } + + /// Run a given strategy, producing a result. + /// + /// With the current strategy implementation, this function will: + /// - create all the positions built up here, + /// - perform a batch swap with all the flows built up here, + /// - withdraw and close all positions. + pub async fn run(self) -> anyhow::Result { + let mut dex = DexTester::init().await?; + + let mut position_ids = Vec::new(); + for position in self.positions { + position_ids.push(position.id()); + let tx = PositionOpen { position }; + dex.position_open(tx).await?; + } + dex.end_block().await?; + + for body in &self.swaps { + let swap = Swap { + proof: SwapProof::try_from(ZkSwapProof { + inner: vec![0u8; 192], + }) + .expect("should be able to create dummy proof"), + body: body.clone(), + }; + dex.swap(swap).await?; + } + dex.end_block().await?; + + let bsod = dex + .previous_bsod(TradingPair::new(self.asset1, self.asset2)) + .await?; + for position_id in position_ids.iter().copied() { + dex.position_close(PositionClose { position_id }).await?; + } + dex.end_block().await?; + + let mut position_reserves = Vec::new(); + for position_id in position_ids { + let position = dex + .position_by_id(&position_id) + .await? + .expect("position should have been created"); + + dex.position_withdraw(PositionWithdraw { + position_id, + reserves_commitment: position + .reserves + .balance(&position.phi.pair) + .commit(Default::default()), + sequence: 0, + }) + .await?; + + position_reserves.push(( + position + .reserves_for(self.asset1) + .expect("position should have reserves"), + position + .reserves_for(self.asset2) + .expect("position should have reserves"), + )); + } + dex.end_block().await?; + + let mut outputs = Vec::new(); + for swap in self.swaps { + let bsod = bsod.unwrap(); + let (a, b) = bsod.pro_rata_outputs((swap.delta_1_i, swap.delta_2_i)); + let (a, b) = if bsod.trading_pair.asset_1() == self.asset1 { + (a, b) + } else { + (b, a) + }; + outputs.push((a, b)); + } + + Ok(TestingOutcome { + position_reserves, + outputs, + }) + } +} + +#[derive(Debug, Clone, PartialEq)] +pub struct TestingOutcome { + /// The reserves of each position, in the order of the strategy. + pub position_reserves: Vec<(Amount, Amount)>, + /// The outcome of the swap. + pub outputs: Vec<(Amount, Amount)>, +} diff --git a/crates/core/component/dex/tests/integration/mod.rs b/crates/core/component/dex/tests/integration/mod.rs new file mode 100644 index 0000000000..8d7ed6d308 --- /dev/null +++ b/crates/core/component/dex/tests/integration/mod.rs @@ -0,0 +1 @@ +mod harness;