Skip to content

Commit

Permalink
Merge branch 'main' into feature_fix
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass authored Dec 19, 2024
2 parents af24147 + e131edf commit 7c7b239
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 12 deletions.
22 changes: 15 additions & 7 deletions src/query/storages/fuse/src/operations/read_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
use std::sync::Arc;

use async_channel::Receiver;
use databend_common_base::runtime::GlobalIORuntime;
use databend_common_base::runtime::Runtime;
use databend_common_base::runtime::TrySpawn;
use databend_common_catalog::plan::DataSourcePlan;
use databend_common_catalog::plan::PartInfoPtr;
Expand All @@ -23,7 +25,6 @@ use databend_common_catalog::plan::PushDownInfo;
use databend_common_catalog::plan::TopK;
use databend_common_catalog::table::Table;
use databend_common_catalog::table_context::TableContext;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_functions::BUILTIN_FUNCTIONS;
use databend_common_pipeline_core::Pipeline;
Expand Down Expand Up @@ -205,8 +206,11 @@ impl FuseTable {
let ctx = ctx.clone();
let (tx, rx) = async_channel::bounded(max_io_requests);
pipeline.set_on_init(move || {
ctx.get_runtime()?.try_spawn(
async move {
// We cannot use the runtime associated with the query to avoid increasing its lifetime.
GlobalIORuntime::instance().spawn(async move {
// avoid block global io runtime
let runtime = Runtime::with_worker_threads(2, None)?;
let handler = runtime.spawn(async move {
match table
.prune_snapshot_blocks(
ctx,
Expand All @@ -229,10 +233,14 @@ impl FuseTable {
let _ = tx.send(Err(err)).await;
}
}
Ok::<_, ErrorCode>(())
},
None,
)?;
});

if let Err(cause) = handler.await {
log::warn!("Join error while in prune pipeline, cause: {:?}", cause);
}

Result::Ok(())
});

Ok(())
});
Expand Down
21 changes: 16 additions & 5 deletions src/query/storages/fuse/src/operations/read_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use std::time::Instant;

use async_channel::Receiver;
use async_channel::Sender;
use databend_common_base::runtime::GlobalIORuntime;
use databend_common_base::runtime::Runtime;
use databend_common_base::runtime::TrySpawn;
use databend_common_catalog::plan::DataSourcePlan;
use databend_common_catalog::plan::PartInfoPtr;
Expand Down Expand Up @@ -195,17 +197,26 @@ impl FuseTable {
let sender = part_info_tx.clone();
info!("prune pipeline: get prune result from cache");
source_pipeline.set_on_init(move || {
ctx.get_runtime()?.try_spawn(
async move {
// We cannot use the runtime associated with the query to avoid increasing its lifetime.
GlobalIORuntime::instance().spawn(async move {
// avoid block global io runtime
let runtime = Runtime::with_worker_threads(2, None)?;

let join_handler = runtime.spawn(async move {
for part in part.partitions {
// the sql may be killed or early stop, ignore the error
if let Err(_e) = sender.send(Ok(part)).await {
break;
}
}
},
None,
)?;
});

if let Err(cause) = join_handler.await {
log::warn!("Join error while in prune pipeline, cause: {:?}", cause);
}

Result::Ok(())
});

Ok(())
});
Expand Down

0 comments on commit 7c7b239

Please sign in to comment.