From 99f6a9fdd26fcbe36d4ebe3a5e8535f86fd81bbe Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Fri, 1 Nov 2024 16:41:29 +0800 Subject: [PATCH] try to find problem --- src/query/catalog/src/table_context.rs | 3 ++ src/query/service/src/sessions/query_ctx.rs | 5 +++ src/query/storages/fuse/src/fuse_table.rs | 9 ---- .../storages/fuse/src/operations/read_data.rs | 42 ++++++++++++------- 4 files changed, 34 insertions(+), 25 deletions(-) diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 631c0a493b90..a7c0997aaa64 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -24,6 +24,7 @@ use std::time::SystemTime; use dashmap::DashMap; use databend_common_base::base::Progress; use databend_common_base::base::ProgressValues; +use databend_common_base::runtime::Runtime; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_exception::ResultExt; @@ -384,4 +385,6 @@ pub trait TableContext: Send + Sync { fn is_temp_table(&self, catalog_name: &str, database_name: &str, table_name: &str) -> bool; fn get_shared_settings(&self) -> Arc; + + fn get_runtime(&self) -> Result>; } diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 375d7ae9e2dc..bdf4b7866eda 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -36,6 +36,7 @@ use databend_common_base::base::Progress; use databend_common_base::base::ProgressValues; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; +use databend_common_base::runtime::Runtime; use databend_common_base::runtime::TrySpawn; use databend_common_base::JoinHandle; use databend_common_catalog::catalog::CATALOG_DEFAULT; @@ -1448,6 +1449,10 @@ impl TableContext for QueryContext { .lock() .is_temp_table(database_name, table_name) } + + fn get_runtime(&self) -> Result> { + self.shared.try_get_runtime() + } } impl TrySpawn for QueryContext { diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 13fe228b40e5..6089bba69293 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -23,7 +23,6 @@ use std::sync::Arc; use chrono::Duration; use chrono::TimeDelta; -use databend_common_base::runtime::Runtime; use databend_common_catalog::catalog::StorageDescription; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartStatistics; @@ -133,8 +132,6 @@ pub struct FuseTable { // If this is set, reading from fuse_table should only returns the increment blocks pub(crate) changes_desc: Option, - - pub(crate) runtime: Arc, } impl FuseTable { @@ -227,11 +224,6 @@ impl FuseTable { let meta_location_generator = TableMetaLocationGenerator::with_prefix(storage_prefix).with_part_prefix(part_prefix); - let runtime = Arc::new(Runtime::with_worker_threads( - 2, - Some(String::from("PruneSnapshot")), - )?); - Ok(Box::new(FuseTable { table_info, meta_location_generator, @@ -243,7 +235,6 @@ impl FuseTable { table_compression: table_compression.as_str().try_into()?, table_type, changes_desc: None, - runtime, })) } diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index c384849ac011..92157e39e9cd 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -225,25 +225,35 @@ impl FuseTable { let table_schema = self.schema_with_stream(); let push_downs = plan.push_downs.clone(); pipeline.set_on_init(move || { - table.runtime.clone().spawn(async move { - match table - .prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0) - .await - { - Ok((_, partitions)) => { - for part in partitions.partitions { - // ignore the error, the sql may be killed or early stop - let _ = sender.send(Ok(part)).await; + ctx.get_runtime()?.try_spawn( + async move { + match table + .prune_snapshot_blocks( + ctx, + push_downs, + table_schema, + lazy_init_segments, + 0, + ) + .await + { + Ok((_, partitions)) => { + for part in partitions.partitions { + // ignore the error, the sql may be killed or early stop + let _ = sender.send(Ok(part)).await; + } + } + Err(err) => { + let _ = sender.send(Err(err)).await; } } - Err(err) => { - let _ = sender.send(Err(err)).await; - } - } - Ok::<_, ErrorCode>(()) - }); + Ok::<_, ErrorCode>(()) + }, + None, + )?; + Ok(()) - }) + }); } Ok(())