Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dex: Extend arbitrage routing #4292

Merged
merged 3 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion crates/core/component/dex/src/component/action_handler/swap.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use anyhow::{ensure, Result};
use async_trait::async_trait;
use cnidarium::StateWrite;
Expand All @@ -7,7 +9,9 @@ use penumbra_proto::StateWriteProto;
use penumbra_sct::component::source::SourceContext;

use crate::{
component::{metrics, StateReadExt, StateWriteExt, SwapManager},
component::{
metrics, position_manager::PositionManager as _, StateReadExt, StateWriteExt, SwapManager,
},
event,
swap::{proof::SwapProofPublic, Swap},
};
Expand Down Expand Up @@ -65,6 +69,14 @@ impl ActionHandler for Swap {
.add_swap_payload(self.body.payload.clone(), source)
.await;

// Mark the assets for the swap's trading pair as accessed during this block.
let fixed_candidates = Arc::new(dex_params.fixed_candidates.clone());
state.add_recently_accessed_asset(
swap.body.trading_pair.asset_1(),
fixed_candidates.clone(),
);
state.add_recently_accessed_asset(swap.body.trading_pair.asset_2(), fixed_candidates);

metrics::histogram!(crate::component::metrics::DEX_SWAP_DURATION)
.record(swap_start.elapsed());
state.record_proto(event::swap(self));
Expand Down
20 changes: 15 additions & 5 deletions crates/core/component/dex/src/component/dex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{

use super::{
router::{HandleBatchSwaps, RoutingParams},
Arbitrage, PositionManager, ValueCircuitBreaker,
Arbitrage, PositionManager, PositionRead as _, ValueCircuitBreaker,
};

pub struct Dex {}
Expand Down Expand Up @@ -83,12 +83,22 @@ impl Component for Dex {
// For arbitrage, we extend the path search by 2 hops to allow a path out of the
// staking token and back.

// TODO: Build an extended candidate set with:
// - both ends of all trading pairs for which there were swaps in the block
// - both ends of all trading pairs for which positions were opened
// Extend the fixed candidate set to include recently accessed assets, to have
// more arbitrage execution against newly opened positions.
let fixed_candidates = Arc::new(
routing_params
.fixed_candidates
.iter()
.cloned()
// The set of recently accessed assets is already limited to avoid
// potentially blowing up routing time.
.chain(state.recently_accessed_assets().iter().cloned())
.collect::<Vec<_>>(),
);

let arb_routing_params = RoutingParams {
max_hops: routing_params.max_hops + 2,
fixed_candidates: routing_params.fixed_candidates.clone(),
fixed_candidates,
price_limit: Some(1u64.into()),
};

Expand Down
56 changes: 53 additions & 3 deletions crates/core/component/dex/src/component/position_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@ use penumbra_asset::{asset, Balance};
use penumbra_proto::DomainType;
use penumbra_proto::{StateReadProto, StateWriteProto};

use crate::component::position_manager::{
base_liquidity_index::AssetByLiquidityIndex, inventory_index::PositionByInventoryIndex,
price_index::PositionByPriceIndex,
use crate::component::{
dex::StateReadExt as _,
position_manager::{
base_liquidity_index::AssetByLiquidityIndex, inventory_index::PositionByInventoryIndex,
price_index::PositionByPriceIndex,
},
};
use crate::lp::Reserves;
use crate::{
Expand All @@ -26,6 +29,7 @@ use crate::{
use crate::{event, state_key};

const DYNAMIC_ASSET_LIMIT: usize = 10;
const RECENTLY_ACCESSED_ASSET_LIMIT: usize = 10;

mod base_liquidity_index;
mod counter;
Expand Down Expand Up @@ -142,6 +146,12 @@ pub trait PositionRead: StateRead {
})
.boxed()
}

/// Fetch the list of assets interacted with during this block.
fn recently_accessed_assets(&self) -> im::OrdSet<asset::Id> {
self.object_get(state_key::recently_accessed_assets())
.unwrap_or_default()
}
}
impl<T: StateRead + ?Sized> PositionRead for T {}

Expand Down Expand Up @@ -244,13 +254,53 @@ pub trait PositionManager: StateWrite + PositionRead {
self.vcb_credit(position.reserves_1()).await?;
self.vcb_credit(position.reserves_2()).await?;

// Add the asset IDs from the new position's trading pair
// to the candidate set for this block.
let routing_params = self.routing_params().await?;
self.add_recently_accessed_asset(
position.phi.pair.asset_1(),
routing_params.fixed_candidates.clone(),
);
self.add_recently_accessed_asset(
position.phi.pair.asset_2(),
routing_params.fixed_candidates,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was a minor surprise that this wasn't internalized by add_recently_accessed_asset but I guess we technically save a lookup.

);

// Finally, record the new position state.
self.record_proto(event::position_open(&position));
self.update_position(None, position).await?;

Ok(())
}

/// Adds an asset ID to the list of recently accessed assets,
/// making it a candidate for the current block's arbitrage routing.
///
/// This ensures that assets associated with recently active positions
/// will be eligible for arbitrage if mispriced positions are opened.
#[tracing::instrument(level = "debug", skip_all)]
fn add_recently_accessed_asset(
&mut self,
asset_id: asset::Id,
fixed_candidates: Arc<Vec<asset::Id>>,
) {
let mut assets = self.recently_accessed_assets();

// Limit the number of recently accessed assets to prevent blowing
// up routing time.
if assets.len() >= RECENTLY_ACCESSED_ASSET_LIMIT {
return;
}

// If the asset is already in the fixed candidate list, don't insert it.
if fixed_candidates.contains(&asset_id) {
return;
}

assets.insert(asset_id);
self.object_put(state_key::recently_accessed_assets(), assets);
}

/// Record execution against an opened position.
///
/// IMPORTANT: This method can mutate its input state.
Expand Down
3 changes: 3 additions & 0 deletions crates/core/component/dex/src/component/router/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ async fn path_search_basic() {
async fn path_extension_basic() {
let _ = tracing_subscriber::fmt::try_init();
let mut state = StateDelta::new(());
state.put_dex_params(DexParameters::default());

// Write some test positions with a mispriced gn:pusd pair.
create_test_positions_basic(&mut state, true).await;
Expand Down Expand Up @@ -131,6 +132,8 @@ async fn path_extension_basic() {
// Reset the state.
let mut state = StateDelta::new(());

state.put_dex_params(DexParameters::default());

// Write some test positions without the mispriced position.
create_test_positions_basic(&mut state, false).await;

Expand Down
4 changes: 4 additions & 0 deletions crates/core/component/dex/src/state_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ pub fn pending_position_closures() -> &'static str {
"dex/pending_position_closures"
}

pub fn recently_accessed_assets() -> &'static str {
"dex/recently_accessed_assets"
}

pub fn pending_payloads() -> &'static str {
"dex/pending_payloads"
}
Expand Down
Loading