Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: distributed execution of update statement #13971

Merged
merged 11 commits into from
Jan 4, 2024
1 change: 1 addition & 0 deletions src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ pub trait Table: Sync + Send {
update_stream_meta: Vec<UpdateStreamMetaReq>,
overwrite: bool,
prev_snapshot_id: Option<SnapshotId>,
_deduplicated_label: Option<String>,
) -> Result<()> {
let (_, _, _, _, _, _) = (
ctx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ impl CopyIntoLocationInterpreter {
vec![],
false,
AppendMode::Normal,
unsafe { self.ctx.get_settings().get_deduplicate_label()? },
)?;
Ok(build_res)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ impl CopyIntoTableInterpreter {
plan: &CopyIntoTablePlan,
files: &[StageFileInfo],
update_stream_meta: Vec<UpdateStreamMetaReq>,
deduplicated_label: Option<String>,
) -> Result<()> {
let ctx = self.ctx.clone();
let to_table = ctx
Expand Down Expand Up @@ -268,6 +269,7 @@ impl CopyIntoTableInterpreter {
update_stream_meta,
plan.write_mode.is_overwrite(),
None,
deduplicated_label,
)?;
}

Expand Down Expand Up @@ -319,6 +321,7 @@ impl Interpreter for CopyIntoTableInterpreter {
&self.plan,
&files,
update_stream_meta,
unsafe { self.ctx.get_settings().get_deduplicate_label()? },
)
.await?;
}
Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/interpreters/interpreter_delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ impl DeleteInterpreter {
update_stream_meta: vec![],
merge_meta,
need_lock: false,
deduplicated_label: None,
})))
}
}
Expand Down
2 changes: 2 additions & 0 deletions src/query/service/src/interpreters/interpreter_insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ impl Interpreter for InsertInterpreter {
update_stream_meta,
self.plan.overwrite,
None,
unsafe { self.ctx.get_settings().get_deduplicate_label()? },
)?;

// Compact if 'enable_compact_after_write' on.
Expand Down Expand Up @@ -302,6 +303,7 @@ impl Interpreter for InsertInterpreter {
vec![],
self.plan.overwrite,
append_mode,
unsafe { self.ctx.get_settings().get_deduplicate_label()? },
)?;

// Compact if 'enable_compact_after_write' on.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,7 @@ impl MergeIntoInterpreter {
update_stream_meta: update_stream_meta.clone(),
merge_meta: false,
need_lock: false,
deduplicated_label: unsafe { self.ctx.get_settings().get_deduplicate_label()? },
}));

Ok((physical_plan, table_info))
Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/interpreters/interpreter_replace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ impl ReplaceInterpreter {
update_stream_meta: update_stream_meta.clone(),
merge_meta: false,
need_lock: false,
deduplicated_label: unsafe { self.ctx.get_settings().get_deduplicate_label()? },
})));
Ok((root, purge_info))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ impl ModifyTableColumnInterpreter {
vec![],
true,
prev_snapshot_id,
None,
)?;

build_res.main_pipeline.add_lock_guard(lock_guard);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ impl OptimizeTableInterpreter {
update_stream_meta: vec![],
merge_meta,
need_lock,
deduplicated_label: None,
})))
}

Expand Down
21 changes: 20 additions & 1 deletion src/query/service/src/interpreters/interpreter_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use databend_common_meta_app::schema::CatalogInfo;
use databend_common_meta_app::schema::TableInfo;
use databend_common_sql::binder::ColumnBindingBuilder;
use databend_common_sql::executor::physical_plans::CommitSink;
use databend_common_sql::executor::physical_plans::Exchange;
use databend_common_sql::executor::physical_plans::FragmentKind;
use databend_common_sql::executor::physical_plans::MutationKind;
use databend_common_sql::executor::physical_plans::UpdateSource;
use databend_common_sql::executor::PhysicalPlan;
Expand Down Expand Up @@ -223,6 +225,7 @@ impl Interpreter for UpdateInterpreter {
)
.await?;

