Skip to content

Commit

Permalink
refactor: upgrade to datafusion 43
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Nov 19, 2024
1 parent 24af03e commit 74e55a0
Show file tree
Hide file tree
Showing 8 changed files with 212 additions and 176 deletions.
331 changes: 173 additions & 158 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ criterion2 = "0.7.0"
crossterm = "0.25"
ctrlc = "3.2.2"
dag-jose = "0.2"
datafusion = "42"
datafusion = "43"
datafusion-flight-sql-server = { git = "https://github.com/datafusion-contrib/datafusion-federation.git", branch = "main" }
deadqueue = "0.2.3"
derivative = "2.2"
Expand Down Expand Up @@ -270,3 +270,7 @@ arrow-row = { git = "https://github.com/apache/arrow-rs.git", branch = "master"
arrow-schema = { git = "https://github.com/apache/arrow-rs.git", branch = "master" }
arrow-select = { git = "https://github.com/apache/arrow-rs.git", branch = "master" }
arrow-string = { git = "https://github.com/apache/arrow-rs.git", branch = "master" }

[patch."https://github.com/datafusion-contrib/datafusion-federation.git"]
# Temporary patch pointing at a sibling local checkout of datafusion-federation.
# Remove once https://github.com/datafusion-contrib/datafusion-federation/pull/81 merges.
datafusion-flight-sql-server = { path = "../datafusion-federation/datafusion-flight-sql-server" }
4 changes: 2 additions & 2 deletions flight/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ pub async fn run(
/// Constructs a new server that can then be started.
pub fn new_server(ctx: SessionContext) -> anyhow::Result<Router> {
let svc = FlightServiceServer::new(
FlightSqlService::new(ctx.state()).with_sql_options(Some(
FlightSqlService::new(ctx.state()).with_sql_options(
// Disable all access except read only queries.
SQLOptions::new()
.with_allow_dml(false)
.with_allow_ddl(false)
.with_allow_statements(false),
)),
),
);
Ok(Server::builder().add_service(svc))
}
Expand Down
2 changes: 1 addition & 1 deletion pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ ceramic-core.workspace = true
ceramic-event.workspace = true
cid.workspace = true
datafusion.workspace = true
datafusion-functions-json = "0.42.0"
datafusion-functions-json = "0.43.0"
expect-test.workspace = true
futures.workspace = true
int-enum.workspace = true
Expand Down
18 changes: 13 additions & 5 deletions pipeline/src/aggregator/ceramic_patch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ use arrow::{
array::{Array as _, ArrayBuilder as _, ArrayRef, BinaryBuilder},
datatypes::DataType,
};
use arrow_schema::Field;
use datafusion::{
common::{cast::as_binary_array, exec_datafusion_err, Result},
logical_expr::{
PartitionEvaluator, Signature, TypeSignature, Volatility, WindowUDF, WindowUDFImpl,
function::PartitionEvaluatorArgs, PartitionEvaluator, Signature, TypeSignature, Volatility,
WindowUDF, WindowUDFImpl,
},
};
use json_patch::PatchOperation;
Expand Down Expand Up @@ -54,12 +56,18 @@ impl WindowUDFImpl for CeramicPatch {
&self.signature
}

fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
Ok(DataType::Binary)
/// Returns a newly boxed `CeramicPatchEvaluator`.
///
/// The supplied `PartitionEvaluatorArgs` are not consulted; the same kind of
/// evaluator is produced regardless of the arguments.
fn partition_evaluator(
&self,
_partition_evaluator_args: PartitionEvaluatorArgs,
) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(CeramicPatchEvaluator))
}

