diff --git a/arrow-test/src/lib.rs b/arrow-test/src/lib.rs index 0353c1b4..3676f02b 100644 --- a/arrow-test/src/lib.rs +++ b/arrow-test/src/lib.rs @@ -11,26 +11,25 @@ use datafusion::{ logical_expr::{col, expr::ScalarFunction, Cast, Expr, ScalarUDF}, }; -/// Applies various transformations on a record batch of conclusion_feed data to make it easier to +/// Applies various transformations on a record batch of conclusion_events data to make it easier to /// read. -/// Useful in conjuction with expect_test. -pub async fn pretty_feed_from_batch(batch: RecordBatch) -> Vec { +/// Useful in conjunction with expect_test. +pub async fn pretty_conclusion_events_from_batch(batch: RecordBatch) -> Vec { let ctx = SessionContext::new(); - ctx.register_batch("conclusion_feed", batch).unwrap(); + ctx.register_batch("conclusion_events", batch).unwrap(); - pretty_feed(ctx.table("conclusion_feed").await.unwrap()).await + pretty_conclusion_events(ctx.table("conclusion_events").await.unwrap()).await } -/// Applies various transformations on a dataframe of conclusion_feed data to make it easier to +/// Applies various transformations on a dataframe of conclusion_events data to make it easier to /// read. -/// Useful in conjuction with expect_test. -pub async fn pretty_feed(conclusion_feed: DataFrame) -> Vec { +/// Useful in conjunction with expect_test. +pub async fn pretty_conclusion_events(conclusion_events: DataFrame) -> Vec { let cid_string = Arc::new(ScalarUDF::from(CidString::new())); let cid_string_list = Arc::new(ScalarUDF::from(CidStringList::new())); - conclusion_feed + conclusion_events .select(vec![ col("index"), - col("event_type"), Expr::ScalarFunction(ScalarFunction::new_udf( cid_string.clone(), vec![col("stream_cid")], @@ -44,6 +43,7 @@ pub async fn pretty_feed(conclusion_feed: DataFrame) -> Vec { vec![col("event_cid")], )) .alias("event_cid"), + col("event_type"), Expr::Cast(Cast::new(Box::new(col("data")), DataType::Utf8)).alias("data"), Expr::ScalarFunction(ScalarFunction::new_udf( cid_string_list, diff --git a/flight/tests/server.rs b/flight/tests/server.rs index c6b8b07e..f85cea12 100644 --- a/flight/tests/server.rs +++ b/flight/tests/server.rs @@ -5,7 +5,7 @@ use arrow::{compute::concat_batches, util::pretty::pretty_format_batches}; use arrow_array::RecordBatch; use arrow_flight::{sql::client::FlightSqlServiceClient, FlightInfo}; use arrow_schema::Schema; -use ceramic_arrow_test::pretty_feed_from_batch; +use ceramic_arrow_test::pretty_conclusion_events_from_batch; use ceramic_flight::server::new_server; use ceramic_pipeline::{ ConclusionData, ConclusionEvent, ConclusionFeed, ConclusionInit, ConclusionTime, @@ -164,19 +164,19 @@ async fn test_simple() -> Result<()> { let mut client = start_server(feed).await; let info = client - .execute("SELECT * FROM conclusion_feed".to_string(), None) + .execute("SELECT * FROM conclusion_events".to_string(), None) .await?; let batch = execute_flight(&mut client, info).await?; - let batches = pretty_feed_from_batch(batch).await; + let batches = pretty_conclusion_events_from_batch(batch).await; let formatted = pretty_format_batches(&batches).unwrap().to_string(); expect![[r#" - +-------+------------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+ - | index | event_type | stream_cid | stream_type | controller | dimensions | event_cid | data | previous | - +-------+------------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+ - | 0 | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"a":0} | | - | 1 | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | | [baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi] | - | 2 | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | [{"op":"replace", "path": "/a", "value":1}] | [baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq, baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi] | - +-------+------------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+"#]].assert_eq(&formatted); + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | data | previous | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"a":0} | | + | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | | [baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi] | + | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | 0 | [{"op":"replace", "path": "/a", "value":1}] | [baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq, baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi] | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+"#]].assert_eq(&formatted); Ok(()) } @@ -191,19 +191,19 @@ async fn test_push_down_predicate() -> Result<()> { let info = client .execute( - "SELECT * FROM conclusion_feed WHERE index > 42 LIMIT 2".to_string(), + "SELECT * FROM conclusion_events WHERE index > 42 LIMIT 2".to_string(), None, ) .await?; let batch = execute_flight(&mut client, info).await?; - let batches = pretty_feed_from_batch(batch).await; + let batches = pretty_conclusion_events_from_batch(batch).await; let formatted = pretty_format_batches(&batches).unwrap().to_string(); expect![[r#" - +-------+------------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+---------+---------------------------------------------------------------+ - | index | event_type | stream_cid | stream_type | controller | dimensions | event_cid | data | previous | - +-------+------------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+---------+---------------------------------------------------------------+ - | 42 | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"a":0} | | - | 43 | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | | [baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi] | - +-------+------------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+---------+---------------------------------------------------------------+"#]].assert_eq(&formatted); + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+---------+---------------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | data | previous | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+---------+---------------------------------------------------------------+ + | 42 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"a":0} | | + | 43 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | | [baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi] | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+---------+---------------------------------------------------------------+"#]].assert_eq(&formatted); Ok(()) } diff --git a/pipeline/DESIGN.md b/pipeline/DESIGN.md new file mode 100644 index 00000000..dce253d1 --- /dev/null +++ b/pipeline/DESIGN.md @@ -0,0 +1,190 @@ +# Pipeline Architecture + +The pipeline architecture allows for defining features over Ceramic data as a series of transformations over that data. +What follows is a descriptions of each phase of the ceramic pipeline the features it enables and its dependencies. + +## Overview + +Data enters Ceramic via its API and is stored into the raw_events table. +From there various transformations are applied producing various intermediate tables. +Each table schema is considered public API and provides access to kind of query pattern against the data. + +```mermaid +graph LR; + raw_events; + streams; + chain_proofs; + conclusion_events; + model_schemas; + event_states; + stream_heads; + stream_states; + + raw_events --> conclusion_events; + streams --> conclusion_events; + chain_proofs --> conclusion_events; + conclusion_events --> event_states; + model_schemas --> event_states; + event_states --> stream_heads; + stream_heads --> stream_states; +``` + +## raw_events + +The raw_events table contains a row for each event in a stream and contains the raw CAR data of the event. + +### Features + +* Access to the raw event data that can be used to validate signatures + +### Schema + +| Column | Type | Description | +| ------ | ---- | ----------- | +| stream_cid | bytes | Cid of the stream | +| event_cid | bytes | Cid of the event | +| car | bytes | CAR data of the event | + +## streams + +The streams table contains a row for each stream and contains the dimensions and controller of the stream. + +### Features + +* Access to the identifying information for streams + +### Schema + +| Column | Type | Description | +| ------ | ---- | ----------- | +| stream_cid | bytes | Cid of the stream | +| stream_type | u8 | Cid of the stream | +| controller | string | Controller of the stream | +| dimensions | map(string,bytes) | Dimensions of the stream | + +## chain_proofs + +The chain_proofs table contains a row for each on chain proof. + +TBD how this table is populated and its schema. + +## conclusion_events + +The conclusion_events table contains a row for each event in a stream and represents a raw input event after various conclusions have been made. + +### Features + +* Access to events allowing in order access to event within stream +* Access to pre-aggregated data for users building their own aggregation system +* Validation of the event signatures + +### Schema + +| Column | Type | Description | +| ------ | ---- | ----------- | +| index | u64 | Order of this event. Index is always greater than the index of any previous event in the stream | +| stream_cid | bytes | Cid of the stream | +| stream_type | u8 | Stream type | +| controller | string | Controller of the stream | +| dimensions | map(string,bytes) | Set of key values dimension pairs of the stream | +| event_cid | bytes | Cid of the event | +| event_type | u8 | One of Data (0) or Time (1) | +| data | bytes | DAG-JSON encoding of the event payload | +| previous | list(bytes) | List of CID previous to this event | + + +### Transformation + +Raw events are transformed into a flattened structure where stream dimensions and conclusions about data are added to the data. + +Conclusions include: + +* The event has a valid signature +* The dimensions and controller of the event + +This table joins the raw_events, chain_proofs, and streams tables in order to make the conclusions about the raw events. + +## model_schemas + +The model_schemas table contains a row for each model and contains the complete resolved schema of the model. + +TBD how this table is populated and its schema. +This table may be able to be combined with the streams table. Should we? + +## event_states + +The event_states table contains a row for each event in a stream and the state of the document at that point in the stream. + +### Features + +* Access to the full history of states for a stream +* Validation of the model schema + +### Schema + +| Column | Type | Description | +| ------ | ---- | ----------- | +| index | u64 | Order of this event. Index is always greater than the index of any previous event in the stream | +| stream_cid | bytes | Cid of the stream | +| stream_type | u8 | Stream type | +| controller | string | Controller of the stream | +| dimensions | map(string,bytes) | Set of key values dimension pairs of the stream | +| event_cid | bytes | Cid of the event | +| event_type | u8 | One of Data (0) or Time (1) | +| state | string | JSON encoding of the event state | + +### Transformation + +This table computes the aggregated state for each conclusion event. +Additionally it validates the aggregated state matches the model schema of the stream. + +## stream_heads + +The stream_states table contains a row for head of each stream representing the canonical state of the stream heads. + +### Features + +* Access to the multiple heads of streams for users building their on conflict resolution + +### Schema + +| Column | Type | Description | +| ------ | ---- | ----------- | +| index | u64 | Order of this event. Index is always greater than the index of any previous event in the stream | +| stream_cid | bytes | Cid of the stream | +| stream_type | u8 | Stream type | +| controller | string | Controller of the stream | +| dimensions | map(string,bytes) | Set of key values dimension pairs of the stream | +| event_cid | bytes | Cid of the event | +| event_type | u8 | One of Data (0) or Time (1) | +| state | string | JSON encoding of the event state | + +### Transformation + +Computes the unique heads of a stream. Old heads are deleted or deprecated behind a version commit or similar. + +## stream_states + +The stream_states table contains a row for each stream representing the canonical state of the stream. + +### Features + +* Access to canonical state of streams for users relying on built in conflict resolution + +### Schema + +| Column | Type | Description | +| ------ | ---- | ----------- | +| index | u64 | Order of this event. Index is always greater than the index of any previous event in the stream | +| stream_cid | bytes | Cid of the stream | +| stream_type | u8 | Stream type | +| controller | string | Controller of the stream | +| dimensions | map(string,bytes) | Set of key values dimension pairs of the stream | +| event_cid | bytes | Cid of the event | +| event_type | u8 | One of Data (0) or Time (1) | +| state | string | JSON encoding of the event state | + +### Transformation + +Computes the singular head that is the canonical state of the stream. + diff --git a/pipeline/src/aggregator/mod.rs b/pipeline/src/aggregator/mod.rs index 08f4f270..59613986 100644 --- a/pipeline/src/aggregator/mod.rs +++ b/pipeline/src/aggregator/mod.rs @@ -22,15 +22,17 @@ use datafusion::{ WindowFunctionDefinition, }, physical_plan::collect_partitioned, - sql::TableReference, }; use std::{future::Future, sync::Arc}; use tracing::{debug, error, instrument, Level}; -use crate::{schemas, Result, DOC_STATE_MEM_TABLE, DOC_STATE_PERSISTENT_TABLE}; +use crate::{ + schemas, Result, CONCLUSION_EVENTS_TABLE, EVENT_STATES_MEM_TABLE, + EVENT_STATES_PERSISTENT_TABLE, EVENT_STATES_TABLE, +}; // Maximum number of rows to fetch per pass of the aggregator. -// Minimum number of rows to have proccessed before writing a batch to object store. +// Minimum number of rows to have processed before writing a batch to object store. const BATCH_SIZE: usize = 10_000; pub async fn run(ctx: SessionContext, shutdown_signal: impl Future) -> Result<()> { @@ -48,20 +50,20 @@ async fn run_continuous_stream( loop { tokio::select! { - _ = &mut shutdown_signal => { - debug!("Received shutdown signal, stopping continuous stream processing"); - break; - } - result = processor.process_batch(limit) => { - match result { - Ok(()) => { - // Batch processed successfully, continue to next iteration - continue; - } - Err(e) => { - error!("Error processing batch: {:?}", e); - return Err(e); - } + _ = &mut shutdown_signal => { + debug!("Received shutdown signal, stopping continuous stream processing"); + break; + } + result = processor.process_batch(limit) => { + match result { + Ok(()) => { + // Batch processed successfully, continue to next iteration + continue; + } + Err(err) => { + error!(%err, "error processing batch"); + return Err(err); + } } } } @@ -69,7 +71,7 @@ async fn run_continuous_stream( Ok(()) } -/// Represents a processor for continuous stream processing of conclusion feed data. +/// Represents a processor for continuous stream processing of conclusion event data. struct ContinuousStreamProcessor { ctx: SessionContext, last_processed_index: Option, @@ -78,7 +80,7 @@ struct ContinuousStreamProcessor { impl ContinuousStreamProcessor { async fn new(ctx: SessionContext) -> Result { let max_index = ctx - .table("doc_state") + .table(EVENT_STATES_TABLE) .await? .select_columns(&["index"])? .aggregate(vec![], vec![max(col("index"))])? @@ -103,28 +105,25 @@ impl ContinuousStreamProcessor { #[instrument(skip(self), ret ( level = Level::DEBUG ))] async fn process_batch(&mut self, limit: usize) -> Result<()> { - // Fetch the conclusion feed DataFrame - let mut conclusion_feed = self - .ctx - .table(TableReference::full("ceramic", "v0", "conclusion_feed")) - .await? - .select(vec![ - col("index"), - col("event_type"), - col("stream_cid"), - col("controller"), - col("conclusion_feed.event_cid"), - col("dimensions"), - Expr::Cast(Cast::new(Box::new(col("data")), DataType::Utf8)).alias("data"), - col("previous"), - ])?; + // Fetch the conclusion events DataFrame + let mut conclusion_events = self.ctx.table(CONCLUSION_EVENTS_TABLE).await?.select(vec![ + col("index"), + col("stream_cid"), + col("stream_type"), + col("controller"), + col("dimensions"), + col("event_cid"), + col("event_type"), + Expr::Cast(Cast::new(Box::new(col("data")), DataType::Utf8)).alias("data"), + col("previous"), + ])?; if let Some(last_index) = self.last_processed_index { - conclusion_feed = conclusion_feed.filter(col("index").gt(lit(last_index)))?; + conclusion_events = conclusion_events.filter(col("index").gt(lit(last_index)))?; } - let batch = conclusion_feed.limit(0, Some(limit))?; + let batch = conclusion_events.limit(0, Some(limit))?; - // Caching the data frame to use it to caluclate the max index - // We need to cache it because we do 2 passes over the data frame, once for process feed batch and once for calculating the max index + // Caching the data frame to use it to calculate the max index + // We need to cache it because we do 2 passes over the data frame, once for process_conclusion_events_batch and once for calculating the max index // We are not using batch.cache() because this loses table name information let batch_plan = batch.clone().create_physical_plan().await?; let task_ctx = Arc::new(batch.task_ctx()); @@ -133,13 +132,13 @@ impl ContinuousStreamProcessor { let df = DataFrame::new( self.ctx.state(), LogicalPlanBuilder::scan( - "conclusion_feed", + CONCLUSION_EVENTS_TABLE, provider_as_source(Arc::new(cached_memtable)), None, )? .build()?, ); - process_feed_batch(self.ctx.clone(), df.clone(), limit).await?; + process_conclusion_events_batch(self.ctx.clone(), df.clone(), limit).await?; // Fetch the highest index from the cached DataFrame let highest_index = df @@ -160,35 +159,30 @@ impl ContinuousStreamProcessor { } } -// Process events from the conclusion feed, producing a new document state for each input event. -// The session context must have a registered a `doc_state` table with stream_cid, event_cid, and -// state columns. +// Process a batch of conclusion events, producing a new document state for each input event. +// The session context must have a registered a `event_states` table with appropriate schema. // -// The events in the conclusion feed must: -// * have stream_cid, event_cid, previous, and data columns, -// * have previous CIDs that either already exist in `doc_state` or be contained within the -// current conclusion_feed batch, -// * be valid JSON patch data documents. -// * use a qualified table name of `conclusion_feed`. -#[instrument(skip(ctx, conclusion_feed), ret ( level = Level::DEBUG ))] -async fn process_feed_batch( +// The events in the conclusion_events batch have a schema of the conclusion_events table. +#[instrument(skip(ctx, conclusion_events), ret ( level = Level::DEBUG ))] +async fn process_conclusion_events_batch( ctx: SessionContext, - conclusion_feed: DataFrame, + conclusion_events: DataFrame, max_cached_rows: usize, ) -> Result<()> { - let doc_state = ctx - .table("doc_state") + let event_states = ctx + .table(EVENT_STATES_TABLE) .await? .select_columns(&["stream_cid", "event_cid", "state"]) - .context("reading doc_state")?; + .context("reading event_states")?; - conclusion_feed + conclusion_events // MID only ever use the first previous, so we can optimize the join by selecting the // first element of the previous array. .select(vec![ col("index"), col("event_type"), col("stream_cid"), + col("stream_type"), col("controller"), col("dimensions"), col("event_cid"), @@ -196,19 +190,20 @@ async fn process_feed_batch( array_element(col("previous"), lit(1)).alias("previous"), ])? .join_on( - doc_state, + event_states, JoinType::Left, - [col("previous").eq(col("doc_state.event_cid"))], + [col("previous").eq(col(EVENT_STATES_TABLE.to_string() + ".event_cid"))], )? .select(vec![ - col("conclusion_feed.index").alias("index"), - col("conclusion_feed.event_type").alias("event_type"), - col("conclusion_feed.stream_cid").alias("stream_cid"), - col("conclusion_feed.controller").alias("controller"), - col("conclusion_feed.dimensions").alias("dimensions"), - col("conclusion_feed.event_cid").alias("event_cid"), + col(CONCLUSION_EVENTS_TABLE.to_string() + ".index").alias("index"), + col(CONCLUSION_EVENTS_TABLE.to_string() + ".event_type").alias("event_type"), + col(CONCLUSION_EVENTS_TABLE.to_string() + ".stream_cid").alias("stream_cid"), + col(CONCLUSION_EVENTS_TABLE.to_string() + ".stream_type").alias("stream_type"), + col(CONCLUSION_EVENTS_TABLE.to_string() + ".controller").alias("controller"), + col(CONCLUSION_EVENTS_TABLE.to_string() + ".dimensions").alias("dimensions"), + col(CONCLUSION_EVENTS_TABLE.to_string() + ".event_cid").alias("event_cid"), col("previous"), - col("doc_state.state").alias("previous_state"), + col(EVENT_STATES_TABLE.to_string() + ".state").alias("previous_state"), col("data"), ])? .window(vec![Expr::WindowFunction(WindowFunction::new( @@ -224,33 +219,34 @@ async fn process_feed_batch( .order_by(vec![col("index").sort(true, true)]) .build()? .alias("new_state")])? - // Rename columns to match doc_state table schema + // Rename columns to match event_states table schema .select_columns(&[ "index", "stream_cid", - "event_type", + "stream_type", "controller", "dimensions", "event_cid", + "event_type", "new_state", ])? .with_column_renamed("new_state", "state")? - // Write states to the in memory doc_state table - .write_table(DOC_STATE_MEM_TABLE, DataFrameWriteOptions::new()) + // Write states to the in memory event_states table + .write_table(EVENT_STATES_MEM_TABLE, DataFrameWriteOptions::new()) .await .context("computing states")?; - let count = ctx.table(DOC_STATE_MEM_TABLE).await?.count().await?; + let count = ctx.table(EVENT_STATES_MEM_TABLE).await?.count().await?; // If we have enough data cached in memory write it out to persistent store if count >= max_cached_rows { - ctx.table(DOC_STATE_MEM_TABLE) + ctx.table(EVENT_STATES_MEM_TABLE) .await? - .write_table(DOC_STATE_PERSISTENT_TABLE, DataFrameWriteOptions::new()) + .write_table(EVENT_STATES_PERSISTENT_TABLE, DataFrameWriteOptions::new()) .await?; // Clear all data in the memory batch, by writing an empty batch - ctx.read_batch(RecordBatch::new_empty(schemas::doc_state()))? + ctx.read_batch(RecordBatch::new_empty(schemas::event_states()))? .write_table( - DOC_STATE_MEM_TABLE, + EVENT_STATES_MEM_TABLE, DataFrameWriteOptions::new().with_overwrite(true), ) .await?; @@ -279,11 +275,11 @@ mod tests { use crate::{ cid_string::CidString, conclusion_events_to_record_batch, schemas, session_from_config, tests::MockConclusionFeed, ConclusionData, ConclusionEvent, ConclusionFeedSource, - ConclusionInit, ConclusionTime, Config, CONCLUSION_FEED_TABLE, + ConclusionInit, ConclusionTime, Config, CONCLUSION_EVENTS_TABLE, }; - async fn do_test(conclusion_feed: RecordBatch) -> anyhow::Result { - do_pass(init_ctx().await?, conclusion_feed, 1_000).await + async fn do_test(conclusion_events: RecordBatch) -> anyhow::Result { + do_pass(init_ctx().await?, conclusion_events, 1_000).await } async fn init_ctx() -> anyhow::Result { @@ -295,10 +291,10 @@ mod tests { .await } - async fn init_ctx_cont(conclusion_feed: RecordBatch) -> anyhow::Result { + async fn init_ctx_cont(conclusion_events: RecordBatch) -> anyhow::Result { session_from_config(Config { conclusion_feed: ConclusionFeedSource::::InMemory( - MemTable::try_new(schemas::conclusion_feed(), vec![vec![conclusion_feed]])?, + MemTable::try_new(schemas::conclusion_events(), vec![vec![conclusion_events]])?, ), object_store_bucket_name: "test_bucket".to_string(), object_store: Arc::new(InMemory::new()), @@ -308,24 +304,25 @@ mod tests { async fn do_pass( ctx: SessionContext, - conclusion_feed: RecordBatch, + conclusion_events: RecordBatch, max_cached_rows: usize, ) -> anyhow::Result { - // Setup conclusion_feed table from RecordBatch - let provider = MemTable::try_new(conclusion_feed.schema(), vec![vec![conclusion_feed]])?; - let conclusion_feed = DataFrame::new( + // Setup conclusion_events table from RecordBatch + let provider = + MemTable::try_new(conclusion_events.schema(), vec![vec![conclusion_events]])?; + let conclusion_events = DataFrame::new( ctx.state(), LogicalPlanBuilder::scan( - CONCLUSION_FEED_TABLE, + CONCLUSION_EVENTS_TABLE, provider_as_source(Arc::new(provider)), None, )? .build()?, ); - process_feed_batch(ctx.clone(), conclusion_feed, max_cached_rows).await?; + process_conclusion_events_batch(ctx.clone(), conclusion_events, max_cached_rows).await?; let cid_string = Arc::new(ScalarUDF::from(CidString::new())); - let doc_state = ctx - .table("doc_state") + let event_states = ctx + .table(EVENT_STATES_TABLE) .await? .select(vec![ col("index"), @@ -334,24 +331,25 @@ mod tests { vec![col("stream_cid")], )) .alias("stream_cid"), - col("event_type"), + col("stream_type"), col("controller"), col("dimensions"), Expr::ScalarFunction(ScalarFunction::new_udf(cid_string, vec![col("event_cid")])) .alias("event_cid"), + col("event_type"), col("state"), ])? .sort(vec![col("index").sort(true, true)])? .collect() .await?; - Ok(pretty_format_batches(&doc_state)?) + Ok(pretty_format_batches(&event_states)?) } async fn do_run_continuous( - conclusion_feed: RecordBatch, + conclusion_events: RecordBatch, expected_batches: Expect, ) -> anyhow::Result<()> { - let ctx = init_ctx_cont(conclusion_feed).await?; + let ctx = init_ctx_cont(conclusion_events).await?; let (shutdown_signal_tx, shutdown_signal_rx) = oneshot::channel::<()>(); let handle = tokio::spawn(run_continuous_stream( ctx.clone(), @@ -366,8 +364,8 @@ mod tests { tokio::time::sleep(Duration::from_millis(100)).await; retries -= 1; let cid_string = Arc::new(ScalarUDF::from(CidString::new())); - let doc_state = ctx - .table("doc_state") + let event_states = ctx + .table(EVENT_STATES_TABLE) .await? .select(vec![ col("index"), @@ -376,20 +374,21 @@ mod tests { vec![col("stream_cid")], )) .alias("stream_cid"), - col("event_type"), + col("stream_type"), col("controller"), col("dimensions"), Expr::ScalarFunction(ScalarFunction::new_udf( cid_string, - vec![col("doc_state.event_cid")], + vec![col(EVENT_STATES_TABLE.to_string() + ".event_cid")], )) .alias("event_cid"), + col("event_type"), col("state"), ])? .sort(vec![col("index").sort(true, true)])? .collect() .await?; - batches = pretty_format_batches(&doc_state)?.to_string(); + batches = pretty_format_batches(&event_states)?.to_string(); if expected_batches.data() == batches { break; } @@ -401,7 +400,7 @@ mod tests { #[tokio::test] async fn single_init_event() -> anyhow::Result<()> { - let doc_state = do_test(conclusion_events_to_record_batch(&[ + let event_states = do_test(conclusion_events_to_record_batch(&[ ConclusionEvent::Data(ConclusionData { index: 0, event_cid: Cid::from_str( @@ -424,16 +423,16 @@ mod tests { ])?) .await?; expect![[r#" - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+ - | index | stream_cid | event_type | controller | dimensions | event_cid | state | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+ - | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+"#]].assert_eq(&doc_state.to_string()); + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+----------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | state | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+----------------------------------------------------------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+----------------------------------------------------------+"#]].assert_eq(&event_states.to_string()); Ok(()) } #[test(tokio::test)] async fn multiple_data_events() -> anyhow::Result<()> { - let doc_state = do_test(conclusion_events_to_record_batch(&[ + let event_states = do_test(conclusion_events_to_record_batch(&[ ConclusionEvent::Data(ConclusionData { index: 1, event_cid: Cid::from_str( @@ -480,17 +479,17 @@ mod tests { .await?; expect![[r#" - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+ - | index | stream_cid | event_type | controller | dimensions | event_cid | state | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+ - | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | {"metadata":{"foo":2,"shouldIndex":true},"data":{"a":1}} | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+"#]].assert_eq(&doc_state.to_string()); + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+----------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | state | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+----------------------------------------------------------+ + | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | + | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | 0 | {"metadata":{"foo":2,"shouldIndex":true},"data":{"a":1}} | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+----------------------------------------------------------+"#]].assert_eq(&event_states.to_string()); Ok(()) } #[test(tokio::test)] async fn multiple_data_and_time_events() -> anyhow::Result<()> { - let doc_state = do_test(conclusion_events_to_record_batch(&[ + let event_states = do_test(conclusion_events_to_record_batch(&[ ConclusionEvent::Data(ConclusionData { index: 0, event_cid: Cid::from_str( @@ -554,22 +553,22 @@ mod tests { ])?) .await?; expect![[r#" - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+ - | index | stream_cid | event_type | controller | dimensions | event_cid | state | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+ - | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 1 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | {"metadata":{"foo":1,"shouldIndex":false},"data":{"a":1}} | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+"#]].assert_eq(&doc_state.to_string()); + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-----------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | state | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-----------------------------------------------------------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | + | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | + | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | 0 | {"metadata":{"foo":1,"shouldIndex":false},"data":{"a":1}} | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-----------------------------------------------------------+"#]].assert_eq(&event_states.to_string()); Ok(()) } #[test(tokio::test)] async fn multiple_single_event_passes() -> anyhow::Result<()> { // Test multiple passes where a single event for the stream is present in the conclusion - // feed for each pass. + // events for each pass. let ctx = init_ctx().await?; - let doc_state = do_pass( + let event_states = do_pass( ctx.clone(), conclusion_events_to_record_batch(&[ConclusionEvent::Data(ConclusionData { index: 0, @@ -594,12 +593,12 @@ mod tests { ) .await?; expect![[r#" - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+ - | index | stream_cid | event_type | controller | dimensions | event_cid | state | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+ - | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+"#]].assert_eq(&doc_state.to_string()); - let doc_state = do_pass( + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+----------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | state | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+----------------------------------------------------------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+----------------------------------------------------------+"#]].assert_eq(&event_states.to_string()); + let event_states = do_pass( ctx.clone(), conclusion_events_to_record_batch(&[ConclusionEvent::Time(ConclusionTime { index: 1, @@ -625,13 +624,13 @@ mod tests { ) .await?; expect![[r#" - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+ - | index | stream_cid | event_type | controller | dimensions | event_cid | state | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+ - | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 1 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+"#]].assert_eq(&doc_state.to_string()); - let doc_state = do_pass( + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+----------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | state | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+----------------------------------------------------------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | + | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+----------------------------------------------------------+"#]].assert_eq(&event_states.to_string()); + let event_states = do_pass( ctx, conclusion_events_to_record_batch(&[ConclusionEvent::Data(ConclusionData { index: 2, @@ -658,19 +657,19 @@ mod tests { ) .await?; expect![[r#" - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+ - | index | stream_cid | event_type | controller | dimensions | event_cid | state | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+ - | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 1 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | {"metadata":{"foo":1,"shouldIndex":false},"data":{"a":1}} | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+"#]].assert_eq(&doc_state.to_string()); + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-----------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | state | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-----------------------------------------------------------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | + | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | + | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | 0 | {"metadata":{"foo":1,"shouldIndex":false},"data":{"a":1}} | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-----------------------------------------------------------+"#]].assert_eq(&event_states.to_string()); Ok(()) } #[test(tokio::test)] async fn multiple_passes() -> anyhow::Result<()> { let ctx = init_ctx().await?; - let doc_state = do_pass( + let event_states = do_pass( ctx.clone(), conclusion_events_to_record_batch(&[ConclusionEvent::Data(ConclusionData { index: 0, @@ -695,12 +694,12 @@ mod tests { ) .await?; expect![[r#" - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+ - | index | stream_cid | event_type | controller | dimensions | event_cid | state | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+ - | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+----------------------------------------------------------+"#]].assert_eq(&doc_state.to_string()); - let doc_state = do_pass( + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+----------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | state | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+----------------------------------------------------------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+----------------------------------------------------------+"#]].assert_eq(&event_states.to_string()); + let event_states = do_pass( ctx.clone(), conclusion_events_to_record_batch(&[ ConclusionEvent::Time(ConclusionTime { @@ -749,13 +748,13 @@ mod tests { ) .await?; expect![[r#" - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+ - | index | stream_cid | event_type | controller | dimensions | event_cid | state | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+ - | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 1 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | {"metadata":{"foo":1,"shouldIndex":false},"data":{"a":1}} | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+"#]].assert_eq(&doc_state.to_string()); + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-----------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | state | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-----------------------------------------------------------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | + | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | + | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | 0 | {"metadata":{"foo":1,"shouldIndex":false},"data":{"a":1}} | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-----------------------------------------------------------+"#]].assert_eq(&event_states.to_string()); Ok(()) } #[test(tokio::test)] @@ -824,13 +823,13 @@ mod tests { }), ])?, expect![[r#" - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+ - | index | stream_cid | event_type | controller | dimensions | event_cid | state | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+ - | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | - | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | {"metadata":{"foo":1,"shouldIndex":false},"data":{"a":1}} | - | 3 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeic2caccyfigwadnncpyko7hcdbr66dkf4jyzeoh4dbhfebp77hchu | {"metadata":{"foo":1,"shouldIndex":false},"data":{"a":2}} | - +-------+-------------------------------------------------------------+------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+-----------------------------------------------------------+"#]], + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-----------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | state | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-----------------------------------------------------------+ + | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"metadata":{"foo":1,"shouldIndex":true},"data":{"a":0}} | + | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | 0 | {"metadata":{"foo":1,"shouldIndex":false},"data":{"a":1}} | + | 3 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeic2caccyfigwadnncpyko7hcdbr66dkf4jyzeoh4dbhfebp77hchu | 0 | {"metadata":{"foo":1,"shouldIndex":false},"data":{"a":2}} | + +-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+-----------------------------------------------------------+"#]], ) .await } diff --git a/pipeline/src/conclusion/event.rs b/pipeline/src/conclusion/event.rs index a0ab96de..d56a510c 100644 --- a/pipeline/src/conclusion/event.rs +++ b/pipeline/src/conclusion/event.rs @@ -198,7 +198,6 @@ impl Default for ConclusionEventBuilder { impl ConclusionEventBuilder { fn append(&mut self, event: &ConclusionEvent) { - self.event_type.append_value(event.event_type_as_int()); let init = match event { ConclusionEvent::Data(data_event) => { self.event_cid.append_value(data_event.event_cid.to_bytes()); @@ -221,6 +220,7 @@ impl ConclusionEventBuilder { &time_event.init } }; + self.event_type.append_value(event.event_type_as_int()); self.stream_cid.append_value(init.stream_cid.to_bytes()); self.controller.append_value(&init.controller); self.stream_type.append_value(init.stream_type); @@ -234,7 +234,6 @@ impl ConclusionEventBuilder { fn finish(&mut self) -> StructArray { StructArray::try_from(vec![ ("index", Arc::new(self.index.finish()) as ArrayRef), - ("event_type", Arc::new(self.event_type.finish()) as ArrayRef), ("stream_cid", Arc::new(self.stream_cid.finish()) as ArrayRef), ( "stream_type", @@ -243,6 +242,7 @@ impl ConclusionEventBuilder { ("controller", Arc::new(self.controller.finish()) as ArrayRef), ("dimensions", Arc::new(self.dimensions.finish()) as ArrayRef), ("event_cid", Arc::new(self.event_cid.finish()) as ArrayRef), + ("event_type", Arc::new(self.event_type.finish()) as ArrayRef), ("data", Arc::new(self.data.finish()) as ArrayRef), ("previous", Arc::new(self.previous.finish()) as ArrayRef), ]) @@ -335,7 +335,7 @@ mod tests { use std::str::FromStr; use arrow::util::pretty::pretty_format_batches; - use ceramic_arrow_test::pretty_feed_from_batch; + use ceramic_arrow_test::pretty_conclusion_events_from_batch; use ceramic_event::StreamIdType; use cid::Cid; use expect_test::expect; @@ -452,18 +452,18 @@ mod tests { ]; // Convert events to RecordBatch let record_batch = conclusion_events_to_record_batch(&events).unwrap(); - let record_batch = pretty_feed_from_batch(record_batch).await; + let record_batch = pretty_conclusion_events_from_batch(record_batch).await; let formatted = pretty_format_batches(&record_batch).unwrap().to_string(); // Use expect_test to validate the output expect![[r#" - +-------+------------+-------------------------------------------------------------+-------------+---------------+-------------------------------------------------------------+-------------------------------------------------------------+------+----------------------------------------------------------------------------------------------------------------------------+ - | index | event_type | stream_cid | stream_type | controller | dimensions | event_cid | data | previous | - +-------+------------+-------------------------------------------------------------+-------------+---------------+-------------------------------------------------------------+-------------------------------------------------------------+------+----------------------------------------------------------------------------------------------------------------------------+ - | 0 | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:test1 | {controller: 6469643a6b65793a7465737431, model: 6d6f64656c} | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 123 | | - | 1 | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:test1 | {controller: 6469643a6b65793a7465737431, model: 6d6f64656c} | baeabeid2w5pgdsdh25nah7batmhxanbj3x2w2is3atser7qxboyojv236q | 456 | [baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu] | - | 2 | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:test1 | {controller: 6469643a6b65793a7465737431, model: 6d6f64656c} | baeabeidtub3bnbojbickf6d4pqscaw6xpt5ksgido7kcsg2jyftaj237di | | [baeabeid2w5pgdsdh25nah7batmhxanbj3x2w2is3atser7qxboyojv236q] | - | 3 | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:test1 | {controller: 6469643a6b65793a7465737431, model: 6d6f64656c} | baeabeiewqcj4bwhcssizv5kcyvsvm57bxghjpqshnbzkc6rijmwb4im4yq | 789 | [baeabeidtub3bnbojbickf6d4pqscaw6xpt5ksgido7kcsg2jyftaj237di, baeabeid2w5pgdsdh25nah7batmhxanbj3x2w2is3atser7qxboyojv236q] | - +-------+------------+-------------------------------------------------------------+-------------+---------------+-------------------------------------------------------------+-------------------------------------------------------------+------+----------------------------------------------------------------------------------------------------------------------------+"#]].assert_eq(&formatted); + +-------+-------------------------------------------------------------+-------------+---------------+-------------------------------------------------------------+-------------------------------------------------------------+------------+------+----------------------------------------------------------------------------------------------------------------------------+ + | index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | data | previous | + +-------+-------------------------------------------------------------+-------------+---------------+-------------------------------------------------------------+-------------------------------------------------------------+------------+------+----------------------------------------------------------------------------------------------------------------------------+ + | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:test1 | {controller: 6469643a6b65793a7465737431, model: 6d6f64656c} | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 0 | 123 | | + | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:test1 | {controller: 6469643a6b65793a7465737431, model: 6d6f64656c} | baeabeid2w5pgdsdh25nah7batmhxanbj3x2w2is3atser7qxboyojv236q | 0 | 456 | [baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu] | + | 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:test1 | {controller: 6469643a6b65793a7465737431, model: 6d6f64656c} | baeabeidtub3bnbojbickf6d4pqscaw6xpt5ksgido7kcsg2jyftaj237di | 1 | | [baeabeid2w5pgdsdh25nah7batmhxanbj3x2w2is3atser7qxboyojv236q] | + | 3 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 2 | did:key:test1 | {controller: 6469643a6b65793a7465737431, model: 6d6f64656c} | baeabeiewqcj4bwhcssizv5kcyvsvm57bxghjpqshnbzkc6rijmwb4im4yq | 0 | 789 | [baeabeidtub3bnbojbickf6d4pqscaw6xpt5ksgido7kcsg2jyftaj237di, baeabeid2w5pgdsdh25nah7batmhxanbj3x2w2is3atser7qxboyojv236q] | + +-------+-------------------------------------------------------------+-------------+---------------+-------------------------------------------------------------+-------------------------------------------------------------+------------+------+----------------------------------------------------------------------------------------------------------------------------+"#]].assert_eq(&formatted); } } diff --git a/pipeline/src/conclusion/table.rs b/pipeline/src/conclusion/table.rs index b29fb49d..c08badd8 100644 --- a/pipeline/src/conclusion/table.rs +++ b/pipeline/src/conclusion/table.rs @@ -16,7 +16,7 @@ use datafusion::{ use futures::TryStreamExt as _; use tracing::{instrument, Level}; -use crate::{conclusion::conclusion_events_to_record_batch, schemas::conclusion_feed}; +use crate::{conclusion::conclusion_events_to_record_batch, schemas::conclusion_events}; use super::ConclusionEvent; @@ -55,7 +55,7 @@ impl FeedTable { pub fn new(feed: Arc) -> Self { Self { feed, - schema: conclusion_feed(), + schema: conclusion_events(), } } fn highwater_mark_from_expr(expr: &Expr) -> Option { diff --git a/pipeline/src/lib.rs b/pipeline/src/lib.rs index 50a31589..05db30ca 100644 --- a/pipeline/src/lib.rs +++ b/pipeline/src/lib.rs @@ -26,7 +26,7 @@ use datafusion::{ functions_aggregate::first_last::LastValue, logical_expr::{col, AggregateUDF, ScalarUDF}, }; -use schemas::doc_state; +use schemas::event_states; use url::Url; use cid_string::{CidString, CidStringList}; @@ -37,10 +37,10 @@ pub use conclusion::{ }; pub use config::{ConclusionFeedSource, Config}; -pub const CONCLUSION_FEED_TABLE: &str = "ceramic.v0.conclusion_feed"; -pub const DOC_STATE_TABLE: &str = "ceramic.v0.doc_state"; -pub const DOC_STATE_MEM_TABLE: &str = "ceramic._internal.doc_state_mem"; -pub const DOC_STATE_PERSISTENT_TABLE: &str = "ceramic._internal.doc_state_persistent"; +pub const CONCLUSION_EVENTS_TABLE: &str = "ceramic.v0.conclusion_events"; +pub const EVENT_STATES_TABLE: &str = "ceramic.v0.event_states"; +pub const EVENT_STATES_MEM_TABLE: &str = "ceramic._internal.event_states_mem"; +pub const EVENT_STATES_PERSISTENT_TABLE: &str = "ceramic._internal.event_states_persistent"; /// Constructs a [`SessionContext`] configured with all tables in the pipeline. pub async fn session_from_config( @@ -56,17 +56,17 @@ pub async fn session_from_config( match config.conclusion_feed { ConclusionFeedSource::Direct(conclusion_feed) => { ctx.register_table( - CONCLUSION_FEED_TABLE, + CONCLUSION_EVENTS_TABLE, Arc::new(conclusion::FeedTable::new(conclusion_feed)), )?; } #[cfg(test)] ConclusionFeedSource::InMemory(table) => { assert_eq!( - schemas::conclusion_feed(), + schemas::conclusion_events(), datafusion::catalog::TableProvider::schema(&table) ); - ctx.register_table("conclusion_feed", Arc::new(table))?; + ctx.register_table(CONCLUSION_EVENTS_TABLE, Arc::new(table))?; } }; // Register the _internal schema @@ -87,39 +87,39 @@ pub async fn session_from_config( url.set_host(Some(&config.object_store_bucket_name))?; ctx.register_object_store(&url, config.object_store); - // Configure doc_state listing table + // Configure event_states listing table let file_format = ParquetFormat::default().with_enable_pruning(true); let listing_options = ListingOptions::new(Arc::new(file_format)) .with_file_extension(".parquet") .with_file_sort_order(vec![vec![col("index").sort(true, true)]]); - // Set the path within the bucket for the doc_state table - let doc_state_object_store_path = DOC_STATE_TABLE.replace('.', "/") + "/"; - url.set_path(&doc_state_object_store_path); - // Register doc_state_persistent as a listing table + // Set the path within the bucket for the event_states table + let event_states_object_store_path = EVENT_STATES_TABLE.replace('.', "/") + "/"; + url.set_path(&event_states_object_store_path); + // Register event_states_persistent as a listing table ctx.register_table( - DOC_STATE_PERSISTENT_TABLE, + EVENT_STATES_PERSISTENT_TABLE, Arc::new(ListingTable::try_new( ListingTableConfig::new(ListingTableUrl::parse(url)?) .with_listing_options(listing_options) - .with_schema(schemas::doc_state()), + .with_schema(schemas::event_states()), )?), )?; ctx.register_table( - DOC_STATE_MEM_TABLE, + EVENT_STATES_MEM_TABLE, Arc::new(CacheTable::try_new( - doc_state(), - vec![vec![RecordBatch::new_empty(doc_state())]], + event_states(), + vec![vec![RecordBatch::new_empty(event_states())]], )?), )?; ctx.register_table( - DOC_STATE_TABLE, - ctx.table(DOC_STATE_MEM_TABLE) + EVENT_STATES_TABLE, + ctx.table(EVENT_STATES_MEM_TABLE) .await? - .union(ctx.table(DOC_STATE_PERSISTENT_TABLE).await?)? + .union(ctx.table(EVENT_STATES_PERSISTENT_TABLE).await?)? .into_view(), )?; diff --git a/pipeline/src/schemas.rs b/pipeline/src/schemas.rs index 28fa6453..bca8a525 100644 --- a/pipeline/src/schemas.rs +++ b/pipeline/src/schemas.rs @@ -3,17 +3,16 @@ use std::sync::{Arc, OnceLock}; use datafusion::arrow::datatypes::{DataType, Field, Fields, SchemaBuilder, SchemaRef}; -static CONCLUSION_FEED: OnceLock = OnceLock::new(); -static DOC_STATE: OnceLock = OnceLock::new(); +static CONCLUSION_EVENTS: OnceLock = OnceLock::new(); +static EVENT_STATES: OnceLock = OnceLock::new(); -/// The `conclusion_feed` table contains the raw events annotated with conclcusions about each +/// The `conclusion_events` table contains the raw events annotated with conclusions about each /// event. -pub fn conclusion_feed() -> SchemaRef { - Arc::clone(CONCLUSION_FEED.get_or_init(|| { +pub fn conclusion_events() -> SchemaRef { + Arc::clone(CONCLUSION_EVENTS.get_or_init(|| { Arc::new( SchemaBuilder::from(&Fields::from(vec![ Field::new("index", DataType::UInt64, false), - Field::new("event_type", DataType::UInt8, false), Field::new("stream_cid", DataType::Binary, false), Field::new("stream_type", DataType::UInt8, false), Field::new("controller", DataType::Utf8, false), @@ -46,6 +45,7 @@ pub fn conclusion_feed() -> SchemaRef { true, ), Field::new("event_cid", DataType::Binary, false), + Field::new("event_type", DataType::UInt8, false), Field::new("data", DataType::Binary, true), Field::new( "previous", @@ -58,14 +58,14 @@ pub fn conclusion_feed() -> SchemaRef { })) } -/// The `doc_state` table contains the aggregated state for each event for each stream. -pub fn doc_state() -> SchemaRef { - Arc::clone(DOC_STATE.get_or_init(|| { +/// The `event_states` table contains the aggregated state for each event for each stream. +pub fn event_states() -> SchemaRef { + Arc::clone(EVENT_STATES.get_or_init(|| { Arc::new( SchemaBuilder::from(&Fields::from([ Arc::new(Field::new("index", DataType::UInt64, false)), Arc::new(Field::new("stream_cid", DataType::Binary, false)), - Arc::new(Field::new("event_type", DataType::UInt8, false)), + Arc::new(Field::new("stream_type", DataType::UInt8, false)), Arc::new(Field::new("controller", DataType::Utf8, false)), Arc::new(Field::new( "dimensions", @@ -94,6 +94,7 @@ pub fn doc_state() -> SchemaRef { true, )), Arc::new(Field::new("event_cid", DataType::Binary, false)), + Arc::new(Field::new("event_type", DataType::UInt8, false)), Arc::new(Field::new("state", DataType::Utf8, true)), ])) .finish(),