From 2e7ead9e467e0f87c6b6bc88b5eb3c6c9a6bb5d8 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Thu, 4 Apr 2024 12:26:20 -0400 Subject: [PATCH 1/6] dex: add position counter state key --- crates/core/component/dex/src/state_key.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/crates/core/component/dex/src/state_key.rs b/crates/core/component/dex/src/state_key.rs index dad76e571e..e73d88c6e1 100644 --- a/crates/core/component/dex/src/state_key.rs +++ b/crates/core/component/dex/src/state_key.rs @@ -84,6 +84,25 @@ pub(crate) mod internal { use super::*; use crate::lp::BareTradingFunction; + pub mod counter { + pub mod num_positions { + use crate::TradingPair; + + pub fn prefix() -> &'static str { + "dex/internal/counter/num_positions/" + } + + pub fn key(trading_pair: &TradingPair) -> [u8; 99] { + let mut key = [0u8; 99]; + let prefix_bytes = prefix().as_bytes(); + let canonical_pair_bytes = trading_pair.to_bytes(); + + key[0..35].copy_from_slice(&prefix_bytes); + key[35..99].copy_from_slice(&canonical_pair_bytes); + key + } + } + } /// Find assets with liquidity positions from asset `from`, ordered by price. pub mod routable_assets { use penumbra_asset::asset; From 147d5bd4f80b64f85c0d57cc5bd97d1ef6f4f927 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Thu, 4 Apr 2024 15:51:52 -0400 Subject: [PATCH 2/6] dex: update position counters at open/close --- .../component/action_handler/position/open.rs | 1 - .../core/component/dex/src/component/mod.rs | 2 +- .../dex/src/component/position_manager.rs | 29 +++++++++++++++++++ crates/core/component/dex/src/state_key.rs | 4 +-- 4 files changed, 32 insertions(+), 4 deletions(-) diff --git a/crates/core/component/dex/src/component/action_handler/position/open.rs b/crates/core/component/dex/src/component/action_handler/position/open.rs index cdc9dd2995..4f2445292b 100644 --- a/crates/core/component/dex/src/component/action_handler/position/open.rs +++ b/crates/core/component/dex/src/component/action_handler/position/open.rs @@ -13,7 +13,6 @@ use crate::{ impl ActionHandler for PositionOpen { type CheckStatelessContext = (); async fn check_stateless(&self, _context: ()) -> Result<()> { - // TODO(chris, erwan, henry): brainstorm safety on `TradingFunction`. // Check: // + reserves are at most 80 bits wide, // + the trading function coefficients are at most 80 bits wide. diff --git a/crates/core/component/dex/src/component/mod.rs b/crates/core/component/dex/src/component/mod.rs index 871bd67748..8a6459ee47 100644 --- a/crates/core/component/dex/src/component/mod.rs +++ b/crates/core/component/dex/src/component/mod.rs @@ -11,6 +11,7 @@ mod arb; pub(crate) mod circuit_breaker; mod dex; mod flow; +pub(crate) mod position_counter; pub(crate) mod position_manager; mod swap_manager; @@ -21,6 +22,5 @@ pub(crate) use circuit_breaker::ValueCircuitBreaker; pub use dex::{Dex, StateReadExt, StateWriteExt}; pub use position_manager::{PositionManager, PositionRead}; pub use swap_manager::SwapManager; - #[cfg(test)] pub(crate) mod tests; diff --git a/crates/core/component/dex/src/component/position_manager.rs b/crates/core/component/dex/src/component/position_manager.rs index 1b27fb573f..069d6e0cdb 100644 --- a/crates/core/component/dex/src/component/position_manager.rs +++ b/crates/core/component/dex/src/component/position_manager.rs @@ -16,6 +16,7 @@ use crate::event; use crate::lp::position::State; use crate::lp::Reserves; use crate::{ + component::position_counter::PositionCounter, component::ValueCircuitBreaker, lp::position::{self, Position}, state_key, DirectedTradingPair, @@ -156,12 +157,26 @@ pub trait PositionManager: StateWrite + PositionRead { prev_state.state ); + // Skip state updates if the position is already closed: to keep the position counter + // accurate and skip unnecessary I/O. + if prev_state.state == position::State::Closed { + // A position could be closed multiple times e.g. it is queued for closure by the user + // and preemptively closed by the DEX engine during filling. + tracing::debug!( + ?id, + "position is already closed so we can skip state updates" + ); + return Ok(()); + } + let new_state = { let mut new_state = prev_state.clone(); new_state.state = position::State::Closed; new_state }; + self.decrement_position_counter(&new_state.phi.pair).await?; + self.update_position(Some(prev_state), new_state).await?; Ok(()) @@ -186,6 +201,17 @@ pub trait PositionManager: StateWrite + PositionRead { /// Opens a new position, updating all necessary indexes and checking for /// its nonexistence prior to being opened. + /// + /// # Errors + /// This method returns an error if the position is malformed + /// e.g. it is set to a state other than `Opened` + /// or, it specifies a position identifier already used by another position. + /// + /// An error can also occur if a DEX engine invariant is breached + /// e.g. overflowing the position counter (`u16::MAX`) + /// or, overflowing the value circuit breaker (`u128::MAX`) + /// + /// In any of those cases, we do not want to allow a new position to be opened. #[tracing::instrument(level = "debug", skip_all)] async fn open_position(&mut self, position: position::Position) -> Result<()> { // Double-check that the position is in the `Opened` state @@ -202,6 +228,9 @@ pub trait PositionManager: StateWrite + PositionRead { ); } + // Increase the position counter + self.increment_position_counter(&position.phi.pair).await?; + // Credit the DEX for the inflows from this position. self.vcb_credit(position.reserves_1()).await?; self.vcb_credit(position.reserves_2()).await?; diff --git a/crates/core/component/dex/src/state_key.rs b/crates/core/component/dex/src/state_key.rs index e73d88c6e1..05e3e85ee3 100644 --- a/crates/core/component/dex/src/state_key.rs +++ b/crates/core/component/dex/src/state_key.rs @@ -92,12 +92,12 @@ pub(crate) mod internal { "dex/internal/counter/num_positions/" } - pub fn key(trading_pair: &TradingPair) -> [u8; 99] { + pub fn by_trading_pair(trading_pair: &TradingPair) -> [u8; 99] { let mut key = [0u8; 99]; let prefix_bytes = prefix().as_bytes(); let canonical_pair_bytes = trading_pair.to_bytes(); - key[0..35].copy_from_slice(&prefix_bytes); + key[0..35].copy_from_slice(prefix_bytes); key[35..99].copy_from_slice(&canonical_pair_bytes); key } From 8148f1a2d597c0b8b33be1d99e9b6b9016fe1cab Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Thu, 4 Apr 2024 15:52:20 -0400 Subject: [PATCH 3/6] dex: implement internal position counter --- .../dex/src/component/position_counter.rs | 118 ++++++++++++++++++ 1 file changed, 118 insertions(+) create mode 100644 crates/core/component/dex/src/component/position_counter.rs diff --git a/crates/core/component/dex/src/component/position_counter.rs b/crates/core/component/dex/src/component/position_counter.rs new file mode 100644 index 0000000000..d197b3fc52 --- /dev/null +++ b/crates/core/component/dex/src/component/position_counter.rs @@ -0,0 +1,118 @@ +use anyhow::bail; +use async_trait::async_trait; +use cnidarium::StateWrite; + +use crate::state_key; +use crate::TradingPair; +use anyhow::Result; + +#[async_trait] +pub(crate) trait PositionCounter: StateWrite { + /// Returns the number of position for a [`TradingPair`]. + /// If there were no counter initialized for a given pair, this default to zero. + async fn get_position_count(&self, trading_pair: &TradingPair) -> u16 { + let path = state_key::internal::counter::num_positions::by_trading_pair(trading_pair); + self.get_position_count_from_key(path).await + } + + async fn get_position_count_from_key(&self, path: [u8; 99]) -> u16 { + let Some(raw_count) = self + .nonverifiable_get_raw(&path) + .await + .expect("no deserialization failure") + else { + return 0; + }; + + // This is safe because we only increment the counter via a [`Self::increase_position_counter`]. + let raw_count: [u8; 2] = raw_count + .try_into() + .expect("position counter is at most four bytes"); + u16::from_be_bytes(raw_count) + } + + /// Increment the number of position for a [`TradingPair`]. + /// Returns the updated total, or an error if overflow occurred. + async fn increment_position_counter(&mut self, trading_pair: &TradingPair) -> Result { + let path = state_key::internal::counter::num_positions::by_trading_pair(trading_pair); + let prev = self.get_position_count_from_key(path).await; + + let Some(new_total) = prev.checked_add(1) else { + bail!("incrementing position counter would overflow") + }; + self.nonverifiable_put_raw(path.to_vec(), new_total.to_be_bytes().to_vec()); + Ok(new_total) + } + + /// Decrement the number of positions for a [`TradingPair`], unless it would underflow. + /// Returns the updated total, or an error if underflow occurred. + async fn decrement_position_counter(&mut self, trading_pair: &TradingPair) -> Result { + let path = state_key::internal::counter::num_positions::by_trading_pair(trading_pair); + let prev = self.get_position_count_from_key(path).await; + + let Some(new_total) = prev.checked_sub(1) else { + bail!("decrementing position counter would underflow") + }; + self.nonverifiable_put_raw(path.to_vec(), new_total.to_be_bytes().to_vec()); + Ok(new_total) + } +} +impl PositionCounter for T {} + +// For some reason, `rust-analyzer` is complaining about used imports. +// Silence the warnings until I find a fix. +#[allow(unused_imports)] +mod tests { + use cnidarium::{StateDelta, TempStorage}; + use penumbra_asset::{asset::REGISTRY, Value}; + + use crate::component::position_counter::PositionCounter; + use crate::TradingPair; + + #[tokio::test] + /// Test that we can detect overflows and that they are handled properly: increment is ignored / no crash. + async fn test_no_overflow() -> anyhow::Result<()> { + let asset_a = REGISTRY.parse_denom("upenumbra").unwrap().id(); + let asset_b = REGISTRY.parse_denom("pizza").unwrap().id(); + let trading_pair = TradingPair::new(asset_a, asset_b); + + let storage = TempStorage::new().await?; + let mut delta = StateDelta::new(storage.latest_snapshot()); + + for i in 0..u16::MAX { + let total = delta.increment_position_counter(&trading_pair).await?; + + anyhow::ensure!( + total == i + 1, + "the total amount should be total={}, found={total}", + i + 1 + ); + } + + assert!(delta + .increment_position_counter(&trading_pair) + .await + .is_err()); + assert_eq!(delta.get_position_count(&trading_pair).await, u16::MAX); + + Ok(()) + } + + #[tokio::test] + /// Test that we can detect underflow and that they are handled properly: decrement is ignored / no crash. + async fn test_no_underflow() -> anyhow::Result<()> { + let asset_a = REGISTRY.parse_denom("upenumbra").unwrap().id(); + let asset_b = REGISTRY.parse_denom("pizza").unwrap().id(); + let trading_pair = TradingPair::new(asset_a, asset_b); + + let storage = TempStorage::new().await?; + let mut delta = StateDelta::new(storage.latest_snapshot()); + + let maybe_total = delta.decrement_position_counter(&trading_pair).await; + assert!(maybe_total.is_err()); + + let counter = delta.get_position_count(&trading_pair).await; + assert_eq!(counter, 0u16); + Ok(()) + } +} From 331307504b0917dcf66aca9bc9e823b8587b7c1e Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Fri, 5 Apr 2024 14:42:45 -0400 Subject: [PATCH 4/6] dex: move decrement counter into `update_position` --- .../src/component/circuit_breaker/value.rs | 2 +- .../dex/src/component/position_manager.rs | 22 ++++++++++++++----- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/crates/core/component/dex/src/component/circuit_breaker/value.rs b/crates/core/component/dex/src/component/circuit_breaker/value.rs index 8f4b6396b2..1824eda1a6 100644 --- a/crates/core/component/dex/src/component/circuit_breaker/value.rs +++ b/crates/core/component/dex/src/component/circuit_breaker/value.rs @@ -227,7 +227,7 @@ mod tests { let position = buy_1; state_tx.index_position_by_price(&position, &position.id()); state_tx - .update_available_liquidity(&position, &None) + .update_available_liquidity(&None, &position) .await .expect("able to update liquidity"); state_tx.put(state_key::position_by_id(&id), position); diff --git a/crates/core/component/dex/src/component/position_manager.rs b/crates/core/component/dex/src/component/position_manager.rs index 069d6e0cdb..0c0fd44d35 100644 --- a/crates/core/component/dex/src/component/position_manager.rs +++ b/crates/core/component/dex/src/component/position_manager.rs @@ -175,8 +175,6 @@ pub trait PositionManager: StateWrite + PositionRead { new_state }; - self.decrement_position_counter(&new_state.phi.pair).await?; - self.update_position(Some(prev_state), new_state).await?; Ok(()) @@ -393,6 +391,8 @@ pub(crate) trait Inner: StateWrite { prev_state: Option, new_state: Position, ) -> Result<()> { + use position::State::*; + tracing::debug!(?prev_state, ?new_state, "updating position state"); let id = new_state.id(); @@ -404,13 +404,25 @@ pub(crate) trait Inner: StateWrite { } // Only index the position's liquidity if it is active. - if new_state.state == position::State::Opened { + if new_state.state == Opened { self.index_position_by_price(&new_state, &id); } + if new_state.state == Closed { + // Make sure that we don't double decrement the position + // counter if a position was queued for closure AND closed + // by the DEX engine. + let is_already_closed = prev_state + .as_ref() + .map_or(false, |old_position| old_position.state == Closed); + if !is_already_closed { + self.decrement_position_counter(&new_state.phi.pair).await?; + } + } + // Update the available liquidity for this position's trading pair. // TODO: refactor and streamline this method while implementing eviction. - self.update_available_liquidity(&new_state, &prev_state) + self.update_available_liquidity(&prev_state, &new_state) .await?; self.put(state_key::position_by_id(&id), new_state); @@ -609,8 +621,8 @@ pub(crate) trait Inner: StateWrite { async fn update_available_liquidity( &mut self, - position: &Position, prev_position: &Option, + position: &Position, ) -> Result<()> { // Since swaps may be performed in either direction, the available liquidity indices // need to be calculated and stored for both the A -> B and B -> A directions. From f8ea7d82243f961f7e53d3fdb4362f0787d77c51 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Fri, 5 Apr 2024 14:57:23 -0400 Subject: [PATCH 5/6] dex: amend comment in `close_position_by_id` --- .../core/component/dex/src/component/position_manager.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/crates/core/component/dex/src/component/position_manager.rs b/crates/core/component/dex/src/component/position_manager.rs index 0c0fd44d35..c22947c376 100644 --- a/crates/core/component/dex/src/component/position_manager.rs +++ b/crates/core/component/dex/src/component/position_manager.rs @@ -157,11 +157,10 @@ pub trait PositionManager: StateWrite + PositionRead { prev_state.state ); - // Skip state updates if the position is already closed: to keep the position counter - // accurate and skip unnecessary I/O. + // Optimization: skip state update if the position is already closed. + // This can happen if the position was queued for closure and premptively + // closed by the DEX engine during execution (e.g. auto-closing). if prev_state.state == position::State::Closed { - // A position could be closed multiple times e.g. it is queued for closure by the user - // and preemptively closed by the DEX engine during filling. tracing::debug!( ?id, "position is already closed so we can skip state updates" From c4d2a317eaf2278197cd03930cb0c86eb3e9827c Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Fri, 5 Apr 2024 15:15:50 -0400 Subject: [PATCH 6/6] dex: fix comment in counter --- crates/core/component/dex/src/component/position_counter.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/core/component/dex/src/component/position_counter.rs b/crates/core/component/dex/src/component/position_counter.rs index d197b3fc52..ed4e755e46 100644 --- a/crates/core/component/dex/src/component/position_counter.rs +++ b/crates/core/component/dex/src/component/position_counter.rs @@ -27,7 +27,7 @@ pub(crate) trait PositionCounter: StateWrite { // This is safe because we only increment the counter via a [`Self::increase_position_counter`]. let raw_count: [u8; 2] = raw_count .try_into() - .expect("position counter is at most four bytes"); + .expect("position counter is at most two bytes"); u16::from_be_bytes(raw_count) }