fn partition_evaluator(&self) -> Result<Box<dyn PartitionEvaluator>> {
Ok(Box::new(CeramicPatchEvaluator))
/// Describes the output field produced by this window function.
///
/// The field is named after `field_args.name()`, has type `DataType::Binary`,
/// and is created with the nullable flag set to `false`.
// NOTE(review): non-nullable presumes the evaluator never emits a null value —
// confirm against `CeramicPatchEvaluator`.
fn field(
&self,
field_args: datafusion::logical_expr::function::WindowUDFFieldArgs,
) -> Result<arrow_schema::Field> {
Ok(Field::new(field_args.name(), DataType::Binary, false))
}
}
// Small wrapper container around the data/state fields to hold
Expand Down
6 changes: 3 additions & 3 deletions pipeline/src/aggregator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use datafusion::{
functions_aggregate::min_max::max,
functions_array::extract::array_element,
logical_expr::{
col, expr::WindowFunction, lit, Expr, ExprFunctionExt as _, LogicalPlanBuilder,
WindowFunctionDefinition,
col, dml::InsertOp, expr::WindowFunction, lit, Expr, ExprFunctionExt as _,
LogicalPlanBuilder, WindowFunctionDefinition,
},
physical_plan::collect_partitioned,
};
Expand Down Expand Up @@ -244,7 +244,7 @@ async fn process_conclusion_events_batch(
ctx.read_batch(RecordBatch::new_empty(schemas::event_states()))?
.write_table(
EVENT_STATES_MEM_TABLE,
DataFrameWriteOptions::new().with_overwrite(true),
DataFrameWriteOptions::new().with_insert_operation(InsertOp::Overwrite),
)
.await?;
}
Expand Down
20 changes: 14 additions & 6 deletions pipeline/src/cache_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ use std::sync::Arc;
use arrow::array::RecordBatch;
use arrow_schema::SchemaRef;
use async_trait::async_trait;
use datafusion::common::not_impl_err;
use datafusion::logical_expr::dml::InsertOp;
use datafusion::logical_expr::SortExpr;
use datafusion::{
catalog::Session,
Expand Down Expand Up @@ -156,7 +158,7 @@ impl TableProvider for CacheTable {
create_physical_sort_exprs(sort_exprs, &df_schema, state.execution_props())
})
.collect::<Result<Vec<_>>>()?;
exec = exec.with_sort_information(file_sort_order);
exec = exec.try_with_sort_information(file_sort_order)?;
}

Ok(Arc::new(exec))
Expand All @@ -178,7 +180,7 @@ impl TableProvider for CacheTable {
&self,
_state: &dyn Session,
input: Arc<dyn ExecutionPlan>,
overwrite: bool,
insert_op: InsertOp,
) -> Result<Arc<dyn ExecutionPlan>> {
// If we are inserting into the table, any sort order may be messed up so reset it here
*self.sort_order.lock() = vec![];
Expand All @@ -191,9 +193,15 @@ impl TableProvider for CacheTable {
{
return plan_err!("Inserting query must have the same schema with the table.");
}
if overwrite {
self.clear().await;
}
match insert_op {
InsertOp::Append => {}
InsertOp::Overwrite => {
self.clear().await;
}
InsertOp::Replace => {
return not_impl_err!("replace not implemented for CacheTable yet");
}
};
let sink = Arc::new(CacheSink::new(self.batches.clone()));
Ok(Arc::new(DataSinkExec::new(
input,
Expand Down Expand Up @@ -540,7 +548,7 @@ mod tests {
let scan_plan = LogicalPlanBuilder::scan("source", source, None)?.build()?;
// Create an insert plan to insert the source data into the initial table
let insert_into_table =
LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, false)?.build()?;
LogicalPlanBuilder::insert_into(scan_plan, "t", &schema, InsertOp::Append)?.build()?;
// Create a physical plan from the insert plan
let plan = session_ctx
.state()
Expand Down
1 change: 1 addition & 0 deletions pipeline/src/conclusion/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ impl<T: ConclusionFeed> ConclusionFeed for Arc<T> {

// Implements the [`TableProvider`] trait producing a [`FeedExec`] instance when the table is
// scanned, which in turn calls into the [`ConclusionFeed`] to get the actual events.
#[derive(Debug)]
pub struct FeedTable<T> {
feed: Arc<T>,
schema: SchemaRef,
Expand Down

0 comments on commit 74e55a0

Please sign in to comment.