diff --git a/crates/core/component/dex/src/component/circuit_breaker/value.rs b/crates/core/component/dex/src/component/circuit_breaker/value.rs index 1824eda1a6..bc5c06a8b1 100644 --- a/crates/core/component/dex/src/component/circuit_breaker/value.rs +++ b/crates/core/component/dex/src/component/circuit_breaker/value.rs @@ -47,7 +47,7 @@ impl ValueCircuitBreaker for T {} mod tests { use std::sync::Arc; - use crate::component::position_manager::Inner as _; + use crate::component::position_manager::price_index::PositionByPriceIndex; use crate::component::router::HandleBatchSwaps as _; use crate::component::{StateReadExt as _, StateWriteExt as _}; use crate::lp::plan::PositionWithdrawPlan; @@ -225,11 +225,9 @@ mod tests { let id = buy_1.id(); let position = buy_1; - state_tx.index_position_by_price(&position, &position.id()); state_tx - .update_available_liquidity(&None, &position) - .await - .expect("able to update liquidity"); + .update_position_by_price_index(&None, &position, &position.id()) + .expect("can update price index"); state_tx.put(state_key::position_by_id(&id), position); // Now there's a position in the state, but the circuit breaker is not aware of it. diff --git a/crates/core/component/dex/src/component/mod.rs b/crates/core/component/dex/src/component/mod.rs index 8a6459ee47..c6a4101ff3 100644 --- a/crates/core/component/dex/src/component/mod.rs +++ b/crates/core/component/dex/src/component/mod.rs @@ -11,12 +11,11 @@ mod arb; pub(crate) mod circuit_breaker; mod dex; mod flow; -pub(crate) mod position_counter; -pub(crate) mod position_manager; +mod position_manager; mod swap_manager; pub use self::metrics::register_metrics; -pub use arb::Arbitrage; +pub(crate) use arb::Arbitrage; pub use circuit_breaker::ExecutionCircuitBreaker; pub(crate) use circuit_breaker::ValueCircuitBreaker; pub use dex::{Dex, StateReadExt, StateWriteExt}; diff --git a/crates/core/component/dex/src/component/position_manager.rs b/crates/core/component/dex/src/component/position_manager.rs index 0b51ae5718..a3bc24b6ca 100644 --- a/crates/core/component/dex/src/component/position_manager.rs +++ b/crates/core/component/dex/src/component/position_manager.rs @@ -1,31 +1,37 @@ use std::future; use std::{pin::Pin, sync::Arc}; -use anyhow::Result; +use anyhow::{bail, ensure, Result}; use async_stream::try_stream; use async_trait::async_trait; use cnidarium::{EscapedByteSlice, StateRead, StateWrite}; use futures::Stream; use futures::StreamExt; use penumbra_asset::{asset, Balance}; -use penumbra_num::Amount; use penumbra_proto::DomainType; use penumbra_proto::{StateReadProto, StateWriteProto}; -use crate::lp::position::State; +use crate::component::position_manager::{ + base_liquidity_index::AssetByLiquidityIndex, inventory_index::PositionByInventoryIndex, + price_index::PositionByPriceIndex, +}; use crate::lp::Reserves; use crate::{ - component::position_counter::PositionCounter, + component::position_manager::counter::PositionCounter, component::ValueCircuitBreaker, lp::position::{self, Position}, state_key::engine, - state_key::eviction_queue, DirectedTradingPair, }; use crate::{event, state_key}; const DYNAMIC_ASSET_LIMIT: usize = 10; +mod base_liquidity_index; +mod counter; +mod inventory_index; +pub(crate) mod price_index; + #[async_trait] pub trait PositionRead: StateRead { /// Return a stream of all [`position::Metadata`] available. @@ -118,9 +124,9 @@ pub trait PositionRead: StateRead { /// Returns a stream of [`asset::Id`] routable from a given asset, ordered by liquidity. fn ordered_routable_assets( &self, - from: &asset::Id, + start: &asset::Id, ) -> Pin> + Send + 'static>> { - let prefix = engine::routable_assets::prefix(from); + let prefix = engine::routable_assets::starting_from(start); tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for routable assets by liquidity"); self.nonverifiable_prefix_raw(&prefix) .map(|entry| match entry { @@ -227,9 +233,6 @@ pub trait PositionManager: StateWrite + PositionRead { ); } - // Increase the position counter - self.increment_position_counter(&position.phi.pair).await?; - // Credit the DEX for the inflows from this position. self.vcb_credit(position.reserves_1()).await?; self.vcb_credit(position.reserves_2()).await?; @@ -381,7 +384,7 @@ pub trait PositionManager: StateWrite + PositionRead { impl PositionManager for T {} #[async_trait] -pub(crate) trait Inner: StateWrite { +trait Inner: StateWrite { /// Writes a position to the state, updating all necessary indexes. /// /// This should be the SOLE ENTRYPOINT for writing positions to the state. @@ -392,295 +395,65 @@ pub(crate) trait Inner: StateWrite { prev_state: Option, new_state: Position, ) -> Result<()> { - use position::State::*; - tracing::debug!(?prev_state, ?new_state, "updating position state"); let id = new_state.id(); - // Clear any existing indexes of the position, since changes to the - // reserves or the position state might have invalidated them. - if let Some(prev_state) = prev_state.as_ref() { - self.deindex_position_by_price(&prev_state, &id); - self.deindex_position_by_inventory(&prev_state, &id); - } + // Assert `update_position` state transitions invariants: + Self::guard_invalid_transitions(&prev_state, &new_state, &id)?; - // Only index the position's liquidity if it is active. - if new_state.state == Opened { - self.index_position_by_price(&new_state, &id); - self.index_position_by_inventory(&new_state, &id); - } - - if new_state.state == Closed { - // Make sure that we don't double decrement the position - // counter if a position was queued for closure AND closed - // by the DEX engine. - let is_already_closed = prev_state - .as_ref() - .map_or(false, |old_position| old_position.state == Closed); - if !is_already_closed { - self.decrement_position_counter(&new_state.phi.pair).await?; - } - } - - // Update the available liquidity for this position's trading pair. - // TODO: refactor and streamline this method while implementing eviction. - self.update_available_liquidity(&prev_state, &new_state) + // Update the DEX engine indices: + self.update_position_by_price_index(&prev_state, &new_state, &id)?; + self.update_position_by_inventory_index(&prev_state, &new_state, &id)?; + self.update_asset_by_base_liquidity_index(&prev_state, &new_state, &id) + .await?; + self.update_trading_pair_position_counter(&prev_state, &new_state, &id) .await?; self.put(state_key::position_by_id(&id), new_state); Ok(()) } - // TODO(erwan): break this out into a `position_manager::inventory_index` module. - fn index_position_by_inventory(&mut self, position: &position::Position, id: &position::Id) { - tracing::debug!("indexing position by inventory"); - let canonical_pair = position.phi.pair; - // A position is bound to an unordered trading pair: A <> B. - // We want to index the position by inventory for each direction: - // A -> B - let pair_ab = DirectedTradingPair::new(canonical_pair.asset_1(), canonical_pair.asset_2()); - let inventory_a = position - .reserves_for(pair_ab.start) - .expect("the directed trading pair is correct"); - let key_ab = eviction_queue::inventory_index::key(&pair_ab, inventory_a, id).to_vec(); - self.nonverifiable_put_raw(key_ab, vec![]); - - // B -> A - let pair_ba = pair_ab.flip(); - let inventory_b = position - .reserves_for(pair_ba.start) - .expect("the directed trading pair is correct"); - let key_ba = eviction_queue::inventory_index::key(&pair_ba, inventory_b, id).to_vec(); - self.nonverifiable_put_raw(key_ba, vec![]); - } - - fn deindex_position_by_inventory( - &mut self, - prev_position: &position::Position, + fn guard_invalid_transitions( + prev_state: &Option, + new_state: &Position, id: &position::Id, - ) { - let canonical_pair = prev_position.phi.pair; - - // To deindex the position, we need to reconstruct the tuple of keys - // that correspond to each direction of the trading pair: - // A -> B - let pair_ab = DirectedTradingPair::new(canonical_pair.asset_1(), canonical_pair.asset_2()); - let inventory_a = prev_position - .reserves_for(pair_ab.start) - .expect("the directed trading pair is correct"); - let key_ab = eviction_queue::inventory_index::key(&pair_ab, inventory_a, id).to_vec(); - self.nonverifiable_delete(key_ab); - - // B -> A - let pair_ba = pair_ab.flip(); - let inventory_b = prev_position - .reserves_for(pair_ba.start) - .expect("the directed trading pair is correct"); - let key_ba = eviction_queue::inventory_index::key(&pair_ba, inventory_b, id).to_vec(); - self.nonverifiable_delete(key_ba); - } - - fn index_position_by_price(&mut self, position: &position::Position, id: &position::Id) { - let (pair, phi) = (position.phi.pair, &position.phi); - if position.reserves.r2 != 0u64.into() { - // Index this position for trades FROM asset 1 TO asset 2, since the position has asset 2 to give out. - let pair12 = DirectedTradingPair { - start: pair.asset_1(), - end: pair.asset_2(), - }; - let phi12 = phi.component.clone(); - self.nonverifiable_put_raw(engine::price_index::key(&pair12, &phi12, &id), vec![]); - tracing::debug!("indexing position for 1=>2 trades"); - } - - if position.reserves.r1 != 0u64.into() { - // Index this position for trades FROM asset 2 TO asset 1, since the position has asset 1 to give out. - let pair21 = DirectedTradingPair { - start: pair.asset_2(), - end: pair.asset_1(), - }; - let phi21 = phi.component.flip(); - self.nonverifiable_put_raw(engine::price_index::key(&pair21, &phi21, &id), vec![]); - tracing::debug!("indexing position for 2=>1 trades"); - } - } - - fn deindex_position_by_price(&mut self, position: &Position, id: &position::Id) { - tracing::debug!("deindexing position"); - let pair12 = DirectedTradingPair { - start: position.phi.pair.asset_1(), - end: position.phi.pair.asset_2(), - }; - let phi12 = position.phi.component.clone(); - let pair21 = DirectedTradingPair { - start: position.phi.pair.asset_2(), - end: position.phi.pair.asset_1(), - }; - let phi21 = position.phi.component.flip(); - self.nonverifiable_delete(engine::price_index::key(&pair12, &phi12, &id)); - self.nonverifiable_delete(engine::price_index::key(&pair21, &phi21, &id)); - } - - /// Updates the nonverifiable liquidity indices given a [`Position`] in the direction specified by the [`DirectedTradingPair`]. - /// An [`Option`] may be specified to allow for the case where a position is being updated. - async fn update_liquidity_index( - &mut self, - pair: DirectedTradingPair, - position: &Position, - prev: &Option, ) -> Result<()> { - tracing::debug!(?pair, "updating available liquidity indices"); - - let (new_a_from_b, current_a_from_b) = match (position.state, prev) { - (State::Opened, None) => { - // Add the new position's contribution to the index, no cancellation of the previous version necessary. - - // Query the current available liquidity for this trading pair, or zero if the trading pair - // has no current liquidity. - let current_a_from_b = self - .nonverifiable_get_raw(&engine::routable_assets::a_from_b(&pair)) - .await? - .map(|bytes| { - Amount::from_be_bytes( - bytes - .try_into() - .expect("liquidity index amount can always be parsed"), - ) - }) - .unwrap_or_default(); - - // Use the new reserves to compute `new_position_contribution`, - // the amount of asset A contributed by the position (i.e. the reserves of asset A). - let new_position_contribution = position - .reserves_for(pair.start) - .expect("specified position should match provided trading pair"); - - // Compute `new_A_from_B`. - let new_a_from_b = - // Add the contribution from the updated version. - current_a_from_b.saturating_add(&new_position_contribution); - - tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?new_position_contribution, "newly opened position, adding contribution to existing available liquidity for trading pair"); - - (new_a_from_b, current_a_from_b) - } - (State::Opened, Some(prev)) => { - // Add the new position's contribution to the index, deleting the previous version's contribution. - - // Query the current available liquidity for this trading pair, or zero if the trading pair - // has no current liquidity. - let current_a_from_b = self - .nonverifiable_get_raw(&engine::routable_assets::a_from_b(&pair)) - .await? - .map(|bytes| { - Amount::from_be_bytes( - bytes - .try_into() - .expect("liquidity index amount can always be parsed"), - ) - }) - .unwrap_or_default(); - - // Use the previous reserves to compute `prev_position_contribution` (denominated in asset_1). - let prev_position_contribution = prev - .reserves_for(pair.start) - .expect("specified position should match provided trading pair"); - - // Use the new reserves to compute `new_position_contribution`, - // the amount of asset A contributed by the position (i.e. the reserves of asset A). - let new_position_contribution = position - .reserves_for(pair.start) - .expect("specified position should match provided trading pair"); - - // Compute `new_A_from_B`. - let new_a_from_b = - // Subtract the previous version of the position's contribution to represent that position no longer - // being correct, and add the contribution from the updated version. - (current_a_from_b.saturating_sub(&prev_position_contribution)).saturating_add(&new_position_contribution); - - tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?new_position_contribution, ?prev_position_contribution, "updated position, adding new contribution and subtracting previous contribution to existing available liquidity for trading pair"); - - (new_a_from_b, current_a_from_b) - } - (State::Closed, Some(prev)) => { - // Compute the previous contribution and erase it from the current index - - // Query the current available liquidity for this trading pair, or zero if the trading pair - // has no current liquidity. - let current_a_from_b = self - .nonverifiable_get_raw(&engine::routable_assets::a_from_b(&pair)) - .await? - .map(|bytes| { - Amount::from_be_bytes( - bytes - .try_into() - .expect("liquidity index amount can always be parsed"), - ) - }) - .unwrap_or_default(); - - // Use the previous reserves to compute `prev_position_contribution` (denominated in asset_1). - let prev_position_contribution = prev - .reserves_for(pair.start) - .expect("specified position should match provided trading pair"); - - // Compute `new_A_from_B`. - let new_a_from_b = - // Subtract the previous version of the position's contribution to represent that position no longer - // being correct, and since the updated version is Closed, it has no contribution. - current_a_from_b.saturating_sub(&prev_position_contribution); - - tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?prev_position_contribution, "closed position, subtracting previous contribution to existing available liquidity for trading pair"); - - (new_a_from_b, current_a_from_b) - } - (State::Withdrawn { .. }, _) | (State::Closed, None) => { - // The position already went through the `Closed` state or was opened in the `Closed` state, so its contribution has already been subtracted. - return Ok(()); - } - }; + use position::State::*; - // Delete the existing key for this position if the reserve amount has changed. - if new_a_from_b != current_a_from_b { - self.nonverifiable_delete( - engine::routable_assets::key(&pair.start, current_a_from_b).to_vec(), + if let Some(prev_lp) = prev_state { + tracing::debug!(?id, prev = ?prev_lp.state, new = ?new_state.state, "evaluating state transition"); + match (prev_lp.state, new_state.state) { + (Opened, Opened) => {} + (Opened, Closed) => {} + (Closed, Closed) => { /* no-op but allowed */ } + (Closed, Withdrawn { sequence }) => { + ensure!( + sequence == 0, + "withdrawn positions must have their sequence start at zero (found: {})", + sequence + ); + } + (Withdrawn { sequence: old_seq }, Withdrawn { sequence: new_seq }) => { + let expected_seq = old_seq.saturating_add(1); + ensure!( + new_seq == expected_seq, + "withdrawn must increase 1-by-1 (old: {}, new: {}, expected: {})", + old_seq, + new_seq, + expected_seq + ); + } + _ => bail!("invalid transition"), + } + } else { + ensure!( + matches!(new_state.state, Opened), + "fresh positions MUST start in the `Opened` state (found: {:?})", + new_state.state ); } - // Write the new key indicating that asset B is routable from asset A with `new_a_from_b` liquidity. - self.nonverifiable_put_raw( - engine::routable_assets::key(&pair.start, new_a_from_b).to_vec(), - pair.end.encode_to_vec(), - ); - tracing::debug!(start = ?pair.start, end = ?pair.end, "marking routable from start -> end"); - - // Write the new lookup index storing `new_a_from_b` for this trading pair. - self.nonverifiable_put_raw( - engine::routable_assets::a_from_b(&pair).to_vec(), - new_a_from_b.to_be_bytes().to_vec(), - ); - tracing::debug!(available_liquidity = ?new_a_from_b, ?pair, "marking available liquidity for trading pair"); - - Ok(()) - } - - async fn update_available_liquidity( - &mut self, - prev_position: &Option, - position: &Position, - ) -> Result<()> { - // Since swaps may be performed in either direction, the available liquidity indices - // need to be calculated and stored for both the A -> B and B -> A directions. - let (a, b) = (position.phi.pair.asset_1(), position.phi.pair.asset_2()); - - // A -> B - self.update_liquidity_index(DirectedTradingPair::new(a, b), position, prev_position) - .await?; - // B -> A - self.update_liquidity_index(DirectedTradingPair::new(b, a), position, prev_position) - .await?; - Ok(()) } } diff --git a/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs b/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs new file mode 100644 index 0000000000..ac55ec97fd --- /dev/null +++ b/crates/core/component/dex/src/component/position_manager/base_liquidity_index.rs @@ -0,0 +1,180 @@ +use anyhow::Result; +use cnidarium::StateWrite; +use penumbra_num::Amount; +use position::State::*; + +use crate::lp::position::{self, Position}; +use crate::state_key::engine; +use crate::DirectedTradingPair; +use penumbra_proto::{StateReadProto, StateWriteProto}; + +pub(crate) trait AssetByLiquidityIndex: StateWrite { + /// Update the base liquidity index, used by the DEX engine during path search. + /// + /// # Overview + /// Given a directed trading pair `A -> B`, the index tracks the amount of + /// liquidity available to convert the quote asset B, into a base asset A. + /// + /// # Index schema + /// The liquidity index schema is as follow: + /// - A primary index that maps a "start" asset A (aka. base asset) + /// to an "end" asset B (aka. quote asset) ordered by the amount of + /// liquidity available for B -> A (not a typo). + /// - An auxilliary index that maps a directed trading pair `A -> B` + /// to the aggregate liquidity for B -> A (used in the primary composite key) + /// + /// # Diagram + /// + /// Liquidity index: + /// For an asset `A`, surface asset + /// `B` with the best liquidity + /// score. + /// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ + /// + /// ┌──┐ ▼ ┌─────────┐ │ + /// ▲ │ │ ┌──────────────────┐ │ │ + /// │ │ ─┼───▶│{asset_A}{agg_liq}│──▶│{asset_B}│ │ + /// │ ├──┤ └──────────────────┘ │ │ + /// sorted │ │ └─────────┘ │ + /// by agg │ │ + /// liq ├──┤ │ + /// │ │ │ used in the + /// │ ├──┤ composite + /// │ │ │ key + /// │ │ │ Auxiliary look-up index: │ + /// │ │ │ "Find the aggregate liquidity + /// │ │ │ per directed trading pair" │ + /// │ │ │ ┌───────┐ ┌─────────┐ + /// │ │ │ ├───────┤ ┌──────────────────┐ │ │ + /// │ │ │ │ ────┼─▶│{asset_A}{asset_B}│────▶│{agg_liq}│ + /// │ ├──┤ ├───────┤ └──────────────────┘ │ │ + /// │ │ │ ├───────┤ └─────────┘ + /// │ │ │ ├───────┤ + /// │ │ │ ├───────┤ + /// │ ├──┤ └───────┘ + /// │ │ │ + /// │ │ │ + /// │ └──┘ + async fn update_asset_by_base_liquidity_index( + &mut self, + prev_state: &Option, + new_state: &Position, + id: &position::Id, + ) -> Result<()> { + // We need to reconstruct the position's previous contribution and compute + // its new contribution to the index. We do this for each asset in the pair + // and short-circuit if all contributions are zero. + let canonical_pair = new_state.phi.pair; + let pair_ab = DirectedTradingPair::new(canonical_pair.asset_1(), canonical_pair.asset_2()); + + // We reconstruct the position's *previous* contribution so that we can deduct them later: + let (prev_a, prev_b) = match prev_state { + // The position was just created, so its previous contributions are zero. + None => (Amount::zero(), Amount::zero()), + Some(prev) => match prev.state { + // The position was previously closed or withdrawn, so its previous contributions are zero. + Closed | Withdrawn { sequence: _ } => (Amount::zero(), Amount::zero()), + // The position's previous contributions are the reserves for the start and end assets. + _ => ( + prev.reserves_for(pair_ab.start) + .expect("asset ids match for start"), + prev.reserves_for(pair_ab.end) + .expect("asset ids match for end"), + ), + }, + }; + + // For each asset, we compute the new position's contribution to the index: + let (new_a, new_b) = if matches!(new_state.state, Closed | Withdrawn { sequence: _ }) { + // The position is being closed or withdrawn, so its new contributions are zero. + // Note a withdrawn position MUST have zero reserves, so hardcoding this is extra. + (Amount::zero(), Amount::zero()) + } else { + ( + // The new amount of asset A: + new_state + .reserves_for(pair_ab.start) + .expect("asset ids match for start"), + // The new amount of asset B: + new_state + .reserves_for(pair_ab.end) + .expect("asset ids match for end"), + ) + }; + + // If all contributions are zero, we can skip the update. + // This can happen if we're processing inactive transitions like `Closed -> Withdrawn`. + if prev_a == Amount::zero() + && new_a == Amount::zero() + && prev_b == Amount::zero() + && new_b == Amount::zero() + { + return Ok(()); + } + + // A -> B + self.update_asset_by_base_liquidity_index_inner(id, pair_ab, prev_a, new_a) + .await?; + // B -> A + self.update_asset_by_base_liquidity_index_inner(id, pair_ab.flip(), prev_b, new_b) + .await?; + + Ok(()) + } +} + +impl AssetByLiquidityIndex for T {} + +trait Inner: StateWrite { + async fn update_asset_by_base_liquidity_index_inner( + &mut self, + id: &position::Id, + pair: DirectedTradingPair, + old_contrib: Amount, + new_contrib: Amount, + ) -> Result<()> { + let aggregate_key = &engine::routable_assets::lookup_base_liquidity_by_pair(&pair); + + let prev_tally: Amount = self + .nonverifiable_get(aggregate_key) + .await? + .unwrap_or_default(); + + // To compute the new aggregate liquidity, we deduct the old contribution + // and add the new contribution. We use saturating arithmetic defensively. + let new_tally = prev_tally + .saturating_sub(&old_contrib) + .saturating_add(&new_contrib); + + // If the update operation is a no-op, we can skip the update and return early. + if prev_tally == new_tally { + tracing::debug!( + ?prev_tally, + ?pair, + ?id, + "skipping routable asset index update" + ); + return Ok(()); + } + + // Update the primary and auxiliary indices: + let old_primary_key = engine::routable_assets::key(&pair.start, prev_tally).to_vec(); + // This could make the `StateDelta` more expensive to scan, but this doesn't show on profiles yet. + self.nonverifiable_delete(old_primary_key); + + let new_primary_key = engine::routable_assets::key(&pair.start, new_tally).to_vec(); + self.nonverifiable_put(new_primary_key, pair.end); + tracing::debug!(?pair, ?new_tally, "base liquidity entry updated"); + + let auxiliary_key = engine::routable_assets::lookup_base_liquidity_by_pair(&pair).to_vec(); + self.nonverifiable_put(auxiliary_key, new_tally); + tracing::debug!( + ?pair, + "base liquidity heuristic marked directed pair as routable" + ); + + Ok(()) + } +} + +impl Inner for T {} diff --git a/crates/core/component/dex/src/component/position_counter.rs b/crates/core/component/dex/src/component/position_manager/counter.rs similarity index 65% rename from crates/core/component/dex/src/component/position_counter.rs rename to crates/core/component/dex/src/component/position_manager/counter.rs index 5ccec60cdf..5185fe1f2d 100644 --- a/crates/core/component/dex/src/component/position_counter.rs +++ b/crates/core/component/dex/src/component/position_manager/counter.rs @@ -1,21 +1,22 @@ use anyhow::bail; use async_trait::async_trait; -use cnidarium::StateWrite; +use cnidarium::{StateRead, StateWrite}; +use crate::lp::position::{self, Position}; use crate::state_key::engine; use crate::TradingPair; use anyhow::Result; #[async_trait] -pub(crate) trait PositionCounter: StateWrite { +pub(super) trait PositionCounterRead: StateRead { /// Returns the number of position for a [`TradingPair`]. /// If there were no counter initialized for a given pair, this default to zero. - async fn get_position_count(&self, trading_pair: &TradingPair) -> u16 { + async fn get_position_count(&self, trading_pair: &TradingPair) -> u32 { let path = engine::counter::num_positions::by_trading_pair(trading_pair); self.get_position_count_from_key(path).await } - async fn get_position_count_from_key(&self, path: [u8; 99]) -> u16 { + async fn get_position_count_from_key(&self, path: [u8; 99]) -> u32 { let Some(raw_count) = self .nonverifiable_get_raw(&path) .await @@ -24,16 +25,47 @@ pub(crate) trait PositionCounter: StateWrite { return 0; }; - // This is safe because we only increment the counter via a [`Self::increase_position_counter`]. - let raw_count: [u8; 2] = raw_count + // This is safe because we only increment the counter via [`Self::increase_position_counter`]. + let raw_count: [u8; 4] = raw_count .try_into() .expect("position counter is at most two bytes"); - u16::from_be_bytes(raw_count) + u32::from_be_bytes(raw_count) } +} +impl PositionCounterRead for T {} + +#[async_trait] +pub(crate) trait PositionCounter: StateWrite { + async fn update_trading_pair_position_counter( + &mut self, + prev_state: &Option, + new_state: &Position, + _id: &position::Id, + ) -> Result<()> { + use position::State::*; + let trading_pair = new_state.phi.pair; + match (prev_state.as_ref().map(|p| p.state), new_state.state) { + // Increment the counter whenever a new position is opened + (None, Opened) => { + let _ = self.increment_position_counter(&trading_pair).await?; + } + // Decrement the counter whenever an opened position is closed + (Some(Opened), Closed) => { + let _ = self.decrement_position_counter(&trading_pair).await?; + } + // Other state transitions don't affect the opened position counter + _ => {} + } + Ok(()) + } +} +impl PositionCounter for T {} + +trait Inner: StateWrite { /// Increment the number of position for a [`TradingPair`]. /// Returns the updated total, or an error if overflow occurred. - async fn increment_position_counter(&mut self, trading_pair: &TradingPair) -> Result { + async fn increment_position_counter(&mut self, trading_pair: &TradingPair) -> Result { let path = engine::counter::num_positions::by_trading_pair(trading_pair); let prev = self.get_position_count_from_key(path).await; @@ -46,7 +78,7 @@ pub(crate) trait PositionCounter: StateWrite { /// Decrement the number of positions for a [`TradingPair`], unless it would underflow. /// Returns the updated total, or an error if underflow occurred. - async fn decrement_position_counter(&mut self, trading_pair: &TradingPair) -> Result { + async fn decrement_position_counter(&mut self, trading_pair: &TradingPair) -> Result { let path = engine::counter::num_positions::by_trading_pair(trading_pair); let prev = self.get_position_count_from_key(path).await; @@ -57,16 +89,20 @@ pub(crate) trait PositionCounter: StateWrite { Ok(new_total) } } -impl PositionCounter for T {} + +impl Inner for T {} // For some reason, `rust-analyzer` is complaining about used imports. // Silence the warnings until I find a fix. #[allow(unused_imports)] mod tests { - use cnidarium::{StateDelta, TempStorage}; + use cnidarium::{StateDelta, StateWrite, TempStorage}; use penumbra_asset::{asset::REGISTRY, Value}; - use crate::component::position_counter::PositionCounter; + use crate::component::position_manager::counter::{ + Inner, PositionCounter, PositionCounterRead, + }; + use crate::state_key::engine; use crate::TradingPair; #[tokio::test] @@ -78,22 +114,20 @@ mod tests { let storage = TempStorage::new().await?; let mut delta = StateDelta::new(storage.latest_snapshot()); + let path = engine::counter::num_positions::by_trading_pair(&trading_pair); + // Manually set the counter to the maximum value + delta.nonverifiable_put_raw(path.to_vec(), u32::MAX.to_be_bytes().to_vec()); - for i in 0..u16::MAX { - let total = delta.increment_position_counter(&trading_pair).await?; - - anyhow::ensure!( - total == i + 1, - "the total amount should be total={}, found={total}", - i + 1 - ); - } + // Check that the counter is at the maximum value + let total = delta.get_position_count(&trading_pair).await; + assert_eq!(total, u32::MAX); + // Check that we can handle an overflow assert!(delta .increment_position_counter(&trading_pair) .await .is_err()); - assert_eq!(delta.get_position_count(&trading_pair).await, u16::MAX); + assert_eq!(delta.get_position_count(&trading_pair).await, u32::MAX); Ok(()) } @@ -112,7 +146,7 @@ mod tests { assert!(maybe_total.is_err()); let counter = delta.get_position_count(&trading_pair).await; - assert_eq!(counter, 0u16); + assert_eq!(counter, 0u32); Ok(()) } } diff --git a/crates/core/component/dex/src/component/position_manager/inventory_index.rs b/crates/core/component/dex/src/component/position_manager/inventory_index.rs new file mode 100644 index 0000000000..fa74144b19 --- /dev/null +++ b/crates/core/component/dex/src/component/position_manager/inventory_index.rs @@ -0,0 +1,84 @@ +use cnidarium::StateWrite; + +use crate::{ + lp::position::{self, Position}, + state_key::eviction_queue, + DirectedTradingPair, +}; + +use anyhow::Result; +use position::State::*; + +pub(super) trait PositionByInventoryIndex: StateWrite { + fn update_position_by_inventory_index( + &mut self, + prev_state: &Option, + new_state: &Position, + position_id: &position::Id, + ) -> Result<()> { + // Clear an existing record of the position, since changes to the + // reserves or the position state might have invalidated it. + if let Some(prev_lp) = prev_state { + self.deindex_position_by_inventory(prev_lp, position_id); + } + + if matches!(new_state.state, Opened) { + self.index_position_by_inventory(new_state, position_id); + } + + Ok(()) + } +} + +impl PositionByInventoryIndex for T {} + +trait Inner: StateWrite { + fn index_position_by_inventory(&mut self, position: &position::Position, id: &position::Id) { + tracing::debug!("indexing position by inventory"); + let canonical_pair = position.phi.pair; + // A position is bound to an unordered trading pair: A <> B. + // We want to index the position by inventory for each direction: + // A -> B + let pair_ab = DirectedTradingPair::new(canonical_pair.asset_1(), canonical_pair.asset_2()); + let inventory_a = position + .reserves_for(pair_ab.start) + .expect("the directed trading pair is correct"); + let key_ab = eviction_queue::inventory_index::key(&pair_ab, inventory_a, id).to_vec(); + self.nonverifiable_put_raw(key_ab, vec![]); + + // B -> A + let pair_ba = pair_ab.flip(); + let inventory_b = position + .reserves_for(pair_ba.start) + .expect("the directed trading pair is correct"); + let key_ba = eviction_queue::inventory_index::key(&pair_ba, inventory_b, id).to_vec(); + self.nonverifiable_put_raw(key_ba, vec![]); + } + + fn deindex_position_by_inventory( + &mut self, + prev_position: &position::Position, + id: &position::Id, + ) { + let canonical_pair = prev_position.phi.pair; + + // To deindex the position, we need to reconstruct the tuple of keys + // that correspond to each direction of the trading pair: + // A -> B + let pair_ab = DirectedTradingPair::new(canonical_pair.asset_1(), canonical_pair.asset_2()); + let inventory_a = prev_position + .reserves_for(pair_ab.start) + .expect("the directed trading pair is correct"); + let key_ab = eviction_queue::inventory_index::key(&pair_ab, inventory_a, id).to_vec(); + self.nonverifiable_delete(key_ab); + + // B -> A + let pair_ba = pair_ab.flip(); + let inventory_b = prev_position + .reserves_for(pair_ba.start) + .expect("the directed trading pair is correct"); + let key_ba = eviction_queue::inventory_index::key(&pair_ba, inventory_b, id).to_vec(); + self.nonverifiable_delete(key_ba); + } +} +impl Inner for T {} diff --git a/crates/core/component/dex/src/component/position_manager/price_index.rs b/crates/core/component/dex/src/component/position_manager/price_index.rs new file mode 100644 index 0000000000..26d6be2b48 --- /dev/null +++ b/crates/core/component/dex/src/component/position_manager/price_index.rs @@ -0,0 +1,77 @@ +use cnidarium::StateWrite; + +use crate::{ + lp::position::{self, Position}, + state_key::engine, + DirectedTradingPair, +}; + +use anyhow::Result; +use position::State::*; + +pub(crate) trait PositionByPriceIndex: StateWrite { + fn update_position_by_price_index( + &mut self, + prev_state: &Option, + new_state: &Position, + position_id: &position::Id, + ) -> Result<()> { + // Clear an existing record for the position, since changes to the + // reserves or the position state might have invalidated it. + if let Some(prev_lp) = prev_state { + self.deindex_position_by_price(prev_lp, position_id); + } + + if matches!(new_state.state, Opened) { + self.index_position_by_price(new_state, position_id); + } + + Ok(()) + } + + fn deindex_position_by_price(&mut self, position: &Position, id: &position::Id) { + tracing::debug!("deindexing position"); + let pair12 = DirectedTradingPair { + start: position.phi.pair.asset_1(), + end: position.phi.pair.asset_2(), + }; + let phi12 = position.phi.component.clone(); + let pair21 = DirectedTradingPair { + start: position.phi.pair.asset_2(), + end: position.phi.pair.asset_1(), + }; + let phi21 = position.phi.component.flip(); + self.nonverifiable_delete(engine::price_index::key(&pair12, &phi12, &id)); + self.nonverifiable_delete(engine::price_index::key(&pair21, &phi21, &id)); + } +} +impl PositionByPriceIndex for T {} + +trait Inner: StateWrite { + fn index_position_by_price(&mut self, position: &position::Position, id: &position::Id) { + let (pair, phi) = (position.phi.pair, &position.phi); + if position.reserves.r2 != 0u64.into() { + // Index this position for trades FROM asset 1 TO asset 2, since the position has asset 2 to give out. + let pair12 = DirectedTradingPair { + start: pair.asset_1(), + end: pair.asset_2(), + }; + let phi12 = phi.component.clone(); + self.nonverifiable_put_raw(engine::price_index::key(&pair12, &phi12, &id), vec![]); + tracing::debug!("indexing position for 1=>2 trades"); + } + + if position.reserves.r1 != 0u64.into() { + // Index this position for trades FROM asset 2 TO asset 1, since the position has asset 1 to give out. + let pair21 = DirectedTradingPair { + start: pair.asset_2(), + end: pair.asset_1(), + }; + let phi21 = phi.component.flip(); + self.nonverifiable_put_raw(engine::price_index::key(&pair21, &phi21, &id), vec![]); + tracing::debug!("indexing position for 2=>1 trades"); + } + } +} + +impl Inner for T {} diff --git a/crates/core/component/dex/src/component/router/path.rs b/crates/core/component/dex/src/component/router/path.rs index a9fd283590..bcf2b28ba4 100644 --- a/crates/core/component/dex/src/component/router/path.rs +++ b/crates/core/component/dex/src/component/router/path.rs @@ -73,7 +73,7 @@ impl Path { }; // Deindex the position we "consumed" in this and all descendant state forks, // ensuring we don't double-count liquidity while traversing cycles. - use super::super::position_manager::Inner as _; + use crate::component::position_manager::price_index::PositionByPriceIndex; self.state .deindex_position_by_price(&best_price_position, &best_price_position.id()); diff --git a/crates/core/component/dex/src/state_key.rs b/crates/core/component/dex/src/state_key.rs index 4b2646c3dd..6b302fd46e 100644 --- a/crates/core/component/dex/src/state_key.rs +++ b/crates/core/component/dex/src/state_key.rs @@ -103,52 +103,61 @@ pub(crate) mod engine { } } - /// Find assets with liquidity positions from asset `from`, ordered by price. pub(crate) mod routable_assets { use penumbra_asset::asset; use penumbra_num::Amount; use super::*; - /// `A || be_bytes(A_from_B) => B` this will be an ordered encoding of every asset `B` directly routable to from `A`. - /// `a_from_b` represents the amount of `A` that can be bought with `B`. - /// The prefix query includes only the `A` portion, meaning the keys will be returned in order of liquidity. - pub(crate) fn prefix(from: &asset::Id) -> [u8; 39] { + // An ordered encoding of every asset `B` routable from `A` based on the + // aggregate liquidity available to route from `B` to `A` (aka. the base liquidity). + // + /// # Encoding + /// The prefix key is encoded as `domain || A`. + pub(crate) fn starting_from(from: &asset::Id) -> [u8; 39] { let mut key = [0u8; 39]; key[0..7].copy_from_slice(b"dex/ra/"); - key[7..7 + 32].copy_from_slice(&from.to_bytes()); + key[7..39].copy_from_slice(&from.to_bytes()); key } - /// `A || be_bytes(A_from_B) => B` this will be an ordered encoding of every asset `B` directly routable to from `A`. - /// `a_from_b` represents the amount of `A` that can be bought with `B`. + /// A record that an asset `A` is routable to an asset `B` and contains the + /// aggregate liquidity available to route from `B` to `A` (aka. the base liquidity). + /// + /// # Encoding + /// The full key is encoded as: `prefix || BE(aggregate_base_liquidity)` pub(crate) fn key(from: &asset::Id, a_from_b: Amount) -> [u8; 55] { let mut key = [0u8; 55]; key[0..7].copy_from_slice(b"dex/ra/"); - key[7..32 + 7].copy_from_slice(&from.to_bytes()); - key[32 + 7..32 + 7 + 16].copy_from_slice(&a_from_b.to_be_bytes()); + key[7..39].copy_from_slice(&from.to_bytes()); + key[39..55].copy_from_slice(&a_from_b.to_be_bytes()); key } - /// `(A, B) => A_from_B` this will encode the current amount of `A` tradable into `B` for every directly routable trading pair. - /// This index can be used to determine the key values for the [`super::key`] ordered index to perform updates efficiently. - pub(crate) fn a_from_b(pair: &DirectedTradingPair) -> [u8; 71] { + /// A lookup index used to reconstruct and update the primary index entries. + /// It maps a directed trading pair `A -> B` to the aggregate liquidity available + /// to route from `B` to `A` (aka. the base asset liquidity). + /// + /// # Encoding + /// The lookup key is encoded as `prefix_lookup || start_asset|| end_asset`. + pub(crate) fn lookup_base_liquidity_by_pair(pair: &DirectedTradingPair) -> [u8; 71] { let mut key = [0u8; 71]; key[0..7].copy_from_slice(b"dex/ab/"); - key[7..7 + 32].copy_from_slice(&pair.start.to_bytes()); - key[7 + 32..7 + 32 + 32].copy_from_slice(&pair.end.to_bytes()); + key[7..39].copy_from_slice(&pair.start.to_bytes()); + key[39..71].copy_from_slice(&pair.end.to_bytes()); key } } pub(crate) mod price_index { + use super::*; pub(crate) fn prefix(pair: &DirectedTradingPair) -> [u8; 71] { let mut key = [0u8; 71]; key[0..7].copy_from_slice(b"dex/pi/"); - key[7..7 + 32].copy_from_slice(&pair.start.to_bytes()); - key[7 + 32..7 + 32 + 32].copy_from_slice(&pair.end.to_bytes()); + key[7..39].copy_from_slice(&pair.start.to_bytes()); + key[39..71].copy_from_slice(&pair.end.to_bytes()); key }