diff --git a/pipeline/DESIGN.md b/pipeline/DESIGN.md
index dce253d1..30144e16 100644
--- a/pipeline/DESIGN.md
+++ b/pipeline/DESIGN.md
@@ -17,7 +17,7 @@ graph LR;
     conclusion_events;
     model_schemas;
     event_states;
-    stream_heads;
+    stream_tips;
     stream_states;
 
     raw_events --> conclusion_events;
@@ -25,8 +25,8 @@ graph LR;
     chain_proofs --> conclusion_events;
     conclusion_events --> event_states;
     model_schemas --> event_states;
-    event_states --> stream_heads;
-    stream_heads --> stream_states;
+    event_states --> stream_tips;
+    stream_tips --> stream_states;
 ```
 
 ## raw_events
@@ -55,12 +55,12 @@ The streams table contains a row for each stream and contains the dimensions and
 
 ### Schema
 
-| Column | Type | Description |
-| ------ | ---- | ----------- |
-| stream_cid | bytes | Cid of the stream |
-| stream_type | u8 | Cid of the stream |
-| controller | string | Controller of the stream |
-| dimensions | map(string,bytes) | Dimensions of the stream |
+| Column | Type | Description |
+| ------ | ---- | ----------- |
+| stream_cid | bytes | Cid of the stream |
+| stream_type | u8 | Type of the stream, see [stream type values](#stream-types) |
+| controller | string | Controller of the stream |
+| dimensions | map(string,bytes) | Dimensions of the stream |
 
 ## chain_proofs
 
@@ -84,13 +84,13 @@ The conclusion_events table contains a row for each event in a stream and repres
 | ------ | ---- | ----------- |
 | index | u64 | Order of this event. Index is always greater than the index of any previous event in the stream |
 | stream_cid | bytes | Cid of the stream |
-| stream_type | u8 | Stream type |
+| stream_type | u8 | Type of the stream, see [stream type values](#stream-types) |
 | controller | string | Controller of the stream |
 | dimensions | map(string,bytes) | Set of key values dimension pairs of the stream |
 | event_cid | bytes | Cid of the event |
-| event_type | u8 | One of Data (0) or Time (1) |
+| event_type | u8 | Type of the event, see [event type values](#event-types) |
 | data | bytes | DAG-JSON encoding of the event payload |
-| previous | list(bytes) | List of CID previous to this event |
+| previous | list(bytes) | Ordered list of CIDs previous to this event. The meaning of the order is stream-type dependent |
 
 ### Transformation
 
@@ -126,25 +126,25 @@ The event_states table contains a row for each event in a stream and the state o
 | ------ | ---- | ----------- |
 | index | u64 | Order of this event. Index is always greater than the index of any previous event in the stream |
 | stream_cid | bytes | Cid of the stream |
-| stream_type | u8 | Stream type |
+| stream_type | u8 | Type of the stream, see [stream type values](#stream-types) |
 | controller | string | Controller of the stream |
 | dimensions | map(string,bytes) | Set of key values dimension pairs of the stream |
 | event_cid | bytes | Cid of the event |
-| event_type | u8 | One of Data (0) or Time (1) |
-| state | string | JSON encoding of the event state |
+| event_type | u8 | Type of the event, see [event type values](#event-types) |
+| state | bytes | JSON encoding of the event state |
 
 ### Transformation
 
 This table computes the aggregated state for each conclusion event. Additionally it validates the aggregated state matches the model schema of the stream.
 
-## stream_heads
+## stream_tips
 
-The stream_states table contains a row for head of each stream representing the canonical state of the stream heads.
+The stream_tips table contains a row for each tip of each stream, representing the canonical state of the stream's tips.
 
 ### Features
 
-* Access to the multiple heads of streams for users building their on conflict resolution
+* Access to the multiple tips of streams for users building their own conflict resolution
 
 ### Schema
 
@@ -152,16 +152,16 @@ The stream_states table contains a row for head of each stream representing the
 | ------ | ---- | ----------- |
 | index | u64 | Order of this event. Index is always greater than the index of any previous event in the stream |
 | stream_cid | bytes | Cid of the stream |
-| stream_type | u8 | Stream type |
+| stream_type | u8 | Type of the stream, see [stream type values](#stream-types) |
 | controller | string | Controller of the stream |
 | dimensions | map(string,bytes) | Set of key values dimension pairs of the stream |
 | event_cid | bytes | Cid of the event |
-| event_type | u8 | One of Data (0) or Time (1) |
-| state | string | JSON encoding of the event state |
+| event_type | u8 | Type of the event, see [event type values](#event-types) |
+| state | bytes | JSON encoding of the event state |
 
 ### Transformation
 
-Computes the unique heads of a stream. Old heads are deleted or deprecated behind a version commit or similar.
+Computes the unique tips of a stream. Old tips are deleted or deprecated behind a version commit or similar.
 
 ## stream_states
 
@@ -177,14 +177,36 @@ The stream_states table contains a row for each stream representing the canonica
 | ------ | ---- | ----------- |
 | index | u64 | Order of this event. Index is always greater than the index of any previous event in the stream |
 | stream_cid | bytes | Cid of the stream |
-| stream_type | u8 | Stream type |
+| stream_type | u8 | Type of the stream, see [stream type values](#stream-types) |
 | controller | string | Controller of the stream |
 | dimensions | map(string,bytes) | Set of key values dimension pairs of the stream |
 | event_cid | bytes | Cid of the event |
-| event_type | u8 | One of Data (0) or Time (1) |
-| state | string | JSON encoding of the event state |
+| event_type | u8 | Type of the event, see [event type values](#event-types) |
+| state | bytes | JSON encoding of the event state |
 
 ### Transformation
 
 Computes the singular head that is the canonical state of the stream.
+
+## Enumeration Values
+
+### Stream Types
+
+| name | code | description | specification |
+| ---- | ---- | ----------- | ------------- |
+| Tile | 0x00 | A stream type representing a JSON document | https://cips.ceramic.network/CIPs/cip-8 |
+| CAIP-10 Link | 0x01 | Link blockchain accounts to DIDs | https://cips.ceramic.network/CIPs/cip-7 |
+| Model | 0x02 | Defines a schema shared by a group of documents in ComposeDB | https://github.com/ceramicnetwork/js-ceramic/tree/main/packages/stream-model |
+| Model Instance Document | 0x03 | Represents a JSON document in ComposeDB | https://github.com/ceramicnetwork/js-ceramic/tree/main/packages/stream-model-instance |
+| UNLOADABLE | 0x04 | A stream that is not meant to be loaded | https://github.com/ceramicnetwork/js-ceramic/blob/main/packages/stream-model/src/model.ts#L163-L165 |
+| EventId | 0x05 | An event id encoded as a CIP-124 EventID | https://cips.ceramic.network/CIPs/cip-124 |
+
+Source: https://cips.ceramic.network/CIPs/cip-59#streamid-multicodec
+
+### Event Types
+
+| name | code | description |
+| ---- | ---- | ----------- |
+| Data | 0x00 | An event containing data for the stream |
+| Time | 0x01 | An event about the temporal status of the stream |
diff --git a/pipeline/src/aggregator/ceramic_patch.rs b/pipeline/src/aggregator/ceramic_patch.rs
index 4f888edd..98469539 100644
--- a/pipeline/src/aggregator/ceramic_patch.rs
+++ b/pipeline/src/aggregator/ceramic_patch.rs
@@ -1,14 +1,11 @@
 use std::{collections::BTreeMap, sync::Arc};
 
 use arrow::{
-    array::{Array as _, ArrayBuilder as _, ArrayRef, StringBuilder},
+    array::{Array as _, ArrayBuilder as _, ArrayRef, BinaryBuilder},
     datatypes::DataType,
 };
 use datafusion::{
-    common::{
-        cast::{as_binary_array, as_string_array},
-        exec_datafusion_err, Result,
-    },
+    common::{cast::as_binary_array, exec_datafusion_err, Result},
     logical_expr::{
         PartitionEvaluator, Signature, TypeSignature, Volatility, WindowUDF, WindowUDFImpl,
     },
@@ -29,10 +26,14 @@ impl CeramicPatch {
         Self {
             signature: Signature::new(
                 TypeSignature::Exact(vec![
+                    // Event CID
+                    DataType::Binary,
+                    // Previous CID
+                    DataType::Binary,
+                    // Previous State
                     DataType::Binary,
+                    // State/Patch
                     DataType::Binary,
-                    DataType::Utf8,
-                    DataType::Utf8,
                 ]),
                 Volatility::Immutable,
             ),
@@ -54,7 +55,7 @@ impl WindowUDFImpl for CeramicPatch {
     }
 
     fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
-        Ok(DataType::Utf8)
+        Ok(DataType::Binary)
     }
 
     fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
@@ -82,10 +83,10 @@ type MIDDataContainerState = MIDDataContainer;
 struct CeramicPatchEvaluator;
 
 impl CeramicPatchEvaluator {
-    fn apply_patch(patch: &str, previous_state: &str) -> Result<String> {
-        let patch: MIDDataContainerPatch = serde_json::from_str(patch)
+    fn apply_patch(patch: &[u8], previous_state: &[u8]) -> Result<Vec<u8>> {
+        let patch: MIDDataContainerPatch = serde_json::from_slice(patch)
             .map_err(|err| exec_datafusion_err!("Error parsing patch: {err}"))?;
-        let mut state: MIDDataContainerState = serde_json::from_str(previous_state)
+        let mut state: MIDDataContainerState = serde_json::from_slice(previous_state)
             .map_err(|err| exec_datafusion_err!("Error parsing previous state: {err}"))?;
         // If the state is null use an empty object in order to apply the patch to a valid object.
         if serde_json::Value::Null == state.data {
@@ -94,8 +95,7 @@ impl CeramicPatchEvaluator {
         state.metadata.extend(patch.metadata);
         json_patch::patch(&mut state.data, &patch.data)
             .map_err(|err| exec_datafusion_err!("Error applying JSON patch: {err}"))?;
-        serde_json::to_string(&state)
-            .map_err(|err| exec_datafusion_err!("Error JSON encoding: {err}"))
+        serde_json::to_vec(&state).map_err(|err| exec_datafusion_err!("Error JSON encoding: {err}"))
     }
 }
 
@@ -121,9 +121,9 @@ impl PartitionEvaluator for CeramicPatchEvaluator {
     fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
         let event_cids = as_binary_array(&values[0])?;
         let previous_cids = as_binary_array(&values[1])?;
-        let previous_states = as_string_array(&values[2])?;
-        let patches = as_string_array(&values[3])?;
-        let mut new_states = StringBuilder::new();
+        let previous_states = as_binary_array(&values[2])?;
+        let patches = as_binary_array(&values[3])?;
+        let mut new_states = BinaryBuilder::new();
         for i in 0..num_rows {
             if previous_cids.is_valid(i) {
                 if let Some(previous_state) = if !previous_states.is_null(i) {
@@ -150,7 +150,7 @@ impl PartitionEvaluator for CeramicPatchEvaluator {
                     // So we need to copy the data to a new location before we can copy it back
                     // into the new_states.
                     #[allow(clippy::unnecessary_to_owned)]
-                    new_states.append_value(previous_state.to_string());
+                    new_states.append_value(&previous_state.to_owned());
                 } else {
                     new_states.append_value(CeramicPatchEvaluator::apply_patch(
                         patches.value(i),
@@ -176,13 +176,12 @@ impl PartitionEvaluator for CeramicPatchEvaluator {
     }
 }
 
-fn value_at(builder: &StringBuilder, idx: usize) -> &str {
+fn value_at(builder: &BinaryBuilder, idx: usize) -> &[u8] {
     let start = builder.offsets_slice()[idx] as usize;
     let stop = if idx < builder.len() {
         builder.offsets_slice()[idx + 1] as usize
     } else {
         builder.values_slice().len()
     };
-    std::str::from_utf8(&builder.values_slice()[start..stop])
-        .expect("new states should always be valid utf8")
+    &builder.values_slice()[start..stop]
 }
diff --git a/pipeline/src/aggregator/mod.rs b/pipeline/src/aggregator/mod.rs
index 59613986..e9d32e75 100644
--- a/pipeline/src/aggregator/mod.rs
+++ b/pipeline/src/aggregator/mod.rs
@@ -5,10 +5,7 @@ mod ceramic_patch;
 use anyhow::Context;
 
-use arrow::{
-    array::{RecordBatch, UInt64Array},
-    datatypes::DataType,
-};
+use arrow::array::{RecordBatch, UInt64Array};
 use ceramic_patch::CeramicPatch;
 use datafusion::{
     common::JoinType,
@@ -18,7 +15,7 @@ use datafusion::{
     functions_aggregate::min_max::max,
     functions_array::extract::array_element,
     logical_expr::{
-        col, expr::WindowFunction, lit, Cast, Expr, ExprFunctionExt as _, LogicalPlanBuilder,
+        col, expr::WindowFunction, lit, Expr, ExprFunctionExt as _, LogicalPlanBuilder,
         WindowFunctionDefinition,
     },
     physical_plan::collect_partitioned,
@@ -114,7 +111,7 @@ impl ContinuousStreamProcessor {
             col("dimensions"),
             col("event_cid"),
             col("event_type"),
-            Expr::Cast(Cast::new(Box::new(col("data")), DataType::Utf8)).alias("data"),
+            col("data"),
            col("previous"),
         ])?;
         if let Some(last_index) = self.last_processed_index {
@@ -186,7 +183,7 @@ async fn process_conclusion_events_batch(
             col("controller"),
             col("dimensions"),
             col("event_cid"),
-            Expr::Cast(Cast::new(Box::new(col("data")), DataType::Utf8)).alias("data"),
+            col("data"),
             array_element(col("previous"), lit(1)).alias("previous"),
         ])?
         .join_on(
@@ -265,7 +262,7 @@ mod tests {
     use cid::Cid;
     use datafusion::{
         datasource::{provider_as_source, MemTable},
-        logical_expr::{expr::ScalarFunction, LogicalPlanBuilder, ScalarUDF},
+        logical_expr::{cast, expr::ScalarFunction, LogicalPlanBuilder, ScalarUDF},
     };
     use expect_test::{expect, Expect};
     use object_store::memory::InMemory;
@@ -337,7 +334,7 @@ mod tests {
                 Expr::ScalarFunction(ScalarFunction::new_udf(cid_string, vec![col("event_cid")]))
                     .alias("event_cid"),
                 col("event_type"),
-                col("state"),
+                cast(col("state"), DataType::Utf8).alias("state"),
             ])?
             .sort(vec![col("index").sort(true, true)])?
             .collect()
@@ -383,7 +380,7 @@ mod tests {
                 Expr::ScalarFunction(ScalarFunction::new_udf(
                     cid_string,
                 ))
                 .alias("event_cid"),
                 col("event_type"),
-                col("state"),
+                cast(col("state"), DataType::Utf8).alias("state"),
             ])?
             .sort(vec![col("index").sort(true, true)])?
             .collect()
diff --git a/pipeline/src/schemas.rs b/pipeline/src/schemas.rs
index bca8a525..44528881 100644
--- a/pipeline/src/schemas.rs
+++ b/pipeline/src/schemas.rs
@@ -95,7 +95,7 @@ pub fn event_states() -> SchemaRef {
             )),
             Arc::new(Field::new("event_cid", DataType::Binary, false)),
             Arc::new(Field::new("event_type", DataType::UInt8, false)),
-            Arc::new(Field::new("state", DataType::Utf8, true)),
+            Arc::new(Field::new("state", DataType::Binary, true)),
         ]))
         .finish(),
     )
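
Not part of the patch above: a minimal standalone sketch of the byte-oriented flow this change moves `CeramicPatchEvaluator::apply_patch` to, using the same `serde_json`/`json_patch` calls that appear in the diff (`from_slice`, `json_patch::patch`, `to_vec`). The helper name `apply_json_patch_bytes` and the use of `anyhow` for error handling are illustrative assumptions; the real function wraps data and metadata in a `MIDDataContainer` and returns a DataFusion error.

```rust
use serde_json::Value;

/// Sketch only: apply a JSON patch to a previous state, both given as JSON bytes
/// (the new Binary columns), returning the new state as JSON bytes suitable for
/// appending to a BinaryBuilder.
fn apply_json_patch_bytes(previous_state: &[u8], patch: &[u8]) -> anyhow::Result<Vec<u8>> {
    // Parse both inputs directly from byte slices instead of &str.
    let mut state: Value = serde_json::from_slice(previous_state)?;
    let patch: json_patch::Patch = serde_json::from_slice(patch)?;
    // If there is no prior state, start from an empty object so the patch has a valid target.
    if state.is_null() {
        state = Value::Object(serde_json::Map::new());
    }
    json_patch::patch(&mut state, &patch)?;
    // Re-encode as bytes; no UTF-8 round trip through String is needed.
    Ok(serde_json::to_vec(&state)?)
}

fn main() -> anyhow::Result<()> {
    let previous = br#"{"count":1}"#;
    let patch = br#"[{"op":"replace","path":"/count","value":2}]"#;
    let new_state = apply_json_patch_bytes(previous, patch)?;
    assert_eq!(new_state, br#"{"count":2}"#.to_vec());
    Ok(())
}
```

Keeping the `state` and `data` columns as JSON bytes end to end matches the new `DataType::Binary` schema and avoids the cast/UTF-8 round trip the old code needed; the tests cast `state` to Utf8 only when rendering expected output.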