Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(executor): add plan info for processor profile #13684

Merged
merged 11 commits into from
Nov 20, 2023
Merged
2 changes: 2 additions & 0 deletions src/query/pipeline/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ pub use pipe::SourcePipeBuilder;
pub use pipe::TransformPipeBuilder;
pub use pipeline::query_spill_prefix;
pub use pipeline::Pipeline;
pub use processors::PlanScope;
pub use processors::PlanScopeGuard;
3 changes: 3 additions & 0 deletions src/query/pipeline/core/src/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::sync::Arc;
use crate::processors::InputPort;
use crate::processors::OutputPort;
use crate::processors::ProcessorPtr;
use crate::PlanScope;

#[derive(Clone)]
pub struct PipeItem {
Expand Down Expand Up @@ -56,6 +57,7 @@ pub struct Pipe {
pub items: Vec<PipeItem>,
pub input_length: usize,
pub output_length: usize,
pub scope: Option<PlanScope>,
}

impl Debug for Pipe {
Expand All @@ -70,6 +72,7 @@ impl Pipe {
items,
input_length: inputs,
output_length: outputs,
scope: None,
}
}
}
Expand Down
103 changes: 83 additions & 20 deletions src/query/pipeline/core/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,24 @@

use std::fmt::Debug;
use std::fmt::Formatter;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;

use common_exception::ErrorCode;
use common_exception::Result;

use crate::pipe::Pipe;
use crate::pipe::PipeItem;
use crate::processors::profile::PlanScope;
use crate::processors::DuplicateProcessor;
use crate::processors::InputPort;
use crate::processors::OutputPort;
use crate::processors::ProcessorPtr;
use crate::processors::ResizeProcessor;
use crate::processors::ShuffleProcessor;
use crate::LockGuard;
use crate::PlanScopeGuard;
use crate::SinkPipeBuilder;
use crate::SourcePipeBuilder;
use crate::TransformPipeBuilder;
Expand Down Expand Up @@ -56,6 +60,9 @@ pub struct Pipeline {
on_init: Option<InitCallback>,
on_finished: Option<FinishedCallback>,
lock_guards: Vec<LockGuard>,

pub plans_scope: Vec<PlanScope>,
scope_size: Arc<AtomicUsize>,
}

impl Debug for Pipeline {
Expand All @@ -77,6 +84,21 @@ impl Pipeline {
on_init: None,
on_finished: None,
lock_guards: vec![],
plans_scope: vec![],
scope_size: Arc::new(AtomicUsize::new(0)),
}
}

pub fn with_scopes(scope: Vec<PlanScope>) -> Pipeline {
let scope_size = Arc::new(AtomicUsize::new(scope.len()));
Pipeline {
scope_size,
max_threads: 0,
pipes: Vec::new(),
on_init: None,
on_finished: None,
lock_guards: vec![],
plans_scope: scope,
}
}

Expand Down Expand Up @@ -113,7 +135,41 @@ impl Pipeline {
)
}

