diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index ded0fdc1f8200..75a9ca4dbf320 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -15,6 +15,8 @@ use std::sync::Arc; use async_channel::Receiver; +use databend_common_base::runtime::GlobalIORuntime; +use databend_common_base::runtime::Runtime; use databend_common_base::runtime::TrySpawn; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfoPtr; @@ -23,7 +25,6 @@ use databend_common_catalog::plan::PushDownInfo; use databend_common_catalog::plan::TopK; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; -use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_pipeline_core::Pipeline; @@ -205,8 +206,11 @@ impl FuseTable { let ctx = ctx.clone(); let (tx, rx) = async_channel::bounded(max_io_requests); pipeline.set_on_init(move || { - ctx.get_runtime()?.try_spawn( - async move { + // We cannot use the runtime associated with the query to avoid increasing its lifetime. + GlobalIORuntime::instance().spawn(async move { + // avoid block global io runtime + let runtime = Runtime::with_worker_threads(2, None)?; + let handler = runtime.spawn(async move { match table .prune_snapshot_blocks( ctx, @@ -229,10 +233,14 @@ impl FuseTable { let _ = tx.send(Err(err)).await; } } - Ok::<_, ErrorCode>(()) - }, - None, - )?; + }); + + if let Err(cause) = handler.await { + log::warn!("Join error while in prune pipeline, cause: {:?}", cause); + } + + Result::Ok(()) + }); Ok(()) }); diff --git a/src/query/storages/fuse/src/operations/read_partitions.rs b/src/query/storages/fuse/src/operations/read_partitions.rs index 926bf78bd80bb..3d53dcc5776af 100644 --- a/src/query/storages/fuse/src/operations/read_partitions.rs +++ b/src/query/storages/fuse/src/operations/read_partitions.rs @@ -18,6 +18,8 @@ use std::time::Instant; use async_channel::Receiver; use async_channel::Sender; +use databend_common_base::runtime::GlobalIORuntime; +use databend_common_base::runtime::Runtime; use databend_common_base::runtime::TrySpawn; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfoPtr; @@ -195,17 +197,26 @@ impl FuseTable { let sender = part_info_tx.clone(); info!("prune pipeline: get prune result from cache"); source_pipeline.set_on_init(move || { - ctx.get_runtime()?.try_spawn( - async move { + // We cannot use the runtime associated with the query to avoid increasing its lifetime. + GlobalIORuntime::instance().spawn(async move { + // avoid block global io runtime + let runtime = Runtime::with_worker_threads(2, None)?; + + let join_handler = runtime.spawn(async move { for part in part.partitions { // the sql may be killed or early stop, ignore the error if let Err(_e) = sender.send(Ok(part)).await { break; } } - }, - None, - )?; + }); + + if let Err(cause) = join_handler.await { + log::warn!("Join error while in prune pipeline, cause: {:?}", cause); + } + + Result::Ok(()) + }); Ok(()) });