diff --git a/crates/core/component/dex/src/circuit_breaker/mod.rs b/crates/core/component/dex/src/circuit_breaker/mod.rs deleted file mode 100644 index 6dcbec4acc..0000000000 --- a/crates/core/component/dex/src/circuit_breaker/mod.rs +++ /dev/null @@ -1,5 +0,0 @@ -mod execution; -mod value; - -pub(crate) use execution::ExecutionCircuitBreaker; -pub(crate) use value::ValueCircuitBreaker; diff --git a/crates/core/component/dex/src/component/action_handler/position/open.rs b/crates/core/component/dex/src/component/action_handler/position/open.rs index 2474d50290..588c8972b7 100644 --- a/crates/core/component/dex/src/component/action_handler/position/open.rs +++ b/crates/core/component/dex/src/component/action_handler/position/open.rs @@ -5,7 +5,7 @@ use cnidarium_component::ActionHandler; use penumbra_proto::StateWriteProto as _; use crate::{ - component::{PositionManager, PositionRead}, + component::{PositionManager, PositionRead, ValueCircuitBreaker}, event, lp::{action::PositionOpen, position}, }; @@ -33,6 +33,13 @@ impl ActionHandler for PositionOpen { async fn check_and_execute(&self, mut state: S) -> Result<()> { // Validate that the position ID doesn't collide state.check_position_id_unused(&self.position.id()).await?; + + // Credit the DEX for the inflows from this position. + // TODO: in a future PR, split current PositionManager to PositionManagerInner + // and fold this into a position open method + state.vcb_credit(self.position.reserves_1()).await?; + state.vcb_credit(self.position.reserves_2()).await?; + state.put_position(self.position.clone()).await?; state.record_proto(event::position_open(self)); Ok(()) diff --git a/crates/core/component/dex/src/component/action_handler/position/withdraw.rs b/crates/core/component/dex/src/component/action_handler/position/withdraw.rs index aed55aa0cd..a012ae4c9c 100644 --- a/crates/core/component/dex/src/component/action_handler/position/withdraw.rs +++ b/crates/core/component/dex/src/component/action_handler/position/withdraw.rs @@ -7,7 +7,7 @@ use decaf377::Fr; use penumbra_proto::StateWriteProto; use crate::{ - component::{PositionManager, PositionRead}, + component::{PositionManager, PositionRead, ValueCircuitBreaker}, event, lp::{action::PositionWithdraw, position, Reserves}, }; @@ -90,6 +90,12 @@ impl ActionHandler for PositionWithdraw { // the current reserves. state.record_proto(event::position_withdraw(self, &metadata)); + // Debit the DEX for the outflows from this position. + // TODO: in a future PR, split current PositionManager to PositionManagerInner + // and fold this into a position open method + state.vcb_debit(metadata.reserves_1()).await?; + state.vcb_debit(metadata.reserves_2()).await?; + // Finally, update the position. This has two steps: // - update the state with the correct sequence number; // - zero out the reserves, to prevent double-withdrawals. diff --git a/crates/core/component/dex/src/component/action_handler/swap.rs b/crates/core/component/dex/src/component/action_handler/swap.rs index b1dce652dc..b735a3b295 100644 --- a/crates/core/component/dex/src/component/action_handler/swap.rs +++ b/crates/core/component/dex/src/component/action_handler/swap.rs @@ -47,7 +47,9 @@ impl ActionHandler for Swap { swap_flow.1 += swap.body.delta_2_i; // Set the batch swap flow for the trading pair. - state.put_swap_flow(&swap.body.trading_pair, swap_flow); + state + .put_swap_flow(&swap.body.trading_pair, swap_flow) + .await?; // Record the swap commitment in the state. let source = state.get_current_source().expect("source is set"); diff --git a/crates/core/component/dex/src/component/arb.rs b/crates/core/component/dex/src/component/arb.rs index b1d9009de5..b802ca2279 100644 --- a/crates/core/component/dex/src/component/arb.rs +++ b/crates/core/component/dex/src/component/arb.rs @@ -8,7 +8,7 @@ use penumbra_proto::StateWriteProto as _; use penumbra_sct::component::clock::EpochRead; use tracing::instrument; -use crate::{event, ExecutionCircuitBreaker, SwapExecution}; +use crate::{component::ExecutionCircuitBreaker, event, SwapExecution}; use super::{ router::{RouteAndFill, RoutingParams}, diff --git a/crates/core/component/dex/src/circuit_breaker/execution.rs b/crates/core/component/dex/src/component/circuit_breaker/execution.rs similarity index 100% rename from crates/core/component/dex/src/circuit_breaker/execution.rs rename to crates/core/component/dex/src/component/circuit_breaker/execution.rs diff --git a/crates/core/component/dex/src/component/circuit_breaker/mod.rs b/crates/core/component/dex/src/component/circuit_breaker/mod.rs new file mode 100644 index 0000000000..448223c17c --- /dev/null +++ b/crates/core/component/dex/src/component/circuit_breaker/mod.rs @@ -0,0 +1,5 @@ +mod execution; +mod value; + +pub use execution::ExecutionCircuitBreaker; +pub use value::ValueCircuitBreaker; diff --git a/crates/core/component/dex/src/circuit_breaker/value.rs b/crates/core/component/dex/src/component/circuit_breaker/value.rs similarity index 86% rename from crates/core/component/dex/src/circuit_breaker/value.rs rename to crates/core/component/dex/src/component/circuit_breaker/value.rs index 34a93ae165..fcf5f6236f 100644 --- a/crates/core/component/dex/src/circuit_breaker/value.rs +++ b/crates/core/component/dex/src/component/circuit_breaker/value.rs @@ -1,42 +1,46 @@ -use penumbra_asset::{asset::Id, Balance, Value}; +use anyhow::{anyhow, Result}; +use cnidarium::StateWrite; +use penumbra_asset::Value; use penumbra_num::Amount; -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Clone, Serialize, Deserialize, Default)] -pub struct ValueCircuitBreaker { - balance: Balance, -} - -impl ValueCircuitBreaker { - pub fn tally(&mut self, balance: Balance) { - self.balance += balance; - } - - pub fn check(&self) -> anyhow::Result<()> { - // No assets should ever be "required" by the circuit breaker's - // internal balance tracking, only "provided". - if let Some(r) = self.balance.required().next() { - return Err(anyhow::anyhow!( - "balance for asset {} is negative: -{}", - r.asset_id, - r.amount - )); - } - +use penumbra_proto::{StateReadProto, StateWriteProto}; +use tonic::async_trait; + +use crate::state_key; + +/// Tracks the aggregate value of deposits in the DEX. +#[async_trait] +pub trait ValueCircuitBreaker: StateWrite { + /// Credits a deposit into the DEX. + async fn vcb_credit(&mut self, value: Value) -> Result<()> { + let balance: Amount = self + .get(&state_key::value_balance(&value.asset_id)) + .await? + .unwrap_or_default(); + let new_balance = balance + .checked_add(&value.amount) + .ok_or_else(|| anyhow!("overflowed balance while crediting value circuit breaker"))?; + self.put(state_key::value_balance(&value.asset_id), new_balance); Ok(()) } - pub fn available(&self, asset_id: Id) -> Value { - self.balance - .provided() - .find(|b| b.asset_id == asset_id) - .unwrap_or(Value { - asset_id, - amount: Amount::from(0u64), - }) + /// Debits a deposit from the DEX. + async fn vcb_debit(&mut self, value: Value) -> Result<()> { + let balance: Amount = self + .get(&state_key::value_balance(&value.asset_id)) + .await? + .unwrap_or_default(); + let new_balance = balance + .checked_sub(&value.amount) + .ok_or_else(|| anyhow!("underflowed balance while debiting value circuit breaker"))?; + self.put(state_key::value_balance(&value.asset_id), new_balance); + Ok(()) } } +impl ValueCircuitBreaker for T {} + +/* + #[cfg(test)] mod tests { use std::sync::Arc; @@ -248,3 +252,4 @@ mod tests { .expect("unable to process batch swaps"); } } +*/ diff --git a/crates/core/component/dex/src/component/dex.rs b/crates/core/component/dex/src/component/dex.rs index f070d98bd1..6832d1286a 100644 --- a/crates/core/component/dex/src/component/dex.rs +++ b/crates/core/component/dex/src/component/dex.rs @@ -18,7 +18,7 @@ use crate::{ use super::{ router::{HandleBatchSwaps, RoutingParams}, - Arbitrage, PositionManager, + Arbitrage, PositionManager, ValueCircuitBreaker, }; pub struct Dex {} @@ -209,12 +209,26 @@ pub trait StateWriteExt: StateWrite + StateReadExt { self.object_put(state_key::config::dex_params_updated(), ()) } - fn set_output_data( + async fn set_output_data( &mut self, output_data: BatchSwapOutputData, swap_execution_1_for_2: Option, swap_execution_2_for_1: Option, - ) { + ) -> Result<()> { + // Debit the DEX for the swap outflows. + // Note that since we credited the DEX for _all_ inflows, we need to debit the + // unfilled amounts as well as the filled amounts. + self.vcb_debit(Value { + amount: output_data.unfilled_1 + output_data.lambda_1, + asset_id: output_data.trading_pair.asset_1, + }) + .await?; + self.vcb_debit(Value { + amount: output_data.unfilled_2 + output_data.lambda_2, + asset_id: output_data.trading_pair.asset_2, + }) + .await?; + // Write the output data to the state under a known key, for querying, ... let height = output_data.height; let trading_pair = output_data.trading_pair; @@ -247,17 +261,40 @@ pub trait StateWriteExt: StateWrite + StateReadExt { swap_execution_1_for_2, swap_execution_2_for_1, )); + + Ok(()) } fn set_arb_execution(&mut self, height: u64, execution: SwapExecution) { self.put(state_key::arb_execution(height), execution); } - fn put_swap_flow(&mut self, trading_pair: &TradingPair, swap_flow: SwapFlow) { + async fn put_swap_flow( + &mut self, + trading_pair: &TradingPair, + swap_flow: SwapFlow, + ) -> Result<()> { + // Credit the DEX for the swap inflows. + // + // Note that we credit the DEX for _all_ inflows, since we don't know + // how much will eventually be filled. + self.vcb_credit(Value { + amount: swap_flow.0, + asset_id: trading_pair.asset_1, + }) + .await?; + self.vcb_credit(Value { + amount: swap_flow.1, + asset_id: trading_pair.asset_2, + }) + .await?; + // TODO: replace with IM struct later let mut swap_flows = self.swap_flows(); swap_flows.insert(*trading_pair, swap_flow); - self.object_put(state_key::swap_flows(), swap_flows) + self.object_put(state_key::swap_flows(), swap_flows); + + Ok(()) } } diff --git a/crates/core/component/dex/src/component/mod.rs b/crates/core/component/dex/src/component/mod.rs index 11b8eb217b..871bd67748 100644 --- a/crates/core/component/dex/src/component/mod.rs +++ b/crates/core/component/dex/src/component/mod.rs @@ -8,6 +8,7 @@ pub mod router; mod action_handler; mod arb; +pub(crate) mod circuit_breaker; mod dex; mod flow; pub(crate) mod position_manager; @@ -15,6 +16,8 @@ mod swap_manager; pub use self::metrics::register_metrics; pub use arb::Arbitrage; +pub use circuit_breaker::ExecutionCircuitBreaker; +pub(crate) use circuit_breaker::ValueCircuitBreaker; pub use dex::{Dex, StateReadExt, StateWriteExt}; pub use position_manager::{PositionManager, PositionRead}; pub use swap_manager::SwapManager; diff --git a/crates/core/component/dex/src/component/position_manager.rs b/crates/core/component/dex/src/component/position_manager.rs index a004547db1..0ef1739b81 100644 --- a/crates/core/component/dex/src/component/position_manager.rs +++ b/crates/core/component/dex/src/component/position_manager.rs @@ -12,7 +12,6 @@ use penumbra_num::Amount; use penumbra_proto::DomainType; use penumbra_proto::{StateReadProto, StateWriteProto}; -use crate::circuit_breaker::ValueCircuitBreaker; use crate::lp::position::State; use crate::{ lp::position::{self, Position}, @@ -570,32 +569,6 @@ pub(crate) trait Inner: StateWrite { "updating position assets' aggregate balances" ); - let mut value_circuit_breaker: ValueCircuitBreaker = match self - .nonverifiable_get_raw(state_key::aggregate_value().as_bytes()) - .await - .expect("able to retrieve value circuit breaker from nonverifiable storage") - { - Some(bytes) => serde_json::from_slice(&bytes).expect( - "able to deserialize stored value circuit breaker from nonverifiable storage", - ), - None => ValueCircuitBreaker::default(), - }; - - // Add the change to the value circuit breaker for assets A and B. - value_circuit_breaker.tally(net_change_for_a); - value_circuit_breaker.tally(net_change_for_b); - - // Confirm that the value circuit breaker is still within the limits. - // This call will panic if the value circuit breaker detects inflation. - value_circuit_breaker.check()?; - - // Store the value circuit breaker back to nonconsensus storage with the updated tallies. - self.nonverifiable_put_raw( - state_key::aggregate_value().as_bytes().to_vec(), - serde_json::to_vec(&value_circuit_breaker) - .expect("able to serialize value circuit breaker for nonverifiable storage"), - ); - Ok(()) } } diff --git a/crates/core/component/dex/src/component/router/route_and_fill.rs b/crates/core/component/dex/src/component/router/route_and_fill.rs index 19671b4edd..b18a786200 100644 --- a/crates/core/component/dex/src/component/router/route_and_fill.rs +++ b/crates/core/component/dex/src/component/router/route_and_fill.rs @@ -8,14 +8,13 @@ use penumbra_num::Amount; use tracing::instrument; use crate::{ - circuit_breaker::ValueCircuitBreaker, component::{ flow::SwapFlow, router::{FillRoute, PathSearch, RoutingParams}, - PositionManager, StateWriteExt, + ExecutionCircuitBreaker, PositionManager, StateWriteExt, }, lp::position::MAX_RESERVE_AMOUNT, - state_key, BatchSwapOutputData, ExecutionCircuitBreaker, SwapExecution, TradingPair, + BatchSwapOutputData, SwapExecution, TradingPair, }; use super::fill_route::FillError; @@ -49,19 +48,6 @@ pub trait HandleBatchSwaps: StateWrite + Sized { tracing::debug!(?delta_1, ?delta_2, ?trading_pair, "decrypted batch swaps"); let execution_circuit_breaker = ExecutionCircuitBreaker::default(); - // Fetch the ValueCircuitBreaker prior to calling `route_and_fill`, so - // we know the total aggregate amount of each asset prior to executing and - // can ensure the total outflows don't exceed the total balances. - let value_circuit_breaker: ValueCircuitBreaker = match self - .nonverifiable_get_raw(state_key::aggregate_value().as_bytes()) - .await - .expect("able to retrieve value circuit breaker from nonverifiable storage") - { - Some(bytes) => serde_json::from_slice(&bytes).expect( - "able to deserialize stored value circuit breaker from nonverifiable storage", - ), - None => ValueCircuitBreaker::default(), - }; let swap_execution_1_for_2 = if delta_1.value() > 0 { Some( @@ -121,19 +107,6 @@ pub trait HandleBatchSwaps: StateWrite + Sized { unfilled_2, }; - // Check that the output data doesn't exceed the ValueCircuitBreaker's quantities - // (i.e. we didn't outflow more value than existed within liquidity positions). - let available_asset_1 = value_circuit_breaker.available(trading_pair.asset_1()); - let available_asset_2 = value_circuit_breaker.available(trading_pair.asset_2()); - assert!( - output_data.lambda_1 <= available_asset_1.amount, - "asset 1 outflow exceeds available balance" - ); - assert!( - output_data.lambda_2 <= available_asset_2.amount, - "asset 2 outflow exceeds available balance" - ); - // Fetch the swap execution object that should have been modified during the routing and filling. tracing::debug!( ?output_data, @@ -142,7 +115,8 @@ pub trait HandleBatchSwaps: StateWrite + Sized { ); Arc::get_mut(self) .expect("expected state to have no other refs") - .set_output_data(output_data, swap_execution_1_for_2, swap_execution_2_for_1); + .set_output_data(output_data, swap_execution_1_for_2, swap_execution_2_for_1) + .await?; Ok(()) } diff --git a/crates/core/component/dex/src/component/router/tests.rs b/crates/core/component/dex/src/component/router/tests.rs index 226974ba71..92081a87de 100644 --- a/crates/core/component/dex/src/component/router/tests.rs +++ b/crates/core/component/dex/src/component/router/tests.rs @@ -1015,7 +1015,9 @@ async fn best_position_route_and_fill() -> anyhow::Result<()> { // Set the batch swap flow for the trading pair. Arc::get_mut(&mut state) .unwrap() - .put_swap_flow(&trading_pair, swap_flow.clone()); + .put_swap_flow(&trading_pair, swap_flow.clone()) + .await + .unwrap(); let routing_params = state.routing_params().await.unwrap(); state .handle_batch_swaps(trading_pair, swap_flow, 0u32.into(), 0, routing_params) @@ -1154,7 +1156,9 @@ async fn multi_hop_route_and_fill() -> anyhow::Result<()> { // Set the batch swap flow for the trading pair. Arc::get_mut(&mut state) .unwrap() - .put_swap_flow(&trading_pair, swap_flow.clone()); + .put_swap_flow(&trading_pair, swap_flow.clone()) + .await + .unwrap(); let routing_params = state.routing_params().await.unwrap(); state .handle_batch_swaps(trading_pair, swap_flow, 0u32.into(), 0, routing_params) diff --git a/crates/core/component/dex/src/component/rpc.rs b/crates/core/component/dex/src/component/rpc.rs index 64aaccc2f4..ed3e6914d0 100644 --- a/crates/core/component/dex/src/component/rpc.rs +++ b/crates/core/component/dex/src/component/rpc.rs @@ -23,7 +23,7 @@ use penumbra_proto::{ DomainType, StateReadProto, }; -use crate::ExecutionCircuitBreaker; +use super::ExecutionCircuitBreaker; use crate::{ lp::position::{self, Position}, state_key, DirectedTradingPair, SwapExecution, TradingPair, diff --git a/crates/core/component/dex/src/component/tests.rs b/crates/core/component/dex/src/component/tests.rs index 3f7b727a7a..82451333f4 100644 --- a/crates/core/component/dex/src/component/tests.rs +++ b/crates/core/component/dex/src/component/tests.rs @@ -579,7 +579,9 @@ async fn swap_execution_tests() -> anyhow::Result<()> { // Set the batch swap flow for the trading pair. Arc::get_mut(&mut state) .unwrap() - .put_swap_flow(&trading_pair, swap_flow.clone()); + .put_swap_flow(&trading_pair, swap_flow.clone()) + .await + .unwrap(); let routing_params = state.routing_params().await.unwrap(); state .handle_batch_swaps(trading_pair, swap_flow, 0, 0, routing_params) @@ -685,7 +687,9 @@ async fn swap_execution_tests() -> anyhow::Result<()> { // Set the batch swap flow for the trading pair. Arc::get_mut(&mut state) .unwrap() - .put_swap_flow(&trading_pair, swap_flow.clone()); + .put_swap_flow(&trading_pair, swap_flow.clone()) + .await + .unwrap(); let routing_params = state.routing_params().await.unwrap(); state .handle_batch_swaps(trading_pair, swap_flow, 0u32.into(), 0, routing_params) diff --git a/crates/core/component/dex/src/lib.rs b/crates/core/component/dex/src/lib.rs index a7f77e9543..a330e0e971 100644 --- a/crates/core/component/dex/src/lib.rs +++ b/crates/core/component/dex/src/lib.rs @@ -8,13 +8,11 @@ pub mod genesis; pub mod state_key; mod batch_swap_output_data; -mod circuit_breaker; mod params; mod swap_execution; mod trading_pair; pub use batch_swap_output_data::BatchSwapOutputData; -pub(crate) use circuit_breaker::ExecutionCircuitBreaker; pub use params::DexParameters; pub use swap_execution::SwapExecution; pub use trading_pair::{DirectedTradingPair, DirectedUnitPair, TradingPair, TradingPairVar}; diff --git a/crates/core/component/dex/src/lp/position.rs b/crates/core/component/dex/src/lp/position.rs index b31878f2ab..e6aae9c636 100644 --- a/crates/core/component/dex/src/lp/position.rs +++ b/crates/core/component/dex/src/lp/position.rs @@ -1,5 +1,5 @@ use anyhow::{anyhow, Context}; -use penumbra_asset::asset; +use penumbra_asset::{asset, Value}; use penumbra_num::Amount; use penumbra_proto::{ penumbra::core::component::dex::v1 as pb, serializers::bech32str, DomainType, @@ -156,6 +156,22 @@ impl Position { None } } + + /// Returns the amount of reserves for asset 1. + pub fn reserves_1(&self) -> Value { + Value { + amount: self.reserves.r1, + asset_id: self.phi.pair.asset_1(), + } + } + + /// Returns the amount of reserves for asset 2. + pub fn reserves_2(&self) -> Value { + Value { + amount: self.reserves.r2, + asset_id: self.phi.pair.asset_2(), + } + } } /// A hash of a [`Position`]. diff --git a/crates/core/component/dex/src/state_key.rs b/crates/core/component/dex/src/state_key.rs index 5d152fa96d..80f403ad7d 100644 --- a/crates/core/component/dex/src/state_key.rs +++ b/crates/core/component/dex/src/state_key.rs @@ -1,5 +1,7 @@ use std::string::String; +use penumbra_asset::asset; + use crate::{lp::position, DirectedTradingPair, TradingPair}; pub mod config { @@ -12,6 +14,10 @@ pub mod config { } } +pub fn value_balance(asset_id: &asset::Id) -> String { + format!("dex/value_balance/{asset_id}") +} + pub fn positions(trading_pair: &TradingPair, position_id: &str) -> String { format!("dex/positions/{trading_pair}/opened/{position_id}") }