From 31fca7e7af1a1073fc7e678bd59493b65a8b9b1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=BAc=C3=A1s=20Meier?= Date: Thu, 14 Nov 2024 10:22:38 -0700 Subject: [PATCH] pindexer: flesh out dex-explorer indexer (#4917) ## Describe your changes Closes #4914. The internal architecture of the app view tries to make use of batch processing to the extent possible, which simplifies a lot of the logic. The price charts remain unchanged, but I collapsed the two tables into one for performance and simplicity. I also **did not** implement insertion of empty candles; if there are gaps in the events, there will be gaps in the resulting database as well. The main addition, and where I spent most of my time, is summaries of information over arbitrary windows. The idea behind the architecture here is that any time a change to liquidity, trade count, or a candle for a directed pair happens in a block, that block then gets a "snapshot" inserted, with the current price, liquidity, volume in that block, etc. At the end of the batch, the current summary is then updated, for each window, using those timed snapshots. Then an aggregate summary, across all pairs, is created from these summaries, for each window. In order to price values under a common denom, assets are filtered based on having a current USDC price, backed by enough liquidity (the denom and liquidity amount are parameters to the component). For testing, I'd recommend running the app view against mainnet and testnet, and checking some sanity items, such as the price not seeming crazy and being consistent in the summary across all windows. I think we'll notice potential issues relatively quickly when dogfooding the explorer. ## Checklist before requesting a review - [x] I have added guiding text to explain how a reviewer should test these changes. - [x] If this code contains consensus-breaking changes, I have added the "consensus-breaking" label. 
Otherwise, I declare my belief that there are not consensus-breaking changes, for the following reason: > indexing only --- crates/bin/pindexer/src/dex_ex/mod.rs | 1087 ++++++++++++++------- crates/bin/pindexer/src/dex_ex/schema.sql | 89 +- crates/bin/pindexer/src/indexer_ext.rs | 9 +- 3 files changed, 761 insertions(+), 424 deletions(-) diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs index 9574c90326..5b9b736d0a 100644 --- a/crates/bin/pindexer/src/dex_ex/mod.rs +++ b/crates/bin/pindexer/src/dex_ex/mod.rs @@ -1,267 +1,326 @@ -use std::fmt::Display; - -use anyhow::{anyhow, Context}; -use chrono::{Datelike, Days, TimeZone, Timelike as _, Utc}; -use cometindex::{async_trait, index::EventBatch, AppView, ContextualizedEvent, PgTransaction}; +use anyhow::anyhow; +use cometindex::{ + async_trait, + index::{BlockEvents, EventBatch}, + AppView, PgTransaction, +}; use penumbra_asset::asset; -use penumbra_dex::{event::EventCandlestickData, CandlestickData}; -use penumbra_proto::{event::EventDomainType, DomainType}; +use penumbra_dex::{ + event::{ + EventCandlestickData, EventPositionExecution, EventPositionOpen, EventPositionWithdraw, + }, + lp::Reserves, + DirectedTradingPair, TradingPair, +}; +use penumbra_proto::event::EventDomainType; use penumbra_sct::event::EventBlockRoot; +use std::collections::{HashMap, HashSet}; type DateTime = sqlx::types::chrono::DateTime; -/// Candlestick data, unmoored from the prison of a particular block height. -/// -/// In other words, this can represent candlesticks which span arbitrary windows, -/// and not just a single block. 
-#[derive(Debug, Clone, Copy)] -struct Candle { - open: f64, - close: f64, - low: f64, - high: f64, - direct_volume: f64, - swap_volume: f64, -} +mod candle { + use super::DateTime; + use chrono::{Datelike as _, Days, TimeDelta, TimeZone as _, Timelike as _, Utc}; + use penumbra_dex::CandlestickData; + use std::fmt::Display; -impl Candle { - fn from_candlestick_data(data: &CandlestickData) -> Self { - Self { - open: data.open, - close: data.close, - low: data.low, - high: data.high, - direct_volume: data.direct_volume, - swap_volume: data.swap_volume, + /// Candlestick data, unmoored from the prison of a particular block height. + /// + /// In other words, this can represent candlesticks which span arbitrary windows, + /// and not just a single block. + #[derive(Debug, Clone, Copy)] + pub struct Candle { + pub open: f64, + pub close: f64, + pub low: f64, + pub high: f64, + pub direct_volume: f64, + pub swap_volume: f64, + } + + impl Candle { + pub fn from_candlestick_data(data: &CandlestickData) -> Self { + Self { + open: data.open, + close: data.close, + low: data.low, + high: data.high, + direct_volume: data.direct_volume, + swap_volume: data.swap_volume, + } + } + + pub fn merge(&mut self, that: &Self) { + self.close = that.close; + self.low = self.low.min(that.low); + self.high = self.high.max(that.high); + self.direct_volume += that.direct_volume; + self.swap_volume += that.swap_volume; } } - fn merge(&self, that: &Self) -> Self { - Self { - open: self.open, - close: that.close, - low: self.low.min(that.low), - high: self.high.max(that.high), - direct_volume: self.direct_volume + that.direct_volume, - swap_volume: self.swap_volume + that.swap_volume, + impl From for Candle { + fn from(value: CandlestickData) -> Self { + Self::from(&value) } } -} -impl From for Candle { - fn from(value: CandlestickData) -> Self { - Self::from(&value) + impl From<&CandlestickData> for Candle { + fn from(value: &CandlestickData) -> Self { + Self::from_candlestick_data(value) + } 
} -} -impl From<&CandlestickData> for Candle { - fn from(value: &CandlestickData) -> Self { - Self::from_candlestick_data(value) + #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] + pub enum Window { + W1m, + W15m, + W1h, + W4h, + W1d, + W1w, + W1mo, } -} -#[derive(Clone, Copy, Debug)] -enum Window { - W1m, - W15m, - W1h, - W4h, - W1d, - W1w, - W1mo, -} + impl Window { + pub fn all() -> impl Iterator { + [ + Window::W1m, + Window::W15m, + Window::W1h, + Window::W4h, + Window::W1d, + Window::W1w, + Window::W1mo, + ] + .into_iter() + } -impl Window { - fn all() -> impl Iterator { - [ - Window::W1m, - Window::W15m, - Window::W1h, - Window::W4h, - Window::W1d, - Window::W1w, - Window::W1mo, - ] - .into_iter() + /// Get the anchor for a given time. + /// + /// This is the latest time that "snaps" to a given anchor, dependent on the window. + /// + /// For example, the 1 minute window has an anchor every minute, the day window + /// every day, etc. + pub fn anchor(&self, time: DateTime) -> DateTime { + let (y, mo, d, h, m) = ( + time.year(), + time.month(), + time.day(), + time.hour(), + time.minute(), + ); + let out = match self { + Window::W1m => Utc.with_ymd_and_hms(y, mo, d, h, m, 0).single(), + Window::W15m => Utc.with_ymd_and_hms(y, mo, d, h, m - (m % 15), 0).single(), + Window::W1h => Utc.with_ymd_and_hms(y, mo, d, h, 0, 0).single(), + Window::W4h => Utc.with_ymd_and_hms(y, mo, d, h - (h % 4), 0, 0).single(), + Window::W1d => Utc.with_ymd_and_hms(y, mo, d, 0, 0, 0).single(), + Window::W1w => Utc + .with_ymd_and_hms(y, mo, d, 0, 0, 0) + .single() + .and_then(|x| { + x.checked_sub_days(Days::new(time.weekday().num_days_from_monday().into())) + }), + Window::W1mo => Utc.with_ymd_and_hms(y, mo, 1, 0, 0, 0).single(), + }; + out.unwrap() + } + + pub fn subtract_from(&self, time: DateTime) -> DateTime { + let delta = match self { + Window::W1m => TimeDelta::minutes(1), + Window::W15m => TimeDelta::minutes(15), + Window::W1h => TimeDelta::hours(1), + 
Window::W4h => TimeDelta::hours(4), + Window::W1d => TimeDelta::days(1), + Window::W1w => TimeDelta::weeks(1), + Window::W1mo => TimeDelta::days(30), + }; + time - delta + } } - /// Get the anchor for a given time. - /// - /// This is the latest time that "snaps" to a given anchor, dependent on the window. - /// - /// For example, the 1 minute window has an anchor every minute, the day window - /// every day, etc. - fn anchor(&self, time: DateTime) -> DateTime { - let (y, mo, d, h, m) = ( - time.year(), - time.month(), - time.day(), - time.hour(), - time.minute(), - ); - let out = match self { - Window::W1m => Utc.with_ymd_and_hms(y, mo, d, h, m, 0).single(), - Window::W15m => Utc.with_ymd_and_hms(y, mo, d, h, m - (m % 15), 0).single(), - Window::W1h => Utc.with_ymd_and_hms(y, mo, d, h, 0, 0).single(), - Window::W4h => Utc.with_ymd_and_hms(y, mo, d, h - (h % 4), 0, 0).single(), - Window::W1d => Utc.with_ymd_and_hms(y, mo, d, 0, 0, 0).single(), - Window::W1w => Utc - .with_ymd_and_hms(y, mo, d, 0, 0, 0) - .single() - .and_then(|x| { - x.checked_sub_days(Days::new(time.weekday().num_days_from_monday().into())) - }), - Window::W1mo => Utc.with_ymd_and_hms(y, mo, 1, 0, 0, 0).single(), - }; - out.unwrap() + impl Display for Window { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use Window::*; + let str = match self { + W1m => "1m", + W15m => "15m", + W1h => "1h", + W4h => "4h", + W1d => "1d", + W1w => "1w", + W1mo => "1mo", + }; + write!(f, "{}", str) + } } -} -impl Display for Window { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - use Window::*; - let str = match self { - W1m => "1m", - W15m => "15m", - W1h => "1h", - W4h => "4h", - W1d => "1d", - W1w => "1w", - W1mo => "1mo", - }; - write!(f, "{}", str) + #[derive(Debug)] + pub struct WindowedCandle { + start: DateTime, + window: Window, + candle: Candle, + } + + impl WindowedCandle { + pub fn new(now: DateTime, window: Window, candle: Candle) -> Self { + Self { + 
start: window.anchor(now), + window, + candle, + } + } + + /// Update with a new candlestick, at a given time. + /// + /// This may return the old candlestick and its start time, if we should be starting + /// a new candle, based on the candle for that window having already been closed. + pub fn with_candle(&mut self, now: DateTime, candle: Candle) -> Option<(DateTime, Candle)> { + let start = self.window.anchor(now); + // This candle belongs to the next window! + if start > self.start { + let old_start = std::mem::replace(&mut self.start, start); + let old_candle = std::mem::replace(&mut self.candle, candle); + Some((old_start, old_candle)) + } else { + self.candle.merge(&candle); + None + } + } + + pub fn window(&self) -> Window { + self.window + } + + pub fn flush(self) -> (DateTime, Candle) { + (self.start, self.candle) + } } } +pub use candle::{Candle, Window, WindowedCandle}; mod price_chart { use super::*; /// A context when processing a price chart. #[derive(Debug)] - pub struct Context<'tx, 'db> { - dbtx: &'tx mut PgTransaction<'db>, + pub struct Context { asset_start: asset::Id, asset_end: asset::Id, window: Window, + state: Option, } - impl<'tx, 'db> Context<'tx, 'db> { - pub fn new( - dbtx: &'tx mut PgTransaction<'db>, + impl Context { + pub async fn load( + dbtx: &mut PgTransaction<'_>, asset_start: asset::Id, asset_end: asset::Id, window: Window, - ) -> Self { - Self { - dbtx, + ) -> anyhow::Result { + let row: Option<(f64, f64, f64, f64, f64, f64, DateTime)> = sqlx::query_as( + " + SELECT open, close, high, low, direct_volume, swap_volume, start_time + FROM dex_ex_price_charts + WHERE asset_start = $1 + AND asset_end = $2 + AND the_window = $3 + ORDER BY start_time DESC + LIMIT 1 + ", + ) + .bind(asset_start.to_bytes()) + .bind(asset_end.to_bytes()) + .bind(window.to_string()) + .fetch_optional(dbtx.as_mut()) + .await?; + let state = row.map( + |(open, close, low, high, direct_volume, swap_volume, start)| { + let candle = Candle { + open, + close, + 
low, + high, + direct_volume, + swap_volume, + }; + WindowedCandle::new(start, window, candle) + }, + ); + Ok(Self { asset_start, asset_end, window, - } + state, + }) } - /// Get the candle we should update, based on the current timestamp. - async fn relevant_candle( - &mut self, - anchor: DateTime, - ) -> anyhow::Result> { - let stuff: Option<(i32, f64, f64, f64, f64, f64, f64)> = sqlx::query_as( - r#" - SELECT - dex_ex_candlesticks.id, - open, - close, - high, - low, - direct_volume, - swap_volume - FROM dex_ex_price_charts - JOIN dex_ex_candlesticks ON dex_ex_candlesticks.id = candlestick_id - WHERE asset_start = $1 - AND asset_end = $2 - AND the_window = $3 - AND start_time >= $4 - "#, - ) - .bind(self.asset_start.to_bytes().as_slice()) - .bind(self.asset_end.to_bytes().as_slice()) - .bind(self.window.to_string()) - .bind(anchor) - .fetch_optional(self.dbtx.as_mut()) - .await?; - Ok( - stuff.map(|(id, open, close, high, low, direct_volume, swap_volume)| { - ( - id, - Candle { - open, - close, - high, - low, - direct_volume, - swap_volume, - }, - ) - }), + async fn write_candle( + &self, + dbtx: &mut PgTransaction<'_>, + start: DateTime, + candle: Candle, + ) -> anyhow::Result<()> { + sqlx::query( + " + INSERT INTO dex_ex_price_charts( + id, asset_start, asset_end, the_window, + start_time, open, close, high, low, direct_volume, swap_volume + ) + VALUES( + DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10 ) - } - - async fn create_candle(&mut self, anchor: DateTime, candle: Candle) -> anyhow::Result<()> { - let id: i32 = sqlx::query_scalar( - r#" - INSERT INTO dex_ex_candlesticks VALUES (DEFAULT, $1, $2, $3, $4, $5, $6) RETURNING id - "#, + ON CONFLICT (asset_start, asset_end, the_window, start_time) DO UPDATE SET + open = EXCLUDED.open, + close = EXCLUDED.close, + high = EXCLUDED.high, + low = EXCLUDED.low, + direct_volume = EXCLUDED.direct_volume, + swap_volume = EXCLUDED.swap_volume + ", ) + .bind(self.asset_start.to_bytes()) + .bind(self.asset_end.to_bytes()) 
+ .bind(self.window.to_string()) + .bind(start) .bind(candle.open) .bind(candle.close) .bind(candle.high) .bind(candle.low) .bind(candle.direct_volume) .bind(candle.swap_volume) - .fetch_one(self.dbtx.as_mut()) - .await?; - sqlx::query( - r#" - INSERT INTO dex_ex_price_charts VALUES (DEFAULT, $1, $2, $3, $4, $5) - "#, - ) - .bind(self.asset_start.to_bytes().as_slice()) - .bind(self.asset_end.to_bytes().as_slice()) - .bind(self.window.to_string()) - .bind(anchor) - .bind(id) - .execute(self.dbtx.as_mut()) + .execute(dbtx.as_mut()) .await?; Ok(()) } - async fn update_candle(&mut self, id: i32, candle: Candle) -> anyhow::Result<()> { - sqlx::query( - r#" - UPDATE dex_ex_candlesticks - SET (open, close, high, low, direct_volume, swap_volume) = - ($1, $2, $3, $4, $5, $6) - WHERE id = $7 - "#, - ) - .bind(candle.open) - .bind(candle.close) - .bind(candle.high) - .bind(candle.low) - .bind(candle.direct_volume) - .bind(candle.swap_volume) - .bind(id) - .execute(self.dbtx.as_mut()) - .await?; + pub async fn update( + &mut self, + dbtx: &mut PgTransaction<'_>, + now: DateTime, + candle: Candle, + ) -> anyhow::Result<()> { + let state = match self.state.as_mut() { + Some(x) => x, + None => { + self.state = Some(WindowedCandle::new(now, self.window, candle)); + self.state.as_mut().unwrap() + } + }; + if let Some((start, old_candle)) = state.with_candle(now, candle) { + self.write_candle(dbtx, start, old_candle).await?; + }; Ok(()) } - pub async fn update(&mut self, time: DateTime, candle: Candle) -> anyhow::Result<()> { - let anchor = self.window.anchor(time); - match self.relevant_candle(anchor).await? 
{ - None => self.create_candle(anchor, candle).await?, - Some((id, old_candle)) => self.update_candle(id, old_candle.merge(&candle)).await?, - }; + pub async fn unload(mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> { + let state = std::mem::replace(&mut self.state, None); + if let Some(state) = state { + let (start, candle) = state.flush(); + self.write_candle(dbtx, start, candle).await?; + } Ok(()) } } @@ -270,215 +329,424 @@ mod price_chart { use price_chart::Context as PriceChartContext; mod summary { - use super::*; + use cometindex::PgTransaction; + use penumbra_asset::asset; - #[derive(Debug)] - pub struct Context<'tx, 'db> { - dbtx: &'tx mut PgTransaction<'db>, - asset_start: asset::Id, - asset_end: asset::Id, + use super::{Candle, DateTime, PairMetrics, Window}; + + pub struct Context { + start: asset::Id, + end: asset::Id, + price: f64, + liquidity: f64, } - impl<'tx, 'db> Context<'tx, 'db> { - pub fn new( - dbtx: &'tx mut PgTransaction<'db>, - asset_start: asset::Id, - asset_end: asset::Id, - ) -> Self { - Self { - dbtx, - asset_start, - asset_end, - } + impl Context { + pub async fn load( + dbtx: &mut PgTransaction<'_>, + start: asset::Id, + end: asset::Id, + ) -> anyhow::Result { + let row: Option<(f64, f64)> = sqlx::query_as( + " + SELECT price, liquidity + FROM dex_ex_pairs_block_snapshot + WHERE asset_start = $1 + AND asset_end = $2 + ORDER BY id DESC + LIMIT 1 + ", + ) + .bind(start.to_bytes()) + .bind(end.to_bytes()) + .fetch_optional(dbtx.as_mut()) + .await?; + let (price, liquidity) = row.unwrap_or_default(); + Ok(Self { + start, + end, + price, + liquidity, + }) } - pub async fn add_candle(&mut self, time: DateTime, candle: Candle) -> anyhow::Result<()> { - let asset_start = self.asset_start.to_bytes(); - let asset_end = self.asset_end.to_bytes(); + pub async fn update( + &mut self, + dbtx: &mut PgTransaction<'_>, + now: DateTime, + candle: Option, + metrics: PairMetrics, + ) -> anyhow::Result<()> { + if let Some(candle) = candle 
{ + self.price = candle.close; + } + self.liquidity += metrics.liquidity_change; + sqlx::query( - r#" - INSERT INTO _dex_ex_summary_backing VALUES ($1, $2, $3, $4, $5, $6) - "#, + " + INSERT INTO dex_ex_pairs_block_snapshot VALUES ( + DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8 + ) + ", ) - .bind(asset_start.as_slice()) - .bind(asset_end.as_slice()) - .bind(time) - .bind(candle.close) - .bind(candle.direct_volume) - .bind(candle.swap_volume) - .execute(self.dbtx.as_mut()) + .bind(now) + .bind(self.start.to_bytes()) + .bind(self.end.to_bytes()) + .bind(self.price) + .bind(self.liquidity) + .bind(candle.map(|x| x.direct_volume).unwrap_or_default()) + .bind(candle.map(|x| x.swap_volume).unwrap_or_default()) + .bind(metrics.trades) + .execute(dbtx.as_mut()) .await?; Ok(()) } } - pub async fn update_all(dbtx: &mut PgTransaction<'_>, time: DateTime) -> anyhow::Result<()> { - let time_24h_ago = time - .checked_sub_days(Days::new(1)) - .ok_or(anyhow!("should be able to get time 24h ago from {}", time))?; + pub async fn update_summary( + dbtx: &mut PgTransaction<'_>, + now: DateTime, + start: asset::Id, + end: asset::Id, + window: Window, + ) -> anyhow::Result<()> { + let then = window.subtract_from(now); sqlx::query( - r#" - DELETE FROM _dex_ex_summary_backing WHERE time < $1 - "#, + " + WITH + snapshots AS ( + SELECT * + FROM dex_ex_pairs_block_snapshot + WHERE asset_start = $1 + AND asset_end = $2 + ), + previous AS ( + SELECT price AS price_then, liquidity AS liquidity_then + FROM snapshots + WHERE time <= $4 + ORDER BY time DESC + LIMIT 1 + ), + previous_or_default AS ( + SELECT + COALESCE((SELECT price_then FROM previous), 0.0) AS price_then, + COALESCE((SELECT liquidity_then FROM previous), 0.0) AS liquidity_then + ), + now AS ( + SELECT price, liquidity + FROM snapshots + WHERE time >= $3 + ORDER BY time ASC + LIMIT 1 + ), + sums AS ( + SELECT + COALESCE(SUM(direct_volume), 0.0) AS direct_volume_over_window, + COALESCE(SUM(swap_volume), 0.0) AS swap_volume_over_window, 
+ COALESCE(SUM(trades), 0.0) AS trades_over_window + FROM snapshots + WHERE time <= $3 + AND time >= $4 ) - .bind(time_24h_ago) - .execute(dbtx.as_mut()) - .await?; - // Update all of the summaries with relevant backing data. - // - // We choose this one as being responsible for creating the first summary. - sqlx::query( - r#" - INSERT INTO dex_ex_summary - SELECT DISTINCT ON (asset_start, asset_end) - asset_start, - asset_end, - FIRST_VALUE(price) OVER w AS price_24h_ago, - price AS current_price, - MAX(price) OVER w AS high_24h, - MIN(price) OVER w AS low_24h, - SUM(direct_volume) OVER w AS direct_volume_24h, - SUM(swap_volume) OVER w AS swap_volume_24h - FROM _dex_ex_summary_backing - WINDOW w AS ( - PARTITION BY - asset_start, asset_end - ORDER BY asset_start, asset_end, time DESC - ) ORDER by asset_start, asset_end, time ASC - ON CONFLICT (asset_start, asset_end) DO UPDATE SET - price_24h_ago = EXCLUDED.price_24h_ago, - current_price = EXCLUDED.current_price, - high_24h = EXCLUDED.high_24h, - low_24h = EXCLUDED.low_24h, - direct_volume_24h = EXCLUDED.direct_volume_24h, - swap_volume_24h = EXCLUDED.swap_volume_24h - "#, + INSERT INTO dex_ex_pairs_summary + SELECT + $1, $2, $5, + price, price_then, + liquidity, liquidity_then, + direct_volume_over_window, + swap_volume_over_window, + trades_over_window + FROM previous_or_default JOIN now ON TRUE JOIN sums ON TRUE + ON CONFLICT (asset_start, asset_end, the_window) + DO UPDATE SET + price = EXCLUDED.price, + price_then = EXCLUDED.price_then, + liquidity = EXCLUDED.liquidity, + liquidity_then = EXCLUDED.liquidity_then, + direct_volume_over_window = EXCLUDED.direct_volume_over_window, + swap_volume_over_window = EXCLUDED.swap_volume_over_window, + trades_over_window = EXCLUDED.trades_over_window + ", ) + .bind(start.to_bytes()) + .bind(end.to_bytes()) + .bind(now) + .bind(then) + .bind(window.to_string()) .execute(dbtx.as_mut()) .await?; - // When we don't have backing data, we should nonetheless update to reflect 
this + Ok(()) + } + + pub async fn update_aggregate_summary( + dbtx: &mut PgTransaction<'_>, + window: Window, + denom: asset::Id, + min_liquidity: f64, + ) -> anyhow::Result<()> { + // TODO: do something here sqlx::query( - r#" - UPDATE dex_ex_summary - SET - price_24h_ago = current_price, - high_24h = current_price, - low_24h = current_price, - direct_volume_24h = 0, - swap_volume_24h = 0 - WHERE NOT EXISTS ( - SELECT 1 - FROM _dex_ex_summary_backing - WHERE - _dex_ex_summary_backing.asset_start = dex_ex_summary.asset_start - AND - _dex_ex_summary_backing.asset_end = dex_ex_summary.asset_end + " + WITH + eligible_denoms AS ( + SELECT asset_start as asset, price + FROM dex_ex_pairs_summary + WHERE asset_end = $1 AND liquidity >= $2 + UNION VALUES ($1, 1.0) + ), + converted_pairs_summary AS ( + SELECT + asset_start, asset_end, + (dex_ex_pairs_summary.price - greatest(price_then, 0.000001)) / greatest(price_then, 0.000001) * 100 AS price_change, + liquidity * eligible_denoms.price AS liquidity, + direct_volume_over_window * eligible_denoms.price as dv, + swap_volume_over_window * eligible_denoms.price as sv, + trades_over_window as trades + FROM dex_ex_pairs_summary + JOIN eligible_denoms + ON eligible_denoms.asset = asset_end + WHERE the_window = $3 + ), + sums AS ( + SELECT + SUM(dv) AS direct_volume, + SUM(sv) AS swap_volume, + SUM(liquidity) AS liquidity, + SUM(trades) AS trades, + (SELECT COUNT(*) FROM converted_pairs_summary WHERE dv > 0 OR sv > 0) AS active_pairs + FROM converted_pairs_summary + ), + largest_sv AS ( + SELECT + asset_start AS largest_sv_trading_pair_start, + asset_end AS largest_sv_trading_pair_end, + sv AS largest_sv_trading_pair_volume + FROM converted_pairs_summary + ORDER BY sv DESC + LIMIT 1 + ), + largest_dv AS ( + SELECT + asset_start AS largest_dv_trading_pair_start, + asset_end AS largest_dv_trading_pair_end, + dv AS largest_dv_trading_pair_volume + FROM converted_pairs_summary + ORDER BY dv DESC + LIMIT 1 + ), + top_price_mover AS ( 
+ SELECT + asset_start AS top_price_mover_start, + asset_end AS top_price_mover_end, + price_change AS top_price_mover_change_percent + FROM converted_pairs_summary + ORDER BY price_change DESC + LIMIT 1 ) - "#, - ) + INSERT INTO dex_ex_aggregate_summary + SELECT + $3, + direct_volume, swap_volume, liquidity, trades, active_pairs, + largest_sv_trading_pair_start, + largest_sv_trading_pair_end, + largest_sv_trading_pair_volume, + largest_dv_trading_pair_start, + largest_dv_trading_pair_end, + largest_dv_trading_pair_volume, + top_price_mover_start, + top_price_mover_end, + top_price_mover_change_percent + FROM + sums + JOIN largest_sv ON TRUE + JOIN largest_dv ON TRUE + JOIN top_price_mover ON TRUE + ON CONFLICT (the_window) DO UPDATE SET + direct_volume = EXCLUDED.direct_volume, + swap_volume = EXCLUDED.swap_volume, + liquidity = EXCLUDED.liquidity, + trades = EXCLUDED.trades, + active_pairs = EXCLUDED.active_pairs, + largest_sv_trading_pair_start = EXCLUDED.largest_sv_trading_pair_start, + largest_sv_trading_pair_end = EXCLUDED.largest_sv_trading_pair_end, + largest_sv_trading_pair_volume = EXCLUDED.largest_sv_trading_pair_volume, + largest_dv_trading_pair_start = EXCLUDED.largest_dv_trading_pair_start, + largest_dv_trading_pair_end = EXCLUDED.largest_dv_trading_pair_end, + largest_dv_trading_pair_volume = EXCLUDED.largest_dv_trading_pair_volume, + top_price_mover_start = EXCLUDED.top_price_mover_start, + top_price_mover_end = EXCLUDED.top_price_mover_end, + top_price_mover_change_percent = EXCLUDED.top_price_mover_change_percent + ") + .bind(denom.to_bytes()) + .bind(min_liquidity) + .bind(window.to_string()) .execute(dbtx.as_mut()) .await?; Ok(()) } } -use summary::Context as SummaryContext; - -async fn queue_event_candlestick_data( - dbtx: &mut PgTransaction<'_>, - height: u64, - event: EventCandlestickData, -) -> anyhow::Result<()> { - sqlx::query("INSERT INTO _dex_ex_queue VALUES (DEFAULT, $1, $2)") - .bind(i64::try_from(height)?) 
- .bind(event.encode_to_vec().as_slice()) - .execute(dbtx.as_mut()) - .await?; - Ok(()) +#[derive(Debug, Default, Clone, Copy)] +struct PairMetrics { + trades: f64, + liquidity_change: f64, } -async fn unqueue_event_candlestick_data( - dbtx: &mut PgTransaction<'_>, - height: u64, -) -> anyhow::Result> { - let values: Vec> = - sqlx::query_scalar("DELETE FROM _dex_ex_queue WHERE height = $1 RETURNING data") - .bind(i64::try_from(height)?) - .fetch_all(dbtx.as_mut()) - .await?; - values - .into_iter() - .map(|x| EventCandlestickData::decode(x.as_slice())) - .collect() +#[derive(Debug)] +struct Events { + time: Option, + candles: HashMap, + metrics: HashMap, } -async fn on_event_candlestick_data( - dbtx: &mut PgTransaction<'_>, - event_time: DateTime, - event: EventCandlestickData, -) -> anyhow::Result<()> { - let asset_start = event.pair.start; - let asset_end = event.pair.end; - let candle = event.stick.into(); - for window in Window::all() { - let mut ctx = PriceChartContext::new(dbtx, asset_start, asset_end, window); - ctx.update(event_time, candle).await?; +impl Events { + fn new() -> Self { + Self { + time: None, + candles: HashMap::new(), + metrics: HashMap::new(), + } } - let mut ctx = SummaryContext::new(dbtx, asset_start, asset_end); - ctx.add_candle(event_time, candle).await?; - Ok(()) -} -async fn fetch_height_time( - dbtx: &mut PgTransaction<'_>, - height: u64, -) -> anyhow::Result> { - const CTX: &'static str = r#" -The `dex_ex` component relies on the `block` component to be running, to provide the `block_details` with timestamps. -Make sure that is running as well. -"#; - sqlx::query_scalar("SELECT timestamp FROM block_details WHERE height = $1") - .bind(i64::try_from(height)?) 
- .fetch_optional(dbtx.as_mut()) - .await - .context(CTX) -} + fn with_time(&mut self, time: DateTime) { + self.time = Some(time) + } -#[derive(Debug)] -pub struct Component {} + fn with_candle(&mut self, pair: DirectedTradingPair, candle: Candle) { + match self.candles.get_mut(&pair) { + None => { + self.candles.insert(pair, candle); + } + Some(current) => { + current.merge(&candle); + } + } + } -impl Component { - pub fn new() -> Self { - Self {} + fn metric(&mut self, pair: &DirectedTradingPair) -> &mut PairMetrics { + if !self.metrics.contains_key(pair) { + self.metrics.insert(*pair, PairMetrics::default()); + } + // NOPANIC: inserted above. + self.metrics.get_mut(pair).unwrap() } - async fn index_event( - &self, - dbtx: &mut PgTransaction<'_>, - event: &ContextualizedEvent, - ) -> Result<(), anyhow::Error> { - if let Ok(e) = EventCandlestickData::try_from_event(&event.event) { - let height = event.block_height; - match fetch_height_time(dbtx, height).await? { - None => { - queue_event_candlestick_data(dbtx, height, e).await?; - } - Some(time) => { - on_event_candlestick_data(dbtx, time, e).await?; + fn with_trade(&mut self, pair: &DirectedTradingPair) { + self.metric(pair).trades += 1.0; + } + + fn with_reserve_change( + &mut self, + pair: &TradingPair, + old_reserves: Option, + new_reserves: Reserves, + removed: bool, + ) { + let (diff_1, diff_2) = match (removed, old_reserves, new_reserves) { + (true, None, new) => (-(new.r1.value() as f64), -(new.r2.value() as f64)), + (_, None, new) => ((new.r1.value() as f64), (new.r2.value() as f64)), + (_, Some(old), new) => ( + (new.r1.value() as f64) - (old.r1.value() as f64), + (new.r2.value() as f64) - (old.r2.value() as f64), + ), + }; + for (d_pair, diff) in [ + ( + DirectedTradingPair { + start: pair.asset_1(), + end: pair.asset_2(), + }, + diff_2, + ), + ( + DirectedTradingPair { + start: pair.asset_2(), + end: pair.asset_1(), + }, + diff_1, + ), + ] { + self.metric(&d_pair).liquidity_change += diff; + } + } + + 
pub fn extract(block: &BlockEvents) -> anyhow::Result { + let mut out = Self::new(); + for event in &block.events { + if let Ok(e) = EventCandlestickData::try_from_event(&event.event) { + let candle = Candle::from_candlestick_data(&e.stick); + out.with_candle(e.pair, candle); + } else if let Ok(e) = EventBlockRoot::try_from_event(&event.event) { + let time = DateTime::from_timestamp(e.timestamp_seconds, 0).ok_or(anyhow!( + "creating timestamp should succeed; timestamp: {}", + e.timestamp_seconds + ))?; + out.with_time(time); + } else if let Ok(e) = EventPositionOpen::try_from_event(&event.event) { + out.with_reserve_change( + &e.trading_pair, + None, + Reserves { + r1: e.reserves_1, + r2: e.reserves_2, + }, + false, + ); + } else if let Ok(e) = EventPositionWithdraw::try_from_event(&event.event) { + // TODO: use close positions to track liquidity more precisely, in practic I (ck) expect few + // positions to close with being withdrawn. + out.with_reserve_change( + &e.trading_pair, + None, + Reserves { + r1: e.reserves_1, + r2: e.reserves_2, + }, + true, + ); + } else if let Ok(e) = EventPositionExecution::try_from_event(&event.event) { + out.with_reserve_change( + &e.trading_pair, + Some(Reserves { + r1: e.prev_reserves_1, + r2: e.prev_reserves_2, + }), + Reserves { + r1: e.reserves_1, + r2: e.reserves_2, + }, + false, + ); + if e.reserves_1 > e.prev_reserves_1 { + // Whatever asset we ended up with more with was traded in. 
+ out.with_trade(&DirectedTradingPair { + start: e.trading_pair.asset_1(), + end: e.trading_pair.asset_2(), + }); + } else if e.reserves_2 > e.prev_reserves_2 { + out.with_trade(&DirectedTradingPair { + start: e.trading_pair.asset_2(), + end: e.trading_pair.asset_1(), + }); } } - } else if let Ok(e) = EventBlockRoot::try_from_event(&event.event) { - let height = e.height; - let time = DateTime::from_timestamp(e.timestamp_seconds, 0).ok_or(anyhow!( - "creating timestamp should succeed; timestamp: {}", - e.timestamp_seconds - ))?; - for event in unqueue_event_candlestick_data(dbtx, height).await? { - on_event_candlestick_data(dbtx, time, event).await?; - } - summary::update_all(dbtx, time).await?; } - Ok(()) + Ok(out) + } +} + +#[derive(Debug)] +pub struct Component { + denom: asset::Id, + min_liquidity: f64, +} + +impl Component { + pub fn new(denom: asset::Id, min_liquidity: f64) -> Self { + Self { + denom, + min_liquidity, + } } } @@ -504,8 +772,67 @@ impl AppView for Component { dbtx: &mut PgTransaction, batch: EventBatch, ) -> Result<(), anyhow::Error> { - for event in batch.events() { - self.index_event(dbtx, event).await?; + let mut charts = HashMap::new(); + let mut snapshots = HashMap::new(); + let mut last_time = None; + for block in batch.by_height.iter() { + let events = Events::extract(&block)?; + let time = events + .time + .expect(&format!("no block root event at height {}", block.height)); + last_time = Some(time); + + for (pair, candle) in &events.candles { + for window in Window::all() { + let key = (pair.start, pair.end, window); + if !charts.contains_key(&key) { + let ctx = PriceChartContext::load(dbtx, key.0, key.1, key.2).await?; + charts.insert(key, ctx); + } + charts + .get_mut(&key) + .unwrap() // safe because we just inserted above + .update(dbtx, time, *candle) + .await?; + } + } + + let block_pairs = events + .candles + .keys() + .chain(events.metrics.keys()) + .copied() + .collect::>(); + for pair in block_pairs { + if 
!snapshots.contains_key(&pair) { + let ctx = summary::Context::load(dbtx, pair.start, pair.end).await?; + snapshots.insert(pair, ctx); + } + // NOPANIC: inserted above + snapshots + .get_mut(&pair) + .unwrap() + .update( + dbtx, + time, + events.candles.get(&pair).copied(), + events.metrics.get(&pair).copied().unwrap_or_default(), + ) + .await?; + } + } + + if let Some(now) = last_time { + for window in Window::all() { + for pair in snapshots.keys() { + summary::update_summary(dbtx, now, pair.start, pair.end, window).await?; + } + summary::update_aggregate_summary(dbtx, window, self.denom, self.min_liquidity) + .await?; + } + } + for chart in charts.into_values() { + chart.unload(dbtx).await?; } Ok(()) } diff --git a/crates/bin/pindexer/src/dex_ex/schema.sql b/crates/bin/pindexer/src/dex_ex/schema.sql index 53f85ceb8a..1e89821f3e 100644 --- a/crates/bin/pindexer/src/dex_ex/schema.sql +++ b/crates/bin/pindexer/src/dex_ex/schema.sql @@ -1,19 +1,3 @@ -CREATE TABLE IF NOT EXISTS dex_ex_candlesticks ( - id SERIAL PRIMARY KEY, - -- The price at the start of a window. - open FLOAT8 NOT NULL, - -- The price at the close of a window. - close FLOAT8 NOT NULL, - -- The highest price reached during a window. - high FLOAT8 NOT NULL, - -- The lowest price reached during a window. - low FLOAT8 NOT NULL, - -- The volume traded directly through position executions. - direct_volume FLOAT8 NOT NULL, - -- The volume that traded indirectly, possibly through several positions. - swap_volume FLOAT8 NOT NULL -); - -- Contains, for each directed asset pair and window type, candle sticks for each window. CREATE TABLE IF NOT EXISTS dex_ex_price_charts ( -- We just want a simple primary key to have here. @@ -28,46 +12,65 @@ CREATE TABLE IF NOT EXISTS dex_ex_price_charts ( the_window TEXT NOT NULL, -- The start time of this window. start_time TIMESTAMPTZ NOT NULL, - -- The start time for the window this stick is about. 
- candlestick_id INTEGER UNIQUE REFERENCES dex_ex_candlesticks (id) + -- The price at the start of a window. + open FLOAT8 NOT NULL, + -- The price at the close of a window. + close FLOAT8 NOT NULL, + -- The highest price reached during a window. + high FLOAT8 NOT NULL, + -- The lowest price reached during a window. + low FLOAT8 NOT NULL, + -- The volume traded directly through position executions. + direct_volume FLOAT8 NOT NULL, + -- The volume that traded indirectly, possibly through several positions. + swap_volume FLOAT8 NOT NULL ); CREATE UNIQUE INDEX ON dex_ex_price_charts (asset_start, asset_end, the_window, start_time); -CREATE TABLE IF NOT EXISTS _dex_ex_summary_backing ( +CREATE TABLE IF NOT EXISTS dex_ex_pairs_block_snapshot ( + id SERIAL PRIMARY KEY, + time TIMESTAMPTZ NOT NULL, asset_start BYTEA NOT NULL, asset_end BYTEA NOT NULL, - -- The time for this bit of information. - time TIMESTAMPTZ NOT NULL, - -- The price at this point. price FLOAT8 NOT NULL, - -- The volume for this particular candle. + liquidity FLOAT8 NOT NULL, direct_volume FLOAT8 NOT NULL, swap_volume FLOAT8 NOT NULL, - PRIMARY KEY (asset_start, asset_end, time) + trades FLOAT8 NOT NULL ); -CREATE TABLE IF NOT EXISTS dex_ex_summary ( - -- The first asset of the directed pair. +CREATE UNIQUE INDEX ON dex_ex_pairs_block_snapshot (time, asset_start, asset_end); +CREATE INDEX ON dex_ex_pairs_block_snapshot (asset_start, asset_end); + +CREATE TABLE IF NOT EXISTS dex_ex_pairs_summary ( asset_start BYTEA NOT NULL, - -- The second asset of the directed pair. asset_end BYTEA NOT NULL, - -- The current price (in terms of asset2) - current_price FLOAT8 NOT NULL, - -- The price 24h ago. - price_24h_ago FLOAT8 NOT NULL, - -- The highest price over the past 24h. - high_24h FLOAT8 NOT NULL, - -- The lowest price over the past 24h. - low_24h FLOAT8 NOT NULL, - -- c.f. 
candlesticks for the difference between these two - direct_volume_24h FLOAT8 NOT NULL, - swap_volume_24h FLOAT8 NOT NULL, - PRIMARY KEY (asset_start, asset_end) + the_window TEXT NOT NULL, + price FLOAT8 NOT NULL, + price_then FLOAT8 NOT NULL, + liquidity FLOAT8 NOT NULL, + liquidity_then FLOAT8 NOT NULL, + direct_volume_over_window FLOAT8 NOT NULL, + swap_volume_over_window FLOAT8 NOT NULL, + trades_over_window FLOAT8 NOT NULL, + PRIMARY KEY (asset_start, asset_end, the_window) ); -CREATE TABLE IF NOT EXISTS _dex_ex_queue ( - id SERIAL PRIMARY KEY, - height BIGINT NOT NULL, - data BYTEA NOT NULL +CREATE TABLE IF NOT EXISTS dex_ex_aggregate_summary ( + the_window TEXT PRIMARY KEY, + direct_volume FLOAT8 NOT NULL, + swap_volume FLOAT8 NOT NULL, + liquidity FLOAT8 NOT NULL, + trades FLOAT8 NOT NULL, + active_pairs FLOAT8 NOT NULL, + largest_sv_trading_pair_start BYTEA NOT NULL, + largest_sv_trading_pair_end BYTEA NOT NULL, + largest_sv_trading_pair_volume FLOAT8 NOT NULL, + largest_dv_trading_pair_start BYTEA NOT NULL, + largest_dv_trading_pair_end BYTEA NOT NULL, + largest_dv_trading_pair_volume FLOAT8 NOT NULL, + top_price_mover_start BYTEA NOT NULL, + top_price_mover_end BYTEA NOT NULL, + top_price_mover_change_percent FLOAT8 NOT NULL ); diff --git a/crates/bin/pindexer/src/indexer_ext.rs b/crates/bin/pindexer/src/indexer_ext.rs index c8dd76a31e..9ff68994f3 100644 --- a/crates/bin/pindexer/src/indexer_ext.rs +++ b/crates/bin/pindexer/src/indexer_ext.rs @@ -12,7 +12,14 @@ impl IndexerExt for cometindex::Indexer { .with_index(Box::new(crate::stake::DelegationTxs {})) .with_index(Box::new(crate::stake::UndelegationTxs {})) .with_index(Box::new(crate::governance::GovernanceProposals {})) - .with_index(Box::new(crate::dex_ex::Component::new())) + .with_index(Box::new(crate::dex_ex::Component::new( + penumbra_asset::asset::Id::from_str( + // USDC + "passet1w6e7fvgxsy6ccy3m8q0eqcuyw6mh3yzqu3uq9h58nu8m8mku359spvulf6", + ) + .expect("should be able to parse passet"), + 
100.0 * 1000_0000.0, + ))) .with_index(Box::new(crate::supply::Component::new())) .with_index(Box::new(crate::ibc::Component::new())) .with_index(Box::new(crate::insights::Component::new(