Skip to content

Commit

Permalink
uniform row_kind and mutation_log
Browse files Browse the repository at this point in the history
  • Loading branch information
JackTan25 committed Oct 17, 2023
1 parent 76e1352 commit e4f7450
Show file tree
Hide file tree
Showing 9 changed files with 226 additions and 5 deletions.
5 changes: 5 additions & 0 deletions src/query/service/src/interpreters/interpreter_merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use common_base::runtime::GlobalIORuntime;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::ConstantFolder;
use common_expression::DataSchema;
use common_expression::DataSchemaRef;
use common_expression::FieldIndex;
use common_expression::RemoteExpr;
Expand Down Expand Up @@ -340,6 +341,7 @@ impl MergeIntoInterpreter {
field_index_of_input_schema,
row_id_idx,
segments: Some(segments),
output_schema: DataSchemaRef::default(),
}))
} else {
let merge_append = PhysicalPlan::MergeInto(Box::new(MergeInto {
Expand All @@ -351,6 +353,9 @@ impl MergeIntoInterpreter {
field_index_of_input_schema,
row_id_idx,
segments: None,
output_schema: DataSchemaRef::new(DataSchema::new(vec![
join_output_schema.fields[row_id_idx].clone(),
])),
}));
let exchange = exchange.unwrap();
PhysicalPlan::MergeIntoRowIdApply(Box::new(MergeIntoRowIdApply {
Expand Down
20 changes: 18 additions & 2 deletions src/query/service/src/pipelines/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ use common_storages_fuse::operations::common::TransformSerializeSegment;
use common_storages_fuse::operations::merge_into::MatchedSplitProcessor;
use common_storages_fuse::operations::merge_into::MergeIntoNotMatchedProcessor;
use common_storages_fuse::operations::merge_into::MergeIntoSplitProcessor;
use common_storages_fuse::operations::merge_into::TransformDistributedMergeIntoBlockDeserialize;
use common_storages_fuse::operations::merge_into::TransformDistributedMergeIntoBlockSerialize;
use common_storages_fuse::operations::replace_into::BroadcastProcessor;
use common_storages_fuse::operations::replace_into::ReplaceIntoProcessor;
use common_storages_fuse::operations::replace_into::UnbranchedReplaceIntoProcessor;
Expand Down Expand Up @@ -318,7 +320,12 @@ impl PipelineBuilder {
} = merge_into_row_id_apply;
// receive rowids and MutationLogs
self.build_pipeline(input)?;
let mut pipe_items = Vec::with_capacity(self.main_pipeline.output_len());
self.main_pipeline.try_resize(1)?;

self.main_pipeline
.add_pipe(TransformDistributedMergeIntoBlockDeserialize::into_pipe());

let mut pipe_items = Vec::with_capacity(1);
let tbl = self
.ctx
.build_table_by_table_info(catalog_info, table_info, None)?;
Expand Down Expand Up @@ -351,7 +358,9 @@ impl PipelineBuilder {
true,
)?);

todo!()
self.main_pipeline
.add_pipe(Pipe::create(self.main_pipeline.output_len(), 1, pipe_items));
Ok(())
}

fn build_merge_into_source(&mut self, merge_into_source: &MergeIntoSource) -> Result<()> {
Expand Down Expand Up @@ -516,6 +525,7 @@ impl PipelineBuilder {
field_index_of_input_schema,
row_id_idx,
segments,
..
} = merge_into;

self.build_pipeline(input)?;
Expand Down Expand Up @@ -762,6 +772,12 @@ impl PipelineBuilder {
pipe_items,
));

// distributed execution
if !apply_row_id {
self.main_pipeline.try_resize(1)?;
self.main_pipeline
.add_pipe(TransformDistributedMergeIntoBlockSerialize::into_pipe())
}
Ok(())
}

Expand Down
3 changes: 1 addition & 2 deletions src/query/sql/src/executor/physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,8 @@ impl PhysicalPlan {
PhysicalPlan::MaterializedCte(plan) => plan.output_schema(),
PhysicalPlan::ConstantTableScan(plan) => plan.output_schema(),
PhysicalPlan::MergeIntoSource(plan) => plan.input.output_schema(),

PhysicalPlan::MergeInto(plan) => Ok(plan.output_schema.clone()),
PhysicalPlan::AsyncSourcer(_)
| PhysicalPlan::MergeInto(_)
| PhysicalPlan::Deduplicate(_)
| PhysicalPlan::ReplaceInto(_)
| PhysicalPlan::MergeIntoRowIdApply(_)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub struct MergeInto {
pub field_index_of_input_schema: HashMap<FieldIndex, usize>,
pub row_id_idx: usize,
pub segments: Option<Vec<(usize, Location)>>,
pub output_schema: DataSchemaRef,
}

#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
Expand Down
2 changes: 2 additions & 0 deletions src/query/storages/fuse/src/operations/merge_into/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ pub use mutator::MatchedAggregator;
pub use processors::MatchedSplitProcessor;
pub use processors::MergeIntoNotMatchedProcessor;
pub use processors::MergeIntoSplitProcessor;
pub use processors::TransformDistributedMergeIntoBlockDeserialize;
pub use processors::TransformDistributedMergeIntoBlockSerialize;
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,14 @@
// See the License for the specific language governing permissions and
// limitations under the License.

mod processor_distributed_merge_into_block_deserialize;
mod processor_distributed_merge_into_block_serialize;
mod processor_merge_into_matched_and_split;
mod processor_merge_into_not_matched;
mod processor_merge_into_split;
mod transform_matched_mutation_aggregator;
pub use processor_distributed_merge_into_block_deserialize::TransformDistributedMergeIntoBlockDeserialize;
pub use processor_distributed_merge_into_block_serialize::TransformDistributedMergeIntoBlockSerialize;
pub use processor_merge_into_matched_and_split::MatchedSplitProcessor;
pub(crate) use processor_merge_into_matched_and_split::RowIdKind;
pub use processor_merge_into_not_matched::MergeIntoNotMatchedProcessor;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_exception::Result;
use common_expression::BlockMetaInfoDowncast;
use common_expression::DataBlock;
use common_pipeline_core::pipe::Pipe;
use common_pipeline_core::pipe::PipeItem;
use common_pipeline_core::processors::port::InputPort;
use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_transforms::processors::transforms::Transform;
use common_pipeline_transforms::processors::transforms::Transformer;

use super::processor_merge_into_matched_and_split::MixRowIdKindAndLog;

// It will recieve MutationLogs Or RowIds.
// But for MutationLogs, it's a empty block
// we will add a fake BlockEntry to make it consistent with
// RowIds, because arrow-flight requires this.
pub struct TransformDistributedMergeIntoBlockDeserialize;

impl TransformDistributedMergeIntoBlockDeserialize {
#[allow(dead_code)]
fn create(input: Arc<InputPort>, output: Arc<OutputPort>) -> ProcessorPtr {
ProcessorPtr::create(Transformer::create(
input,
output,
TransformDistributedMergeIntoBlockDeserialize {},
))
}

fn create_distributed_merge_into_transform_item() -> PipeItem {
let input = InputPort::create();
let output = OutputPort::create();
PipeItem::create(
TransformDistributedMergeIntoBlockDeserialize::create(input.clone(), output.clone()),
vec![input],
vec![output],
)
}

pub fn into_pipe() -> Pipe {
let pipe_item = Self::create_distributed_merge_into_transform_item();
Pipe::create(1, 1, vec![pipe_item])
}
}

#[async_trait::async_trait]
impl Transform for TransformDistributedMergeIntoBlockDeserialize {
const NAME: &'static str = "TransformDistributedMergeIntoBlockDeserialize";

fn transform(&mut self, data: DataBlock) -> Result<DataBlock> {
let kind = MixRowIdKindAndLog::downcast_ref_from(data.get_meta().unwrap()).unwrap();
let data_block = match kind {
MixRowIdKindAndLog::MutationLogs(logs) => DataBlock::new_with_meta(
data.columns().to_vec(),
data.num_rows(),
Some(Box::new(logs.clone())),
),
MixRowIdKindAndLog::RowIdKind(row_id_kind) => DataBlock::new_with_meta(
data.columns().to_vec(),
data.num_rows(),
Some(Box::new(row_id_kind.clone())),
),
};
Ok(data_block)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use common_exception::Result;
use common_expression::types::DataType;
use common_expression::types::NumberDataType::UInt64;
use common_expression::types::NumberType;
use common_expression::BlockEntry;
use common_expression::BlockMetaInfoDowncast;
use common_expression::DataBlock;
use common_expression::Value;
use common_pipeline_core::pipe::Pipe;
use common_pipeline_core::pipe::PipeItem;
use common_pipeline_core::processors::port::InputPort;
use common_pipeline_core::processors::port::OutputPort;
use common_pipeline_core::processors::processor::ProcessorPtr;
use common_pipeline_transforms::processors::transforms::Transform;
use common_pipeline_transforms::processors::transforms::Transformer;

use super::processor_merge_into_matched_and_split::MixRowIdKindAndLog;
use super::RowIdKind;
use crate::operations::common::MutationLogs;

// It will recieve MutationLogs Or RowIds.
// But for MutationLogs, it's a empty block
// we will add a fake BlockEntry to make it consistent with
// RowIds, because arrow-flight requires this.
pub struct TransformDistributedMergeIntoBlockSerialize;

impl TransformDistributedMergeIntoBlockSerialize {
#[allow(dead_code)]
fn create(input: Arc<InputPort>, output: Arc<OutputPort>) -> ProcessorPtr {
ProcessorPtr::create(Transformer::create(
input,
output,
TransformDistributedMergeIntoBlockSerialize {},
))
}

fn create_distributed_merge_into_transform_item() -> PipeItem {
let input = InputPort::create();
let output = OutputPort::create();
PipeItem::create(
TransformDistributedMergeIntoBlockSerialize::create(input.clone(), output.clone()),
vec![input],
vec![output],
)
}

pub fn into_pipe() -> Pipe {
let pipe_item = Self::create_distributed_merge_into_transform_item();
Pipe::create(1, 1, vec![pipe_item])
}
}

#[async_trait::async_trait]
impl Transform for TransformDistributedMergeIntoBlockSerialize {
const NAME: &'static str = "TransformDistributedMergeIntoBlockSerialize";

fn transform(&mut self, data: DataBlock) -> Result<DataBlock> {
// 1. MutationLogs
if data.is_empty() {
let scalar_value = Value::<NumberType<u64>>::Scalar(0);
let entry = BlockEntry::new(DataType::Number(UInt64), scalar_value.upcast());
let log = MutationLogs::try_from(data)?;
Ok(DataBlock::new_with_meta(
vec![entry],
1,
Some(Box::new(MixRowIdKindAndLog::MutationLogs(log))),
))
} else {
// RowIdKind
let row_id_kind = RowIdKind::downcast_ref_from(data.get_meta().unwrap()).unwrap();
Ok(DataBlock::new_with_meta(
data.columns().to_vec(),
1,
Some(Box::new(MixRowIdKindAndLog::RowIdKind(row_id_kind.clone()))),
))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use common_storage::metrics::merge_into::merge_into_matched_operation_millisecon
use common_storage::metrics::merge_into::metrics_inc_merge_into_append_blocks_counter;
use common_storage::metrics::merge_into::metrics_inc_merge_into_append_blocks_rows_counter;

use crate::operations::common::MutationLogs;
use crate::operations::merge_into::mutator::DeleteByExprMutator;
use crate::operations::merge_into::mutator::UpdateByExprMutator;

Expand All @@ -47,7 +48,24 @@ enum MutationKind {
Delete(DeleteDataBlockMutation),
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)]
#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
pub enum MixRowIdKindAndLog {
MutationLogs(MutationLogs),
RowIdKind(RowIdKind),
}

#[typetag::serde(name = "mix_row_id_kind_and_log")]
impl BlockMetaInfo for MixRowIdKindAndLog {
fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
MixRowIdKindAndLog::downcast_ref_from(info).is_some_and(|other| self == other)
}

fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
Box::new(self.clone())
}
}

#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
pub enum RowIdKind {
Update,
Delete,
Expand Down

0 comments on commit e4f7450

Please sign in to comment.