Skip to content

Commit

Permalink
fix(query): fix thread leak and remove all usage (#17081)
Browse files Browse the repository at this point in the history
* fix(query): fix thread leak and remove all usage

* fix(query): fix thread leak and remove all usage

* fix(query): fix thread leak and remove all usage
  • Loading branch information
dqhl76 authored Dec 19, 2024
1 parent e131edf commit 327cd75
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 29 deletions.
3 changes: 0 additions & 3 deletions src/query/catalog/src/table_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use std::time::SystemTime;
use dashmap::DashMap;
use databend_common_base::base::Progress;
use databend_common_base::base::ProgressValues;
use databend_common_base::runtime::Runtime;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_exception::ResultExt;
Expand Down Expand Up @@ -374,8 +373,6 @@ pub trait TableContext: Send + Sync {
fn is_temp_table(&self, catalog_name: &str, database_name: &str, table_name: &str) -> bool;
fn get_shared_settings(&self) -> Arc<Settings>;

fn get_runtime(&self) -> Result<Arc<Runtime>>;

fn add_m_cte_temp_table(&self, database_name: &str, table_name: &str);

async fn drop_m_cte_temp_table(&self) -> Result<()>;
Expand Down
5 changes: 0 additions & 5 deletions src/query/service/src/sessions/query_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use databend_common_base::base::Progress;
use databend_common_base::base::ProgressValues;
use databend_common_base::runtime::profile::Profile;
use databend_common_base::runtime::profile::ProfileStatisticsName;
use databend_common_base::runtime::Runtime;
use databend_common_base::runtime::TrySpawn;
use databend_common_base::JoinHandle;
use databend_common_catalog::catalog::CATALOG_DEFAULT;
Expand Down Expand Up @@ -1476,10 +1475,6 @@ impl TableContext for QueryContext {
.is_temp_table(database_name, table_name)
}

fn get_runtime(&self) -> Result<Arc<Runtime>> {
self.shared.try_get_runtime()
}

fn add_m_cte_temp_table(&self, database_name: &str, table_name: &str) {
self.m_cte_temp_table
.write()
Expand Down
5 changes: 0 additions & 5 deletions src/query/service/tests/it/sql/exec/get_table_bind_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use dashmap::DashMap;
use databend_common_base::base::tokio;
use databend_common_base::base::Progress;
use databend_common_base::base::ProgressValues;
use databend_common_base::runtime::Runtime;
use databend_common_catalog::catalog::Catalog;
use databend_common_catalog::cluster_info::Cluster;
use databend_common_catalog::database::Database;
Expand Down Expand Up @@ -997,10 +996,6 @@ impl TableContext for CtxDelegation {
false
}

fn get_runtime(&self) -> Result<Arc<Runtime>> {
todo!()
}

fn add_m_cte_temp_table(&self, _database_name: &str, _table_name: &str) {
todo!()
}
Expand Down
6 changes: 0 additions & 6 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use dashmap::DashMap;
use databend_common_base::base::tokio;
use databend_common_base::base::Progress;
use databend_common_base::base::ProgressValues;
use databend_common_base::runtime::Runtime;
use databend_common_catalog::catalog::Catalog;
use databend_common_catalog::cluster_info::Cluster;
use databend_common_catalog::database::Database;
Expand Down Expand Up @@ -876,11 +875,6 @@ impl TableContext for CtxDelegation {
fn is_temp_table(&self, _catalog_name: &str, _database_name: &str, _table_name: &str) -> bool {
false
}

fn get_runtime(&self) -> Result<Arc<Runtime>> {
todo!()
}

fn add_m_cte_temp_table(&self, _database_name: &str, _table_name: &str) {
todo!()
}
Expand Down
20 changes: 15 additions & 5 deletions src/query/service/tests/it/storages/fuse/pruning_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use std::sync::Arc;

use databend_common_ast::ast::Engine;
use databend_common_base::base::tokio;
use databend_common_base::runtime::GlobalIORuntime;
use databend_common_base::runtime::Runtime;
use databend_common_base::runtime::TrySpawn;
use databend_common_catalog::plan::PartInfoPtr;
use databend_common_catalog::plan::PartStatistics;
Expand Down Expand Up @@ -96,17 +98,25 @@ async fn apply_block_pruning(
)?;
prune_pipeline.set_max_threads(1);
prune_pipeline.set_on_init(move || {
ctx.get_runtime()?.try_spawn(
async move {
// We cannot use the runtime associated with the query to avoid increasing its lifetime.
GlobalIORuntime::instance().spawn(async move {
// avoid block global io runtime
let runtime = Runtime::with_worker_threads(2, None)?;

let join_handler = runtime.spawn(async move {
let segment_pruned_result =
fuse_pruner.clone().segment_pruning(segment_locs).await?;
for segment in segment_pruned_result {
let _ = segment_tx.send(Ok(segment)).await;
}
Ok::<_, ErrorCode>(())
},
None,
)?;
});
join_handler
.await
.unwrap()
.expect("Join error while in prune pipeline");
Ok::<_, ErrorCode>(())
});
Ok(())
});

Expand Down
17 changes: 12 additions & 5 deletions src/query/storages/fuse/src/operations/read_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,11 @@ impl FuseTable {
derterministic_cache_key.clone(),
)?;
prune_pipeline.set_on_init(move || {
ctx.get_runtime()?.try_spawn(
async move {
// We cannot use the runtime associated with the query to avoid increasing its lifetime.
GlobalIORuntime::instance().spawn(async move {
// avoid block global io runtime
let runtime = Runtime::with_worker_threads(2, None)?;
let join_handler = runtime.spawn(async move {
let segment_pruned_result =
pruner.clone().segment_pruning(lazy_init_segments).await?;
for segment in segment_pruned_result {
Expand All @@ -249,9 +252,13 @@ impl FuseTable {
}
}
Ok::<_, ErrorCode>(())
},
None,
)?;
});

if let Err(cause) = join_handler.await {
log::warn!("Join error while in prune pipeline, cause: {:?}", cause);
}
Ok::<_, ErrorCode>(())
});
Ok(())
});

Expand Down

0 comments on commit 327cd75

Please sign in to comment.