pub fn add_pipe(&mut self, pipe: Pipe) {
pub fn finalize(mut self) -> Pipeline {
for pipe in &mut self.pipes {
if let Some(uninitialized_scope) = &mut pipe.scope {
if uninitialized_scope.parent_id == 0 {
for (index, scope) in self.plans_scope.iter().enumerate() {
if scope.id == uninitialized_scope.id && index != 0 {
if let Some(parent_scope) = self.plans_scope.get(index - 1) {
uninitialized_scope.parent_id = parent_scope.id;
}
}
}
}
}
}

self
}

pub fn add_pipe(&mut self, mut pipe: Pipe) {
let (scope_idx, _) = self.scope_size.load(Ordering::SeqCst).overflowing_sub(1);

if let Some(scope) = self.plans_scope.get_mut(scope_idx) {
// stack, new plan is always the parent node of previous node.
// set the parent node in 'add_pipe' helps skip empty plans(no pipeline).
for pipe in &mut self.pipes {
if let Some(children) = &mut pipe.scope {
if children.parent_id == 0 && children.id != scope.id {
children.parent_id = scope.id;
}
}
}

pipe.scope = Some(scope.clone());
}

self.pipes.push(pipe);
}

Expand Down Expand Up @@ -235,14 +291,13 @@ impl Pipeline {
let processor = ResizeProcessor::create(pipe.output_length, new_size);
let inputs_port = processor.get_inputs();
let outputs_port = processor.get_outputs();
self.pipes
.push(Pipe::create(inputs_port.len(), outputs_port.len(), vec![
PipeItem::create(
ProcessorPtr::create(Box::new(processor)),
inputs_port,
outputs_port,
),
]));
self.add_pipe(Pipe::create(inputs_port.len(), outputs_port.len(), vec![
PipeItem::create(
ProcessorPtr::create(Box::new(processor)),
inputs_port,
outputs_port,
),
]));
Ok(())
}
}
Expand Down Expand Up @@ -280,8 +335,7 @@ impl Pipeline {
outputs_port,
));
}
self.pipes
.push(Pipe::create(input_len, output_len, pipe_items));
self.add_pipe(Pipe::create(input_len, output_len, pipe_items));
Ok(())
}
}
Expand Down Expand Up @@ -310,7 +364,7 @@ impl Pipeline {
vec![output1, output2],
));
}
self.pipes.push(Pipe::create(
self.add_pipe(Pipe::create(
pipe.output_length,
pipe.output_length * 2,
items,
Expand Down Expand Up @@ -341,14 +395,9 @@ impl Pipeline {
outputs.push(OutputPort::create());
}
let processor = ShuffleProcessor::create(inputs.clone(), outputs.clone(), rule);
self.pipes
.push(Pipe::create(inputs.len(), outputs.len(), vec![
PipeItem::create(
ProcessorPtr::create(Box::new(processor)),
inputs,
outputs,
),
]));
self.add_pipe(Pipe::create(inputs.len(), outputs.len(), vec![
PipeItem::create(ProcessorPtr::create(Box::new(processor)), inputs, outputs),
]));
}
_ => {}
}
Expand Down Expand Up @@ -396,6 +445,20 @@ impl Pipeline {
Some(on_finished) => on_finished,
}
}

pub fn add_plan_scope(&mut self, scope: PlanScope) -> PlanScopeGuard {
let scope_idx = self.scope_size.fetch_add(1, Ordering::SeqCst);

if self.plans_scope.len() > scope_idx {
self.plans_scope[scope_idx] = scope;
self.plans_scope.shrink_to(scope_idx + 1);
return PlanScopeGuard::create(self.scope_size.clone(), scope_idx);
}

assert_eq!(self.plans_scope.len(), scope_idx);
self.plans_scope.push(scope);
PlanScopeGuard::create(self.scope_size.clone(), scope_idx)
}
}

