From a9042efe7b89f27b5c6bedc86dd2e69762e132a2 Mon Sep 17 00:00:00 2001 From: Lucas Meier Date: Thu, 10 Oct 2024 01:54:10 -0700 Subject: [PATCH] pindexer: dex-ex: implement robust summary calculation This works correctly even in the presence of no activity for a while on a given pair. --- crates/bin/pindexer/src/dex_ex/mod.rs | 105 ++++++++++++++++++---- crates/bin/pindexer/src/dex_ex/schema.sql | 35 ++++---- 2 files changed, 105 insertions(+), 35 deletions(-) diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index eb20e9fed3..ec3882d0c9 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -294,26 +294,94 @@ mod summary { } } - pub async fn update(&mut self, time: DateTime, candle: Candle) -> anyhow::Result<()> { - let time_24h_ago = time - .checked_sub_days(Days::new(1)) - .ok_or(anyhow!("should be able to get time 24h ago from {}", time))?; - sqlx::query("DELETE FROM _dex_ex_summary_backing WHERE time < $1") - .bind(time_24h_ago) - .execute(self.dbtx.as_mut()) - .await?; - sqlx::query("INSERT INTO _dex_ex_summary_backing VALUES ($1, $2, $3, $4, $5, $6)") - .bind(self.asset_start.to_bytes().as_slice()) - .bind(self.asset_end.to_bytes().as_slice()) - .bind(time) - .bind(candle.close) - .bind(candle.direct_volume) - .bind(candle.swap_volume) - .execute(self.dbtx.as_mut()) - .await?; + pub async fn add_candle(&mut self, time: DateTime, candle: Candle) -> anyhow::Result<()> { + let asset_start = self.asset_start.to_bytes(); + let asset_end = self.asset_end.to_bytes(); + sqlx::query( + r#" + INSERT INTO _dex_ex_summary_backing VALUES ($1, $2, $3, $4, $5, $6) + "#, + ) + .bind(asset_start.as_slice()) + .bind(asset_end.as_slice()) + .bind(time) + .bind(candle.close) + .bind(candle.direct_volume) + .bind(candle.swap_volume) + .execute(self.dbtx.as_mut()) + .await?; Ok(()) } } + + pub async fn update_all(dbtx: &mut PgTransaction<'_>, time: DateTime) -> anyhow::Result<()> { + let
time_24h_ago = time + .checked_sub_days(Days::new(1)) + .ok_or_else(|| anyhow!("should be able to get time 24h ago from {}", time))?; + sqlx::query( + r#" + DELETE FROM _dex_ex_summary_backing WHERE time < $1 + "#, + ) + .bind(time_24h_ago) + .execute(dbtx.as_mut()) + .await?; + // Update all of the summaries with relevant backing data. + // NOTE: INSERT ... SELECT matches columns by position, not by alias; keep schema.sql's column order. + // We choose this one as being responsible for creating the first summary. + sqlx::query( + r#" + INSERT INTO dex_ex_summary + SELECT DISTINCT ON (asset_start, asset_end) + asset_start, + asset_end, + FIRST_VALUE(price) OVER w AS current_price, + price AS price_24h_ago, + MAX(price) OVER w AS high_24h, + MIN(price) OVER w AS low_24h, + SUM(direct_volume) OVER w AS direct_volume_24h, + SUM(swap_volume) OVER w AS swap_volume_24h + FROM _dex_ex_summary_backing + WINDOW w AS ( + PARTITION BY + asset_start, asset_end + ORDER BY asset_start, asset_end, time DESC + ) ORDER BY asset_start, asset_end, time ASC + ON CONFLICT (asset_start, asset_end) DO UPDATE SET + price_24h_ago = EXCLUDED.price_24h_ago, + current_price = EXCLUDED.current_price, + high_24h = EXCLUDED.high_24h, + low_24h = EXCLUDED.low_24h, + direct_volume_24h = EXCLUDED.direct_volume_24h, + swap_volume_24h = EXCLUDED.swap_volume_24h + "#, + ) + .execute(dbtx.as_mut()) + .await?; + // When we don't have backing data, we should nonetheless update to reflect this + sqlx::query( + r#" + UPDATE dex_ex_summary + SET + price_24h_ago = current_price, + high_24h = current_price, + low_24h = current_price, + direct_volume_24h = 0, + swap_volume_24h = 0 + WHERE NOT EXISTS ( + SELECT 1 + FROM _dex_ex_summary_backing + WHERE + _dex_ex_summary_backing.asset_start = dex_ex_summary.asset_start + AND + _dex_ex_summary_backing.asset_end = dex_ex_summary.asset_end + ) + "#, + ) + .execute(dbtx.as_mut()) + .await?; + Ok(()) + } } use summary::Context as SummaryContext; @@ -359,7 +427,7 @@ async fn on_event_candlestick_data( ctx.update(event_time, candle).await?; } let mut ctx =
SummaryContext::new(dbtx, asset_start, asset_end); - ctx.update(event_time, candle).await?; + ctx.add_candle(event_time, candle).await?; Ok(()) } @@ -434,6 +502,7 @@ impl AppView for Component { for event in unqueue_event_candlestick_data(dbtx, height).await? { on_event_candlestick_data(dbtx, time, event).await?; } + summary::update_all(dbtx, time).await?; } tracing::debug!(?event, "unrecognized event"); Ok(()) diff --git a/crates/bin/pindexer/src/dex_ex/schema.sql b/crates/bin/pindexer/src/dex_ex/schema.sql index 3335416921..53f85ceb8a 100644 --- a/crates/bin/pindexer/src/dex_ex/schema.sql +++ b/crates/bin/pindexer/src/dex_ex/schema.sql @@ -47,23 +47,24 @@ CREATE TABLE IF NOT EXISTS _dex_ex_summary_backing ( PRIMARY KEY (asset_start, asset_end, time) ); -CREATE OR REPLACE VIEW dex_ex_summary AS -SELECT DISTINCT ON (asset_start, asset_end) - asset_start, - asset_end, - FIRST_VALUE(price) OVER w AS current_price, - price AS price_24h_ago, - MAX(price) OVER w AS high_24h, - MIN(price) OVER w AS low_24h, - SUM(direct_volume) OVER w AS direct_volume_24h, - SUM(swap_volume) OVER w AS swap_volume_24h -FROM _dex_ex_summary_backing -WINDOW w AS ( - PARTITION BY - asset_start, asset_end - ORDER BY asset_start, asset_end, time DESC -) -ORDER BY asset_start, asset_end, time ASC; +CREATE TABLE IF NOT EXISTS dex_ex_summary ( + -- The first asset of the directed pair. + asset_start BYTEA NOT NULL, + -- The second asset of the directed pair. + asset_end BYTEA NOT NULL, + -- The current price (in terms of asset_end) + current_price FLOAT8 NOT NULL, + -- The price 24h ago. + price_24h_ago FLOAT8 NOT NULL, + -- The highest price over the past 24h. + high_24h FLOAT8 NOT NULL, + -- The lowest price over the past 24h. + low_24h FLOAT8 NOT NULL, + -- c.f. candlesticks for the difference between these two + direct_volume_24h FLOAT8 NOT NULL, + swap_volume_24h FLOAT8 NOT NULL, + PRIMARY KEY (asset_start, asset_end) +); CREATE TABLE IF NOT EXISTS _dex_ex_queue ( id SERIAL PRIMARY KEY,