Skip to content

Commit

Permalink
add replace metrics (#13145)
Browse files Browse the repository at this point in the history
Co-authored-by: dantengsky <[email protected]>
  • Loading branch information
JackTan25 and dantengsky authored Oct 9, 2023
1 parent fd2c10b commit 0e09c51
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 2 deletions.
18 changes: 18 additions & 0 deletions src/common/storage/src/metrics/merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,16 @@ macro_rules! key {
lazy_static! {
static ref MERGE_INTO_REPLACE_BLOCKS_COUNTER: Counter =
register_counter(key!("merge_into_replace_blocks_counter"));
static ref MERGE_INTO_REPLACE_BLOCKS_ROWS_COUNTER: Counter =
register_counter(key!("merge_into_replace_blocks_rows_counter"));
static ref MERGE_INTO_DELETED_BLOCKS_COUNTER: Counter =
register_counter(key!("merge_into_deleted_blocks_counter"));
static ref MERGE_INTO_DELETED_BLOCKS_ROWS_COUNTER: Counter =
register_counter(key!("merge_into_deleted_blocks_rows_counter"));
static ref MERGE_INTO_APPEND_BLOCKS_COUNTER: Counter =
register_counter(key!("merge_into_append_blocks_counter"));
static ref MERGE_INTO_APPEND_BLOCKS_ROWS_COUNTER: Counter =
register_counter(key!("merge_into_append_blocks_rows_counter"));
static ref MERGE_INTO_MATCHED_ROWS: Counter = register_counter(key!("merge_into_matched_rows"));
static ref MERGE_INTO_UNMATCHED_ROWS: Counter =
register_counter(key!("merge_into_unmatched_rows"));
Expand All @@ -50,15 +56,27 @@ pub fn metrics_inc_merge_into_replace_blocks_counter(c: u32) {
MERGE_INTO_REPLACE_BLOCKS_COUNTER.inc_by(c as u64);
}

pub fn metrics_inc_merge_into_replace_blocks_rows_counter(c: u32) {
MERGE_INTO_REPLACE_BLOCKS_ROWS_COUNTER.inc_by(c as u64);
}

pub fn metrics_inc_merge_into_deleted_blocks_counter(c: u32) {
MERGE_INTO_DELETED_BLOCKS_COUNTER.inc_by(c as u64);
}

pub fn metrics_inc_merge_into_deleted_blocks_rows_counter(c: u32) {
MERGE_INTO_DELETED_BLOCKS_ROWS_COUNTER.inc_by(c as u64);
}

// used to record append new blocks in matched split and not match insert.
pub fn metrics_inc_merge_into_append_blocks_counter(c: u32) {
MERGE_INTO_APPEND_BLOCKS_COUNTER.inc_by(c as u64);
}

pub fn metrics_inc_merge_into_append_blocks_rows_counter(c: u32) {
MERGE_INTO_APPEND_BLOCKS_ROWS_COUNTER.inc_by(c as u64);
}

// matched_rows and not unmatched_rows is used in the join phase of merge_source.
pub fn metrics_inc_merge_into_matched_rows(c: u32) {
MERGE_INTO_MATCHED_ROWS.inc_by(c as u64);
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#![feature(impl_trait_in_assoc_type)]
#![feature(int_roundings)]
#![feature(result_option_inspect)]
#![recursion_limit = "256"]

mod constants;
mod fuse_lazy_part;
Expand Down
21 changes: 21 additions & 0 deletions src/query/storages/fuse/src/metrics/fuse_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ lazy_static! {
register_counter(key!("replace_into_block_number_bloom_pruned"));
static ref REPLACE_INTO_BLOCK_NUMBER_SOURCE: Counter =
register_counter(key!("replace_into_block_number_source"));
static ref REPLACE_INTO_REPLACED_BLOCKS_ROWS: Counter =
register_counter(key!("replace_into_replaced_blocks_rows"));
static ref REPLACE_INTO_DELETED_BLOCKS_ROWS: Counter =
register_counter(key!("replace_into_deleted_blocks_rows"));
static ref REPLACE_INTO_APPEND_BLOCKS_ROWS: Counter =
register_counter(key!("replace_into_append_blocks_rows"));
}

pub fn metrics_inc_commit_mutation_unresolvable_conflict() {
Expand Down Expand Up @@ -392,3 +398,18 @@ pub fn metrics_inc_replace_block_number_bloom_pruned(c: u64) {
pub fn metrics_inc_replace_block_number_input(c: u64) {
REPLACE_INTO_BLOCK_NUMBER_SOURCE.inc_by(c);
}

// rows of blocks that are replaced
pub fn metrics_inc_replace_replaced_blocks_rows(c: u64) {
REPLACE_INTO_REPLACED_BLOCKS_ROWS.inc_by(c);
}

// rows of blocks that are deleted
pub fn metrics_inc_replace_deleted_blocks_rows(c: u64) {
REPLACE_INTO_DELETED_BLOCKS_ROWS.inc_by(c);
}

// rows of blocks that are appended
pub fn metrics_inc_replace_append_blocks_rows(c: u64) {
REPLACE_INTO_APPEND_BLOCKS_ROWS.inc_by(c);
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ use common_expression::TableSchemaRef;
use common_storage::metrics::merge_into::metrics_inc_merge_into_accumulate_milliseconds;
use common_storage::metrics::merge_into::metrics_inc_merge_into_apply_milliseconds;
use common_storage::metrics::merge_into::metrics_inc_merge_into_deleted_blocks_counter;
use common_storage::metrics::merge_into::metrics_inc_merge_into_deleted_blocks_rows_counter;
use common_storage::metrics::merge_into::metrics_inc_merge_into_replace_blocks_counter;
use common_storage::metrics::merge_into::metrics_inc_merge_into_replace_blocks_rows_counter;
use itertools::Itertools;
use log::info;
use opendal::Operator;
Expand Down Expand Up @@ -290,10 +292,10 @@ impl AggregationContext {
&self.read_settings,
)
.await?;

let origin_num_rows = origin_data_block.num_rows();
// apply delete
let mut bitmap = MutableBitmap::new();
for row in 0..origin_data_block.num_rows() {
for row in 0..origin_num_rows {
if modified_offsets.contains(&row) {
bitmap.push(false);
} else {
Expand All @@ -304,6 +306,7 @@ impl AggregationContext {

if res_block.is_empty() {
metrics_inc_merge_into_deleted_blocks_counter(1);
metrics_inc_merge_into_deleted_blocks_rows_counter(origin_num_rows as u32);
return Ok(Some(MutationLogEntry::DeletedBlock {
index: BlockMetaIndex {
segment_idx,
Expand Down Expand Up @@ -338,6 +341,7 @@ impl AggregationContext {
write_data(new_block_raw_data, &data_accessor, &new_block_location).await?;

metrics_inc_merge_into_replace_blocks_counter(1);
metrics_inc_merge_into_replace_blocks_rows_counter(origin_num_rows as u32);
// generate log
let mutation = MutationLogEntry::ReplacedBlock {
index: BlockMetaIndex {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use common_sql::evaluator::BlockOperator;
use common_sql::executor::MatchExpr;
use common_storage::metrics::merge_into::merge_into_matched_operation_milliseconds;
use common_storage::metrics::merge_into::metrics_inc_merge_into_append_blocks_counter;
use common_storage::metrics::merge_into::metrics_inc_merge_into_append_blocks_rows_counter;

use crate::operations::merge_into::mutator::DeleteByExprMutator;
use crate::operations::merge_into::mutator::UpdateByExprMutator;
Expand Down Expand Up @@ -278,6 +279,7 @@ impl Processor for MatchedSplitProcessor {
};
current_block = op.execute(&self.ctx.get_function_context()?, current_block)?;
metrics_inc_merge_into_append_blocks_counter(1);
metrics_inc_merge_into_append_blocks_rows_counter(current_block.num_rows() as u32);
current_block =
current_block.add_meta(Some(Box::new(self.target_table_schema.clone())))?;
self.output_data_updated_data = Some(current_block);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use common_pipeline_core::processors::Processor;
use common_sql::evaluator::BlockOperator;
use common_storage::metrics::merge_into::merge_into_not_matched_operation_milliseconds;
use common_storage::metrics::merge_into::metrics_inc_merge_into_append_blocks_counter;
use common_storage::metrics::merge_into::metrics_inc_merge_into_append_blocks_rows_counter;
use itertools::Itertools;

use crate::operations::merge_into::mutator::SplitByExprMutator;
Expand Down Expand Up @@ -161,6 +162,9 @@ impl Processor for MergeIntoNotMatchedProcessor {
.add_meta(Some(Box::new(self.data_schemas.get(&idx).unwrap().clone())))?;
if !satisfied_block.is_empty() {
metrics_inc_merge_into_append_blocks_counter(1);
metrics_inc_merge_into_append_blocks_rows_counter(
satisfied_block.num_rows() as u32
);
self.output_data
.push(op.op.execute(&self.func_ctx, satisfied_block)?)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ use crate::metrics::metrics_inc_replace_block_number_bloom_pruned;
use crate::metrics::metrics_inc_replace_block_number_totally_loaded;
use crate::metrics::metrics_inc_replace_block_number_write;
use crate::metrics::metrics_inc_replace_block_of_zero_row_deleted;
use crate::metrics::metrics_inc_replace_deleted_blocks_rows;
use crate::metrics::metrics_inc_replace_number_accumulated_merge_action;
use crate::metrics::metrics_inc_replace_number_apply_deletion;
use crate::metrics::metrics_inc_replace_replaced_blocks_rows;
use crate::metrics::metrics_inc_replace_row_number_after_pruning;
use crate::metrics::metrics_inc_replace_row_number_totally_loaded;
use crate::metrics::metrics_inc_replace_row_number_write;
Expand Down Expand Up @@ -463,6 +465,7 @@ impl AggregationContext {
if delete_nums == block_meta.row_count as usize {
info!("whole block deletion");
metrics_inc_replace_whole_block_deletion(1);
metrics_inc_replace_deleted_blocks_rows(num_rows as u64);
// whole block deletion
// NOTE that if deletion marker is enabled, check the real meaning of `row_count`
let mutation = MutationLogEntry::DeletedBlock {
Expand Down Expand Up @@ -535,6 +538,7 @@ impl AggregationContext {

metrics_inc_replace_block_number_write(1);
metrics_inc_replace_row_number_write(new_block_meta.row_count);
metrics_inc_replace_replaced_blocks_rows(num_rows as u64);
if let Some(index_state) = serialized.bloom_index_state {
write_data(index_state.data, &data_accessor, &index_state.location.0).await?;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use common_pipeline_core::processors::Processor;
use common_sql::executor::OnConflictField;
use storages_common_table_meta::meta::ColumnStatistics;

use crate::metrics::metrics_inc_replace_append_blocks_rows;
use crate::metrics::metrics_inc_replace_block_number_input;
use crate::metrics::metrics_inc_replace_process_input_block_time_ms;
use crate::operations::replace_into::mutator::mutator_replace_into::ReplaceIntoMutator;
Expand Down Expand Up @@ -173,6 +174,7 @@ impl Processor for ReplaceIntoProcessor {
self.output_data_merge_into_action =
Some(DataBlock::empty_with_meta(Box::new(merge_into_action)));
}
metrics_inc_replace_append_blocks_rows(data_block.num_rows() as u64);
self.output_data_append = Some(data_block);
return Ok(());
}
Expand Down

1 comment on commit 0e09c51

@vercel
Copy link

@vercel vercel bot commented on 0e09c51 Oct 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.