impl Drop for Pipeline {
Expand Down
2 changes: 2 additions & 0 deletions src/query/pipeline/core/src/processors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub use processor::Event;
pub use processor::EventCause;
pub use processor::Processor;
pub use processor::ProcessorPtr;
pub use profile::PlanScope;
pub use profile::PlanScopeGuard;
pub use resize_processor::create_resize_item;
pub use resize_processor::ResizeProcessor;
pub use shuffle_processor::ShuffleProcessor;
50 changes: 49 additions & 1 deletion src/query/pipeline/core/src/processors/profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
// limitations under the License.

use std::sync::atomic::AtomicU64;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;

#[derive(Default)]
pub struct Profile {
Expand All @@ -21,6 +24,10 @@ pub struct Profile {
/// The name of processor
pub p_name: String,

pub plan_id: Option<u32>,
pub plan_name: Option<String>,
pub plan_parent_id: Option<u32>,

/// The time spent to process in nanoseconds
pub cpu_time: AtomicU64,
/// The time spent to wait in nanoseconds, usually used to
Expand All @@ -29,12 +36,53 @@ pub struct Profile {
}

impl Profile {
pub fn create(pid: usize, p_name: String) -> Profile {
pub fn create(pid: usize, p_name: String, scope: Option<PlanScope>) -> Profile {
Profile {
pid,
p_name,
cpu_time: AtomicU64::new(0),
wait_time: AtomicU64::new(0),
plan_id: scope.as_ref().map(|x| x.id),
plan_name: scope.as_ref().map(|x| x.name.clone()),
plan_parent_id: scope.as_ref().map(|x| x.parent_id),
}
}
}

pub struct PlanScopeGuard {
idx: usize,
scope_size: Arc<AtomicUsize>,
}

impl PlanScopeGuard {
pub fn create(scope_size: Arc<AtomicUsize>, idx: usize) -> PlanScopeGuard {
PlanScopeGuard { idx, scope_size }
}
}

impl Drop for PlanScopeGuard {
fn drop(&mut self) {
if self.scope_size.fetch_sub(1, Ordering::SeqCst) != self.idx + 1
&& !std::thread::panicking()
{
panic!("Broken pipeline scope stack.");
}
}
}

#[derive(Clone, Debug)]
pub struct PlanScope {
pub id: u32,
pub name: String,
pub parent_id: u32,
}

impl PlanScope {
pub fn create(id: u32, name: String) -> PlanScope {
PlanScope {
id,
parent_id: 0,
name,
}
}
}
1 change: 1 addition & 0 deletions src/query/service/src/api/rpc/exchange/exchange_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -848,6 +848,7 @@ impl FragmentCoordinator {
pipeline_ctx,
enable_profiling,
SharedProcessorProfiles::default(),
vec![],
);

let res = pipeline_builder.finalize(&self.physical_plan)?;
Expand Down
10 changes: 7 additions & 3 deletions src/query/service/src/pipelines/builders/builder_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ impl PipelineBuilder {
right_side_context,
self.enable_profiling,
self.proc_profs.clone(),
self.main_pipeline.plans_scope.clone(),
);
right_side_builder.cte_state = self.cte_state.clone();
let mut right_res = right_side_builder.finalize(&range_join.right)?;
Expand All @@ -124,7 +125,7 @@ impl PipelineBuilder {
Ok(ProcessorPtr::create(transform))
}
})?;
self.pipelines.push(right_res.main_pipeline);
self.pipelines.push(right_res.main_pipeline.finalize());
self.pipelines.extend(right_res.sources_pipelines);
Ok(())
}
Expand Down Expand Up @@ -158,6 +159,7 @@ impl PipelineBuilder {
build_side_context,
self.enable_profiling,
self.proc_profs.clone(),
self.main_pipeline.plans_scope.clone(),
);
build_side_builder.cte_state = self.cte_state.clone();
let mut build_res = build_side_builder.finalize(build)?;
Expand Down Expand Up @@ -212,7 +214,7 @@ impl PipelineBuilder {
build_res.main_pipeline.add_sink(create_sink_processor)?;
}

self.pipelines.push(build_res.main_pipeline);
self.pipelines.push(build_res.main_pipeline.finalize());
self.pipelines.extend(build_res.sources_pipelines);
Ok(())
}
Expand Down Expand Up @@ -403,6 +405,7 @@ impl PipelineBuilder {
left_side_ctx,
self.enable_profiling,
self.proc_profs.clone(),
self.main_pipeline.plans_scope.clone(),
);
left_side_builder.cte_state = self.cte_state.clone();
let mut left_side_pipeline = left_side_builder.finalize(left_side)?;
Expand All @@ -423,7 +426,8 @@ impl PipelineBuilder {
);
Ok(ProcessorPtr::create(transform))
})?;
self.pipelines.push(left_side_pipeline.main_pipeline);
self.pipelines
.push(left_side_pipeline.main_pipeline.finalize());
self.pipelines.extend(left_side_pipeline.sources_pipelines);
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl PipelineBuilder {
union_ctx,
self.enable_profiling,
self.proc_profs.clone(),
self.main_pipeline.plans_scope.clone(),
);
pipeline_builder.cte_state = self.cte_state.clone();
let mut build_res = pipeline_builder.finalize(input)?;
Expand All @@ -88,7 +89,7 @@ impl PipelineBuilder {
}
})?;

self.pipelines.push(build_res.main_pipeline);
self.pipelines.push(build_res.main_pipeline.finalize());
self.pipelines.extend(build_res.sources_pipelines);
Ok(rx)
}
Expand Down
Loading
Loading