Skip to content

Commit

Permalink
improve settings: cache `Arc<Settings>` as a field of `PipelineBuilder` and pass it through `PipelineBuilder::create`, replacing repeated `self.ctx.get_settings()` calls at each use site
Browse files Browse the repository at this point in the history
  • Loading branch information
Dousir9 committed Sep 22, 2023
1 parent 82dd7ac commit 0e8e35a
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 18 deletions.
1 change: 1 addition & 0 deletions src/query/service/src/api/rpc/exchange/exchange_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ impl FragmentCoordinator {
let pipeline_ctx = QueryContext::create_from(ctx);
let pipeline_builder = PipelineBuilder::create(
pipeline_ctx.get_function_context()?,
pipeline_ctx.get_settings(),
pipeline_ctx,
enable_profiling,
SharedProcessorProfiles::default(),
Expand Down
40 changes: 22 additions & 18 deletions src/query/service/src/pipelines/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ use common_pipeline_transforms::processors::transforms::build_full_sort_pipeline
use common_pipeline_transforms::processors::transforms::create_dummy_item;
use common_pipeline_transforms::processors::transforms::Transformer;
use common_profile::SharedProcessorProfiles;
use common_settings::Settings;
use common_sql::evaluator::BlockOperator;
use common_sql::evaluator::CompoundBlockOperator;
use common_sql::executor::AggregateExpand;
Expand Down Expand Up @@ -177,6 +178,7 @@ pub struct PipelineBuilder {
main_pipeline: Pipeline,
pub pipelines: Vec<Pipeline>,
func_ctx: FunctionContext,
settings: Arc<Settings>,

// Used in runtime filter source
pub join_state: Option<Arc<HashJoinBuildState>>,
Expand All @@ -194,6 +196,7 @@ pub struct PipelineBuilder {
impl PipelineBuilder {
pub fn create(
func_ctx: FunctionContext,
settings: Arc<Settings>,
ctx: Arc<QueryContext>,
enable_profiling: bool,
prof_span_set: SharedProcessorProfiles,
Expand All @@ -202,6 +205,7 @@ impl PipelineBuilder {
enable_profiling,
ctx,
func_ctx,
settings,
pipelines: vec![],
join_state: None,
main_pipeline: Pipeline::create(),
Expand Down Expand Up @@ -527,7 +531,7 @@ impl PipelineBuilder {
self.main_pipeline.add_pipe(builder.finalize());
}

let max_io_request = self.ctx.get_settings().get_max_storage_io_requests()?;
let max_io_request = self.settings.get_max_storage_io_requests()?;
let io_request_semaphore = Arc::new(Semaphore::new(max_io_request as usize));

let pipe_items = vec![
Expand Down Expand Up @@ -572,7 +576,7 @@ impl PipelineBuilder {
block_slots,
need_insert,
} = replace;
let max_threads = self.ctx.get_settings().get_max_threads()?;
let max_threads = self.settings.get_max_threads()?;
let segment_partition_num = std::cmp::min(segments.len(), max_threads as usize);
let table = self
.ctx
Expand Down Expand Up @@ -607,7 +611,7 @@ impl PipelineBuilder {
.add_pipe(Pipe::create(1, segment_partition_num, vec![
broadcast_processor.into_pipe_item(),
]));
let max_io_request = self.ctx.get_settings().get_max_storage_io_requests()?;
let max_io_request = self.settings.get_max_storage_io_requests()?;
let io_request_semaphore = Arc::new(Semaphore::new(max_io_request as usize));

let merge_into_operation_aggregators = table.merge_into_mutators(
Expand Down Expand Up @@ -688,7 +692,7 @@ impl PipelineBuilder {
// setup the dummy transform
pipe_items.push(serialize_segment_transform.into_pipe_item());

let max_io_request = self.ctx.get_settings().get_max_storage_io_requests()?;
let max_io_request = self.settings.get_max_storage_io_requests()?;
let io_request_semaphore = Arc::new(Semaphore::new(max_io_request as usize));

// setup the merge into operation aggregators
Expand Down Expand Up @@ -718,10 +722,9 @@ impl PipelineBuilder {
}

fn build_async_sourcer(&mut self, async_sourcer: &AsyncSourcerPlan) -> Result<()> {
let settings = self.ctx.get_settings();
self.main_pipeline.add_source(
|output| {
let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
let name_resolution_ctx = NameResolutionContext::try_from(self.settings.as_ref())?;
let inner = ValueSource::new(
async_sourcer.value_data.clone(),
self.ctx.clone(),
Expand Down Expand Up @@ -869,7 +872,7 @@ impl PipelineBuilder {
state: Arc<RangeJoinState>,
) -> Result<()> {
self.build_pipeline(&range_join.left)?;
let max_threads = self.ctx.get_settings().get_max_threads()? as usize;
let max_threads = self.settings.get_max_threads()? as usize;
self.main_pipeline.try_resize(max_threads)?;
self.main_pipeline.add_transform(|input, output| {
let transform = TransformRangeJoinLeft::create(input, output, state.clone());
Expand All @@ -894,6 +897,7 @@ impl PipelineBuilder {
let right_side_context = QueryContext::create_from(self.ctx.clone());
let mut right_side_builder = PipelineBuilder::create(
self.func_ctx.clone(),
self.settings.clone(),
right_side_context,
self.enable_profiling,
self.proc_profs.clone(),
Expand Down Expand Up @@ -946,6 +950,7 @@ impl PipelineBuilder {
let build_side_context = QueryContext::create_from(self.ctx.clone());
let mut build_side_builder = PipelineBuilder::create(
self.func_ctx.clone(),
self.settings.clone(),
build_side_context,
self.enable_profiling,
self.proc_profs.clone(),
Expand All @@ -964,7 +969,7 @@ impl PipelineBuilder {
build_res.main_pipeline.output_len(),
)?;
let create_sink_processor = |input| {
let spill_state = if self.ctx.get_settings().get_enable_join_spill()? {
let spill_state = if self.settings.get_enable_join_spill()? {
Some(Box::new(BuildSpillState::create(
self.ctx.clone(),
spill_coordinator.clone(),
Expand Down Expand Up @@ -1110,7 +1115,7 @@ impl PipelineBuilder {
}

fn build_cte_scan(&mut self, cte_scan: &CteScan) -> Result<()> {
let max_threads = self.ctx.get_settings().get_max_threads()?;
let max_threads = self.settings.get_max_threads()?;
self.main_pipeline.add_source(
|output| {
MaterializedCteSource::create(
Expand Down Expand Up @@ -1393,9 +1398,7 @@ impl PipelineBuilder {
});
}

// let is_standalone = self.ctx.get_cluster().is_empty();
let settings = self.ctx.get_settings();
let efficiently_memory = settings.get_efficiently_memory_group_by()?;
let efficiently_memory = self.settings.get_efficiently_memory_group_by()?;

let group_cols = &params.group_columns;
let schema_before_group_by = params.input_schema.clone();
Expand Down Expand Up @@ -1530,8 +1533,7 @@ impl PipelineBuilder {
return Ok(());
}

let settings = self.ctx.get_settings();
let efficiently_memory = settings.get_efficiently_memory_group_by()?;
let efficiently_memory = self.settings.get_efficiently_memory_group_by()?;

let group_cols = &params.group_columns;
let schema_before_group_by = params.input_schema.clone();
Expand Down Expand Up @@ -1807,8 +1809,8 @@ impl PipelineBuilder {
limit: Option<usize>,
after_exchange: bool,
) -> Result<()> {
let block_size = self.ctx.get_settings().get_max_block_size()? as usize;
let max_threads = self.ctx.get_settings().get_max_threads()? as usize;
let block_size = self.settings.get_max_block_size()? as usize;
let max_threads = self.settings.get_max_threads()? as usize;

// TODO(Winter): the query will hang in MultiSortMergeProcessor when max_threads == 1 and output_len != 1
if self.main_pipeline.output_len() == 1 || max_threads == 1 {
Expand Down Expand Up @@ -1870,7 +1872,7 @@ impl PipelineBuilder {
fn build_join_probe(&mut self, join: &HashJoin, state: Arc<HashJoinState>) -> Result<()> {
self.build_pipeline(&join.probe)?;

let max_block_size = self.ctx.get_settings().get_max_block_size()? as usize;
let max_block_size = self.settings.get_max_block_size()? as usize;

let probe_state = Arc::new(HashJoinProbeState::create(
self.ctx.clone(),
Expand All @@ -1888,7 +1890,7 @@ impl PipelineBuilder {
}

self.main_pipeline.add_transform(|input, output| {
let probe_spill_state = if self.ctx.get_settings().get_enable_join_spill()? {
let probe_spill_state = if self.settings.get_enable_join_spill()? {
Some(Box::new(ProbeSpillState::create(
self.ctx.clone(),
probe_state.clone(),
Expand Down Expand Up @@ -1962,6 +1964,7 @@ impl PipelineBuilder {
let union_ctx = QueryContext::create_from(self.ctx.clone());
let mut pipeline_builder = PipelineBuilder::create(
self.func_ctx.clone(),
self.settings.clone(),
union_ctx,
self.enable_profiling,
self.proc_profs.clone(),
Expand Down Expand Up @@ -2171,6 +2174,7 @@ impl PipelineBuilder {
self.cte_state.insert(cte_idx, state.clone());
let mut left_side_builder = PipelineBuilder::create(
self.func_ctx.clone(),
self.settings.clone(),
left_side_ctx,
self.enable_profiling,
self.proc_profs.clone(),
Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/schedulers/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pub async fn build_local_pipeline(
) -> Result<PipelineBuildResult> {
let pipeline = PipelineBuilder::create(
ctx.get_function_context()?,
ctx.get_settings(),
ctx.clone(),
enable_profiling,
SharedProcessorProfiles::default(),
Expand Down

0 comments on commit 0e8e35a

Please sign in to comment.