Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dex: track total number of positions per pair #4167

Merged
merged 6 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::{
impl ActionHandler for PositionOpen {
type CheckStatelessContext = ();
async fn check_stateless(&self, _context: ()) -> Result<()> {
// TODO(chris, erwan, henry): brainstorm safety on `TradingFunction`.
// Check:
// + reserves are at most 80 bits wide,
// + the trading function coefficients are at most 80 bits wide.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ mod tests {
let position = buy_1;
state_tx.index_position_by_price(&position, &position.id());
state_tx
.update_available_liquidity(&position, &None)
.update_available_liquidity(&None, &position)
.await
.expect("able to update liquidity");
state_tx.put(state_key::position_by_id(&id), position);
Expand Down
2 changes: 1 addition & 1 deletion crates/core/component/dex/src/component/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod arb;
pub(crate) mod circuit_breaker;
mod dex;
mod flow;
pub(crate) mod position_counter;
pub(crate) mod position_manager;
mod swap_manager;

Expand All @@ -21,6 +22,5 @@ pub(crate) use circuit_breaker::ValueCircuitBreaker;
pub use dex::{Dex, StateReadExt, StateWriteExt};
pub use position_manager::{PositionManager, PositionRead};
pub use swap_manager::SwapManager;

#[cfg(test)]
pub(crate) mod tests;
118 changes: 118 additions & 0 deletions crates/core/component/dex/src/component/position_counter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use anyhow::bail;
use async_trait::async_trait;
use cnidarium::StateWrite;

use crate::state_key;
use crate::TradingPair;
use anyhow::Result;

#[async_trait]
pub(crate) trait PositionCounter: StateWrite {
/// Returns the number of position for a [`TradingPair`].
/// If there were no counter initialized for a given pair, this default to zero.
async fn get_position_count(&self, trading_pair: &TradingPair) -> u16 {
let path = state_key::internal::counter::num_positions::by_trading_pair(trading_pair);
self.get_position_count_from_key(path).await
}

async fn get_position_count_from_key(&self, path: [u8; 99]) -> u16 {
let Some(raw_count) = self
.nonverifiable_get_raw(&path)
.await
.expect("no deserialization failure")
else {
return 0;
};

// This is safe because we only increment the counter via a [`Self::increase_position_counter`].
let raw_count: [u8; 2] = raw_count
.try_into()
.expect("position counter is at most two bytes");
u16::from_be_bytes(raw_count)
}

/// Increment the number of position for a [`TradingPair`].
/// Returns the updated total, or an error if overflow occurred.
async fn increment_position_counter(&mut self, trading_pair: &TradingPair) -> Result<u16> {
let path = state_key::internal::counter::num_positions::by_trading_pair(trading_pair);
let prev = self.get_position_count_from_key(path).await;

let Some(new_total) = prev.checked_add(1) else {
bail!("incrementing position counter would overflow")
};
self.nonverifiable_put_raw(path.to_vec(), new_total.to_be_bytes().to_vec());
Ok(new_total)
}

/// Decrement the number of positions for a [`TradingPair`], unless it would underflow.
/// Returns the updated total, or an error if underflow occurred.
async fn decrement_position_counter(&mut self, trading_pair: &TradingPair) -> Result<u16> {
let path = state_key::internal::counter::num_positions::by_trading_pair(trading_pair);
let prev = self.get_position_count_from_key(path).await;

let Some(new_total) = prev.checked_sub(1) else {
bail!("decrementing position counter would underflow")
};
self.nonverifiable_put_raw(path.to_vec(), new_total.to_be_bytes().to_vec());
Ok(new_total)
}
}
impl<T: StateWrite + ?Sized> PositionCounter for T {}

// For some reason, `rust-analyzer` is complaining about used imports.
// Silence the warnings until I find a fix.
#[allow(unused_imports)]
mod tests {
use cnidarium::{StateDelta, TempStorage};
use penumbra_asset::{asset::REGISTRY, Value};

use crate::component::position_counter::PositionCounter;
use crate::TradingPair;

#[tokio::test]
/// Test that we can detect overflows and that they are handled properly: increment is ignored / no crash.
async fn test_no_overflow() -> anyhow::Result<()> {
let asset_a = REGISTRY.parse_denom("upenumbra").unwrap().id();
let asset_b = REGISTRY.parse_denom("pizza").unwrap().id();
let trading_pair = TradingPair::new(asset_a, asset_b);

let storage = TempStorage::new().await?;
let mut delta = StateDelta::new(storage.latest_snapshot());

for i in 0..u16::MAX {
let total = delta.increment_position_counter(&trading_pair).await?;

anyhow::ensure!(
total == i + 1,
"the total amount should be total={}, found={total}",
i + 1
);
}

assert!(delta
.increment_position_counter(&trading_pair)
.await
.is_err());
assert_eq!(delta.get_position_count(&trading_pair).await, u16::MAX);

Ok(())
}

#[tokio::test]
/// Test that we can detect underflow and that they are handled properly: decrement is ignored / no crash.
async fn test_no_underflow() -> anyhow::Result<()> {
let asset_a = REGISTRY.parse_denom("upenumbra").unwrap().id();
let asset_b = REGISTRY.parse_denom("pizza").unwrap().id();
let trading_pair = TradingPair::new(asset_a, asset_b);

let storage = TempStorage::new().await?;
let mut delta = StateDelta::new(storage.latest_snapshot());

let maybe_total = delta.decrement_position_counter(&trading_pair).await;
assert!(maybe_total.is_err());

let counter = delta.get_position_count(&trading_pair).await;
assert_eq!(counter, 0u16);
Ok(())
}
}
46 changes: 43 additions & 3 deletions crates/core/component/dex/src/component/position_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use crate::event;
use crate::lp::position::State;
use crate::lp::Reserves;
use crate::{
component::position_counter::PositionCounter,
component::ValueCircuitBreaker,
lp::position::{self, Position},
state_key, DirectedTradingPair,
Expand Down Expand Up @@ -156,6 +157,17 @@ pub trait PositionManager: StateWrite + PositionRead {
prev_state.state
);

// Optimization: skip state update if the position is already closed.
// This can happen if the position was queued for closure and premptively
// closed by the DEX engine during execution (e.g. auto-closing).
if prev_state.state == position::State::Closed {
tracing::debug!(
?id,
"position is already closed so we can skip state updates"
);
return Ok(());
}

let new_state = {
let mut new_state = prev_state.clone();
new_state.state = position::State::Closed;
Expand Down Expand Up @@ -186,6 +198,17 @@ pub trait PositionManager: StateWrite + PositionRead {

/// Opens a new position, updating all necessary indexes and checking for
/// its nonexistence prior to being opened.
///
/// # Errors
/// This method returns an error if the position is malformed
/// e.g. it is set to a state other than `Opened`
/// or, it specifies a position identifier already used by another position.
///
/// An error can also occur if a DEX engine invariant is breached
/// e.g. overflowing the position counter (`u16::MAX`)
/// or, overflowing the value circuit breaker (`u128::MAX`)
///
/// In any of those cases, we do not want to allow a new position to be opened.
#[tracing::instrument(level = "debug", skip_all)]
async fn open_position(&mut self, position: position::Position) -> Result<()> {
// Double-check that the position is in the `Opened` state
Expand All @@ -202,6 +225,9 @@ pub trait PositionManager: StateWrite + PositionRead {
);
}

// Increase the position counter
self.increment_position_counter(&position.phi.pair).await?;

// Credit the DEX for the inflows from this position.
self.vcb_credit(position.reserves_1()).await?;
self.vcb_credit(position.reserves_2()).await?;
Expand Down Expand Up @@ -364,6 +390,8 @@ pub(crate) trait Inner: StateWrite {
prev_state: Option<Position>,
new_state: Position,
) -> Result<()> {
use position::State::*;

tracing::debug!(?prev_state, ?new_state, "updating position state");

let id = new_state.id();
Expand All @@ -375,13 +403,25 @@ pub(crate) trait Inner: StateWrite {
}

// Only index the position's liquidity if it is active.
if new_state.state == position::State::Opened {
if new_state.state == Opened {
self.index_position_by_price(&new_state, &id);
}

if new_state.state == Closed {
// Make sure that we don't double decrement the position
// counter if a position was queued for closure AND closed
// by the DEX engine.
let is_already_closed = prev_state
.as_ref()
.map_or(false, |old_position| old_position.state == Closed);
if !is_already_closed {
self.decrement_position_counter(&new_state.phi.pair).await?;
}
}

// Update the available liquidity for this position's trading pair.
// TODO: refactor and streamline this method while implementing eviction.
self.update_available_liquidity(&new_state, &prev_state)
self.update_available_liquidity(&prev_state, &new_state)
.await?;

self.put(state_key::position_by_id(&id), new_state);
Expand Down Expand Up @@ -580,8 +620,8 @@ pub(crate) trait Inner: StateWrite {

async fn update_available_liquidity(
&mut self,
position: &Position,
prev_position: &Option<Position>,
position: &Position,
) -> Result<()> {
// Since swaps may be performed in either direction, the available liquidity indices
// need to be calculated and stored for both the A -> B and B -> A directions.
Expand Down
19 changes: 19 additions & 0 deletions crates/core/component/dex/src/state_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,25 @@ pub(crate) mod internal {
use super::*;
use crate::lp::BareTradingFunction;

pub mod counter {
pub mod num_positions {
use crate::TradingPair;

pub fn prefix() -> &'static str {
"dex/internal/counter/num_positions/"
}

pub fn by_trading_pair(trading_pair: &TradingPair) -> [u8; 99] {
let mut key = [0u8; 99];
let prefix_bytes = prefix().as_bytes();
let canonical_pair_bytes = trading_pair.to_bytes();

key[0..35].copy_from_slice(prefix_bytes);
key[35..99].copy_from_slice(&canonical_pair_bytes);
key
}
}
}
/// Find assets with liquidity positions from asset `from`, ordered by price.
pub mod routable_assets {
use penumbra_asset::asset;
Expand Down
Loading