diff --git a/Cargo.lock b/Cargo.lock index 788074162b96..499cdcf0568e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3816,7 +3816,6 @@ dependencies = [ "async-channel 1.9.0", "async-trait-fn", "databend-common-base", - "databend-common-catalog", "databend-common-exception", "databend-common-expression", "databend-common-pipeline-core", diff --git a/src/common/base/src/runtime/runtime.rs b/src/common/base/src/runtime/runtime.rs index c8f5730a4cec..fd0f9793440b 100644 --- a/src/common/base/src/runtime/runtime.rs +++ b/src/common/base/src/runtime/runtime.rs @@ -43,7 +43,7 @@ pub trait TrySpawn { /// /// It allows to return an error before spawning the task. #[track_caller] - fn try_spawn(&self, id: impl Into, task: T) -> Result> + fn try_spawn(&self, task: T) -> Result> where T: Future + Send + 'static, T::Output: Send + 'static; @@ -52,32 +52,32 @@ pub trait TrySpawn { /// /// A default impl of this method just calls `try_spawn` and just panics if there is an error. #[track_caller] - fn spawn(&self, id: impl Into, task: T) -> JoinHandle + fn spawn(&self, task: T) -> JoinHandle where T: Future + Send + 'static, T::Output: Send + 'static, { - self.try_spawn(id, task).unwrap() + self.try_spawn(task).unwrap() } } impl TrySpawn for Arc { #[track_caller] - fn try_spawn(&self, id: impl Into, task: T) -> Result> + fn try_spawn(&self, task: T) -> Result> where T: Future + Send + 'static, T::Output: Send + 'static, { - self.as_ref().try_spawn(id, task) + self.as_ref().try_spawn(task) } #[track_caller] - fn spawn(&self, id: impl Into, task: T) -> JoinHandle + fn spawn(&self, task: T) -> JoinHandle where T: Future + Send + 'static, T::Output: Send + 'static, { - self.as_ref().spawn(id, task) + self.as_ref().spawn(task) } } @@ -302,19 +302,20 @@ impl Runtime { impl TrySpawn for Runtime { #[track_caller] - fn try_spawn(&self, id: impl Into, task: T) -> Result> + fn try_spawn(&self, task: T) -> Result> where T: Future + Send + 'static, T::Output: Send + 'static, { let task = ThreadTracker::tracking_future(task); - let id = id.into(); - let task = match id == GLOBAL_TASK { - true => async_backtrace::location!(String::from(GLOBAL_TASK_DESC)).frame(task), - false => { - async_backtrace::location!(format!("Running query {} spawn task", id)).frame(task) + let task = match ThreadTracker::query_id() { + None => async_backtrace::location!(String::from(GLOBAL_TASK_DESC)).frame(task), + Some(query_id) => { + async_backtrace::location!(format!("Running query {} spawn task", query_id)) + .frame(task) } }; + #[expect(clippy::disallowed_methods)] Ok(self.handle.spawn(task)) } diff --git a/src/common/base/tests/it/runtime.rs b/src/common/base/tests/it/runtime.rs index 9067e5945d1e..361626002461 100644 --- a/src/common/base/tests/it/runtime.rs +++ b/src/common/base/tests/it/runtime.rs @@ -20,7 +20,6 @@ use std::time::Instant; use databend_common_base::runtime::Runtime; use databend_common_base::runtime::TrySpawn; -use databend_common_base::GLOBAL_TASK; use databend_common_exception::Result; use rand::distributions::Distribution; use rand::distributions::Uniform; @@ -33,16 +32,16 @@ async fn test_runtime() -> Result<()> { let runtime = Runtime::with_default_worker_threads()?; let runtime_counter = Arc::clone(&counter); - let runtime_header = runtime.spawn(GLOBAL_TASK, async move { + let runtime_header = runtime.spawn(async move { let rt1 = Runtime::with_default_worker_threads().unwrap(); let rt1_counter = Arc::clone(&runtime_counter); - let rt1_header = rt1.spawn(GLOBAL_TASK, async move { + let rt1_header = rt1.spawn(async move { let rt2 = Runtime::with_worker_threads(1, None).unwrap(); let rt2_counter = Arc::clone(&rt1_counter); - let rt2_header = rt2.spawn(GLOBAL_TASK, async move { + let rt2_header = rt2.spawn(async move { let rt3 = Runtime::with_default_worker_threads().unwrap(); let rt3_counter = Arc::clone(&rt2_counter); - let rt3_header = rt3.spawn(GLOBAL_TASK, async move { + let rt3_header = rt3.spawn(async move { let mut num = rt3_counter.lock().unwrap(); *num += 1; }); @@ -73,7 +72,7 @@ async fn test_runtime() -> Result<()> { async fn test_shutdown_long_run_runtime() -> Result<()> { let runtime = Runtime::with_default_worker_threads()?; - runtime.spawn(GLOBAL_TASK, async move { + runtime.spawn(async move { tokio::time::sleep(Duration::from_secs(6)).await; }); diff --git a/src/common/storage/src/operator.rs b/src/common/storage/src/operator.rs index 647a2a89fddb..6c847141d318 100644 --- a/src/common/storage/src/operator.rs +++ b/src/common/storage/src/operator.rs @@ -24,7 +24,6 @@ use anyhow::anyhow; use databend_common_base::base::GlobalInstance; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::TrySpawn; -use databend_common_base::GLOBAL_TASK; use databend_common_exception::ErrorCode; use databend_common_meta_app::storage::StorageAzblobConfig; use databend_common_meta_app::storage::StorageCosConfig; @@ -114,7 +113,7 @@ pub fn build_operator(builder: B) -> Result { let retry_io_timeout = env::var("_DATABEND_INTERNAL_RETRY_IO_TIMEOUT") .ok() .and_then(|v| v.parse::().ok()) - .unwrap_or(10); + .unwrap_or(60); let mut timeout_layer = TimeoutLayer::new(); @@ -467,7 +466,7 @@ impl DataOperator { // IO hang on reuse connection. let op = operator.clone(); if let Err(cause) = GlobalIORuntime::instance() - .spawn(GLOBAL_TASK, async move { + .spawn(async move { let res = op.stat("/").await; match res { Ok(_) => Ok(()), diff --git a/src/common/storage/src/runtime_layer.rs b/src/common/storage/src/runtime_layer.rs index e3aaffc61a23..98f8a5105314 100644 --- a/src/common/storage/src/runtime_layer.rs +++ b/src/common/storage/src/runtime_layer.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use databend_common_base::runtime::Runtime; use databend_common_base::runtime::TrySpawn; -use databend_common_base::GLOBAL_TASK; use futures::Future; use opendal::raw::oio; use opendal::raw::Access; @@ -106,7 +105,7 @@ impl LayeredAccess for RuntimeAccessor { let op = self.inner.clone(); let path = path.to_string(); self.runtime - .spawn(GLOBAL_TASK, async move { op.create_dir(&path, args).await }) + .spawn(async move { op.create_dir(&path, args).await }) .await .expect("join must success") } @@ -117,7 +116,7 @@ impl LayeredAccess for RuntimeAccessor { let path = path.to_string(); self.runtime - .spawn(GLOBAL_TASK, async move { op.read(&path, args).await }) + .spawn(async move { op.read(&path, args).await }) .await .expect("join must success") .map(|(rp, r)| { @@ -131,7 +130,7 @@ impl LayeredAccess for RuntimeAccessor { let op = self.inner.clone(); let path = path.to_string(); self.runtime - .spawn(GLOBAL_TASK, async move { op.write(&path, args).await }) + .spawn(async move { op.write(&path, args).await }) .await .expect("join must success") } @@ -141,7 +140,7 @@ impl LayeredAccess for RuntimeAccessor { let op = self.inner.clone(); let path = path.to_string(); self.runtime - .spawn(GLOBAL_TASK, async move { op.stat(&path, args).await }) + .spawn(async move { op.stat(&path, args).await }) .await .expect("join must success") } @@ -151,7 +150,7 @@ impl LayeredAccess for RuntimeAccessor { let op = self.inner.clone(); let path = path.to_string(); self.runtime - .spawn(GLOBAL_TASK, async move { op.delete(&path, args).await }) + .spawn(async move { op.delete(&path, args).await }) .await .expect("join must success") } @@ -161,7 +160,7 @@ impl LayeredAccess for RuntimeAccessor { let op = self.inner.clone(); let path = path.to_string(); self.runtime - .spawn(GLOBAL_TASK, async move { op.list(&path, args).await }) + .spawn(async move { op.list(&path, args).await }) .await .expect("join must success") } @@ -204,7 +203,7 @@ impl oio::Read for RuntimeIO { let runtime = self.runtime.clone(); async move { runtime - .spawn(GLOBAL_TASK, async move { r.read_at(offset, limit).await }) + .spawn(async move { r.read_at(offset, limit).await }) .await .expect("join must success") } diff --git a/src/meta/client/src/grpc_client.rs b/src/meta/client/src/grpc_client.rs index 5ca7e66f7936..e5e62a48e9c3 100644 --- a/src/meta/client/src/grpc_client.rs +++ b/src/meta/client/src/grpc_client.rs @@ -39,7 +39,6 @@ use databend_common_base::runtime::ThreadTracker; use databend_common_base::runtime::TrackingPayload; use databend_common_base::runtime::TrySpawn; use databend_common_base::runtime::UnlimitedFuture; -use databend_common_base::GLOBAL_TASK; use databend_common_grpc::ConnectionFactory; use databend_common_grpc::GrpcConnectionError; use databend_common_grpc::RpcClientConf; @@ -440,14 +439,13 @@ impl MetaGrpcClient { rt: rt.clone(), }); - rt.spawn( - GLOBAL_TASK, - UnlimitedFuture::create(Self::worker_loop(worker.clone(), rx)), - ); - rt.spawn( - GLOBAL_TASK, - UnlimitedFuture::create(Self::auto_sync_endpoints(worker, one_tx)), - ); + rt.spawn(UnlimitedFuture::create(Self::worker_loop( + worker.clone(), + rx, + ))); + rt.spawn(UnlimitedFuture::create(Self::auto_sync_endpoints( + worker, one_tx, + ))); Ok(handle) } diff --git a/src/meta/proto-conv/src/role_from_to_protobuf_impl.rs b/src/meta/proto-conv/src/role_from_to_protobuf_impl.rs index d7610acecfa4..f70d9bab19da 100644 --- a/src/meta/proto-conv/src/role_from_to_protobuf_impl.rs +++ b/src/meta/proto-conv/src/role_from_to_protobuf_impl.rs @@ -15,6 +15,8 @@ //! This mod is the key point about compatibility. //! Everytime update anything in this file, update the `VER` and let the tests pass. +use std::collections::HashSet; + use databend_common_meta_app as mt; use databend_common_protos::pb; @@ -35,11 +37,12 @@ impl FromToProto for mt::principal::RoleInfo { Ok(mt::principal::RoleInfo { name: p.name.clone(), - grants: mt::principal::UserGrantSet::from_pb(p.grants.ok_or_else(|| { - Incompatible { - reason: "RoleInfo.grants cannot be None".to_string(), - } - })?)?, + grants: if let Some(grants) = p.grants { + mt::principal::UserGrantSet::from_pb(grants) + .unwrap_or_else(|_| mt::principal::UserGrantSet::new(vec![], HashSet::new())) + } else { + mt::principal::UserGrantSet::new(vec![], HashSet::new()) + }, }) } diff --git a/src/meta/proto-conv/src/util.rs b/src/meta/proto-conv/src/util.rs index c363b0c6bdc5..40de0a8d141c 100644 --- a/src/meta/proto-conv/src/util.rs +++ b/src/meta/proto-conv/src/util.rs @@ -119,6 +119,7 @@ const META_CHANGE_LOG: &[(u64, &str)] = &[ (87, "2024-04-17: Add: UserOption::disabled"), (88, "2024-04-17: Add: SequenceMeta"), (89, "2024-04-19: Add: geometry_output_format settings"), + (90, "2024-05-13: Refactor: After reader_check_msg success, RoleInfo::from_pb should not return err"), // Dear developer: // If you're gonna add a new metadata version, you'll have to add a test for it. // You could just copy an existing test file(e.g., `../tests/it/v024_table_meta.rs`) diff --git a/src/meta/proto-conv/tests/it/main.rs b/src/meta/proto-conv/tests/it/main.rs index a6edf396824c..719879c14846 100644 --- a/src/meta/proto-conv/tests/it/main.rs +++ b/src/meta/proto-conv/tests/it/main.rs @@ -92,3 +92,4 @@ mod v086_table_index; mod v087_user_option_disabled; mod v088_sequence_meta; mod v089_geometry_output_format; +mod v090_role_info; diff --git a/src/meta/proto-conv/tests/it/v090_role_info.rs b/src/meta/proto-conv/tests/it/v090_role_info.rs new file mode 100644 index 000000000000..98eaa884b2e0 --- /dev/null +++ b/src/meta/proto-conv/tests/it/v090_role_info.rs @@ -0,0 +1,47 @@ +// Copyright 2023 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashSet; + +use databend_common_meta_app as mt; +use databend_common_meta_app::principal::UserGrantSet; +use minitrace::func_name; + +use crate::common; + +// These bytes are built when a new version in introduced, +// and are kept for backward compatibility test. +// +// ************************************************************* +// * These messages should never be updated, * +// * only be added when a new version is added, * +// * or be removed when an old version is no longer supported. * +// ************************************************************* +// + +#[test] +fn test_decode_v90_role() -> anyhow::Result<()> { + let role_info_v90 = vec![ + 10, 2, 114, 49, 18, 6, 160, 6, 90, 168, 6, 24, 160, 6, 90, 168, 6, 24, + ]; + + let want = || mt::principal::RoleInfo { + name: "r1".to_string(), + grants: UserGrantSet::new(vec![], HashSet::new()), + }; + common::test_pb_from_to(func_name!(), want())?; + common::test_load_old(func_name!(), role_info_v90.as_slice(), 90, want())?; + + Ok(()) +} diff --git a/src/query/pipeline/sinks/Cargo.toml b/src/query/pipeline/sinks/Cargo.toml index bd9140496b6c..8e3d6536c64d 100644 --- a/src/query/pipeline/sinks/Cargo.toml +++ b/src/query/pipeline/sinks/Cargo.toml @@ -13,7 +13,6 @@ test = true [dependencies] databend-common-base = { path = "../../../common/base" } -databend-common-catalog = { path = "../../catalog" } databend-common-exception = { path = "../../../common/exception" } databend-common-expression = { path = "../../expression" } databend-common-pipeline-core = { path = "../core" } diff --git a/src/query/pipeline/sinks/src/async_sink.rs b/src/query/pipeline/sinks/src/async_sink.rs index b1d7a1757023..69bbed5fe400 100644 --- a/src/query/pipeline/sinks/src/async_sink.rs +++ b/src/query/pipeline/sinks/src/async_sink.rs @@ -20,7 +20,6 @@ use async_trait::unboxed_simple; use databend_common_base::runtime::drop_guard; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::TrySpawn; -use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_pipeline_core::processors::Event; @@ -53,21 +52,15 @@ pub struct AsyncSinker { inner: Option, finished: bool, input: Arc, - query_id: String, input_data: Option, called_on_start: bool, called_on_finish: bool, } impl AsyncSinker { - pub fn create( - input: Arc, - ctx: Arc, - inner: T, - ) -> Box { + pub fn create(input: Arc, inner: T) -> Box { Box::new(AsyncSinker { input, - query_id: ctx.get_id(), finished: false, input_data: None, inner: Some(inner), @@ -82,7 +75,7 @@ impl Drop for AsyncSinker { drop_guard(move || { if !self.called_on_start || !self.called_on_finish { if let Some(mut inner) = self.inner.take() { - GlobalIORuntime::instance().spawn(self.query_id.clone(), { + GlobalIORuntime::instance().spawn({ let called_on_start = self.called_on_start; let called_on_finish = self.called_on_finish; async move { diff --git a/src/query/pipeline/sinks/src/union_receive_sink.rs b/src/query/pipeline/sinks/src/union_receive_sink.rs index 482bb98cfa84..bb732ceae37b 100644 --- a/src/query/pipeline/sinks/src/union_receive_sink.rs +++ b/src/query/pipeline/sinks/src/union_receive_sink.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use async_channel::Sender; use async_trait::async_trait; use async_trait::unboxed_simple; -use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::DataBlock; @@ -32,12 +31,8 @@ pub struct UnionReceiveSink { } impl UnionReceiveSink { - pub fn create( - sender: Option>, - input: Arc, - ctx: Arc, - ) -> Box { - AsyncSinker::create(input, ctx, UnionReceiveSink { sender }) + pub fn create(tx: Option>, input: Arc) -> Box { + AsyncSinker::create(input, UnionReceiveSink { sender: tx }) } } diff --git a/src/query/pipeline/sources/src/input_formats/input_pipeline.rs b/src/query/pipeline/sources/src/input_formats/input_pipeline.rs index 0d6963c889ab..493a66559844 100644 --- a/src/query/pipeline/sources/src/input_formats/input_pipeline.rs +++ b/src/query/pipeline/sources/src/input_formats/input_pipeline.rs @@ -117,7 +117,7 @@ pub trait InputFormatPipe: Sized + Send + 'static { let (split_tx, split_rx) = async_channel::bounded(ctx.num_prefetch_splits()?); Self::build_pipeline_with_aligner(&ctx, split_rx, pipeline)?; - GlobalIORuntime::instance().spawn(ctx.table_context.get_id(), async move { + GlobalIORuntime::instance().spawn(async move { let mut sender: Option>> = None; while let Some(batch_result) = input.recv().await { match batch_result { @@ -161,7 +161,7 @@ pub trait InputFormatPipe: Sized + Send + 'static { Self::build_pipeline_with_aligner(&ctx, split_rx, pipeline)?; let ctx_clone = ctx.clone(); - GlobalIORuntime::instance().spawn(ctx.table_context.get_id(), async move { + GlobalIORuntime::instance().spawn(async move { debug!("start copy splits feeder"); for s in &ctx_clone.splits { let (data_tx, data_rx) = tokio::sync::mpsc::channel(ctx.num_prefetch_per_split()); diff --git a/src/query/service/src/interpreters/interpreter_index_refresh.rs b/src/query/service/src/interpreters/interpreter_index_refresh.rs index c81fbdca26e7..7152212e10d4 100644 --- a/src/query/service/src/interpreters/interpreter_index_refresh.rs +++ b/src/query/service/src/interpreters/interpreter_index_refresh.rs @@ -353,12 +353,10 @@ impl Interpreter for RefreshIndexInterpreter { let write_settings = fuse_table.get_write_settings(); - let ctx = self.ctx.clone(); build_res.main_pipeline.try_resize(1)?; build_res.main_pipeline.add_sink(|input| { AggIndexSink::try_create( input, - ctx.clone(), data_accessor.operator(), self.plan.index_id, write_settings.clone(), diff --git a/src/query/service/src/locks/lock_holder.rs b/src/query/service/src/locks/lock_holder.rs index 590612cf7d84..0bb0b48be48a 100644 --- a/src/query/service/src/locks/lock_holder.rs +++ b/src/query/service/src/locks/lock_holder.rs @@ -76,7 +76,7 @@ impl LockHolder { ) .await?; - GlobalIORuntime::instance().spawn(query_id.clone(), { + GlobalIORuntime::instance().spawn({ let self_clone = self.clone(); async move { let mut notified = Box::pin(self_clone.shutdown_notify.notified()); diff --git a/src/query/service/src/locks/lock_manager.rs b/src/query/service/src/locks/lock_manager.rs index 067512cb785f..cf8e912f5e92 100644 --- a/src/query/service/src/locks/lock_manager.rs +++ b/src/query/service/src/locks/lock_manager.rs @@ -21,7 +21,6 @@ use databend_common_base::base::tokio::time::timeout; use databend_common_base::base::GlobalInstance; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::TrySpawn; -use databend_common_base::GLOBAL_TASK; use databend_common_catalog::lock::Lock; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; @@ -61,7 +60,7 @@ impl LockManager { let (tx, mut rx) = mpsc::unbounded_channel(); let active_locks = Arc::new(RwLock::new(HashMap::new())); let lock_manager = Self { active_locks, tx }; - GlobalIORuntime::instance().spawn(GLOBAL_TASK, { + GlobalIORuntime::instance().spawn({ let active_locks = lock_manager.active_locks.clone(); async move { while let Some(revision) = rx.recv().await { diff --git a/src/query/service/src/pipelines/builders/builder_insert_multi_table.rs b/src/query/service/src/pipelines/builders/builder_insert_multi_table.rs index e1ca0b17a938..65c9b55a26dc 100644 --- a/src/query/service/src/pipelines/builders/builder_insert_multi_table.rs +++ b/src/query/service/src/pipelines/builders/builder_insert_multi_table.rs @@ -309,7 +309,6 @@ impl PipelineBuilder { self.main_pipeline.add_sink(|input| { Ok(ProcessorPtr::create(AsyncSinker::create( input, - self.ctx.clone(), CommitMultiTableInsert::create( tables.clone(), self.ctx.clone(), diff --git a/src/query/service/src/pipelines/builders/builder_union_all.rs b/src/query/service/src/pipelines/builders/builder_union_all.rs index 7ed40bc87e80..c7b55f57f05a 100644 --- a/src/query/service/src/pipelines/builders/builder_union_all.rs +++ b/src/query/service/src/pipelines/builders/builder_union_all.rs @@ -62,7 +62,6 @@ impl PipelineBuilder { Ok(ProcessorPtr::create(UnionReceiveSink::create( Some(tx.clone()), input_port, - self.ctx.clone(), ))) })?; diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs index 6efd2132b1d9..3f5f014d4410 100644 --- a/src/query/service/src/pipelines/executor/executor_graph.rs +++ b/src/query/service/src/pipelines/executor/executor_graph.rs @@ -548,7 +548,6 @@ impl ScheduleQueue { let _guard = ThreadTracker::tracking(tracking_payload.clone()); let process_future = proc.processor.async_process(); executor.async_runtime.spawn( - query_id.as_ref().clone(), ProcessorAsyncTask::create( query_id, wakeup_worker_id, @@ -664,7 +663,6 @@ impl ScheduleQueue { let _guard = ThreadTracker::tracking(tracking_payload.clone()); let process_future = proc.processor.async_process(); executor.async_runtime.spawn( - query_id.as_ref().clone(), ProcessorAsyncTask::create( query_id, wakeup_worker_id, diff --git a/src/query/service/src/pipelines/executor/executor_worker_context.rs b/src/query/service/src/pipelines/executor/executor_worker_context.rs index 04d149c68c40..b46fdf7a96f3 100644 --- a/src/query/service/src/pipelines/executor/executor_worker_context.rs +++ b/src/query/service/src/pipelines/executor/executor_worker_context.rs @@ -190,7 +190,6 @@ impl ExecutorWorkerContext { let tracking_payload = graph.get_node_tracking_payload(node_index); let _guard = ThreadTracker::tracking(tracking_payload.clone()); executor.async_runtime.spawn( - query_id.as_ref().clone(), ProcessorAsyncTask::create( query_id, wakeup_worker_id, diff --git a/src/query/service/src/pipelines/executor/pipeline_executor.rs b/src/query/service/src/pipelines/executor/pipeline_executor.rs index 6ddaaa3786ab..891620708540 100644 --- a/src/query/service/src/pipelines/executor/pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/pipeline_executor.rs @@ -23,7 +23,6 @@ use databend_common_base::runtime::catch_unwind; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::TrySpawn; -use databend_common_base::GLOBAL_TASK; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_pipeline_core::LockGuard; @@ -233,17 +232,17 @@ impl PipelineExecutor { if !max_execute_time_in_seconds.is_zero() { let this_graph = Arc::downgrade(&query_wrapper.graph); let finished_notify = query_wrapper.finished_notify.clone(); - GlobalIORuntime::instance().spawn(GLOBAL_TASK, async move { - let finished_future = Box::pin(finished_notify.notified()); - let max_execute_future = Box::pin(tokio::time::sleep(max_execute_time_in_seconds)); - if let Either::Left(_) = select(max_execute_future, finished_future).await { - if let Some(graph) = this_graph.upgrade() { - graph.should_finish(Err(ErrorCode::AbortedQuery( - "Aborted query, because the execution time exceeds the maximum execution time limit", - ))).expect("exceed max execute time, but cannot send error message"); - } - } - }); + GlobalIORuntime::instance().spawn(async move { + let finished_future = Box::pin(finished_notify.notified()); + let max_execute_future = Box::pin(tokio::time::sleep(max_execute_time_in_seconds)); + if let Either::Left(_) = select(max_execute_future, finished_future).await { + if let Some(graph) = this_graph.upgrade() { + graph.should_finish(Err(ErrorCode::AbortedQuery( + "Aborted query, because the execution time exceeds the maximum execution time limit", + ))).expect("exceed max execute time, but cannot send error message"); + } + } + }); } Ok(()) diff --git a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs index c4f39e606626..81a9afe03e3c 100644 --- a/src/query/service/src/pipelines/executor/query_pipeline_executor.rs +++ b/src/query/service/src/pipelines/executor/query_pipeline_executor.rs @@ -29,7 +29,6 @@ use databend_common_base::runtime::Thread; use databend_common_base::runtime::ThreadJoinHandle; use databend_common_base::runtime::ThreadTracker; use databend_common_base::runtime::TrySpawn; -use databend_common_base::GLOBAL_TASK; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_pipeline_core::LockGuard; @@ -351,17 +350,17 @@ impl QueryPipelineExecutor { let this = Arc::downgrade(self); let max_execute_time_in_seconds = self.settings.max_execute_time_in_seconds; let finished_notify = self.finished_notify.clone(); - self.async_runtime.spawn(GLOBAL_TASK, async move { - let finished_future = Box::pin(finished_notify.notified()); - let max_execute_future = Box::pin(tokio::time::sleep(max_execute_time_in_seconds)); - if let Either::Left(_) = select(max_execute_future, finished_future).await { - if let Some(executor) = this.upgrade() { - executor.finish(Some(ErrorCode::AbortedQuery( - "Aborted query, because the execution time exceeds the maximum execution time limit", - ))); - } - } - }); + self.async_runtime.spawn(async move { + let finished_future = Box::pin(finished_notify.notified()); + let max_execute_future = Box::pin(tokio::time::sleep(max_execute_time_in_seconds)); + if let Either::Left(_) = select(max_execute_future, finished_future).await { + if let Some(executor) = this.upgrade() { + executor.finish(Some(ErrorCode::AbortedQuery( + "Aborted query, because the execution time exceeds the maximum execution time limit", + ))); + } + } + }); } Ok(()) diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index 1c34f3fe61f4..2033589e0295 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -26,7 +26,6 @@ use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::Thread; use databend_common_base::runtime::TrySpawn; -use databend_common_base::GLOBAL_TASK; use databend_common_config::GlobalConfig; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -193,7 +192,7 @@ impl DataExchangeManager { task.await } else { GlobalIORuntime::instance() - .spawn(GLOBAL_TASK, task) + .spawn(task) .await .expect("create client future must be joined successfully") } diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_sink.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_sink.rs index 6b4f41e2d8c9..73ed08eb1c02 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_sink.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_sink.rs @@ -80,7 +80,6 @@ impl ExchangeSink { pipeline.try_resize(1)?; assert_eq!(senders.len(), 1); pipeline.add_pipe(Pipe::create(1, 0, vec![create_writer_item( - ctx.clone(), senders.remove(0), params.ignore_exchange, ¶ms.destination_id, @@ -98,7 +97,6 @@ impl ExchangeSink { for (destination_id, sender) in params.destination_ids.iter().zip(senders) { items.push(create_writer_item( - ctx.clone(), sender, false, destination_id, diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs index cf6f9d6dd1ff..04e46fd0ff8d 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_sink_writer.rs @@ -16,7 +16,6 @@ use std::sync::Arc; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; -use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::BlockMetaInfoDowncast; @@ -32,7 +31,6 @@ use databend_common_pipeline_sinks::Sinker; use crate::servers::flight::v1::exchange::serde::ExchangeSerializeMeta; use crate::servers::flight::FlightSender; -use crate::sessions::QueryContext; pub struct ExchangeWriterSink { flight_sender: FlightSender, @@ -43,14 +41,13 @@ pub struct ExchangeWriterSink { impl ExchangeWriterSink { pub fn create( - ctx: Arc, input: Arc, flight_sender: FlightSender, source_id: &str, destination_id: &str, fragment_id: usize, ) -> Box { - AsyncSinker::create(input, ctx, ExchangeWriterSink { + AsyncSinker::create(input, ExchangeWriterSink { flight_sender, source: source_id.to_string(), destination: destination_id.to_string(), @@ -141,7 +138,6 @@ impl Sink for IgnoreExchangeSink { } pub fn create_writer_item( - ctx: Arc, exchange: FlightSender, ignore: bool, destination_id: &str, @@ -153,7 +149,6 @@ pub fn create_writer_item( match ignore { true => ProcessorPtr::create(IgnoreExchangeSink::create(input.clone(), exchange)), false => ProcessorPtr::create(ExchangeWriterSink::create( - ctx, input.clone(), exchange, source_id, diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_transform.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_transform.rs index 0058fc02212b..47be1d1f473f 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_transform.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_transform.rs @@ -61,7 +61,6 @@ impl ExchangeTransform { true if max_threads == 1 => create_dummy_item(), true => create_resize_item(1, max_threads), false => create_writer_item( - ctx.clone(), sender, false, destination_id, diff --git a/src/query/service/src/servers/flight/v1/exchange/statistics_receiver.rs b/src/query/service/src/servers/flight/v1/exchange/statistics_receiver.rs index e5c2b24bfade..9e1e8e57f18d 100644 --- a/src/query/service/src/servers/flight/v1/exchange/statistics_receiver.rs +++ b/src/query/service/src/servers/flight/v1/exchange/statistics_receiver.rs @@ -47,7 +47,7 @@ impl StatisticsReceiver { for (_source, exchange) in statistics_exchanges.into_iter() { let rx = exchange.convert_to_receiver(); - exchange_handler.push(runtime.spawn(ctx.get_id(), { + exchange_handler.push(runtime.spawn({ let ctx = ctx.clone(); let shutdown_rx = shutdown_tx.subscribe(); diff --git a/src/query/service/src/servers/flight/v1/exchange/statistics_sender.rs b/src/query/service/src/servers/flight/v1/exchange/statistics_sender.rs index 2f38e4e01abd..f50bf77a8e7f 100644 --- a/src/query/service/src/servers/flight/v1/exchange/statistics_sender.rs +++ b/src/query/service/src/servers/flight/v1/exchange/statistics_sender.rs @@ -49,7 +49,7 @@ impl StatisticsSender { let tx = exchange.convert_to_sender(); let (shutdown_flag_sender, shutdown_flag_receiver) = async_channel::bounded(1); - let handle = spawner.spawn(query_id, { + let handle = spawner.spawn({ let query_id = query_id.to_string(); async move { diff --git a/src/query/service/src/servers/flight/v1/flight_service.rs b/src/query/service/src/servers/flight/v1/flight_service.rs index 70f4dbc4d3ae..da8711923814 100644 --- a/src/query/service/src/servers/flight/v1/flight_service.rs +++ b/src/query/service/src/servers/flight/v1/flight_service.rs @@ -185,7 +185,6 @@ impl FlightService for DatabendQueryFlightService { let query_id = init_query_fragments_plan.executor_packet.query_id.clone(); if let Err(cause) = match_join_handle( spawner.spawn( - ctx.get_id(), async move { DataExchangeManager::instance().init_query_fragments_plan( &ctx, diff --git a/src/query/service/src/servers/http/clickhouse_handler.rs b/src/query/service/src/servers/http/clickhouse_handler.rs index f0972332a612..1073b05721d8 100644 --- a/src/query/service/src/servers/http/clickhouse_handler.rs +++ b/src/query/service/src/servers/http/clickhouse_handler.rs @@ -153,7 +153,7 @@ async fn execute( // // P.S. I think it will be better/more reasonable if we could avoid using pthread_join inside an async stack. - ctx.try_spawn(ctx.get_id(), { + ctx.try_spawn({ let ctx = ctx.clone(); async move { let mut data_stream = interpreter.execute(ctx.clone()).await?; @@ -398,8 +398,7 @@ pub async fn clickhouse_handler_post( .map_err(BadRequest)?; let start = *start; let sql_cloned = sql.clone(); - let query_id = ctx.get_id(); - handle = Some(ctx.spawn(query_id, async move { + handle = Some(ctx.spawn(async move { gen_batches( sql_cloned, start, @@ -451,8 +450,7 @@ pub async fn clickhouse_handler_post( .map_err(BadRequest)?; let start = *start; let sql_cloned = sql.clone(); - let query_id = ctx.get_id(); - handle = Some(ctx.spawn(query_id, async move { + handle = Some(ctx.spawn(async move { gen_batches( sql_cloned, start, diff --git a/src/query/service/src/servers/http/v1/load.rs b/src/query/service/src/servers/http/v1/load.rs index c26481c4f66d..e2131c9a2657 100644 --- a/src/query/service/src/servers/http/v1/load.rs +++ b/src/query/service/src/servers/http/v1/load.rs @@ -187,8 +187,7 @@ pub async fn streaming_load( *input_context_option = Some(input_context.clone()); info!("streaming load with file_format {:?}", input_context); - let query_id = context.get_id(); - let handler = context.spawn(query_id, execute_query(context.clone(), plan)); + let handler = context.spawn(execute_query(context.clone(), plan)); let files = read_multi_part(multipart, tx, &input_context).await?; match handler.await { diff --git a/src/query/service/src/servers/http/v1/query/http_query.rs b/src/query/service/src/servers/http/v1/query/http_query.rs index ded24d661fab..00ab911b59ea 100644 --- a/src/query/service/src/servers/http/v1/query/http_query.rs +++ b/src/query/service/src/servers/http/v1/query/http_query.rs @@ -466,7 +466,6 @@ impl HttpQuery { let format_settings: Arc>> = Default::default(); let format_settings_clone = format_settings.clone(); http_query_runtime_instance.runtime().try_spawn( - ctx.get_id(), async move { let state = state_clone.clone(); if let Err(e) = ExecuteState::try_start_query( diff --git a/src/query/service/src/servers/http/v1/query/http_query_manager.rs b/src/query/service/src/servers/http/v1/query/http_query_manager.rs index ed9e696ef4f1..43c9196cf5a2 100644 --- a/src/query/service/src/servers/http/v1/query/http_query_manager.rs +++ b/src/query/service/src/servers/http/v1/query/http_query_manager.rs @@ -135,7 +135,7 @@ impl HttpQueryManager { // it may cannot destroy with final or kill when we hold ref of Arc let http_query_weak = Arc::downgrade(&query); - GlobalIORuntime::instance().spawn(query_id, async move { + GlobalIORuntime::instance().spawn(async move { loop { let expire_res = match http_query_weak.upgrade() { None => { @@ -210,7 +210,7 @@ impl HttpQueryManager { let deleter = { let self_clone = self.clone(); let last_query_id_clone = last_query_id.clone(); - GlobalIORuntime::instance().spawn(last_query_id.clone(), async move { + GlobalIORuntime::instance().spawn(async move { sleep(Duration::from_secs(timeout_secs)).await; if self_clone.get_txn(&last_query_id_clone).is_some() { log::info!( diff --git a/src/query/service/src/servers/mysql/mysql_handler.rs b/src/query/service/src/servers/mysql/mysql_handler.rs index e2a51b367073..bd55a4b58db4 100644 --- a/src/query/service/src/servers/mysql/mysql_handler.rs +++ b/src/query/service/src/servers/mysql/mysql_handler.rs @@ -21,7 +21,6 @@ use databend_common_base::base::tokio::net::TcpStream; use databend_common_base::base::tokio::task::JoinHandle; use databend_common_base::runtime::Runtime; use databend_common_base::runtime::TrySpawn; -use databend_common_base::GLOBAL_TASK; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use futures::future::AbortHandle; @@ -110,7 +109,7 @@ impl MySQLHandler { keepalive: TcpKeepalive, tls: Option>, ) { - executor.spawn(GLOBAL_TASK, async move { + executor.spawn(async move { match sessions.create_session(SessionType::MySQL).await { Err(error) => { warn!("create session failed, {:?}", error); diff --git a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs index d26f2146ac5d..80d6e74d4a11 100644 --- a/src/query/service/src/servers/mysql/mysql_interactive_worker.rs +++ b/src/query/service/src/servers/mysql/mysql_interactive_worker.rs @@ -18,6 +18,7 @@ use std::time::Instant; use databend_common_base::base::convert_byte_size; use databend_common_base::base::convert_number_size; use databend_common_base::base::tokio::io::AsyncWrite; +use databend_common_base::runtime::ThreadTracker; use databend_common_base::runtime::TrySpawn; use databend_common_config::DATABEND_COMMIT_VERSION; use databend_common_exception::ErrorCode; @@ -43,6 +44,7 @@ use opensrv_mysql::ParamParser; use opensrv_mysql::QueryResultWriter; use opensrv_mysql::StatementMetaWriter; use rand::RngCore; +use uuid::Uuid; use crate::interpreters::interpreter_plan_sql; use crate::interpreters::Interpreter; @@ -188,10 +190,15 @@ impl AsyncMysqlShim for InteractiveWorke query: &'a str, writer: QueryResultWriter<'a, W>, ) -> Result<()> { + let query_id = Uuid::new_v4().to_string(); let root = Span::root(full_name!(), SpanContext::random()) .with_properties(|| self.base.session.to_minitrace_properties()); - async { + let mut tracking_payload = ThreadTracker::new_tracking_payload(); + tracking_payload.query_id = Some(query_id.clone()); + let _guard = ThreadTracker::tracking(tracking_payload); + + ThreadTracker::tracking_future(async { if self.base.session.is_aborting() { writer .error( @@ -210,7 +217,7 @@ impl AsyncMysqlShim for InteractiveWorke let instant = Instant::now(); let query_result = self .base - .do_query(query) + .do_query(query_id, query) .await .map_err(|err| err.display_with_sql(query)); @@ -225,7 +232,7 @@ impl AsyncMysqlShim for InteractiveWorke observe_mysql_process_request_duration(instant.elapsed()); write_result - } + }) .in_span(root) .await } @@ -332,7 +339,11 @@ impl InteractiveWorkerBase { #[async_backtrace::framed] #[minitrace::trace] - async fn do_query(&mut self, query: &str) -> Result<(QueryResult, Option)> { + async fn do_query( + &mut self, + query_id: String, + query: &str, + ) -> Result<(QueryResult, Option)> { match self.federated_server_command_check(query) { Some((schema, data_block)) => { info!("Federated query: {}", query); @@ -354,6 +365,7 @@ impl InteractiveWorkerBase { None => { info!("Normal query: {}", query); let context = self.session.create_query_context().await?; + context.set_id(query_id); // Use interpreter_plan_sql, we can write the query log if an error occurs. let (plan, extras) = interpreter_plan_sql(context.clone(), query).await?; @@ -392,7 +404,7 @@ impl InteractiveWorkerBase { )> { let instant = Instant::now(); - let query_result = context.try_spawn(context.get_id(), { + let query_result = context.try_spawn({ let ctx = context.clone(); async move { let mut data_stream = interpreter.execute(ctx.clone()).await?; @@ -425,9 +437,15 @@ impl InteractiveWorkerBase { if database_name.is_empty() { return Ok(()); } + + let query_id = Uuid::new_v4().to_string(); let init_query = format!("USE `{}`;", database_name); - let do_query = self.do_query(&init_query).await; + let mut tracking_payload = ThreadTracker::new_tracking_payload(); + tracking_payload.query_id = Some(query_id.clone()); + let _guard = ThreadTracker::tracking(tracking_payload); + + let do_query = ThreadTracker::tracking_future(self.do_query(query_id, &init_query)).await; match do_query { Ok((_, _)) => Ok(()), Err(error_code) => Err(error_code), diff --git a/src/query/service/src/servers/mysql/mysql_session.rs b/src/query/service/src/servers/mysql/mysql_session.rs index b7dd29d002a5..b22ff05401f2 100644 --- a/src/query/service/src/servers/mysql/mysql_session.rs +++ b/src/query/service/src/servers/mysql/mysql_session.rs @@ -20,7 +20,6 @@ use databend_common_base::base::tokio::net::TcpStream; use databend_common_base::runtime::Runtime; use databend_common_base::runtime::Thread; use databend_common_base::runtime::TrySpawn; -use databend_common_base::GLOBAL_TASK; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_exception::ToErrorCode; @@ -53,7 +52,7 @@ impl MySQLConnection { let query_executor = Runtime::with_worker_threads(1, Some("mysql-query-executor".to_string()))?; Thread::spawn(move || { - let join_handle = query_executor.spawn(GLOBAL_TASK, async move { + let join_handle = query_executor.spawn(async move { let client_addr = match non_blocking_stream.peer_addr() { Ok(addr) => addr.to_string(), Err(e) => { diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 3943aa434448..53e0f980e15a 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -1100,12 +1100,12 @@ impl TableContext for QueryContext { impl TrySpawn for QueryContext { /// Spawns a new asynchronous task, returning a tokio::JoinHandle for it. /// The task will run in the current context thread_pool not the global. - fn try_spawn(&self, name: impl Into, task: T) -> Result> + fn try_spawn(&self, task: T) -> Result> where T: Future + Send + 'static, T::Output: Send + 'static, { - Ok(self.shared.try_get_runtime()?.spawn(name, task)) + Ok(self.shared.try_get_runtime()?.spawn(task)) } } diff --git a/src/query/service/tests/it/servers/mysql/mysql_handler.rs b/src/query/service/tests/it/servers/mysql/mysql_handler.rs index 9778af043e2e..c49d98162cc7 100644 --- a/src/query/service/tests/it/servers/mysql/mysql_handler.rs +++ b/src/query/service/tests/it/servers/mysql/mysql_handler.rs @@ -20,7 +20,6 @@ use std::time::Duration; use databend_common_base::base::tokio; use databend_common_base::runtime::Runtime; use databend_common_base::runtime::TrySpawn; -use databend_common_base::GLOBAL_TASK; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_exception::ToErrorCode; @@ -170,10 +169,7 @@ async fn test_rejected_session_with_parallel() -> Result<()> { let start_barrier = start_barriers.clone(); let destroy_barrier = destroy_barriers.clone(); - join_handlers.push(runtime.spawn( - GLOBAL_TASK, - connect_server(port, start_barrier, destroy_barrier), - )); + join_handlers.push(runtime.spawn(connect_server(port, start_barrier, destroy_barrier))); } let mut accept = 0; diff --git a/src/query/service/tests/it/sql/exec/mod.rs b/src/query/service/tests/it/sql/exec/mod.rs index 24ca24d34bff..3c1081f4606d 100644 --- a/src/query/service/tests/it/sql/exec/mod.rs +++ b/src/query/service/tests/it/sql/exec/mod.rs @@ -15,7 +15,6 @@ use databend_common_base::base::tokio; use databend_common_base::runtime::Runtime; use databend_common_base::runtime::TrySpawn; -use databend_common_base::GLOBAL_TASK; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_sql::plans::Plan; @@ -144,7 +143,7 @@ pub async fn test_snapshot_consistency() -> Result<()> { Ok::<(), ErrorCode>(()) }; - let query_handler = runtime.spawn(GLOBAL_TASK, query_task); + let query_handler = runtime.spawn(query_task); let compact_task = async move { let compact_sql = format!("optimize table {}.{} compact", db2, tbl2); @@ -158,7 +157,7 @@ pub async fn test_snapshot_consistency() -> Result<()> { }; // b. thread2: optmize table - let compact_handler = runtime.spawn(GLOBAL_TASK, compact_task); + let compact_handler = runtime.spawn(compact_task); query_handler.await.unwrap()?; compact_handler.await.unwrap()?; diff --git a/src/query/storages/fuse/src/io/read/bloom/block_filter_reader.rs b/src/query/storages/fuse/src/io/read/bloom/block_filter_reader.rs index fe7d5d9b4e6b..6a46de50928a 100644 --- a/src/query/storages/fuse/src/io/read/bloom/block_filter_reader.rs +++ b/src/query/storages/fuse/src/io/read/bloom/block_filter_reader.rs @@ -22,7 +22,6 @@ use arrow::datatypes::Schema; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::Runtime; use databend_common_base::runtime::TrySpawn; -use databend_common_base::GLOBAL_TASK; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::ColumnId; @@ -209,7 +208,7 @@ where #[async_backtrace::framed] async fn execute_in_runtime(self, runtime: &Runtime) -> Result { runtime - .try_spawn(GLOBAL_TASK, self)? + .try_spawn(self)? .await .map_err(|e| ErrorCode::TokioError(format!("runtime join error. {}", e))) } diff --git a/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_loader.rs b/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_loader.rs index 01b0785809f7..6186c5e1571d 100644 --- a/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_loader.rs +++ b/src/query/storages/fuse/src/io/read/inverted_index/inverted_index_loader.rs @@ -19,7 +19,6 @@ use std::time::Instant; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::Runtime; use databend_common_base::runtime::TrySpawn; -use databend_common_base::GLOBAL_TASK; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_metrics::storage::metrics_inc_block_inverted_index_read_milliseconds; @@ -68,7 +67,7 @@ where #[async_backtrace::framed] async fn execute_in_runtime(self, runtime: &Runtime) -> Result { runtime - .try_spawn(GLOBAL_TASK, self)? + .try_spawn(self)? .await .map_err(|e| ErrorCode::TokioError(format!("runtime join error. {}", e))) } diff --git a/src/query/storages/fuse/src/operations/agg_index_sink.rs b/src/query/storages/fuse/src/operations/agg_index_sink.rs index 3cff22409eae..6b1fdc140ec2 100644 --- a/src/query/storages/fuse/src/operations/agg_index_sink.rs +++ b/src/query/storages/fuse/src/operations/agg_index_sink.rs @@ -18,7 +18,6 @@ use std::time::Instant; use async_trait::async_trait; use async_trait::unboxed_simple; -use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_expression::types::StringType; use databend_common_expression::BlockRowIndex; @@ -50,7 +49,6 @@ impl AggIndexSink { #[allow(clippy::too_many_arguments)] pub fn try_create( input: Arc, - ctx: Arc, data_accessor: Operator, index_id: u64, write_settings: WriteSettings, @@ -58,7 +56,7 @@ impl AggIndexSink { block_name_offset: usize, keep_block_name_col: bool, ) -> Result { - let sinker = AsyncSinker::create(input, ctx.clone(), AggIndexSink { + let sinker = AsyncSinker::create(input, AggIndexSink { data_accessor, index_id, write_settings, diff --git a/src/query/storages/fuse/src/operations/analyze.rs b/src/query/storages/fuse/src/operations/analyze.rs index dbf95e4194de..b21b5367c2ed 100644 --- a/src/query/storages/fuse/src/operations/analyze.rs +++ b/src/query/storages/fuse/src/operations/analyze.rs @@ -91,7 +91,7 @@ impl SinkAnalyzeState { snapshot_id: SnapshotId, input: Arc, ) -> Result { - let sinker = AsyncSinker::create(input, ctx.clone(), SinkAnalyzeState { + let sinker = AsyncSinker::create(input, SinkAnalyzeState { ctx, output_schema, catalog: catalog.to_string(), diff --git a/src/query/storages/fuse/src/operations/inverted_index.rs b/src/query/storages/fuse/src/operations/inverted_index.rs index fbbd51e1ac50..ac9611850c21 100644 --- a/src/query/storages/fuse/src/operations/inverted_index.rs +++ b/src/query/storages/fuse/src/operations/inverted_index.rs @@ -183,7 +183,7 @@ impl FuseTable { }); pipeline.try_resize(1)?; - pipeline.add_sink(|input| InvertedIndexSink::try_create(input, ctx.clone(), block_nums))?; + pipeline.add_sink(|input| InvertedIndexSink::try_create(input, block_nums))?; Ok(()) } @@ -319,12 +319,8 @@ pub struct InvertedIndexSink { } impl InvertedIndexSink { - pub fn try_create( - input: Arc, - ctx: Arc, - block_nums: usize, - ) -> Result { - let sinker = AsyncSinker::create(input, ctx, InvertedIndexSink { + pub fn try_create(input: Arc, block_nums: usize) -> Result { + let sinker = AsyncSinker::create(input, InvertedIndexSink { block_nums: AtomicUsize::new(block_nums), }); Ok(ProcessorPtr::create(sinker)) diff --git a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs index e7caaffb76d2..32bbb4f4cfa0 100644 --- a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs +++ b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs @@ -335,8 +335,7 @@ impl MatchedAggregator { )); } - let query_id = aggregation_ctx.ctx.get_id(); - let handle = io_runtime.spawn(query_id, async move { + let handle = io_runtime.spawn(async move { let mutation_log_entry = aggregation_ctx .apply_update_and_deletion_to_data_block( segment_idx, @@ -397,7 +396,6 @@ impl AggregationContext { &self.block_reader, block_meta, &self.read_settings, - self.ctx.get_id(), ) .await?; let origin_num_rows = origin_data_block.num_rows(); @@ -439,17 +437,17 @@ impl AggregationContext { let origin_stats = block_meta.cluster_stats.clone(); let serialized = GlobalIORuntime::instance() - .spawn(self.ctx.get_id(), async move { - block_builder.build(res_block, |block, generator| { - let cluster_stats = - generator.gen_with_origin_stats(&block, origin_stats.clone())?; - info!( - "serialize block after get cluster_stats:\n {:?}", - cluster_stats - ); - Ok((cluster_stats, block)) - }) - }) + .spawn(async move { + block_builder.build(res_block, |block, generator| { + let cluster_stats = + generator.gen_with_origin_stats(&block, origin_stats.clone())?; + info!( + "serialize block after get cluster_stats:\n {:?}", + cluster_stats + ); + Ok((cluster_stats, block)) + }) + }) .await .map_err(|e| { ErrorCode::Internal( diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs index 41414970aee5..373b97b247eb 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs @@ -254,25 +254,21 @@ impl BlockCompactMutator { let semaphore = semaphore.clone(); let batch = lazy_parts.drain(0..batch_size).collect::>(); - works.push({ - let ctx = ctx.clone(); - async move { - let mut res = vec![]; - for lazy_part in batch { - let mut builder = - CompactTaskBuilder::new(column_ids.clone(), cluster_key_id, thresholds); - let parts = builder - .build_tasks( - ctx.clone(), - lazy_part.segment_indices, - lazy_part.compact_segments, - semaphore.clone(), - ) - .await?; - res.extend(parts); - } - Ok::<_, ErrorCode>(res) + works.push(async move { + let mut res = vec![]; + for lazy_part in batch { + let mut builder = + CompactTaskBuilder::new(column_ids.clone(), cluster_key_id, thresholds); + let parts = builder + .build_tasks( + lazy_part.segment_indices, + lazy_part.compact_segments, + semaphore.clone(), + ) + .await?; + res.extend(parts); } + Ok::<_, ErrorCode>(res) }); } @@ -499,7 +495,6 @@ impl CompactTaskBuilder { // through the blocks, and finds the blocks >= N and blocks < 2N as a task. async fn build_tasks( &mut self, - ctx: Arc, segment_indices: Vec, compact_segments: Vec>, semaphore: Arc, @@ -514,7 +509,7 @@ impl CompactTaskBuilder { let mut handlers = Vec::with_capacity(compact_segments.len()); for segment in compact_segments.into_iter().rev() { let permit = acquire_task_permit(semaphore.clone()).await?; - let handler = runtime.spawn(ctx.get_id(), async move { + let handler = runtime.spawn(async move { let blocks = segment.block_metas()?; drop(permit); Ok::<_, ErrorCode>((blocks, segment.summary.clone())) diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs index 121af5741766..997a3ee73963 100644 --- a/src/query/storages/fuse/src/operations/recluster.rs +++ b/src/query/storages/fuse/src/operations/recluster.rs @@ -16,7 +16,6 @@ use std::sync::Arc; use std::time::Instant; use databend_common_base::runtime::TrySpawn; -use databend_common_base::GLOBAL_TASK; use databend_common_catalog::plan::PushDownInfo; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; @@ -262,7 +261,7 @@ impl FuseTable { remain -= gap_size; let batch = segment_locs.drain(0..batch_size).collect::>(); - works.push(pruning_ctx.pruning_runtime.spawn(GLOBAL_TASK, { + works.push(pruning_ctx.pruning_runtime.spawn({ let segment_pruner = segment_pruner.clone(); async move { diff --git a/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs b/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs index 25dae2c188da..5aac4671ab03 100644 --- a/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs +++ b/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs @@ -99,14 +99,12 @@ struct AggregationContext { segment_reader: CompactSegmentInfoReader, block_builder: BlockBuilder, io_request_semaphore: Arc, - query_id: String, // generate stream columns if necessary stream_ctx: Option, } // Apply MergeIntoOperations to segments pub struct MergeIntoOperationAggregator { - ctx: Arc, deletion_accumulator: DeletionAccumulator, aggregation_ctx: Arc, } @@ -182,7 +180,6 @@ impl MergeIntoOperationAggregator { Some(reader) } }; - let query_id = ctx.get_id(); let stream_ctx = if update_stream_columns { Some(StreamContext::try_create( @@ -196,7 +193,6 @@ impl MergeIntoOperationAggregator { }; Ok(Self { - ctx, deletion_accumulator, aggregation_ctx: Arc::new(AggregationContext { segment_locations: AHashMap::from_iter(segment_locations), @@ -212,7 +208,6 @@ impl MergeIntoOperationAggregator { segment_reader, block_builder, io_request_semaphore, - query_id, stream_ctx, }), }) @@ -350,7 +345,7 @@ impl MergeIntoOperationAggregator { let aggregation_ctx = aggregation_ctx.clone(); num_rows_mutated += block_meta.row_count; // self.aggregation_ctx. - let handle = io_runtime.spawn(self.ctx.get_id(), async move { + let handle = io_runtime.spawn(async move { let mutation_log_entry = aggregation_ctx .apply_deletion_to_data_block(segment_idx, block_index, &block_meta, &keys) .await?; @@ -422,7 +417,6 @@ impl AggregationContext { &self.key_column_reader, block_meta, &self.read_settings, - self.query_id.clone(), ) .await?; @@ -554,7 +548,7 @@ impl AggregationContext { let block_builder = self.block_builder.clone(); let origin_stats = block_meta.cluster_stats.clone(); let serialized = GlobalIORuntime::instance() - .spawn(self.query_id.clone(), async move { + .spawn(async move { block_builder.build(new_block, |block, generator| { let cluster_stats = generator.gen_with_origin_stats(&block, origin_stats.clone())?; @@ -653,17 +647,17 @@ impl AggregationContext { let block_meta_ptr = block_meta.clone(); let reader = reader.clone(); GlobalIORuntime::instance() - .spawn(self.query_id.clone(), async move { - let column_chunks = merged_io_read_result.columns_chunks()?; - reader.deserialize_chunks( - block_meta_ptr.location.0.as_str(), - block_meta_ptr.row_count as usize, - &block_meta_ptr.compression, - &block_meta_ptr.col_metas, - column_chunks, - &storage_format, - ) - }) + .spawn(async move { + let column_chunks = merged_io_read_result.columns_chunks()?; + reader.deserialize_chunks( + block_meta_ptr.location.0.as_str(), + block_meta_ptr.row_count as usize, + &block_meta_ptr.compression, + &block_meta_ptr.col_metas, + column_chunks, + &storage_format, + ) + }) .await .map_err(|e| { ErrorCode::Internal( diff --git a/src/query/storages/fuse/src/operations/util.rs b/src/query/storages/fuse/src/operations/util.rs index db3c161f11a9..248d07aeca34 100644 --- a/src/query/storages/fuse/src/operations/util.rs +++ b/src/query/storages/fuse/src/operations/util.rs @@ -136,7 +136,6 @@ pub async fn read_block( reader: &BlockReader, block_meta: &BlockMeta, read_settings: &ReadSettings, - query_id: String, ) -> Result { let merged_io_read_result = reader .read_columns_data_by_merge_io( @@ -153,7 +152,7 @@ pub async fn read_block( let reader = reader.clone(); GlobalIORuntime::instance() - .spawn(query_id, async move { + .spawn(async move { let column_chunks = merged_io_read_result.columns_chunks()?; reader.deserialize_chunks( block_meta_ptr.location.0.as_str(), diff --git a/src/query/storages/fuse/src/pruning/fuse_pruner.rs b/src/query/storages/fuse/src/pruning/fuse_pruner.rs index 786620214b07..8edad2f6e2b3 100644 --- a/src/query/storages/fuse/src/pruning/fuse_pruner.rs +++ b/src/query/storages/fuse/src/pruning/fuse_pruner.rs @@ -291,79 +291,72 @@ impl FusePruner { let mut batch = segment_locs.drain(0..batch_size).collect::>(); let inverse_range_index = self.get_inverse_range_index(); - works.push( - self.pruning_ctx - .pruning_runtime - .spawn(self.pruning_ctx.ctx.get_id(), { - let block_pruner = block_pruner.clone(); - let segment_pruner = segment_pruner.clone(); - let pruning_ctx = self.pruning_ctx.clone(); - - async move { - // Build pruning tasks. - if let Some(internal_column_pruner) = - &pruning_ctx.internal_column_pruner - { - batch = batch - .into_iter() - .filter(|segment| { - internal_column_pruner - .should_keep(SEGMENT_NAME_COL_NAME, &segment.location.0) - }) - .collect::>(); - } - - let mut res = vec![]; - let mut deleted_segments = vec![]; - let pruned_segments = segment_pruner.pruning(batch).await?; - - if delete_pruning { - // inverse prun - for (segment_location, compact_segment_info) in &pruned_segments { - // for delete_prune - match inverse_range_index.as_ref() { - Some(range_index) => { - if !range_index.should_keep( - &compact_segment_info.summary.col_stats, - None, - ) { - deleted_segments.push(DeletedSegmentInfo { - index: segment_location.segment_idx, - summary: compact_segment_info.summary.clone(), - }) - } else { - res.extend( - block_pruner - .pruning( - segment_location.clone(), - compact_segment_info.block_metas()?, - ) - .await?, - ); - } - } - None => { - res.extend( - block_pruner - .pruning( - segment_location.clone(), - compact_segment_info.block_metas()?, - ) - .await?, - ); - } + works.push(self.pruning_ctx.pruning_runtime.spawn({ + let block_pruner = block_pruner.clone(); + let segment_pruner = segment_pruner.clone(); + let pruning_ctx = self.pruning_ctx.clone(); + + async move { + // Build pruning tasks. + if let Some(internal_column_pruner) = &pruning_ctx.internal_column_pruner { + batch = batch + .into_iter() + .filter(|segment| { + internal_column_pruner + .should_keep(SEGMENT_NAME_COL_NAME, &segment.location.0) + }) + .collect::>(); + } + + let mut res = vec![]; + let mut deleted_segments = vec![]; + let pruned_segments = segment_pruner.pruning(batch).await?; + + if delete_pruning { + // inverse prun + for (segment_location, compact_segment_info) in &pruned_segments { + // for delete_prune + match inverse_range_index.as_ref() { + Some(range_index) => { + if !range_index + .should_keep(&compact_segment_info.summary.col_stats, None) + { + deleted_segments.push(DeletedSegmentInfo { + index: segment_location.segment_idx, + summary: compact_segment_info.summary.clone(), + }) + } else { + res.extend( + block_pruner + .pruning( + segment_location.clone(), + compact_segment_info.block_metas()?, + ) + .await?, + ); } } - } else { - for (location, info) in pruned_segments { - let block_metas = info.block_metas()?; - res.extend(block_pruner.pruning(location, block_metas).await?); + None => { + res.extend( + block_pruner + .pruning( + segment_location.clone(), + compact_segment_info.block_metas()?, + ) + .await?, + ); } } - Result::<_, ErrorCode>::Ok((res, deleted_segments)) } - }), - ); + } else { + for (location, info) in pruned_segments { + let block_metas = info.block_metas()?; + res.extend(block_pruner.pruning(location, block_metas).await?); + } + } + Result::<_, ErrorCode>::Ok((res, deleted_segments)) + } + })); } match futures::future::try_join_all(works).await { @@ -407,29 +400,25 @@ impl FusePruner { remain -= gap_size; let batch = block_metas.drain(0..batch_size).collect::>(); - works.push( - self.pruning_ctx - .pruning_runtime - .spawn(self.pruning_ctx.ctx.get_id(), { - let block_pruner = block_pruner.clone(); - async move { - // Build pruning tasks. - let res = block_pruner - .pruning( - // unused segment location. - SegmentLocation { - segment_idx, - location: ("".to_string(), 0), - snapshot_loc: None, - }, - batch, - ) - .await?; - - Result::<_, ErrorCode>::Ok(res) - } - }), - ); + works.push(self.pruning_ctx.pruning_runtime.spawn({ + let block_pruner = block_pruner.clone(); + async move { + // Build pruning tasks. + let res = block_pruner + .pruning( + // unused segment location. + SegmentLocation { + segment_idx, + location: ("".to_string(), 0), + snapshot_loc: None, + }, + batch, + ) + .await?; + + Result::<_, ErrorCode>::Ok(res) + } + })); segment_idx += 1; } diff --git a/src/query/storages/stage/src/read/row_based/formats/ndjson/block_builder.rs b/src/query/storages/stage/src/read/row_based/formats/ndjson/block_builder.rs index 544a2a985ca9..43380cb5f051 100644 --- a/src/query/storages/stage/src/read/row_based/formats/ndjson/block_builder.rs +++ b/src/query/storages/stage/src/read/row_based/formats/ndjson/block_builder.rs @@ -51,9 +51,7 @@ impl NdJsonDecoder { null_if: &[&str], ) -> std::result::Result<(), FileParseError> { let mut json: serde_json::Value = - serde_json::from_reader(buf).map_err(|e| FileParseError::InvalidNDJsonRow { - message: e.to_string(), - })?; + serde_json::from_reader(buf).map_err(|e| map_json_error(e, buf))?; // todo: this is temporary if self.field_decoder.is_select { self.field_decoder @@ -197,3 +195,64 @@ impl RowDecoder for NdJsonDecoder { Ok(vec![]) } } + +// The origin JSON error format "{} at line {} column {}" is misleading for NDJSON. +// - rm `line {}` +// - rename `column {}` to `pos {}`, 1-based to 0 based +// - add info for size and next byte +// +// Use test in case of changes of serde_json. +fn map_json_error(err: serde_json::Error, data: &[u8]) -> FileParseError { + let pos = if err.column() > 0 { + err.column() - 1 + } else { + err.column() + }; + let len = data.len(); + + let mut message = err.to_string(); + if let Some(p) = message.rfind(" at line") { + message = message[..p].to_string() + } + message = format!("{message} at pos {pos} of size {len}"); + if err.column() < len { + message = format!("{message}, next byte is '{}'", data[pos] as char) + } + FileParseError::InvalidNDJsonRow { message } +} + +#[cfg(test)] +mod test { + use super::map_json_error; + use super::FileParseError; + + fn decode_err(data: &str) -> String { + serde_json::from_slice::(data.as_bytes()) + .map_err(|e| { + let e = map_json_error(e, data.as_bytes()); + if let FileParseError::InvalidNDJsonRow { message } = e { + message + } else { + unreachable!() + } + }) + .err() + .unwrap() + } + + #[test] + fn test_json_decode_error() { + assert_eq!( + decode_err("{").as_str(), + "EOF while parsing an object at pos 0 of size 1" + ); + assert_eq!( + decode_err("").as_str(), + "EOF while parsing a value at pos 0 of size 0" + ); + assert_eq!( + decode_err("{\"k\"-}").as_str(), + "expected `:` at pos 4 of size 6, next byte is '-'" + ); + } +} diff --git a/src/query/storages/system/src/streams_table.rs b/src/query/storages/system/src/streams_table.rs index def3f51883aa..b045f6cc13c1 100644 --- a/src/query/storages/system/src/streams_table.rs +++ b/src/query/storages/system/src/streams_table.rs @@ -232,7 +232,7 @@ impl AsyncSystemTable for StreamsTable { let permit = acquire_task_permit(io_request_semaphore.clone()).await?; let ctx = ctx.clone(); let table = table.clone(); - let handler = runtime.spawn(ctx.get_id(), async move { + let handler = runtime.spawn(async move { let mut reason = "".to_string(); // safe unwrap. let stream_table = diff --git a/tests/sqllogictests/suites/stage/formats/ndjson/ndjson_on_error.test b/tests/sqllogictests/suites/stage/formats/ndjson/ndjson_on_error.test index ebb984f76e3c..054626669ba9 100644 --- a/tests/sqllogictests/suites/stage/formats/ndjson/ndjson_on_error.test +++ b/tests/sqllogictests/suites/stage/formats/ndjson/ndjson_on_error.test @@ -7,8 +7,8 @@ CREATE TABLE wrong_ndjson (a Boolean, b Int, c Float, d String, e Date, f Timest query copy /*+ set_var(max_threads=1) */ into wrong_ndjson from @data/ndjson/ pattern = 'wrong_sample.*[.]ndjson' file_format = (type = NDJSON) ON_ERROR=continue ---- -ndjson/wrong_sample.ndjson 3 1 Invalid JSON row: key must be a string at line 1 column 89 2 -ndjson/wrong_sample2.ndjson 3 1 Invalid JSON row: key must be a string at line 1 column 89 2 +ndjson/wrong_sample.ndjson 3 1 Invalid JSON row: key must be a string at pos 88 of size 114, next byte is 'h' 2 +ndjson/wrong_sample2.ndjson 3 1 Invalid JSON row: key must be a string at pos 88 of size 114, next byte is 'h' 2 query select * from wrong_ndjson order by a