Skip to content

Commit

Permalink
dex: add an eviction index based on position reserves (#4172)
Browse files Browse the repository at this point in the history
## Describe your changes

This PR implements an inventory index, together with #4167, this PR
works toward #4077 which contain the full eviction mechanism along with
the adequate protocol specification update.


The state key added:

- $\text{I}_{A \rightarrow B} \coloneqq \text{prefix} \Vert BE(R_A)
\Vert \text{id}$
- $\text{I}_{B \rightarrow A} \coloneqq \text{prefix} \Vert BE(R_B)
\Vert \text{id}$

each corresponding to an index of position `Id`s ordered by reserves
(ascending).

I plan to immediately follow-up this PR with a proposal to refactor the
inner `PositionManager` index implementations.

## Checklist before requesting a review

- [x] If this code contains consensus-breaking changes, I have added the
"consensus-breaking" label. Otherwise, I declare my belief that there
are not consensus-breaking changes, for the following reason:

> It adds a state key to nonverifiable storage, and absent the actual
eviction mechanism, this isn't actually consensus breaking.
  • Loading branch information
erwanor authored Apr 9, 2024
1 parent 56c67ca commit 1593f12
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 37 deletions.
8 changes: 4 additions & 4 deletions crates/core/component/dex/src/component/position_counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::bail;
use async_trait::async_trait;
use cnidarium::StateWrite;

use crate::state_key;
use crate::state_key::engine;
use crate::TradingPair;
use anyhow::Result;

Expand All @@ -11,7 +11,7 @@ pub(crate) trait PositionCounter: StateWrite {
/// Returns the number of position for a [`TradingPair`].
/// If there were no counter initialized for a given pair, this default to zero.
async fn get_position_count(&self, trading_pair: &TradingPair) -> u16 {
let path = state_key::internal::counter::num_positions::by_trading_pair(trading_pair);
let path = engine::counter::num_positions::by_trading_pair(trading_pair);
self.get_position_count_from_key(path).await
}

Expand All @@ -34,7 +34,7 @@ pub(crate) trait PositionCounter: StateWrite {
/// Increment the number of position for a [`TradingPair`].
/// Returns the updated total, or an error if overflow occurred.
async fn increment_position_counter(&mut self, trading_pair: &TradingPair) -> Result<u16> {
let path = state_key::internal::counter::num_positions::by_trading_pair(trading_pair);
let path = engine::counter::num_positions::by_trading_pair(trading_pair);
let prev = self.get_position_count_from_key(path).await;

let Some(new_total) = prev.checked_add(1) else {
Expand All @@ -47,7 +47,7 @@ pub(crate) trait PositionCounter: StateWrite {
/// Decrement the number of positions for a [`TradingPair`], unless it would underflow.
/// Returns the updated total, or an error if underflow occurred.
async fn decrement_position_counter(&mut self, trading_pair: &TradingPair) -> Result<u16> {
let path = state_key::internal::counter::num_positions::by_trading_pair(trading_pair);
let path = engine::counter::num_positions::by_trading_pair(trading_pair);
let prev = self.get_position_count_from_key(path).await;

let Some(new_total) = prev.checked_sub(1) else {
Expand Down
87 changes: 67 additions & 20 deletions crates/core/component/dex/src/component/position_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@ use penumbra_num::Amount;
use penumbra_proto::DomainType;
use penumbra_proto::{StateReadProto, StateWriteProto};

use crate::event;
use crate::lp::position::State;
use crate::lp::Reserves;
use crate::{
component::position_counter::PositionCounter,
component::ValueCircuitBreaker,
lp::position::{self, Position},
state_key, DirectedTradingPair,
state_key::engine,
state_key::eviction_queue,
DirectedTradingPair,
};
use crate::{event, state_key};

const DYNAMIC_ASSET_LIMIT: usize = 10;

Expand All @@ -47,7 +49,7 @@ pub trait PositionRead: StateRead {
&self,
pair: &DirectedTradingPair,
) -> Pin<Box<dyn Stream<Item = Result<position::Id>> + Send + 'static>> {
let prefix = state_key::internal::price_index::prefix(pair);
let prefix = engine::price_index::prefix(pair);
tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for positions by price");
self.nonverifiable_prefix_raw(&prefix)
.map(|entry| match entry {
Expand Down Expand Up @@ -118,7 +120,7 @@ pub trait PositionRead: StateRead {
&self,
from: &asset::Id,
) -> Pin<Box<dyn Stream<Item = Result<asset::Id>> + Send + 'static>> {
let prefix = state_key::internal::routable_assets::prefix(from);
let prefix = engine::routable_assets::prefix(from);
tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for routable assets by liquidity");
self.nonverifiable_prefix_raw(&prefix)
.map(|entry| match entry {
Expand Down Expand Up @@ -400,11 +402,13 @@ pub(crate) trait Inner: StateWrite {
// reserves or the position state might have invalidated them.
if let Some(prev_state) = prev_state.as_ref() {
self.deindex_position_by_price(&prev_state, &id);
self.deindex_position_by_inventory(&prev_state, &id);
}

// Only index the position's liquidity if it is active.
if new_state.state == Opened {
self.index_position_by_price(&new_state, &id);
self.index_position_by_inventory(&new_state, &id);
}

if new_state.state == Closed {
Expand All @@ -428,6 +432,55 @@ pub(crate) trait Inner: StateWrite {
Ok(())
}

// TODO(erwan): break this out into a `position_manager::inventory_index` module.
fn index_position_by_inventory(&mut self, position: &position::Position, id: &position::Id) {
tracing::debug!("indexing position by inventory");
let canonical_pair = position.phi.pair;
// A position is bound to an unordered trading pair: A <> B.
// We want to index the position by inventory for each direction:
// A -> B
let pair_ab = DirectedTradingPair::new(canonical_pair.asset_1(), canonical_pair.asset_2());
let inventory_a = position
.reserves_for(pair_ab.start)
.expect("the directed trading pair is correct");
let key_ab = eviction_queue::inventory_index::key(&pair_ab, inventory_a, id).to_vec();
self.nonverifiable_put_raw(key_ab, vec![]);

// B -> A
let pair_ba = pair_ab.flip();
let inventory_b = position
.reserves_for(pair_ba.start)
.expect("the directed trading pair is correct");
let key_ba = eviction_queue::inventory_index::key(&pair_ba, inventory_b, id).to_vec();
self.nonverifiable_put_raw(key_ba, vec![]);
}

fn deindex_position_by_inventory(
&mut self,
prev_position: &position::Position,
id: &position::Id,
) {
let canonical_pair = prev_position.phi.pair;

// To deindex the position, we need to reconstruct the tuple of keys
// that correspond to each direction of the trading pair:
// A -> B
let pair_ab = DirectedTradingPair::new(canonical_pair.asset_1(), canonical_pair.asset_2());
let inventory_a = prev_position
.reserves_for(pair_ab.start)
.expect("the directed trading pair is correct");
let key_ab = eviction_queue::inventory_index::key(&pair_ab, inventory_a, id).to_vec();
self.nonverifiable_delete(key_ab);

// B -> A
let pair_ba = pair_ab.flip();
let inventory_b = prev_position
.reserves_for(pair_ba.start)
.expect("the directed trading pair is correct");
let key_ba = eviction_queue::inventory_index::key(&pair_ba, inventory_b, id).to_vec();
self.nonverifiable_delete(key_ba);
}

fn index_position_by_price(&mut self, position: &position::Position, id: &position::Id) {
let (pair, phi) = (position.phi.pair, &position.phi);
if position.reserves.r2 != 0u64.into() {
Expand All @@ -437,10 +490,7 @@ pub(crate) trait Inner: StateWrite {
end: pair.asset_2(),
};
let phi12 = phi.component.clone();
self.nonverifiable_put_raw(
state_key::internal::price_index::key(&pair12, &phi12, &id),
vec![],
);
self.nonverifiable_put_raw(engine::price_index::key(&pair12, &phi12, &id), vec![]);
tracing::debug!("indexing position for 1=>2 trades");
}

Expand All @@ -451,10 +501,7 @@ pub(crate) trait Inner: StateWrite {
end: pair.asset_1(),
};
let phi21 = phi.component.flip();
self.nonverifiable_put_raw(
state_key::internal::price_index::key(&pair21, &phi21, &id),
vec![],
);
self.nonverifiable_put_raw(engine::price_index::key(&pair21, &phi21, &id), vec![]);
tracing::debug!("indexing position for 2=>1 trades");
}
}
Expand All @@ -471,8 +518,8 @@ pub(crate) trait Inner: StateWrite {
end: position.phi.pair.asset_1(),
};
let phi21 = position.phi.component.flip();
self.nonverifiable_delete(state_key::internal::price_index::key(&pair12, &phi12, &id));
self.nonverifiable_delete(state_key::internal::price_index::key(&pair21, &phi21, &id));
self.nonverifiable_delete(engine::price_index::key(&pair12, &phi12, &id));
self.nonverifiable_delete(engine::price_index::key(&pair21, &phi21, &id));
}

/// Updates the nonverifiable liquidity indices given a [`Position`] in the direction specified by the [`DirectedTradingPair`].
Expand All @@ -492,7 +539,7 @@ pub(crate) trait Inner: StateWrite {
// Query the current available liquidity for this trading pair, or zero if the trading pair
// has no current liquidity.
let current_a_from_b = self
.nonverifiable_get_raw(&state_key::internal::routable_assets::a_from_b(&pair))
.nonverifiable_get_raw(&engine::routable_assets::a_from_b(&pair))
.await?
.map(|bytes| {
Amount::from_be_bytes(
Expand Down Expand Up @@ -524,7 +571,7 @@ pub(crate) trait Inner: StateWrite {
// Query the current available liquidity for this trading pair, or zero if the trading pair
// has no current liquidity.
let current_a_from_b = self
.nonverifiable_get_raw(&state_key::internal::routable_assets::a_from_b(&pair))
.nonverifiable_get_raw(&engine::routable_assets::a_from_b(&pair))
.await?
.map(|bytes| {
Amount::from_be_bytes(
Expand Down Expand Up @@ -562,7 +609,7 @@ pub(crate) trait Inner: StateWrite {
// Query the current available liquidity for this trading pair, or zero if the trading pair
// has no current liquidity.
let current_a_from_b = self
.nonverifiable_get_raw(&state_key::internal::routable_assets::a_from_b(&pair))
.nonverifiable_get_raw(&engine::routable_assets::a_from_b(&pair))
.await?
.map(|bytes| {
Amount::from_be_bytes(
Expand Down Expand Up @@ -597,20 +644,20 @@ pub(crate) trait Inner: StateWrite {
// Delete the existing key for this position if the reserve amount has changed.
if new_a_from_b != current_a_from_b {
self.nonverifiable_delete(
state_key::internal::routable_assets::key(&pair.start, current_a_from_b).to_vec(),
engine::routable_assets::key(&pair.start, current_a_from_b).to_vec(),
);
}

// Write the new key indicating that asset B is routable from asset A with `new_a_from_b` liquidity.
self.nonverifiable_put_raw(
state_key::internal::routable_assets::key(&pair.start, new_a_from_b).to_vec(),
engine::routable_assets::key(&pair.start, new_a_from_b).to_vec(),
pair.end.encode_to_vec(),
);
tracing::debug!(start = ?pair.start, end = ?pair.end, "marking routable from start -> end");

// Write the new lookup index storing `new_a_from_b` for this trading pair.
self.nonverifiable_put_raw(
state_key::internal::routable_assets::a_from_b(&pair).to_vec(),
engine::routable_assets::a_from_b(&pair).to_vec(),
new_a_from_b.to_be_bytes().to_vec(),
);
tracing::debug!(available_liquidity = ?new_a_from_b, ?pair, "marking available liquidity for trading pair");
Expand Down
56 changes: 43 additions & 13 deletions crates/core/component/dex/src/state_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,20 +79,19 @@ pub fn aggregate_value() -> &'static str {
"dex/aggregate_value"
}

/// Encompasses non-consensus state keys.
pub(crate) mod internal {
pub(crate) mod engine {
use super::*;
use crate::lp::BareTradingFunction;

pub mod counter {
pub mod num_positions {
pub(crate) mod counter {
pub(crate) mod num_positions {
use crate::TradingPair;

pub fn prefix() -> &'static str {
pub(crate) fn prefix() -> &'static str {
"dex/internal/counter/num_positions/"
}

pub fn by_trading_pair(trading_pair: &TradingPair) -> [u8; 99] {
pub(crate) fn by_trading_pair(trading_pair: &TradingPair) -> [u8; 99] {
let mut key = [0u8; 99];
let prefix_bytes = prefix().as_bytes();
let canonical_pair_bytes = trading_pair.to_bytes();
Expand All @@ -103,8 +102,9 @@ pub(crate) mod internal {
}
}
}

/// Find assets with liquidity positions from asset `from`, ordered by price.
pub mod routable_assets {
pub(crate) mod routable_assets {
use penumbra_asset::asset;
use penumbra_num::Amount;

Expand All @@ -113,7 +113,7 @@ pub(crate) mod internal {
/// `A || be_bytes(A_from_B) => B` this will be an ordered encoding of every asset `B` directly routable to from `A`.
/// `a_from_b` represents the amount of `A` that can be bought with `B`.
/// The prefix query includes only the `A` portion, meaning the keys will be returned in order of liquidity.
pub fn prefix(from: &asset::Id) -> [u8; 39] {
pub(crate) fn prefix(from: &asset::Id) -> [u8; 39] {
let mut key = [0u8; 39];
key[0..7].copy_from_slice(b"dex/ra/");
key[7..7 + 32].copy_from_slice(&from.to_bytes());
Expand All @@ -122,7 +122,7 @@ pub(crate) mod internal {

/// `A || be_bytes(A_from_B) => B` this will be an ordered encoding of every asset `B` directly routable to from `A`.
/// `a_from_b` represents the amount of `A` that can be bought with `B`.
pub fn key(from: &asset::Id, a_from_b: Amount) -> [u8; 55] {
pub(crate) fn key(from: &asset::Id, a_from_b: Amount) -> [u8; 55] {
let mut key = [0u8; 55];
key[0..7].copy_from_slice(b"dex/ra/");
key[7..32 + 7].copy_from_slice(&from.to_bytes());
Expand All @@ -132,7 +132,7 @@ pub(crate) mod internal {

/// `(A, B) => A_from_B` this will encode the current amount of `A` tradable into `B` for every directly routable trading pair.
/// This index can be used to determine the key values for the [`super::key`] ordered index to perform updates efficiently.
pub fn a_from_b(pair: &DirectedTradingPair) -> [u8; 71] {
pub(crate) fn a_from_b(pair: &DirectedTradingPair) -> [u8; 71] {
let mut key = [0u8; 71];
key[0..7].copy_from_slice(b"dex/ab/");
key[7..7 + 32].copy_from_slice(&pair.start.to_bytes());
Expand All @@ -141,18 +141,18 @@ pub(crate) mod internal {
}
}

pub mod price_index {
pub(crate) mod price_index {
use super::*;

pub fn prefix(pair: &DirectedTradingPair) -> [u8; 71] {
pub(crate) fn prefix(pair: &DirectedTradingPair) -> [u8; 71] {
let mut key = [0u8; 71];
key[0..7].copy_from_slice(b"dex/pi/");
key[7..7 + 32].copy_from_slice(&pair.start.to_bytes());
key[7 + 32..7 + 32 + 32].copy_from_slice(&pair.end.to_bytes());
key
}

pub fn key(
pub(crate) fn key(
pair: &DirectedTradingPair,
btf: &BareTradingFunction,
id: &position::Id,
Expand All @@ -166,3 +166,33 @@ pub(crate) mod internal {
}
}
}

pub(crate) mod eviction_queue {
pub(crate) mod inventory_index {
use crate::lp::position;
use crate::DirectedTradingPair;
use penumbra_num::Amount;

pub(crate) fn by_trading_pair(pair: &DirectedTradingPair) -> [u8; 107] {
let mut prefix = [0u8; 107];
prefix[0..43].copy_from_slice(b"dex/internal/eviction_queue/inventory_index");
prefix[43..75].copy_from_slice(&pair.start.to_bytes());
prefix[75..107].copy_from_slice(&pair.end.to_bytes());
prefix
}

pub(crate) fn key(
pair: &DirectedTradingPair,
inventory: Amount,
id: &position::Id,
) -> [u8; 155] {
let mut full_key = [0u8; 155];
let prefix = by_trading_pair(pair);
full_key[0..107].copy_from_slice(&prefix);
full_key[107..123].copy_from_slice(&inventory.to_be_bytes());
full_key[123..155].copy_from_slice(&id.0);

full_key
}
}
}

0 comments on commit 1593f12

Please sign in to comment.