Skip to content

Commit

Permalink
add status
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Oct 18, 2023
1 parent cb37350 commit 0ec291d
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 44 deletions.
2 changes: 1 addition & 1 deletion src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ pub trait TableExt: Table {
let tid = table_info.ident.table_id;
let catalog = ctx.get_catalog(table_info.catalog()).await?;
let (ident, meta) = catalog.get_table_meta_by_id(tid).await?;
let table_info: TableInfo = TableInfo {
let table_info = TableInfo {
ident,
desc: "".to_owned(),
name,
Expand Down
12 changes: 7 additions & 5 deletions src/query/service/src/interpreters/interpreter_table_optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::time::SystemTime;
use common_base::runtime::GlobalIORuntime;
use common_catalog::plan::Partitions;
use common_catalog::table::CompactTarget;
use common_catalog::table::TableExt;
use common_exception::ErrorCode;
use common_exception::Result;
use common_meta_app::schema::CatalogInfo;
Expand Down Expand Up @@ -124,9 +123,10 @@ impl OptimizeTableInterpreter {
target: CompactTarget,
need_purge: bool,
) -> Result<PipelineBuildResult> {
let mut table = self
.ctx
.get_table(&self.plan.catalog, &self.plan.database, &self.plan.table)
let catalog = self.ctx.get_catalog(&self.plan.catalog).await?;
let tenant = self.ctx.get_tenant();
let mut table = catalog
.get_table(tenant.as_str(), &self.plan.database, &self.plan.table)
.await?;

let table_info = table.get_table_info().clone();
Expand Down Expand Up @@ -190,7 +190,9 @@ impl OptimizeTableInterpreter {
executor.execute()?;

// refresh table.
table = table.as_ref().refresh(self.ctx.as_ref()).await?;
table = catalog
.get_table(tenant.as_str(), &self.plan.database, &self.plan.table)
.await?;
}

let fuse_table = FuseTable::try_from_table(table.as_ref())?;
Expand Down
26 changes: 14 additions & 12 deletions src/query/service/src/interpreters/interpreter_table_recluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use std::time::SystemTime;

use common_catalog::plan::Filters;
use common_catalog::plan::PushDownInfo;
use common_catalog::table::TableExt;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::type_check::check_function;
Expand Down Expand Up @@ -75,13 +74,6 @@ impl Interpreter for ReclusterTableInterpreter {
let max_threads = settings.get_max_threads()? as usize;
let recluster_timeout_secs = settings.get_recluster_timeout_secs()?;

// Status.
{
let status = "recluster: begin to run recluster";
ctx.set_status_info(status);
info!("{}", status);
}

// Build extras via push down scalar
let extras = if let Some(scalar) = &plan.push_downs {
// prepare the filter expression
Expand All @@ -107,9 +99,10 @@ impl Interpreter for ReclusterTableInterpreter {
None
};

let mut table = self
.ctx
.get_table(&plan.catalog, &plan.database, &plan.table)
let catalog = self.ctx.get_catalog(&self.plan.catalog).await?;
let tenant = self.ctx.get_tenant();
let mut table = catalog
.get_table(tenant.as_str(), &self.plan.database, &self.plan.table)
.await?;

let mut times = 0;
Expand Down Expand Up @@ -137,6 +130,13 @@ impl Interpreter for ReclusterTableInterpreter {
)));
}

// Status.
{
let status = "recluster: begin to run recluster";
ctx.set_status_info(status);
info!("{}", status);
}

let fuse_table = FuseTable::try_from_table(table.as_ref())?;
let mutator = fuse_table
.build_recluster_mutator(ctx.clone(), extras.clone(), plan.limit)
Expand Down Expand Up @@ -203,7 +203,9 @@ impl Interpreter for ReclusterTableInterpreter {
}

// refresh table.
table = table.as_ref().refresh(self.ctx.as_ref()).await?;
table = catalog
.get_table(tenant.as_str(), &self.plan.database, &self.plan.table)
.await?;
}

if block_count != 0 {
Expand Down
37 changes: 20 additions & 17 deletions src/query/service/src/pipelines/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use common_pipeline_sinks::Sinker;
use common_pipeline_sinks::UnionReceiveSink;
use common_pipeline_sources::AsyncSource;
use common_pipeline_sources::AsyncSourcer;
use common_pipeline_sources::EmptySource;
use common_pipeline_sources::OneBlockSource;
use common_pipeline_transforms::processors::profile_wrapper::ProcessorProfileWrapper;
use common_pipeline_transforms::processors::profile_wrapper::ProfileStub;
Expand Down Expand Up @@ -911,25 +912,27 @@ impl PipelineBuilder {
}

fn build_recluster_source(&mut self, recluster_source: &ReclusterSource) -> Result<()> {
if recluster_source.tasks.len() != 1 {
return Err(ErrorCode::Internal(
match recluster_source.tasks.len() {
0 => self.main_pipeline.add_source(EmptySource::create, 1),
1 => {
let table = self.ctx.build_table_by_table_info(
&recluster_source.catalog_info,
&recluster_source.table_info,
None,
)?;
let table = FuseTable::try_from_table(table.as_ref())?;

table.build_recluster_source(
self.ctx.clone(),
recluster_source.tasks[0].clone(),
recluster_source.catalog_info.clone(),
&mut self.main_pipeline,
)
}
_ => Err(ErrorCode::Internal(
"A node can only execute one recluster task".to_string(),
));
)),
}

let table = self.ctx.build_table_by_table_info(
&recluster_source.catalog_info,
&recluster_source.table_info,
None,
)?;
let table = FuseTable::try_from_table(table.as_ref())?;

table.build_recluster_source(
self.ctx.clone(),
recluster_source.tasks[0].clone(),
recluster_source.catalog_info.clone(),
&mut self.main_pipeline,
)
}

fn build_recluster_sink(&mut self, recluster_sink: &ReclusterSink) -> Result<()> {
Expand Down
9 changes: 5 additions & 4 deletions src/query/service/src/schedulers/fragments/plan_fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,10 @@ impl PlanFragment {
)));
}

for (task, executor) in tasks.into_iter().zip(executors.into_iter()) {
let task_reshuffle = Self::reshuffle(executors, tasks)?;
for (executor, tasks) in task_reshuffle.into_iter() {
let mut plan = self.plan.clone();
let mut replace_recluster = ReplaceReclusterSource { task };
let mut replace_recluster = ReplaceReclusterSource { tasks };
plan = replace_recluster.replace(&plan)?;
fragment_actions.add_action(QueryFragmentAction::create(executor, plan));
}
Expand Down Expand Up @@ -453,13 +454,13 @@ impl PhysicalPlanReplacer for ReplaceReadSource {
}

struct ReplaceReclusterSource {
pub task: ReclusterTask,
pub tasks: Vec<ReclusterTask>,
}

impl PhysicalPlanReplacer for ReplaceReclusterSource {
fn replace_recluster_source(&mut self, plan: &ReclusterSource) -> Result<PhysicalPlan> {
Ok(PhysicalPlan::ReclusterSource(Box::new(ReclusterSource {
tasks: vec![self.task.clone()],
tasks: self.tasks.clone(),
..plan.clone()
})))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,15 @@ where F: SnapshotGenerator + Send + 'static
}
}
metrics_inc_commit_mutation_success();
let duration = self.start_time.elapsed();
{
let elapsed_time = self.start_time.elapsed().as_millis();
let status = format!(
"commit mutation success after {} retries, which took {} ms",
self.retries, elapsed_time
);
metrics_inc_commit_milliseconds(elapsed_time);
self.ctx.set_status_info(&status);
}
if let Some(files) = &self.copied_files {
metrics_inc_commit_copied_files(files.file_info.len() as u64);
}
Expand All @@ -368,7 +376,6 @@ where F: SnapshotGenerator + Send + 'static
SegmentInfo::VERSION,
))?;
}
metrics_inc_commit_milliseconds(duration.as_millis());
self.heartbeat.shutdown().await?;
self.state = State::Finish;
}
Expand Down
4 changes: 1 addition & 3 deletions src/query/storages/fuse/src/operations/recluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,7 @@ impl FuseTable {
}

// merge sort
let block_num = task
.total_bytes
.div_ceil(block_thresholds.max_bytes_per_block);
let block_num = std::cmp::max(task.total_bytes / block_thresholds.max_bytes_per_block, 1);
let final_block_size = std::cmp::min(
// estimate block_size based on max_bytes_per_block.
task.total_rows / block_num,
Expand Down

0 comments on commit 0ec291d

Please sign in to comment.