let is_distributed = !self.ctx.get_cluster().is_empty();
let physical_plan = Self::build_physical_plan(
filters,
update_list,
Expand All @@ -233,6 +236,8 @@ impl Interpreter for UpdateInterpreter {
snapshot,
catalog_info,
query_row_id_col,
is_distributed,
self.ctx.clone(),
)?;

build_res =
Expand Down Expand Up @@ -268,9 +273,11 @@ impl UpdateInterpreter {
snapshot: Arc<TableSnapshot>,
catalog_info: CatalogInfo,
query_row_id_col: bool,
is_distributed: bool,
ctx: Arc<QueryContext>,
) -> Result<PhysicalPlan> {
let merge_meta = partitions.is_lazy;
let root = PhysicalPlan::UpdateSource(Box::new(UpdateSource {
let mut root = PhysicalPlan::UpdateSource(Box::new(UpdateSource {
parts: partitions,
filters,
table_info: table_info.clone(),
Expand All @@ -281,6 +288,17 @@ impl UpdateInterpreter {
computed_list,
}));

if is_distributed {
root = PhysicalPlan::Exchange(Exchange {
plan_id: 0,
input: Box::new(root),
kind: FragmentKind::Merge,
keys: vec![],
allow_adjust_parallelism: true,
ignore_exchange: false,
});
}

Ok(PhysicalPlan::CommitSink(Box::new(CommitSink {
input: Box::new(root),
snapshot,
Expand All @@ -290,6 +308,7 @@ impl UpdateInterpreter {
update_stream_meta: vec![],
merge_meta,
need_lock: false,
deduplicated_label: unsafe { ctx.get_settings().get_deduplicate_label()? },
})))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ impl PipelineBuilder {
update_stream_meta: Vec<UpdateStreamMetaReq>,
overwrite: bool,
append_mode: AppendMode,
deduplicated_label: Option<String>,
) -> Result<()> {
Self::build_fill_missing_columns_pipeline(
ctx.clone(),
Expand All @@ -54,6 +55,7 @@ impl PipelineBuilder {
update_stream_meta,
overwrite,
None,
deduplicated_label,
)?;

Ok(())
Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/pipelines/builders/builder_commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl PipelineBuilder {
None,
lock.clone(),
None,
plan.deduplicated_label.clone(),
)
})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ impl PipelineBuilder {
None,
Some(lock.clone()),
None,
None,
)
})
}
Expand Down
11 changes: 11 additions & 0 deletions src/query/service/src/schedulers/fragments/fragmenter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ enum State {
SelectLeaf,
DeleteLeaf,
ReplaceInto,
Update,
Compact,
Recluster,
Other,
Expand Down Expand Up @@ -152,6 +153,15 @@ impl PhysicalPlanReplacer for Fragmenter {
Ok(PhysicalPlan::TableScan(plan.clone()))
}

/// Handles an `UpdateSource` node while fragmenting a plan.
///
/// The node itself is passed through as a clone; the only side effect is that the
/// fragmenter's state is set to `State::Update`.
fn replace_update_source(
    &mut self,
    plan: &databend_common_sql::executor::physical_plans::UpdateSource,
) -> Result<PhysicalPlan> {
    let source = Box::new(plan.clone());
    self.state = State::Update;
    Ok(PhysicalPlan::UpdateSource(source))
}

fn replace_replace_into(&mut self, plan: &ReplaceInto) -> Result<PhysicalPlan> {
let input = self.replace(&plan.input)?;
self.state = State::ReplaceInto;
Expand Down Expand Up @@ -293,6 +303,7 @@ impl PhysicalPlanReplacer for Fragmenter {
State::ReplaceInto => FragmentType::ReplaceInto,
State::Compact => FragmentType::Compact,
State::Recluster => FragmentType::Recluster,
State::Update => FragmentType::Update,
};
self.state = State::Other;
let exchange = Self::get_exchange(self.ctx.clone(), &plan)?;
Expand Down
49 changes: 49 additions & 0 deletions src/query/service/src/schedulers/fragments/plan_fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::schedulers::QueryFragmentAction;
use crate::schedulers::QueryFragmentActions;
use crate::schedulers::QueryFragmentsActions;
use crate::sessions::QueryContext;
use crate::sql::executor::physical_plans::UpdateSource;
use crate::sql::executor::PhysicalPlan;
use crate::sql::executor::PhysicalPlanReplacer;

Expand All @@ -63,6 +64,7 @@ pub enum FragmentType {
ReplaceInto,
Compact,
Recluster,
Update,
}

#[derive(Clone)]
Expand Down Expand Up @@ -135,6 +137,9 @@ impl PlanFragment {
FragmentType::Recluster => {
self.redistribute_recluster(ctx, &mut fragment_actions)?;
}
FragmentType::Update => {
self.redistribute_update(ctx, &mut fragment_actions)?;
}
}

if let Some(ref exchange) = self.exchange {
Expand Down Expand Up @@ -219,6 +224,37 @@ impl PlanFragment {
Ok(())
}

/// Distributes an update fragment over the executors of the cluster.
///
/// The fragment's plan must be an `ExchangeSink` whose input is an `UpdateSource`.
/// The source's partitions are reshuffled across the executors returned by
/// `Fragmenter::get_executors`, and one `QueryFragmentAction` is added per
/// `(executor, partitions)` pair, carrying the plan rewritten by `ReplaceUpdate`
/// with that pair's partitions.
///
/// # Errors
/// Propagates any error from `Partitions::reshuffle` or from the plan rewrite.
///
/// # Panics
/// Hits `unreachable!` if the plan is not `ExchangeSink(UpdateSource)`.
fn redistribute_update(
    &self,
    ctx: Arc<QueryContext>,
    fragment_actions: &mut QueryFragmentActions,
) -> Result<()> {
    let sink = match &self.plan {
        PhysicalPlan::ExchangeSink(sink) => sink,
        _ => unreachable!("logic error"),
    };
    let update_source = match sink.input.as_ref() {
        PhysicalPlan::UpdateSource(source) => source,
        _ => unreachable!("logic error"),
    };

    let partitions: &Partitions = &update_source.parts;
    let executors = Fragmenter::get_executors(ctx);
    let partition_reshuffle = partitions.reshuffle(executors)?;

    for (executor, parts) in partition_reshuffle {
        // `replace` only borrows its input and returns a newly built plan, so the
        // rewrite can start from `self.plan` directly; cloning the whole plan tree
        // first would just create a copy that is immediately discarded.
        let mut replace_update = ReplaceUpdate { partitions: parts };
        let plan = replace_update.replace(&self.plan)?;

        fragment_actions.add_action(QueryFragmentAction::create(executor, plan));
    }

    Ok(())
}

fn redistribute_replace_into(
&self,
ctx: Arc<QueryContext>,
Expand Down Expand Up @@ -501,6 +537,19 @@ impl PhysicalPlanReplacer for ReplaceDeleteSource {
}
}

/// Plan replacer that carries the partitions to place into an `UpdateSource` node
/// when a plan is rewritten (see its `PhysicalPlanReplacer` implementation).
struct ReplaceUpdate {
/// Partitions written into the `parts` field of the rewritten `UpdateSource`.
pub partitions: Partitions,
}

impl PhysicalPlanReplacer for ReplaceUpdate {
    /// Produces a copy of `plan` in which `parts` is a clone of this replacer's
    /// `partitions`; all other fields are cloned from `plan` unchanged.
    fn replace_update_source(&mut self, plan: &UpdateSource) -> Result<PhysicalPlan> {
        let mut source = plan.clone();
        source.parts = self.partitions.clone();
        Ok(PhysicalPlan::UpdateSource(Box::new(source)))
    }
}

struct ReplaceReplaceInto {
pub partitions: Vec<(usize, Location)>,
// for standalone mode, slot is None
Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/test_kits/fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,7 @@ impl TestFixture {
vec![],
overwrite,
None,
None,
)?;
} else {
build_res
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use databend_storages_common_table_meta::meta::TableSnapshot;
use crate::executor::physical_plans::common::MutationKind;
use crate::executor::PhysicalPlan;

// TODO(sky): make TableMutationAggregator distributed
/// The commit sink is used to commit the data to the table.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct CommitSink {
Expand All @@ -34,4 +33,5 @@ pub struct CommitSink {
pub update_stream_meta: Vec<UpdateStreamMetaReq>,
pub merge_meta: bool,
pub need_lock: bool,
pub deduplicated_label: Option<String>,
}
2 changes: 2 additions & 0 deletions src/query/storages/fuse/src/fuse_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ impl Table for FuseTable {
update_stream_meta: Vec<UpdateStreamMetaReq>,
overwrite: bool,
prev_snapshot_id: Option<SnapshotId>,
deduplicated_label: Option<String>,
) -> Result<()> {
self.do_commit(
ctx,
Expand All @@ -648,6 +649,7 @@ impl Table for FuseTable {
update_stream_meta,
overwrite,
prev_snapshot_id,
deduplicated_label,
)
}

Expand Down
6 changes: 5 additions & 1 deletion src/query/storages/fuse/src/operations/commit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl FuseTable {
update_stream_meta: Vec<UpdateStreamMetaReq>,
overwrite: bool,
prev_snapshot_id: Option<SnapshotId>,
deduplicated_label: Option<String>,
) -> Result<()> {
let block_thresholds = self.get_block_thresholds();

Expand Down Expand Up @@ -105,6 +106,7 @@ impl FuseTable {
None,
None,
prev_snapshot_id,
deduplicated_label.clone(),
)
})?;

Expand Down Expand Up @@ -151,6 +153,7 @@ impl FuseTable {
copied_files,
&[],
operator,
None,
)
.await;
if need_to_save_statistics {
Expand Down Expand Up @@ -181,6 +184,7 @@ impl FuseTable {
copied_files: &Option<UpsertTableCopiedFileReq>,
update_stream_meta: &[UpdateStreamMetaReq],
operator: &Operator,
deduplicated_label: Option<String>,
) -> Result<()> {
// 1. prepare table meta
let mut new_table_meta = table_info.meta.clone();
Expand Down Expand Up @@ -215,7 +219,7 @@ impl FuseTable {
seq: MatchSeq::Exact(table_version),
new_table_meta,
copied_files: copied_files.clone(),
deduplicated_label: unsafe { ctx.get_settings().get_deduplicate_label()? },
deduplicated_label,
update_stream_meta: update_stream_meta.to_vec(),
};

Expand Down
Loading
Loading