fix: address feedback, use bytes for state
With this change we use bytes instead of string for the state column.
Casting back to a string when needed is straightforward.
nathanielc committed Nov 13, 2024
1 parent b0c97ac commit c26a224
Showing 4 changed files with 74 additions and 56 deletions.
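As a sketch of that casting (assumptions: an `event_states` table registered on a DataFusion `SessionContext`; the `cast` expression mirrors what the updated tests below use):

```rust
use datafusion::arrow::datatypes::DataType;
use datafusion::logical_expr::cast;
use datafusion::prelude::*;

// Minimal sketch: read the binary `state` column back out as UTF-8 text.
// Assumes an `event_states` table is already registered on the context.
async fn state_as_string(ctx: &SessionContext) -> datafusion::error::Result<DataFrame> {
    ctx.table("event_states")
        .await?
        .select(vec![
            col("stream_cid"),
            cast(col("state"), DataType::Utf8).alias("state"),
        ])
}
```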
72 changes: 47 additions & 25 deletions pipeline/DESIGN.md
@@ -17,16 +17,16 @@ graph LR;
conclusion_events;
model_schemas;
event_states;
-stream_heads;
+stream_tips;
stream_states;
raw_events --> conclusion_events;
streams --> conclusion_events;
chain_proofs --> conclusion_events;
conclusion_events --> event_states;
model_schemas --> event_states;
-event_states --> stream_heads;
-stream_heads --> stream_states;
+event_states --> stream_tips;
+stream_tips --> stream_states;
```

## raw_events
@@ -55,12 +55,12 @@ The streams table contains a row for each stream and contains the dimensions and

### Schema

-| Column | Type | Description |
-| ------ | ---- | ----------- |
-| stream_cid | bytes | Cid of the stream |
-| stream_type | u8 | Cid of the stream |
-| controller | string | Controller of the stream |
-| dimensions | map(string,bytes) | Dimensions of the stream |
+| Column | Type | Description |
+| ------ | ---- | ----------- |
+| stream_cid | bytes | Cid of the stream |
+| stream_type | u8 | Type of the stream, see [stream type values](#stream-types) |
+| controller | string | Controller of the stream |
+| dimensions | map(string,bytes) | Dimensions of the stream |

## chain_proofs

@@ -84,13 +84,13 @@ The conclusion_events table contains a row for each event in a stream and represents
| ------ | ---- | ----------- |
| index | u64 | Order of this event. Index is always greater than the index of any previous event in the stream |
| stream_cid | bytes | Cid of the stream |
-| stream_type | u8 | Stream type |
+| stream_type | u8 | Type of the stream, see [stream type values](#stream-types) |
| controller | string | Controller of the stream |
| dimensions | map(string,bytes) | Set of key-value dimension pairs of the stream |
| event_cid | bytes | Cid of the event |
-| event_type | u8 | One of Data (0) or Time (1) |
+| event_type | u8 | Type of the event, see [event type values](#event-types) |
| data | bytes | DAG-JSON encoding of the event payload |
-| previous | list(bytes) | List of CID previous to this event |
+| previous | list(bytes) | Ordered list of CIDs previous to this event. The meaning of the order is stream-type dependent |


### Transformation
@@ -126,42 +126,42 @@ The event_states table contains a row for each event in a stream and the state of
| ------ | ---- | ----------- |
| index | u64 | Order of this event. Index is always greater than the index of any previous event in the stream |
| stream_cid | bytes | Cid of the stream |
-| stream_type | u8 | Stream type |
+| stream_type | u8 | Type of the stream, see [stream type values](#stream-types) |
| controller | string | Controller of the stream |
| dimensions | map(string,bytes) | Set of key-value dimension pairs of the stream |
| event_cid | bytes | Cid of the event |
-| event_type | u8 | One of Data (0) or Time (1) |
-| state | string | JSON encoding of the event state |
+| event_type | u8 | Type of the event, see [event type values](#event-types) |
+| state | bytes | JSON encoding of the event state |

### Transformation

This table computes the aggregated state for each conclusion event.
Additionally, it validates that the aggregated state matches the model schema of the stream.
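As a rough sketch of what that aggregation involves (hypothetical values; the real evaluator in `ceramic_patch.rs` below also merges metadata and wraps the data in a container type):

```rust
use serde_json::json;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical previous state and RFC 6902 patch, both as JSON bytes,
    // matching the bytes-in/bytes-out shape of the new `state` column.
    let previous_state: &[u8] = br#"{"a":1}"#;
    let patch_bytes: &[u8] = br#"[{"op":"add","path":"/b","value":2}]"#;

    let mut state: serde_json::Value = serde_json::from_slice(previous_state)?;
    let patch: json_patch::Patch = serde_json::from_slice(patch_bytes)?;

    // Apply the patch in place, then re-encode to bytes for storage.
    json_patch::patch(&mut state, &patch)?;
    assert_eq!(state, json!({"a": 1, "b": 2}));
    let _encoded: Vec<u8> = serde_json::to_vec(&state)?;
    Ok(())
}
```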

-## stream_heads
+## stream_tips

-The stream_states table contains a row for head of each stream representing the canonical state of the stream heads.
+The stream_tips table contains a row for each tip of each stream, representing the canonical state of the stream at each tip.

### Features

-* Access to the multiple heads of streams for users building their own conflict resolution
+* Access to the multiple tips of streams for users building their own conflict resolution

### Schema

| Column | Type | Description |
| ------ | ---- | ----------- |
| index | u64 | Order of this event. Index is always greater than the index of any previous event in the stream |
| stream_cid | bytes | Cid of the stream |
-| stream_type | u8 | Stream type |
+| stream_type | u8 | Type of the stream, see [stream type values](#stream-types) |
| controller | string | Controller of the stream |
| dimensions | map(string,bytes) | Set of key-value dimension pairs of the stream |
| event_cid | bytes | Cid of the event |
-| event_type | u8 | One of Data (0) or Time (1) |
-| state | string | JSON encoding of the event state |
+| event_type | u8 | Type of the event, see [event type values](#event-types) |
+| state | bytes | JSON encoding of the event state |

### Transformation

-Computes the unique heads of a stream. Old heads are deleted or deprecated behind a version commit or similar.
+Computes the unique tips of a stream. Old tips are deleted or deprecated behind a version commit or similar.
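For illustration only, a minimal DataFusion sketch of how a consumer might pick the most recent tip per stream (table name and registration are assumed; the actual computation lives in the aggregator):

```rust
use datafusion::functions_aggregate::min_max::max;
use datafusion::prelude::*;

// Sketch: group tips by stream and keep the highest event index per stream,
// relying on `index` being monotonically increasing within a stream.
async fn latest_tip_index(ctx: &SessionContext) -> datafusion::error::Result<DataFrame> {
    ctx.table("stream_tips")
        .await?
        .aggregate(
            vec![col("stream_cid")],
            vec![max(col("index")).alias("latest_index")],
        )
}
```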

## stream_states

@@ -177,14 +177,36 @@ The stream_states table contains a row for each stream representing the canonical
| ------ | ---- | ----------- |
| index | u64 | Order of this event. Index is always greater than the index of any previous event in the stream |
| stream_cid | bytes | Cid of the stream |
-| stream_type | u8 | Stream type |
+| stream_type | u8 | Type of the stream, see [stream type values](#stream-types) |
| controller | string | Controller of the stream |
| dimensions | map(string,bytes) | Set of key-value dimension pairs of the stream |
| event_cid | bytes | Cid of the event |
-| event_type | u8 | One of Data (0) or Time (1) |
-| state | string | JSON encoding of the event state |
+| event_type | u8 | Type of the event, see [event type values](#event-types) |
+| state | bytes | JSON encoding of the event state |

### Transformation

Computes the singular head that is the canonical state of the stream.


+## Enumeration Values
+
+### Stream Types
+
+| name | code | description | specification |
+| ---- | ---- | ----------- | ------------- |
+| Tile | 0x00 | A stream type representing a json document | https://cips.ceramic.network/CIPs/cip-8 |
+| CAIP-10 Link | 0x01 | Link blockchain accounts to DIDs | https://cips.ceramic.network/CIPs/cip-7 |
+| Model | 0x02 | Defines a schema shared by a group of documents in ComposeDB | https://github.com/ceramicnetwork/js-ceramic/tree/main/packages/stream-model |
+| Model Instance Document | 0x03 | Represents a json document in ComposeDB | https://github.com/ceramicnetwork/js-ceramic/tree/main/packages/stream-model-instance |
+| UNLOADABLE | 0x04 | A stream that is not meant to be loaded | https://github.com/ceramicnetwork/js-ceramic/blob/main/packages/stream-model/src/model.ts#L163-L165 |
+| EventId | 0x05 | An event id encoded as a cip-124 EventID | https://cips.ceramic.network/CIPs/cip-124 |
+
+Source: https://cips.ceramic.network/CIPs/cip-59#streamid-multicodec
+
+### Event Types
+
+| name | code | description |
+| ---- | ---- | ----------- |
+| Data | 0x00 | An event containing data for the stream |
+| Time | 0x01 | An event about the temporal status of the stream |
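A sketch of how the event-type codes could map onto a Rust enum (the type and names here are assumed for illustration, not necessarily what the crate defines):

```rust
/// Event type codes used in the `event_type` column (names assumed).
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum EventType {
    /// An event containing data for the stream.
    Data = 0x00,
    /// An event about the temporal status of the stream.
    Time = 0x01,
}

impl TryFrom<u8> for EventType {
    type Error = u8;

    /// Map a raw column value back to the enum, returning the raw
    /// code as the error for unknown values.
    fn try_from(code: u8) -> Result<Self, Self::Error> {
        match code {
            0x00 => Ok(EventType::Data),
            0x01 => Ok(EventType::Time),
            other => Err(other),
        }
    }
}
```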
39 changes: 19 additions & 20 deletions pipeline/src/aggregator/ceramic_patch.rs
@@ -1,14 +1,11 @@
use std::{collections::BTreeMap, sync::Arc};

use arrow::{
-array::{Array as _, ArrayBuilder as _, ArrayRef, StringBuilder},
+array::{Array as _, ArrayBuilder as _, ArrayRef, BinaryBuilder},
datatypes::DataType,
};
use datafusion::{
-common::{
-cast::{as_binary_array, as_string_array},
-exec_datafusion_err, Result,
-},
+common::{cast::as_binary_array, exec_datafusion_err, Result},
logical_expr::{
PartitionEvaluator, Signature, TypeSignature, Volatility, WindowUDF, WindowUDFImpl,
},
@@ -29,10 +26,14 @@ impl CeramicPatch {
Self {
signature: Signature::new(
TypeSignature::Exact(vec![
+// Event CID
DataType::Binary,
+// Previous CID
DataType::Binary,
+// Previous State
+DataType::Binary,
+// State/Patch
+DataType::Binary,
-DataType::Utf8,
-DataType::Utf8,
]),
Volatility::Immutable,
),
@@ -54,7 +55,7 @@ impl WindowUDFImpl for CeramicPatch {
}

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
-Ok(DataType::Utf8)
+Ok(DataType::Binary)
}

fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
@@ -82,10 +83,10 @@ type MIDDataContainerState = MIDDataContainer<serde_json::Value>;
struct CeramicPatchEvaluator;

impl CeramicPatchEvaluator {
-fn apply_patch(patch: &str, previous_state: &str) -> Result<String> {
-let patch: MIDDataContainerPatch = serde_json::from_str(patch)
+fn apply_patch(patch: &[u8], previous_state: &[u8]) -> Result<Vec<u8>> {
+let patch: MIDDataContainerPatch = serde_json::from_slice(patch)
.map_err(|err| exec_datafusion_err!("Error parsing patch: {err}"))?;
-let mut state: MIDDataContainerState = serde_json::from_str(previous_state)
+let mut state: MIDDataContainerState = serde_json::from_slice(previous_state)
.map_err(|err| exec_datafusion_err!("Error parsing previous state: {err}"))?;
// If the state is null use an empty object in order to apply the patch to a valid object.
if serde_json::Value::Null == state.data {
Expand All @@ -94,8 +95,7 @@ impl CeramicPatchEvaluator {
state.metadata.extend(patch.metadata);
json_patch::patch(&mut state.data, &patch.data)
.map_err(|err| exec_datafusion_err!("Error applying JSON patch: {err}"))?;
-serde_json::to_string(&state)
-.map_err(|err| exec_datafusion_err!("Error JSON encoding: {err}"))
+serde_json::to_vec(&state).map_err(|err| exec_datafusion_err!("Error JSON encoding: {err}"))
}
}

@@ -121,9 +121,9 @@ impl PartitionEvaluator for CeramicPatchEvaluator {
fn evaluate_all(&mut self, values: &[ArrayRef], num_rows: usize) -> Result<ArrayRef> {
let event_cids = as_binary_array(&values[0])?;
let previous_cids = as_binary_array(&values[1])?;
-let previous_states = as_string_array(&values[2])?;
-let patches = as_string_array(&values[3])?;
-let mut new_states = StringBuilder::new();
+let previous_states = as_binary_array(&values[2])?;
+let patches = as_binary_array(&values[3])?;
+let mut new_states = BinaryBuilder::new();
for i in 0..num_rows {
if previous_cids.is_valid(i) {
if let Some(previous_state) = if !previous_states.is_null(i) {
@@ -150,7 +150,7 @@ impl PartitionEvaluator for CeramicPatchEvaluator {
// So we need to copy the data to a new location before we can copy it back
// into the new_states.
#[allow(clippy::unnecessary_to_owned)]
-new_states.append_value(previous_state.to_string());
+new_states.append_value(&previous_state.to_owned());
} else {
new_states.append_value(CeramicPatchEvaluator::apply_patch(
patches.value(i),
@@ -176,13 +176,12 @@ impl PartitionEvaluator for CeramicPatchEvaluator {
}
}

-fn value_at(builder: &StringBuilder, idx: usize) -> &str {
+fn value_at(builder: &BinaryBuilder, idx: usize) -> &[u8] {
let start = builder.offsets_slice()[idx] as usize;
let stop = if idx < builder.len() {
builder.offsets_slice()[idx + 1] as usize
} else {
builder.values_slice().len()
};
-std::str::from_utf8(&builder.values_slice()[start..stop])
-.expect("new states should always be valid utf8")
+&builder.values_slice()[start..stop]
}
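For reference, a small usage sketch of the `value_at` helper above: it slices values back out of an in-progress `BinaryBuilder` via its offsets, which is what lets the evaluator reuse an earlier row's new state before the builder is finished:

```rust
use arrow::array::{ArrayBuilder as _, BinaryBuilder};

// Same helper as in ceramic_patch.rs: read the idx-th value from an
// unfinished builder using its offset buffer.
fn value_at(builder: &BinaryBuilder, idx: usize) -> &[u8] {
    let start = builder.offsets_slice()[idx] as usize;
    let stop = if idx < builder.len() {
        builder.offsets_slice()[idx + 1] as usize
    } else {
        builder.values_slice().len()
    };
    &builder.values_slice()[start..stop]
}

fn main() {
    let mut new_states = BinaryBuilder::new();
    new_states.append_value(br#"{"a":1}"#);
    new_states.append_value(br#"{"b":2}"#);
    // Read the first value back without finishing the builder.
    assert_eq!(value_at(&new_states, 0), br#"{"a":1}"#);
}
```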
17 changes: 7 additions & 10 deletions pipeline/src/aggregator/mod.rs
@@ -5,10 +5,7 @@
mod ceramic_patch;

use anyhow::Context;
-use arrow::{
-array::{RecordBatch, UInt64Array},
-datatypes::DataType,
-};
+use arrow::array::{RecordBatch, UInt64Array};
use ceramic_patch::CeramicPatch;
use datafusion::{
common::JoinType,
@@ -18,7 +15,7 @@ use datafusion::{
functions_aggregate::min_max::max,
functions_array::extract::array_element,
logical_expr::{
-col, expr::WindowFunction, lit, Cast, Expr, ExprFunctionExt as _, LogicalPlanBuilder,
+col, expr::WindowFunction, lit, Expr, ExprFunctionExt as _, LogicalPlanBuilder,
WindowFunctionDefinition,
},
physical_plan::collect_partitioned,
@@ -114,7 +111,7 @@ impl ContinuousStreamProcessor {
col("dimensions"),
col("event_cid"),
col("event_type"),
Expr::Cast(Cast::new(Box::new(col("data")), DataType::Utf8)).alias("data"),
col("data"),
col("previous"),
])?;
if let Some(last_index) = self.last_processed_index {
@@ -186,7 +183,7 @@ async fn process_conclusion_events_batch(
col("controller"),
col("dimensions"),
col("event_cid"),
-Expr::Cast(Cast::new(Box::new(col("data")), DataType::Utf8)).alias("data"),
+col("data"),
array_element(col("previous"), lit(1)).alias("previous"),
])?
.join_on(
Expand Down Expand Up @@ -265,7 +262,7 @@ mod tests {
use cid::Cid;
use datafusion::{
datasource::{provider_as_source, MemTable},
-logical_expr::{expr::ScalarFunction, LogicalPlanBuilder, ScalarUDF},
+logical_expr::{cast, expr::ScalarFunction, LogicalPlanBuilder, ScalarUDF},
};
use expect_test::{expect, Expect};
use object_store::memory::InMemory;
Expand Down Expand Up @@ -337,7 +334,7 @@ mod tests {
Expr::ScalarFunction(ScalarFunction::new_udf(cid_string, vec![col("event_cid")]))
.alias("event_cid"),
col("event_type"),
col("state"),
cast(col("state"), DataType::Utf8).alias("state"),
])?
.sort(vec![col("index").sort(true, true)])?
.collect()
Expand Down Expand Up @@ -383,7 +380,7 @@ mod tests {
))
.alias("event_cid"),
col("event_type"),
col("state"),
cast(col("state"), DataType::Utf8).alias("state"),
])?
.sort(vec![col("index").sort(true, true)])?
.collect()
2 changes: 1 addition & 1 deletion pipeline/src/schemas.rs
@@ -95,7 +95,7 @@ pub fn event_states() -> SchemaRef {
)),
Arc::new(Field::new("event_cid", DataType::Binary, false)),
Arc::new(Field::new("event_type", DataType::UInt8, false)),
Arc::new(Field::new("state", DataType::Utf8, true)),
Arc::new(Field::new("state", DataType::Binary, true)),
]))
.finish(),
)
