Skip to content

Commit

Permalink
dex: refactor base liquidity index
Browse files Browse the repository at this point in the history
  • Loading branch information
erwanor committed Apr 11, 2024
1 parent aa0d66b commit 4e150c1
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 168 deletions.
4 changes: 2 additions & 2 deletions crates/core/component/dex/src/component/position_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,9 @@ pub trait PositionRead: StateRead {
/// Returns a stream of [`asset::Id`] routable from a given asset, ordered by liquidity.
fn ordered_routable_assets(
&self,
from: &asset::Id,
start: &asset::Id,
) -> Pin<Box<dyn Stream<Item = Result<asset::Id>> + Send + 'static>> {
let prefix = engine::routable_assets::prefix(from);
let prefix = engine::routable_assets::starting_from(start);
tracing::trace!(prefix = ?EscapedByteSlice(&prefix), "searching for routable assets by liquidity");
self.nonverifiable_prefix_raw(&prefix)
.map(|entry| match entry {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,175 +1,167 @@
use anyhow::Result;
use cnidarium::StateWrite;
use penumbra_num::Amount;
use position::State::*;

use crate::lp::position::{self, Position, State};
use crate::state_key::engine;
use crate::DirectedTradingPair;
use penumbra_proto::DomainType;
use penumbra_proto::{StateReadProto, StateWriteProto};

pub(crate) trait AssetByLiquidityIndex: StateWrite {
/// Updates the nonverifiable liquidity indices given a [`Position`] in the direction specified by the [`DirectedTradingPair`].
/// An [`Option<Position>`] may be specified to allow for the case where a position is being updated.
async fn update_liquidity_index(
/// Update the base liquidity index, used by the DEX engine during path search.
///
/// # Overview
/// Given a directed trading pair `A -> B`, the index tracks the amount of
/// liquidity available to convert the quote asset B, into a base asset A.
///
/// # Index schema
/// The liquidity index schema is as follow:
/// - A primary index that maps a "start" asset A (aka. base asset)
/// to an "end" asset B (aka. quote asset) ordered by the amount of
/// liquidity available for B -> A (not a typo).
/// - An auxilliary index that maps a directed trading pair `A -> B`
/// to the aggregate liquidity for B -> A (used in the primary composite key)
///
/// # Diagram
///
/// Liquidity index:
/// For an asset `A`, surface asset
/// `B` with the best liquidity
/// score.
/// ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
///
/// ┌──┐ ▼ ┌─────────┐ │
/// ▲ │ │ ┌──────────────────┐ │ │
/// │ │ ─┼───▶│{asset_A}{agg_liq}│──▶│{asset_B}│ │
/// │ ├──┤ └──────────────────┘ │ │
/// sorted │ │ └─────────┘ │
/// by agg │ │
/// liq ├──┤ │
/// │ │ │ used in the
/// │ ├──┤ composite
/// │ │ │ key
/// │ │ │ Auxiliary look-up index: │
/// │ │ │ "Find the aggregate liquidity
/// │ │ │ per directed trading pair" │
/// │ │ │ ┌───────┐ ┌─────────┐
/// │ │ │ ├───────┤ ┌──────────────────┐ │ │
/// │ │ │ │ ────┼─▶│{asset_A}{asset_B}│────▶│{agg_liq}│
/// │ ├──┤ ├───────┤ └──────────────────┘ │ │
/// │ │ │ ├───────┤ └─────────┘
/// │ │ │ ├───────┤
/// │ │ │ ├───────┤
/// │ ├──┤ └───────┘
/// │ │ │
/// │ │ │
/// │ └──┘
async fn update_asset_by_base_liquidity_index(
&mut self,
pair: DirectedTradingPair,
position: &Position,
prev: &Option<Position>,
prev_state: &Option<Position>,
new_state: &Position,
id: &position::Id,
) -> Result<()> {
tracing::debug!(?pair, "updating available liquidity indices");

let (new_a_from_b, current_a_from_b) = match (position.state, prev) {
(State::Opened, None) => {
// Add the new position's contribution to the index, no cancellation of the previous version necessary.

// Query the current available liquidity for this trading pair, or zero if the trading pair
// has no current liquidity.
let current_a_from_b = self
.nonverifiable_get_raw(&engine::routable_assets::a_from_b(&pair))
.await?
.map(|bytes| {
Amount::from_be_bytes(
bytes
.try_into()
.expect("liquidity index amount can always be parsed"),
)
})
.unwrap_or_default();

// Use the new reserves to compute `new_position_contribution`,
// the amount of asset A contributed by the position (i.e. the reserves of asset A).
let new_position_contribution = position
.reserves_for(pair.start)
.expect("specified position should match provided trading pair");

// Compute `new_A_from_B`.
let new_a_from_b =
// Add the contribution from the updated version.
current_a_from_b.saturating_add(&new_position_contribution);

tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?new_position_contribution, "newly opened position, adding contribution to existing available liquidity for trading pair");

(new_a_from_b, current_a_from_b)
}
(State::Opened, Some(prev)) => {
// Add the new position's contribution to the index, deleting the previous version's contribution.

// Query the current available liquidity for this trading pair, or zero if the trading pair
// has no current liquidity.
let current_a_from_b = self
.nonverifiable_get_raw(&engine::routable_assets::a_from_b(&pair))
.await?
.map(|bytes| {
Amount::from_be_bytes(
bytes
.try_into()
.expect("liquidity index amount can always be parsed"),
)
})
.unwrap_or_default();

// Use the previous reserves to compute `prev_position_contribution` (denominated in asset_1).
let prev_position_contribution = prev
.reserves_for(pair.start)
.expect("specified position should match provided trading pair");

// Use the new reserves to compute `new_position_contribution`,
// the amount of asset A contributed by the position (i.e. the reserves of asset A).
let new_position_contribution = position
.reserves_for(pair.start)
.expect("specified position should match provided trading pair");

// Compute `new_A_from_B`.
let new_a_from_b =
// Subtract the previous version of the position's contribution to represent that position no longer
// being correct, and add the contribution from the updated version.
(current_a_from_b.saturating_sub(&prev_position_contribution)).saturating_add(&new_position_contribution);

tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?new_position_contribution, ?prev_position_contribution, "updated position, adding new contribution and subtracting previous contribution to existing available liquidity for trading pair");

(new_a_from_b, current_a_from_b)
}
(State::Closed, Some(prev)) => {
// Compute the previous contribution and erase it from the current index

// Query the current available liquidity for this trading pair, or zero if the trading pair
// has no current liquidity.
let current_a_from_b = self
.nonverifiable_get_raw(&engine::routable_assets::a_from_b(&pair))
.await?
.map(|bytes| {
Amount::from_be_bytes(
bytes
.try_into()
.expect("liquidity index amount can always be parsed"),
)
})
.unwrap_or_default();

// Use the previous reserves to compute `prev_position_contribution` (denominated in asset_1).
let prev_position_contribution = prev
.reserves_for(pair.start)
.expect("specified position should match provided trading pair");

// Compute `new_A_from_B`.
let new_a_from_b =
// Subtract the previous version of the position's contribution to represent that position no longer
// being correct, and since the updated version is Closed, it has no contribution.
current_a_from_b.saturating_sub(&prev_position_contribution);

tracing::debug!(?pair, current_liquidity = ?current_a_from_b, ?prev_position_contribution, "closed position, subtracting previous contribution to existing available liquidity for trading pair");

(new_a_from_b, current_a_from_b)
}
(State::Withdrawn { .. }, _) | (State::Closed, None) => {
// The position already went through the `Closed` state or was opened in the `Closed` state, so its contribution has already been subtracted.
return Ok(());
}
};

// Delete the existing key for this position if the reserve amount has changed.
if new_a_from_b != current_a_from_b {
self.nonverifiable_delete(
engine::routable_assets::key(&pair.start, current_a_from_b).to_vec(),
);
match prev_state {
Some(prev_state) => match (prev_state.state, new_state.state) {
// We only want to update the index when we process active positions.
(Opened, Closed) => {}
(Opened, Opened) => {}
_ => return Ok(()),
},
None => {}
}

// Write the new key indicating that asset B is routable from asset A with `new_a_from_b` liquidity.
self.nonverifiable_put_raw(
engine::routable_assets::key(&pair.start, new_a_from_b).to_vec(),
pair.end.encode_to_vec(),
);
tracing::debug!(start = ?pair.start, end = ?pair.end, "marking routable from start -> end");
let canonical_pair = new_state.phi.pair;
let pair_ab = DirectedTradingPair::new(canonical_pair.asset_1(), canonical_pair.asset_2());

// Write the new lookup index storing `new_a_from_b` for this trading pair.
self.nonverifiable_put_raw(
engine::routable_assets::a_from_b(&pair).to_vec(),
new_a_from_b.to_be_bytes().to_vec(),
);
tracing::debug!(available_liquidity = ?new_a_from_b, ?pair, "marking available liquidity for trading pair");
let (prev_a, prev_b) = prev_state
.as_ref()
.map(|p| {
(
p.reserves_for(pair_ab.start).expect("asset ids match"),
p.reserves_for(pair_ab.end).expect("asset ids match"),
)
})
.unwrap_or_else(|| (Amount::zero(), Amount::zero()));

// A -> B
self.update_asset_by_base_liquidity_index_inner(id, pair_ab, prev_a, new_state)
.await?;
// B -> A
self.update_asset_by_base_liquidity_index_inner(id, pair_ab.flip(), prev_b, new_state)
.await?;

Ok(())
}
}

async fn update_asset_by_base_liquidity_index(
impl<T: StateWrite + ?Sized> AssetByLiquidityIndex for T {}

trait Inner: StateWrite {
async fn update_asset_by_base_liquidity_index_inner(
&mut self,
prev_state: &Option<Position>,
new_state: &Position,
_id: &position::Id,
id: &position::Id,
pair: DirectedTradingPair,
old_contrib: Amount,
new_position: &Position,
) -> Result<()> {
// Since swaps may be performed in either direction, the available liquidity indices
// need to be calculated and stored for both the A -> B and B -> A directions.
let (a, b) = (new_state.phi.pair.asset_1(), new_state.phi.pair.asset_2());
let aggregate_key = &engine::routable_assets::lookup_base_liquidity_by_pair(&pair);

let prev_tally: Amount = self
.nonverifiable_get(aggregate_key)
.await?
.unwrap_or_default();

// The previous contribution for this position is supplied to us by
// the caller. This default to zero if the position was just created.
// We use this to compute a view of the tally that excludes the position
// we are currently processing (and avoid double-counting).
let old_contrib = old_contrib;

// The updated contribution is the total amount of base asset routable
// from an adjacent asset.
let new_contrib = new_position
.reserves_for(pair.start)
.expect("asset ids should match");

let new_tally = match new_position.state {
State::Opened => prev_tally
.saturating_sub(&old_contrib)
.saturating_add(&new_contrib),
State::Closed => prev_tally.saturating_sub(&old_contrib),
_ => unreachable!("inner impl is guarded"),
};

// A -> B
self.update_liquidity_index(DirectedTradingPair::new(a, b), new_state, prev_state)
.await?;
// B -> A
self.update_liquidity_index(DirectedTradingPair::new(b, a), new_state, prev_state)
.await?;
// If the update operation is a no-op, we can skip the update
// and return early.
if prev_tally == new_tally {
tracing::debug!(
?prev_tally,
?pair,
?id,
"skipping routable asset index update"
);
return Ok(());
}

// Update the primary and auxiliary indices:
let old_primary_key = engine::routable_assets::key(&pair.start, prev_tally).to_vec();
// This could make the `StateDelta` more expensive to scan, but this doesn't show on profiles yet.
self.nonverifiable_delete(old_primary_key);

let new_primary_key = engine::routable_assets::key(&pair.start, new_tally).to_vec();
self.nonverifiable_put(new_primary_key, pair.end);
tracing::debug!(?pair, ?new_tally, "base liquidity entry updated");

let auxiliary_key = engine::routable_assets::lookup_base_liquidity_by_pair(&pair).to_vec();
self.nonverifiable_put(auxiliary_key, new_tally);
tracing::debug!(
?pair,
"base liquidity heuristic marked directed pair as routable"
);

Ok(())
}
}

impl<T: StateWrite + ?Sized> AssetByLiquidityIndex for T {}
impl<T: AssetByLiquidityIndex + ?Sized> Inner for T {}
Loading

0 comments on commit 4e150c1

Please sign in to comment.