Skip to content

Commit

Permalink
Improve update_position API and position execution events. (#4121)
Browse files Browse the repository at this point in the history
  • Loading branch information
hdevalence authored Mar 27, 2024
1 parent fe0202b commit 9c3bc98
Show file tree
Hide file tree
Showing 14 changed files with 218 additions and 85 deletions.
Binary file modified crates/cnidarium/src/gen/proto_descriptor.bin.no_lfs
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ mod tests {
let id = buy_1.id();

let position = buy_1;
state_tx.index_position_by_price(&position);
state_tx.index_position_by_price(&position, &position.id());
state_tx
.update_available_liquidity(&position, &None)
.await
Expand Down
3 changes: 2 additions & 1 deletion crates/core/component/dex/src/component/dex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,8 @@ impl Component for Dex {
Arc::get_mut(state)
.expect("state should be uniquely referenced after batch swaps complete")
.close_queued_positions()
.await;
.await
.expect("closing queued positions should not fail");
}

#[instrument(name = "dex", skip(_state))]
Expand Down
178 changes: 113 additions & 65 deletions crates/core/component/dex/src/component/position_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,13 +63,6 @@ pub trait PositionRead: StateRead {
self.get(&state_key::position_by_id(id)).await
}

async fn check_position_id_unused(&self, id: &position::Id) -> Result<()> {
match self.get_raw(&state_key::position_by_id(id)).await? {
Some(_) => Err(anyhow::anyhow!("position id {:?} already used", id)),
None => Ok(()),
}
}

async fn best_position(
&self,
pair: &DirectedTradingPair,
Expand Down Expand Up @@ -140,19 +133,37 @@ impl<T: StateRead + ?Sized> PositionRead for T {}
#[async_trait]
pub trait PositionManager: StateWrite + PositionRead {
/// Close a position by id, removing it from the state.
///
/// If the position is already closed, this is a no-op.
///
/// # Errors
///
/// Returns an error if the position does not exist.
async fn close_position_by_id(&mut self, id: &position::Id) -> Result<()> {
tracing::debug!(?id, "closing position, first fetch it");
let mut position = self
let prev_state = self
.position_by_id(id)
.await
.expect("fetching position should not fail")
.ok_or_else(|| anyhow::anyhow!("position not found"))?;
.ok_or_else(|| anyhow::anyhow!("could not find position {} to close", id))?;

anyhow::ensure!(
matches!(
prev_state.state,
position::State::Opened | position::State::Closed,
),
"attempted to close a position with state {:?}, expected Opened or Closed",
prev_state.state
);

let new_state = {
let mut new_state = prev_state.clone();
new_state.state = position::State::Closed;
new_state
};

self.update_position(Some(prev_state), new_state).await?;

tracing::debug!(?id, "position found, close it");
position.state = position::State::Closed;
self.update_position(position).await?;
Ok(())
}

Expand All @@ -164,18 +175,13 @@ pub trait PositionManager: StateWrite + PositionRead {
}

/// Close all positions that have been queued for closure.
async fn close_queued_positions(&mut self) -> () {
async fn close_queued_positions(&mut self) -> Result<()> {
let to_close = self.pending_position_closures();
for id in to_close {
match self.close_position_by_id(&id).await {
Ok(()) => tracing::debug!(?id, "position closed"),
// The position was already closed, which in and of itself is not an error.
// It's possible that the position was closed by the engine, for example
// because it was a limit-order.
Err(e) => tracing::debug!(?id, "failed to close position: {}", e),
}
self.close_position_by_id(&id).await?;
}
self.object_delete(state_key::pending_position_closures());
Ok(())
}

/// Opens a new position, updating all necessary indexes and checking for
Expand All @@ -188,38 +194,79 @@ pub trait PositionManager: StateWrite + PositionRead {
}

// Validate that the position ID doesn't collide
self.check_position_id_unused(&position.id()).await?;
if let Some(existing) = self.position_by_id(&position.id()).await? {
anyhow::bail!(
"attempted to open a position with ID {}, which already exists with state {:?}",
position.id(),
existing
);
}

// Credit the DEX for the inflows from this position.
self.vcb_credit(position.reserves_1()).await?;
self.vcb_credit(position.reserves_2()).await?;

// Finally, record the new position state.
self.record_proto(event::position_open(&position));
self.update_position(position).await?;
self.update_position(None, position).await?;

Ok(())
}

/// Record execution against an opened position.
///
/// The `context` parameter records the global context of the path in which
/// the position execution happened. This may be completely different than
/// the trading pair of the position itself, and is used to link the
/// micro-scale execution (processed by this method) with the macro-scale
/// context (a swap or arbitrage).
#[tracing::instrument(level = "debug", skip_all)]
async fn position_execution(&mut self, mut position: Position) -> Result<()> {
async fn position_execution(
&mut self,
mut new_state: Position,
context: DirectedTradingPair,
) -> Result<()> {
let prev_state = self
.position_by_id(&new_state.id())
.await?
.ok_or_else(|| anyhow::anyhow!("withdrew from unknown position {}", new_state.id()))?;

anyhow::ensure!(
matches!(&prev_state.state, position::State::Opened),
"attempted to execute against a position with state {:?}, expected Opened",
prev_state.state
);
anyhow::ensure!(
matches!(&new_state.state, position::State::Opened),
"supplied post-execution state {:?}, expected Opened",
prev_state.state
);

// Handle "close-on-fill": automatically flip the position state to "closed" if
// either of the reserves are zero.
if position.close_on_fill {
if position.reserves.r1 == 0u64.into() || position.reserves.r2 == 0u64.into() {
if new_state.close_on_fill {
if new_state.reserves.r1 == 0u64.into() || new_state.reserves.r2 == 0u64.into() {
tracing::debug!(
id = ?position.id(),
r1 = ?position.reserves.r1,
r2 = ?position.reserves.r2,
id = ?new_state.id(),
r1 = ?new_state.reserves.r1,
r2 = ?new_state.reserves.r2,
"marking position as closed due to close-on-fill"
);
position.state = position::State::Closed;
new_state.state = position::State::Closed;
}
}

self.record_proto(event::position_execution(&position));
self.update_position(position).await?;
// Optimization: it's possible that the position's reserves haven't
// changed, and that we're about to do a no-op update. This can happen
// when saving a frontier, for instance, since the FillRoute code saves
// the entire frontier when it finishes.
//
// If so, skip the write, but more importantly, skip emitting an event,
// so tooling doesn't get confused about a no-op execution.
if prev_state != new_state {
self.record_proto(event::position_execution(&prev_state, &new_state, context));
self.update_position(Some(prev_state), new_state).await?;
}

Ok(())
}
Expand All @@ -233,7 +280,7 @@ pub trait PositionManager: StateWrite + PositionRead {
position_id: position::Id,
sequence: u64,
) -> Result<Balance> {
let mut metadata = self
let prev_state = self
.position_by_id(&position_id)
.await?
.ok_or_else(|| anyhow::anyhow!("withdrew from unknown position {}", position_id))?;
Expand All @@ -246,17 +293,17 @@ pub trait PositionManager: StateWrite + PositionRead {
// This is just a check that sequence == current_sequence + 1, with extra logic
// so that we treat "closed" as "sequence -1".
if sequence == 0 {
if metadata.state != position::State::Closed {
if prev_state.state != position::State::Closed {
anyhow::bail!(
"attempted to withdraw position {} with state {}, expected Closed",
position_id,
metadata.state
prev_state.state
);
}
} else {
if let position::State::Withdrawn {
sequence: current_sequence,
} = metadata.state
} = prev_state.state
{
if current_sequence + 1 != sequence {
anyhow::bail!(
Expand All @@ -270,34 +317,34 @@ pub trait PositionManager: StateWrite + PositionRead {
anyhow::bail!(
"attempted to withdraw position {} with state {}, expected Withdrawn",
position_id,
metadata.state
prev_state.state
);
}
}

// Record an event prior to updating the position state, so we have access to
// the current reserves.
self.record_proto(event::position_withdraw(position_id, &metadata));
self.record_proto(event::position_withdraw(position_id, &prev_state));

// Grab a copy of the final reserves of the position to return to the caller.
let reserves = metadata.reserves.balance(&metadata.phi.pair);
let reserves = prev_state.reserves.balance(&prev_state.phi.pair);

// Debit the DEX for the outflows from this position.
// TODO: in a future PR, split current PositionManager to PositionManagerInner
// and fold this into a position open method
self.vcb_debit(metadata.reserves_1()).await?;
self.vcb_debit(metadata.reserves_2()).await?;
self.vcb_debit(prev_state.reserves_1()).await?;
self.vcb_debit(prev_state.reserves_2()).await?;

// Finally, update the position. This has two steps:
// - update the state with the correct sequence number;
// - zero out the reserves, to prevent double-withdrawals.
metadata.state = position::State::Withdrawn {
let new_state = {
let mut new_state = prev_state.clone();
// We just checked that the supplied sequence number is incremented by 1 from prev.
sequence,
new_state.state = position::State::Withdrawn { sequence };
new_state.reserves = Reserves::zero();
new_state
};
metadata.reserves = Reserves::zero();

self.update_position(metadata).await?;
self.update_position(Some(prev_state), new_state).await?;

Ok(reserves)
}
Expand All @@ -311,36 +358,38 @@ pub(crate) trait Inner: StateWrite {
///
/// This should be the SOLE ENTRYPOINT for writing positions to the state.
/// All other position changes exposed by the `PositionManager` should run through here.
#[tracing::instrument(level = "debug", skip(self, position), fields(id = ?position.id()))]
async fn update_position(&mut self, position: position::Position) -> Result<()> {
let id = position.id();
tracing::debug!(?position, "fetch position's previous state from storage");
// We pull the position from the state unconditionally, since we will
// always need to update the position's liquidity index.
let prev = self
.position_by_id(&id)
.await
.expect("fetching position should not fail");
#[tracing::instrument(level = "debug", skip_all, fields(id = ?new_state.id()))]
async fn update_position(
&mut self,
prev_state: Option<Position>,
new_state: Position,
) -> Result<()> {
tracing::debug!(?prev_state, ?new_state, "updating position state");

let id = new_state.id();

// Clear any existing indexes of the position, since changes to the
// reserves or the position state might have invalidated them.
self.deindex_position_by_price(&position);
if let Some(prev_state) = prev_state.as_ref() {
self.deindex_position_by_price(&prev_state, &id);
}

// Only index the position's liquidity if it is active.
if position.state == position::State::Opened {
self.index_position_by_price(&position);
if new_state.state == position::State::Opened {
self.index_position_by_price(&new_state, &id);
}

// Update the available liquidity for this position's trading pair.
self.update_available_liquidity(&position, &prev).await?;
// TODO: refactor and streamline this method while implementing eviction.
self.update_available_liquidity(&new_state, &prev_state)
.await?;

self.put(state_key::position_by_id(&id), position);
self.put(state_key::position_by_id(&id), new_state);
Ok(())
}

fn index_position_by_price(&mut self, position: &position::Position) {
fn index_position_by_price(&mut self, position: &position::Position, id: &position::Id) {
let (pair, phi) = (position.phi.pair, &position.phi);
let id = position.id();
if position.reserves.r2 != 0u64.into() {
// Index this position for trades FROM asset 1 TO asset 2, since the position has asset 2 to give out.
let pair12 = DirectedTradingPair {
Expand Down Expand Up @@ -370,8 +419,7 @@ pub(crate) trait Inner: StateWrite {
}
}

fn deindex_position_by_price(&mut self, position: &Position) {
let id = position.id();
fn deindex_position_by_price(&mut self, position: &Position, id: &position::Id) {
tracing::debug!("deindexing position");
let pair12 = DirectedTradingPair {
start: position.phi.pair.asset_1(),
Expand Down
18 changes: 11 additions & 7 deletions crates/core/component/dex/src/component/router/fill_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@ use penumbra_num::{
fixpoint::{Error, U128x128},
Amount,
};
use penumbra_proto::StateWriteProto as _;
use tracing::instrument;

use crate::{
component::{metrics, PositionManager, PositionRead},
event,
lp::{
position::{self, Position},
Reserves,
Expand Down Expand Up @@ -411,12 +409,14 @@ impl<S: StateRead + StateWrite> Frontier<S> {
}

async fn save(&mut self) -> Result<()> {
let context = DirectedTradingPair {
start: self.pairs.first().expect("pairs is nonempty").start,
end: self.pairs.last().expect("pairs is nonempty").end,
};
for position in &self.positions {
self.state.position_execution(position.clone()).await?;

// Create an ABCI event signaling that the position was executed against
self.state
.record_proto(event::position_execution(&position));
.position_execution(position.clone(), context.clone())
.await?;
}
Ok(())
}
Expand Down Expand Up @@ -491,8 +491,12 @@ impl<S: StateRead + StateWrite> Frontier<S> {
// discard it, so write its updated reserves before we replace it on the
// frontier. The other positions will be written out either when
// they're fully consumed, or when we finish filling.
let context = DirectedTradingPair {
start: self.pairs.first().expect("pairs is nonempty").start,
end: self.pairs.last().expect("pairs is nonempty").end,
};
self.state
.position_execution(self.positions[index].clone())
.position_execution(self.positions[index].clone(), context)
.await
.expect("writing to storage should not fail");

Expand Down
3 changes: 2 additions & 1 deletion crates/core/component/dex/src/component/router/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ impl<S: StateRead + 'static> Path<S> {
// Deindex the position we "consumed" in this and all descendant state forks,
// ensuring we don't double-count liquidity while traversing cycles.
use super::super::position_manager::Inner as _;
self.state.deindex_position_by_price(&best_price_position);
self.state
.deindex_position_by_price(&best_price_position, &best_price_position.id());

// Compute the effective price of a trade in the direction self.end()=>new_end
let hop_price = best_price_position
Expand Down
Loading

0 comments on commit 9c3bc98

Please sign in to comment.