Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add replace metrics #13145

Merged
merged 6 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/common/storage/src/metrics/merge_into.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,16 @@ macro_rules! key {
lazy_static! {
static ref MERGE_INTO_REPLACE_BLOCKS_COUNTER: Counter =
register_counter(key!("merge_into_replace_blocks_counter"));
static ref MERGE_INTO_REPLACE_BLOCKS_ROWS_COUNTER: Counter =
register_counter(key!("merge_into_replace_blocks_rows_counter"));
static ref MERGE_INTO_DELETED_BLOCKS_COUNTER: Counter =
register_counter(key!("merge_into_deleted_blocks_counter"));
static ref MERGE_INTO_DELETED_BLOCKS_ROWS_COUNTER: Counter =
register_counter(key!("merge_into_deleted_blocks_rows_counter"));
static ref MERGE_INTO_APPEND_BLOCKS_COUNTER: Counter =
register_counter(key!("merge_into_append_blocks_counter"));
static ref MERGE_INTO_APPEND_BLOCKS_ROWS_COUNTER: Counter =
register_counter(key!("merge_into_append_blocks_rows_counter"));
static ref MERGE_INTO_MATCHED_ROWS: Counter = register_counter(key!("merge_into_matched_rows"));
static ref MERGE_INTO_UNMATCHED_ROWS: Counter =
register_counter(key!("merge_into_unmatched_rows"));
Expand All @@ -50,15 +56,27 @@ pub fn metrics_inc_merge_into_replace_blocks_counter(c: u32) {
MERGE_INTO_REPLACE_BLOCKS_COUNTER.inc_by(c as u64);
}

pub fn metrics_inc_merge_into_replace_blocks_rows_counter(c: u32) {
MERGE_INTO_REPLACE_BLOCKS_ROWS_COUNTER.inc_by(c as u64);
}

pub fn metrics_inc_merge_into_deleted_blocks_counter(c: u32) {
MERGE_INTO_DELETED_BLOCKS_COUNTER.inc_by(c as u64);
}

pub fn metrics_inc_merge_into_deleted_blocks_rows_counter(c: u32) {
MERGE_INTO_DELETED_BLOCKS_ROWS_COUNTER.inc_by(c as u64);
}

// used to record append new blocks in matched split and not match insert.
pub fn metrics_inc_merge_into_append_blocks_counter(c: u32) {
MERGE_INTO_APPEND_BLOCKS_COUNTER.inc_by(c as u64);
}

pub fn metrics_inc_merge_into_append_blocks_rows_counter(c: u32) {
MERGE_INTO_APPEND_BLOCKS_ROWS_COUNTER.inc_by(c as u64);
}

// matched_rows and not unmatched_rows is used in the join phase of merge_source.
pub fn metrics_inc_merge_into_matched_rows(c: u32) {
MERGE_INTO_MATCHED_ROWS.inc_by(c as u64);
Expand Down
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#![feature(impl_trait_in_assoc_type)]
#![feature(int_roundings)]
#![feature(result_option_inspect)]
#![recursion_limit = "256"]

mod constants;
mod fuse_lazy_part;
Expand Down
21 changes: 21 additions & 0 deletions src/query/storages/fuse/src/metrics/fuse_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,12 @@ lazy_static! {
register_counter(key!("replace_into_block_number_bloom_pruned"));
static ref REPLACE_INTO_BLOCK_NUMBER_SOURCE: Counter =
register_counter(key!("replace_into_block_number_source"));
static ref REPLACE_INTO_REPLACED_BLOCKS_ROWS: Counter =
register_counter(key!("replace_into_replaced_blocks_rows"));
static ref REPLACE_INTO_DELETED_BLOCKS_ROWS: Counter =
register_counter(key!("replace_into_deleted_blocks_rows"));
static ref REPLACE_INTO_APPEND_BLOCKS_ROWS: Counter =
register_counter(key!("replace_into_append_blocks_rows"));
}

pub fn metrics_inc_commit_mutation_unresolvable_conflict() {
Expand Down Expand Up @@ -392,3 +398,18 @@ pub fn metrics_inc_replace_block_number_bloom_pruned(c: u64) {
pub fn metrics_inc_replace_block_number_input(c: u64) {
REPLACE_INTO_BLOCK_NUMBER_SOURCE.inc_by(c);
}

// rows of blocks that are replaced
pub fn metrics_inc_replace_replaced_blocks_rows(c: u64) {
REPLACE_INTO_REPLACED_BLOCKS_ROWS.inc_by(c);
}

// rows of blocks that are deleted
pub fn metrics_inc_replace_deleted_blocks_rows(c: u64) {
REPLACE_INTO_DELETED_BLOCKS_ROWS.inc_by(c);
}

// rows of blocks that are appended
pub fn metrics_inc_replace_append_blocks_rows(c: u64) {
REPLACE_INTO_APPEND_BLOCKS_ROWS.inc_by(c);
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ use common_expression::TableSchemaRef;
use common_storage::metrics::merge_into::metrics_inc_merge_into_accumulate_milliseconds;
use common_storage::metrics::merge_into::metrics_inc_merge_into_apply_milliseconds;
use common_storage::metrics::merge_into::metrics_inc_merge_into_deleted_blocks_counter;
use common_storage::metrics::merge_into::metrics_inc_merge_into_deleted_blocks_rows_counter;
use common_storage::metrics::merge_into::metrics_inc_merge_into_replace_blocks_counter;
use common_storage::metrics::merge_into::metrics_inc_merge_into_replace_blocks_rows_counter;
use itertools::Itertools;
use log::info;
use opendal::Operator;
Expand Down Expand Up @@ -290,10 +292,10 @@ impl AggregationContext {
&self.read_settings,
)
.await?;

let origin_num_rows = origin_data_block.num_rows();
// apply delete
let mut bitmap = MutableBitmap::new();
for row in 0..origin_data_block.num_rows() {
for row in 0..origin_num_rows {
if modified_offsets.contains(&row) {
bitmap.push(false);
} else {
Expand All @@ -304,6 +306,7 @@ impl AggregationContext {

if res_block.is_empty() {
metrics_inc_merge_into_deleted_blocks_counter(1);
metrics_inc_merge_into_deleted_blocks_rows_counter(origin_num_rows as u32);
return Ok(Some(MutationLogEntry::DeletedBlock {
index: BlockMetaIndex {
segment_idx,
Expand Down Expand Up @@ -338,6 +341,7 @@ impl AggregationContext {
write_data(new_block_raw_data, &data_accessor, &new_block_location).await?;

metrics_inc_merge_into_replace_blocks_counter(1);
metrics_inc_merge_into_replace_blocks_rows_counter(origin_num_rows as u32);
// generate log
let mutation = MutationLogEntry::ReplacedBlock {
index: BlockMetaIndex {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use common_sql::evaluator::BlockOperator;
use common_sql::executor::MatchExpr;
use common_storage::metrics::merge_into::merge_into_matched_operation_milliseconds;
use common_storage::metrics::merge_into::metrics_inc_merge_into_append_blocks_counter;
use common_storage::metrics::merge_into::metrics_inc_merge_into_append_blocks_rows_counter;

use crate::operations::merge_into::mutator::DeleteByExprMutator;
use crate::operations::merge_into::mutator::UpdateByExprMutator;
Expand Down Expand Up @@ -278,6 +279,7 @@ impl Processor for MatchedSplitProcessor {
};
current_block = op.execute(&self.ctx.get_function_context()?, current_block)?;
metrics_inc_merge_into_append_blocks_counter(1);
metrics_inc_merge_into_append_blocks_rows_counter(current_block.num_rows() as u32);
current_block =
current_block.add_meta(Some(Box::new(self.target_table_schema.clone())))?;
self.output_data_updated_data = Some(current_block);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use common_pipeline_core::processors::Processor;
use common_sql::evaluator::BlockOperator;
use common_storage::metrics::merge_into::merge_into_not_matched_operation_milliseconds;
use common_storage::metrics::merge_into::metrics_inc_merge_into_append_blocks_counter;
use common_storage::metrics::merge_into::metrics_inc_merge_into_append_blocks_rows_counter;
use itertools::Itertools;

use crate::operations::merge_into::mutator::SplitByExprMutator;
Expand Down Expand Up @@ -161,6 +162,9 @@ impl Processor for MergeIntoNotMatchedProcessor {
.add_meta(Some(Box::new(self.data_schemas.get(&idx).unwrap().clone())))?;
if !satisfied_block.is_empty() {
metrics_inc_merge_into_append_blocks_counter(1);
metrics_inc_merge_into_append_blocks_rows_counter(
satisfied_block.num_rows() as u32
);
self.output_data
.push(op.op.execute(&self.func_ctx, satisfied_block)?)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ use crate::metrics::metrics_inc_replace_block_number_bloom_pruned;
use crate::metrics::metrics_inc_replace_block_number_totally_loaded;
use crate::metrics::metrics_inc_replace_block_number_write;
use crate::metrics::metrics_inc_replace_block_of_zero_row_deleted;
use crate::metrics::metrics_inc_replace_deleted_blocks_rows;
use crate::metrics::metrics_inc_replace_number_accumulated_merge_action;
use crate::metrics::metrics_inc_replace_number_apply_deletion;
use crate::metrics::metrics_inc_replace_replaced_blocks_rows;
use crate::metrics::metrics_inc_replace_row_number_after_pruning;
use crate::metrics::metrics_inc_replace_row_number_totally_loaded;
use crate::metrics::metrics_inc_replace_row_number_write;
Expand Down Expand Up @@ -463,6 +465,7 @@ impl AggregationContext {
if delete_nums == block_meta.row_count as usize {
info!("whole block deletion");
metrics_inc_replace_whole_block_deletion(1);
metrics_inc_replace_deleted_blocks_rows(num_rows as u64);
// whole block deletion
// NOTE that if deletion marker is enabled, check the real meaning of `row_count`
let mutation = MutationLogEntry::DeletedBlock {
Expand Down Expand Up @@ -535,6 +538,7 @@ impl AggregationContext {

metrics_inc_replace_block_number_write(1);
metrics_inc_replace_row_number_write(new_block_meta.row_count);
metrics_inc_replace_replaced_blocks_rows(num_rows as u64);
if let Some(index_state) = serialized.bloom_index_state {
write_data(index_state.data, &data_accessor, &index_state.location.0).await?;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use common_pipeline_core::processors::Processor;
use common_sql::executor::OnConflictField;
use storages_common_table_meta::meta::ColumnStatistics;

use crate::metrics::metrics_inc_replace_append_blocks_rows;
use crate::metrics::metrics_inc_replace_block_number_input;
use crate::metrics::metrics_inc_replace_process_input_block_time_ms;
use crate::operations::replace_into::mutator::mutator_replace_into::ReplaceIntoMutator;
Expand Down Expand Up @@ -173,6 +174,7 @@ impl Processor for ReplaceIntoProcessor {
self.output_data_merge_into_action =
Some(DataBlock::empty_with_meta(Box::new(merge_into_action)));
}
metrics_inc_replace_append_blocks_rows(data_block.num_rows() as u64);
self.output_data_append = Some(data_block);
return Ok(());
}
Expand Down
Loading