diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs
index 9574c90326..5b9b736d0a 100644
--- a/crates/bin/pindexer/src/dex_ex/mod.rs
+++ b/crates/bin/pindexer/src/dex_ex/mod.rs
@@ -1,267 +1,326 @@
-use std::fmt::Display;
-
-use anyhow::{anyhow, Context};
-use chrono::{Datelike, Days, TimeZone, Timelike as _, Utc};
-use cometindex::{async_trait, index::EventBatch, AppView, ContextualizedEvent, PgTransaction};
+use anyhow::anyhow;
+use cometindex::{
+    async_trait,
+    index::{BlockEvents, EventBatch},
+    AppView, PgTransaction,
+};
 use penumbra_asset::asset;
-use penumbra_dex::{event::EventCandlestickData, CandlestickData};
-use penumbra_proto::{event::EventDomainType, DomainType};
+use penumbra_dex::{
+    event::{
+        EventCandlestickData, EventPositionExecution, EventPositionOpen, EventPositionWithdraw,
+    },
+    lp::Reserves,
+    DirectedTradingPair, TradingPair,
+};
+use penumbra_proto::event::EventDomainType;
 use penumbra_sct::event::EventBlockRoot;
+use std::collections::{HashMap, HashSet};

 type DateTime = sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>;

-/// Candlestick data, unmoored from the prison of a particular block height.
-///
-/// In other words, this can represent candlesticks which span arbitrary windows,
-/// and not just a single block.
-#[derive(Debug, Clone, Copy)]
-struct Candle {
-    open: f64,
-    close: f64,
-    low: f64,
-    high: f64,
-    direct_volume: f64,
-    swap_volume: f64,
-}
+mod candle {
+    use super::DateTime;
+    use chrono::{Datelike as _, Days, TimeDelta, TimeZone as _, Timelike as _, Utc};
+    use penumbra_dex::CandlestickData;
+    use std::fmt::Display;

-impl Candle {
-    fn from_candlestick_data(data: &CandlestickData) -> Self {
-        Self {
-            open: data.open,
-            close: data.close,
-            low: data.low,
-            high: data.high,
-            direct_volume: data.direct_volume,
-            swap_volume: data.swap_volume,
+    /// Candlestick data, unmoored from the prison of a particular block height.
+    ///
+    /// In other words, this can represent candlesticks which span arbitrary windows,
+    /// and not just a single block.
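+    ///
+    /// For example (illustrative numbers only), merging a later candle into an earlier
+    /// one keeps the earlier open, takes the later close, widens the low/high range,
+    /// and sums the volumes:
+    ///
+    /// ```ignore
+    /// let mut a = Candle { open: 1.00, close: 1.10, low: 0.90, high: 1.20, direct_volume: 5.0, swap_volume: 2.0 };
+    /// let b = Candle { open: 1.10, close: 1.05, low: 1.00, high: 1.30, direct_volume: 3.0, swap_volume: 1.0 };
+    /// a.merge(&b);
+    /// // a == Candle { open: 1.00, close: 1.05, low: 0.90, high: 1.30, direct_volume: 8.0, swap_volume: 3.0 }
+    /// ```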
+ #[derive(Debug, Clone, Copy)] + pub struct Candle { + pub open: f64, + pub close: f64, + pub low: f64, + pub high: f64, + pub direct_volume: f64, + pub swap_volume: f64, + } + + impl Candle { + pub fn from_candlestick_data(data: &CandlestickData) -> Self { + Self { + open: data.open, + close: data.close, + low: data.low, + high: data.high, + direct_volume: data.direct_volume, + swap_volume: data.swap_volume, + } + } + + pub fn merge(&mut self, that: &Self) { + self.close = that.close; + self.low = self.low.min(that.low); + self.high = self.high.max(that.high); + self.direct_volume += that.direct_volume; + self.swap_volume += that.swap_volume; } } - fn merge(&self, that: &Self) -> Self { - Self { - open: self.open, - close: that.close, - low: self.low.min(that.low), - high: self.high.max(that.high), - direct_volume: self.direct_volume + that.direct_volume, - swap_volume: self.swap_volume + that.swap_volume, + impl From for Candle { + fn from(value: CandlestickData) -> Self { + Self::from(&value) } } -} -impl From for Candle { - fn from(value: CandlestickData) -> Self { - Self::from(&value) + impl From<&CandlestickData> for Candle { + fn from(value: &CandlestickData) -> Self { + Self::from_candlestick_data(value) + } } -} -impl From<&CandlestickData> for Candle { - fn from(value: &CandlestickData) -> Self { - Self::from_candlestick_data(value) + #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] + pub enum Window { + W1m, + W15m, + W1h, + W4h, + W1d, + W1w, + W1mo, } -} -#[derive(Clone, Copy, Debug)] -enum Window { - W1m, - W15m, - W1h, - W4h, - W1d, - W1w, - W1mo, -} + impl Window { + pub fn all() -> impl Iterator { + [ + Window::W1m, + Window::W15m, + Window::W1h, + Window::W4h, + Window::W1d, + Window::W1w, + Window::W1mo, + ] + .into_iter() + } -impl Window { - fn all() -> impl Iterator { - [ - Window::W1m, - Window::W15m, - Window::W1h, - Window::W4h, - Window::W1d, - Window::W1w, - Window::W1mo, - ] - .into_iter() + /// Get the anchor for a given time. + /// + /// This is the latest time that "snaps" to a given anchor, dependent on the window. + /// + /// For example, the 1 minute window has an anchor every minute, the day window + /// every day, etc. + pub fn anchor(&self, time: DateTime) -> DateTime { + let (y, mo, d, h, m) = ( + time.year(), + time.month(), + time.day(), + time.hour(), + time.minute(), + ); + let out = match self { + Window::W1m => Utc.with_ymd_and_hms(y, mo, d, h, m, 0).single(), + Window::W15m => Utc.with_ymd_and_hms(y, mo, d, h, m - (m % 15), 0).single(), + Window::W1h => Utc.with_ymd_and_hms(y, mo, d, h, 0, 0).single(), + Window::W4h => Utc.with_ymd_and_hms(y, mo, d, h - (h % 4), 0, 0).single(), + Window::W1d => Utc.with_ymd_and_hms(y, mo, d, 0, 0, 0).single(), + Window::W1w => Utc + .with_ymd_and_hms(y, mo, d, 0, 0, 0) + .single() + .and_then(|x| { + x.checked_sub_days(Days::new(time.weekday().num_days_from_monday().into())) + }), + Window::W1mo => Utc.with_ymd_and_hms(y, mo, 1, 0, 0, 0).single(), + }; + out.unwrap() + } + + pub fn subtract_from(&self, time: DateTime) -> DateTime { + let delta = match self { + Window::W1m => TimeDelta::minutes(1), + Window::W15m => TimeDelta::minutes(15), + Window::W1h => TimeDelta::hours(1), + Window::W4h => TimeDelta::hours(4), + Window::W1d => TimeDelta::days(1), + Window::W1w => TimeDelta::weeks(1), + Window::W1mo => TimeDelta::days(30), + }; + time - delta + } } - /// Get the anchor for a given time. - /// - /// This is the latest time that "snaps" to a given anchor, dependent on the window. 
- /// - /// For example, the 1 minute window has an anchor every minute, the day window - /// every day, etc. - fn anchor(&self, time: DateTime) -> DateTime { - let (y, mo, d, h, m) = ( - time.year(), - time.month(), - time.day(), - time.hour(), - time.minute(), - ); - let out = match self { - Window::W1m => Utc.with_ymd_and_hms(y, mo, d, h, m, 0).single(), - Window::W15m => Utc.with_ymd_and_hms(y, mo, d, h, m - (m % 15), 0).single(), - Window::W1h => Utc.with_ymd_and_hms(y, mo, d, h, 0, 0).single(), - Window::W4h => Utc.with_ymd_and_hms(y, mo, d, h - (h % 4), 0, 0).single(), - Window::W1d => Utc.with_ymd_and_hms(y, mo, d, 0, 0, 0).single(), - Window::W1w => Utc - .with_ymd_and_hms(y, mo, d, 0, 0, 0) - .single() - .and_then(|x| { - x.checked_sub_days(Days::new(time.weekday().num_days_from_monday().into())) - }), - Window::W1mo => Utc.with_ymd_and_hms(y, mo, 1, 0, 0, 0).single(), - }; - out.unwrap() + impl Display for Window { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + use Window::*; + let str = match self { + W1m => "1m", + W15m => "15m", + W1h => "1h", + W4h => "4h", + W1d => "1d", + W1w => "1w", + W1mo => "1mo", + }; + write!(f, "{}", str) + } } -} -impl Display for Window { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - use Window::*; - let str = match self { - W1m => "1m", - W15m => "15m", - W1h => "1h", - W4h => "4h", - W1d => "1d", - W1w => "1w", - W1mo => "1mo", - }; - write!(f, "{}", str) + #[derive(Debug)] + pub struct WindowedCandle { + start: DateTime, + window: Window, + candle: Candle, + } + + impl WindowedCandle { + pub fn new(now: DateTime, window: Window, candle: Candle) -> Self { + Self { + start: window.anchor(now), + window, + candle, + } + } + + /// Update with a new candlestick, at a given time. + /// + /// This may return the old candlestick and its start time, if we should be starting + /// a new candle, based on the candle for that window having already been closed. + pub fn with_candle(&mut self, now: DateTime, candle: Candle) -> Option<(DateTime, Candle)> { + let start = self.window.anchor(now); + // This candle belongs to the next window! + if start > self.start { + let old_start = std::mem::replace(&mut self.start, start); + let old_candle = std::mem::replace(&mut self.candle, candle); + Some((old_start, old_candle)) + } else { + self.candle.merge(&candle); + None + } + } + + pub fn window(&self) -> Window { + self.window + } + + pub fn flush(self) -> (DateTime, Candle) { + (self.start, self.candle) + } } } +pub use candle::{Candle, Window, WindowedCandle}; mod price_chart { use super::*; /// A context when processing a price chart. 
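+    ///
+    /// Illustrative call pattern (a sketch; `asset_start`, `asset_end`, `time`, and
+    /// `candle` stand in for real values): the context is loaded once per pair and
+    /// window, updated in memory as candles arrive, and flushed back to
+    /// `dex_ex_price_charts` when a window rolls over and again on unload.
+    ///
+    /// ```ignore
+    /// let mut ctx = Context::load(dbtx, asset_start, asset_end, Window::W1m).await?;
+    /// ctx.update(dbtx, time, candle).await?; // writes the previous window if it just closed
+    /// ctx.unload(dbtx).await?;               // writes the still-open window
+    /// ```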
#[derive(Debug)] - pub struct Context<'tx, 'db> { - dbtx: &'tx mut PgTransaction<'db>, + pub struct Context { asset_start: asset::Id, asset_end: asset::Id, window: Window, + state: Option, } - impl<'tx, 'db> Context<'tx, 'db> { - pub fn new( - dbtx: &'tx mut PgTransaction<'db>, + impl Context { + pub async fn load( + dbtx: &mut PgTransaction<'_>, asset_start: asset::Id, asset_end: asset::Id, window: Window, - ) -> Self { - Self { - dbtx, + ) -> anyhow::Result { + let row: Option<(f64, f64, f64, f64, f64, f64, DateTime)> = sqlx::query_as( + " + SELECT open, close, high, low, direct_volume, swap_volume, start_time + FROM dex_ex_price_charts + WHERE asset_start = $1 + AND asset_end = $2 + AND the_window = $3 + ORDER BY start_time DESC + LIMIT 1 + ", + ) + .bind(asset_start.to_bytes()) + .bind(asset_end.to_bytes()) + .bind(window.to_string()) + .fetch_optional(dbtx.as_mut()) + .await?; + let state = row.map( + |(open, close, low, high, direct_volume, swap_volume, start)| { + let candle = Candle { + open, + close, + low, + high, + direct_volume, + swap_volume, + }; + WindowedCandle::new(start, window, candle) + }, + ); + Ok(Self { asset_start, asset_end, window, - } + state, + }) } - /// Get the candle we should update, based on the current timestamp. - async fn relevant_candle( - &mut self, - anchor: DateTime, - ) -> anyhow::Result> { - let stuff: Option<(i32, f64, f64, f64, f64, f64, f64)> = sqlx::query_as( - r#" - SELECT - dex_ex_candlesticks.id, - open, - close, - high, - low, - direct_volume, - swap_volume - FROM dex_ex_price_charts - JOIN dex_ex_candlesticks ON dex_ex_candlesticks.id = candlestick_id - WHERE asset_start = $1 - AND asset_end = $2 - AND the_window = $3 - AND start_time >= $4 - "#, - ) - .bind(self.asset_start.to_bytes().as_slice()) - .bind(self.asset_end.to_bytes().as_slice()) - .bind(self.window.to_string()) - .bind(anchor) - .fetch_optional(self.dbtx.as_mut()) - .await?; - Ok( - stuff.map(|(id, open, close, high, low, direct_volume, swap_volume)| { - ( - id, - Candle { - open, - close, - high, - low, - direct_volume, - swap_volume, - }, - ) - }), + async fn write_candle( + &self, + dbtx: &mut PgTransaction<'_>, + start: DateTime, + candle: Candle, + ) -> anyhow::Result<()> { + sqlx::query( + " + INSERT INTO dex_ex_price_charts( + id, asset_start, asset_end, the_window, + start_time, open, close, high, low, direct_volume, swap_volume + ) + VALUES( + DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10 ) - } - - async fn create_candle(&mut self, anchor: DateTime, candle: Candle) -> anyhow::Result<()> { - let id: i32 = sqlx::query_scalar( - r#" - INSERT INTO dex_ex_candlesticks VALUES (DEFAULT, $1, $2, $3, $4, $5, $6) RETURNING id - "#, + ON CONFLICT (asset_start, asset_end, the_window, start_time) DO UPDATE SET + open = EXCLUDED.open, + close = EXCLUDED.close, + high = EXCLUDED.high, + low = EXCLUDED.low, + direct_volume = EXCLUDED.direct_volume, + swap_volume = EXCLUDED.swap_volume + ", ) + .bind(self.asset_start.to_bytes()) + .bind(self.asset_end.to_bytes()) + .bind(self.window.to_string()) + .bind(start) .bind(candle.open) .bind(candle.close) .bind(candle.high) .bind(candle.low) .bind(candle.direct_volume) .bind(candle.swap_volume) - .fetch_one(self.dbtx.as_mut()) - .await?; - sqlx::query( - r#" - INSERT INTO dex_ex_price_charts VALUES (DEFAULT, $1, $2, $3, $4, $5) - "#, - ) - .bind(self.asset_start.to_bytes().as_slice()) - .bind(self.asset_end.to_bytes().as_slice()) - .bind(self.window.to_string()) - .bind(anchor) - .bind(id) - .execute(self.dbtx.as_mut()) + 
.execute(dbtx.as_mut()) .await?; Ok(()) } - async fn update_candle(&mut self, id: i32, candle: Candle) -> anyhow::Result<()> { - sqlx::query( - r#" - UPDATE dex_ex_candlesticks - SET (open, close, high, low, direct_volume, swap_volume) = - ($1, $2, $3, $4, $5, $6) - WHERE id = $7 - "#, - ) - .bind(candle.open) - .bind(candle.close) - .bind(candle.high) - .bind(candle.low) - .bind(candle.direct_volume) - .bind(candle.swap_volume) - .bind(id) - .execute(self.dbtx.as_mut()) - .await?; + pub async fn update( + &mut self, + dbtx: &mut PgTransaction<'_>, + now: DateTime, + candle: Candle, + ) -> anyhow::Result<()> { + let state = match self.state.as_mut() { + Some(x) => x, + None => { + self.state = Some(WindowedCandle::new(now, self.window, candle)); + self.state.as_mut().unwrap() + } + }; + if let Some((start, old_candle)) = state.with_candle(now, candle) { + self.write_candle(dbtx, start, old_candle).await?; + }; Ok(()) } - pub async fn update(&mut self, time: DateTime, candle: Candle) -> anyhow::Result<()> { - let anchor = self.window.anchor(time); - match self.relevant_candle(anchor).await? { - None => self.create_candle(anchor, candle).await?, - Some((id, old_candle)) => self.update_candle(id, old_candle.merge(&candle)).await?, - }; + pub async fn unload(mut self, dbtx: &mut PgTransaction<'_>) -> anyhow::Result<()> { + let state = std::mem::replace(&mut self.state, None); + if let Some(state) = state { + let (start, candle) = state.flush(); + self.write_candle(dbtx, start, candle).await?; + } Ok(()) } } @@ -270,215 +329,424 @@ mod price_chart { use price_chart::Context as PriceChartContext; mod summary { - use super::*; + use cometindex::PgTransaction; + use penumbra_asset::asset; - #[derive(Debug)] - pub struct Context<'tx, 'db> { - dbtx: &'tx mut PgTransaction<'db>, - asset_start: asset::Id, - asset_end: asset::Id, + use super::{Candle, DateTime, PairMetrics, Window}; + + pub struct Context { + start: asset::Id, + end: asset::Id, + price: f64, + liquidity: f64, } - impl<'tx, 'db> Context<'tx, 'db> { - pub fn new( - dbtx: &'tx mut PgTransaction<'db>, - asset_start: asset::Id, - asset_end: asset::Id, - ) -> Self { - Self { - dbtx, - asset_start, - asset_end, - } + impl Context { + pub async fn load( + dbtx: &mut PgTransaction<'_>, + start: asset::Id, + end: asset::Id, + ) -> anyhow::Result { + let row: Option<(f64, f64)> = sqlx::query_as( + " + SELECT price, liquidity + FROM dex_ex_pairs_block_snapshot + WHERE asset_start = $1 + AND asset_end = $2 + ORDER BY id DESC + LIMIT 1 + ", + ) + .bind(start.to_bytes()) + .bind(end.to_bytes()) + .fetch_optional(dbtx.as_mut()) + .await?; + let (price, liquidity) = row.unwrap_or_default(); + Ok(Self { + start, + end, + price, + liquidity, + }) } - pub async fn add_candle(&mut self, time: DateTime, candle: Candle) -> anyhow::Result<()> { - let asset_start = self.asset_start.to_bytes(); - let asset_end = self.asset_end.to_bytes(); + pub async fn update( + &mut self, + dbtx: &mut PgTransaction<'_>, + now: DateTime, + candle: Option, + metrics: PairMetrics, + ) -> anyhow::Result<()> { + if let Some(candle) = candle { + self.price = candle.close; + } + self.liquidity += metrics.liquidity_change; + sqlx::query( - r#" - INSERT INTO _dex_ex_summary_backing VALUES ($1, $2, $3, $4, $5, $6) - "#, + " + INSERT INTO dex_ex_pairs_block_snapshot VALUES ( + DEFAULT, $1, $2, $3, $4, $5, $6, $7, $8 + ) + ", ) - .bind(asset_start.as_slice()) - .bind(asset_end.as_slice()) - .bind(time) - .bind(candle.close) - .bind(candle.direct_volume) - .bind(candle.swap_volume) - 
.execute(self.dbtx.as_mut()) + .bind(now) + .bind(self.start.to_bytes()) + .bind(self.end.to_bytes()) + .bind(self.price) + .bind(self.liquidity) + .bind(candle.map(|x| x.direct_volume).unwrap_or_default()) + .bind(candle.map(|x| x.swap_volume).unwrap_or_default()) + .bind(metrics.trades) + .execute(dbtx.as_mut()) .await?; Ok(()) } } - pub async fn update_all(dbtx: &mut PgTransaction<'_>, time: DateTime) -> anyhow::Result<()> { - let time_24h_ago = time - .checked_sub_days(Days::new(1)) - .ok_or(anyhow!("should be able to get time 24h ago from {}", time))?; + pub async fn update_summary( + dbtx: &mut PgTransaction<'_>, + now: DateTime, + start: asset::Id, + end: asset::Id, + window: Window, + ) -> anyhow::Result<()> { + let then = window.subtract_from(now); sqlx::query( - r#" - DELETE FROM _dex_ex_summary_backing WHERE time < $1 - "#, + " + WITH + snapshots AS ( + SELECT * + FROM dex_ex_pairs_block_snapshot + WHERE asset_start = $1 + AND asset_end = $2 + ), + previous AS ( + SELECT price AS price_then, liquidity AS liquidity_then + FROM snapshots + WHERE time <= $4 + ORDER BY time DESC + LIMIT 1 + ), + previous_or_default AS ( + SELECT + COALESCE((SELECT price_then FROM previous), 0.0) AS price_then, + COALESCE((SELECT liquidity_then FROM previous), 0.0) AS liquidity_then + ), + now AS ( + SELECT price, liquidity + FROM snapshots + WHERE time >= $3 + ORDER BY time ASC + LIMIT 1 + ), + sums AS ( + SELECT + COALESCE(SUM(direct_volume), 0.0) AS direct_volume_over_window, + COALESCE(SUM(swap_volume), 0.0) AS swap_volume_over_window, + COALESCE(SUM(trades), 0.0) AS trades_over_window + FROM snapshots + WHERE time <= $3 + AND time >= $4 ) - .bind(time_24h_ago) - .execute(dbtx.as_mut()) - .await?; - // Update all of the summaries with relevant backing data. - // - // We choose this one as being responsible for creating the first summary. 
- sqlx::query( - r#" - INSERT INTO dex_ex_summary - SELECT DISTINCT ON (asset_start, asset_end) - asset_start, - asset_end, - FIRST_VALUE(price) OVER w AS price_24h_ago, - price AS current_price, - MAX(price) OVER w AS high_24h, - MIN(price) OVER w AS low_24h, - SUM(direct_volume) OVER w AS direct_volume_24h, - SUM(swap_volume) OVER w AS swap_volume_24h - FROM _dex_ex_summary_backing - WINDOW w AS ( - PARTITION BY - asset_start, asset_end - ORDER BY asset_start, asset_end, time DESC - ) ORDER by asset_start, asset_end, time ASC - ON CONFLICT (asset_start, asset_end) DO UPDATE SET - price_24h_ago = EXCLUDED.price_24h_ago, - current_price = EXCLUDED.current_price, - high_24h = EXCLUDED.high_24h, - low_24h = EXCLUDED.low_24h, - direct_volume_24h = EXCLUDED.direct_volume_24h, - swap_volume_24h = EXCLUDED.swap_volume_24h - "#, + INSERT INTO dex_ex_pairs_summary + SELECT + $1, $2, $5, + price, price_then, + liquidity, liquidity_then, + direct_volume_over_window, + swap_volume_over_window, + trades_over_window + FROM previous_or_default JOIN now ON TRUE JOIN sums ON TRUE + ON CONFLICT (asset_start, asset_end, the_window) + DO UPDATE SET + price = EXCLUDED.price, + price_then = EXCLUDED.price_then, + liquidity = EXCLUDED.liquidity, + liquidity_then = EXCLUDED.liquidity_then, + direct_volume_over_window = EXCLUDED.direct_volume_over_window, + swap_volume_over_window = EXCLUDED.swap_volume_over_window, + trades_over_window = EXCLUDED.trades_over_window + ", ) + .bind(start.to_bytes()) + .bind(end.to_bytes()) + .bind(now) + .bind(then) + .bind(window.to_string()) .execute(dbtx.as_mut()) .await?; - // When we don't have backing data, we should nonetheless update to reflect this + Ok(()) + } + + pub async fn update_aggregate_summary( + dbtx: &mut PgTransaction<'_>, + window: Window, + denom: asset::Id, + min_liquidity: f64, + ) -> anyhow::Result<()> { + // TODO: do something here sqlx::query( - r#" - UPDATE dex_ex_summary - SET - price_24h_ago = current_price, - high_24h = current_price, - low_24h = current_price, - direct_volume_24h = 0, - swap_volume_24h = 0 - WHERE NOT EXISTS ( - SELECT 1 - FROM _dex_ex_summary_backing - WHERE - _dex_ex_summary_backing.asset_start = dex_ex_summary.asset_start - AND - _dex_ex_summary_backing.asset_end = dex_ex_summary.asset_end + " + WITH + eligible_denoms AS ( + SELECT asset_start as asset, price + FROM dex_ex_pairs_summary + WHERE asset_end = $1 AND liquidity >= $2 + UNION VALUES ($1, 1.0) + ), + converted_pairs_summary AS ( + SELECT + asset_start, asset_end, + (dex_ex_pairs_summary.price - greatest(price_then, 0.000001)) / greatest(price_then, 0.000001) * 100 AS price_change, + liquidity * eligible_denoms.price AS liquidity, + direct_volume_over_window * eligible_denoms.price as dv, + swap_volume_over_window * eligible_denoms.price as sv, + trades_over_window as trades + FROM dex_ex_pairs_summary + JOIN eligible_denoms + ON eligible_denoms.asset = asset_end + WHERE the_window = $3 + ), + sums AS ( + SELECT + SUM(dv) AS direct_volume, + SUM(sv) AS swap_volume, + SUM(liquidity) AS liquidity, + SUM(trades) AS trades, + (SELECT COUNT(*) FROM converted_pairs_summary WHERE dv > 0 OR sv > 0) AS active_pairs + FROM converted_pairs_summary + ), + largest_sv AS ( + SELECT + asset_start AS largest_sv_trading_pair_start, + asset_end AS largest_sv_trading_pair_end, + sv AS largest_sv_trading_pair_volume + FROM converted_pairs_summary + ORDER BY sv DESC + LIMIT 1 + ), + largest_dv AS ( + SELECT + asset_start AS largest_dv_trading_pair_start, + asset_end AS 
largest_dv_trading_pair_end, + dv AS largest_dv_trading_pair_volume + FROM converted_pairs_summary + ORDER BY dv DESC + LIMIT 1 + ), + top_price_mover AS ( + SELECT + asset_start AS top_price_mover_start, + asset_end AS top_price_mover_end, + price_change AS top_price_mover_change_percent + FROM converted_pairs_summary + ORDER BY price_change DESC + LIMIT 1 ) - "#, - ) + INSERT INTO dex_ex_aggregate_summary + SELECT + $3, + direct_volume, swap_volume, liquidity, trades, active_pairs, + largest_sv_trading_pair_start, + largest_sv_trading_pair_end, + largest_sv_trading_pair_volume, + largest_dv_trading_pair_start, + largest_dv_trading_pair_end, + largest_dv_trading_pair_volume, + top_price_mover_start, + top_price_mover_end, + top_price_mover_change_percent + FROM + sums + JOIN largest_sv ON TRUE + JOIN largest_dv ON TRUE + JOIN top_price_mover ON TRUE + ON CONFLICT (the_window) DO UPDATE SET + direct_volume = EXCLUDED.direct_volume, + swap_volume = EXCLUDED.swap_volume, + liquidity = EXCLUDED.liquidity, + trades = EXCLUDED.trades, + active_pairs = EXCLUDED.active_pairs, + largest_sv_trading_pair_start = EXCLUDED.largest_sv_trading_pair_start, + largest_sv_trading_pair_end = EXCLUDED.largest_sv_trading_pair_end, + largest_sv_trading_pair_volume = EXCLUDED.largest_sv_trading_pair_volume, + largest_dv_trading_pair_start = EXCLUDED.largest_dv_trading_pair_start, + largest_dv_trading_pair_end = EXCLUDED.largest_dv_trading_pair_end, + largest_dv_trading_pair_volume = EXCLUDED.largest_dv_trading_pair_volume, + top_price_mover_start = EXCLUDED.top_price_mover_start, + top_price_mover_end = EXCLUDED.top_price_mover_end, + top_price_mover_change_percent = EXCLUDED.top_price_mover_change_percent + ") + .bind(denom.to_bytes()) + .bind(min_liquidity) + .bind(window.to_string()) .execute(dbtx.as_mut()) .await?; Ok(()) } } -use summary::Context as SummaryContext; - -async fn queue_event_candlestick_data( - dbtx: &mut PgTransaction<'_>, - height: u64, - event: EventCandlestickData, -) -> anyhow::Result<()> { - sqlx::query("INSERT INTO _dex_ex_queue VALUES (DEFAULT, $1, $2)") - .bind(i64::try_from(height)?) - .bind(event.encode_to_vec().as_slice()) - .execute(dbtx.as_mut()) - .await?; - Ok(()) +#[derive(Debug, Default, Clone, Copy)] +struct PairMetrics { + trades: f64, + liquidity_change: f64, } -async fn unqueue_event_candlestick_data( - dbtx: &mut PgTransaction<'_>, - height: u64, -) -> anyhow::Result> { - let values: Vec> = - sqlx::query_scalar("DELETE FROM _dex_ex_queue WHERE height = $1 RETURNING data") - .bind(i64::try_from(height)?) 
-        .fetch_all(dbtx.as_mut())
-        .await?;
-    values
-        .into_iter()
-        .map(|x| EventCandlestickData::decode(x.as_slice()))
-        .collect()
+#[derive(Debug)]
+struct Events {
+    time: Option<DateTime>,
+    candles: HashMap<DirectedTradingPair, Candle>,
+    metrics: HashMap<DirectedTradingPair, PairMetrics>,
 }

-async fn on_event_candlestick_data(
-    dbtx: &mut PgTransaction<'_>,
-    event_time: DateTime,
-    event: EventCandlestickData,
-) -> anyhow::Result<()> {
-    let asset_start = event.pair.start;
-    let asset_end = event.pair.end;
-    let candle = event.stick.into();
-    for window in Window::all() {
-        let mut ctx = PriceChartContext::new(dbtx, asset_start, asset_end, window);
-        ctx.update(event_time, candle).await?;
+impl Events {
+    fn new() -> Self {
+        Self {
+            time: None,
+            candles: HashMap::new(),
+            metrics: HashMap::new(),
+        }
     }
-    let mut ctx = SummaryContext::new(dbtx, asset_start, asset_end);
-    ctx.add_candle(event_time, candle).await?;
-    Ok(())
-}
-
-async fn fetch_height_time(
-    dbtx: &mut PgTransaction<'_>,
-    height: u64,
-) -> anyhow::Result<Option<DateTime>> {
-    const CTX: &'static str = r#"
-The `dex_ex` component relies on the `block` component to be running, to provide the `block_details` with timestamps.
-Make sure that is running as well.
-"#;
-    sqlx::query_scalar("SELECT timestamp FROM block_details WHERE height = $1")
-        .bind(i64::try_from(height)?)
-        .fetch_optional(dbtx.as_mut())
-        .await
-        .context(CTX)
-}
+    fn with_time(&mut self, time: DateTime) {
+        self.time = Some(time)
+    }

-#[derive(Debug)]
-pub struct Component {}
+    fn with_candle(&mut self, pair: DirectedTradingPair, candle: Candle) {
+        match self.candles.get_mut(&pair) {
+            None => {
+                self.candles.insert(pair, candle);
+            }
+            Some(current) => {
+                current.merge(&candle);
+            }
+        }
+    }

-impl Component {
-    pub fn new() -> Self {
-        Self {}
+    fn metric(&mut self, pair: &DirectedTradingPair) -> &mut PairMetrics {
+        if !self.metrics.contains_key(pair) {
+            self.metrics.insert(*pair, PairMetrics::default());
+        }
+        // NOPANIC: inserted above.
+        self.metrics.get_mut(pair).unwrap()
     }

-    async fn index_event(
-        &self,
-        dbtx: &mut PgTransaction<'_>,
-        event: &ContextualizedEvent,
-    ) -> Result<(), anyhow::Error> {
-        if let Ok(e) = EventCandlestickData::try_from_event(&event.event) {
-            let height = event.block_height;
-            match fetch_height_time(dbtx, height).await? {
-                None => {
-                    queue_event_candlestick_data(dbtx, height, e).await?;
-                }
-                Some(time) => {
-                    on_event_candlestick_data(dbtx, time, e).await?;
+    fn with_trade(&mut self, pair: &DirectedTradingPair) {
+        self.metric(pair).trades += 1.0;
+    }
+
+    fn with_reserve_change(
+        &mut self,
+        pair: &TradingPair,
+        old_reserves: Option<Reserves>,
+        new_reserves: Reserves,
+        removed: bool,
+    ) {
+        let (diff_1, diff_2) = match (removed, old_reserves, new_reserves) {
+            (true, None, new) => (-(new.r1.value() as f64), -(new.r2.value() as f64)),
+            (_, None, new) => ((new.r1.value() as f64), (new.r2.value() as f64)),
+            (_, Some(old), new) => (
+                (new.r1.value() as f64) - (old.r1.value() as f64),
+                (new.r2.value() as f64) - (old.r2.value() as f64),
+            ),
+        };
+        for (d_pair, diff) in [
+            (
+                DirectedTradingPair {
+                    start: pair.asset_1(),
+                    end: pair.asset_2(),
+                },
+                diff_2,
+            ),
+            (
+                DirectedTradingPair {
+                    start: pair.asset_2(),
+                    end: pair.asset_1(),
+                },
+                diff_1,
+            ),
+        ] {
+            self.metric(&d_pair).liquidity_change += diff;
+        }
+    }
+
+    pub fn extract(block: &BlockEvents) -> anyhow::Result<Self> {
+        let mut out = Self::new();
+        for event in &block.events {
+            if let Ok(e) = EventCandlestickData::try_from_event(&event.event) {
+                let candle = Candle::from_candlestick_data(&e.stick);
+                out.with_candle(e.pair, candle);
+            } else if let Ok(e) = EventBlockRoot::try_from_event(&event.event) {
+                let time = DateTime::from_timestamp(e.timestamp_seconds, 0).ok_or(anyhow!(
+                    "creating timestamp should succeed; timestamp: {}",
+                    e.timestamp_seconds
+                ))?;
+                out.with_time(time);
+            } else if let Ok(e) = EventPositionOpen::try_from_event(&event.event) {
+                out.with_reserve_change(
+                    &e.trading_pair,
+                    None,
+                    Reserves {
+                        r1: e.reserves_1,
+                        r2: e.reserves_2,
+                    },
+                    false,
+                );
+            } else if let Ok(e) = EventPositionWithdraw::try_from_event(&event.event) {
+                // TODO: use closed positions to track liquidity more precisely; in practice I (ck) expect few
+                // positions to close without being withdrawn.
+                out.with_reserve_change(
+                    &e.trading_pair,
+                    None,
+                    Reserves {
+                        r1: e.reserves_1,
+                        r2: e.reserves_2,
+                    },
+                    true,
+                );
+            } else if let Ok(e) = EventPositionExecution::try_from_event(&event.event) {
+                out.with_reserve_change(
+                    &e.trading_pair,
+                    Some(Reserves {
+                        r1: e.prev_reserves_1,
+                        r2: e.prev_reserves_2,
+                    }),
+                    Reserves {
+                        r1: e.reserves_1,
+                        r2: e.reserves_2,
+                    },
+                    false,
+                );
+                if e.reserves_1 > e.prev_reserves_1 {
+                    // Whichever asset we ended up with more of was the asset traded in.
+                    out.with_trade(&DirectedTradingPair {
+                        start: e.trading_pair.asset_1(),
+                        end: e.trading_pair.asset_2(),
+                    });
+                } else if e.reserves_2 > e.prev_reserves_2 {
+                    out.with_trade(&DirectedTradingPair {
+                        start: e.trading_pair.asset_2(),
+                        end: e.trading_pair.asset_1(),
+                    });
+                }
             }
         }
-        } else if let Ok(e) = EventBlockRoot::try_from_event(&event.event) {
-            let height = e.height;
-            let time = DateTime::from_timestamp(e.timestamp_seconds, 0).ok_or(anyhow!(
-                "creating timestamp should succeed; timestamp: {}",
-                e.timestamp_seconds
-            ))?;
-            for event in unqueue_event_candlestick_data(dbtx, height).await?
{ - on_event_candlestick_data(dbtx, time, event).await?; - } - summary::update_all(dbtx, time).await?; } - Ok(()) + Ok(out) + } +} + +#[derive(Debug)] +pub struct Component { + denom: asset::Id, + min_liquidity: f64, +} + +impl Component { + pub fn new(denom: asset::Id, min_liquidity: f64) -> Self { + Self { + denom, + min_liquidity, + } } } @@ -504,8 +772,67 @@ impl AppView for Component { dbtx: &mut PgTransaction, batch: EventBatch, ) -> Result<(), anyhow::Error> { - for event in batch.events() { - self.index_event(dbtx, event).await?; + let mut charts = HashMap::new(); + let mut snapshots = HashMap::new(); + let mut last_time = None; + for block in batch.by_height.iter() { + let events = Events::extract(&block)?; + let time = events + .time + .expect(&format!("no block root event at height {}", block.height)); + last_time = Some(time); + + for (pair, candle) in &events.candles { + for window in Window::all() { + let key = (pair.start, pair.end, window); + if !charts.contains_key(&key) { + let ctx = PriceChartContext::load(dbtx, key.0, key.1, key.2).await?; + charts.insert(key, ctx); + } + charts + .get_mut(&key) + .unwrap() // safe because we just inserted above + .update(dbtx, time, *candle) + .await?; + } + } + + let block_pairs = events + .candles + .keys() + .chain(events.metrics.keys()) + .copied() + .collect::>(); + for pair in block_pairs { + if !snapshots.contains_key(&pair) { + let ctx = summary::Context::load(dbtx, pair.start, pair.end).await?; + snapshots.insert(pair, ctx); + } + // NOPANIC: inserted above + snapshots + .get_mut(&pair) + .unwrap() + .update( + dbtx, + time, + events.candles.get(&pair).copied(), + events.metrics.get(&pair).copied().unwrap_or_default(), + ) + .await?; + } + } + + if let Some(now) = last_time { + for window in Window::all() { + for pair in snapshots.keys() { + summary::update_summary(dbtx, now, pair.start, pair.end, window).await?; + } + summary::update_aggregate_summary(dbtx, window, self.denom, self.min_liquidity) + .await?; + } + } + for chart in charts.into_values() { + chart.unload(dbtx).await?; } Ok(()) } diff --git a/crates/bin/pindexer/src/dex_ex/schema.sql b/crates/bin/pindexer/src/dex_ex/schema.sql index 53f85ceb8a..1e89821f3e 100644 --- a/crates/bin/pindexer/src/dex_ex/schema.sql +++ b/crates/bin/pindexer/src/dex_ex/schema.sql @@ -1,19 +1,3 @@ -CREATE TABLE IF NOT EXISTS dex_ex_candlesticks ( - id SERIAL PRIMARY KEY, - -- The price at the start of a window. - open FLOAT8 NOT NULL, - -- The price at the close of a window. - close FLOAT8 NOT NULL, - -- The highest price reached during a window. - high FLOAT8 NOT NULL, - -- The lowest price reached during a window. - low FLOAT8 NOT NULL, - -- The volume traded directly through position executions. - direct_volume FLOAT8 NOT NULL, - -- The volume that traded indirectly, possibly through several positions. - swap_volume FLOAT8 NOT NULL -); - -- Contains, for each directed asset pair and window type, candle sticks for each window. CREATE TABLE IF NOT EXISTS dex_ex_price_charts ( -- We just want a simple primary key to have here. @@ -28,46 +12,65 @@ CREATE TABLE IF NOT EXISTS dex_ex_price_charts ( the_window TEXT NOT NULL, -- The start time of this window. start_time TIMESTAMPTZ NOT NULL, - -- The start time for the window this stick is about. - candlestick_id INTEGER UNIQUE REFERENCES dex_ex_candlesticks (id) + -- The price at the start of a window. + open FLOAT8 NOT NULL, + -- The price at the close of a window. + close FLOAT8 NOT NULL, + -- The highest price reached during a window. 
+ high FLOAT8 NOT NULL, + -- The lowest price reached during a window. + low FLOAT8 NOT NULL, + -- The volume traded directly through position executions. + direct_volume FLOAT8 NOT NULL, + -- The volume that traded indirectly, possibly through several positions. + swap_volume FLOAT8 NOT NULL ); CREATE UNIQUE INDEX ON dex_ex_price_charts (asset_start, asset_end, the_window, start_time); -CREATE TABLE IF NOT EXISTS _dex_ex_summary_backing ( +CREATE TABLE IF NOT EXISTS dex_ex_pairs_block_snapshot ( + id SERIAL PRIMARY KEY, + time TIMESTAMPTZ NOT NULL, asset_start BYTEA NOT NULL, asset_end BYTEA NOT NULL, - -- The time for this bit of information. - time TIMESTAMPTZ NOT NULL, - -- The price at this point. price FLOAT8 NOT NULL, - -- The volume for this particular candle. + liquidity FLOAT8 NOT NULL, direct_volume FLOAT8 NOT NULL, swap_volume FLOAT8 NOT NULL, - PRIMARY KEY (asset_start, asset_end, time) + trades FLOAT8 NOT NULL ); -CREATE TABLE IF NOT EXISTS dex_ex_summary ( - -- The first asset of the directed pair. +CREATE UNIQUE INDEX ON dex_ex_pairs_block_snapshot (time, asset_start, asset_end); +CREATE INDEX ON dex_ex_pairs_block_snapshot (asset_start, asset_end); + +CREATE TABLE IF NOT EXISTS dex_ex_pairs_summary ( asset_start BYTEA NOT NULL, - -- The second asset of the directed pair. asset_end BYTEA NOT NULL, - -- The current price (in terms of asset2) - current_price FLOAT8 NOT NULL, - -- The price 24h ago. - price_24h_ago FLOAT8 NOT NULL, - -- The highest price over the past 24h. - high_24h FLOAT8 NOT NULL, - -- The lowest price over the past 24h. - low_24h FLOAT8 NOT NULL, - -- c.f. candlesticks for the difference between these two - direct_volume_24h FLOAT8 NOT NULL, - swap_volume_24h FLOAT8 NOT NULL, - PRIMARY KEY (asset_start, asset_end) + the_window TEXT NOT NULL, + price FLOAT8 NOT NULL, + price_then FLOAT8 NOT NULL, + liquidity FLOAT8 NOT NULL, + liquidity_then FLOAT8 NOT NULL, + direct_volume_over_window FLOAT8 NOT NULL, + swap_volume_over_window FLOAT8 NOT NULL, + trades_over_window FLOAT8 NOT NULL, + PRIMARY KEY (asset_start, asset_end, the_window) ); -CREATE TABLE IF NOT EXISTS _dex_ex_queue ( - id SERIAL PRIMARY KEY, - height BIGINT NOT NULL, - data BYTEA NOT NULL +CREATE TABLE IF NOT EXISTS dex_ex_aggregate_summary ( + the_window TEXT PRIMARY KEY, + direct_volume FLOAT8 NOT NULL, + swap_volume FLOAT8 NOT NULL, + liquidity FLOAT8 NOT NULL, + trades FLOAT8 NOT NULL, + active_pairs FLOAT8 NOT NULL, + largest_sv_trading_pair_start BYTEA NOT NULL, + largest_sv_trading_pair_end BYTEA NOT NULL, + largest_sv_trading_pair_volume FLOAT8 NOT NULL, + largest_dv_trading_pair_start BYTEA NOT NULL, + largest_dv_trading_pair_end BYTEA NOT NULL, + largest_dv_trading_pair_volume FLOAT8 NOT NULL, + top_price_mover_start BYTEA NOT NULL, + top_price_mover_end BYTEA NOT NULL, + top_price_mover_change_percent FLOAT8 NOT NULL ); diff --git a/crates/bin/pindexer/src/indexer_ext.rs b/crates/bin/pindexer/src/indexer_ext.rs index c8dd76a31e..9ff68994f3 100644 --- a/crates/bin/pindexer/src/indexer_ext.rs +++ b/crates/bin/pindexer/src/indexer_ext.rs @@ -12,7 +12,14 @@ impl IndexerExt for cometindex::Indexer { .with_index(Box::new(crate::stake::DelegationTxs {})) .with_index(Box::new(crate::stake::UndelegationTxs {})) .with_index(Box::new(crate::governance::GovernanceProposals {})) - .with_index(Box::new(crate::dex_ex::Component::new())) + .with_index(Box::new(crate::dex_ex::Component::new( + penumbra_asset::asset::Id::from_str( + // USDC + 
"passet1w6e7fvgxsy6ccy3m8q0eqcuyw6mh3yzqu3uq9h58nu8m8mku359spvulf6", + ) + .expect("should be able to parse passet"), + 100.0 * 1000_0000.0, + ))) .with_index(Box::new(crate::supply::Component::new())) .with_index(Box::new(crate::ibc::Component::new())) .with_index(Box::new(crate::insights::Component::new(