diff --git a/src/query/service/src/api/rpc/exchange/exchange_manager.rs b/src/query/service/src/api/rpc/exchange/exchange_manager.rs index aec249827a51..62f5ee0bf0a0 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_manager.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_manager.rs @@ -808,6 +808,7 @@ impl FragmentCoordinator { let pipeline_ctx = QueryContext::create_from(ctx); let pipeline_builder = PipelineBuilder::create( pipeline_ctx.get_function_context()?, + pipeline_ctx.get_settings(), pipeline_ctx, enable_profiling, SharedProcessorProfiles::default(), diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index 126637880f26..c829c42b533e 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -65,6 +65,7 @@ use common_pipeline_transforms::processors::transforms::build_full_sort_pipeline use common_pipeline_transforms::processors::transforms::create_dummy_item; use common_pipeline_transforms::processors::transforms::Transformer; use common_profile::SharedProcessorProfiles; +use common_settings::Settings; use common_sql::evaluator::BlockOperator; use common_sql::evaluator::CompoundBlockOperator; use common_sql::executor::AggregateExpand; @@ -177,6 +178,7 @@ pub struct PipelineBuilder { main_pipeline: Pipeline, pub pipelines: Vec, func_ctx: FunctionContext, + settings: Arc, // Used in runtime filter source pub join_state: Option>, @@ -194,6 +196,7 @@ pub struct PipelineBuilder { impl PipelineBuilder { pub fn create( func_ctx: FunctionContext, + settings: Arc, ctx: Arc, enable_profiling: bool, prof_span_set: SharedProcessorProfiles, @@ -202,6 +205,7 @@ impl PipelineBuilder { enable_profiling, ctx, func_ctx, + settings, pipelines: vec![], join_state: None, main_pipeline: Pipeline::create(), @@ -527,7 +531,7 @@ impl PipelineBuilder { self.main_pipeline.add_pipe(builder.finalize()); } - let max_io_request = self.ctx.get_settings().get_max_storage_io_requests()?; + let max_io_request = self.settings.get_max_storage_io_requests()?; let io_request_semaphore = Arc::new(Semaphore::new(max_io_request as usize)); let pipe_items = vec![ @@ -572,7 +576,7 @@ impl PipelineBuilder { block_slots, need_insert, } = replace; - let max_threads = self.ctx.get_settings().get_max_threads()?; + let max_threads = self.settings.get_max_threads()?; let segment_partition_num = std::cmp::min(segments.len(), max_threads as usize); let table = self .ctx @@ -607,7 +611,7 @@ impl PipelineBuilder { .add_pipe(Pipe::create(1, segment_partition_num, vec![ broadcast_processor.into_pipe_item(), ])); - let max_io_request = self.ctx.get_settings().get_max_storage_io_requests()?; + let max_io_request = self.settings.get_max_storage_io_requests()?; let io_request_semaphore = Arc::new(Semaphore::new(max_io_request as usize)); let merge_into_operation_aggregators = table.merge_into_mutators( @@ -688,7 +692,7 @@ impl PipelineBuilder { // setup the dummy transform pipe_items.push(serialize_segment_transform.into_pipe_item()); - let max_io_request = self.ctx.get_settings().get_max_storage_io_requests()?; + let max_io_request = self.settings.get_max_storage_io_requests()?; let io_request_semaphore = Arc::new(Semaphore::new(max_io_request as usize)); // setup the merge into operation aggregators @@ -718,10 +722,9 @@ impl PipelineBuilder { } fn build_async_sourcer(&mut self, async_sourcer: &AsyncSourcerPlan) -> Result<()> { - let settings = self.ctx.get_settings(); self.main_pipeline.add_source( |output| { - let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?; + let name_resolution_ctx = NameResolutionContext::try_from(self.settings.as_ref())?; let inner = ValueSource::new( async_sourcer.value_data.clone(), self.ctx.clone(), @@ -869,7 +872,7 @@ impl PipelineBuilder { state: Arc, ) -> Result<()> { self.build_pipeline(&range_join.left)?; - let max_threads = self.ctx.get_settings().get_max_threads()? as usize; + let max_threads = self.settings.get_max_threads()? as usize; self.main_pipeline.try_resize(max_threads)?; self.main_pipeline.add_transform(|input, output| { let transform = TransformRangeJoinLeft::create(input, output, state.clone()); @@ -894,6 +897,7 @@ impl PipelineBuilder { let right_side_context = QueryContext::create_from(self.ctx.clone()); let mut right_side_builder = PipelineBuilder::create( self.func_ctx.clone(), + self.settings.clone(), right_side_context, self.enable_profiling, self.proc_profs.clone(), @@ -946,6 +950,7 @@ impl PipelineBuilder { let build_side_context = QueryContext::create_from(self.ctx.clone()); let mut build_side_builder = PipelineBuilder::create( self.func_ctx.clone(), + self.settings.clone(), build_side_context, self.enable_profiling, self.proc_profs.clone(), @@ -964,7 +969,7 @@ impl PipelineBuilder { build_res.main_pipeline.output_len(), )?; let create_sink_processor = |input| { - let spill_state = if self.ctx.get_settings().get_enable_join_spill()? { + let spill_state = if self.settings.get_enable_join_spill()? { Some(Box::new(BuildSpillState::create( self.ctx.clone(), spill_coordinator.clone(), @@ -1110,7 +1115,7 @@ impl PipelineBuilder { } fn build_cte_scan(&mut self, cte_scan: &CteScan) -> Result<()> { - let max_threads = self.ctx.get_settings().get_max_threads()?; + let max_threads = self.settings.get_max_threads()?; self.main_pipeline.add_source( |output| { MaterializedCteSource::create( @@ -1393,9 +1398,7 @@ impl PipelineBuilder { }); } - // let is_standalone = self.ctx.get_cluster().is_empty(); - let settings = self.ctx.get_settings(); - let efficiently_memory = settings.get_efficiently_memory_group_by()?; + let efficiently_memory = self.settings.get_efficiently_memory_group_by()?; let group_cols = ¶ms.group_columns; let schema_before_group_by = params.input_schema.clone(); @@ -1530,8 +1533,7 @@ impl PipelineBuilder { return Ok(()); } - let settings = self.ctx.get_settings(); - let efficiently_memory = settings.get_efficiently_memory_group_by()?; + let efficiently_memory = self.settings.get_efficiently_memory_group_by()?; let group_cols = ¶ms.group_columns; let schema_before_group_by = params.input_schema.clone(); @@ -1807,8 +1809,8 @@ impl PipelineBuilder { limit: Option, after_exchange: bool, ) -> Result<()> { - let block_size = self.ctx.get_settings().get_max_block_size()? as usize; - let max_threads = self.ctx.get_settings().get_max_threads()? as usize; + let block_size = self.settings.get_max_block_size()? as usize; + let max_threads = self.settings.get_max_threads()? as usize; // TODO(Winter): the query will hang in MultiSortMergeProcessor when max_threads == 1 and output_len != 1 if self.main_pipeline.output_len() == 1 || max_threads == 1 { @@ -1870,7 +1872,7 @@ impl PipelineBuilder { fn build_join_probe(&mut self, join: &HashJoin, state: Arc) -> Result<()> { self.build_pipeline(&join.probe)?; - let max_block_size = self.ctx.get_settings().get_max_block_size()? as usize; + let max_block_size = self.settings.get_max_block_size()? as usize; let probe_state = Arc::new(HashJoinProbeState::create( self.ctx.clone(), @@ -1888,7 +1890,7 @@ impl PipelineBuilder { } self.main_pipeline.add_transform(|input, output| { - let probe_spill_state = if self.ctx.get_settings().get_enable_join_spill()? { + let probe_spill_state = if self.settings.get_enable_join_spill()? { Some(Box::new(ProbeSpillState::create( self.ctx.clone(), probe_state.clone(), @@ -1962,6 +1964,7 @@ impl PipelineBuilder { let union_ctx = QueryContext::create_from(self.ctx.clone()); let mut pipeline_builder = PipelineBuilder::create( self.func_ctx.clone(), + self.settings.clone(), union_ctx, self.enable_profiling, self.proc_profs.clone(), @@ -2171,6 +2174,7 @@ impl PipelineBuilder { self.cte_state.insert(cte_idx, state.clone()); let mut left_side_builder = PipelineBuilder::create( self.func_ctx.clone(), + self.settings.clone(), left_side_ctx, self.enable_profiling, self.proc_profs.clone(), diff --git a/src/query/service/src/schedulers/scheduler.rs b/src/query/service/src/schedulers/scheduler.rs index 445f1b485769..0b0a1785e2a1 100644 --- a/src/query/service/src/schedulers/scheduler.rs +++ b/src/query/service/src/schedulers/scheduler.rs @@ -80,6 +80,7 @@ pub async fn build_local_pipeline( ) -> Result { let pipeline = PipelineBuilder::create( ctx.get_function_context()?, + ctx.get_settings(), ctx.clone(), enable_profiling, SharedProcessorProfiles::default(),