From fd2c10b5915df44a0f2c1a2ec3286985b1eacfd0 Mon Sep 17 00:00:00 2001 From: Yijun Zhao Date: Mon, 9 Oct 2023 21:12:46 +0800 Subject: [PATCH 1/2] refactor: using column name instead of column id for agg index schema (#13125) * using column name instead of column id for agg index schema * fix reviewer comments * fix tests --- .../interpreters/interpreter_index_refresh.rs | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_index_refresh.rs b/src/query/service/src/interpreters/interpreter_index_refresh.rs index 568cb4082cf1..b3ffc3921442 100644 --- a/src/query/service/src/interpreters/interpreter_index_refresh.rs +++ b/src/query/service/src/interpreters/interpreter_index_refresh.rs @@ -21,9 +21,11 @@ use common_catalog::plan::Partitions; use common_catalog::table_context::TableContext; use common_exception::ErrorCode; use common_exception::Result; -use common_expression::infer_table_schema; +use common_expression::infer_schema_type; use common_expression::DataField; use common_expression::DataSchemaRefExt; +use common_expression::TableField; +use common_expression::TableSchema; use common_expression::BLOCK_NAME_COL_NAME; use common_license::license::Feature; use common_license::license_manager::get_license_manager; @@ -324,10 +326,23 @@ impl Interpreter for RefreshIndexInterpreter { })?; let block_name_offset = output_schema.index_of(&block_name_col.index.to_string())?; + let fields = output_schema + .fields() + .iter() + .map(|f| { + let pos = select_columns + .iter() + .find(|col| col.index.to_string().eq_ignore_ascii_case(f.name())) + .ok_or_else(|| ErrorCode::Internal("should find the corresponding column"))?; + let field_type = infer_schema_type(f.data_type())?; + Ok(TableField::new(&pos.column_name, field_type)) + }) + .collect::>>()?; + // Build the final sink schema. - let mut sink_schema = infer_table_schema(&output_schema)?.as_ref().clone(); + let mut sink_schema = TableSchema::new(fields); if !self.plan.user_defined_block_name { - sink_schema.drop_column(&block_name_col.index.to_string())?; + sink_schema.drop_column(&block_name_col.column_name)?; } let sink_schema = Arc::new(sink_schema); From 0e09c51f7c6edcf84aadde3d5cbaee79c04ef528 Mon Sep 17 00:00:00 2001 From: JackTan25 <60096118+JackTan25@users.noreply.github.com> Date: Mon, 9 Oct 2023 23:11:58 +0800 Subject: [PATCH 2/2] add replace metrics (#13145) Co-authored-by: dantengsky --- src/common/storage/src/metrics/merge_into.rs | 18 ++++++++++++++++ src/query/storages/fuse/src/lib.rs | 1 + .../storages/fuse/src/metrics/fuse_metrics.rs | 21 +++++++++++++++++++ .../merge_into/mutator/matched_mutator.rs | 8 +++++-- .../processor_merge_into_matched_and_split.rs | 2 ++ .../processor_merge_into_not_matched.rs | 4 ++++ .../mutator/merge_into_mutator.rs | 4 ++++ .../processors/processor_replace_into.rs | 2 ++ 8 files changed, 58 insertions(+), 2 deletions(-) diff --git a/src/common/storage/src/metrics/merge_into.rs b/src/common/storage/src/metrics/merge_into.rs index cd1f6a690bf1..c36f452fbef5 100644 --- a/src/common/storage/src/metrics/merge_into.rs +++ b/src/common/storage/src/metrics/merge_into.rs @@ -27,10 +27,16 @@ macro_rules! key { lazy_static! { static ref MERGE_INTO_REPLACE_BLOCKS_COUNTER: Counter = register_counter(key!("merge_into_replace_blocks_counter")); + static ref MERGE_INTO_REPLACE_BLOCKS_ROWS_COUNTER: Counter = + register_counter(key!("merge_into_replace_blocks_rows_counter")); static ref MERGE_INTO_DELETED_BLOCKS_COUNTER: Counter = register_counter(key!("merge_into_deleted_blocks_counter")); + static ref MERGE_INTO_DELETED_BLOCKS_ROWS_COUNTER: Counter = + register_counter(key!("merge_into_deleted_blocks_rows_counter")); static ref MERGE_INTO_APPEND_BLOCKS_COUNTER: Counter = register_counter(key!("merge_into_append_blocks_counter")); + static ref MERGE_INTO_APPEND_BLOCKS_ROWS_COUNTER: Counter = + register_counter(key!("merge_into_append_blocks_rows_counter")); static ref MERGE_INTO_MATCHED_ROWS: Counter = register_counter(key!("merge_into_matched_rows")); static ref MERGE_INTO_UNMATCHED_ROWS: Counter = register_counter(key!("merge_into_unmatched_rows")); @@ -50,15 +56,27 @@ pub fn metrics_inc_merge_into_replace_blocks_counter(c: u32) { MERGE_INTO_REPLACE_BLOCKS_COUNTER.inc_by(c as u64); } +pub fn metrics_inc_merge_into_replace_blocks_rows_counter(c: u32) { + MERGE_INTO_REPLACE_BLOCKS_ROWS_COUNTER.inc_by(c as u64); +} + pub fn metrics_inc_merge_into_deleted_blocks_counter(c: u32) { MERGE_INTO_DELETED_BLOCKS_COUNTER.inc_by(c as u64); } +pub fn metrics_inc_merge_into_deleted_blocks_rows_counter(c: u32) { + MERGE_INTO_DELETED_BLOCKS_ROWS_COUNTER.inc_by(c as u64); +} + // used to record append new blocks in matched split and not match insert. pub fn metrics_inc_merge_into_append_blocks_counter(c: u32) { MERGE_INTO_APPEND_BLOCKS_COUNTER.inc_by(c as u64); } +pub fn metrics_inc_merge_into_append_blocks_rows_counter(c: u32) { + MERGE_INTO_APPEND_BLOCKS_ROWS_COUNTER.inc_by(c as u64); +} + // matched_rows and not unmatched_rows is used in the join phase of merge_source. pub fn metrics_inc_merge_into_matched_rows(c: u32) { MERGE_INTO_MATCHED_ROWS.inc_by(c as u64); diff --git a/src/query/storages/fuse/src/lib.rs b/src/query/storages/fuse/src/lib.rs index 14b86d0dacd7..1713771bc085 100644 --- a/src/query/storages/fuse/src/lib.rs +++ b/src/query/storages/fuse/src/lib.rs @@ -20,6 +20,7 @@ #![feature(impl_trait_in_assoc_type)] #![feature(int_roundings)] #![feature(result_option_inspect)] +#![recursion_limit = "256"] mod constants; mod fuse_lazy_part; diff --git a/src/query/storages/fuse/src/metrics/fuse_metrics.rs b/src/query/storages/fuse/src/metrics/fuse_metrics.rs index 2ec2ab9e0d15..2ff3c9511231 100644 --- a/src/query/storages/fuse/src/metrics/fuse_metrics.rs +++ b/src/query/storages/fuse/src/metrics/fuse_metrics.rs @@ -134,6 +134,12 @@ lazy_static! { register_counter(key!("replace_into_block_number_bloom_pruned")); static ref REPLACE_INTO_BLOCK_NUMBER_SOURCE: Counter = register_counter(key!("replace_into_block_number_source")); + static ref REPLACE_INTO_REPLACED_BLOCKS_ROWS: Counter = + register_counter(key!("replace_into_replaced_blocks_rows")); + static ref REPLACE_INTO_DELETED_BLOCKS_ROWS: Counter = + register_counter(key!("replace_into_deleted_blocks_rows")); + static ref REPLACE_INTO_APPEND_BLOCKS_ROWS: Counter = + register_counter(key!("replace_into_append_blocks_rows")); } pub fn metrics_inc_commit_mutation_unresolvable_conflict() { @@ -392,3 +398,18 @@ pub fn metrics_inc_replace_block_number_bloom_pruned(c: u64) { pub fn metrics_inc_replace_block_number_input(c: u64) { REPLACE_INTO_BLOCK_NUMBER_SOURCE.inc_by(c); } + +// rows of blocks that are replaced +pub fn metrics_inc_replace_replaced_blocks_rows(c: u64) { + REPLACE_INTO_REPLACED_BLOCKS_ROWS.inc_by(c); +} + +// rows of blocks that are deleted +pub fn metrics_inc_replace_deleted_blocks_rows(c: u64) { + REPLACE_INTO_DELETED_BLOCKS_ROWS.inc_by(c); +} + +// rows of blocks that are appended +pub fn metrics_inc_replace_append_blocks_rows(c: u64) { + REPLACE_INTO_APPEND_BLOCKS_ROWS.inc_by(c); +} diff --git a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs index b5477b16b70f..05704c975bff 100644 --- a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs +++ b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs @@ -39,7 +39,9 @@ use common_expression::TableSchemaRef; use common_storage::metrics::merge_into::metrics_inc_merge_into_accumulate_milliseconds; use common_storage::metrics::merge_into::metrics_inc_merge_into_apply_milliseconds; use common_storage::metrics::merge_into::metrics_inc_merge_into_deleted_blocks_counter; +use common_storage::metrics::merge_into::metrics_inc_merge_into_deleted_blocks_rows_counter; use common_storage::metrics::merge_into::metrics_inc_merge_into_replace_blocks_counter; +use common_storage::metrics::merge_into::metrics_inc_merge_into_replace_blocks_rows_counter; use itertools::Itertools; use log::info; use opendal::Operator; @@ -290,10 +292,10 @@ impl AggregationContext { &self.read_settings, ) .await?; - + let origin_num_rows = origin_data_block.num_rows(); // apply delete let mut bitmap = MutableBitmap::new(); - for row in 0..origin_data_block.num_rows() { + for row in 0..origin_num_rows { if modified_offsets.contains(&row) { bitmap.push(false); } else { @@ -304,6 +306,7 @@ impl AggregationContext { if res_block.is_empty() { metrics_inc_merge_into_deleted_blocks_counter(1); + metrics_inc_merge_into_deleted_blocks_rows_counter(origin_num_rows as u32); return Ok(Some(MutationLogEntry::DeletedBlock { index: BlockMetaIndex { segment_idx, @@ -338,6 +341,7 @@ impl AggregationContext { write_data(new_block_raw_data, &data_accessor, &new_block_location).await?; metrics_inc_merge_into_replace_blocks_counter(1); + metrics_inc_merge_into_replace_blocks_rows_counter(origin_num_rows as u32); // generate log let mutation = MutationLogEntry::ReplacedBlock { index: BlockMetaIndex { diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs index 658b8f1b5455..1bbdc2b8f1a7 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_matched_and_split.rs @@ -37,6 +37,7 @@ use common_sql::evaluator::BlockOperator; use common_sql::executor::MatchExpr; use common_storage::metrics::merge_into::merge_into_matched_operation_milliseconds; use common_storage::metrics::merge_into::metrics_inc_merge_into_append_blocks_counter; +use common_storage::metrics::merge_into::metrics_inc_merge_into_append_blocks_rows_counter; use crate::operations::merge_into::mutator::DeleteByExprMutator; use crate::operations::merge_into::mutator::UpdateByExprMutator; @@ -278,6 +279,7 @@ impl Processor for MatchedSplitProcessor { }; current_block = op.execute(&self.ctx.get_function_context()?, current_block)?; metrics_inc_merge_into_append_blocks_counter(1); + metrics_inc_merge_into_append_blocks_rows_counter(current_block.num_rows() as u32); current_block = current_block.add_meta(Some(Box::new(self.target_table_schema.clone())))?; self.output_data_updated_data = Some(current_block); diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_not_matched.rs b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_not_matched.rs index f75d92ef1d36..5816eaeda10d 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_not_matched.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/processor_merge_into_not_matched.rs @@ -33,6 +33,7 @@ use common_pipeline_core::processors::Processor; use common_sql::evaluator::BlockOperator; use common_storage::metrics::merge_into::merge_into_not_matched_operation_milliseconds; use common_storage::metrics::merge_into::metrics_inc_merge_into_append_blocks_counter; +use common_storage::metrics::merge_into::metrics_inc_merge_into_append_blocks_rows_counter; use itertools::Itertools; use crate::operations::merge_into::mutator::SplitByExprMutator; @@ -161,6 +162,9 @@ impl Processor for MergeIntoNotMatchedProcessor { .add_meta(Some(Box::new(self.data_schemas.get(&idx).unwrap().clone())))?; if !satisfied_block.is_empty() { metrics_inc_merge_into_append_blocks_counter(1); + metrics_inc_merge_into_append_blocks_rows_counter( + satisfied_block.num_rows() as u32 + ); self.output_data .push(op.op.execute(&self.func_ctx, satisfied_block)?) } diff --git a/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs b/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs index 9fa8d0f691af..66752b93d3b7 100644 --- a/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs +++ b/src/query/storages/fuse/src/operations/replace_into/mutator/merge_into_mutator.rs @@ -62,8 +62,10 @@ use crate::metrics::metrics_inc_replace_block_number_bloom_pruned; use crate::metrics::metrics_inc_replace_block_number_totally_loaded; use crate::metrics::metrics_inc_replace_block_number_write; use crate::metrics::metrics_inc_replace_block_of_zero_row_deleted; +use crate::metrics::metrics_inc_replace_deleted_blocks_rows; use crate::metrics::metrics_inc_replace_number_accumulated_merge_action; use crate::metrics::metrics_inc_replace_number_apply_deletion; +use crate::metrics::metrics_inc_replace_replaced_blocks_rows; use crate::metrics::metrics_inc_replace_row_number_after_pruning; use crate::metrics::metrics_inc_replace_row_number_totally_loaded; use crate::metrics::metrics_inc_replace_row_number_write; @@ -463,6 +465,7 @@ impl AggregationContext { if delete_nums == block_meta.row_count as usize { info!("whole block deletion"); metrics_inc_replace_whole_block_deletion(1); + metrics_inc_replace_deleted_blocks_rows(num_rows as u64); // whole block deletion // NOTE that if deletion marker is enabled, check the real meaning of `row_count` let mutation = MutationLogEntry::DeletedBlock { @@ -535,6 +538,7 @@ impl AggregationContext { metrics_inc_replace_block_number_write(1); metrics_inc_replace_row_number_write(new_block_meta.row_count); + metrics_inc_replace_replaced_blocks_rows(num_rows as u64); if let Some(index_state) = serialized.bloom_index_state { write_data(index_state.data, &data_accessor, &index_state.location.0).await?; } diff --git a/src/query/storages/fuse/src/operations/replace_into/processors/processor_replace_into.rs b/src/query/storages/fuse/src/operations/replace_into/processors/processor_replace_into.rs index ed98b09342be..3775356fbc86 100644 --- a/src/query/storages/fuse/src/operations/replace_into/processors/processor_replace_into.rs +++ b/src/query/storages/fuse/src/operations/replace_into/processors/processor_replace_into.rs @@ -34,6 +34,7 @@ use common_pipeline_core::processors::Processor; use common_sql::executor::OnConflictField; use storages_common_table_meta::meta::ColumnStatistics; +use crate::metrics::metrics_inc_replace_append_blocks_rows; use crate::metrics::metrics_inc_replace_block_number_input; use crate::metrics::metrics_inc_replace_process_input_block_time_ms; use crate::operations::replace_into::mutator::mutator_replace_into::ReplaceIntoMutator; @@ -173,6 +174,7 @@ impl Processor for ReplaceIntoProcessor { self.output_data_merge_into_action = Some(DataBlock::empty_with_meta(Box::new(merge_into_action))); } + metrics_inc_replace_append_blocks_rows(data_block.num_rows() as u64); self.output_data_append = Some(data_block); return Ok(()); }