diff --git a/src/query/pipeline/core/src/lib.rs b/src/query/pipeline/core/src/lib.rs index 74a4e56830e8..731667028d39 100644 --- a/src/query/pipeline/core/src/lib.rs +++ b/src/query/pipeline/core/src/lib.rs @@ -33,3 +33,5 @@ pub use pipe::SourcePipeBuilder; pub use pipe::TransformPipeBuilder; pub use pipeline::query_spill_prefix; pub use pipeline::Pipeline; +pub use processors::PlanScope; +pub use processors::PlanScopeGuard; diff --git a/src/query/pipeline/core/src/pipe.rs b/src/query/pipeline/core/src/pipe.rs index 10bbd4463828..14f5a7a3e04f 100644 --- a/src/query/pipeline/core/src/pipe.rs +++ b/src/query/pipeline/core/src/pipe.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use crate::processors::InputPort; use crate::processors::OutputPort; use crate::processors::ProcessorPtr; +use crate::PlanScope; #[derive(Clone)] pub struct PipeItem { @@ -56,6 +57,7 @@ pub struct Pipe { pub items: Vec, pub input_length: usize, pub output_length: usize, + pub scope: Option, } impl Debug for Pipe { @@ -70,6 +72,7 @@ impl Pipe { items, input_length: inputs, output_length: outputs, + scope: None, } } } diff --git a/src/query/pipeline/core/src/pipeline.rs b/src/query/pipeline/core/src/pipeline.rs index 5ce38d86ed9a..77a651a1f0b8 100644 --- a/src/query/pipeline/core/src/pipeline.rs +++ b/src/query/pipeline/core/src/pipeline.rs @@ -14,6 +14,8 @@ use std::fmt::Debug; use std::fmt::Formatter; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; use std::sync::Arc; use common_exception::ErrorCode; @@ -21,6 +23,7 @@ use common_exception::Result; use crate::pipe::Pipe; use crate::pipe::PipeItem; +use crate::processors::profile::PlanScope; use crate::processors::DuplicateProcessor; use crate::processors::InputPort; use crate::processors::OutputPort; @@ -28,6 +31,7 @@ use crate::processors::ProcessorPtr; use crate::processors::ResizeProcessor; use crate::processors::ShuffleProcessor; use crate::LockGuard; +use crate::PlanScopeGuard; use crate::SinkPipeBuilder; use crate::SourcePipeBuilder; use crate::TransformPipeBuilder; @@ -56,6 +60,9 @@ pub struct Pipeline { on_init: Option, on_finished: Option, lock_guards: Vec, + + pub plans_scope: Vec, + scope_size: Arc, } impl Debug for Pipeline { @@ -77,6 +84,21 @@ impl Pipeline { on_init: None, on_finished: None, lock_guards: vec![], + plans_scope: vec![], + scope_size: Arc::new(AtomicUsize::new(0)), + } + } + + pub fn with_scopes(scope: Vec) -> Pipeline { + let scope_size = Arc::new(AtomicUsize::new(scope.len())); + Pipeline { + scope_size, + max_threads: 0, + pipes: Vec::new(), + on_init: None, + on_finished: None, + lock_guards: vec![], + plans_scope: scope, } } @@ -113,7 +135,41 @@ impl Pipeline { ) } - pub fn add_pipe(&mut self, pipe: Pipe) { + pub fn finalize(mut self) -> Pipeline { + for pipe in &mut self.pipes { + if let Some(uninitialized_scope) = &mut pipe.scope { + if uninitialized_scope.parent_id == 0 { + for (index, scope) in self.plans_scope.iter().enumerate() { + if scope.id == uninitialized_scope.id && index != 0 { + if let Some(parent_scope) = self.plans_scope.get(index - 1) { + uninitialized_scope.parent_id = parent_scope.id; + } + } + } + } + } + } + + self + } + + pub fn add_pipe(&mut self, mut pipe: Pipe) { + let (scope_idx, _) = self.scope_size.load(Ordering::SeqCst).overflowing_sub(1); + + if let Some(scope) = self.plans_scope.get_mut(scope_idx) { + // stack, new plan is always the parent node of previous node. + // set the parent node in 'add_pipe' helps skip empty plans(no pipeline). + for pipe in &mut self.pipes { + if let Some(children) = &mut pipe.scope { + if children.parent_id == 0 && children.id != scope.id { + children.parent_id = scope.id; + } + } + } + + pipe.scope = Some(scope.clone()); + } + self.pipes.push(pipe); } @@ -235,14 +291,13 @@ impl Pipeline { let processor = ResizeProcessor::create(pipe.output_length, new_size); let inputs_port = processor.get_inputs(); let outputs_port = processor.get_outputs(); - self.pipes - .push(Pipe::create(inputs_port.len(), outputs_port.len(), vec![ - PipeItem::create( - ProcessorPtr::create(Box::new(processor)), - inputs_port, - outputs_port, - ), - ])); + self.add_pipe(Pipe::create(inputs_port.len(), outputs_port.len(), vec![ + PipeItem::create( + ProcessorPtr::create(Box::new(processor)), + inputs_port, + outputs_port, + ), + ])); Ok(()) } } @@ -280,8 +335,7 @@ impl Pipeline { outputs_port, )); } - self.pipes - .push(Pipe::create(input_len, output_len, pipe_items)); + self.add_pipe(Pipe::create(input_len, output_len, pipe_items)); Ok(()) } } @@ -310,7 +364,7 @@ impl Pipeline { vec![output1, output2], )); } - self.pipes.push(Pipe::create( + self.add_pipe(Pipe::create( pipe.output_length, pipe.output_length * 2, items, @@ -341,14 +395,9 @@ impl Pipeline { outputs.push(OutputPort::create()); } let processor = ShuffleProcessor::create(inputs.clone(), outputs.clone(), rule); - self.pipes - .push(Pipe::create(inputs.len(), outputs.len(), vec![ - PipeItem::create( - ProcessorPtr::create(Box::new(processor)), - inputs, - outputs, - ), - ])); + self.add_pipe(Pipe::create(inputs.len(), outputs.len(), vec![ + PipeItem::create(ProcessorPtr::create(Box::new(processor)), inputs, outputs), + ])); } _ => {} } @@ -396,6 +445,20 @@ impl Pipeline { Some(on_finished) => on_finished, } } + + pub fn add_plan_scope(&mut self, scope: PlanScope) -> PlanScopeGuard { + let scope_idx = self.scope_size.fetch_add(1, Ordering::SeqCst); + + if self.plans_scope.len() > scope_idx { + self.plans_scope[scope_idx] = scope; + self.plans_scope.shrink_to(scope_idx + 1); + return PlanScopeGuard::create(self.scope_size.clone(), scope_idx); + } + + assert_eq!(self.plans_scope.len(), scope_idx); + self.plans_scope.push(scope); + PlanScopeGuard::create(self.scope_size.clone(), scope_idx) + } } impl Drop for Pipeline { diff --git a/src/query/pipeline/core/src/processors/mod.rs b/src/query/pipeline/core/src/processors/mod.rs index a1e4cb977222..5c51f4ee2392 100644 --- a/src/query/pipeline/core/src/processors/mod.rs +++ b/src/query/pipeline/core/src/processors/mod.rs @@ -32,6 +32,8 @@ pub use processor::Event; pub use processor::EventCause; pub use processor::Processor; pub use processor::ProcessorPtr; +pub use profile::PlanScope; +pub use profile::PlanScopeGuard; pub use resize_processor::create_resize_item; pub use resize_processor::ResizeProcessor; pub use shuffle_processor::ShuffleProcessor; diff --git a/src/query/pipeline/core/src/processors/profile.rs b/src/query/pipeline/core/src/processors/profile.rs index e556b1e5cb76..33ef11fc2244 100644 --- a/src/query/pipeline/core/src/processors/profile.rs +++ b/src/query/pipeline/core/src/processors/profile.rs @@ -13,6 +13,9 @@ // limitations under the License. use std::sync::atomic::AtomicU64; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering; +use std::sync::Arc; #[derive(Default)] pub struct Profile { @@ -21,6 +24,10 @@ pub struct Profile { /// The name of processor pub p_name: String, + pub plan_id: Option, + pub plan_name: Option, + pub plan_parent_id: Option, + /// The time spent to process in nanoseconds pub cpu_time: AtomicU64, /// The time spent to wait in nanoseconds, usually used to @@ -29,12 +36,53 @@ pub struct Profile { } impl Profile { - pub fn create(pid: usize, p_name: String) -> Profile { + pub fn create(pid: usize, p_name: String, scope: Option) -> Profile { Profile { pid, p_name, cpu_time: AtomicU64::new(0), wait_time: AtomicU64::new(0), + plan_id: scope.as_ref().map(|x| x.id), + plan_name: scope.as_ref().map(|x| x.name.clone()), + plan_parent_id: scope.as_ref().map(|x| x.parent_id), + } + } +} + +pub struct PlanScopeGuard { + idx: usize, + scope_size: Arc, +} + +impl PlanScopeGuard { + pub fn create(scope_size: Arc, idx: usize) -> PlanScopeGuard { + PlanScopeGuard { idx, scope_size } + } +} + +impl Drop for PlanScopeGuard { + fn drop(&mut self) { + if self.scope_size.fetch_sub(1, Ordering::SeqCst) != self.idx + 1 + && !std::thread::panicking() + { + panic!("Broken pipeline scope stack."); + } + } +} + +#[derive(Clone, Debug)] +pub struct PlanScope { + pub id: u32, + pub name: String, + pub parent_id: u32, +} + +impl PlanScope { + pub fn create(id: u32, name: String) -> PlanScope { + PlanScope { + id, + parent_id: 0, + name, } } } diff --git a/src/query/service/src/api/rpc/exchange/exchange_manager.rs b/src/query/service/src/api/rpc/exchange/exchange_manager.rs index 8bcb59da29a3..4e7a1eabe9d6 100644 --- a/src/query/service/src/api/rpc/exchange/exchange_manager.rs +++ b/src/query/service/src/api/rpc/exchange/exchange_manager.rs @@ -848,6 +848,7 @@ impl FragmentCoordinator { pipeline_ctx, enable_profiling, SharedProcessorProfiles::default(), + vec![], ); let res = pipeline_builder.finalize(&self.physical_plan)?; diff --git a/src/query/service/src/pipelines/builders/builder_join.rs b/src/query/service/src/pipelines/builders/builder_join.rs index fb717d332bd2..5ea3424fc816 100644 --- a/src/query/service/src/pipelines/builders/builder_join.rs +++ b/src/query/service/src/pipelines/builders/builder_join.rs @@ -106,6 +106,7 @@ impl PipelineBuilder { right_side_context, self.enable_profiling, self.proc_profs.clone(), + self.main_pipeline.plans_scope.clone(), ); right_side_builder.cte_state = self.cte_state.clone(); let mut right_res = right_side_builder.finalize(&range_join.right)?; @@ -124,7 +125,7 @@ impl PipelineBuilder { Ok(ProcessorPtr::create(transform)) } })?; - self.pipelines.push(right_res.main_pipeline); + self.pipelines.push(right_res.main_pipeline.finalize()); self.pipelines.extend(right_res.sources_pipelines); Ok(()) } @@ -158,6 +159,7 @@ impl PipelineBuilder { build_side_context, self.enable_profiling, self.proc_profs.clone(), + self.main_pipeline.plans_scope.clone(), ); build_side_builder.cte_state = self.cte_state.clone(); let mut build_res = build_side_builder.finalize(build)?; @@ -212,7 +214,7 @@ impl PipelineBuilder { build_res.main_pipeline.add_sink(create_sink_processor)?; } - self.pipelines.push(build_res.main_pipeline); + self.pipelines.push(build_res.main_pipeline.finalize()); self.pipelines.extend(build_res.sources_pipelines); Ok(()) } @@ -403,6 +405,7 @@ impl PipelineBuilder { left_side_ctx, self.enable_profiling, self.proc_profs.clone(), + self.main_pipeline.plans_scope.clone(), ); left_side_builder.cte_state = self.cte_state.clone(); let mut left_side_pipeline = left_side_builder.finalize(left_side)?; @@ -423,7 +426,8 @@ impl PipelineBuilder { ); Ok(ProcessorPtr::create(transform)) })?; - self.pipelines.push(left_side_pipeline.main_pipeline); + self.pipelines + .push(left_side_pipeline.main_pipeline.finalize()); self.pipelines.extend(left_side_pipeline.sources_pipelines); Ok(()) } diff --git a/src/query/service/src/pipelines/builders/builder_union_all.rs b/src/query/service/src/pipelines/builders/builder_union_all.rs index 0e45caa80e0f..3fd88e0c09ef 100644 --- a/src/query/service/src/pipelines/builders/builder_union_all.rs +++ b/src/query/service/src/pipelines/builders/builder_union_all.rs @@ -65,6 +65,7 @@ impl PipelineBuilder { union_ctx, self.enable_profiling, self.proc_profs.clone(), + self.main_pipeline.plans_scope.clone(), ); pipeline_builder.cte_state = self.cte_state.clone(); let mut build_res = pipeline_builder.finalize(input)?; @@ -88,7 +89,7 @@ impl PipelineBuilder { } })?; - self.pipelines.push(build_res.main_pipeline); + self.pipelines.push(build_res.main_pipeline.finalize()); self.pipelines.extend(build_res.sources_pipelines); Ok(rx) } diff --git a/src/query/service/src/pipelines/executor/executor_graph.rs b/src/query/service/src/pipelines/executor/executor_graph.rs index f296a7822df8..09440e63229a 100644 --- a/src/query/service/src/pipelines/executor/executor_graph.rs +++ b/src/query/service/src/pipelines/executor/executor_graph.rs @@ -26,6 +26,7 @@ use common_exception::Result; use common_pipeline_core::processors::profile::Profile; use common_pipeline_core::processors::EventCause; use common_pipeline_core::Pipeline; +use common_pipeline_core::PlanScope; use log::debug; use log::trace; use minitrace::prelude::*; @@ -76,6 +77,7 @@ pub(crate) struct Node { impl Node { pub fn create( pid: usize, + scope: Option, processor: &ProcessorPtr, inputs_port: &[Arc], outputs_port: &[Arc], @@ -87,7 +89,7 @@ impl Node { updated_list: UpdateList::create(), inputs_port: inputs_port.to_vec(), outputs_port: outputs_port.to_vec(), - profile: Arc::new(Profile::create(pid, p_name)), + profile: Arc::new(Profile::create(pid, p_name, scope)), }) } @@ -151,8 +153,13 @@ impl ExecutingGraph { for item in &pipe.items { let pid = graph.node_count(); - let node = - Node::create(pid, &item.processor, &item.inputs_port, &item.outputs_port); + let node = Node::create( + pid, + pipe.scope.clone(), + &item.processor, + &item.inputs_port, + &item.outputs_port, + ); let graph_node_index = graph.add_node(node.clone()); unsafe { diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index fe24e6476940..f337b4832386 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -20,6 +20,7 @@ use common_exception::Result; use common_expression::DataField; use common_expression::FunctionContext; use common_pipeline_core::Pipeline; +use common_pipeline_core::PlanScope; use common_profile::SharedProcessorProfiles; use common_settings::Settings; use common_sql::executor::PhysicalPlan; @@ -63,6 +64,7 @@ impl PipelineBuilder { ctx: Arc, enable_profiling: bool, prof_span_set: SharedProcessorProfiles, + scopes: Vec, ) -> PipelineBuilder { PipelineBuilder { enable_profiling, @@ -71,7 +73,7 @@ impl PipelineBuilder { settings, pipelines: vec![], join_state: None, - main_pipeline: Pipeline::create(), + main_pipeline: Pipeline::with_scopes(scopes), proc_profs: prof_span_set, exchange_injector: DefaultExchangeInjector::create(), index: None, @@ -104,6 +106,8 @@ impl PipelineBuilder { } pub(crate) fn build_pipeline(&mut self, plan: &PhysicalPlan) -> Result<()> { + let scope = PlanScope::create(plan.get_id(), plan.name()); + let _guard = self.main_pipeline.add_plan_scope(scope); match plan { PhysicalPlan::TableScan(scan) => self.build_table_scan(scan), PhysicalPlan::CteScan(scan) => self.build_cte_scan(scan), diff --git a/src/query/service/src/schedulers/scheduler.rs b/src/query/service/src/schedulers/scheduler.rs index 35ad8c39ef18..5aec6e3abcb1 100644 --- a/src/query/service/src/schedulers/scheduler.rs +++ b/src/query/service/src/schedulers/scheduler.rs @@ -78,6 +78,7 @@ pub async fn build_local_pipeline( ctx.clone(), enable_profiling, SharedProcessorProfiles::default(), + vec![], ); let mut build_res = pipeline.finalize(plan)?; diff --git a/src/query/service/tests/it/storages/testdata/columns_table.txt b/src/query/service/tests/it/storages/testdata/columns_table.txt index 793a4eabefd8..90513a717a06 100644 --- a/src/query/service/tests/it/storages/testdata/columns_table.txt +++ b/src/query/service/tests/it/storages/testdata/columns_table.txt @@ -244,8 +244,11 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo | 'owner' | 'system' | 'task_history' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'owner' | 'system' | 'tasks' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'packed' | 'information_schema' | 'statistics' | 'NULL' | 'NULL' | '' | '' | 'NO' | '' | +| 'parent_plan_id' | 'system' | 'processor_profile' | 'Nullable(UInt32)' | 'INT UNSIGNED' | '' | '' | 'YES' | '' | | 'partitions_sha' | 'system' | 'query_cache' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'pid' | 'system' | 'processor_profile' | 'UInt64' | 'BIGINT UNSIGNED' | '' | '' | 'NO' | '' | +| 'plan_id' | 'system' | 'processor_profile' | 'Nullable(UInt32)' | 'INT UNSIGNED' | '' | '' | 'YES' | '' | +| 'plan_name' | 'system' | 'processor_profile' | 'Nullable(String)' | 'VARCHAR' | '' | '' | 'YES' | '' | | 'pname' | 'system' | 'processor_profile' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' | | 'port' | 'system' | 'clusters' | 'UInt16' | 'SMALLINT UNSIGNED' | '' | '' | 'NO' | '' | | 'position_in_unique_constraint' | 'information_schema' | 'key_column_usage' | 'NULL' | 'NULL' | '' | '' | 'NO' | '' | diff --git a/src/query/sql/src/executor/physical_plan.rs b/src/query/sql/src/executor/physical_plan.rs index d9e7248ffb8f..8b8e1f9e9d86 100644 --- a/src/query/sql/src/executor/physical_plan.rs +++ b/src/query/sql/src/executor/physical_plan.rs @@ -158,9 +158,7 @@ impl PhysicalPlan { | PhysicalPlan::ReplaceInto(_) | PhysicalPlan::CompactSource(_) | PhysicalPlan::ReclusterSource(_) - | PhysicalPlan::ReclusterSink(_) => { - unreachable!() - } + | PhysicalPlan::ReclusterSink(_) => u32::MAX, } } diff --git a/src/query/storages/system/src/processor_profile_table.rs b/src/query/storages/system/src/processor_profile_table.rs index 916e31c18128..f4abceb5a2f3 100644 --- a/src/query/storages/system/src/processor_profile_table.rs +++ b/src/query/storages/system/src/processor_profile_table.rs @@ -20,6 +20,7 @@ use common_catalog::table_context::TableContext; use common_exception::Result; use common_expression::types::NumberDataType; use common_expression::types::StringType; +use common_expression::types::UInt32Type; use common_expression::types::UInt64Type; use common_expression::DataBlock; use common_expression::FromData; @@ -56,6 +57,9 @@ impl SyncSystemTable for ProcessorProfileTable { let mut queries_id: Vec> = Vec::with_capacity(total_size); let mut pid: Vec = Vec::with_capacity(total_size); let mut p_name: Vec> = Vec::with_capacity(total_size); + let mut plan_id: Vec> = Vec::with_capacity(total_size); + let mut parent_id: Vec> = Vec::with_capacity(total_size); + let mut plan_name: Vec>> = Vec::with_capacity(total_size); let mut cpu_time: Vec = Vec::with_capacity(total_size); let mut wait_time: Vec = Vec::with_capacity(total_size); @@ -65,6 +69,9 @@ impl SyncSystemTable for ProcessorProfileTable { queries_id.push(query_id.clone().into_bytes()); pid.push(query_profile.pid as u64); p_name.push(query_profile.p_name.clone().into_bytes()); + plan_id.push(query_profile.plan_id); + parent_id.push(query_profile.plan_parent_id); + plan_name.push(query_profile.plan_name.clone().map(String::into_bytes)); cpu_time.push(query_profile.cpu_time.load(Ordering::Relaxed)); wait_time.push(query_profile.wait_time.load(Ordering::Relaxed)); @@ -76,6 +83,9 @@ impl SyncSystemTable for ProcessorProfileTable { StringType::from_data(queries_id), UInt64Type::from_data(pid), StringType::from_data(p_name), + UInt32Type::from_opt_data(plan_id), + UInt32Type::from_opt_data(parent_id), + StringType::from_opt_data(plan_name), UInt64Type::from_data(cpu_time), UInt64Type::from_data(wait_time), ])) @@ -89,6 +99,18 @@ impl ProcessorProfileTable { TableField::new("query_id", TableDataType::String), TableField::new("pid", TableDataType::Number(NumberDataType::UInt64)), TableField::new("pname", TableDataType::String), + TableField::new( + "plan_id", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt32))), + ), + TableField::new( + "parent_plan_id", + TableDataType::Nullable(Box::new(TableDataType::Number(NumberDataType::UInt32))), + ), + TableField::new( + "plan_name", + TableDataType::Nullable(Box::new(TableDataType::String)), + ), TableField::new("cpu_time", TableDataType::Number(NumberDataType::UInt64)), TableField::new("wait_time", TableDataType::Number(NumberDataType::UInt64)), ]);