From 783db09c78be4617b2ff668ae35f45cf319caf9c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?L=C3=BAc=C3=A1s=20Meier?=
Date: Fri, 11 Oct 2024 12:58:39 -0700
Subject: [PATCH] feat(pindexer): add dex candlestick support to schema (#4894)

## Describe your changes

Closes #4869.

This implementation isn't as fast as it could be, but it's more than fast
enough to keep up in real time, which is all that really matters. (Maintaining
the summary is annoying and somewhat tricky, especially if you want to
correctly handle a "frozen price" for pairs where no activity is happening.)

## Checklist before requesting a review

- [x] If this code contains consensus-breaking changes, I have added the
  "consensus-breaking" label. Otherwise, I declare my belief that there are not
  consensus-breaking changes, for the following reason:

  > indexing only

---
 Cargo.lock                                    |   3 +
 crates/bin/pindexer/Cargo.toml                |   3 +
 crates/bin/pindexer/src/dex_ex/mod.rs         | 510 ++++++++++++++++++
 crates/bin/pindexer/src/dex_ex/schema.sql     |  73 +++
 crates/bin/pindexer/src/indexer_ext.rs        |   2 +-
 crates/bin/pindexer/src/lib.rs                |   1 +
 .../core/component/sct/src/component/tree.rs  |  11 +-
 crates/core/component/sct/src/event.rs        |  53 +-
 8 files changed, 644 insertions(+), 12 deletions(-)
 create mode 100644 crates/bin/pindexer/src/dex_ex/mod.rs
 create mode 100644 crates/bin/pindexer/src/dex_ex/schema.sql

diff --git a/Cargo.lock b/Cargo.lock
index 367c0504aa..4681e50097 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5815,6 +5815,7 @@ name = "pindexer"
 version = "0.80.6"
 dependencies = [
  "anyhow",
+ "chrono",
  "clap",
@@ -5827,8 +5828,10 @@ dependencies = [
  "penumbra-keys",
  "penumbra-num",
  "penumbra-proto",
+ "penumbra-sct",
  "penumbra-shielded-pool",
  "penumbra-stake",
+ "prost",
  "serde_json",
  "sqlx",
  "tokio",
diff --git a/crates/bin/pindexer/Cargo.toml b/crates/bin/pindexer/Cargo.toml
index 88e062b8ca..610d43e780 100644
--- a/crates/bin/pindexer/Cargo.toml
+++ b/crates/bin/pindexer/Cargo.toml
@@ -13,6 +13,7 @@ publish = false
 [dependencies]
 anyhow = {workspace = true}
 clap = {workspace = true}
+chrono = {workspace = true}
 cometindex = {workspace = true}
 num-bigint = { version = "0.4" }
 penumbra-shielded-pool = {workspace = true, default-features = false}
@@ -26,6 +27,8 @@ penumbra-governance = {workspace = true, default-features = false}
 penumbra-num = {workspace = true, default-features = false}
 penumbra-asset = {workspace = true, default-features = false}
 penumbra-proto = {workspace = true, default-features = false}
+penumbra-sct = {workspace = true, default-features = false}
+prost = {workspace = true}
 tracing = {workspace = true}
 tokio = {workspace = true, features = ["full"]}
 serde_json = {workspace = true}
diff --git a/crates/bin/pindexer/src/dex_ex/mod.rs b/crates/bin/pindexer/src/dex_ex/mod.rs
new file mode 100644
index 0000000000..ec3882d0c9
--- /dev/null
+++ b/crates/bin/pindexer/src/dex_ex/mod.rs
@@ -0,0 +1,510 @@
+use std::fmt::Display;
+
+use anyhow::{anyhow, Context};
+use chrono::{Datelike, Days, TimeZone, Timelike as _, Utc};
+use cometindex::{async_trait, AppView, ContextualizedEvent, PgTransaction};
+use penumbra_asset::asset;
+use penumbra_dex::{event::EventCandlestickData, CandlestickData};
+use penumbra_proto::{event::EventDomainType, DomainType};
+use penumbra_sct::event::EventBlockRoot;
+use prost::Name as _;
+use sqlx::PgPool;
+
+type DateTime = sqlx::types::chrono::DateTime<Utc>;
+
+/// Candlestick data, unmoored from the prison of a particular block height.
+///
+/// In other words, this can represent candlesticks which span arbitrary windows,
+/// and not just a single block.
+#[derive(Debug, Clone, Copy)]
+struct Candle {
+    open: f64,
+    close: f64,
+    low: f64,
+    high: f64,
+    direct_volume: f64,
+    swap_volume: f64,
+}
+
+impl Candle {
+    fn from_candlestick_data(data: &CandlestickData) -> Self {
+        Self {
+            open: data.open,
+            close: data.close,
+            low: data.low,
+            high: data.high,
+            direct_volume: data.direct_volume,
+            swap_volume: data.swap_volume,
+        }
+    }
+
+    fn merge(&self, that: &Self) -> Self {
+        Self {
+            open: self.open,
+            close: that.close,
+            low: self.low.min(that.low),
+            high: self.high.max(that.high),
+            direct_volume: self.direct_volume + that.direct_volume,
+            swap_volume: self.swap_volume + that.swap_volume,
+        }
+    }
+}
+
+impl From<CandlestickData> for Candle {
+    fn from(value: CandlestickData) -> Self {
+        Self::from(&value)
+    }
+}
+
+impl From<&CandlestickData> for Candle {
+    fn from(value: &CandlestickData) -> Self {
+        Self::from_candlestick_data(value)
+    }
+}
+
+#[derive(Clone, Copy, Debug)]
+enum Window {
+    W1m,
+    W15m,
+    W1h,
+    W4h,
+    W1d,
+    W1w,
+    W1mo,
+}
+
+impl Window {
+    fn all() -> impl Iterator<Item = Self> {
+        [
+            Window::W1m,
+            Window::W15m,
+            Window::W1h,
+            Window::W4h,
+            Window::W1d,
+            Window::W1w,
+            Window::W1mo,
+        ]
+        .into_iter()
+    }
+
+    /// Get the anchor for a given time.
+    ///
+    /// This is the latest time that "snaps" to a given anchor, dependent on the window.
+    ///
+    /// For example, the 1 minute window has an anchor every minute, the day window
+    /// every day, etc.
+    fn anchor(&self, time: DateTime) -> DateTime {
+        let (y, mo, d, h, m) = (
+            time.year(),
+            time.month(),
+            time.day(),
+            time.hour(),
+            time.minute(),
+        );
+        let out = match self {
+            Window::W1m => Utc.with_ymd_and_hms(y, mo, d, h, m, 0).single(),
+            Window::W15m => Utc.with_ymd_and_hms(y, mo, d, h, m - (m % 15), 0).single(),
+            Window::W1h => Utc.with_ymd_and_hms(y, mo, d, h, 0, 0).single(),
+            Window::W4h => Utc.with_ymd_and_hms(y, mo, d, h - (h % 4), 0, 0).single(),
+            Window::W1d => Utc.with_ymd_and_hms(y, mo, d, 0, 0, 0).single(),
+            Window::W1w => Utc
+                .with_ymd_and_hms(y, mo, d, 0, 0, 0)
+                .single()
+                .and_then(|x| {
+                    x.checked_sub_days(Days::new(time.weekday().num_days_from_monday().into()))
+                }),
+            Window::W1mo => Utc.with_ymd_and_hms(y, mo, 1, 0, 0, 0).single(),
+        };
+        out.unwrap()
+    }
+}
+
+impl Display for Window {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        use Window::*;
+        let str = match self {
+            W1m => "1m",
+            W15m => "15m",
+            W1h => "1h",
+            W4h => "4h",
+            W1d => "1d",
+            W1w => "1w",
+            W1mo => "1mo",
+        };
+        write!(f, "{}", str)
+    }
+}
+
+mod price_chart {
+    use super::*;
+
+    /// A context when processing a price chart.
+    #[derive(Debug)]
+    pub struct Context<'tx, 'db> {
+        dbtx: &'tx mut PgTransaction<'db>,
+        asset_start: asset::Id,
+        asset_end: asset::Id,
+        window: Window,
+    }
+
+    impl<'tx, 'db> Context<'tx, 'db> {
+        pub fn new(
+            dbtx: &'tx mut PgTransaction<'db>,
+            asset_start: asset::Id,
+            asset_end: asset::Id,
+            window: Window,
+        ) -> Self {
+            Self {
+                dbtx,
+                asset_start,
+                asset_end,
+                window,
+            }
+        }
+
+        /// Get the candle we should update, based on the current timestamp.
+        async fn relevant_candle(
+            &mut self,
+            anchor: DateTime,
+        ) -> anyhow::Result<Option<(i32, Candle)>> {
+            let stuff: Option<(i32, f64, f64, f64, f64, f64, f64)> = sqlx::query_as(
+                r#"
+            SELECT
+                dex_ex_candlesticks.id,
+                open,
+                close,
+                high,
+                low,
+                direct_volume,
+                swap_volume
+            FROM dex_ex_price_charts
+            JOIN dex_ex_candlesticks ON dex_ex_candlesticks.id = candlestick_id
+            WHERE asset_start = $1
+            AND asset_end = $2
+            AND the_window = $3
+            AND start_time >= $4
+            "#,
+            )
+            .bind(self.asset_start.to_bytes().as_slice())
+            .bind(self.asset_end.to_bytes().as_slice())
+            .bind(self.window.to_string())
+            .bind(anchor)
+            .fetch_optional(self.dbtx.as_mut())
+            .await?;
+            Ok(
+                stuff.map(|(id, open, close, high, low, direct_volume, swap_volume)| {
+                    (
+                        id,
+                        Candle {
+                            open,
+                            close,
+                            high,
+                            low,
+                            direct_volume,
+                            swap_volume,
+                        },
+                    )
+                }),
+            )
+        }
+
+        async fn create_candle(&mut self, anchor: DateTime, candle: Candle) -> anyhow::Result<()> {
+            let id: i32 = sqlx::query_scalar(
+                r#"
+            INSERT INTO dex_ex_candlesticks VALUES (DEFAULT, $1, $2, $3, $4, $5, $6) RETURNING id
+            "#,
+            )
+            .bind(candle.open)
+            .bind(candle.close)
+            .bind(candle.high)
+            .bind(candle.low)
+            .bind(candle.direct_volume)
+            .bind(candle.swap_volume)
+            .fetch_one(self.dbtx.as_mut())
+            .await?;
+            sqlx::query(
+                r#"
+            INSERT INTO dex_ex_price_charts VALUES (DEFAULT, $1, $2, $3, $4, $5)
+            "#,
+            )
+            .bind(self.asset_start.to_bytes().as_slice())
+            .bind(self.asset_end.to_bytes().as_slice())
+            .bind(self.window.to_string())
+            .bind(anchor)
+            .bind(id)
+            .execute(self.dbtx.as_mut())
+            .await?;
+            Ok(())
+        }
+
+        async fn update_candle(&mut self, id: i32, candle: Candle) -> anyhow::Result<()> {
+            sqlx::query(
+                r#"
+            UPDATE dex_ex_candlesticks
+            SET (open, close, high, low, direct_volume, swap_volume) =
+                ($1, $2, $3, $4, $5, $6)
+            WHERE id = $7
+            "#,
+            )
+            .bind(candle.open)
+            .bind(candle.close)
+            .bind(candle.high)
+            .bind(candle.low)
+            .bind(candle.direct_volume)
+            .bind(candle.swap_volume)
+            .bind(id)
+            .execute(self.dbtx.as_mut())
+            .await?;
+            Ok(())
+        }
+
+        pub async fn update(&mut self, time: DateTime, candle: Candle) -> anyhow::Result<()> {
+            let anchor = self.window.anchor(time);
+            match self.relevant_candle(anchor).await? {
+                None => self.create_candle(anchor, candle).await?,
+                Some((id, old_candle)) => self.update_candle(id, old_candle.merge(&candle)).await?,
+            };
+            Ok(())
+        }
+    }
+}
+
+use price_chart::Context as PriceChartContext;
+
+mod summary {
+    use super::*;
+
+    #[derive(Debug)]
+    pub struct Context<'tx, 'db> {
+        dbtx: &'tx mut PgTransaction<'db>,
+        asset_start: asset::Id,
+        asset_end: asset::Id,
+    }
+
+    impl<'tx, 'db> Context<'tx, 'db> {
+        pub fn new(
+            dbtx: &'tx mut PgTransaction<'db>,
+            asset_start: asset::Id,
+            asset_end: asset::Id,
+        ) -> Self {
+            Self {
+                dbtx,
+                asset_start,
+                asset_end,
+            }
+        }
+
+        pub async fn add_candle(&mut self, time: DateTime, candle: Candle) -> anyhow::Result<()> {
+            let asset_start = self.asset_start.to_bytes();
+            let asset_end = self.asset_end.to_bytes();
+            sqlx::query(
+                r#"
+            INSERT INTO _dex_ex_summary_backing VALUES ($1, $2, $3, $4, $5, $6)
+            "#,
+            )
+            .bind(asset_start.as_slice())
+            .bind(asset_end.as_slice())
+            .bind(time)
+            .bind(candle.close)
+            .bind(candle.direct_volume)
+            .bind(candle.swap_volume)
+            .execute(self.dbtx.as_mut())
+            .await?;
+            Ok(())
+        }
+    }
+
+    pub async fn update_all(dbtx: &mut PgTransaction<'_>, time: DateTime) -> anyhow::Result<()> {
+        let time_24h_ago = time
+            .checked_sub_days(Days::new(1))
+            .ok_or(anyhow!("should be able to get time 24h ago from {}", time))?;
+        sqlx::query(
+            r#"
+        DELETE FROM _dex_ex_summary_backing WHERE time < $1
+        "#,
+        )
+        .bind(time_24h_ago)
+        .execute(dbtx.as_mut())
+        .await?;
+        // Update all of the summaries with relevant backing data.
+        //
+        // We choose this one as being responsible for creating the first summary.
+        sqlx::query(
+            r#"
+        INSERT INTO dex_ex_summary
+        SELECT DISTINCT ON (asset_start, asset_end)
+            asset_start,
+            asset_end,
+            FIRST_VALUE(price) OVER w AS price_24h_ago,
+            price AS current_price,
+            MAX(price) OVER w AS high_24h,
+            MIN(price) OVER w AS low_24h,
+            SUM(direct_volume) OVER w AS direct_volume_24h,
+            SUM(swap_volume) OVER w AS swap_volume_24h
+        FROM _dex_ex_summary_backing
+        WINDOW w AS (
+            PARTITION BY
+                asset_start, asset_end
+            ORDER BY asset_start, asset_end, time DESC
+        ) ORDER BY asset_start, asset_end, time ASC
+        ON CONFLICT (asset_start, asset_end) DO UPDATE SET
+            price_24h_ago = EXCLUDED.price_24h_ago,
+            current_price = EXCLUDED.current_price,
+            high_24h = EXCLUDED.high_24h,
+            low_24h = EXCLUDED.low_24h,
+            direct_volume_24h = EXCLUDED.direct_volume_24h,
+            swap_volume_24h = EXCLUDED.swap_volume_24h
+        "#,
+        )
+        .execute(dbtx.as_mut())
+        .await?;
+        // When we don't have backing data, we should nonetheless update to reflect this
+        sqlx::query(
+            r#"
+        UPDATE dex_ex_summary
+        SET
+            price_24h_ago = current_price,
+            high_24h = current_price,
+            low_24h = current_price,
+            direct_volume_24h = 0,
+            swap_volume_24h = 0
+        WHERE NOT EXISTS (
+            SELECT 1
+            FROM _dex_ex_summary_backing
+            WHERE
+                _dex_ex_summary_backing.asset_start = dex_ex_summary.asset_start
+            AND
+                _dex_ex_summary_backing.asset_end = dex_ex_summary.asset_end
+        )
+        "#,
+        )
+        .execute(dbtx.as_mut())
+        .await?;
+        Ok(())
+    }
+}
+
+use summary::Context as SummaryContext;
+
+async fn queue_event_candlestick_data(
+    dbtx: &mut PgTransaction<'_>,
+    height: u64,
+    event: EventCandlestickData,
+) -> anyhow::Result<()> {
+    sqlx::query("INSERT INTO _dex_ex_queue VALUES (DEFAULT, $1, $2)")
+        .bind(i64::try_from(height)?)
+        .bind(event.encode_to_vec().as_slice())
+        .execute(dbtx.as_mut())
+        .await?;
+    Ok(())
+}
+
+async fn unqueue_event_candlestick_data(
+    dbtx: &mut PgTransaction<'_>,
+    height: u64,
+) -> anyhow::Result<Vec<EventCandlestickData>> {
+    let values: Vec<Vec<u8>> =
+        sqlx::query_scalar("DELETE FROM _dex_ex_queue WHERE height = $1 RETURNING data")
+            .bind(i64::try_from(height)?)
+            .fetch_all(dbtx.as_mut())
+            .await?;
+    values
+        .into_iter()
+        .map(|x| EventCandlestickData::decode(x.as_slice()))
+        .collect()
+}
+
+async fn on_event_candlestick_data(
+    dbtx: &mut PgTransaction<'_>,
+    event_time: DateTime,
+    event: EventCandlestickData,
+) -> anyhow::Result<()> {
+    let asset_start = event.pair.start;
+    let asset_end = event.pair.end;
+    let candle = event.stick.into();
+    for window in Window::all() {
+        let mut ctx = PriceChartContext::new(dbtx, asset_start, asset_end, window);
+        ctx.update(event_time, candle).await?;
+    }
+    let mut ctx = SummaryContext::new(dbtx, asset_start, asset_end);
+    ctx.add_candle(event_time, candle).await?;
+    Ok(())
+}
+
+async fn fetch_height_time(
+    dbtx: &mut PgTransaction<'_>,
+    height: u64,
+) -> anyhow::Result<Option<DateTime>> {
+    const CTX: &'static str = r#"
+The `dex_ex` component relies on the `block` component to be running, to provide the `block_details` with timestamps.
+Make sure that is running as well.
+"#;
+    sqlx::query_scalar("SELECT timestamp FROM block_details WHERE height = $1")
+        .bind(i64::try_from(height)?)
+        .fetch_optional(dbtx.as_mut())
+        .await
+        .context(CTX)
+}
+
+#[derive(Debug)]
+pub struct Component {}
+
+impl Component {
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+#[async_trait]
+impl AppView for Component {
+    async fn init_chain(
+        &self,
+        dbtx: &mut PgTransaction,
+        _: &serde_json::Value,
+    ) -> Result<(), anyhow::Error> {
+        for statement in include_str!("schema.sql").split(";") {
+            sqlx::query(statement).execute(dbtx.as_mut()).await?;
+        }
+        Ok(())
+    }
+
+    fn is_relevant(&self, type_str: &str) -> bool {
+        [
+            <EventCandlestickData as DomainType>::Proto::full_name(),
+            <EventBlockRoot as DomainType>::Proto::full_name(),
+        ]
+        .into_iter()
+        .any(|x| type_str == x)
+    }
+
+    async fn index_event(
+        &self,
+        dbtx: &mut PgTransaction,
+        event: &ContextualizedEvent,
+        _src_db: &PgPool,
+    ) -> Result<(), anyhow::Error> {
+        if let Ok(e) = EventCandlestickData::try_from_event(&event.event) {
+            let height = event.block_height;
+            match fetch_height_time(dbtx, height).await? {
+                None => {
+                    queue_event_candlestick_data(dbtx, height, e).await?;
+                }
+                Some(time) => {
+                    on_event_candlestick_data(dbtx, time, e).await?;
+                }
+            }
+        } else if let Ok(e) = EventBlockRoot::try_from_event(&event.event) {
+            let height = e.height;
+            let time = DateTime::from_timestamp(e.timestamp_seconds, 0).ok_or(anyhow!(
+                "creating timestamp should succeed; timestamp: {}",
+                e.timestamp_seconds
+            ))?;
+            for event in unqueue_event_candlestick_data(dbtx, height).await? {
+                on_event_candlestick_data(dbtx, time, event).await?;
+            }
+            summary::update_all(dbtx, time).await?;
+        }
+        tracing::debug!(?event, "unrecognized event");
+        Ok(())
+    }
+}
diff --git a/crates/bin/pindexer/src/dex_ex/schema.sql b/crates/bin/pindexer/src/dex_ex/schema.sql
new file mode 100644
index 0000000000..53f85ceb8a
--- /dev/null
+++ b/crates/bin/pindexer/src/dex_ex/schema.sql
@@ -0,0 +1,73 @@
+CREATE TABLE IF NOT EXISTS dex_ex_candlesticks (
+    id SERIAL PRIMARY KEY,
+    -- The price at the start of a window.
+    open FLOAT8 NOT NULL,
+    -- The price at the close of a window.
+    close FLOAT8 NOT NULL,
+    -- The highest price reached during a window.
+    high FLOAT8 NOT NULL,
+    -- The lowest price reached during a window.
+    low FLOAT8 NOT NULL,
+    -- The volume traded directly through position executions.
+    direct_volume FLOAT8 NOT NULL,
+    -- The volume that traded indirectly, possibly through several positions.
+    swap_volume FLOAT8 NOT NULL
+);
+
+-- Contains, for each directed asset pair and window type, candle sticks for each window.
+CREATE TABLE IF NOT EXISTS dex_ex_price_charts (
+    -- We just want a simple primary key to have here.
+    id SERIAL PRIMARY KEY,
+    -- The bytes for the first asset in the directed pair.
+    asset_start BYTEA NOT NULL,
+    -- The bytes for the second asset in the directed pair.
+    asset_end BYTEA NOT NULL,
+    -- The window type for this stick.
+    --
+    -- Enum types are annoying.
+    the_window TEXT NOT NULL,
+    -- The start time of this window.
+    start_time TIMESTAMPTZ NOT NULL,
+    -- The candlestick for this window.
+    candlestick_id INTEGER UNIQUE REFERENCES dex_ex_candlesticks (id)
+);
+
+CREATE UNIQUE INDEX ON dex_ex_price_charts (asset_start, asset_end, the_window, start_time);
+
+CREATE TABLE IF NOT EXISTS _dex_ex_summary_backing (
+    asset_start BYTEA NOT NULL,
+    asset_end BYTEA NOT NULL,
+    -- The time for this bit of information.
+    time TIMESTAMPTZ NOT NULL,
+    -- The price at this point.
+    price FLOAT8 NOT NULL,
+    -- The volume for this particular candle.
+    direct_volume FLOAT8 NOT NULL,
+    swap_volume FLOAT8 NOT NULL,
+    PRIMARY KEY (asset_start, asset_end, time)
+);
+
+CREATE TABLE IF NOT EXISTS dex_ex_summary (
+    -- The first asset of the directed pair.
+    asset_start BYTEA NOT NULL,
+    -- The second asset of the directed pair.
+    asset_end BYTEA NOT NULL,
+    -- The current price (in terms of asset_end).
+    current_price FLOAT8 NOT NULL,
+    -- The price 24h ago.
+    price_24h_ago FLOAT8 NOT NULL,
+    -- The highest price over the past 24h.
+    high_24h FLOAT8 NOT NULL,
+    -- The lowest price over the past 24h.
+    low_24h FLOAT8 NOT NULL,
+    -- c.f. the candlesticks table for the difference between these two volumes.
+    direct_volume_24h FLOAT8 NOT NULL,
+    swap_volume_24h FLOAT8 NOT NULL,
+    PRIMARY KEY (asset_start, asset_end)
+);
+
+CREATE TABLE IF NOT EXISTS _dex_ex_queue (
+    id SERIAL PRIMARY KEY,
+    height BIGINT NOT NULL,
+    data BYTEA NOT NULL
+);
diff --git a/crates/bin/pindexer/src/indexer_ext.rs b/crates/bin/pindexer/src/indexer_ext.rs
index 6a7c5e8208..950000fdd3 100644
--- a/crates/bin/pindexer/src/indexer_ext.rs
+++ b/crates/bin/pindexer/src/indexer_ext.rs
@@ -10,7 +10,7 @@ impl IndexerExt for cometindex::Indexer {
             .with_index(crate::stake::DelegationTxs {})
             .with_index(crate::stake::UndelegationTxs {})
             .with_index(crate::governance::GovernanceProposals {})
-            .with_index(crate::dex::Component::new())
+            .with_index(crate::dex_ex::Component::new())
             .with_index(crate::supply::Component::new())
             .with_index(crate::ibc::Component::new())
     }
diff --git a/crates/bin/pindexer/src/lib.rs b/crates/bin/pindexer/src/lib.rs
index 353aaaf0b2..e2c2d63476 100644
--- a/crates/bin/pindexer/src/lib.rs
+++ b/crates/bin/pindexer/src/lib.rs
@@ -4,6 +4,7 @@ mod indexer_ext;
 pub use indexer_ext::IndexerExt;
 pub mod block;
 pub mod dex;
+pub mod dex_ex;
 pub mod ibc;
 mod parsing;
 pub mod shielded_pool;
diff --git a/crates/core/component/sct/src/component/tree.rs b/crates/core/component/sct/src/component/tree.rs
index b0d987a109..9bbd83e263 100644
--- a/crates/core/component/sct/src/component/tree.rs
+++ b/crates/core/component/sct/src/component/tree.rs
@@ -1,7 +1,7 @@
 use anyhow::{anyhow, Result};
 use async_trait::async_trait;
 use cnidarium::{StateRead, StateWrite};
-use penumbra_proto::{StateReadProto, StateWriteProto};
+use penumbra_proto::{DomainType as _, StateReadProto, StateWriteProto};
 use penumbra_tct as tct;
 use tct::builder::{block, epoch};
 use tracing::instrument;
@@ -85,7 +85,14 @@ pub trait SctManager: StateWrite {
         self.put(state_key::tree::anchor_by_height(height), sct_anchor);
         self.record_proto(event::anchor(height, sct_anchor, block_timestamp));
-        self.record_proto(event::block_root(height, block_root, block_timestamp));
+        self.record_proto(
+            event::EventBlockRoot {
+                height,
+                root: block_root,
+                timestamp_seconds: block_timestamp,
+            }
+            .to_proto(),
+        );
 
         // Only record an epoch root event if we are ending the epoch.
         if let Some(epoch_root) = epoch_root {
             let index = self
diff --git a/crates/core/component/sct/src/event.rs b/crates/core/component/sct/src/event.rs
index 65a462b0f8..43bef0da20 100644
--- a/crates/core/component/sct/src/event.rs
+++ b/crates/core/component/sct/src/event.rs
@@ -1,8 +1,12 @@
+use anyhow::{anyhow, Context as _};
 use pbjson_types::Timestamp;
 use penumbra_tct as tct;
 use tct::builder::{block, epoch};
 
-use penumbra_proto::core::component::sct::v1 as pb;
+use penumbra_proto::{
+    core::component::sct::v1::{self as pb},
+    DomainType, Name as _,
+};
 
 use crate::CommitmentSource;
 
@@ -17,17 +21,48 @@ pub fn anchor(height: u64, anchor: tct::Root, timestamp: i64) -> pb::EventAnchor
     }
 }
 
-pub fn block_root(height: u64, root: block::Root, timestamp: i64) -> pb::EventBlockRoot {
-    pb::EventBlockRoot {
-        height,
-        root: Some(root.into()),
-        timestamp: Some(Timestamp {
-            seconds: timestamp,
-            nanos: 0,
-        }),
-    }
-}
+#[derive(Debug, Clone)]
+pub struct EventBlockRoot {
+    pub height: u64,
+    pub root: block::Root,
+    pub timestamp_seconds: i64,
+}
+
+impl TryFrom<pb::EventBlockRoot> for EventBlockRoot {
+    type Error = anyhow::Error;
+
+    fn try_from(value: pb::EventBlockRoot) -> Result<Self, Self::Error> {
+        fn inner(value: pb::EventBlockRoot) -> anyhow::Result<EventBlockRoot> {
+            Ok(EventBlockRoot {
+                height: value.height,
+                root: value.root.ok_or(anyhow!("missing `root`"))?.try_into()?,
+                timestamp_seconds: value
+                    .timestamp
+                    .ok_or(anyhow!("missing `timestamp`"))?
+                    .seconds,
+            })
+        }
+        inner(value).context(format!("parsing {}", pb::EventBlockRoot::NAME))
+    }
+}
+
+impl From<EventBlockRoot> for pb::EventBlockRoot {
+    fn from(value: EventBlockRoot) -> Self {
+        Self {
+            height: value.height,
+            root: Some(value.root.into()),
+            timestamp: Some(Timestamp {
+                seconds: value.timestamp_seconds,
+                nanos: 0,
+            }),
+        }
+    }
+}
+
+impl DomainType for EventBlockRoot {
+    type Proto = pb::EventBlockRoot;
+}
+
 pub fn epoch_root(index: u64, root: epoch::Root, timestamp: i64) -> pb::EventEpochRoot {
     pb::EventEpochRoot {
         index,
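
As a sanity check on the window anchoring in `dex_ex/mod.rs`, here is a small self-contained sketch (not part of the patch) that reproduces the same snapping rules with plain `chrono`; the `anchor_15m` and `anchor_1w` helpers are illustrative stand-ins rather than code from this PR.

```rust
use chrono::{DateTime, Datelike as _, Days, TimeZone as _, Timelike as _, Utc};

/// Snap a time down to the start of its 15-minute window (same rule as `Window::W15m`).
fn anchor_15m(time: DateTime<Utc>) -> DateTime<Utc> {
    let m = time.minute();
    Utc.with_ymd_and_hms(time.year(), time.month(), time.day(), time.hour(), m - (m % 15), 0)
        .single()
        .unwrap()
}

/// Snap a time down to the Monday that starts its week (same rule as `Window::W1w`).
fn anchor_1w(time: DateTime<Utc>) -> DateTime<Utc> {
    Utc.with_ymd_and_hms(time.year(), time.month(), time.day(), 0, 0, 0)
        .single()
        .and_then(|midnight| {
            midnight.checked_sub_days(Days::new(time.weekday().num_days_from_monday().into()))
        })
        .unwrap()
}

fn main() {
    // The timestamp of this patch: Friday, 2024-10-11 19:58:39 UTC.
    let t = Utc.with_ymd_and_hms(2024, 10, 11, 19, 58, 39).single().unwrap();
    // 19:58 snaps back to 19:45 for the 15-minute window...
    assert_eq!(
        anchor_15m(t),
        Utc.with_ymd_and_hms(2024, 10, 11, 19, 45, 0).single().unwrap()
    );
    // ...and Friday snaps back to the preceding Monday for the weekly window.
    assert_eq!(
        anchor_1w(t),
        Utc.with_ymd_and_hms(2024, 10, 7, 0, 0, 0).single().unwrap()
    );
    println!("anchors check out");
}
```

Every update for a pair is folded into the candle at the anchor of each window, so a single block's `EventCandlestickData` touches seven rows (one per window), which is why `Candle::merge` keeps the existing `open` and takes the incoming `close`.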