From 8ed2ad7f5597798a3e497365ff261dbf51a95b6c Mon Sep 17 00:00:00 2001 From: Chris Czub Date: Tue, 14 May 2024 12:07:44 -0700 Subject: [PATCH] dex: Extend arbitrage routing (#4292) ## Describe your changes This PR extends arbitrage routing to route against assets involved in swaps and positions opened during the current block. ## Issue ticket number and link #4177 ## Checklist before requesting a review - [x] If this code contains consensus-breaking changes, I have added the "consensus-breaking" label. --- .../dex/src/component/action_handler/swap.rs | 14 ++++- .../core/component/dex/src/component/dex.rs | 20 +++++-- .../dex/src/component/position_manager.rs | 56 ++++++++++++++++++- .../dex/src/component/router/tests.rs | 3 + crates/core/component/dex/src/state_key.rs | 4 ++ 5 files changed, 88 insertions(+), 9 deletions(-) diff --git a/crates/core/component/dex/src/component/action_handler/swap.rs b/crates/core/component/dex/src/component/action_handler/swap.rs index 74848317fc..86d4591c8c 100644 --- a/crates/core/component/dex/src/component/action_handler/swap.rs +++ b/crates/core/component/dex/src/component/action_handler/swap.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use anyhow::{ensure, Result}; use async_trait::async_trait; use cnidarium::StateWrite; @@ -7,7 +9,9 @@ use penumbra_proto::StateWriteProto; use penumbra_sct::component::source::SourceContext; use crate::{ - component::{metrics, StateReadExt, StateWriteExt, SwapManager}, + component::{ + metrics, position_manager::PositionManager as _, StateReadExt, StateWriteExt, SwapManager, + }, event, swap::{proof::SwapProofPublic, Swap}, }; @@ -65,6 +69,14 @@ impl ActionHandler for Swap { .add_swap_payload(self.body.payload.clone(), source) .await; + // Mark the assets for the swap's trading pair as accessed during this block. + let fixed_candidates = Arc::new(dex_params.fixed_candidates.clone()); + state.add_recently_accessed_asset( + swap.body.trading_pair.asset_1(), + fixed_candidates.clone(), + ); + state.add_recently_accessed_asset(swap.body.trading_pair.asset_2(), fixed_candidates); + metrics::histogram!(crate::component::metrics::DEX_SWAP_DURATION) .record(swap_start.elapsed()); state.record_proto(event::swap(self)); diff --git a/crates/core/component/dex/src/component/dex.rs b/crates/core/component/dex/src/component/dex.rs index 2bbd8f57a4..33417c8cf4 100644 --- a/crates/core/component/dex/src/component/dex.rs +++ b/crates/core/component/dex/src/component/dex.rs @@ -17,7 +17,7 @@ use crate::{ use super::{ router::{HandleBatchSwaps, RoutingParams}, - Arbitrage, PositionManager, ValueCircuitBreaker, + Arbitrage, PositionManager, PositionRead as _, ValueCircuitBreaker, }; pub struct Dex {} @@ -83,12 +83,22 @@ impl Component for Dex { // For arbitrage, we extend the path search by 2 hops to allow a path out of the // staking token and back. - // TODO: Build an extended candidate set with: - // - both ends of all trading pairs for which there were swaps in the block - // - both ends of all trading pairs for which positions were opened + // Extend the fixed candidate set to include recently accessed assets, to have + // more arbitrage execution against newly opened positions. + let fixed_candidates = Arc::new( + routing_params + .fixed_candidates + .iter() + .cloned() + // The set of recently accessed assets is already limited to avoid + // potentially blowing up routing time. + .chain(state.recently_accessed_assets().iter().cloned()) + .collect::>(), + ); + let arb_routing_params = RoutingParams { max_hops: routing_params.max_hops + 2, - fixed_candidates: routing_params.fixed_candidates.clone(), + fixed_candidates, price_limit: Some(1u64.into()), }; diff --git a/crates/core/component/dex/src/component/position_manager.rs b/crates/core/component/dex/src/component/position_manager.rs index cd3e3e4caa..b807815870 100644 --- a/crates/core/component/dex/src/component/position_manager.rs +++ b/crates/core/component/dex/src/component/position_manager.rs @@ -11,9 +11,12 @@ use penumbra_asset::{asset, Balance}; use penumbra_proto::DomainType; use penumbra_proto::{StateReadProto, StateWriteProto}; -use crate::component::position_manager::{ - base_liquidity_index::AssetByLiquidityIndex, inventory_index::PositionByInventoryIndex, - price_index::PositionByPriceIndex, +use crate::component::{ + dex::StateReadExt as _, + position_manager::{ + base_liquidity_index::AssetByLiquidityIndex, inventory_index::PositionByInventoryIndex, + price_index::PositionByPriceIndex, + }, }; use crate::lp::Reserves; use crate::{ @@ -26,6 +29,7 @@ use crate::{ use crate::{event, state_key}; const DYNAMIC_ASSET_LIMIT: usize = 10; +const RECENTLY_ACCESSED_ASSET_LIMIT: usize = 10; mod base_liquidity_index; mod counter; @@ -142,6 +146,12 @@ pub trait PositionRead: StateRead { }) .boxed() } + + /// Fetch the list of assets interacted with during this block. + fn recently_accessed_assets(&self) -> im::OrdSet { + self.object_get(state_key::recently_accessed_assets()) + .unwrap_or_default() + } } impl PositionRead for T {} @@ -244,6 +254,18 @@ pub trait PositionManager: StateWrite + PositionRead { self.vcb_credit(position.reserves_1()).await?; self.vcb_credit(position.reserves_2()).await?; + // Add the asset IDs from the new position's trading pair + // to the candidate set for this block. + let routing_params = self.routing_params().await?; + self.add_recently_accessed_asset( + position.phi.pair.asset_1(), + routing_params.fixed_candidates.clone(), + ); + self.add_recently_accessed_asset( + position.phi.pair.asset_2(), + routing_params.fixed_candidates, + ); + // Finally, record the new position state. self.record_proto(event::position_open(&position)); self.update_position(None, position).await?; @@ -251,6 +273,34 @@ pub trait PositionManager: StateWrite + PositionRead { Ok(()) } + /// Adds an asset ID to the list of recently accessed assets, + /// making it a candidate for the current block's arbitrage routing. + /// + /// This ensures that assets associated with recently active positions + /// will be eligible for arbitrage if mispriced positions are opened. + #[tracing::instrument(level = "debug", skip_all)] + fn add_recently_accessed_asset( + &mut self, + asset_id: asset::Id, + fixed_candidates: Arc>, + ) { + let mut assets = self.recently_accessed_assets(); + + // Limit the number of recently accessed assets to prevent blowing + // up routing time. + if assets.len() >= RECENTLY_ACCESSED_ASSET_LIMIT { + return; + } + + // If the asset is already in the fixed candidate list, don't insert it. + if fixed_candidates.contains(&asset_id) { + return; + } + + assets.insert(asset_id); + self.object_put(state_key::recently_accessed_assets(), assets); + } + /// Record execution against an opened position. /// /// IMPORTANT: This method can mutate its input state. diff --git a/crates/core/component/dex/src/component/router/tests.rs b/crates/core/component/dex/src/component/router/tests.rs index 0c26b602f6..80c065845a 100644 --- a/crates/core/component/dex/src/component/router/tests.rs +++ b/crates/core/component/dex/src/component/router/tests.rs @@ -61,6 +61,7 @@ async fn path_search_basic() { async fn path_extension_basic() { let _ = tracing_subscriber::fmt::try_init(); let mut state = StateDelta::new(()); + state.put_dex_params(DexParameters::default()); // Write some test positions with a mispriced gn:pusd pair. create_test_positions_basic(&mut state, true).await; @@ -131,6 +132,8 @@ async fn path_extension_basic() { // Reset the state. let mut state = StateDelta::new(()); + state.put_dex_params(DexParameters::default()); + // Write some test positions without the mispriced position. create_test_positions_basic(&mut state, false).await; diff --git a/crates/core/component/dex/src/state_key.rs b/crates/core/component/dex/src/state_key.rs index 1456de2bb4..21e6a26b6e 100644 --- a/crates/core/component/dex/src/state_key.rs +++ b/crates/core/component/dex/src/state_key.rs @@ -63,6 +63,10 @@ pub fn pending_position_closures() -> &'static str { "dex/pending_position_closures" } +pub fn recently_accessed_assets() -> &'static str { + "dex/recently_accessed_assets" +} + pub fn pending_payloads() -> &'static str { "dex/pending_payloads" }