diff --git a/Cargo.lock b/Cargo.lock index 93ffb9ec40..6da9503696 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -50,7 +50,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", - "getrandom", "once_cell", "version_check", "zerocopy", @@ -571,7 +570,7 @@ dependencies = [ "js-sys", "lazy_static", "log", - "rustls", + "rustls 0.21.12", "thiserror", "wasm-bindgen", "wasm-bindgen-futures", @@ -703,8 +702,8 @@ dependencies = [ "http-body", "hyper", "pin-project-lite", - "rustls", - "rustls-pemfile", + "rustls 0.21.12", + "rustls-pemfile 1.0.4", "tokio", "tokio-rustls", "tower-service", @@ -761,6 +760,19 @@ version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf9ff0bbfd639f15c74af777d81383cf53efb7c93613f6cab67c6c11e05bbf8b" +[[package]] +name = "bigdecimal" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f850665a0385e070b64c38d2354e6c104c8479c59868d1e48a0c13ee2c7a1c1" +dependencies = [ + "autocfg", + "libm", + "num-bigint", + "num-integer", + "num-traits", +] + [[package]] name = "bincode" version = "1.3.3" @@ -1067,13 +1079,13 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" [[package]] name = "cc" -version = "1.0.101" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac367972e516d45567c7eafc73d24e1c193dcf200a8d94e9db7b3d38b349572d" +checksum = "f34d93e62b03caf570cccc334cbc6c2fceca82f39211051345108adcba3eebdc" dependencies = [ "jobserver", "libc", - "once_cell", + "shlex", ] [[package]] @@ -2075,9 +2087,9 @@ dependencies = [ [[package]] name = "fallible-iterator" -version = "0.2.0" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" +checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" [[package]] name = "fallible-streaming-iterator" @@ -2358,7 +2370,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35bd3cf68c183738046838e300353e4716c674dc5e56890de4826801a6622a28" dependencies = [ "futures-io", - "rustls", + "rustls 0.21.12", ] [[package]] @@ -2573,9 +2585,9 @@ dependencies = [ [[package]] name = "hashlink" -version = "0.8.4" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8094feaf31ff591f651a2664fb9cfd92bba7a60ce3197265e9482ebe753c8f7" +checksum = "6ba4ff7128dee98c7dc9794b6a411377e1404dba1c97deb8d1a55297bd25d8af" dependencies = [ "hashbrown 0.14.5", ] @@ -2642,9 +2654,6 @@ name = "heck" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" -dependencies = [ - "unicode-segmentation", -] [[package]] name = "heck" @@ -2785,7 +2794,7 @@ dependencies = [ "futures-util", "http", "hyper", - "rustls", + "rustls 0.21.12", "tokio", "tokio-rustls", ] @@ -3522,9 +3531,9 @@ dependencies = [ [[package]] name = "libsqlite3-sys" -version = "0.26.0" +version = "0.30.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afc22eff61b133b115c6e8c74e818c628d6d5e7a502afea6f64dee076dd94326" +checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149" dependencies = [ "cc", "pkg-config", @@ -4689,7 +4698,7 @@ dependencies = [ "anyhow", "axum-server", "futures", - "rustls", + "rustls 0.21.12", "rustls-acme", "tracing", ] @@ -6386,9 +6395,9 @@ dependencies = [ [[package]] name = "r2d2_sqlite" -version = "0.22.0" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99f31323d6161385f385046738df520e0e8694fa74852d35891fc0be08348ddc" +checksum = "eb14dba8247a6a15b7fdbc7d389e2e6f03ee9f184f87117706d509c092dfe846" dependencies = [ "r2d2", "rusqlite", @@ -6620,9 +6629,9 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", - "rustls", + "rustls 0.21.12", "rustls-native-certs", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "serde", "serde_json", "serde_urlencoded", @@ -6758,9 +6767,9 @@ dependencies = [ [[package]] name = "rusqlite" -version = "0.29.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "549b9d036d571d42e6e85d1c1425e2ac83491075078ca9a15be021c56b1641f2" +checksum = "7753b721174eb8ff87a9a0e799e2d7bc3749323e773db92e0984debb00019d6e" dependencies = [ "bitflags 2.6.0", "fallible-iterator", @@ -6841,10 +6850,24 @@ checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" dependencies = [ "log", "ring 0.17.8", - "rustls-webpki", + "rustls-webpki 0.101.7", "sct", ] +[[package]] +name = "rustls" +version = "0.23.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "934b404430bb06b3fae2cba809eb45a1ab1aecd64491213d7c3301b88393f8d1" +dependencies = [ + "once_cell", + "ring 0.17.8", + "rustls-pki-types", + "rustls-webpki 0.102.8", + "subtle", + "zeroize", +] + [[package]] name = "rustls-acme" version = "0.7.7" @@ -6865,7 +6888,7 @@ dependencies = [ "pem", "rcgen", "ring 0.16.20", - "rustls", + "rustls 0.21.12", "serde", "serde_json", "thiserror", @@ -6882,7 +6905,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" dependencies = [ "openssl-probe", - "rustls-pemfile", + "rustls-pemfile 1.0.4", "schannel", "security-framework", ] @@ -6896,6 +6919,21 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -6906,6 +6944,17 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "rustls-webpki" +version = "0.102.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" +dependencies = [ + "ring 0.17.8", + "rustls-pki-types", + "untrusted 0.9.0", +] + [[package]] name = "rustversion" version = "1.0.17" @@ -7346,6 +7395,9 @@ name = "smallvec" version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" +dependencies = [ + "serde", +] [[package]] name = "socket2" @@ -7404,9 +7456,9 @@ dependencies = [ [[package]] name = "sqlx" -version = "0.7.2" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e50c216e3624ec8e7ecd14c6a6a6370aad6ee5d8cfc3ab30b5162eeeef2ed33" +checksum = "93334716a037193fac19df402f8571269c84a00852f6a7066b5d2616dcd64d3e" dependencies = [ "sqlx-core", "sqlx-macros", @@ -7417,25 +7469,25 @@ dependencies = [ [[package]] name = "sqlx-core" -version = "0.7.2" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8d6753e460c998bbd4cd8c6f0ed9a64346fcca0723d6e75e52fdc351c5d2169d" +checksum = "d4d8060b456358185f7d50c55d9b5066ad956956fddec42ee2e8567134a8936e" dependencies = [ - "ahash", "atoi", + "bigdecimal", "byteorder", "bytes", "chrono", "crc", "crossbeam-queue", - "dotenvy", "either", - "event-listener 2.5.3", + "event-listener 5.3.1", "futures-channel", "futures-core", "futures-intrusive", "futures-io", "futures-util", + "hashbrown 0.14.5", "hashlink", "hex", "indexmap 2.2.6", @@ -7444,8 +7496,8 @@ dependencies = [ "once_cell", "paste", "percent-encoding", - "rustls", - "rustls-pemfile", + "rustls 0.23.19", + "rustls-pemfile 2.2.0", "serde", "serde_json", "sha2 0.10.8", @@ -7456,31 +7508,31 @@ dependencies = [ "tokio-stream", "tracing", "url", - "webpki-roots 0.24.0", + "webpki-roots 0.26.7", ] [[package]] name = "sqlx-macros" -version = "0.7.2" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a793bb3ba331ec8359c1853bd39eed32cdd7baaf22c35ccf5c92a7e8d1189ec" +checksum = "cac0692bcc9de3b073e8d747391827297e075c7710ff6276d9f7a1f3d58c6657" dependencies = [ "proc-macro2 1.0.86", "quote 1.0.36", "sqlx-core", "sqlx-macros-core", - "syn 1.0.109", + "syn 2.0.68", ] [[package]] name = "sqlx-macros-core" -version = "0.7.2" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a4ee1e104e00dedb6aa5ffdd1343107b0a4702e862a84320ee7cc74782d96fc" +checksum = "1804e8a7c7865599c9c79be146dc8a9fd8cc86935fa641d3ea58e5f0688abaa5" dependencies = [ "dotenvy", "either", - "heck 0.4.1", + "heck 0.5.0", "hex", "once_cell", "proc-macro2 1.0.86", @@ -7492,7 +7544,7 @@ dependencies = [ "sqlx-mysql", "sqlx-postgres", "sqlx-sqlite", - "syn 1.0.109", + "syn 2.0.68", "tempfile", "tokio", "url", @@ -7500,12 +7552,13 @@ dependencies = [ [[package]] name = "sqlx-mysql" -version = "0.7.2" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "864b869fdf56263f4c95c45483191ea0af340f9f3e3e7b4d57a61c7c87a970db" +checksum = "64bb4714269afa44aef2755150a0fc19d756fb580a67db8885608cf02f47d06a" dependencies = [ "atoi", - "base64 0.21.7", + "base64 0.22.1", + "bigdecimal", "bitflags 2.6.0", "byteorder", "bytes", @@ -7543,12 +7596,13 @@ dependencies = [ [[package]] name = "sqlx-postgres" -version = "0.7.2" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb7ae0e6a97fb3ba33b23ac2671a5ce6e3cabe003f451abd5a56e7951d975624" +checksum = "6fa91a732d854c5d7726349bb4bb879bb9478993ceb764247660aee25f67c2f8" dependencies = [ "atoi", - "base64 0.21.7", + "base64 0.22.1", + "bigdecimal", "bitflags 2.6.0", "byteorder", "chrono", @@ -7567,11 +7621,11 @@ dependencies = [ "log", "md-5", "memchr", + "num-bigint", "once_cell", "rand", "serde", "serde_json", - "sha1", "sha2 0.10.8", "smallvec", "sqlx-core", @@ -7583,9 +7637,9 @@ dependencies = [ [[package]] name = "sqlx-sqlite" -version = "0.7.2" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d59dc83cf45d89c555a577694534fcd1b55c545a816c816ce51f20bbe56a4f3f" +checksum = "d5b2cf34a45953bfd3daaf3db0f7a7878ab9b7a6b91b422d24a7a9e4c857b680" dependencies = [ "atoi", "chrono", @@ -7599,6 +7653,7 @@ dependencies = [ "log", "percent-encoding", "serde", + "serde_urlencoded", "sqlx-core", "tracing", "url", @@ -8182,7 +8237,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls", + "rustls 0.21.12", "tokio", ] @@ -8302,8 +8357,8 @@ dependencies = [ "percent-encoding", "pin-project", "prost", - "rustls", - "rustls-pemfile", + "rustls 0.21.12", + "rustls-pemfile 1.0.4", "tokio", "tokio-rustls", "tokio-stream", @@ -8815,18 +8870,18 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.24.0" +version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b291546d5d9d1eab74f069c77749f2cb8504a12caa20f0f2de93ddbf6f411888" -dependencies = [ - "rustls-webpki", -] +checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" [[package]] name = "webpki-roots" -version = "0.25.4" +version = "0.26.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +checksum = "5d642ff16b7e79272ae451b7322067cdc17cadf68c23264be9d94a32319efe7e" +dependencies = [ + "rustls-pki-types", +] [[package]] name = "whoami" diff --git a/Cargo.toml b/Cargo.toml index e2ba1a4492..f0f03e4844 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -206,7 +206,7 @@ proptest-derive = { version = "0.3" } prost = { version = "0.12.3" } prost-types = { version = "0.12" } r2d2 = { version = "0.8" } -r2d2_sqlite = { version = "0.22" } +r2d2_sqlite = { version = "0.25" } rand = { version = "0.8.5" } rand_chacha = { version = "0.3.1" } rand_core = { version = "0.6.4" } @@ -217,7 +217,7 @@ serde_json = { version = "1.0.96" } serde_unit_struct = { version = "0.1" } serde_with = { version = "3.5.1" } sha2 = { version = "0.10" } -sqlx = { version = "0.7", features = ["postgres", "runtime-tokio", "tls-rustls"] } +sqlx = { version = "0.8", features = ["bigdecimal", "postgres", "runtime-tokio", "tls-rustls"] } tap = "1.0.1" tempfile = { version = "3.3.0" } tendermint = { default-features = false, version = "0.34.0" } diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index c1ad731233..4868fc3e36 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -5,16 +5,22 @@ use cometindex::{ AppView, PgTransaction, }; use penumbra_asset::asset; +use penumbra_dex::lp::position::{Id as PositionId, Position}; use penumbra_dex::{ event::{ - EventCandlestickData, EventPositionExecution, EventPositionOpen, EventPositionWithdraw, + EventCandlestickData, EventPositionClose, EventPositionExecution, EventPositionOpen, + EventPositionWithdraw, EventQueuePositionClose, }, lp::Reserves, DirectedTradingPair, TradingPair, }; +use penumbra_num::Amount; use penumbra_proto::event::EventDomainType; +use penumbra_proto::DomainType; use penumbra_sct::event::EventBlockRoot; -use std::collections::{HashMap, HashSet}; +use sqlx::types::BigDecimal; +use sqlx::Row; +use std::collections::{BTreeMap, HashMap, HashSet}; type DateTime = sqlx::types::chrono::DateTime; @@ -648,16 +654,37 @@ struct PairMetrics { #[derive(Debug)] struct Events { time: Option, + height: i32, candles: HashMap, metrics: HashMap, + // Relevant positions. + positions: BTreeMap, + // Store events + position_opens: Vec, + position_executions: Vec, + position_closes: Vec, + position_withdrawals: Vec, + // Track transaction hashes by position ID + position_open_txs: BTreeMap, + position_close_txs: BTreeMap, + position_withdrawal_txs: BTreeMap, } impl Events { fn new() -> Self { Self { time: None, + height: 0, candles: HashMap::new(), metrics: HashMap::new(), + positions: BTreeMap::new(), + position_opens: Vec::new(), + position_executions: Vec::new(), + position_closes: Vec::new(), + position_withdrawals: Vec::new(), + position_open_txs: BTreeMap::new(), + position_close_txs: BTreeMap::new(), + position_withdrawal_txs: BTreeMap::new(), } } @@ -731,6 +758,8 @@ impl Events { pub fn extract(block: &BlockEvents) -> anyhow::Result { let mut out = Self::new(); + out.height = block.height as i32; + for event in &block.events { if let Ok(e) = EventCandlestickData::try_from_event(&event.event) { let candle = Candle::from_candlestick_data(&e.stick); @@ -751,6 +780,14 @@ impl Events { }, false, ); + if let Some(tx_hash) = event.tx_hash { + out.position_open_txs.insert(e.position_id, tx_hash); + } + // A newly opened position might be executed against in this block, + // but wouldn't already be in the database. Adding it here ensures + // it's available. + out.positions.insert(e.position_id, e.position.clone()); + out.position_opens.push(e); } else if let Ok(e) = EventPositionWithdraw::try_from_event(&event.event) { // TODO: use close positions to track liquidity more precisely, in practic I (ck) expect few // positions to close with being withdrawn. @@ -763,6 +800,10 @@ impl Events { }, true, ); + if let Some(tx_hash) = event.tx_hash { + out.position_withdrawal_txs.insert(e.position_id, tx_hash); + } + out.position_withdrawals.push(e); } else if let Ok(e) = EventPositionExecution::try_from_event(&event.event) { out.with_reserve_change( &e.trading_pair, @@ -788,10 +829,57 @@ impl Events { end: e.trading_pair.asset_1(), }); } + out.position_executions.push(e); + } else if let Ok(e) = EventPositionClose::try_from_event(&event.event) { + out.position_closes.push(e); + } else if let Ok(e) = EventQueuePositionClose::try_from_event(&event.event) { + // The position close event is emitted by the dex module at EOB, + // so we need to track it with the tx hash of the closure tx. + if let Some(tx_hash) = event.tx_hash { + out.position_close_txs.insert(e.position_id, tx_hash); + } } } Ok(out) } + + async fn load_positions(&mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> { + // Collect position IDs that we need but don't already have + let missing_positions: Vec<_> = self + .position_executions + .iter() + .map(|e| e.position_id) + .filter(|id| !self.positions.contains_key(id)) + .collect(); + + if missing_positions.is_empty() { + return Ok(()); + } + + // Load missing positions from database + let rows = sqlx::query( + "SELECT position_raw + FROM dex_ex_position_state + WHERE position_id = ANY($1)", + ) + .bind( + &missing_positions + .iter() + .map(|id| id.0.as_ref()) + .collect::>(), + ) + .fetch_all(dbtx.as_mut()) + .await?; + + // Decode and store each position + for row in rows { + let position_raw: Vec = row.get("position_raw"); + let position = Position::decode(position_raw.as_slice())?; + self.positions.insert(position.id(), position); + } + + Ok(()) + } } #[derive(Debug)] @@ -807,6 +895,248 @@ impl Component { min_liquidity, } } + + async fn record_position_open( + &self, + dbtx: &mut PgTransaction<'_>, + time: DateTime, + height: i32, + tx_hash: Option<[u8; 32]>, + event: &EventPositionOpen, + ) -> anyhow::Result<()> { + // Get effective prices by orienting the trading function in each direction + let effective_price_1_to_2: f64 = event + .position + .phi + .orient_start(event.trading_pair.asset_1()) + .expect("position trading pair matches") + .effective_price() + .into(); + + let effective_price_2_to_1: f64 = event + .position + .phi + .orient_start(event.trading_pair.asset_2()) + .expect("position trading pair matches") + .effective_price() + .into(); + + // First insert initial reserves and get the rowid + let opening_reserves_rowid = sqlx::query_scalar::<_, i32>( + "INSERT INTO dex_ex_position_reserves ( + position_id, + height, + time, + reserves_1, + reserves_2 + ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid", + ) + .bind(event.position_id.0) + .bind(height) + .bind(time) + .bind(BigDecimal::from(event.reserves_1.value())) + .bind(BigDecimal::from(event.reserves_2.value())) + .fetch_one(dbtx.as_mut()) + .await?; + + // Then insert position state with the opening_reserves_rowid + sqlx::query( + "INSERT INTO dex_ex_position_state ( + position_id, + asset_1, + asset_2, + p, + q, + close_on_fill, + fee_bps, + effective_price_1_to_2, + effective_price_2_to_1, + position_raw, + opening_time, + opening_height, + opening_tx, + opening_reserves_rowid + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)", + ) + .bind(event.position_id.0) + .bind(event.trading_pair.asset_1().to_bytes()) + .bind(event.trading_pair.asset_2().to_bytes()) + .bind(BigDecimal::from(event.position.phi.component.p.value())) + .bind(BigDecimal::from(event.position.phi.component.q.value())) + .bind(event.position.close_on_fill) + .bind(event.trading_fee as i32) + .bind(effective_price_1_to_2) + .bind(effective_price_2_to_1) + .bind(event.position.encode_to_vec()) + .bind(time) + .bind(height) + .bind(tx_hash.map(|h| h.as_ref().to_vec())) + .bind(opening_reserves_rowid) + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } + + async fn record_position_execution( + &self, + dbtx: &mut PgTransaction<'_>, + time: DateTime, + height: i32, + event: &EventPositionExecution, + positions: &BTreeMap, + ) -> anyhow::Result<()> { + // Get the position that was executed against + let position = positions + .get(&event.position_id) + .expect("position must exist for execution"); + + // Determine trade direction and compute deltas + let (delta_1, delta_2, lambda_1, lambda_2) = if event.reserves_1 > event.prev_reserves_1 { + // Asset 1 was input + let delta_1 = event.reserves_1 - event.prev_reserves_1; + let lambda_2 = event.prev_reserves_2 - event.reserves_2; + (delta_1, Amount::zero(), Amount::zero(), lambda_2) + } else { + // Asset 2 was input + let delta_2 = event.reserves_2 - event.prev_reserves_2; + let lambda_1 = event.prev_reserves_1 - event.reserves_1; + (Amount::zero(), delta_2, lambda_1, Amount::zero()) + }; + + // Compute fees directly from input amounts using u128 arithmetic + let fee_bps = position.phi.component.fee as u128; + let fee_1 = (delta_1.value() * fee_bps) / 10_000u128; + let fee_2 = (delta_2.value() * fee_bps) / 10_000u128; + + // First insert the reserves and get the rowid + let reserves_rowid = sqlx::query_scalar::<_, i32>( + "INSERT INTO dex_ex_position_reserves ( + position_id, + height, + time, + reserves_1, + reserves_2 + ) VALUES ($1, $2, $3, $4, $5) RETURNING rowid", + ) + .bind(event.position_id.0) + .bind(height) + .bind(time) + .bind(BigDecimal::from(event.reserves_1.value())) + .bind(BigDecimal::from(event.reserves_2.value())) + .fetch_one(dbtx.as_mut()) + .await?; + + // Then record the execution with the reserves_rowid + sqlx::query( + "INSERT INTO dex_ex_position_executions ( + position_id, + height, + time, + reserves_rowid, + delta_1, + delta_2, + lambda_1, + lambda_2, + fee_1, + fee_2, + context_asset_start, + context_asset_end + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)", + ) + .bind(event.position_id.0) + .bind(height) + .bind(time) + .bind(reserves_rowid) + .bind(BigDecimal::from(delta_1.value())) + .bind(BigDecimal::from(delta_2.value())) + .bind(BigDecimal::from(lambda_1.value())) + .bind(BigDecimal::from(lambda_2.value())) + .bind(BigDecimal::from(fee_1)) + .bind(BigDecimal::from(fee_2)) + .bind(event.context.start.to_bytes()) + .bind(event.context.end.to_bytes()) + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } + + async fn record_position_close( + &self, + dbtx: &mut PgTransaction<'_>, + time: DateTime, + height: i32, + tx_hash: Option<[u8; 32]>, + event: &EventPositionClose, + ) -> anyhow::Result<()> { + sqlx::query( + "UPDATE dex_ex_position_state + SET closing_time = $1, + closing_height = $2, + closing_tx = $3 + WHERE position_id = $4", + ) + .bind(time) + .bind(height) + .bind(tx_hash.map(|h| h.as_ref().to_vec())) + .bind(event.position_id.0) + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } + + async fn record_position_withdraw( + &self, + dbtx: &mut PgTransaction<'_>, + time: DateTime, + height: i32, + tx_hash: Option<[u8; 32]>, + event: &EventPositionWithdraw, + ) -> anyhow::Result<()> { + // First insert the final reserves state (zeros after withdrawal) + let reserves_rowid = sqlx::query_scalar::<_, i32>( + "INSERT INTO dex_ex_position_reserves ( + position_id, + height, + time, + reserves_1, + reserves_2 + ) VALUES ($1, $2, $3, $4, $4) RETURNING rowid", // Using $4 twice for zero values + ) + .bind(event.position_id.0) + .bind(height) + .bind(time) + .bind(BigDecimal::from(0)) // Both reserves become zero after withdrawal + .fetch_one(dbtx.as_mut()) + .await?; + + sqlx::query( + "INSERT INTO dex_ex_position_withdrawals ( + position_id, + height, + time, + withdrawal_tx, + sequence, + reserves_1, + reserves_2, + reserves_rowid + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", + ) + .bind(event.position_id.0) + .bind(height) + .bind(time) + .bind(tx_hash.map(|h| h.as_ref().to_vec())) + .bind(event.sequence as i32) + .bind(BigDecimal::from(event.reserves_1.value())) + .bind(BigDecimal::from(event.reserves_2.value())) + .bind(reserves_rowid) + .execute(dbtx.as_mut()) + .await?; + + Ok(()) + } } #[async_trait] @@ -836,12 +1166,45 @@ impl AppView for Component { let mut snapshots = HashMap::new(); let mut last_time = None; for block in batch.by_height.iter() { - let events = Events::extract(&block)?; + let mut events = Events::extract(&block)?; let time = events .time .expect(&format!("no block root event at height {}", block.height)); last_time = Some(time); + // Load any missing positions before processing events + events.load_positions(dbtx).await?; + + // Record position opens + for event in &events.position_opens { + let tx_hash = events.position_open_txs.get(&event.position_id).copied(); + self.record_position_open(dbtx, time, events.height, tx_hash, event) + .await?; + } + + // Process position executions + for event in &events.position_executions { + self.record_position_execution(dbtx, time, events.height, event, &events.positions) + .await?; + } + + // Record position closes + for event in &events.position_closes { + let tx_hash = events.position_close_txs.get(&event.position_id).copied(); + self.record_position_close(dbtx, time, events.height, tx_hash, event) + .await?; + } + + // Record position withdrawals + for event in &events.position_withdrawals { + let tx_hash = events + .position_withdrawal_txs + .get(&event.position_id) + .copied(); + self.record_position_withdraw(dbtx, time, events.height, tx_hash, event) + .await?; + } + for (pair, candle) in &events.candles { for window in Window::all() { let key = (pair.start, pair.end, window); diff --git a/crates/bin/pindexer/src/dex_ex/schema.sql b/crates/bin/pindexer/src/dex_ex/schema.sql index ddbf108887..de284b5e98 100644 --- a/crates/bin/pindexer/src/dex_ex/schema.sql +++ b/crates/bin/pindexer/src/dex_ex/schema.sql @@ -82,3 +82,105 @@ CREATE TABLE IF NOT EXISTS dex_ex_metadata ( -- The asset id to use for prices in places such as the aggregate summary. quote_asset_id BYTEA NOT NULL ); + +CREATE TABLE IF NOT EXISTS dex_ex_position_state ( + -- Call this rowid to distinguish it from the position ID. + rowid SERIAL PRIMARY KEY, + -- Immutable position data, defining the trading function. + position_id BYTEA NOT NULL UNIQUE, + asset_1 BYTEA NOT NULL, + asset_2 BYTEA NOT NULL, + p NUMERIC(39) NOT NULL, + q NUMERIC(39) NOT NULL, + close_on_fill BOOLEAN NOT NULL, + fee_bps INTEGER NOT NULL, + effective_price_1_to_2 FLOAT8 NOT NULL, + effective_price_2_to_1 FLOAT8 NOT NULL, + position_raw BYTEA NOT NULL, + -- The time and height at which the position was opened, and its initial reserves. + opening_time TIMESTAMPTZ NOT NULL, + opening_height INTEGER NOT NULL, + opening_tx BYTEA, + opening_reserves_rowid INTEGER NOT NULL, + -- The time and height at which the position was closed, if it was closed. + closing_time TIMESTAMPTZ, + closing_height INTEGER, + closing_tx BYTEA +); + +CREATE INDEX ON dex_ex_position_state (position_id); +CREATE INDEX ON dex_ex_position_state (opening_tx); + +CREATE TABLE IF NOT EXISTS dex_ex_position_reserves ( + rowid SERIAL PRIMARY KEY, + position_id BYTEA NOT NULL, + height INTEGER NOT NULL, + time TIMESTAMPTZ NOT NULL, + reserves_1 NUMERIC(39) NOT NULL, + reserves_2 NUMERIC(39) NOT NULL +); + +CREATE INDEX ON dex_ex_position_reserves (position_id, height, rowid); + +CREATE TABLE IF NOT EXISTS dex_ex_position_executions ( + rowid SERIAL PRIMARY KEY, + position_id BYTEA NOT NULL, + height INTEGER NOT NULL, + time TIMESTAMPTZ NOT NULL, + reserves_rowid INTEGER NOT NULL, + -- The input amount of asset 1. + delta_1 NUMERIC(39) NOT NULL, + -- The input amount of asset 2. + delta_2 NUMERIC(39) NOT NULL, + -- The output amount of asset 1. + lambda_1 NUMERIC(39) NOT NULL, + -- The output amount of asset 2. + lambda_2 NUMERIC(39) NOT NULL, + -- The fee amount paid in asset 1. + fee_1 NUMERIC(39) NOT NULL, + -- The fee amount paid in asset 2. + fee_2 NUMERIC(39) NOT NULL, + -- The context the execution happened in + context_asset_start BYTEA NOT NULL, + context_asset_end BYTEA NOT NULL +); + +CREATE INDEX ON dex_ex_position_executions (height); +CREATE INDEX ON dex_ex_position_executions (position_id, height, rowid); + +CREATE TABLE IF NOT EXISTS dex_ex_position_withdrawals ( + rowid SERIAL PRIMARY KEY, + position_id BYTEA NOT NULL, + height INTEGER NOT NULL, + time TIMESTAMPTZ NOT NULL, + withdrawal_tx BYTEA, + sequence INTEGER NOT NULL, + reserves_rowid INTEGER NOT NULL, + -- The amount of asset 1 withdrawn. + reserves_1 NUMERIC(39) NOT NULL, + -- The amount of asset 2 withdrawn. + reserves_2 NUMERIC(39) NOT NULL +); + +CREATE INDEX ON dex_ex_position_withdrawals (height); +CREATE INDEX ON dex_ex_position_withdrawals (position_id, height); + +ALTER TABLE dex_ex_position_executions + ADD CONSTRAINT fk_position_executions + FOREIGN KEY (position_id) REFERENCES dex_ex_position_state(position_id); + +ALTER TABLE dex_ex_position_withdrawals + ADD CONSTRAINT fk_position_withdrawals + FOREIGN KEY (position_id) REFERENCES dex_ex_position_state(position_id); + +ALTER TABLE dex_ex_position_executions + ADD CONSTRAINT fk_position_executions_reserves + FOREIGN KEY (reserves_rowid) REFERENCES dex_ex_position_reserves(rowid); + +ALTER TABLE dex_ex_position_state + ADD CONSTRAINT fk_position_state_reserves + FOREIGN KEY (opening_reserves_rowid) REFERENCES dex_ex_position_reserves(rowid); + +ALTER TABLE dex_ex_position_withdrawals + ADD CONSTRAINT fk_position_withdrawals_reserves + FOREIGN KEY (reserves_rowid) REFERENCES dex_ex_position_reserves(rowid); diff --git a/crates/core/asset/src/asset/id.rs b/crates/core/asset/src/asset/id.rs index 1a5417c2e2..f02e63b270 100644 --- a/crates/core/asset/src/asset/id.rs +++ b/crates/core/asset/src/asset/id.rs @@ -172,6 +172,8 @@ impl Id { #[cfg(test)] mod tests { use super::*; + use hex; + use serde_json; use std::str::FromStr; #[test] @@ -205,4 +207,39 @@ mod tests { assert_eq!(id4, id); assert_eq!(id5, id); } + + #[test] + fn hex_to_bech32() { + let hex_strings = [ + "cc0d3c9eef0c7ff4e225eca85a3094603691d289aeaf428ab0d87319ad93a302", // USDY + "a7a339f42e671b2db1de226d4483d3e63036661cad1554d75f5f76fe04ec1e00", // SHITMOS + "29ea9c2f3371f6a487e7e95c247041f4a356f983eb064e5d2b3bcf322ca96a10", // UM + "76b3e4b10681358c123b381f90638476b7789040e47802de879f0fb3eedc8d0b", // USDC + "2923a0a87b3a2421f165cc853dbf73a9bdafb5da0d948564b6059cb0217c4407", // OSMO + "07ef660132a4c3235fab272d43d9b9752a8337b2d108597abffaff5f246d0f0f", // ATOM + "5314b33eecfd5ca2e99c0b6d1e0ccafe3d2dd581c952d814fb64fdf51f85c411", // TIA + "516108d0d0bba3f76e1f982d0a7cde118833307b03c0cd4ccb94e882b53c1f0f", // WBTC + "414e723f74bd987c02ccbc997585ed52b196e2ffe75b3793aa68cc2996626910", // allBTC + "bf8b035dda339b6cda8f221e79773b0fd871f27a472920f84c4aa2b4f98a700d", // allUSDT + ]; + + for hex in hex_strings { + let bytes = hex::decode(hex).expect("valid hex string"); + let bytes_array: [u8; 32] = bytes.try_into().expect("hex is 32 bytes"); + + let id = Id::try_from(bytes_array).expect("valid asset ID bytes"); + let bech32_str = id.to_string(); + + println!("Asset ID for {}:", hex); + println!(" Bech32: {}", bech32_str); + + // Print Proto JSON encoding + let proto: pb::AssetId = id.into(); + println!(" Proto JSON: {}\n", serde_json::to_string(&proto).unwrap()); + + // Convert back to verify roundtrip + let id_decoded = Id::from_str(&bech32_str).expect("valid bech32 string"); + assert_eq!(id, id_decoded); + } + } } diff --git a/crates/util/cometindex/src/indexer/indexing_state.rs b/crates/util/cometindex/src/indexer/indexing_state.rs index 23f75a2c8c..fa1c4e9048 100644 --- a/crates/util/cometindex/src/indexer/indexing_state.rs +++ b/crates/util/cometindex/src/indexer/indexing_state.rs @@ -68,7 +68,7 @@ impl TryFrom for Height { impl<'r> sqlx::Decode<'r, Postgres> for Height { fn decode( - value: >::ValueRef, + value: ::ValueRef<'r>, ) -> Result { Ok(Height::try_from( >::decode(value)?, @@ -85,8 +85,8 @@ impl sqlx::Type for Height { impl<'q> sqlx::Encode<'q, Postgres> for Height { fn encode_by_ref( &self, - buf: &mut >::ArgumentBuffer, - ) -> sqlx::encode::IsNull { + buf: &mut ::ArgumentBuffer<'q>, + ) -> Result { >::encode( i64::try_from(self.0).expect("height should never exceed i64::MAX"), buf,