Skip to content

Commit

Permalink
Add metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Sep 27, 2023
1 parent 21c9878 commit af34496
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 21 deletions.
6 changes: 6 additions & 0 deletions src/query/storages/fuse/src/metrics/fuse_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ lazy_static! {
register_counter(key!("compact_block_read_bytes"));
static ref COMPACT_BLOCK_READ_MILLISECONDS: Histogram =
register_histogram_in_milliseconds(key!("compact_block_read_milliseconds"));
static ref COMPACT_BLOCK_BUILD_TASK_MILLISECONDS: Histogram =
register_histogram_in_milliseconds(key!("compact_block_build_task_milliseconds"));
static ref SEGMENTS_RANGE_PRUNING_BEFORE: Counter =
register_counter(key!("segments_range_pruning_before"));
static ref SEGMENTS_RANGE_PRUNING_AFTER: Counter =
Expand Down Expand Up @@ -236,6 +238,10 @@ pub fn metrics_inc_compact_block_read_milliseconds(c: u64) {
COMPACT_BLOCK_READ_MILLISECONDS.observe(c as f64);
}

pub fn metrics_inc_compact_block_build_task_milliseconds(c: u64) {
COMPACT_BLOCK_BUILD_TASK_MILLISECONDS.observe(c as f64);
}

/// Pruning metrics.
pub fn metrics_inc_segments_range_pruning_before(c: u64) {
SEGMENTS_RANGE_PRUNING_BEFORE.inc_by(c);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,11 @@ use std::collections::HashSet;
use std::collections::VecDeque;
use std::sync::Arc;
use std::time::Instant;
use std::time::SystemTime;
use std::vec;

use common_base::base::tokio::sync::OwnedSemaphorePermit;
use common_base::base::tokio::sync::Semaphore;
use common_base::runtime::GlobalIORuntime;
use common_base::runtime::Runtime;
use common_base::runtime::TrySpawn;
use common_catalog::plan::PartInfoPtr;
use common_catalog::plan::Partitions;
Expand All @@ -36,6 +35,7 @@ use storages_common_table_meta::meta::CompactSegmentInfo;
use storages_common_table_meta::meta::Statistics;

use crate::io::SegmentsIO;
use crate::metrics::metrics_inc_compact_block_build_task_milliseconds;
use crate::operations::common::BlockMetaIndex;
use crate::operations::mutation::compact::compact_part::CompactExtraInfo;
use crate::operations::mutation::compact::compact_part::CompactLazyPartInfo;
Expand Down Expand Up @@ -216,23 +216,14 @@ impl BlockCompactMutator {
thresholds: BlockThresholds,
mut lazy_parts: Vec<CompactLazyPartInfo>,
) -> Result<Vec<PartInfoPtr>> {
let start = SystemTime::now();
let max_concurrency = {
let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize;
// Prevent us from miss-configured max_storage_io_requests setting, e.g. 0
let v = std::cmp::max(max_io_requests, 10);
if v > max_io_requests {
log::warn!(
"max_storage_io_requests setting is too low {}, increased to {}",
max_io_requests,
v
)
}
v
};
let start = Instant::now();
let max_threads = ctx.get_settings().get_max_threads()? as usize;
let max_concurrency = std::cmp::max(max_threads * 2, 10);

// Pruning runtime.
let runtime = GlobalIORuntime::instance();
let runtime = Arc::new(Runtime::with_worker_threads(
max_threads,
Some("build-compact-worker".to_owned()),
)?);
let semaphore = Arc::new(Semaphore::new(max_concurrency));

let mut remain = lazy_parts.len() % max_concurrency;
Expand All @@ -249,6 +240,7 @@ impl BlockCompactMutator {

let batch = lazy_parts.drain(0..batch_size).collect::<Vec<_>>();
works.push(runtime.spawn(async_backtrace::location!().frame({
let runtime = runtime.clone();
async move {
let mut res = vec![];
for lazy_part in batch {
Expand All @@ -258,6 +250,7 @@ impl BlockCompactMutator {
.build_tasks(
lazy_part.segment_indices,
lazy_part.compact_segments,
runtime.clone(),
semaphore.clone(),
)
.await?;
Expand All @@ -279,16 +272,17 @@ impl BlockCompactMutator {
let res = worker?;
parts.extend(res);
}
let elapsed_time = SystemTime::now().duration_since(start).unwrap();
let elapsed_time = start.elapsed().as_millis() as u64;
// Status.
{
let status = format!(
"compact: end to build compact parts:{}, cost:{} ms",
parts.len(),
elapsed_time.as_millis(),
elapsed_time,
);
ctx.set_status_info(&status);
log::info!("{}", status);
metrics_inc_compact_block_build_task_milliseconds(elapsed_time);
}
Ok(parts)
}
Expand Down Expand Up @@ -492,6 +486,7 @@ impl CompactTaskBuilder {
&mut self,
segment_indices: Vec<usize>,
compact_segments: Vec<Arc<CompactSegmentInfo>>,
runtime: Arc<Runtime>,
semaphore: Arc<Semaphore>,
) -> Result<Vec<PartInfoPtr>> {
let mut block_idx = 0;
Expand All @@ -513,7 +508,6 @@ impl CompactTaskBuilder {
})
});

let runtime = GlobalIORuntime::instance();
let join_handlers = runtime
.try_spawn_batch_with_owned_semaphore(semaphore.clone(), tasks)
.await?;
Expand Down

0 comments on commit af34496

Please sign in to comment.