From e4f7450ffeb224c6bac5e7ffe2cac4cb902c5192 Mon Sep 17 00:00:00 2001 From: JackTan25 Date: Wed, 18 Oct 2023 00:10:27 +0800 Subject: [PATCH] uniform row_kind and mutation_log --- .../interpreters/interpreter_merge_into.rs | 5 + .../service/src/pipelines/pipeline_builder.rs | 20 +++- src/query/sql/src/executor/physical_plan.rs | 3 +- .../physical_plans/physical_merge_into.rs | 1 + .../fuse/src/operations/merge_into/mod.rs | 2 + .../operations/merge_into/processors/mod.rs | 4 + ...istributed_merge_into_block_deserialize.rs | 82 ++++++++++++++++ ..._distributed_merge_into_block_serialize.rs | 94 +++++++++++++++++++ .../processor_merge_into_matched_and_split.rs | 20 +++- 9 files changed, 226 insertions(+), 5 deletions(-) create mode 100644 src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_deserialize.rs create mode 100644 src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_serialize.rs diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index c863a963437a..9e4f07b8c5e9 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -21,6 +21,7 @@ use common_base::runtime::GlobalIORuntime; use common_exception::ErrorCode; use common_exception::Result; use common_expression::ConstantFolder; +use common_expression::DataSchema; use common_expression::DataSchemaRef; use common_expression::FieldIndex; use common_expression::RemoteExpr; @@ -340,6 +341,7 @@ impl MergeIntoInterpreter { field_index_of_input_schema, row_id_idx, segments: Some(segments), + output_schema: DataSchemaRef::default(), })) } else { let merge_append = PhysicalPlan::MergeInto(Box::new(MergeInto { @@ -351,6 +353,9 @@ impl MergeIntoInterpreter { field_index_of_input_schema, row_id_idx, segments: None, + output_schema: 
DataSchemaRef::new(DataSchema::new(vec![ + join_output_schema.fields[row_id_idx].clone(), + ])), })); let exchange = exchange.unwrap(); PhysicalPlan::MergeIntoRowIdApply(Box::new(MergeIntoRowIdApply { diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index 8d16c27ad0de..e276142ec0d8 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -121,6 +121,8 @@ use common_storages_fuse::operations::common::TransformSerializeSegment; use common_storages_fuse::operations::merge_into::MatchedSplitProcessor; use common_storages_fuse::operations::merge_into::MergeIntoNotMatchedProcessor; use common_storages_fuse::operations::merge_into::MergeIntoSplitProcessor; +use common_storages_fuse::operations::merge_into::TransformDistributedMergeIntoBlockDeserialize; +use common_storages_fuse::operations::merge_into::TransformDistributedMergeIntoBlockSerialize; use common_storages_fuse::operations::replace_into::BroadcastProcessor; use common_storages_fuse::operations::replace_into::ReplaceIntoProcessor; use common_storages_fuse::operations::replace_into::UnbranchedReplaceIntoProcessor; @@ -318,7 +320,12 @@ impl PipelineBuilder { } = merge_into_row_id_apply; // receive rowids and MutationLogs self.build_pipeline(input)?; - let mut pipe_items = Vec::with_capacity(self.main_pipeline.output_len()); + self.main_pipeline.try_resize(1)?; + + self.main_pipeline + .add_pipe(TransformDistributedMergeIntoBlockDeserialize::into_pipe()); + + let mut pipe_items = Vec::with_capacity(1); let tbl = self .ctx .build_table_by_table_info(catalog_info, table_info, None)?; @@ -351,7 +358,9 @@ impl PipelineBuilder { true, )?); - todo!() + self.main_pipeline + .add_pipe(Pipe::create(self.main_pipeline.output_len(), 1, pipe_items)); + Ok(()) } fn build_merge_into_source(&mut self, merge_into_source: &MergeIntoSource) -> Result<()> { @@ -516,6 +525,7 @@ impl PipelineBuilder 
{ field_index_of_input_schema, row_id_idx, segments, + .. } = merge_into; self.build_pipeline(input)?; @@ -762,6 +772,12 @@ impl PipelineBuilder { pipe_items, )); + // distributed execution + if !apply_row_id { + self.main_pipeline.try_resize(1)?; + self.main_pipeline + .add_pipe(TransformDistributedMergeIntoBlockSerialize::into_pipe()) + } Ok(()) } diff --git a/src/query/sql/src/executor/physical_plan.rs b/src/query/sql/src/executor/physical_plan.rs index 1c42b06a92a2..687e49ecfb45 100644 --- a/src/query/sql/src/executor/physical_plan.rs +++ b/src/query/sql/src/executor/physical_plan.rs @@ -178,9 +178,8 @@ impl PhysicalPlan { PhysicalPlan::MaterializedCte(plan) => plan.output_schema(), PhysicalPlan::ConstantTableScan(plan) => plan.output_schema(), PhysicalPlan::MergeIntoSource(plan) => plan.input.output_schema(), - + PhysicalPlan::MergeInto(plan) => Ok(plan.output_schema.clone()), PhysicalPlan::AsyncSourcer(_) - | PhysicalPlan::MergeInto(_) | PhysicalPlan::Deduplicate(_) | PhysicalPlan::ReplaceInto(_) | PhysicalPlan::MergeIntoRowIdApply(_) diff --git a/src/query/sql/src/executor/physical_plans/physical_merge_into.rs b/src/query/sql/src/executor/physical_plans/physical_merge_into.rs index 0a5b84b5c04b..31ae6a1bb3f3 100644 --- a/src/query/sql/src/executor/physical_plans/physical_merge_into.rs +++ b/src/query/sql/src/executor/physical_plans/physical_merge_into.rs @@ -46,6 +46,7 @@ pub struct MergeInto { pub field_index_of_input_schema: HashMap, pub row_id_idx: usize, pub segments: Option>, + pub output_schema: DataSchemaRef, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] diff --git a/src/query/storages/fuse/src/operations/merge_into/mod.rs b/src/query/storages/fuse/src/operations/merge_into/mod.rs index 36b207cd1900..fc68baac7203 100644 --- a/src/query/storages/fuse/src/operations/merge_into/mod.rs +++ b/src/query/storages/fuse/src/operations/merge_into/mod.rs @@ -19,3 +19,5 @@ pub use mutator::MatchedAggregator; pub use 
processors::MatchedSplitProcessor; pub use processors::MergeIntoNotMatchedProcessor; pub use processors::MergeIntoSplitProcessor; +pub use processors::TransformDistributedMergeIntoBlockDeserialize; +pub use processors::TransformDistributedMergeIntoBlockSerialize; diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/mod.rs b/src/query/storages/fuse/src/operations/merge_into/processors/mod.rs index 6425672d538f..7e6fe208619e 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/mod.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/mod.rs @@ -12,10 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. +mod processor_distributed_merge_into_block_deserialize; +mod processor_distributed_merge_into_block_serialize; mod processor_merge_into_matched_and_split; mod processor_merge_into_not_matched; mod processor_merge_into_split; mod transform_matched_mutation_aggregator; +pub use processor_distributed_merge_into_block_deserialize::TransformDistributedMergeIntoBlockDeserialize; +pub use processor_distributed_merge_into_block_serialize::TransformDistributedMergeIntoBlockSerialize; pub use processor_merge_into_matched_and_split::MatchedSplitProcessor; pub(crate) use processor_merge_into_matched_and_split::RowIdKind; pub use processor_merge_into_not_matched::MergeIntoNotMatchedProcessor; diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_deserialize.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_deserialize.rs new file mode 100644 index 000000000000..9d1570f3bff5 --- /dev/null +++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_deserialize.rs @@ -0,0 +1,82 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this 
file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_exception::Result; +use common_expression::BlockMetaInfoDowncast; +use common_expression::DataBlock; +use common_pipeline_core::pipe::Pipe; +use common_pipeline_core::pipe::PipeItem; +use common_pipeline_core::processors::port::InputPort; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_transforms::processors::transforms::Transform; +use common_pipeline_transforms::processors::transforms::Transformer; + +use super::processor_merge_into_matched_and_split::MixRowIdKindAndLog; + +// It will receive MutationLogs or RowIds. +// But for MutationLogs, it's an empty block +// we will add a fake BlockEntry to make it consistent with +// RowIds, because arrow-flight requires this.
+pub struct TransformDistributedMergeIntoBlockDeserialize; + +impl TransformDistributedMergeIntoBlockDeserialize { + #[allow(dead_code)] + fn create(input: Arc, output: Arc) -> ProcessorPtr { + ProcessorPtr::create(Transformer::create( + input, + output, + TransformDistributedMergeIntoBlockDeserialize {}, + )) + } + + fn create_distributed_merge_into_transform_item() -> PipeItem { + let input = InputPort::create(); + let output = OutputPort::create(); + PipeItem::create( + TransformDistributedMergeIntoBlockDeserialize::create(input.clone(), output.clone()), + vec![input], + vec![output], + ) + } + + pub fn into_pipe() -> Pipe { + let pipe_item = Self::create_distributed_merge_into_transform_item(); + Pipe::create(1, 1, vec![pipe_item]) + } +} + +#[async_trait::async_trait] +impl Transform for TransformDistributedMergeIntoBlockDeserialize { + const NAME: &'static str = "TransformDistributedMergeIntoBlockDeserialize"; + + fn transform(&mut self, data: DataBlock) -> Result { + let kind = MixRowIdKindAndLog::downcast_ref_from(data.get_meta().unwrap()).unwrap(); + let data_block = match kind { + MixRowIdKindAndLog::MutationLogs(logs) => DataBlock::new_with_meta( + data.columns().to_vec(), + data.num_rows(), + Some(Box::new(logs.clone())), + ), + MixRowIdKindAndLog::RowIdKind(row_id_kind) => DataBlock::new_with_meta( + data.columns().to_vec(), + data.num_rows(), + Some(Box::new(row_id_kind.clone())), + ), + }; + Ok(data_block) + } +} diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_serialize.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_serialize.rs new file mode 100644 index 000000000000..3fed939d9e9d --- /dev/null +++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_distributed_merge_into_block_serialize.rs @@ -0,0 +1,94 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the 
"License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use common_exception::Result; +use common_expression::types::DataType; +use common_expression::types::NumberDataType::UInt64; +use common_expression::types::NumberType; +use common_expression::BlockEntry; +use common_expression::BlockMetaInfoDowncast; +use common_expression::DataBlock; +use common_expression::Value; +use common_pipeline_core::pipe::Pipe; +use common_pipeline_core::pipe::PipeItem; +use common_pipeline_core::processors::port::InputPort; +use common_pipeline_core::processors::port::OutputPort; +use common_pipeline_core::processors::processor::ProcessorPtr; +use common_pipeline_transforms::processors::transforms::Transform; +use common_pipeline_transforms::processors::transforms::Transformer; + +use super::processor_merge_into_matched_and_split::MixRowIdKindAndLog; +use super::RowIdKind; +use crate::operations::common::MutationLogs; + +// It will receive MutationLogs or RowIds. +// But for MutationLogs, it's an empty block +// we will add a fake BlockEntry to make it consistent with +// RowIds, because arrow-flight requires this.
+pub struct TransformDistributedMergeIntoBlockSerialize; + +impl TransformDistributedMergeIntoBlockSerialize { + #[allow(dead_code)] + fn create(input: Arc, output: Arc) -> ProcessorPtr { + ProcessorPtr::create(Transformer::create( + input, + output, + TransformDistributedMergeIntoBlockSerialize {}, + )) + } + + fn create_distributed_merge_into_transform_item() -> PipeItem { + let input = InputPort::create(); + let output = OutputPort::create(); + PipeItem::create( + TransformDistributedMergeIntoBlockSerialize::create(input.clone(), output.clone()), + vec![input], + vec![output], + ) + } + + pub fn into_pipe() -> Pipe { + let pipe_item = Self::create_distributed_merge_into_transform_item(); + Pipe::create(1, 1, vec![pipe_item]) + } +} + +#[async_trait::async_trait] +impl Transform for TransformDistributedMergeIntoBlockSerialize { + const NAME: &'static str = "TransformDistributedMergeIntoBlockSerialize"; + + fn transform(&mut self, data: DataBlock) -> Result { + // 1. MutationLogs + if data.is_empty() { + let scalar_value = Value::>::Scalar(0); + let entry = BlockEntry::new(DataType::Number(UInt64), scalar_value.upcast()); + let log = MutationLogs::try_from(data)?; + Ok(DataBlock::new_with_meta( + vec![entry], + 1, + Some(Box::new(MixRowIdKindAndLog::MutationLogs(log))), + )) + } else { + // RowIdKind + let row_id_kind = RowIdKind::downcast_ref_from(data.get_meta().unwrap()).unwrap(); + Ok(DataBlock::new_with_meta( + data.columns().to_vec(), + 1, + Some(Box::new(MixRowIdKindAndLog::RowIdKind(row_id_kind.clone()))), + )) + } + } +} diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs index 79e0a7dead5b..a16b28c2b20e 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs +++ 
b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs @@ -39,6 +39,7 @@ use common_storage::metrics::merge_into::merge_into_matched_operation_millisecon use common_storage::metrics::merge_into::metrics_inc_merge_into_append_blocks_counter; use common_storage::metrics::merge_into::metrics_inc_merge_into_append_blocks_rows_counter; +use crate::operations::common::MutationLogs; use crate::operations::merge_into::mutator::DeleteByExprMutator; use crate::operations::merge_into::mutator::UpdateByExprMutator; @@ -47,7 +48,24 @@ enum MutationKind { Delete(DeleteDataBlockMutation), } -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)] +pub enum MixRowIdKindAndLog { + MutationLogs(MutationLogs), + RowIdKind(RowIdKind), +} + +#[typetag::serde(name = "mix_row_id_kind_and_log")] +impl BlockMetaInfo for MixRowIdKindAndLog { + fn equals(&self, info: &Box) -> bool { + MixRowIdKindAndLog::downcast_ref_from(info).is_some_and(|other| self == other) + } + + fn clone_self(&self) -> Box { + Box::new(self.clone()) + } +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)] pub enum RowIdKind { Update, Delete,