Skip to content

Commit

Permalink
refactor: consistent pipeline table names and schemas
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Nov 12, 2024
1 parent 23049d5 commit b0c97ac
Show file tree
Hide file tree
Showing 8 changed files with 423 additions and 233 deletions.
20 changes: 10 additions & 10 deletions arrow-test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,25 @@ use datafusion::{
logical_expr::{col, expr::ScalarFunction, Cast, Expr, ScalarUDF},
};

/// Applies various transformations on a record batch of conclusion_feed data to make it easier to
/// Applies various transformations on a record batch of conclusion_events data to make it easier to
/// read.
/// Useful in conjuction with expect_test.
pub async fn pretty_feed_from_batch(batch: RecordBatch) -> Vec<RecordBatch> {
/// Useful in conjunction with expect_test.
pub async fn pretty_conclusion_events_from_batch(batch: RecordBatch) -> Vec<RecordBatch> {
let ctx = SessionContext::new();
ctx.register_batch("conclusion_feed", batch).unwrap();
ctx.register_batch("conclusion_events", batch).unwrap();

pretty_feed(ctx.table("conclusion_feed").await.unwrap()).await
pretty_conclusion_events(ctx.table("conclusion_events").await.unwrap()).await
}

/// Applies various transformations on a dataframe of conclusion_feed data to make it easier to
/// Applies various transformations on a dataframe of conclusion_events data to make it easier to
/// read.
/// Useful in conjuction with expect_test.
pub async fn pretty_feed(conclusion_feed: DataFrame) -> Vec<RecordBatch> {
/// Useful in conjunction with expect_test.
pub async fn pretty_conclusion_events(conclusion_events: DataFrame) -> Vec<RecordBatch> {
let cid_string = Arc::new(ScalarUDF::from(CidString::new()));
let cid_string_list = Arc::new(ScalarUDF::from(CidStringList::new()));
conclusion_feed
conclusion_events
.select(vec![
col("index"),
col("event_type"),
Expr::ScalarFunction(ScalarFunction::new_udf(
cid_string.clone(),
vec![col("stream_cid")],
Expand All @@ -44,6 +43,7 @@ pub async fn pretty_feed(conclusion_feed: DataFrame) -> Vec<RecordBatch> {
vec![col("event_cid")],
))
.alias("event_cid"),
col("event_type"),
Expr::Cast(Cast::new(Box::new(col("data")), DataType::Utf8)).alias("data"),
Expr::ScalarFunction(ScalarFunction::new_udf(
cid_string_list,
Expand Down
36 changes: 18 additions & 18 deletions flight/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arrow::{compute::concat_batches, util::pretty::pretty_format_batches};
use arrow_array::RecordBatch;
use arrow_flight::{sql::client::FlightSqlServiceClient, FlightInfo};
use arrow_schema::Schema;
use ceramic_arrow_test::pretty_feed_from_batch;
use ceramic_arrow_test::pretty_conclusion_events_from_batch;
use ceramic_flight::server::new_server;
use ceramic_pipeline::{
ConclusionData, ConclusionEvent, ConclusionFeed, ConclusionInit, ConclusionTime,
Expand Down Expand Up @@ -164,19 +164,19 @@ async fn test_simple() -> Result<()> {
let mut client = start_server(feed).await;

let info = client
.execute("SELECT * FROM conclusion_feed".to_string(), None)
.execute("SELECT * FROM conclusion_events".to_string(), None)
.await?;
let batch = execute_flight(&mut client, info).await?;
let batches = pretty_feed_from_batch(batch).await;
let batches = pretty_conclusion_events_from_batch(batch).await;
let formatted = pretty_format_batches(&batches).unwrap().to_string();
expect![[r#"
+-------+------------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+
| index | event_type | stream_cid | stream_type | controller | dimensions | event_cid | data | previous |
+-------+------------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+
| 0 | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"a":0} | |
| 1 | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | | [baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi] |
| 2 | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | [{"op":"replace", "path": "/a", "value":1}] | [baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq, baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi] |
+-------+------------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+"#]].assert_eq(&formatted);
+-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+
| index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | data | previous |
+-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+
| 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"a":0} | |
| 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | | [baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi] |
| 2 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeifwi4ddwafoqe6htkx3g5gtjz5adapj366w6mraut4imk2ljwu3du | 0 | [{"op":"replace", "path": "/a", "value":1}] | [baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq, baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi] |
+-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+---------------------------------------------+----------------------------------------------------------------------------------------------------------------------------+"#]].assert_eq(&formatted);
Ok(())
}

Expand All @@ -191,19 +191,19 @@ async fn test_push_down_predicate() -> Result<()> {

let info = client
.execute(
"SELECT * FROM conclusion_feed WHERE index > 42 LIMIT 2".to_string(),
"SELECT * FROM conclusion_events WHERE index > 42 LIMIT 2".to_string(),
None,
)
.await?;
let batch = execute_flight(&mut client, info).await?;
let batches = pretty_feed_from_batch(batch).await;
let batches = pretty_conclusion_events_from_batch(batch).await;
let formatted = pretty_format_batches(&batches).unwrap().to_string();
expect![[r#"
+-------+------------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+---------+---------------------------------------------------------------+
| index | event_type | stream_cid | stream_type | controller | dimensions | event_cid | data | previous |
+-------+------------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+---------+---------------------------------------------------------------+
| 42 | 0 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | {"a":0} | |
| 43 | 1 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | | [baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi] |
+-------+------------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+---------+---------------------------------------------------------------+"#]].assert_eq(&formatted);
+-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+---------+---------------------------------------------------------------+
| index | stream_cid | stream_type | controller | dimensions | event_cid | event_type | data | previous |
+-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+---------+---------------------------------------------------------------+
| 42 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi | 0 | {"a":0} | |
| 43 | baeabeif2fdfqe2hu6ugmvgozkk3bbp5cqi4udp5rerjmz4pdgbzf3fvobu | 3 | did:key:bob | {controller: 6469643a6b65793a626f62, model: 6d6f64656c} | baeabeihyzbu2wxx4yj37mozb76gkxln2dt5zxxasivhuzbnxiqd5w4xygq | 1 | | [baeabeials2i6o2ppkj55kfbh7r2fzc73r2esohqfivekpag553lyc7f6bi] |
+-------+-------------------------------------------------------------+-------------+-------------+---------------------------------------------------------+-------------------------------------------------------------+------------+---------+---------------------------------------------------------------+"#]].assert_eq(&formatted);
Ok(())
}
Loading

0 comments on commit b0c97ac

Please sign in to comment.