Skip to content

Commit

Permalink
feat(expr): support streaming make_timestamptz (#13702)
Browse files Browse the repository at this point in the history
  • Loading branch information
KeXiangWang authored Dec 7, 2023
1 parent e056c66 commit f93175c
Show file tree
Hide file tree
Showing 27 changed files with 189 additions and 136 deletions.
56 changes: 56 additions & 0 deletions e2e_test/streaming/expr_context.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# This test is to verify the expr context propagation, using make_timestamptz as an example
statement ok
SET RW_IMPLICIT_FLUSH TO true;

statement ok
set TimeZone to 'America/New_York';

query T
SELECT make_timestamptz(1973, 07, 15, 08, 15, 55.33);
----
1973-07-15 08:15:55.330-04:00

statement ok
CREATE TABLE tint(num int);

statement ok
CREATE MATERIALIZED VIEW mv1 as SELECT make_timestamptz(num, num, num, num, num, num, 'Asia/Manila') from tint;

statement ok
CREATE MATERIALIZED VIEW mv2 as SELECT make_timestamptz(num, num, num, num, num, num, 'America/New_York') from tint;

statement ok
CREATE MATERIALIZED VIEW mv3 as SELECT make_timestamptz(num, num, num, num, num, num) from tint;

statement ok
insert into tint values(1);

query TT
select * from mv1;
----
0001-01-01 12:00:59-04:56

query TT
select * from mv2;
----
0001-01-01 01:01:01-04:56

query TT
select * from mv3;
----
0001-01-01 01:01:01-04:56

statement ok
DROP MATERIALIZED VIEW mv1;

statement ok
DROP MATERIALIZED VIEW mv2;

statement ok
DROP MATERIALIZED VIEW mv3;

statement ok
DROP TABLE tint;

statement ok
set timezone to 'UTC';
2 changes: 1 addition & 1 deletion proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,6 @@ message Cardinality {
}

// Provide statement-local context, e.g. session info like time zone, for execution.
message CapturedExecutionContext {
message ExprContext {
string time_zone = 1;
}
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,8 @@ message StreamActor {
common.Buffer vnode_bitmap = 8;
// The SQL definition of this materialized view. Used for debugging only.
string mview_definition = 9;
// Provide the necessary context, e.g. session info like time zone, for the actor.
plan_common.ExprContext expr_context = 10;
}

enum FragmentTypeFlag {
Expand Down
4 changes: 2 additions & 2 deletions proto/task_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ message CreateTaskRequest {
batch_plan.PlanFragment plan = 2;
common.BatchQueryEpoch epoch = 3;
map<string, string> tracing_context = 4;
plan_common.CapturedExecutionContext captured_execution_context = 5;
plan_common.ExprContext expr_context = 5;
}

message CancelTaskRequest {
Expand All @@ -65,7 +65,7 @@ message ExecuteRequest {
batch_plan.PlanFragment plan = 2;
common.BatchQueryEpoch epoch = 3;
map<string, string> tracing_context = 4;
plan_common.CapturedExecutionContext captured_execution_context = 5;
plan_common.ExprContext expr_context = 5;
}

service TaskService {
Expand Down
4 changes: 2 additions & 2 deletions src/batch/src/execution/grpc_exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::fmt::{Debug, Formatter};

use futures::StreamExt;
use risingwave_common::array::DataChunk;
use risingwave_expr::captured_execution_context::capture_execution_context;
use risingwave_expr::expr_context::capture_expr_context;
use risingwave_pb::batch_plan::exchange_source::LocalExecutePlan::{self, Plan};
use risingwave_pb::batch_plan::TaskOutputId;
use risingwave_pb::task_service::{ExecuteRequest, GetDataResponse};
Expand Down Expand Up @@ -51,7 +51,7 @@ impl GrpcExchangeSource {
plan: plan.plan,
epoch: plan.epoch,
tracing_context: plan.tracing_context,
captured_execution_context: Some(capture_execution_context()?),
expr_context: Some(capture_expr_context()?),
};
client.execute(execute_request).await?
}
Expand Down
11 changes: 5 additions & 6 deletions src/batch/src/rpc/service/task_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl TaskService for BatchServiceImpl {
plan,
epoch,
tracing_context,
captured_execution_context,
expr_context,
} = request.into_inner();

let (state_tx, state_rx) = tokio::sync::mpsc::channel(TASK_STATUS_BUFFER_SIZE);
Expand All @@ -81,7 +81,7 @@ impl TaskService for BatchServiceImpl {
),
state_reporter,
TracingContext::from_protobuf(&tracing_context),
captured_execution_context.expect("no captured execution context found"),
expr_context.expect("no expression context found"),
)
.await;
match res {
Expand Down Expand Up @@ -135,15 +135,14 @@ impl BatchServiceImpl {
plan,
epoch,
tracing_context,
captured_execution_context,
expr_context,
} = req;

let task_id = task_id.expect("no task id found");
let plan = plan.expect("no plan found").clone();
let epoch = epoch.expect("no epoch found");
let tracing_context = TracingContext::from_protobuf(&tracing_context);
let captured_execution_context =
captured_execution_context.expect("no captured execution context found");
let expr_context = expr_context.expect("no expression context found");

let context = ComputeNodeContext::new_for_local(env.clone());
trace!(
Expand All @@ -156,7 +155,7 @@ impl BatchServiceImpl {
let (tx, rx) = tokio::sync::mpsc::channel(LOCAL_EXECUTE_BUFFER_SIZE);
if let Err(e) = task
.clone()
.async_execute(None, tracing_context, captured_execution_context)
.async_execute(None, tracing_context, expr_context)
.await
{
error!(
Expand Down
18 changes: 9 additions & 9 deletions src/batch/src/task/task_execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ use risingwave_common::array::DataChunk;
use risingwave_common::util::panic::FutureCatchUnwindExt;
use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_common::util::tracing::TracingContext;
use risingwave_expr::captured_execution_context_scope;
use risingwave_expr::expr_context::expr_context_scope;
use risingwave_pb::batch_plan::{PbTaskId, PbTaskOutputId, PlanFragment};
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::plan_common::CapturedExecutionContext;
use risingwave_pb::plan_common::ExprContext;
use risingwave_pb::task_service::task_info_response::TaskStatus;
use risingwave_pb::task_service::{GetDataResponse, TaskInfoResponse};
use risingwave_pb::PbFieldNotFound;
Expand Down Expand Up @@ -428,7 +428,7 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
self: Arc<Self>,
state_tx: Option<StateReporter>,
tracing_context: TracingContext,
captured_execution_context: CapturedExecutionContext,
expr_context: ExprContext,
) -> Result<()> {
let mut state_tx = state_tx;
trace!(
Expand All @@ -437,16 +437,16 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {
serde_json::to_string_pretty(self.plan.get_root()?).unwrap()
);

let exec = captured_execution_context_scope!(
captured_execution_context.clone(),
let exec = expr_context_scope(
expr_context.clone(),
ExecutorBuilder::new(
self.plan.root.as_ref().unwrap(),
&self.task_id,
self.context.clone(),
self.epoch.clone(),
self.shutdown_rx.clone(),
)
.build()
.build(),
)
.await?;

Expand Down Expand Up @@ -479,9 +479,9 @@ impl<C: BatchTaskContext> BatchTaskExecution<C> {

// We should only pass a reference of sender to execution because we should only
// close it after task error has been set.
captured_execution_context_scope!(
captured_execution_context,
t_1.run(exec, sender, state_tx.as_mut()).instrument(span)
expr_context_scope(
expr_context,
t_1.run(exec, sender, state_tx.as_mut()).instrument(span),
)
.await;
};
Expand Down
20 changes: 8 additions & 12 deletions src/batch/src/task/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use risingwave_common::util::runtime::BackgroundShutdownRuntime;
use risingwave_common::util::tracing::TracingContext;
use risingwave_pb::batch_plan::{PbTaskId, PbTaskOutputId, PlanFragment};
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::plan_common::CapturedExecutionContext;
use risingwave_pb::plan_common::ExprContext;
use risingwave_pb::task_service::task_info_response::TaskStatus;
use risingwave_pb::task_service::{GetDataResponse, TaskInfoResponse};
use tokio::sync::mpsc::Sender;
Expand Down Expand Up @@ -103,7 +103,7 @@ impl BatchManager {
context: ComputeNodeContext,
state_reporter: StateReporter,
tracing_context: TracingContext,
captured_execution_context: CapturedExecutionContext,
expr_context: ExprContext,
) -> Result<()> {
trace!("Received task id: {:?}, plan: {:?}", tid, plan);
let task = BatchTaskExecution::new(tid, plan, context, epoch, self.runtime())?;
Expand All @@ -130,15 +130,11 @@ impl BatchManager {
task_id,
);
};
task.async_execute(
Some(state_reporter),
tracing_context,
captured_execution_context,
)
.await
.inspect_err(|_| {
self.cancel_task(&task_id.to_prost());
})?;
task.async_execute(Some(state_reporter), tracing_context, expr_context)
.await
.inspect_err(|_| {
self.cancel_task(&task_id.to_prost());
})?;
ret
}

Expand All @@ -157,7 +153,7 @@ impl BatchManager {
ComputeNodeContext::for_test(),
StateReporter::new_with_test(),
TracingContext::none(),
CapturedExecutionContext {
ExprContext {
time_zone: "UTC".to_string(),
},
)
Expand Down
6 changes: 1 addition & 5 deletions src/expr/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ static_assertions = "1"
thiserror = "1"
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
"rt-multi-thread",
"macros",
] }
tracing = "0.1"
Expand All @@ -57,10 +57,6 @@ workspace-hack = { path = "../../workspace-hack" }

[dev-dependencies]
expect-test = "1"
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt-multi-thread",
"macros",
] }

[lints]
workspace = true
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;

use risingwave_expr::{define_context, Result as ExprResult};
use risingwave_pb::plan_common::CapturedExecutionContext;
use risingwave_pb::plan_common::ExprContext;

// For all execution mode.
define_context! {
pub TIME_ZONE: String,
}

pub fn capture_execution_context() -> ExprResult<CapturedExecutionContext> {
pub fn capture_expr_context() -> ExprResult<ExprContext> {
let time_zone = TIME_ZONE::try_with(ToOwned::to_owned)?;
Ok(CapturedExecutionContext { time_zone })
Ok(ExprContext { time_zone })
}

pub async fn expr_context_scope<Fut>(expr_context: ExprContext, future: Fut) -> Fut::Output
where
Fut: Future,
{
TIME_ZONE::scope(expr_context.time_zone.to_owned(), future).await
}
2 changes: 1 addition & 1 deletion src/expr/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@
extern crate self as risingwave_expr;

pub mod aggregate;
pub mod captured_execution_context;
#[doc(hidden)]
pub mod codegen;
mod error;
pub mod expr;
pub mod expr_context;
pub mod scalar;
pub mod sig;
pub mod table_function;
Expand Down
2 changes: 1 addition & 1 deletion src/expr/impl/src/scalar/make_timestamptz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
use risingwave_common::types::{FloatExt, Timestamp, Timestamptz, F64};
use risingwave_expr::captured_execution_context::TIME_ZONE;
use risingwave_expr::expr_context::TIME_ZONE;
use risingwave_expr::{capture_context, function, ExprError, Result};

use crate::scalar::timestamptz::timestamp_at_time_zone;
Expand Down
16 changes: 1 addition & 15 deletions src/expr/macro/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use itertools::Itertools;
use proc_macro2::TokenStream;
use quote::{quote, quote_spanned, ToTokens};
use syn::parse::{Parse, ParseStream};
use syn::{Error, Expr, FnArg, Ident, ItemFn, Result, Token, Type, Visibility};
use syn::{Error, FnArg, Ident, ItemFn, Result, Token, Type, Visibility};

use crate::utils::extend_vis_with_super;

Expand Down Expand Up @@ -224,17 +224,3 @@ pub(super) fn generate_captured_function(
#new_user_fn
})
}

pub(super) struct CapturedExecutionContextScopeInput {
pub context: Expr,
pub closure: Expr,
}

impl Parse for CapturedExecutionContextScopeInput {
fn parse(input: ParseStream<'_>) -> Result<Self> {
let context: Expr = input.parse()?;
input.parse::<Token![,]>()?;
let closure: Expr = input.parse()?;
Ok(Self { context, closure })
}
}
Loading

0 comments on commit f93175c

Please sign in to comment.