From 312691a06fdc7d464752971946de265bb01059f6 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Wed, 10 Apr 2024 20:02:54 -0400 Subject: [PATCH 01/11] dex: break out engine idx into modules --- .../src/component/circuit_breaker/value.rs | 3 +- .../dex/src/component/position_manager.rs | 263 +----------------- .../position_manager/base_liquidity_index.rs | 174 ++++++++++++ .../counter.rs} | 17 +- .../position_manager/inventory_index.rs | 59 ++++ .../component/position_manager/price_index.rs | 51 ++++ .../dex/src/component/router/path.rs | 2 +- 7 files changed, 307 insertions(+), 262 deletions(-) create mode 100644 crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs rename crates/core/component/dex/src/component/{position_counter.rs => position_manager/counter.rs} (92%) create mode 100644 crates/core/component/dex/src/component/position_manager/inventory_index.rs create mode 100644 crates/core/component/dex/src/component/position_manager/price_index.rs diff --git a/crates/core/component/dex/src/component/circuit_breaker/value.rs b/crates/core/component/dex/src/component/circuit_breaker/value.rs index 1824eda1a6..78441249d3 100644 --- a/crates/core/component/dex/src/component/circuit_breaker/value.rs +++ b/crates/core/component/dex/src/component/circuit_breaker/value.rs @@ -47,7 +47,8 @@ impl ValueCircuitBreaker for T {} mod tests { use std::sync::Arc; - use crate::component::position_manager::Inner as _; + use crate::component::position_manager::base_liquidity_index::AssetByLiquidityIndex as _; + use crate::component::position_manager::price_index::PositionByPriceIndex; use crate::component::router::HandleBatchSwaps as _; use crate::component::{StateReadExt as _, StateWriteExt as _}; use crate::lp::plan::PositionWithdrawPlan; diff --git a/crates/core/component/dex/src/component/position_manager.rs b/crates/core/component/dex/src/component/position_manager.rs index 0b51ae5718..9c0854190e 100644 --- a/crates/core/component/dex/src/component/position_manager.rs +++ b/crates/core/component/dex/src/component/position_manager.rs @@ -8,24 +8,29 @@ use cnidarium::{EscapedByteSlice, StateRead, StateWrite}; use futures::Stream; use futures::StreamExt; use penumbra_asset::{asset, Balance}; -use penumbra_num::Amount; use penumbra_proto::DomainType; use penumbra_proto::{StateReadProto, StateWriteProto}; -use crate::lp::position::State; +use crate::component::position_manager::{ + base_liquidity_index::AssetByLiquidityIndex, inventory_index::PositionByInventoryIndex, + price_index::PositionByPriceIndex, +}; use crate::lp::Reserves; use crate::{ component::position_counter::PositionCounter, component::ValueCircuitBreaker, lp::position::{self, Position}, state_key::engine, - state_key::eviction_queue, DirectedTradingPair, }; use crate::{event, state_key}; const DYNAMIC_ASSET_LIMIT: usize = 10; +pub(crate) mod base_liquidity_index; +pub(crate) mod inventory_index; +pub(crate) mod price_index; + #[async_trait] pub trait PositionRead: StateRead { /// Return a stream of all [`position::Metadata`] available. @@ -431,257 +436,5 @@ pub(crate) trait Inner: StateWrite { self.put(state_key::position_by_id(&id), new_state); Ok(()) } - - // TODO(erwan): break this out into a `position_manager::inventory_index` module. - fn index_position_by_inventory(&mut self, position: &position::Position, id: &position::Id) { - tracing::debug!("indexing position by inventory"); - let canonical_pair = position.phi.pair; - // A position is bound to an unordered trading pair: A <> B. - // We want to index the position by inventory for each direction: - // A -> B - let pair_ab = DirectedTradingPair::new(canonical_pair.asset_1(), canonical_pair.asset_2()); - let inventory_a = position - .reserves_for(pair_ab.start) - .expect("the directed trading pair is correct"); - let key_ab = eviction_queue::inventory_index::key(&pair_ab, inventory_a, id).to_vec(); - self.nonverifiable_put_raw(key_ab, vec![]); - - // B -> A - let pair_ba = pair_ab.flip(); - let inventory_b = position - .reserves_for(pair_ba.start) - .expect("the directed trading pair is correct"); - let key_ba = eviction_queue::inventory_index::key(&pair_ba, inventory_b, id).to_vec(); - self.nonverifiable_put_raw(key_ba, vec![]); - } - - fn deindex_position_by_inventory( - &mut self, - prev_position: &position::Position, - id: &position::Id, - ) { - let canonical_pair = prev_position.phi.pair; - - // To deindex the position, we need to reconstruct the tuple of keys - // that correspond to each direction of the trading pair: - // A -> B - let pair_ab = DirectedTradingPair::new(canonical_pair.asset_1(), canonical_pair.asset_2()); - let inventory_a = prev_position - .reserves_for(pair_ab.start) - .expect("the directed trading pair is correct"); - let key_ab = eviction_queue::inventory_index::key(&pair_ab, inventory_a, id).to_vec(); - self.nonverifiable_delete(key_ab); - - // B -> A - let pair_ba = pair_ab.flip(); - let inventory_b = prev_position - .reserves_for(pair_ba.start) - .expect("the directed trading pair is correct"); - let key_ba = eviction_queue::inventory_index::key(&pair_ba, inventory_b, id).to_vec(); - self.nonverifiable_delete(key_ba); - } - - fn index_position_by_price(&mut self, position: &position::Position, id: &position::Id) { - let (pair, phi) = (position.phi.pair, &position.phi); - if position.reserves.r2 != 0u64.into() { - // Index this position for trades FROM asset 1 TO asset 2, since the position has asset 2 to give out. - let pair12 = DirectedTradingPair { - start: pair.asset_1(), - end: pair.asset_2(), - }; - let phi12 = phi.component.clone(); - self.nonverifiable_put_raw(engine::price_index::key(&pair12, &phi12, &id), vec![]); - tracing::debug!("indexing position for 1=>2 trades"); - } - - if position.reserves.r1 != 0u64.into() { - // Index this position for trades FROM asset 2 TO asset 1, since the position has asset 1 to give out. - let pair21 = DirectedTradingPair { - start: pair.asset_2(), - end: pair.asset_1(), - }; - let phi21 = phi.component.flip(); - self.nonverifiable_put_raw(engine::price_index::key(&pair21, &phi21, &id), vec![]); - tracing::debug!("indexing position for 2=>1 trades"); - } - } - - fn deindex_position_by_price(&mut self, position: &Position, id: &position::Id) { - tracing::debug!("deindexing position"); - let pair12 = DirectedTradingPair { - start: position.phi.pair.asset_1(), - end: position.phi.pair.asset_2(), - }; - let phi12 = position.phi.component.clone(); - let pair21 = DirectedTradingPair { - start: position.phi.pair.asset_2(), - end: position.phi.pair.asset_1(), - }; - let phi21 = position.phi.component.flip(); - self.nonverifiable_delete(engine::price_index::key(&pair12, &phi12, &id)); - self.nonverifiable_delete(engine::price_index::key(&pair21, &phi21, &id)); - } - - /// Updates the nonverifiable liquidity indices given a [`Position`] in the direction specified by the [`DirectedTradingPair`]. - /// An [`Option`] may be specified to allow for the case where a position is being updated. - async fn update_liquidity_index( - &mut self, - pair: DirectedTradingPair, - position: &Position, - prev: &Option, - ) -> Result<()> { - tracing::debug!(?pair, "updating available liquidity indices"); - - let (new_a_from_b, current_a_from_b) = match (position.state, prev) { - (State::Opened, None) => { - // Add the new position's contribution to the index, no cancellation of the previous version necessary. - - // Query the current available liquidity for this trading pair, or zero if the trading pair - // has no current liquidity. - let current_a_from_b = self - .nonverifiable_get_raw(&engine::routable_assets::a_from_b(&pair)) - .await? - .map(|bytes| { - Amount::from_be_bytes( - bytes - .try_into() - .expect("liquidity index amount can always be parsed"), - ) - }) - .unwrap_or_default(); - - // Use the new reserves to compute `new_position_contribution`, - // the amount of asset A contributed by the position (i.e. the reserves of asset A). - let new_position_contribution = position - .reserves_for(pair.start) - .expect("specified position should match provided trading pair"); - - // Compute `new_A_from_B`. - let new_a_from_b = - // Add the contribution from the updated version. - current_a_from_b.saturating_add(&new_position_contribution); - - tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?new_position_contribution, "newly opened position, adding contribution to existing available liquidity for trading pair"); - - (new_a_from_b, current_a_from_b) - } - (State::Opened, Some(prev)) => { - // Add the new position's contribution to the index, deleting the previous version's contribution. - - // Query the current available liquidity for this trading pair, or zero if the trading pair - // has no current liquidity. - let current_a_from_b = self - .nonverifiable_get_raw(&engine::routable_assets::a_from_b(&pair)) - .await? - .map(|bytes| { - Amount::from_be_bytes( - bytes - .try_into() - .expect("liquidity index amount can always be parsed"), - ) - }) - .unwrap_or_default(); - - // Use the previous reserves to compute `prev_position_contribution` (denominated in asset_1). - let prev_position_contribution = prev - .reserves_for(pair.start) - .expect("specified position should match provided trading pair"); - - // Use the new reserves to compute `new_position_contribution`, - // the amount of asset A contributed by the position (i.e. the reserves of asset A). - let new_position_contribution = position - .reserves_for(pair.start) - .expect("specified position should match provided trading pair"); - - // Compute `new_A_from_B`. - let new_a_from_b = - // Subtract the previous version of the position's contribution to represent that position no longer - // being correct, and add the contribution from the updated version. - (current_a_from_b.saturating_sub(&prev_position_contribution)).saturating_add(&new_position_contribution); - - tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?new_position_contribution, ?prev_position_contribution, "updated position, adding new contribution and subtracting previous contribution to existing available liquidity for trading pair"); - - (new_a_from_b, current_a_from_b) - } - (State::Closed, Some(prev)) => { - // Compute the previous contribution and erase it from the current index - - // Query the current available liquidity for this trading pair, or zero if the trading pair - // has no current liquidity. - let current_a_from_b = self - .nonverifiable_get_raw(&engine::routable_assets::a_from_b(&pair)) - .await? - .map(|bytes| { - Amount::from_be_bytes( - bytes - .try_into() - .expect("liquidity index amount can always be parsed"), - ) - }) - .unwrap_or_default(); - - // Use the previous reserves to compute `prev_position_contribution` (denominated in asset_1). - let prev_position_contribution = prev - .reserves_for(pair.start) - .expect("specified position should match provided trading pair"); - - // Compute `new_A_from_B`. - let new_a_from_b = - // Subtract the previous version of the position's contribution to represent that position no longer - // being correct, and since the updated version is Closed, it has no contribution. - current_a_from_b.saturating_sub(&prev_position_contribution); - - tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?prev_position_contribution, "closed position, subtracting previous contribution to existing available liquidity for trading pair"); - - (new_a_from_b, current_a_from_b) - } - (State::Withdrawn { .. }, _) | (State::Closed, None) => { - // The position already went through the `Closed` state or was opened in the `Closed` state, so its contribution has already been subtracted. - return Ok(()); - } - }; - - // Delete the existing key for this position if the reserve amount has changed. - if new_a_from_b != current_a_from_b { - self.nonverifiable_delete( - engine::routable_assets::key(&pair.start, current_a_from_b).to_vec(), - ); - } - - // Write the new key indicating that asset B is routable from asset A with `new_a_from_b` liquidity. - self.nonverifiable_put_raw( - engine::routable_assets::key(&pair.start, new_a_from_b).to_vec(), - pair.end.encode_to_vec(), - ); - tracing::debug!(start = ?pair.start, end = ?pair.end, "marking routable from start -> end"); - - // Write the new lookup index storing `new_a_from_b` for this trading pair. - self.nonverifiable_put_raw( - engine::routable_assets::a_from_b(&pair).to_vec(), - new_a_from_b.to_be_bytes().to_vec(), - ); - tracing::debug!(available_liquidity = ?new_a_from_b, ?pair, "marking available liquidity for trading pair"); - - Ok(()) - } - - async fn update_available_liquidity( - &mut self, - prev_position: &Option, - position: &Position, - ) -> Result<()> { - // Since swaps may be performed in either direction, the available liquidity indices - // need to be calculated and stored for both the A -> B and B -> A directions. - let (a, b) = (position.phi.pair.asset_1(), position.phi.pair.asset_2()); - - // A -> B - self.update_liquidity_index(DirectedTradingPair::new(a, b), position, prev_position) - .await?; - // B -> A - self.update_liquidity_index(DirectedTradingPair::new(b, a), position, prev_position) - .await?; - - Ok(()) - } } impl Inner for T {} diff --git a/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs b/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs new file mode 100644 index 0000000000..8441ab061d --- /dev/null +++ b/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs @@ -0,0 +1,174 @@ +use anyhow::Result; +use cnidarium::StateWrite; +use penumbra_num::Amount; + +use crate::lp::position::{Position, State}; +use crate::state_key::engine; +use crate::DirectedTradingPair; +use penumbra_proto::DomainType; + +pub(crate) trait AssetByLiquidityIndex: StateWrite { + /// Updates the nonverifiable liquidity indices given a [`Position`] in the direction specified by the [`DirectedTradingPair`]. + /// An [`Option`] may be specified to allow for the case where a position is being updated. + async fn update_liquidity_index( + &mut self, + pair: DirectedTradingPair, + position: &Position, + prev: &Option, + ) -> Result<()> { + tracing::debug!(?pair, "updating available liquidity indices"); + + let (new_a_from_b, current_a_from_b) = match (position.state, prev) { + (State::Opened, None) => { + // Add the new position's contribution to the index, no cancellation of the previous version necessary. + + // Query the current available liquidity for this trading pair, or zero if the trading pair + // has no current liquidity. + let current_a_from_b = self + .nonverifiable_get_raw(&engine::routable_assets::a_from_b(&pair)) + .await? + .map(|bytes| { + Amount::from_be_bytes( + bytes + .try_into() + .expect("liquidity index amount can always be parsed"), + ) + }) + .unwrap_or_default(); + + // Use the new reserves to compute `new_position_contribution`, + // the amount of asset A contributed by the position (i.e. the reserves of asset A). + let new_position_contribution = position + .reserves_for(pair.start) + .expect("specified position should match provided trading pair"); + + // Compute `new_A_from_B`. + let new_a_from_b = + // Add the contribution from the updated version. + current_a_from_b.saturating_add(&new_position_contribution); + + tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?new_position_contribution, "newly opened position, adding contribution to existing available liquidity for trading pair"); + + (new_a_from_b, current_a_from_b) + } + (State::Opened, Some(prev)) => { + // Add the new position's contribution to the index, deleting the previous version's contribution. + + // Query the current available liquidity for this trading pair, or zero if the trading pair + // has no current liquidity. + let current_a_from_b = self + .nonverifiable_get_raw(&engine::routable_assets::a_from_b(&pair)) + .await? + .map(|bytes| { + Amount::from_be_bytes( + bytes + .try_into() + .expect("liquidity index amount can always be parsed"), + ) + }) + .unwrap_or_default(); + + // Use the previous reserves to compute `prev_position_contribution` (denominated in asset_1). + let prev_position_contribution = prev + .reserves_for(pair.start) + .expect("specified position should match provided trading pair"); + + // Use the new reserves to compute `new_position_contribution`, + // the amount of asset A contributed by the position (i.e. the reserves of asset A). + let new_position_contribution = position + .reserves_for(pair.start) + .expect("specified position should match provided trading pair"); + + // Compute `new_A_from_B`. + let new_a_from_b = + // Subtract the previous version of the position's contribution to represent that position no longer + // being correct, and add the contribution from the updated version. + (current_a_from_b.saturating_sub(&prev_position_contribution)).saturating_add(&new_position_contribution); + + tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?new_position_contribution, ?prev_position_contribution, "updated position, adding new contribution and subtracting previous contribution to existing available liquidity for trading pair"); + + (new_a_from_b, current_a_from_b) + } + (State::Closed, Some(prev)) => { + // Compute the previous contribution and erase it from the current index + + // Query the current available liquidity for this trading pair, or zero if the trading pair + // has no current liquidity. + let current_a_from_b = self + .nonverifiable_get_raw(&engine::routable_assets::a_from_b(&pair)) + .await? + .map(|bytes| { + Amount::from_be_bytes( + bytes + .try_into() + .expect("liquidity index amount can always be parsed"), + ) + }) + .unwrap_or_default(); + + // Use the previous reserves to compute `prev_position_contribution` (denominated in asset_1). + let prev_position_contribution = prev + .reserves_for(pair.start) + .expect("specified position should match provided trading pair"); + + // Compute `new_A_from_B`. + let new_a_from_b = + // Subtract the previous version of the position's contribution to represent that position no longer + // being correct, and since the updated version is Closed, it has no contribution. + current_a_from_b.saturating_sub(&prev_position_contribution); + + tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?prev_position_contribution, "closed position, subtracting previous contribution to existing available liquidity for trading pair"); + + (new_a_from_b, current_a_from_b) + } + (State::Withdrawn { .. }, _) | (State::Closed, None) => { + // The position already went through the `Closed` state or was opened in the `Closed` state, so its contribution has already been subtracted. + return Ok(()); + } + }; + + // Delete the existing key for this position if the reserve amount has changed. + if new_a_from_b != current_a_from_b { + self.nonverifiable_delete( + engine::routable_assets::key(&pair.start, current_a_from_b).to_vec(), + ); + } + + // Write the new key indicating that asset B is routable from asset A with `new_a_from_b` liquidity. + self.nonverifiable_put_raw( + engine::routable_assets::key(&pair.start, new_a_from_b).to_vec(), + pair.end.encode_to_vec(), + ); + tracing::debug!(start = ?pair.start, end = ?pair.end, "marking routable from start -> end"); + + // Write the new lookup index storing `new_a_from_b` for this trading pair. + self.nonverifiable_put_raw( + engine::routable_assets::a_from_b(&pair).to_vec(), + new_a_from_b.to_be_bytes().to_vec(), + ); + tracing::debug!(available_liquidity = ?new_a_from_b, ?pair, "marking available liquidity for trading pair"); + + Ok(()) + } + + async fn update_available_liquidity( + &mut self, + prev_position: &Option, + position: &Position, + ) -> Result<()> { + // Since swaps may be performed in either direction, the available liquidity indices + // need to be calculated and stored for both the A -> B and B -> A directions. + let (a, b) = (position.phi.pair.asset_1(), position.phi.pair.asset_2()); + + // A -> B + self.update_liquidity_index(DirectedTradingPair::new(a, b), position, prev_position) + .await?; + // B -> A + self.update_liquidity_index(DirectedTradingPair::new(b, a), position, prev_position) + .await?; + + Ok(()) + } +} + +impl AssetByLiquidityIndex for T {} diff --git a/crates/core/component/dex/src/component/position_counter.rs b/crates/core/component/dex/src/component/position_manager/counter.rs similarity index 92% rename from crates/core/component/dex/src/component/position_counter.rs rename to crates/core/component/dex/src/component/position_manager/counter.rs index 5ccec60cdf..e3ef313eee 100644 --- a/crates/core/component/dex/src/component/position_counter.rs +++ b/crates/core/component/dex/src/component/position_manager/counter.rs @@ -1,13 +1,14 @@ -use anyhow::bail; +use anyhow::{bail, ensure}; use async_trait::async_trait; -use cnidarium::StateWrite; +use cnidarium::{StateRead, StateWrite}; +use crate::lp::position::{self, Position}; use crate::state_key::engine; use crate::TradingPair; use anyhow::Result; #[async_trait] -pub(crate) trait PositionCounter: StateWrite { +pub(crate) trait PositionCounterRead: StateRead { /// Returns the number of position for a [`TradingPair`]. /// If there were no counter initialized for a given pair, this default to zero. async fn get_position_count(&self, trading_pair: &TradingPair) -> u16 { @@ -24,12 +25,18 @@ pub(crate) trait PositionCounter: StateWrite { return 0; }; - // This is safe because we only increment the counter via a [`Self::increase_position_counter`]. + // This is safe because we only increment the counter via [`Self::increase_position_counter`]. let raw_count: [u8; 2] = raw_count .try_into() .expect("position counter is at most two bytes"); u16::from_be_bytes(raw_count) } +} + +impl PositionCounterRead for T {} + +#[async_trait] +pub(crate) trait PositionCounter: StateWrite { /// Increment the number of position for a [`TradingPair`]. /// Returns the updated total, or an error if overflow occurred. @@ -66,7 +73,7 @@ mod tests { use cnidarium::{StateDelta, TempStorage}; use penumbra_asset::{asset::REGISTRY, Value}; - use crate::component::position_counter::PositionCounter; + use crate::component::position_manager::counter::{PositionCounter, PositionCounterRead}; use crate::TradingPair; #[tokio::test] diff --git a/crates/core/component/dex/src/component/position_manager/inventory_index.rs b/crates/core/component/dex/src/component/position_manager/inventory_index.rs new file mode 100644 index 0000000000..b04d05ed8a --- /dev/null +++ b/crates/core/component/dex/src/component/position_manager/inventory_index.rs @@ -0,0 +1,59 @@ +use cnidarium::StateWrite; + +use crate::{ + lp::position::{self}, + state_key::eviction_queue, + DirectedTradingPair, +}; + +pub(crate) trait PositionByInventoryIndex: StateWrite { + fn index_position_by_inventory(&mut self, position: &position::Position, id: &position::Id) { + tracing::debug!("indexing position by inventory"); + let canonical_pair = position.phi.pair; + // A position is bound to an unordered trading pair: A <> B. + // We want to index the position by inventory for each direction: + // A -> B + let pair_ab = DirectedTradingPair::new(canonical_pair.asset_1(), canonical_pair.asset_2()); + let inventory_a = position + .reserves_for(pair_ab.start) + .expect("the directed trading pair is correct"); + let key_ab = eviction_queue::inventory_index::key(&pair_ab, inventory_a, id).to_vec(); + self.nonverifiable_put_raw(key_ab, vec![]); + + // B -> A + let pair_ba = pair_ab.flip(); + let inventory_b = position + .reserves_for(pair_ba.start) + .expect("the directed trading pair is correct"); + let key_ba = eviction_queue::inventory_index::key(&pair_ba, inventory_b, id).to_vec(); + self.nonverifiable_put_raw(key_ba, vec![]); + } + + fn deindex_position_by_inventory( + &mut self, + prev_position: &position::Position, + id: &position::Id, + ) { + let canonical_pair = prev_position.phi.pair; + + // To deindex the position, we need to reconstruct the tuple of keys + // that correspond to each direction of the trading pair: + // A -> B + let pair_ab = DirectedTradingPair::new(canonical_pair.asset_1(), canonical_pair.asset_2()); + let inventory_a = prev_position + .reserves_for(pair_ab.start) + .expect("the directed trading pair is correct"); + let key_ab = eviction_queue::inventory_index::key(&pair_ab, inventory_a, id).to_vec(); + self.nonverifiable_delete(key_ab); + + // B -> A + let pair_ba = pair_ab.flip(); + let inventory_b = prev_position + .reserves_for(pair_ba.start) + .expect("the directed trading pair is correct"); + let key_ba = eviction_queue::inventory_index::key(&pair_ba, inventory_b, id).to_vec(); + self.nonverifiable_delete(key_ba); + } +} + +impl PositionByInventoryIndex for T {} diff --git a/crates/core/component/dex/src/component/position_manager/price_index.rs b/crates/core/component/dex/src/component/position_manager/price_index.rs new file mode 100644 index 0000000000..cb55754c37 --- /dev/null +++ b/crates/core/component/dex/src/component/position_manager/price_index.rs @@ -0,0 +1,51 @@ +use cnidarium::StateWrite; + +use crate::{ + lp::position::{self, Position}, + state_key::engine, + DirectedTradingPair, +}; + +pub(crate) trait PositionByPriceIndex: StateWrite { + fn index_position_by_price(&mut self, position: &position::Position, id: &position::Id) { + let (pair, phi) = (position.phi.pair, &position.phi); + if position.reserves.r2 != 0u64.into() { + // Index this position for trades FROM asset 1 TO asset 2, since the position has asset 2 to give out. + let pair12 = DirectedTradingPair { + start: pair.asset_1(), + end: pair.asset_2(), + }; + let phi12 = phi.component.clone(); + self.nonverifiable_put_raw(engine::price_index::key(&pair12, &phi12, &id), vec![]); + tracing::debug!("indexing position for 1=>2 trades"); + } + + if position.reserves.r1 != 0u64.into() { + // Index this position for trades FROM asset 2 TO asset 1, since the position has asset 1 to give out. + let pair21 = DirectedTradingPair { + start: pair.asset_2(), + end: pair.asset_1(), + }; + let phi21 = phi.component.flip(); + self.nonverifiable_put_raw(engine::price_index::key(&pair21, &phi21, &id), vec![]); + tracing::debug!("indexing position for 2=>1 trades"); + } + } + + fn deindex_position_by_price(&mut self, position: &Position, id: &position::Id) { + tracing::debug!("deindexing position"); + let pair12 = DirectedTradingPair { + start: position.phi.pair.asset_1(), + end: position.phi.pair.asset_2(), + }; + let phi12 = position.phi.component.clone(); + let pair21 = DirectedTradingPair { + start: position.phi.pair.asset_2(), + end: position.phi.pair.asset_1(), + }; + let phi21 = position.phi.component.flip(); + self.nonverifiable_delete(engine::price_index::key(&pair12, &phi12, &id)); + self.nonverifiable_delete(engine::price_index::key(&pair21, &phi21, &id)); + } +} +impl PositionByPriceIndex for T {} diff --git a/crates/core/component/dex/src/component/router/path.rs b/crates/core/component/dex/src/component/router/path.rs index a9fd283590..bcf2b28ba4 100644 --- a/crates/core/component/dex/src/component/router/path.rs +++ b/crates/core/component/dex/src/component/router/path.rs @@ -73,7 +73,7 @@ impl Path { }; // Deindex the position we "consumed" in this and all descendant state forks, // ensuring we don't double-count liquidity while traversing cycles. - use super::super::position_manager::Inner as _; + use crate::component::position_manager::price_index::PositionByPriceIndex; self.state .deindex_position_by_price(&best_price_position, &best_price_position.id()); From 62adefebf6a6ac7ac036d42182643f5eac728b63 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Wed, 10 Apr 2024 21:55:37 -0400 Subject: [PATCH 02/11] dex: implement update index methods --- .../position_manager/base_liquidity_index.rs | 15 +++++----- .../src/component/position_manager/counter.rs | 28 +++++++++++++++++++ .../position_manager/inventory_index.rs | 24 +++++++++++++++- .../component/position_manager/price_index.rs | 22 +++++++++++++++ 4 files changed, 81 insertions(+), 8 deletions(-) diff --git a/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs b/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs index 8441ab061d..de42b5f637 100644 --- a/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs +++ b/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs @@ -2,7 +2,7 @@ use anyhow::Result; use cnidarium::StateWrite; use penumbra_num::Amount; -use crate::lp::position::{Position, State}; +use crate::lp::position::{self, Position, State}; use crate::state_key::engine; use crate::DirectedTradingPair; use penumbra_proto::DomainType; @@ -151,20 +151,21 @@ pub(crate) trait AssetByLiquidityIndex: StateWrite { Ok(()) } - async fn update_available_liquidity( + async fn update_asset_by_base_liquidity_index( &mut self, - prev_position: &Option, - position: &Position, + prev_state: &Option, + new_state: &Position, + _id: &position::Id, ) -> Result<()> { // Since swaps may be performed in either direction, the available liquidity indices // need to be calculated and stored for both the A -> B and B -> A directions. - let (a, b) = (position.phi.pair.asset_1(), position.phi.pair.asset_2()); + let (a, b) = (new_state.phi.pair.asset_1(), new_state.phi.pair.asset_2()); // A -> B - self.update_liquidity_index(DirectedTradingPair::new(a, b), position, prev_position) + self.update_liquidity_index(DirectedTradingPair::new(a, b), new_state, prev_state) .await?; // B -> A - self.update_liquidity_index(DirectedTradingPair::new(b, a), position, prev_position) + self.update_liquidity_index(DirectedTradingPair::new(b, a), new_state, prev_state) .await?; Ok(()) diff --git a/crates/core/component/dex/src/component/position_manager/counter.rs b/crates/core/component/dex/src/component/position_manager/counter.rs index e3ef313eee..fc62a59e80 100644 --- a/crates/core/component/dex/src/component/position_manager/counter.rs +++ b/crates/core/component/dex/src/component/position_manager/counter.rs @@ -37,6 +37,34 @@ impl PositionCounterRead for T {} #[async_trait] pub(crate) trait PositionCounter: StateWrite { + async fn update_trading_pair_position_counter( + &mut self, + prev_state: &Option, + new_state: &Position, + _id: &position::Id, + ) -> Result<()> { + use position::State::*; + let trading_pair = new_state.phi.pair; + + match prev_state { + Some(prev_state) => match (prev_state.state, new_state.state) { + (Opened, Closed) => { + let _ = self.decrement_position_counter(&trading_pair).await?; + } + _ => { /* no-op */ } + }, + + None => { + ensure!( + matches!(new_state.state, Opened), + "fresh position must start in the `Opened` state, found: {:?}", + new_state.state + ); + let _ = self.increment_position_counter(&trading_pair).await?; + } + } + Ok(()) + } /// Increment the number of position for a [`TradingPair`]. /// Returns the updated total, or an error if overflow occurred. diff --git a/crates/core/component/dex/src/component/position_manager/inventory_index.rs b/crates/core/component/dex/src/component/position_manager/inventory_index.rs index b04d05ed8a..9cad16e159 100644 --- a/crates/core/component/dex/src/component/position_manager/inventory_index.rs +++ b/crates/core/component/dex/src/component/position_manager/inventory_index.rs @@ -1,12 +1,34 @@ use cnidarium::StateWrite; use crate::{ - lp::position::{self}, + lp::position::{self, Position}, state_key::eviction_queue, DirectedTradingPair, }; +use anyhow::Result; +use position::State::*; + pub(crate) trait PositionByInventoryIndex: StateWrite { + fn update_position_by_inventory_index( + &mut self, + prev_state: &Option, + new_state: &Position, + position_id: &position::Id, + ) -> Result<()> { + // Clear an existing index of the position, since changes to the + // reserves or the position state might have invalidated it. + if let Some(prev_lp) = prev_state { + self.deindex_position_by_inventory(prev_lp, position_id); + } + + if matches!(new_state.state, Opened) { + self.index_position_by_inventory(new_state, position_id); + } + + Ok(()) + } + fn index_position_by_inventory(&mut self, position: &position::Position, id: &position::Id) { tracing::debug!("indexing position by inventory"); let canonical_pair = position.phi.pair; diff --git a/crates/core/component/dex/src/component/position_manager/price_index.rs b/crates/core/component/dex/src/component/position_manager/price_index.rs index cb55754c37..c40dfc4f83 100644 --- a/crates/core/component/dex/src/component/position_manager/price_index.rs +++ b/crates/core/component/dex/src/component/position_manager/price_index.rs @@ -6,7 +6,29 @@ use crate::{ DirectedTradingPair, }; +use anyhow::Result; +use position::State::*; + pub(crate) trait PositionByPriceIndex: StateWrite { + fn update_position_by_price_index( + &mut self, + prev_state: &Option, + new_state: &Position, + position_id: &position::Id, + ) -> Result<()> { + // Clear an existing index of the position, since changes to the + // reserves or the position state might have invalidated it. + if let Some(prev_lp) = prev_state { + self.deindex_position_by_price(prev_lp, position_id); + } + + if matches!(new_state.state, Opened) { + self.index_position_by_price(new_state, position_id); + } + + Ok(()) + } + fn index_position_by_price(&mut self, position: &position::Position, id: &position::Id) { let (pair, phi) = (position.phi.pair, &position.phi); if position.reserves.r2 != 0u64.into() { From ffbd9a0dcb2de25db6720fccd6ae313dc3a3aab6 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Wed, 10 Apr 2024 21:57:42 -0400 Subject: [PATCH 03/11] dex: make `update_position` use update index methods --- .../dex/src/component/position_manager.rs | 43 ++++++------------- 1 file changed, 14 insertions(+), 29 deletions(-) diff --git a/crates/core/component/dex/src/component/position_manager.rs b/crates/core/component/dex/src/component/position_manager.rs index 9c0854190e..6d6ebdc7c8 100644 --- a/crates/core/component/dex/src/component/position_manager.rs +++ b/crates/core/component/dex/src/component/position_manager.rs @@ -1,7 +1,7 @@ use std::future; use std::{pin::Pin, sync::Arc}; -use anyhow::Result; +use anyhow::{bail, ensure, Result}; use async_stream::try_stream; use async_trait::async_trait; use cnidarium::{EscapedByteSlice, StateRead, StateWrite}; @@ -17,7 +17,7 @@ use crate::component::position_manager::{ }; use crate::lp::Reserves; use crate::{ - component::position_counter::PositionCounter, + component::position_manager::counter::PositionCounter, component::ValueCircuitBreaker, lp::position::{self, Position}, state_key::engine, @@ -28,6 +28,7 @@ use crate::{event, state_key}; const DYNAMIC_ASSET_LIMIT: usize = 10; pub(crate) mod base_liquidity_index; +pub(crate) mod counter; pub(crate) mod inventory_index; pub(crate) mod price_index; @@ -397,43 +398,27 @@ pub(crate) trait Inner: StateWrite { prev_state: Option, new_state: Position, ) -> Result<()> { - use position::State::*; - tracing::debug!(?prev_state, ?new_state, "updating position state"); let id = new_state.id(); - // Clear any existing indexes of the position, since changes to the - // reserves or the position state might have invalidated them. - if let Some(prev_state) = prev_state.as_ref() { - self.deindex_position_by_price(&prev_state, &id); - self.deindex_position_by_inventory(&prev_state, &id); } - // Only index the position's liquidity if it is active. - if new_state.state == Opened { - self.index_position_by_price(&new_state, &id); - self.index_position_by_inventory(&new_state, &id); - } + // Update the DEX engine indices: + self.update_position_by_price_index(&prev_state, &new_state, &id)?; + self.update_position_by_inventory_index(&prev_state, &new_state, &id)?; + self.update_asset_by_base_liquidity_index(&prev_state, &new_state, &id) + .await?; + self.update_trading_pair_position_counter(&prev_state, &new_state, &id) + .await?; + + self.put(state_key::position_by_id(&id), new_state); + Ok(()) + } - if new_state.state == Closed { - // Make sure that we don't double decrement the position - // counter if a position was queued for closure AND closed - // by the DEX engine. - let is_already_closed = prev_state - .as_ref() - .map_or(false, |old_position| old_position.state == Closed); - if !is_already_closed { - self.decrement_position_counter(&new_state.phi.pair).await?; } } - // Update the available liquidity for this position's trading pair. - // TODO: refactor and streamline this method while implementing eviction. - self.update_available_liquidity(&prev_state, &new_state) - .await?; - - self.put(state_key::position_by_id(&id), new_state); Ok(()) } } From 902463415ef58be5ae17497c005118e6b53c7d97 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Wed, 10 Apr 2024 22:01:21 -0400 Subject: [PATCH 04/11] dex: DiD guard against bad position transitions --- .../dex/src/component/position_manager.rs | 40 ++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/crates/core/component/dex/src/component/position_manager.rs b/crates/core/component/dex/src/component/position_manager.rs index 6d6ebdc7c8..dd513c89e0 100644 --- a/crates/core/component/dex/src/component/position_manager.rs +++ b/crates/core/component/dex/src/component/position_manager.rs @@ -402,7 +402,8 @@ pub(crate) trait Inner: StateWrite { let id = new_state.id(); - } + // Assert `update_position` state transitions invariants: + Self::guard_invalid_transitions(&prev_state, &new_state, &id)?; // Update the DEX engine indices: self.update_position_by_price_index(&prev_state, &new_state, &id)?; @@ -416,7 +417,44 @@ pub(crate) trait Inner: StateWrite { Ok(()) } + fn guard_invalid_transitions( + prev_state: &Option, + new_state: &Position, + id: &position::Id, + ) -> Result<()> { + use position::State::*; + + if let Some(prev_lp) = prev_state { + tracing::debug!(?id, prev = ?prev_lp.state, new = ?new_state.state, "evaluating state transition"); + match (prev_lp.state, new_state.state) { + (Opened, Opened) => {} + (Opened, Closed) => {} + (Closed, Closed) => { /* no-op but allowed */ } + (Closed, Withdrawn { sequence }) => { + ensure!( + sequence == 0, + "withdrawn positions must have their sequence start at zero (found: {})", + sequence + ); + } + (Withdrawn { sequence: old_seq }, Withdrawn { sequence: new_seq }) => { + let expected_seq = old_seq.saturating_add(1); + ensure!( + new_seq == expected_seq, + "withdrawn must increase 1-by-1 (old: {}, new: {}, expected: {})", + old_seq, + new_seq, + expected_seq + ); + } + _ => bail!("invalid transition"), } + } else { + ensure!( + matches!(new_state.state, Opened), + "fresh positions MUST start in the `Opened` state (found: {:?})", + new_state.state + ); } Ok(()) From aa0d66b71a815e3073a40b82d19a3e41974361e4 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Wed, 10 Apr 2024 22:02:18 -0400 Subject: [PATCH 05/11] dex: don't expose dex engine internals --- .../component/dex/src/component/circuit_breaker/value.rs | 2 +- crates/core/component/dex/src/component/mod.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/core/component/dex/src/component/circuit_breaker/value.rs b/crates/core/component/dex/src/component/circuit_breaker/value.rs index 78441249d3..1ea141d5d4 100644 --- a/crates/core/component/dex/src/component/circuit_breaker/value.rs +++ b/crates/core/component/dex/src/component/circuit_breaker/value.rs @@ -228,7 +228,7 @@ mod tests { let position = buy_1; state_tx.index_position_by_price(&position, &position.id()); state_tx - .update_available_liquidity(&None, &position) + .update_asset_by_base_liquidity_index(&None, &position, &position.id()) .await .expect("able to update liquidity"); state_tx.put(state_key::position_by_id(&id), position); diff --git a/crates/core/component/dex/src/component/mod.rs b/crates/core/component/dex/src/component/mod.rs index 8a6459ee47..6e36ea8ead 100644 --- a/crates/core/component/dex/src/component/mod.rs +++ b/crates/core/component/dex/src/component/mod.rs @@ -11,16 +11,16 @@ mod arb; pub(crate) mod circuit_breaker; mod dex; mod flow; -pub(crate) mod position_counter; -pub(crate) mod position_manager; +mod position_manager; mod swap_manager; pub use self::metrics::register_metrics; -pub use arb::Arbitrage; +pub(crate) use arb::Arbitrage; pub use circuit_breaker::ExecutionCircuitBreaker; pub(crate) use circuit_breaker::ValueCircuitBreaker; pub use dex::{Dex, StateReadExt, StateWriteExt}; -pub use position_manager::{PositionManager, PositionRead}; +pub(crate) use position_manager::PositionManager; +pub use position_manager::PositionRead; pub use swap_manager::SwapManager; #[cfg(test)] pub(crate) mod tests; From 4e150c1f1f3abde1dc3bed86ce549ea9143e9474 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Wed, 10 Apr 2024 22:42:11 -0400 Subject: [PATCH 06/11] dex: refactor base liquidity index --- .../dex/src/component/position_manager.rs | 4 +- .../position_manager/base_liquidity_index.rs | 290 +++++++++--------- crates/core/component/dex/src/state_key.rs | 43 ++- 3 files changed, 169 insertions(+), 168 deletions(-) diff --git a/crates/core/component/dex/src/component/position_manager.rs b/crates/core/component/dex/src/component/position_manager.rs index dd513c89e0..30f3693a08 100644 --- a/crates/core/component/dex/src/component/position_manager.rs +++ b/crates/core/component/dex/src/component/position_manager.rs @@ -124,9 +124,9 @@ pub trait PositionRead: StateRead { /// Returns a stream of [`asset::Id`] routable from a given asset, ordered by liquidity. fn ordered_routable_assets( &self, - from: &asset::Id, + start: &asset::Id, ) -> Pin> + Send + 'static>> { - let prefix = engine::routable_assets::prefix(from); + let prefix = engine::routable_assets::starting_from(start); tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for routable assets by liquidity"); self.nonverifiable_prefix_raw(&prefix) .map(|entry| match entry { diff --git a/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs b/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs index de42b5f637..42ab912703 100644 --- a/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs +++ b/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs @@ -1,175 +1,167 @@ use anyhow::Result; use cnidarium::StateWrite; use penumbra_num::Amount; +use position::State::*; use crate::lp::position::{self, Position, State}; use crate::state_key::engine; use crate::DirectedTradingPair; -use penumbra_proto::DomainType; +use penumbra_proto::{StateReadProto, StateWriteProto}; pub(crate) trait AssetByLiquidityIndex: StateWrite { - /// Updates the nonverifiable liquidity indices given a [`Position`] in the direction specified by the [`DirectedTradingPair`]. - /// An [`Option`] may be specified to allow for the case where a position is being updated. - async fn update_liquidity_index( + /// Update the base liquidity index, used by the DEX engine during path search. + /// + /// # Overview + /// Given a directed trading pair `A -> B`, the index tracks the amount of + /// liquidity available to convert the quote asset B, into a base asset A. + /// + /// # Index schema + /// The liquidity index schema is as follow: + /// - A primary index that maps a "start" asset A (aka. base asset) + /// to an "end" asset B (aka. quote asset) ordered by the amount of + /// liquidity available for B -> A (not a typo). + /// - An auxilliary index that maps a directed trading pair `A -> B` + /// to the aggregate liquidity for B -> A (used in the primary composite key) + /// + /// # Diagram + /// + /// Liquidity index: + /// For an asset `A`, surface asset + /// `B` with the best liquidity + /// score. + /// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ + /// + /// ┌──┐ ▼ ┌─────────┐ │ + /// ▲ │ │ ┌──────────────────┐ │ │ + /// │ │ ─┼───▶│{asset_A}{agg_liq}│──▶│{asset_B}│ │ + /// │ ├──┤ └──────────────────┘ │ │ + /// sorted │ │ └─────────┘ │ + /// by agg │ │ + /// liq ├──┤ │ + /// │ │ │ used in the + /// │ ├──┤ composite + /// │ │ │ key + /// │ │ │ Auxiliary look-up index: │ + /// │ │ │ "Find the aggregate liquidity + /// │ │ │ per directed trading pair" │ + /// │ │ │ ┌───────┐ ┌─────────┐ + /// │ │ │ ├───────┤ ┌──────────────────┐ │ │ + /// │ │ │ │ ────┼─▶│{asset_A}{asset_B}│────▶│{agg_liq}│ + /// │ ├──┤ ├───────┤ └──────────────────┘ │ │ + /// │ │ │ ├───────┤ └─────────┘ + /// │ │ │ ├───────┤ + /// │ │ │ ├───────┤ + /// │ ├──┤ └───────┘ + /// │ │ │ + /// │ │ │ + /// │ └──┘ + async fn update_asset_by_base_liquidity_index( &mut self, - pair: DirectedTradingPair, - position: &Position, - prev: &Option, + prev_state: &Option, + new_state: &Position, + id: &position::Id, ) -> Result<()> { - tracing::debug!(?pair, "updating available liquidity indices"); - - let (new_a_from_b, current_a_from_b) = match (position.state, prev) { - (State::Opened, None) => { - // Add the new position's contribution to the index, no cancellation of the previous version necessary. - - // Query the current available liquidity for this trading pair, or zero if the trading pair - // has no current liquidity. - let current_a_from_b = self - .nonverifiable_get_raw(&engine::routable_assets::a_from_b(&pair)) - .await? - .map(|bytes| { - Amount::from_be_bytes( - bytes - .try_into() - .expect("liquidity index amount can always be parsed"), - ) - }) - .unwrap_or_default(); - - // Use the new reserves to compute `new_position_contribution`, - // the amount of asset A contributed by the position (i.e. the reserves of asset A). - let new_position_contribution = position - .reserves_for(pair.start) - .expect("specified position should match provided trading pair"); - - // Compute `new_A_from_B`. - let new_a_from_b = - // Add the contribution from the updated version. - current_a_from_b.saturating_add(&new_position_contribution); - - tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?new_position_contribution, "newly opened position, adding contribution to existing available liquidity for trading pair"); - - (new_a_from_b, current_a_from_b) - } - (State::Opened, Some(prev)) => { - // Add the new position's contribution to the index, deleting the previous version's contribution. - - // Query the current available liquidity for this trading pair, or zero if the trading pair - // has no current liquidity. - let current_a_from_b = self - .nonverifiable_get_raw(&engine::routable_assets::a_from_b(&pair)) - .await? - .map(|bytes| { - Amount::from_be_bytes( - bytes - .try_into() - .expect("liquidity index amount can always be parsed"), - ) - }) - .unwrap_or_default(); - - // Use the previous reserves to compute `prev_position_contribution` (denominated in asset_1). - let prev_position_contribution = prev - .reserves_for(pair.start) - .expect("specified position should match provided trading pair"); - - // Use the new reserves to compute `new_position_contribution`, - // the amount of asset A contributed by the position (i.e. the reserves of asset A). - let new_position_contribution = position - .reserves_for(pair.start) - .expect("specified position should match provided trading pair"); - - // Compute `new_A_from_B`. - let new_a_from_b = - // Subtract the previous version of the position's contribution to represent that position no longer - // being correct, and add the contribution from the updated version. - (current_a_from_b.saturating_sub(&prev_position_contribution)).saturating_add(&new_position_contribution); - - tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?new_position_contribution, ?prev_position_contribution, "updated position, adding new contribution and subtracting previous contribution to existing available liquidity for trading pair"); - - (new_a_from_b, current_a_from_b) - } - (State::Closed, Some(prev)) => { - // Compute the previous contribution and erase it from the current index - - // Query the current available liquidity for this trading pair, or zero if the trading pair - // has no current liquidity. - let current_a_from_b = self - .nonverifiable_get_raw(&engine::routable_assets::a_from_b(&pair)) - .await? - .map(|bytes| { - Amount::from_be_bytes( - bytes - .try_into() - .expect("liquidity index amount can always be parsed"), - ) - }) - .unwrap_or_default(); - - // Use the previous reserves to compute `prev_position_contribution` (denominated in asset_1). - let prev_position_contribution = prev - .reserves_for(pair.start) - .expect("specified position should match provided trading pair"); - - // Compute `new_A_from_B`. - let new_a_from_b = - // Subtract the previous version of the position's contribution to represent that position no longer - // being correct, and since the updated version is Closed, it has no contribution. - current_a_from_b.saturating_sub(&prev_position_contribution); - - tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?prev_position_contribution, "closed position, subtracting previous contribution to existing available liquidity for trading pair"); - - (new_a_from_b, current_a_from_b) - } - (State::Withdrawn { .. }, _) | (State::Closed, None) => { - // The position already went through the `Closed` state or was opened in the `Closed` state, so its contribution has already been subtracted. - return Ok(()); - } - }; - - // Delete the existing key for this position if the reserve amount has changed. - if new_a_from_b != current_a_from_b { - self.nonverifiable_delete( - engine::routable_assets::key(&pair.start, current_a_from_b).to_vec(), - ); + match prev_state { + Some(prev_state) => match (prev_state.state, new_state.state) { + // We only want to update the index when we process active positions. + (Opened, Closed) => {} + (Opened, Opened) => {} + _ => return Ok(()), + }, + None => {} } - // Write the new key indicating that asset B is routable from asset A with `new_a_from_b` liquidity. - self.nonverifiable_put_raw( - engine::routable_assets::key(&pair.start, new_a_from_b).to_vec(), - pair.end.encode_to_vec(), - ); - tracing::debug!(start = ?pair.start, end = ?pair.end, "marking routable from start -> end"); + let canonical_pair = new_state.phi.pair; + let pair_ab = DirectedTradingPair::new(canonical_pair.asset_1(), canonical_pair.asset_2()); - // Write the new lookup index storing `new_a_from_b` for this trading pair. - self.nonverifiable_put_raw( - engine::routable_assets::a_from_b(&pair).to_vec(), - new_a_from_b.to_be_bytes().to_vec(), - ); - tracing::debug!(available_liquidity = ?new_a_from_b, ?pair, "marking available liquidity for trading pair"); + let (prev_a, prev_b) = prev_state + .as_ref() + .map(|p| { + ( + p.reserves_for(pair_ab.start).expect("asset ids match"), + p.reserves_for(pair_ab.end).expect("asset ids match"), + ) + }) + .unwrap_or_else(|| (Amount::zero(), Amount::zero())); + + // A -> B + self.update_asset_by_base_liquidity_index_inner(id, pair_ab, prev_a, new_state) + .await?; + // B -> A + self.update_asset_by_base_liquidity_index_inner(id, pair_ab.flip(), prev_b, new_state) + .await?; Ok(()) } +} - async fn update_asset_by_base_liquidity_index( +impl AssetByLiquidityIndex for T {} + +trait Inner: StateWrite { + async fn update_asset_by_base_liquidity_index_inner( &mut self, - prev_state: &Option, - new_state: &Position, - _id: &position::Id, + id: &position::Id, + pair: DirectedTradingPair, + old_contrib: Amount, + new_position: &Position, ) -> Result<()> { - // Since swaps may be performed in either direction, the available liquidity indices - // need to be calculated and stored for both the A -> B and B -> A directions. - let (a, b) = (new_state.phi.pair.asset_1(), new_state.phi.pair.asset_2()); + let aggregate_key = &engine::routable_assets::lookup_base_liquidity_by_pair(&pair); + + let prev_tally: Amount = self + .nonverifiable_get(aggregate_key) + .await? + .unwrap_or_default(); + + // The previous contribution for this position is supplied to us by + // the caller. This default to zero if the position was just created. + // We use this to compute a view of the tally that excludes the position + // we are currently processing (and avoid double-counting). + let old_contrib = old_contrib; + + // The updated contribution is the total amount of base asset routable + // from an adjacent asset. + let new_contrib = new_position + .reserves_for(pair.start) + .expect("asset ids should match"); + + let new_tally = match new_position.state { + State::Opened => prev_tally + .saturating_sub(&old_contrib) + .saturating_add(&new_contrib), + State::Closed => prev_tally.saturating_sub(&old_contrib), + _ => unreachable!("inner impl is guarded"), + }; - // A -> B - self.update_liquidity_index(DirectedTradingPair::new(a, b), new_state, prev_state) - .await?; - // B -> A - self.update_liquidity_index(DirectedTradingPair::new(b, a), new_state, prev_state) - .await?; + // If the update operation is a no-op, we can skip the update + // and return early. + if prev_tally == new_tally { + tracing::debug!( + ?prev_tally, + ?pair, + ?id, + "skipping routable asset index update" + ); + return Ok(()); + } + + // Update the primary and auxiliary indices: + let old_primary_key = engine::routable_assets::key(&pair.start, prev_tally).to_vec(); + // This could make the `StateDelta` more expensive to scan, but this doesn't show on profiles yet. + self.nonverifiable_delete(old_primary_key); + + let new_primary_key = engine::routable_assets::key(&pair.start, new_tally).to_vec(); + self.nonverifiable_put(new_primary_key, pair.end); + tracing::debug!(?pair, ?new_tally, "base liquidity entry updated"); + + let auxiliary_key = engine::routable_assets::lookup_base_liquidity_by_pair(&pair).to_vec(); + self.nonverifiable_put(auxiliary_key, new_tally); + tracing::debug!( + ?pair, + "base liquidity heuristic marked directed pair as routable" + ); Ok(()) } } -impl AssetByLiquidityIndex for T {} +impl Inner for T {} diff --git a/crates/core/component/dex/src/state_key.rs b/crates/core/component/dex/src/state_key.rs index 4b2646c3dd..7cf1aac337 100644 --- a/crates/core/component/dex/src/state_key.rs +++ b/crates/core/component/dex/src/state_key.rs @@ -103,52 +103,61 @@ pub(crate) mod engine { } } - /// Find assets with liquidity positions from asset `from`, ordered by price. pub(crate) mod routable_assets { use penumbra_asset::asset; use penumbra_num::Amount; use super::*; - /// `A || be_bytes(A_from_B) => B` this will be an ordered encoding of every asset `B` directly routable to from `A`. - /// `a_from_b` represents the amount of `A` that can be bought with `B`. - /// The prefix query includes only the `A` portion, meaning the keys will be returned in order of liquidity. - pub(crate) fn prefix(from: &asset::Id) -> [u8; 39] { + /// A prefix key that takes a start asset `A` (aka. base asset) and surface adjacent + /// assets `B` (aka. quote asset), in ascending order of the base liquidity available. + /// + /// # Encoding + /// The prefix key is encoded as `domain || A`. + pub(crate) fn starting_from(from: &asset::Id) -> [u8; 39] { let mut key = [0u8; 39]; key[0..7].copy_from_slice(b"dex/ra/"); - key[7..7 + 32].copy_from_slice(&from.to_bytes()); + key[7..39].copy_from_slice(&from.to_bytes()); key } - /// `A || be_bytes(A_from_B) => B` this will be an ordered encoding of every asset `B` directly routable to from `A`. - /// `a_from_b` represents the amount of `A` that can be bought with `B`. + /// An entry in the routable asset index that implements the mapping between + /// a base asset `A` and a quote asset `B`, based on the aggregate liquidity + /// available to route from `B` to `A` (aka. the base liquidity). + /// + /// # Encoding + /// The full key is encoded as: `prefix || BE(aggregate_base_liquidity)` pub(crate) fn key(from: &asset::Id, a_from_b: Amount) -> [u8; 55] { let mut key = [0u8; 55]; key[0..7].copy_from_slice(b"dex/ra/"); - key[7..32 + 7].copy_from_slice(&from.to_bytes()); - key[32 + 7..32 + 7 + 16].copy_from_slice(&a_from_b.to_be_bytes()); + key[7..39].copy_from_slice(&from.to_bytes()); + key[39..55].copy_from_slice(&a_from_b.to_be_bytes()); key } - /// `(A, B) => A_from_B` this will encode the current amount of `A` tradable into `B` for every directly routable trading pair. - /// This index can be used to determine the key values for the [`super::key`] ordered index to perform updates efficiently. - pub(crate) fn a_from_b(pair: &DirectedTradingPair) -> [u8; 71] { + /// A lookup index used to reconstruct and update the primary index entries. + /// It maps a directed trading pair `A -> B` to the current base liquidity available. + /// + /// # Encoding + /// The lookup key is encoded as `prefix_lookup || start_asset|| end_asset`. + pub(crate) fn lookup_base_liquidity_by_pair(pair: &DirectedTradingPair) -> [u8; 71] { let mut key = [0u8; 71]; key[0..7].copy_from_slice(b"dex/ab/"); - key[7..7 + 32].copy_from_slice(&pair.start.to_bytes()); - key[7 + 32..7 + 32 + 32].copy_from_slice(&pair.end.to_bytes()); + key[7..39].copy_from_slice(&pair.start.to_bytes()); + key[39..71].copy_from_slice(&pair.end.to_bytes()); key } } pub(crate) mod price_index { + use super::*; pub(crate) fn prefix(pair: &DirectedTradingPair) -> [u8; 71] { let mut key = [0u8; 71]; key[0..7].copy_from_slice(b"dex/pi/"); - key[7..7 + 32].copy_from_slice(&pair.start.to_bytes()); - key[7 + 32..7 + 32 + 32].copy_from_slice(&pair.end.to_bytes()); + key[7..39].copy_from_slice(&pair.start.to_bytes()); + key[39..71].copy_from_slice(&pair.end.to_bytes()); key } From 61d65cd64df1af0885e186b666bf9515eeaafc2d Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Thu, 11 Apr 2024 09:51:19 -0400 Subject: [PATCH 07/11] dex(position_manager): restrict interface to internal indices --- .../src/component/circuit_breaker/value.rs | 4 +- .../dex/src/component/position_manager.rs | 2 +- .../position_manager/base_liquidity_index.rs | 2 +- .../src/component/position_manager/counter.rs | 9 +---- .../position_manager/inventory_index.rs | 9 +++-- .../component/position_manager/price_index.rs | 40 ++++++++++--------- 6 files changed, 35 insertions(+), 31 deletions(-) diff --git a/crates/core/component/dex/src/component/circuit_breaker/value.rs b/crates/core/component/dex/src/component/circuit_breaker/value.rs index 1ea141d5d4..3ccccdad6e 100644 --- a/crates/core/component/dex/src/component/circuit_breaker/value.rs +++ b/crates/core/component/dex/src/component/circuit_breaker/value.rs @@ -226,7 +226,9 @@ mod tests { let id = buy_1.id(); let position = buy_1; - state_tx.index_position_by_price(&position, &position.id()); + state_tx + .update_position_by_price_index(&None, &position, &position.id()) + .expect("can update price index"); state_tx .update_asset_by_base_liquidity_index(&None, &position, &position.id()) .await diff --git a/crates/core/component/dex/src/component/position_manager.rs b/crates/core/component/dex/src/component/position_manager.rs index 30f3693a08..48a5344118 100644 --- a/crates/core/component/dex/src/component/position_manager.rs +++ b/crates/core/component/dex/src/component/position_manager.rs @@ -387,7 +387,7 @@ pub trait PositionManager: StateWrite + PositionRead { impl PositionManager for T {} #[async_trait] -pub(crate) trait Inner: StateWrite { +trait Inner: StateWrite { /// Writes a position to the state, updating all necessary indexes. /// /// This should be the SOLE ENTRYPOINT for writing positions to the state. diff --git a/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs b/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs index 42ab912703..a038270486 100644 --- a/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs +++ b/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs @@ -164,4 +164,4 @@ trait Inner: StateWrite { } } -impl Inner for T {} +impl Inner for T {} diff --git a/crates/core/component/dex/src/component/position_manager/counter.rs b/crates/core/component/dex/src/component/position_manager/counter.rs index fc62a59e80..50b1bb363f 100644 --- a/crates/core/component/dex/src/component/position_manager/counter.rs +++ b/crates/core/component/dex/src/component/position_manager/counter.rs @@ -1,4 +1,4 @@ -use anyhow::{bail, ensure}; +use anyhow::{bail}; use async_trait::async_trait; use cnidarium::{StateRead, StateWrite}; @@ -51,15 +51,10 @@ pub(crate) trait PositionCounter: StateWrite { (Opened, Closed) => { let _ = self.decrement_position_counter(&trading_pair).await?; } - _ => { /* no-op */ } + _ => {} }, None => { - ensure!( - matches!(new_state.state, Opened), - "fresh position must start in the `Opened` state, found: {:?}", - new_state.state - ); let _ = self.increment_position_counter(&trading_pair).await?; } } diff --git a/crates/core/component/dex/src/component/position_manager/inventory_index.rs b/crates/core/component/dex/src/component/position_manager/inventory_index.rs index 9cad16e159..f797dbc5ea 100644 --- a/crates/core/component/dex/src/component/position_manager/inventory_index.rs +++ b/crates/core/component/dex/src/component/position_manager/inventory_index.rs @@ -16,7 +16,7 @@ pub(crate) trait PositionByInventoryIndex: StateWrite { new_state: &Position, position_id: &position::Id, ) -> Result<()> { - // Clear an existing index of the position, since changes to the + // Clear an existing record of the position, since changes to the // reserves or the position state might have invalidated it. if let Some(prev_lp) = prev_state { self.deindex_position_by_inventory(prev_lp, position_id); @@ -28,7 +28,11 @@ pub(crate) trait PositionByInventoryIndex: StateWrite { Ok(()) } +} + +impl PositionByInventoryIndex for T {} +trait Inner: StateWrite { fn index_position_by_inventory(&mut self, position: &position::Position, id: &position::Id) { tracing::debug!("indexing position by inventory"); let canonical_pair = position.phi.pair; @@ -77,5 +81,4 @@ pub(crate) trait PositionByInventoryIndex: StateWrite { self.nonverifiable_delete(key_ba); } } - -impl PositionByInventoryIndex for T {} +impl Inner for T {} diff --git a/crates/core/component/dex/src/component/position_manager/price_index.rs b/crates/core/component/dex/src/component/position_manager/price_index.rs index c40dfc4f83..26d6be2b48 100644 --- a/crates/core/component/dex/src/component/position_manager/price_index.rs +++ b/crates/core/component/dex/src/component/position_manager/price_index.rs @@ -16,7 +16,7 @@ pub(crate) trait PositionByPriceIndex: StateWrite { new_state: &Position, position_id: &position::Id, ) -> Result<()> { - // Clear an existing index of the position, since changes to the + // Clear an existing record for the position, since changes to the // reserves or the position state might have invalidated it. if let Some(prev_lp) = prev_state { self.deindex_position_by_price(prev_lp, position_id); @@ -29,6 +29,25 @@ pub(crate) trait PositionByPriceIndex: StateWrite { Ok(()) } + fn deindex_position_by_price(&mut self, position: &Position, id: &position::Id) { + tracing::debug!("deindexing position"); + let pair12 = DirectedTradingPair { + start: position.phi.pair.asset_1(), + end: position.phi.pair.asset_2(), + }; + let phi12 = position.phi.component.clone(); + let pair21 = DirectedTradingPair { + start: position.phi.pair.asset_2(), + end: position.phi.pair.asset_1(), + }; + let phi21 = position.phi.component.flip(); + self.nonverifiable_delete(engine::price_index::key(&pair12, &phi12, &id)); + self.nonverifiable_delete(engine::price_index::key(&pair21, &phi21, &id)); + } +} +impl PositionByPriceIndex for T {} + +trait Inner: StateWrite { fn index_position_by_price(&mut self, position: &position::Position, id: &position::Id) { let (pair, phi) = (position.phi.pair, &position.phi); if position.reserves.r2 != 0u64.into() { @@ -53,21 +72,6 @@ pub(crate) trait PositionByPriceIndex: StateWrite { tracing::debug!("indexing position for 2=>1 trades"); } } - - fn deindex_position_by_price(&mut self, position: &Position, id: &position::Id) { - tracing::debug!("deindexing position"); - let pair12 = DirectedTradingPair { - start: position.phi.pair.asset_1(), - end: position.phi.pair.asset_2(), - }; - let phi12 = position.phi.component.clone(); - let pair21 = DirectedTradingPair { - start: position.phi.pair.asset_2(), - end: position.phi.pair.asset_1(), - }; - let phi21 = position.phi.component.flip(); - self.nonverifiable_delete(engine::price_index::key(&pair12, &phi12, &id)); - self.nonverifiable_delete(engine::price_index::key(&pair21, &phi21, &id)); - } } -impl PositionByPriceIndex for T {} + +impl Inner for T {} From 7cd61d4ec3a09a77be2b9022d8eb64ab73fed8e2 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Thu, 11 Apr 2024 10:45:21 -0400 Subject: [PATCH 08/11] dex: expose a position api to other components --- crates/core/component/dex/src/component/mod.rs | 5 +++-- .../component/dex/src/component/position_manager/counter.rs | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/crates/core/component/dex/src/component/mod.rs b/crates/core/component/dex/src/component/mod.rs index 6e36ea8ead..a1baa22118 100644 --- a/crates/core/component/dex/src/component/mod.rs +++ b/crates/core/component/dex/src/component/mod.rs @@ -19,8 +19,9 @@ pub(crate) use arb::Arbitrage; pub use circuit_breaker::ExecutionCircuitBreaker; pub(crate) use circuit_breaker::ValueCircuitBreaker; pub use dex::{Dex, StateReadExt, StateWriteExt}; -pub(crate) use position_manager::PositionManager; -pub use position_manager::PositionRead; +// TODO(erwan): exposing a DEX interface to other components +// is useful but maybe we should restrict it to open/queue/close positions +pub use position_manager::{PositionManager, PositionRead}; pub use swap_manager::SwapManager; #[cfg(test)] pub(crate) mod tests; diff --git a/crates/core/component/dex/src/component/position_manager/counter.rs b/crates/core/component/dex/src/component/position_manager/counter.rs index 50b1bb363f..093f59f58c 100644 --- a/crates/core/component/dex/src/component/position_manager/counter.rs +++ b/crates/core/component/dex/src/component/position_manager/counter.rs @@ -1,4 +1,4 @@ -use anyhow::{bail}; +use anyhow::bail; use async_trait::async_trait; use cnidarium::{StateRead, StateWrite}; From 0f5e51627a14c00d3865c9830734cb5d72caa893 Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Thu, 11 Apr 2024 11:39:41 -0400 Subject: [PATCH 09/11] dex(position_counter): limit visibility of counter internals --- .../component/dex/src/component/position_manager.rs | 3 --- .../dex/src/component/position_manager/counter.rs | 10 ++++++++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/crates/core/component/dex/src/component/position_manager.rs b/crates/core/component/dex/src/component/position_manager.rs index 48a5344118..81fdf161ae 100644 --- a/crates/core/component/dex/src/component/position_manager.rs +++ b/crates/core/component/dex/src/component/position_manager.rs @@ -233,9 +233,6 @@ pub trait PositionManager: StateWrite + PositionRead { ); } - // Increase the position counter - self.increment_position_counter(&position.phi.pair).await?; - // Credit the DEX for the inflows from this position. self.vcb_credit(position.reserves_1()).await?; self.vcb_credit(position.reserves_2()).await?; diff --git a/crates/core/component/dex/src/component/position_manager/counter.rs b/crates/core/component/dex/src/component/position_manager/counter.rs index 093f59f58c..b873da76e7 100644 --- a/crates/core/component/dex/src/component/position_manager/counter.rs +++ b/crates/core/component/dex/src/component/position_manager/counter.rs @@ -60,7 +60,10 @@ pub(crate) trait PositionCounter: StateWrite { } Ok(()) } +} +impl PositionCounter for T {} +trait Inner: StateWrite { /// Increment the number of position for a [`TradingPair`]. /// Returns the updated total, or an error if overflow occurred. async fn increment_position_counter(&mut self, trading_pair: &TradingPair) -> Result { @@ -87,7 +90,8 @@ pub(crate) trait PositionCounter: StateWrite { Ok(new_total) } } -impl PositionCounter for T {} + +impl Inner for T {} // For some reason, `rust-analyzer` is complaining about used imports. // Silence the warnings until I find a fix. @@ -96,7 +100,9 @@ mod tests { use cnidarium::{StateDelta, TempStorage}; use penumbra_asset::{asset::REGISTRY, Value}; - use crate::component::position_manager::counter::{PositionCounter, PositionCounterRead}; + use crate::component::position_manager::counter::{ + Inner, PositionCounter, PositionCounterRead, + }; use crate::TradingPair; #[tokio::test] From 9d430fd2ba88c3cc9dfe1b3128d792c65ca78f2a Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Thu, 11 Apr 2024 14:07:43 -0400 Subject: [PATCH 10/11] dex: spurious index update from test --- .../component/dex/src/component/circuit_breaker/value.rs | 5 ----- crates/core/component/dex/src/component/position_manager.rs | 6 +++--- .../component/dex/src/component/position_manager/counter.rs | 2 +- .../dex/src/component/position_manager/inventory_index.rs | 2 +- 4 files changed, 5 insertions(+), 10 deletions(-) diff --git a/crates/core/component/dex/src/component/circuit_breaker/value.rs b/crates/core/component/dex/src/component/circuit_breaker/value.rs index 3ccccdad6e..bc5c06a8b1 100644 --- a/crates/core/component/dex/src/component/circuit_breaker/value.rs +++ b/crates/core/component/dex/src/component/circuit_breaker/value.rs @@ -47,7 +47,6 @@ impl ValueCircuitBreaker for T {} mod tests { use std::sync::Arc; - use crate::component::position_manager::base_liquidity_index::AssetByLiquidityIndex as _; use crate::component::position_manager::price_index::PositionByPriceIndex; use crate::component::router::HandleBatchSwaps as _; use crate::component::{StateReadExt as _, StateWriteExt as _}; @@ -229,10 +228,6 @@ mod tests { state_tx .update_position_by_price_index(&None, &position, &position.id()) .expect("can update price index"); - state_tx - .update_asset_by_base_liquidity_index(&None, &position, &position.id()) - .await - .expect("able to update liquidity"); state_tx.put(state_key::position_by_id(&id), position); // Now there's a position in the state, but the circuit breaker is not aware of it. diff --git a/crates/core/component/dex/src/component/position_manager.rs b/crates/core/component/dex/src/component/position_manager.rs index 81fdf161ae..a3bc24b6ca 100644 --- a/crates/core/component/dex/src/component/position_manager.rs +++ b/crates/core/component/dex/src/component/position_manager.rs @@ -27,9 +27,9 @@ use crate::{event, state_key}; const DYNAMIC_ASSET_LIMIT: usize = 10; -pub(crate) mod base_liquidity_index; -pub(crate) mod counter; -pub(crate) mod inventory_index; +mod base_liquidity_index; +mod counter; +mod inventory_index; pub(crate) mod price_index; #[async_trait] diff --git a/crates/core/component/dex/src/component/position_manager/counter.rs b/crates/core/component/dex/src/component/position_manager/counter.rs index b873da76e7..6112adb3b6 100644 --- a/crates/core/component/dex/src/component/position_manager/counter.rs +++ b/crates/core/component/dex/src/component/position_manager/counter.rs @@ -8,7 +8,7 @@ use crate::TradingPair; use anyhow::Result; #[async_trait] -pub(crate) trait PositionCounterRead: StateRead { +pub(super) trait PositionCounterRead: StateRead { /// Returns the number of position for a [`TradingPair`]. /// If there were no counter initialized for a given pair, this default to zero. async fn get_position_count(&self, trading_pair: &TradingPair) -> u16 { diff --git a/crates/core/component/dex/src/component/position_manager/inventory_index.rs b/crates/core/component/dex/src/component/position_manager/inventory_index.rs index f797dbc5ea..fa74144b19 100644 --- a/crates/core/component/dex/src/component/position_manager/inventory_index.rs +++ b/crates/core/component/dex/src/component/position_manager/inventory_index.rs @@ -9,7 +9,7 @@ use crate::{ use anyhow::Result; use position::State::*; -pub(crate) trait PositionByInventoryIndex: StateWrite { +pub(super) trait PositionByInventoryIndex: StateWrite { fn update_position_by_inventory_index( &mut self, prev_state: &Option, From 78fbf5141c6c88ce9d0157f604a11a683dc2610f Mon Sep 17 00:00:00 2001 From: Erwan Or Date: Fri, 12 Apr 2024 19:59:31 -0400 Subject: [PATCH 11/11] dex: pr review improvements --- .../core/component/dex/src/component/mod.rs | 2 - .../position_manager/base_liquidity_index.rs | 101 ++++++++++-------- .../src/component/position_manager/counter.rs | 54 +++++----- crates/core/component/dex/src/state_key.rs | 14 +-- 4 files changed, 90 insertions(+), 81 deletions(-) diff --git a/crates/core/component/dex/src/component/mod.rs b/crates/core/component/dex/src/component/mod.rs index a1baa22118..c6a4101ff3 100644 --- a/crates/core/component/dex/src/component/mod.rs +++ b/crates/core/component/dex/src/component/mod.rs @@ -19,8 +19,6 @@ pub(crate) use arb::Arbitrage; pub use circuit_breaker::ExecutionCircuitBreaker; pub(crate) use circuit_breaker::ValueCircuitBreaker; pub use dex::{Dex, StateReadExt, StateWriteExt}; -// TODO(erwan): exposing a DEX interface to other components -// is useful but maybe we should restrict it to open/queue/close positions pub use position_manager::{PositionManager, PositionRead}; pub use swap_manager::SwapManager; #[cfg(test)] diff --git a/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs b/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs index a038270486..ac55ec97fd 100644 --- a/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs +++ b/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs @@ -3,7 +3,7 @@ use cnidarium::StateWrite; use penumbra_num::Amount; use position::State::*; -use crate::lp::position::{self, Position, State}; +use crate::lp::position::{self, Position}; use crate::state_key::engine; use crate::DirectedTradingPair; use penumbra_proto::{StateReadProto, StateWriteProto}; @@ -61,34 +61,62 @@ pub(crate) trait AssetByLiquidityIndex: StateWrite { new_state: &Position, id: &position::Id, ) -> Result<()> { - match prev_state { - Some(prev_state) => match (prev_state.state, new_state.state) { - // We only want to update the index when we process active positions. - (Opened, Closed) => {} - (Opened, Opened) => {} - _ => return Ok(()), - }, - None => {} - } - + // We need to reconstruct the position's previous contribution and compute + // its new contribution to the index. We do this for each asset in the pair + // and short-circuit if all contributions are zero. let canonical_pair = new_state.phi.pair; let pair_ab = DirectedTradingPair::new(canonical_pair.asset_1(), canonical_pair.asset_2()); - let (prev_a, prev_b) = prev_state - .as_ref() - .map(|p| { - ( - p.reserves_for(pair_ab.start).expect("asset ids match"), - p.reserves_for(pair_ab.end).expect("asset ids match"), - ) - }) - .unwrap_or_else(|| (Amount::zero(), Amount::zero())); + // We reconstruct the position's *previous* contribution so that we can deduct them later: + let (prev_a, prev_b) = match prev_state { + // The position was just created, so its previous contributions are zero. + None => (Amount::zero(), Amount::zero()), + Some(prev) => match prev.state { + // The position was previously closed or withdrawn, so its previous contributions are zero. + Closed | Withdrawn { sequence: _ } => (Amount::zero(), Amount::zero()), + // The position's previous contributions are the reserves for the start and end assets. + _ => ( + prev.reserves_for(pair_ab.start) + .expect("asset ids match for start"), + prev.reserves_for(pair_ab.end) + .expect("asset ids match for end"), + ), + }, + }; + + // For each asset, we compute the new position's contribution to the index: + let (new_a, new_b) = if matches!(new_state.state, Closed | Withdrawn { sequence: _ }) { + // The position is being closed or withdrawn, so its new contributions are zero. + // Note a withdrawn position MUST have zero reserves, so hardcoding this is extra. + (Amount::zero(), Amount::zero()) + } else { + ( + // The new amount of asset A: + new_state + .reserves_for(pair_ab.start) + .expect("asset ids match for start"), + // The new amount of asset B: + new_state + .reserves_for(pair_ab.end) + .expect("asset ids match for end"), + ) + }; + + // If all contributions are zero, we can skip the update. + // This can happen if we're processing inactive transitions like `Closed -> Withdrawn`. + if prev_a == Amount::zero() + && new_a == Amount::zero() + && prev_b == Amount::zero() + && new_b == Amount::zero() + { + return Ok(()); + } // A -> B - self.update_asset_by_base_liquidity_index_inner(id, pair_ab, prev_a, new_state) + self.update_asset_by_base_liquidity_index_inner(id, pair_ab, prev_a, new_a) .await?; // B -> A - self.update_asset_by_base_liquidity_index_inner(id, pair_ab.flip(), prev_b, new_state) + self.update_asset_by_base_liquidity_index_inner(id, pair_ab.flip(), prev_b, new_b) .await?; Ok(()) @@ -103,7 +131,7 @@ trait Inner: StateWrite { id: &position::Id, pair: DirectedTradingPair, old_contrib: Amount, - new_position: &Position, + new_contrib: Amount, ) -> Result<()> { let aggregate_key = &engine::routable_assets::lookup_base_liquidity_by_pair(&pair); @@ -112,28 +140,13 @@ trait Inner: StateWrite { .await? .unwrap_or_default(); - // The previous contribution for this position is supplied to us by - // the caller. This default to zero if the position was just created. - // We use this to compute a view of the tally that excludes the position - // we are currently processing (and avoid double-counting). - let old_contrib = old_contrib; - - // The updated contribution is the total amount of base asset routable - // from an adjacent asset. - let new_contrib = new_position - .reserves_for(pair.start) - .expect("asset ids should match"); - - let new_tally = match new_position.state { - State::Opened => prev_tally - .saturating_sub(&old_contrib) - .saturating_add(&new_contrib), - State::Closed => prev_tally.saturating_sub(&old_contrib), - _ => unreachable!("inner impl is guarded"), - }; + // To compute the new aggregate liquidity, we deduct the old contribution + // and add the new contribution. We use saturating arithmetic defensively. + let new_tally = prev_tally + .saturating_sub(&old_contrib) + .saturating_add(&new_contrib); - // If the update operation is a no-op, we can skip the update - // and return early. + // If the update operation is a no-op, we can skip the update and return early. if prev_tally == new_tally { tracing::debug!( ?prev_tally, diff --git a/crates/core/component/dex/src/component/position_manager/counter.rs b/crates/core/component/dex/src/component/position_manager/counter.rs index 6112adb3b6..5185fe1f2d 100644 --- a/crates/core/component/dex/src/component/position_manager/counter.rs +++ b/crates/core/component/dex/src/component/position_manager/counter.rs @@ -11,12 +11,12 @@ use anyhow::Result; pub(super) trait PositionCounterRead: StateRead { /// Returns the number of position for a [`TradingPair`]. /// If there were no counter initialized for a given pair, this default to zero. - async fn get_position_count(&self, trading_pair: &TradingPair) -> u16 { + async fn get_position_count(&self, trading_pair: &TradingPair) -> u32 { let path = engine::counter::num_positions::by_trading_pair(trading_pair); self.get_position_count_from_key(path).await } - async fn get_position_count_from_key(&self, path: [u8; 99]) -> u16 { + async fn get_position_count_from_key(&self, path: [u8; 99]) -> u32 { let Some(raw_count) = self .nonverifiable_get_raw(&path) .await @@ -26,10 +26,10 @@ pub(super) trait PositionCounterRead: StateRead { }; // This is safe because we only increment the counter via [`Self::increase_position_counter`]. - let raw_count: [u8; 2] = raw_count + let raw_count: [u8; 4] = raw_count .try_into() .expect("position counter is at most two bytes"); - u16::from_be_bytes(raw_count) + u32::from_be_bytes(raw_count) } } @@ -45,18 +45,17 @@ pub(crate) trait PositionCounter: StateWrite { ) -> Result<()> { use position::State::*; let trading_pair = new_state.phi.pair; - - match prev_state { - Some(prev_state) => match (prev_state.state, new_state.state) { - (Opened, Closed) => { - let _ = self.decrement_position_counter(&trading_pair).await?; - } - _ => {} - }, - - None => { + match (prev_state.as_ref().map(|p| p.state), new_state.state) { + // Increment the counter whenever a new position is opened + (None, Opened) => { let _ = self.increment_position_counter(&trading_pair).await?; } + // Decrement the counter whenever an opened position is closed + (Some(Opened), Closed) => { + let _ = self.decrement_position_counter(&trading_pair).await?; + } + // Other state transitions don't affect the opened position counter + _ => {} } Ok(()) } @@ -66,7 +65,7 @@ impl PositionCounter for T {} trait Inner: StateWrite { /// Increment the number of position for a [`TradingPair`]. /// Returns the updated total, or an error if overflow occurred. - async fn increment_position_counter(&mut self, trading_pair: &TradingPair) -> Result { + async fn increment_position_counter(&mut self, trading_pair: &TradingPair) -> Result { let path = engine::counter::num_positions::by_trading_pair(trading_pair); let prev = self.get_position_count_from_key(path).await; @@ -79,7 +78,7 @@ trait Inner: StateWrite { /// Decrement the number of positions for a [`TradingPair`], unless it would underflow. /// Returns the updated total, or an error if underflow occurred. - async fn decrement_position_counter(&mut self, trading_pair: &TradingPair) -> Result { + async fn decrement_position_counter(&mut self, trading_pair: &TradingPair) -> Result { let path = engine::counter::num_positions::by_trading_pair(trading_pair); let prev = self.get_position_count_from_key(path).await; @@ -97,12 +96,13 @@ impl Inner for T {} // Silence the warnings until I find a fix. #[allow(unused_imports)] mod tests { - use cnidarium::{StateDelta, TempStorage}; + use cnidarium::{StateDelta, StateWrite, TempStorage}; use penumbra_asset::{asset::REGISTRY, Value}; use crate::component::position_manager::counter::{ Inner, PositionCounter, PositionCounterRead, }; + use crate::state_key::engine; use crate::TradingPair; #[tokio::test] @@ -114,22 +114,20 @@ mod tests { let storage = TempStorage::new().await?; let mut delta = StateDelta::new(storage.latest_snapshot()); + let path = engine::counter::num_positions::by_trading_pair(&trading_pair); + // Manually set the counter to the maximum value + delta.nonverifiable_put_raw(path.to_vec(), u32::MAX.to_be_bytes().to_vec()); - for i in 0..u16::MAX { - let total = delta.increment_position_counter(&trading_pair).await?; - - anyhow::ensure!( - total == i + 1, - "the total amount should be total={}, found={total}", - i + 1 - ); - } + // Check that the counter is at the maximum value + let total = delta.get_position_count(&trading_pair).await; + assert_eq!(total, u32::MAX); + // Check that we can handle an overflow assert!(delta .increment_position_counter(&trading_pair) .await .is_err()); - assert_eq!(delta.get_position_count(&trading_pair).await, u16::MAX); + assert_eq!(delta.get_position_count(&trading_pair).await, u32::MAX); Ok(()) } @@ -148,7 +146,7 @@ mod tests { assert!(maybe_total.is_err()); let counter = delta.get_position_count(&trading_pair).await; - assert_eq!(counter, 0u16); + assert_eq!(counter, 0u32); Ok(()) } } diff --git a/crates/core/component/dex/src/state_key.rs b/crates/core/component/dex/src/state_key.rs index 7cf1aac337..6b302fd46e 100644 --- a/crates/core/component/dex/src/state_key.rs +++ b/crates/core/component/dex/src/state_key.rs @@ -109,9 +109,9 @@ pub(crate) mod engine { use super::*; - /// A prefix key that takes a start asset `A` (aka. base asset) and surface adjacent - /// assets `B` (aka. quote asset), in ascending order of the base liquidity available. - /// + // An ordered encoding of every asset `B` routable from `A` based on the + // aggregate liquidity available to route from `B` to `A` (aka. the base liquidity). + // /// # Encoding /// The prefix key is encoded as `domain || A`. pub(crate) fn starting_from(from: &asset::Id) -> [u8; 39] { @@ -121,9 +121,8 @@ pub(crate) mod engine { key } - /// An entry in the routable asset index that implements the mapping between - /// a base asset `A` and a quote asset `B`, based on the aggregate liquidity - /// available to route from `B` to `A` (aka. the base liquidity). + /// A record that an asset `A` is routable to an asset `B` and contains the + /// aggregate liquidity available to route from `B` to `A` (aka. the base liquidity). /// /// # Encoding /// The full key is encoded as: `prefix || BE(aggregate_base_liquidity)` @@ -136,7 +135,8 @@ pub(crate) mod engine { } /// A lookup index used to reconstruct and update the primary index entries. - /// It maps a directed trading pair `A -> B` to the current base liquidity available. + /// It maps a directed trading pair `A -> B` to the aggregate liquidity available + /// to route from `B` to `A` (aka. the base asset liquidity). /// /// # Encoding /// The lookup key is encoded as `prefix_lookup || start_asset|| end_asset`.