Skip to content

Commit

Permalink
pindexer: dex_ex: implement summary with view
Browse files Browse the repository at this point in the history
We probably want one more tweak to be able to correctly handle no
activity.
  • Loading branch information
cronokirby committed Oct 10, 2024
1 parent 095cedd commit fe46f14
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 17 deletions.
56 changes: 54 additions & 2 deletions crates/bin/pindexer/src/dex_ex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,53 @@ mod price_chart {

use price_chart::Context as PriceChartContext;

mod summary {
use super::*;

#[derive(Debug)]
pub struct Context<'tx, 'db> {
dbtx: &'tx mut PgTransaction<'db>,
asset_start: asset::Id,
asset_end: asset::Id,
}

impl<'tx, 'db> Context<'tx, 'db> {
pub fn new(
dbtx: &'tx mut PgTransaction<'db>,
asset_start: asset::Id,
asset_end: asset::Id,
) -> Self {
Self {
dbtx,
asset_start,
asset_end,
}
}

pub async fn update(&mut self, time: DateTime, candle: Candle) -> anyhow::Result<()> {
let time_24h_ago = time
.checked_sub_days(Days::new(1))
.ok_or(anyhow!("should be able to get time 24h ago from {}", time))?;
sqlx::query("DELETE FROM _dex_ex_summary_backing WHERE time < $1")
.bind(time_24h_ago)
.execute(self.dbtx.as_mut())
.await?;
sqlx::query("INSERT INTO _dex_ex_summary_backing VALUES ($1, $2, $3, $4, $5, $6)")
.bind(self.asset_start.to_bytes().as_slice())
.bind(self.asset_end.to_bytes().as_slice())
.bind(time)
.bind(candle.close)
.bind(candle.direct_volume)
.bind(candle.swap_volume)
.execute(self.dbtx.as_mut())
.await?;
Ok(())
}
}
}

use summary::Context as SummaryContext;

async fn queue_event_candlestick_data(
dbtx: &mut PgTransaction<'_>,
height: u64,
Expand Down Expand Up @@ -304,10 +351,15 @@ async fn on_event_candlestick_data(
event_time: DateTime,
event: EventCandlestickData,
) -> anyhow::Result<()> {
let asset_start = event.pair.start;
let asset_end = event.pair.end;
let candle = event.stick.into();
for window in Window::all() {
let mut ctx = PriceChartContext::new(dbtx, event.pair.start, event.pair.end, window);
ctx.update(event_time, event.stick.into()).await?;
let mut ctx = PriceChartContext::new(dbtx, asset_start, asset_end, window);
ctx.update(event_time, candle).await?;
}
let mut ctx = SummaryContext::new(dbtx, asset_start, asset_end);
ctx.update(event_time, candle).await?;
Ok(())
}

Expand Down
42 changes: 27 additions & 15 deletions crates/bin/pindexer/src/dex_ex/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -34,25 +34,37 @@ CREATE TABLE IF NOT EXISTS dex_ex_price_charts (

CREATE UNIQUE INDEX ON dex_ex_price_charts (asset_start, asset_end, the_window, start_time);

CREATE TABLE IF NOT EXISTS dex_ex_summary (
-- The first asset of the directed pair.
CREATE TABLE IF NOT EXISTS _dex_ex_summary_backing (
asset_start BYTEA NOT NULL,
-- The second asset of the directed pair.
asset_end BYTEA NOT NULL,
-- The current price (in terms of asset2)
current_price FLOAT8 NOT NULL,
-- The price 24h ago.
price_24h_ago FLOAT8 NOT NULL,
-- The highest price over the past 24h.
high_24h FLOAT8 NOT NULL,
-- The lowest price over the past 24h.
low_24h FLOAT8 NOT NULL,
-- c.f. candlesticks for the difference between these two
direct_volume_24h FLOAT8 NOT NULL,
swap_volume_24h FLOAT8 NOT NULL,
PRIMARY KEY (asset_start, asset_end)
-- The time for this bit of information.
time TIMESTAMPTZ NOT NULL,
-- The price at this point.
price FLOAT8 NOT NULL,
-- The volume for this particular candle.
direct_volume FLOAT8 NOT NULL,
swap_volume FLOAT8 NOT NULL,
PRIMARY KEY (asset_start, asset_end, time)
);

CREATE OR REPLACE VIEW dex_ex_summary AS
SELECT DISTINCT ON (asset_start, asset_end)
asset_start,
asset_end,
FIRST_VALUE(price) OVER w AS current_price,
price AS price_24h_ago,
MAX(price) OVER w AS high_24h,
MIN(price) OVER w AS low_24h,
SUM(direct_volume) OVER w AS direct_volume_24h,
SUM(swap_volume) OVER w AS swap_volume_24h
FROM _dex_ex_summary_backing
WINDOW w AS (
PARTITION BY
asset_start, asset_end
ORDER BY asset_start, asset_end, time DESC
)
ORDER BY asset_start, asset_end, time ASC;

CREATE TABLE IF NOT EXISTS _dex_ex_queue (
id SERIAL PRIMARY KEY,
height BIGINT NOT NULL,
Expand Down

0 comments on commit fe46f14

Please sign in to comment.