From 32963ba799cff772f96c4cd3e9c7e02ad164e71a Mon Sep 17 00:00:00 2001 From: Henry de Valence Date: Wed, 4 Dec 2024 01:58:39 +0900 Subject: [PATCH] Add pindexer support for position timelines in the DEX explorer (#4946) ## Describe your changes These changes index the states of individual positions, allowing the dex explorer to render position timelines, allowing users to inspect their position history. This also updates sqlx along the way. Once these changes are merged, we'll need to trigger a reindex on the dev indexer. ## Checklist before requesting a review - [x] If this code contains consensus-breaking changes, I have added the "consensus-breaking" label. Otherwise, I declare my belief that there are not consensus-breaking changes, for the following reason: > indexer changes only --------- Co-authored-by: Lucas Meier --- Cargo.lock | 183 ++++++--- Cargo.toml | 4 +- crates/bin/pindexer/src/dex_ex/mod.rs | 369 +++++++++++++++++- crates/bin/pindexer/src/dex_ex/schema.sql | 102 +++++ crates/core/asset/src/asset/id.rs | 37 ++ .../cometindex/src/indexer/indexing_state.rs | 6 +- 6 files changed, 629 insertions(+), 72 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 93ffb9ec40..6da9503696 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -50,7 +50,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", - "getrandom", "once_cell", "version_check", "zerocopy", @@ -571,7 +570,7 @@ dependencies = [ "js-sys", "lazy_static", "log", - "rustls", + "rustls 0.21.12", "thiserror", "wasm-bindgen", "wasm-bindgen-futures", @@ -703,8 +702,8 @@ dependencies = [ "http-body", "hyper", "pin-project-lite", - "rustls", - "rustls-pemfile", + "rustls 0.21.12", + "rustls-pemfile 1.0.4", "tokio", "tokio-rustls", "tower-service", @@ -761,6 +760,19 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf9ff0bbfd639f15c74af777d81383cf53efb7c93613f6cab67c6c11e05bbf8b" +[[package]] +name = "bigdecimal" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f850665a0385e070b64c38d2354e6c104c8479c59868d1e48a0c13ee2c7a1c1" +dependencies = [ + "autocfg", + "libm", + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "bincode" version = "1.3.3" @@ -1067,13 +1079,13 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.101" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac367972e516d45567c7eafc73d24e1c193dcf200a8d94e9db7b3d38b349572d" +checksum = "f34d93e62b03caf570cccc334cbc6c2fceca82f39211051345108adcba3eebdc" dependencies = [ "jobserver", "libc", - "once_cell", + "shlex", ] [[package]] @@ -2075,9 +2087,9 @@ dependencies = [ [[package]] name = "fallible-iterator" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" [[package]] name = "fallible-streaming-iterator" @@ -2358,7 +2370,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35bd3cf68c183738046838e300353e4716c674dc5e56890de4826801a6622a28" dependencies = [ "futures-io", - "rustls", + "rustls 0.21.12", ] [[package]] @@ -2573,9 +2585,9 @@ dependencies = [ [[package]] name = "hashlink" -version = "0.8.4" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" dependencies = [ "hashbrown 0.14.5", ] @@ -2642,9 +2654,6 @@ name = "heck" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" -dependencies = [ - "unicode-segmentation", -] [[package]] name = "heck" @@ -2785,7 +2794,7 @@ dependencies = [ "futures-util", "http", "hyper", - "rustls", + "rustls 0.21.12", "tokio", "tokio-rustls", ] @@ -3522,9 +3531,9 @@ dependencies = [ [[package]] name = "libsqlite3-sys" -version = "0.26.0" +version = "0.30.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afc22eff61b133b115c6e8c74e818c628d6d5e7a502afea6f64dee076dd94326" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" dependencies = [ "cc", "pkg-config", @@ -4689,7 +4698,7 @@ dependencies = [ "anyhow", "axum-server", "futures", - "rustls", + "rustls 0.21.12", "rustls-acme", "tracing", ] @@ -6386,9 +6395,9 @@ dependencies = [ [[package]] name = "r2d2_sqlite" -version = "0.22.0" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99f31323d6161385f385046738df520e0e8694fa74852d35891fc0be08348ddc" +checksum = "eb14dba8247a6a15b7fdbc7d389e2e6f03ee9f184f87117706d509c092dfe846" dependencies = [ "r2d2", "rusqlite", @@ -6620,9 +6629,9 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls", + "rustls 0.21.12", "rustls-native-certs", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", @@ -6758,9 +6767,9 @@ dependencies = [ [[package]] name = "rusqlite" -version = "0.29.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "549b9d036d571d42e6e85d1c1425e2ac83491075078ca9a15be021c56b1641f2" +checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" dependencies = [ "bitflags 2.6.0", "fallible-iterator", @@ -6841,10 +6850,24 @@ checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" dependencies = [ "log", "ring 0.17.8", - "rustls-webpki", + "rustls-webpki 0.101.7", "sct", ] +[[package]] +name = "rustls" +version = "0.23.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "934b404430bb06b3fae2cba809eb45a1ab1aecd64491213d7c3301b88393f8d1" +dependencies = [ + "once_cell", + "ring 0.17.8", + "rustls-pki-types", + "rustls-webpki 0.102.8", + "subtle", + "zeroize", +] + [[package]] name = "rustls-acme" version = "0.7.7" @@ -6865,7 +6888,7 @@ dependencies = [ "pem", "rcgen", "ring 0.16.20", - "rustls", + "rustls 0.21.12", "serde", "serde_json", "thiserror", @@ -6882,7 +6905,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" dependencies = [ "openssl-probe", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "schannel", "security-framework", ] @@ -6896,6 +6919,21 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -6906,6 +6944,17 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "rustls-webpki" +version = "0.102.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" +dependencies = [ + "ring 0.17.8", + "rustls-pki-types", + "untrusted 0.9.0", +] + [[package]] name = "rustversion" version = "1.0.17" @@ -7346,6 +7395,9 @@ name = "smallvec" version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +dependencies = [ + "serde", +] [[package]] name = "socket2" @@ -7404,9 +7456,9 @@ dependencies = [ [[package]] name = "sqlx" -version = "0.7.2" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e50c216e3624ec8e7ecd14c6a6a6370aad6ee5d8cfc3ab30b5162eeeef2ed33" +checksum = "93334716a037193fac19df402f8571269c84a00852f6a7066b5d2616dcd64d3e" dependencies = [ "sqlx-core", "sqlx-macros", @@ -7417,25 +7469,25 @@ dependencies = [ [[package]] name = "sqlx-core" -version = "0.7.2" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d6753e460c998bbd4cd8c6f0ed9a64346fcca0723d6e75e52fdc351c5d2169d" +checksum = "d4d8060b456358185f7d50c55d9b5066ad956956fddec42ee2e8567134a8936e" dependencies = [ - "ahash", "atoi", + "bigdecimal", "byteorder", "bytes", "chrono", "crc", "crossbeam-queue", - "dotenvy", "either", - "event-listener 2.5.3", + "event-listener 5.3.1", "futures-channel", "futures-core", "futures-intrusive", "futures-io", "futures-util", + "hashbrown 0.14.5", "hashlink", "hex", "indexmap 2.2.6", @@ -7444,8 +7496,8 @@ dependencies = [ "once_cell", "paste", "percent-encoding", - "rustls", - "rustls-pemfile", + "rustls 0.23.19", + "rustls-pemfile 2.2.0", "serde", "serde_json", "sha2 0.10.8", @@ -7456,31 +7508,31 @@ dependencies = [ "tokio-stream", "tracing", "url", - "webpki-roots 0.24.0", + "webpki-roots 0.26.7", ] [[package]] name = "sqlx-macros" -version = "0.7.2" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a793bb3ba331ec8359c1853bd39eed32cdd7baaf22c35ccf5c92a7e8d1189ec" +checksum = "cac0692bcc9de3b073e8d747391827297e075c7710ff6276d9f7a1f3d58c6657" dependencies = [ "proc-macro2 1.0.86", "quote 1.0.36", "sqlx-core", "sqlx-macros-core", - "syn 1.0.109", + "syn 2.0.68", ] [[package]] name = "sqlx-macros-core" -version = "0.7.2" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a4ee1e104e00dedb6aa5ffdd1343107b0a4702e862a84320ee7cc74782d96fc" +checksum = "1804e8a7c7865599c9c79be146dc8a9fd8cc86935fa641d3ea58e5f0688abaa5" dependencies = [ "dotenvy", "either", - "heck 0.4.1", + "heck 0.5.0", "hex", "once_cell", "proc-macro2 1.0.86", @@ -7492,7 +7544,7 @@ dependencies = [ "sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", - "syn 1.0.109", + "syn 2.0.68", "tempfile", "tokio", "url", @@ -7500,12 +7552,13 @@ dependencies = [ [[package]] name = "sqlx-mysql" -version = "0.7.2" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "864b869fdf56263f4c95c45483191ea0af340f9f3e3e7b4d57a61c7c87a970db" +checksum = "64bb4714269afa44aef2755150a0fc19d756fb580a67db8885608cf02f47d06a" dependencies = [ "atoi", - "base64 0.21.7", + "base64 0.22.1", + "bigdecimal", "bitflags 2.6.0", "byteorder", "bytes", @@ -7543,12 +7596,13 @@ dependencies = [ [[package]] name = "sqlx-postgres" -version = "0.7.2" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb7ae0e6a97fb3ba33b23ac2671a5ce6e3cabe003f451abd5a56e7951d975624" +checksum = "6fa91a732d854c5d7726349bb4bb879bb9478993ceb764247660aee25f67c2f8" dependencies = [ "atoi", - "base64 0.21.7", + "base64 0.22.1", + "bigdecimal", "bitflags 2.6.0", "byteorder", "chrono", @@ -7567,11 +7621,11 @@ dependencies = [ "log", "md-5", "memchr", + "num-bigint", "once_cell", "rand", "serde", "serde_json", - "sha1", "sha2 0.10.8", "smallvec", "sqlx-core", @@ -7583,9 +7637,9 @@ dependencies = [ [[package]] name = "sqlx-sqlite" -version = "0.7.2" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d59dc83cf45d89c555a577694534fcd1b55c545a816c816ce51f20bbe56a4f3f" +checksum = "d5b2cf34a45953bfd3daaf3db0f7a7878ab9b7a6b91b422d24a7a9e4c857b680" dependencies = [ "atoi", "chrono", @@ -7599,6 +7653,7 @@ dependencies = [ "log", "percent-encoding", "serde", + "serde_urlencoded", "sqlx-core", "tracing", "url", @@ -8182,7 +8237,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls", + "rustls 0.21.12", "tokio", ] @@ -8302,8 +8357,8 @@ dependencies = [ "percent-encoding", "pin-project", "prost", - "rustls", - "rustls-pemfile", + "rustls 0.21.12", + "rustls-pemfile 1.0.4", "tokio", "tokio-rustls", "tokio-stream", @@ -8815,18 +8870,18 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.24.0" +version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b291546d5d9d1eab74f069c77749f2cb8504a12caa20f0f2de93ddbf6f411888" -dependencies = [ - "rustls-webpki", -] +checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" [[package]] name = "webpki-roots" -version = "0.25.4" +version = "0.26.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +checksum = "5d642ff16b7e79272ae451b7322067cdc17cadf68c23264be9d94a32319efe7e" +dependencies = [ + "rustls-pki-types", +] [[package]] name = "whoami" diff --git a/Cargo.toml b/Cargo.toml index e2ba1a4492..f0f03e4844 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -206,7 +206,7 @@ proptest-derive = { version = "0.3" } prost = { version = "0.12.3" } prost-types = { version = "0.12" } r2d2 = { version = "0.8" } -r2d2_sqlite = { version = "0.22" } +r2d2_sqlite = { version = "0.25" } rand = { version = "0.8.5" } rand_chacha = { version = "0.3.1" } rand_core = { version = "0.6.4" } @@ -217,7 +217,7 @@ serde_json = { version = "1.0.96" } serde_unit_struct = { version = "0.1" } serde_with = { version = "3.5.1" } sha2 = { version = "0.10" } -sqlx = { version = "0.7", features = ["postgres", "runtime-tokio", "tls-rustls"] } +sqlx = { version = "0.8", features = ["bigdecimal", "postgres", "runtime-tokio", "tls-rustls"] } tap = "1.0.1" tempfile = { version = "3.3.0" } tendermint = { default-features = false, version = "0.34.0" } diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index c1ad731233..4868fc3e36 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -5,16 +5,22 @@ use cometindex::{ AppView, PgTransaction, }; use penumbra_asset::asset; +use penumbra_dex::lp::position::{Id as PositionId, Position}; use penumbra_dex::{ event::{ - EventCandlestickData, EventPositionExecution, EventPositionOpen, EventPositionWithdraw, + EventCandlestickData, EventPositionClose, EventPositionExecution, EventPositionOpen, + EventPositionWithdraw, EventQueuePositionClose, }, lp::Reserves, DirectedTradingPair, TradingPair, }; +use penumbra_num::Amount; use penumbra_proto::event::EventDomainType; +use penumbra_proto::DomainType; use penumbra_sct::event::EventBlockRoot; -use std::collections::{HashMap, HashSet}; +use sqlx::types::BigDecimal; +use sqlx::Row; +use std::collections::{BTreeMap, HashMap, HashSet}; type DateTime = sqlx::types::chrono::DateTime; @@ -648,16 +654,37 @@ struct PairMetrics { #[derive(Debug)] struct Events { time: Option, + height: i32, candles: HashMap, metrics: HashMap, + // Relevant positions. + positions: BTreeMap, + // Store events + position_opens: Vec, + position_executions: Vec, + position_closes: Vec, + position_withdrawals: Vec, + // Track transaction hashes by position ID + position_open_txs: BTreeMap, + position_close_txs: BTreeMap, + position_withdrawal_txs: BTreeMap, } impl Events { fn new() -> Self { Self { time: None, + height: 0, candles: HashMap::new(), metrics: HashMap::new(), + positions: BTreeMap::new(), + position_opens: Vec::new(), + position_executions: Vec::new(), + position_closes: Vec::new(), + position_withdrawals: Vec::new(), + position_open_txs: BTreeMap::new(), + position_close_txs: BTreeMap::new(), + position_withdrawal_txs: BTreeMap::new(), } } @@ -731,6 +758,8 @@ impl Events { pub fn extract(block: &BlockEvents) -> anyhow::Result { let mut out = Self::new(); + out.height = block.height as i32; + for event in &block.events { if let Ok(e) = EventCandlestickData::try_from_event(&event.event) { let candle = Candle::from_candlestick_data(&e.stick); @@ -751,6 +780,14 @@ impl Events { }, false, ); + if let Some(tx_hash) = event.tx_hash { + out.position_open_txs.insert(e.position_id, tx_hash); + } + // A newly opened position might be executed against in this block, + // but wouldn't already be in the database. Adding it here ensures + // it's available. + out.positions.insert(e.position_id, e.position.clone()); + out.position_opens.push(e); } else if let Ok(e) = EventPositionWithdraw::try_from_event(&event.event) { // TODO: use close positions to track liquidity more precisely, in practic I (ck) expect few // positions to close with being withdrawn. @@ -763,6 +800,10 @@ impl Events { }, true, ); + if let Some(tx_hash) = event.tx_hash { + out.position_withdrawal_txs.insert(e.position_id, tx_hash); + } + out.position_withdrawals.push(e); } else if let Ok(e) = EventPositionExecution::try_from_event(&event.event) { out.with_reserve_change( &e.trading_pair, @@ -788,10 +829,57 @@ impl Events { end: e.trading_pair.asset_1(), }); } + out.position_executions.push(e); + } else if let Ok(e) = EventPositionClose::try_from_event(&event.event) { + out.position_closes.push(e); + } else if let Ok(e) = EventQueuePositionClose::try_from_event(&event.event) { + // The position close event is emitted by the dex module at EOB, + // so we need to track it with the tx hash of the closure tx. + if let Some(tx_hash) = event.tx_hash { + out.position_close_txs.insert(e.position_id, tx_hash); + } } } Ok(out) } + + async fn load_positions(&mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> { + // Collect position IDs that we need but don't already have + let missing_positions: Vec<_> = self + .position_executions + .iter() + .map(|e| e.position_id) + .filter(|id| !self.positions.contains_key(id)) + .collect(); + + if missing_positions.is_empty() { + return Ok(()); + } + + // Load missing positions from database + let rows = sqlx::query( + "SELECT position_raw + FROM dex_ex_position_state + WHERE position_id = ANY($1)", + ) + .bind( + &missing_positions + .iter() + .map(|id| id.0.as_ref()) + .collect::>(), + ) + .fetch_all(dbtx.as_mut()) + .await?; + + // Decode and store each position + for row in rows { + let position_raw: Vec = row.get("position_raw"); + let position = Position::decode(position_raw.as_slice())?; + self.positions.insert(position.id(), position); + } + + Ok(()) + } } #[derive(Debug)] @@ -807,6 +895,248 @@ impl Component { min_liquidity, } } + + async fn record_position_open( + &self, + dbtx: &mut PgTransaction<'_>, + time: DateTime, + height: i32, + tx_hash: Option<[u8; 32]>, + event: &EventPositionOpen, + ) -> anyhow::Result<()> { + // Get effective prices by orienting the trading function in each direction + let effective_price_1_to_2: f64 = event + .position + .phi + .orient_start(event.trading_pair.asset_1()) + .expect("position trading pair matches") + .effective_price() + .into(); + + let effective_price_2_to_1: f64 = event + .position + .phi + .orient_start(event.trading_pair.asset_2()) + .expect("position trading pair matches") + .effective_price() + .into(); + + // First insert initial reserves and get the rowid + let opening_reserves_rowid = sqlx::query_scalar::<_, i32>( + "INSERT INTO dex_ex_position_reserves ( + position_id, + height, + time, + reserves_1, + reserves_2 + ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid", + ) + .bind(event.position_id.0) + .bind(height) + .bind(time) + .bind(BigDecimal::from(event.reserves_1.value())) + .bind(BigDecimal::from(event.reserves_2.value())) + .fetch_one(dbtx.as_mut()) + .await?; + + // Then insert position state with the opening_reserves_rowid + sqlx::query( + "INSERT INTO dex_ex_position_state ( + position_id, + asset_1, + asset_2, + p, + q, + close_on_fill, + fee_bps, + effective_price_1_to_2, + effective_price_2_to_1, + position_raw, + opening_time, + opening_height, + opening_tx, + opening_reserves_rowid + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)", + ) + .bind(event.position_id.0) + .bind(event.trading_pair.asset_1().to_bytes()) + .bind(event.trading_pair.asset_2().to_bytes()) + .bind(BigDecimal::from(event.position.phi.component.p.value())) + .bind(BigDecimal::from(event.position.phi.component.q.value())) + .bind(event.position.close_on_fill) + .bind(event.trading_fee as i32) + .bind(effective_price_1_to_2) + .bind(effective_price_2_to_1) + .bind(event.position.encode_to_vec()) + .bind(time) + .bind(height) + .bind(tx_hash.map(|h| h.as_ref().to_vec())) + .bind(opening_reserves_rowid) + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } + + async fn record_position_execution( + &self, + dbtx: &mut PgTransaction<'_>, + time: DateTime, + height: i32, + event: &EventPositionExecution, + positions: &BTreeMap, + ) -> anyhow::Result<()> { + // Get the position that was executed against + let position = positions + .get(&event.position_id) + .expect("position must exist for execution"); + + // Determine trade direction and compute deltas + let (delta_1, delta_2, lambda_1, lambda_2) = if event.reserves_1 > event.prev_reserves_1 { + // Asset 1 was input + let delta_1 = event.reserves_1 - event.prev_reserves_1; + let lambda_2 = event.prev_reserves_2 - event.reserves_2; + (delta_1, Amount::zero(), Amount::zero(), lambda_2) + } else { + // Asset 2 was input + let delta_2 = event.reserves_2 - event.prev_reserves_2; + let lambda_1 = event.prev_reserves_1 - event.reserves_1; + (Amount::zero(), delta_2, lambda_1, Amount::zero()) + }; + + // Compute fees directly from input amounts using u128 arithmetic + let fee_bps = position.phi.component.fee as u128; + let fee_1 = (delta_1.value() * fee_bps) / 10_000u128; + let fee_2 = (delta_2.value() * fee_bps) / 10_000u128; + + // First insert the reserves and get the rowid + let reserves_rowid = sqlx::query_scalar::<_, i32>( + "INSERT INTO dex_ex_position_reserves ( + position_id, + height, + time, + reserves_1, + reserves_2 + ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid", + ) + .bind(event.position_id.0) + .bind(height) + .bind(time) + .bind(BigDecimal::from(event.reserves_1.value())) + .bind(BigDecimal::from(event.reserves_2.value())) + .fetch_one(dbtx.as_mut()) + .await?; + + // Then record the execution with the reserves_rowid + sqlx::query( + "INSERT INTO dex_ex_position_executions ( + position_id, + height, + time, + reserves_rowid, + delta_1, + delta_2, + lambda_1, + lambda_2, + fee_1, + fee_2, + context_asset_start, + context_asset_end + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)", + ) + .bind(event.position_id.0) + .bind(height) + .bind(time) + .bind(reserves_rowid) + .bind(BigDecimal::from(delta_1.value())) + .bind(BigDecimal::from(delta_2.value())) + .bind(BigDecimal::from(lambda_1.value())) + .bind(BigDecimal::from(lambda_2.value())) + .bind(BigDecimal::from(fee_1)) + .bind(BigDecimal::from(fee_2)) + .bind(event.context.start.to_bytes()) + .bind(event.context.end.to_bytes()) + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } + + async fn record_position_close( + &self, + dbtx: &mut PgTransaction<'_>, + time: DateTime, + height: i32, + tx_hash: Option<[u8; 32]>, + event: &EventPositionClose, + ) -> anyhow::Result<()> { + sqlx::query( + "UPDATE dex_ex_position_state + SET closing_time = $1, + closing_height = $2, + closing_tx = $3 + WHERE position_id = $4", + ) + .bind(time) + .bind(height) + .bind(tx_hash.map(|h| h.as_ref().to_vec())) + .bind(event.position_id.0) + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } + + async fn record_position_withdraw( + &self, + dbtx: &mut PgTransaction<'_>, + time: DateTime, + height: i32, + tx_hash: Option<[u8; 32]>, + event: &EventPositionWithdraw, + ) -> anyhow::Result<()> { + // First insert the final reserves state (zeros after withdrawal) + let reserves_rowid = sqlx::query_scalar::<_, i32>( + "INSERT INTO dex_ex_position_reserves ( + position_id, + height, + time, + reserves_1, + reserves_2 + ) VALUES ($1, $2, $3, $4, $4) RETURNING rowid", // Using $4 twice for zero values + ) + .bind(event.position_id.0) + .bind(height) + .bind(time) + .bind(BigDecimal::from(0)) // Both reserves become zero after withdrawal + .fetch_one(dbtx.as_mut()) + .await?; + + sqlx::query( + "INSERT INTO dex_ex_position_withdrawals ( + position_id, + height, + time, + withdrawal_tx, + sequence, + reserves_1, + reserves_2, + reserves_rowid + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", + ) + .bind(event.position_id.0) + .bind(height) + .bind(time) + .bind(tx_hash.map(|h| h.as_ref().to_vec())) + .bind(event.sequence as i32) + .bind(BigDecimal::from(event.reserves_1.value())) + .bind(BigDecimal::from(event.reserves_2.value())) + .bind(reserves_rowid) + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } } #[async_trait] @@ -836,12 +1166,45 @@ impl AppView for Component { let mut snapshots = HashMap::new(); let mut last_time = None; for block in batch.by_height.iter() { - let events = Events::extract(&block)?; + let mut events = Events::extract(&block)?; let time = events .time .expect(&format!("no block root event at height {}", block.height)); last_time = Some(time); + // Load any missing positions before processing events + events.load_positions(dbtx).await?; + + // Record position opens + for event in &events.position_opens { + let tx_hash = events.position_open_txs.get(&event.position_id).copied(); + self.record_position_open(dbtx, time, events.height, tx_hash, event) + .await?; + } + + // Process position executions + for event in &events.position_executions { + self.record_position_execution(dbtx, time, events.height, event, &events.positions) + .await?; + } + + // Record position closes + for event in &events.position_closes { + let tx_hash = events.position_close_txs.get(&event.position_id).copied(); + self.record_position_close(dbtx, time, events.height, tx_hash, event) + .await?; + } + + // Record position withdrawals + for event in &events.position_withdrawals { + let tx_hash = events + .position_withdrawal_txs + .get(&event.position_id) + .copied(); + self.record_position_withdraw(dbtx, time, events.height, tx_hash, event) + .await?; + } + for (pair, candle) in &events.candles { for window in Window::all() { let key = (pair.start, pair.end, window); diff --git a/crates/bin/pindexer/src/dex_ex/schema.sql b/crates/bin/pindexer/src/dex_ex/schema.sql index ddbf108887..de284b5e98 100644 --- a/crates/bin/pindexer/src/dex_ex/schema.sql +++ b/crates/bin/pindexer/src/dex_ex/schema.sql @@ -82,3 +82,105 @@ CREATE TABLE IF NOT EXISTS dex_ex_metadata ( -- The asset id to use for prices in places such as the aggregate summary. quote_asset_id BYTEA NOT NULL ); + +CREATE TABLE IF NOT EXISTS dex_ex_position_state ( + -- Call this rowid to distinguish it from the position ID. + rowid SERIAL PRIMARY KEY, + -- Immutable position data, defining the trading function. + position_id BYTEA NOT NULL UNIQUE, + asset_1 BYTEA NOT NULL, + asset_2 BYTEA NOT NULL, + p NUMERIC(39) NOT NULL, + q NUMERIC(39) NOT NULL, + close_on_fill BOOLEAN NOT NULL, + fee_bps INTEGER NOT NULL, + effective_price_1_to_2 FLOAT8 NOT NULL, + effective_price_2_to_1 FLOAT8 NOT NULL, + position_raw BYTEA NOT NULL, + -- The time and height at which the position was opened, and its initial reserves. + opening_time TIMESTAMPTZ NOT NULL, + opening_height INTEGER NOT NULL, + opening_tx BYTEA, + opening_reserves_rowid INTEGER NOT NULL, + -- The time and height at which the position was closed, if it was closed. + closing_time TIMESTAMPTZ, + closing_height INTEGER, + closing_tx BYTEA +); + +CREATE INDEX ON dex_ex_position_state (position_id); +CREATE INDEX ON dex_ex_position_state (opening_tx); + +CREATE TABLE IF NOT EXISTS dex_ex_position_reserves ( + rowid SERIAL PRIMARY KEY, + position_id BYTEA NOT NULL, + height INTEGER NOT NULL, + time TIMESTAMPTZ NOT NULL, + reserves_1 NUMERIC(39) NOT NULL, + reserves_2 NUMERIC(39) NOT NULL +); + +CREATE INDEX ON dex_ex_position_reserves (position_id, height, rowid); + +CREATE TABLE IF NOT EXISTS dex_ex_position_executions ( + rowid SERIAL PRIMARY KEY, + position_id BYTEA NOT NULL, + height INTEGER NOT NULL, + time TIMESTAMPTZ NOT NULL, + reserves_rowid INTEGER NOT NULL, + -- The input amount of asset 1. + delta_1 NUMERIC(39) NOT NULL, + -- The input amount of asset 2. + delta_2 NUMERIC(39) NOT NULL, + -- The output amount of asset 1. + lambda_1 NUMERIC(39) NOT NULL, + -- The output amount of asset 2. + lambda_2 NUMERIC(39) NOT NULL, + -- The fee amount paid in asset 1. + fee_1 NUMERIC(39) NOT NULL, + -- The fee amount paid in asset 2. + fee_2 NUMERIC(39) NOT NULL, + -- The context the execution happened in + context_asset_start BYTEA NOT NULL, + context_asset_end BYTEA NOT NULL +); + +CREATE INDEX ON dex_ex_position_executions (height); +CREATE INDEX ON dex_ex_position_executions (position_id, height, rowid); + +CREATE TABLE IF NOT EXISTS dex_ex_position_withdrawals ( + rowid SERIAL PRIMARY KEY, + position_id BYTEA NOT NULL, + height INTEGER NOT NULL, + time TIMESTAMPTZ NOT NULL, + withdrawal_tx BYTEA, + sequence INTEGER NOT NULL, + reserves_rowid INTEGER NOT NULL, + -- The amount of asset 1 withdrawn. + reserves_1 NUMERIC(39) NOT NULL, + -- The amount of asset 2 withdrawn. + reserves_2 NUMERIC(39) NOT NULL +); + +CREATE INDEX ON dex_ex_position_withdrawals (height); +CREATE INDEX ON dex_ex_position_withdrawals (position_id, height); + +ALTER TABLE dex_ex_position_executions + ADD CONSTRAINT fk_position_executions + FOREIGN KEY (position_id) REFERENCES dex_ex_position_state(position_id); + +ALTER TABLE dex_ex_position_withdrawals + ADD CONSTRAINT fk_position_withdrawals + FOREIGN KEY (position_id) REFERENCES dex_ex_position_state(position_id); + +ALTER TABLE dex_ex_position_executions + ADD CONSTRAINT fk_position_executions_reserves + FOREIGN KEY (reserves_rowid) REFERENCES dex_ex_position_reserves(rowid); + +ALTER TABLE dex_ex_position_state + ADD CONSTRAINT fk_position_state_reserves + FOREIGN KEY (opening_reserves_rowid) REFERENCES dex_ex_position_reserves(rowid); + +ALTER TABLE dex_ex_position_withdrawals + ADD CONSTRAINT fk_position_withdrawals_reserves + FOREIGN KEY (reserves_rowid) REFERENCES dex_ex_position_reserves(rowid); diff --git a/crates/core/asset/src/asset/id.rs b/crates/core/asset/src/asset/id.rs index 1a5417c2e2..f02e63b270 100644 --- a/crates/core/asset/src/asset/id.rs +++ b/crates/core/asset/src/asset/id.rs @@ -172,6 +172,8 @@ impl Id { #[cfg(test)] mod tests { use super::*; + use hex; + use serde_json; use std::str::FromStr; #[test] @@ -205,4 +207,39 @@ mod tests { assert_eq!(id4, id); assert_eq!(id5, id); } + + #[test] + fn hex_to_bech32() { + let hex_strings = [ + "cc0d3c9eef0c7ff4e225eca85a3094603691d289aeaf428ab0d87319ad93a302", // USDY + "a7a339f42e671b2db1de226d4483d3e63036661cad1554d75f5f76fe04ec1e00", // SHITMOS + "29ea9c2f3371f6a487e7e95c247041f4a356f983eb064e5d2b3bcf322ca96a10", // UM + "76b3e4b10681358c123b381f90638476b7789040e47802de879f0fb3eedc8d0b", // USDC + "2923a0a87b3a2421f165cc853dbf73a9bdafb5da0d948564b6059cb0217c4407", // OSMO + "07ef660132a4c3235fab272d43d9b9752a8337b2d108597abffaff5f246d0f0f", // ATOM + "5314b33eecfd5ca2e99c0b6d1e0ccafe3d2dd581c952d814fb64fdf51f85c411", // TIA + "516108d0d0bba3f76e1f982d0a7cde118833307b03c0cd4ccb94e882b53c1f0f", // WBTC + "414e723f74bd987c02ccbc997585ed52b196e2ffe75b3793aa68cc2996626910", // allBTC + "bf8b035dda339b6cda8f221e79773b0fd871f27a472920f84c4aa2b4f98a700d", // allUSDT + ]; + + for hex in hex_strings { + let bytes = hex::decode(hex).expect("valid hex string"); + let bytes_array: [u8; 32] = bytes.try_into().expect("hex is 32 bytes"); + + let id = Id::try_from(bytes_array).expect("valid asset ID bytes"); + let bech32_str = id.to_string(); + + println!("Asset ID for {}:", hex); + println!(" Bech32: {}", bech32_str); + + // Print Proto JSON encoding + let proto: pb::AssetId = id.into(); + println!(" Proto JSON: {}\n", serde_json::to_string(&proto).unwrap()); + + // Convert back to verify roundtrip + let id_decoded = Id::from_str(&bech32_str).expect("valid bech32 string"); + assert_eq!(id, id_decoded); + } + } } diff --git a/crates/util/cometindex/src/indexer/indexing_state.rs b/crates/util/cometindex/src/indexer/indexing_state.rs index 23f75a2c8c..fa1c4e9048 100644 --- a/crates/util/cometindex/src/indexer/indexing_state.rs +++ b/crates/util/cometindex/src/indexer/indexing_state.rs @@ -68,7 +68,7 @@ impl TryFrom for Height { impl<'r> sqlx::Decode<'r, Postgres> for Height { fn decode( - value: >::ValueRef, + value: ::ValueRef<'r>, ) -> Result { Ok(Height::try_from( >::decode(value)?, @@ -85,8 +85,8 @@ impl sqlx::Type for Height { impl<'q> sqlx::Encode<'q, Postgres> for Height { fn encode_by_ref( &self, - buf: &mut >::ArgumentBuffer, - ) -> sqlx::encode::IsNull { + buf: &mut ::ArgumentBuffer<'q>, + ) -> Result { >::encode( i64::try_from(self.0).expect("height should never exceed i64::MAX"), buf,