pindexer: implement batch processing API (#4913)
This restructures the AppView interface to allow processing events in batches. Previously, app views had to index one event at a time. With this PR, app views receive a batch containing several blocks' worth of events, with the guarantee that blocks are never split: if any event from a block is in the batch, all of that block's events are.
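Concretely, the reshaped interface looks roughly like the sketch below. This is a simplified rendering based on the diff in this PR, with stand-in definitions for `PgTransaction`, `ContextualizedEvent`, and `EventBatch`; the actual `cometindex` types may differ in details such as lifetimes and the exact `EventBatch` API.

```rust
use async_trait::async_trait;

/// Stand-ins for the real cometindex / sqlx types, just to show the shape.
pub struct PgTransaction<'a>(std::marker::PhantomData<&'a ()>);
pub struct ContextualizedEvent {
    // block height, tx hash, the underlying ABCI event, ...
}

/// Several blocks' worth of events; blocks are never split across batches.
pub struct EventBatch {
    events: Vec<ContextualizedEvent>,
}

impl EventBatch {
    /// Iterate over every event in the batch, in block order.
    pub fn events(&self) -> impl Iterator<Item = &ContextualizedEvent> {
        self.events.iter()
    }
}

#[async_trait]
pub trait AppView {
    /// A unique name per view, used to track its indexing progress.
    fn name(&self) -> String;

    /// Set up the view's tables on first run.
    async fn init_chain(
        &self,
        dbtx: &mut PgTransaction<'_>,
        app_state: &serde_json::Value,
    ) -> anyhow::Result<()>;

    /// Index a whole batch of events in one transaction, replacing the old
    /// one-event-at-a-time `index_event` / `is_relevant` pair.
    async fn index_batch(
        &self,
        dbtx: &mut PgTransaction<'_>,
        batch: EventBatch,
    ) -> anyhow::Result<()>;
}
```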

## Making App Views Easier to Write
Having access to all the events in a block makes app views more ergonomic to write. For example, the dex explorer app view wants to know the time of the candlestick events it processes, but that timestamp is only provided by the block root event, which appears later in the block. Currently, because an app view has no access to this context, we have to implement a queuing system in the database by hand, which is both cumbersome and a performance hit.
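With a whole block in hand, the view can simply scan for the block root event first and then process the candlesticks, with no database-side queue. The sketch below is hypothetical and much simplified compared to the real dex_ex code (`write_candlestick` and `index_block` are made-up helpers), but it shows the two-pass pattern the batch API enables.

```rust
use anyhow::anyhow;
use cometindex::{ContextualizedEvent, PgTransaction};
use penumbra_dex::event::EventCandlestickData;
use penumbra_proto::{core::component::sct::v1 as pb, event::{EventDomainType, ProtoEvent}};
use sqlx::types::chrono::{DateTime, Utc};

// Made-up helper standing in for the real dex_ex write path.
async fn write_candlestick(
    _dbtx: &mut PgTransaction<'_>,
    _candlestick: &EventCandlestickData,
    _time: DateTime<Utc>,
) -> anyhow::Result<()> {
    // ... INSERT the candlestick row, stamped with the block's timestamp ...
    Ok(())
}

async fn index_block(
    dbtx: &mut PgTransaction<'_>,
    events: &[ContextualizedEvent],
) -> anyhow::Result<()> {
    // First pass: pull the timestamp out of the block root event, which shows up
    // later in the block than the candlestick events it describes.
    let time = events
        .iter()
        .find_map(|e| pb::EventBlockRoot::from_event(e.as_ref()).ok())
        .and_then(|root| root.timestamp)
        .and_then(|t| DateTime::from_timestamp(t.seconds, t.nanos.try_into().unwrap_or(0)))
        .ok_or_else(|| anyhow!("block is missing a usable block root event"))?;

    // Second pass: index the candlesticks directly; no database-side queue needed.
    for e in events {
        if let Ok(candlestick) = EventCandlestickData::try_from_event(&e.event) {
            write_candlestick(dbtx, &candlestick, time).await?;
        }
    }
    Ok(())
}
```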

## Making App Views More Performant
We can make app views more performant by processing an entire block, or even several blocks, at once, since:
- we don't need to write an update to the database more than once per block
- we may be able to write updates even less frequently, depending on the app view (e.g. when only the current value is needed)
- we can keep transient state in memory instead of in the database, reducing writes and reads in all cases (see the sketch after this list)
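
As an illustration of the last two points, here is a hypothetical view (not one of the real pindexer views) that accumulates its state in memory over the whole batch and performs a single write at the end, using only the `AppView` methods shown in this PR's diff.

```rust
use cometindex::{async_trait, index::EventBatch, sqlx, AppView, PgTransaction};

/// A hypothetical view: counts events in memory and writes once per batch.
#[derive(Debug)]
struct EventCounter {}

#[async_trait]
impl AppView for EventCounter {
    fn name(&self) -> String {
        "event_counter".to_string()
    }

    async fn init_chain(
        &self,
        dbtx: &mut PgTransaction,
        _app_state: &serde_json::Value,
    ) -> Result<(), anyhow::Error> {
        sqlx::query(
            "CREATE TABLE IF NOT EXISTS event_counts (height BIGINT PRIMARY KEY, total BIGINT NOT NULL)",
        )
        .execute(dbtx.as_mut())
        .await?;
        Ok(())
    }

    async fn index_batch(
        &self,
        dbtx: &mut PgTransaction,
        batch: EventBatch,
    ) -> Result<(), anyhow::Error> {
        // Transient state lives in memory for the duration of the batch.
        let mut total: i64 = 0;
        let mut last_height: i64 = 0;
        for event in batch.events() {
            total += 1;
            last_height = i64::try_from(event.block_height)?;
        }
        // A single write per batch, instead of one write per event.
        sqlx::query("INSERT INTO event_counts (height, total) VALUES ($1, $2)")
            .bind(last_height)
            .bind(total)
            .execute(dbtx.as_mut())
            .await?;
        Ok(())
    }
}
```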

## Additional Performance Improvements
App views now also run in parallel with one another, which provides an additional speedup when syncing up.
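
The actual scheduling lives in cometindex and isn't shown in this excerpt, but the idea is roughly the following sketch: each view processes the same batch concurrently in its own task and transaction (assuming a tokio runtime and that `EventBatch` is cheap to clone, e.g. internally reference-counted), so one slow view no longer serializes the whole sync.

```rust
use std::sync::Arc;

use cometindex::{index::EventBatch, AppView};
use sqlx::PgPool;

// Sketch only: fan each app view out onto its own task and transaction.
async fn index_batch_in_parallel(
    pool: PgPool,
    views: Vec<Arc<dyn AppView + Send + Sync>>,
    batch: EventBatch,
) -> anyhow::Result<()> {
    let mut tasks = tokio::task::JoinSet::new();
    for view in views {
        let pool = pool.clone();
        let batch = batch.clone();
        tasks.spawn(async move {
            let mut dbtx = pool.begin().await?;
            view.index_batch(&mut dbtx, batch).await?;
            dbtx.commit().await?;
            Ok::<(), anyhow::Error>(())
        });
    }
    // Propagate the first error from any view (or any panicked task).
    while let Some(joined) = tasks.join_next().await {
        joined??;
    }
    Ok(())
}
```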

## Testing
Pindexer should work as usual after wiping the database.

# Checklist

- [x] I have added guiding text to explain how a reviewer should test
these changes.

- [x] If this code contains consensus-breaking changes, I have added the
"consensus-breaking" label. Otherwise, I declare my belief that there
are not consensus-breaking changes, for the following reason:

  > pindexer only
cronokirby authored Nov 7, 2024
1 parent 5b4165f commit 7533cc6
Showing 21 changed files with 886 additions and 760 deletions.
49 changes: 26 additions & 23 deletions crates/bin/pindexer/src/block.rs
@@ -1,12 +1,16 @@
-use cometindex::{async_trait, sqlx, AppView, ContextualizedEvent, PgTransaction};
+use cometindex::{async_trait, index::EventBatch, sqlx, AppView, PgTransaction};
 use penumbra_proto::{core::component::sct::v1 as pb, event::ProtoEvent};
-use sqlx::{types::chrono::DateTime, PgPool};
+use sqlx::types::chrono::DateTime;
 
 #[derive(Debug)]
 pub struct Block {}
 
 #[async_trait]
 impl AppView for Block {
+    fn name(&self) -> String {
+        "block".to_string()
+    }
+
     async fn init_chain(
         &self,
         dbtx: &mut PgTransaction,
@@ -27,34 +31,33 @@ CREATE TABLE IF NOT EXISTS block_details (
         Ok(())
     }
 
-    fn is_relevant(&self, type_str: &str) -> bool {
-        type_str == "penumbra.core.component.sct.v1.EventBlockRoot"
-    }
-
-    async fn index_event(
+    async fn index_batch(
         &self,
         dbtx: &mut PgTransaction,
-        event: &ContextualizedEvent,
-        _src_db: &PgPool,
+        batch: EventBatch,
     ) -> Result<(), anyhow::Error> {
-        let pe = pb::EventBlockRoot::from_event(event.as_ref())?;
-        let timestamp = pe.timestamp.unwrap_or_default();
+        for event in batch.events() {
+            let pe = match pb::EventBlockRoot::from_event(event.as_ref()) {
+                Ok(pe) => pe,
+                Err(_) => continue,
+            };
+            let timestamp = pe.timestamp.unwrap_or_default();
 
-        sqlx::query(
-            "
+            sqlx::query(
+                "
 INSERT INTO block_details (height, timestamp, root)
 VALUES ($1, $2, $3)
 ",
-        )
-        .bind(i64::try_from(pe.height)?)
-        .bind(DateTime::from_timestamp(
-            timestamp.seconds,
-            u32::try_from(timestamp.nanos)?,
-        ))
-        .bind(pe.root.unwrap().inner)
-        .execute(dbtx.as_mut())
-        .await?;
-
+            )
+            .bind(i64::try_from(pe.height)?)
+            .bind(DateTime::from_timestamp(
+                timestamp.seconds,
+                u32::try_from(timestamp.nanos)?,
+            ))
+            .bind(pe.root.unwrap().inner)
+            .execute(dbtx.as_mut())
+            .await?;
+        }
         Ok(())
     }
 }
60 changes: 31 additions & 29 deletions crates/bin/pindexer/src/dex_ex/mod.rs
@@ -2,13 +2,11 @@ use std::fmt::Display;
 
 use anyhow::{anyhow, Context};
 use chrono::{Datelike, Days, TimeZone, Timelike as _, Utc};
-use cometindex::{async_trait, AppView, ContextualizedEvent, PgTransaction};
+use cometindex::{async_trait, index::EventBatch, AppView, ContextualizedEvent, PgTransaction};
 use penumbra_asset::asset;
 use penumbra_dex::{event::EventCandlestickData, CandlestickData};
 use penumbra_proto::{event::EventDomainType, DomainType};
 use penumbra_sct::event::EventBlockRoot;
-use prost::Name as _;
-use sqlx::PgPool;
 
 type DateTime = sqlx::types::chrono::DateTime<sqlx::types::chrono::Utc>;
 
@@ -453,35 +451,11 @@ impl Component {
     pub fn new() -> Self {
         Self {}
     }
-}
-
-#[async_trait]
-impl AppView for Component {
-    async fn init_chain(
-        &self,
-        dbtx: &mut PgTransaction,
-        _: &serde_json::Value,
-    ) -> Result<(), anyhow::Error> {
-        for statement in include_str!("schema.sql").split(";") {
-            sqlx::query(statement).execute(dbtx.as_mut()).await?;
-        }
-        Ok(())
-    }
-
-    fn is_relevant(&self, type_str: &str) -> bool {
-        [
-            <EventCandlestickData as DomainType>::Proto::full_name(),
-            <EventBlockRoot as DomainType>::Proto::full_name(),
-        ]
-        .into_iter()
-        .any(|x| type_str == x)
-    }
 
     async fn index_event(
         &self,
-        dbtx: &mut PgTransaction,
+        dbtx: &mut PgTransaction<'_>,
         event: &ContextualizedEvent,
-        _src_db: &PgPool,
     ) -> Result<(), anyhow::Error> {
         if let Ok(e) = EventCandlestickData::try_from_event(&event.event) {
             let height = event.block_height;
@@ -504,7 +478,35 @@ impl AppView for Component {
             }
             summary::update_all(dbtx, time).await?;
         }
-        tracing::debug!(?event, "unrecognized event");
         Ok(())
     }
 }
+
+#[async_trait]
+impl AppView for Component {
+    async fn init_chain(
+        &self,
+        dbtx: &mut PgTransaction,
+        _: &serde_json::Value,
+    ) -> Result<(), anyhow::Error> {
+        for statement in include_str!("schema.sql").split(";") {
+            sqlx::query(statement).execute(dbtx.as_mut()).await?;
+        }
+        Ok(())
+    }
+
+    fn name(&self) -> String {
+        "dex_ex".to_string()
+    }
+
+    async fn index_batch(
+        &self,
+        dbtx: &mut PgTransaction,
+        batch: EventBatch,
+    ) -> Result<(), anyhow::Error> {
+        for event in batch.events() {
+            self.index_event(dbtx, event).await?;
+        }
+        Ok(())
+    }
+}