Skip to content

Commit

Permalink
pindexer: dex-ex: implement robust summary calculation
Browse files Browse the repository at this point in the history
This works correctly even in the presence of no activity for a while on
a given pair.
  • Loading branch information
cronokirby committed Oct 10, 2024
1 parent fe46f14 commit a9042ef
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 35 deletions.
105 changes: 87 additions & 18 deletions crates/bin/pindexer/src/dex_ex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,26 +294,94 @@ mod summary {
}
}

pub async fn update(&mut self, time: DateTime, candle: Candle) -> anyhow::Result<()> {
let time_24h_ago = time
.checked_sub_days(Days::new(1))
.ok_or(anyhow!("should be able to get time 24h ago from {}", time))?;
sqlx::query("DELETE FROM _dex_ex_summary_backing WHERE time < $1")
.bind(time_24h_ago)
.execute(self.dbtx.as_mut())
.await?;
sqlx::query("INSERT INTO _dex_ex_summary_backing VALUES ($1, $2, $3, $4, $5, $6)")
.bind(self.asset_start.to_bytes().as_slice())
.bind(self.asset_end.to_bytes().as_slice())
.bind(time)
.bind(candle.close)
.bind(candle.direct_volume)
.bind(candle.swap_volume)
.execute(self.dbtx.as_mut())
.await?;
pub async fn add_candle(&mut self, time: DateTime, candle: Candle) -> anyhow::Result<()> {
let asset_start = self.asset_start.to_bytes();
let asset_end = self.asset_end.to_bytes();
sqlx::query(
r#"
INSERT INTO _dex_ex_summary_backing VALUES ($1, $2, $3, $4, $5, $6)
"#,
)
.bind(asset_start.as_slice())
.bind(asset_end.as_slice())
.bind(time)
.bind(candle.close)
.bind(candle.direct_volume)
.bind(candle.swap_volume)
.execute(self.dbtx.as_mut())
.await?;
Ok(())
}
}

pub async fn update_all(dbtx: &mut PgTransaction<'_>, time: DateTime) -> anyhow::Result<()> {
let time_24h_ago = time
.checked_sub_days(Days::new(1))
.ok_or(anyhow!("should be able to get time 24h ago from {}", time))?;
sqlx::query(
r#"
DELETE FROM _dex_ex_summary_backing WHERE time < $1
"#,
)
.bind(time_24h_ago)
.execute(dbtx.as_mut())
.await?;
// Update all of the summaries with relevant backing data.
//
// We choose this one as being responsible for creating the first summary.
sqlx::query(
r#"
INSERT INTO dex_ex_summary
SELECT DISTINCT ON (asset_start, asset_end)
asset_start,
asset_end,
FIRST_VALUE(price) OVER w AS price_24h_ago,
price AS current_price,
MAX(price) OVER w AS high_24h,
MIN(price) OVER w AS low_24h,
SUM(direct_volume) OVER w AS direct_volume_24h,
SUM(swap_volume) OVER w AS swap_volume_24h
FROM _dex_ex_summary_backing
WINDOW w AS (
PARTITION BY
asset_start, asset_end
ORDER BY asset_start, asset_end, time DESC
) ORDER by asset_start, asset_end, time ASC
ON CONFLICT (asset_start, asset_end) DO UPDATE SET
price_24h_ago = EXCLUDED.price_24h_ago,
current_price = EXCLUDED.current_price,
high_24h = EXCLUDED.high_24h,
low_24h = EXCLUDED.low_24h,
direct_volume_24h = EXCLUDED.direct_volume_24h,
swap_volume_24h = EXCLUDED.swap_volume_24h
"#,
)
.execute(dbtx.as_mut())
.await?;
// When we don't have backing data, we should nonetheless update to reflect this
sqlx::query(
r#"
UPDATE dex_ex_summary
SET
price_24h_ago = current_price,
high_24h = current_price,
low_24h = current_price,
direct_volume_24h = 0,
swap_volume_24h = 0
WHERE NOT EXISTS (
SELECT 1
FROM _dex_ex_summary_backing
WHERE
_dex_ex_summary_backing.asset_start = dex_ex_summary.asset_start
AND
_dex_ex_summary_backing.asset_end = dex_ex_summary.asset_end
)
"#,
)
.execute(dbtx.as_mut())
.await?;
Ok(())
}
}

use summary::Context as SummaryContext;
Expand Down Expand Up @@ -359,7 +427,7 @@ async fn on_event_candlestick_data(
ctx.update(event_time, candle).await?;
}
let mut ctx = SummaryContext::new(dbtx, asset_start, asset_end);
ctx.update(event_time, candle).await?;
ctx.add_candle(event_time, candle).await?;
Ok(())
}

Expand Down Expand Up @@ -434,6 +502,7 @@ impl AppView for Component {
for event in unqueue_event_candlestick_data(dbtx, height).await? {
on_event_candlestick_data(dbtx, time, event).await?;
}
summary::update_all(dbtx, time).await?;
}
tracing::debug!(?event, "unrecognized event");
Ok(())
Expand Down
35 changes: 18 additions & 17 deletions crates/bin/pindexer/src/dex_ex/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -47,23 +47,24 @@ CREATE TABLE IF NOT EXISTS _dex_ex_summary_backing (
PRIMARY KEY (asset_start, asset_end, time)
);

CREATE OR REPLACE VIEW dex_ex_summary AS
SELECT DISTINCT ON (asset_start, asset_end)
asset_start,
asset_end,
FIRST_VALUE(price) OVER w AS current_price,
price AS price_24h_ago,
MAX(price) OVER w AS high_24h,
MIN(price) OVER w AS low_24h,
SUM(direct_volume) OVER w AS direct_volume_24h,
SUM(swap_volume) OVER w AS swap_volume_24h
FROM _dex_ex_summary_backing
WINDOW w AS (
PARTITION BY
asset_start, asset_end
ORDER BY asset_start, asset_end, time DESC
)
ORDER BY asset_start, asset_end, time ASC;
CREATE TABLE IF NOT EXISTS dex_ex_summary (
-- The first asset of the directed pair.
asset_start BYTEA NOT NULL,
-- The second asset of the directed pair.
asset_end BYTEA NOT NULL,
-- The current price (in terms of asset2)
current_price FLOAT8 NOT NULL,
-- The price 24h ago.
price_24h_ago FLOAT8 NOT NULL,
-- The highest price over the past 24h.
high_24h FLOAT8 NOT NULL,
-- The lowest price over the past 24h.
low_24h FLOAT8 NOT NULL,
-- c.f. candlesticks for the difference between these two
direct_volume_24h FLOAT8 NOT NULL,
swap_volume_24h FLOAT8 NOT NULL,
PRIMARY KEY (asset_start, asset_end)
);

CREATE TABLE IF NOT EXISTS _dex_ex_queue (
id SERIAL PRIMARY KEY,
Expand Down

0 comments on commit a9042ef

Please sign in to comment.