diff --git a/src/query/storages/fuse/src/metrics/fuse_metrics.rs b/src/query/storages/fuse/src/metrics/fuse_metrics.rs index 2f772fec64fc9..2ec2ab9e0d15f 100644 --- a/src/query/storages/fuse/src/metrics/fuse_metrics.rs +++ b/src/query/storages/fuse/src/metrics/fuse_metrics.rs @@ -61,6 +61,8 @@ lazy_static! { register_counter(key!("compact_block_read_bytes")); static ref COMPACT_BLOCK_READ_MILLISECONDS: Histogram = register_histogram_in_milliseconds(key!("compact_block_read_milliseconds")); + static ref COMPACT_BLOCK_BUILD_TASK_MILLISECONDS: Histogram = + register_histogram_in_milliseconds(key!("compact_block_build_task_milliseconds")); static ref SEGMENTS_RANGE_PRUNING_BEFORE: Counter = register_counter(key!("segments_range_pruning_before")); static ref SEGMENTS_RANGE_PRUNING_AFTER: Counter = @@ -236,6 +238,10 @@ pub fn metrics_inc_compact_block_read_milliseconds(c: u64) { COMPACT_BLOCK_READ_MILLISECONDS.observe(c as f64); } +pub fn metrics_inc_compact_block_build_task_milliseconds(c: u64) { + COMPACT_BLOCK_BUILD_TASK_MILLISECONDS.observe(c as f64); +} + /// Pruning metrics. pub fn metrics_inc_segments_range_pruning_before(c: u64) { SEGMENTS_RANGE_PRUNING_BEFORE.inc_by(c); diff --git a/src/query/storages/fuse/src/operations/mutation/compact/block_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/compact/block_compact_mutator.rs index 3e2eb2a8b3a51..b05f3044c7152 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact/block_compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/compact/block_compact_mutator.rs @@ -16,12 +16,11 @@ use std::collections::HashSet; use std::collections::VecDeque; use std::sync::Arc; use std::time::Instant; -use std::time::SystemTime; use std::vec; use common_base::base::tokio::sync::OwnedSemaphorePermit; use common_base::base::tokio::sync::Semaphore; -use common_base::runtime::GlobalIORuntime; +use common_base::runtime::Runtime; use common_base::runtime::TrySpawn; use common_catalog::plan::PartInfoPtr; use common_catalog::plan::Partitions; @@ -36,6 +35,7 @@ use storages_common_table_meta::meta::CompactSegmentInfo; use storages_common_table_meta::meta::Statistics; use crate::io::SegmentsIO; +use crate::metrics::metrics_inc_compact_block_build_task_milliseconds; use crate::operations::common::BlockMetaIndex; use crate::operations::mutation::compact::compact_part::CompactExtraInfo; use crate::operations::mutation::compact::compact_part::CompactLazyPartInfo; @@ -216,23 +216,14 @@ impl BlockCompactMutator { thresholds: BlockThresholds, mut lazy_parts: Vec, ) -> Result> { - let start = SystemTime::now(); - let max_concurrency = { - let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize; - // Prevent us from miss-configured max_storage_io_requests setting, e.g. 0 - let v = std::cmp::max(max_io_requests, 10); - if v > max_io_requests { - log::warn!( - "max_storage_io_requests setting is too low {}, increased to {}", - max_io_requests, - v - ) - } - v - }; + let start = Instant::now(); + let max_threads = ctx.get_settings().get_max_threads()? as usize; + let max_concurrency = std::cmp::max(max_threads * 2, 10); - // Pruning runtime. - let runtime = GlobalIORuntime::instance(); + let runtime = Arc::new(Runtime::with_worker_threads( + max_threads, + Some("build-compact-worker".to_owned()), + )?); let semaphore = Arc::new(Semaphore::new(max_concurrency)); let mut remain = lazy_parts.len() % max_concurrency; @@ -249,6 +240,7 @@ impl BlockCompactMutator { let batch = lazy_parts.drain(0..batch_size).collect::>(); works.push(runtime.spawn(async_backtrace::location!().frame({ + let runtime = runtime.clone(); async move { let mut res = vec![]; for lazy_part in batch { @@ -258,6 +250,7 @@ impl BlockCompactMutator { .build_tasks( lazy_part.segment_indices, lazy_part.compact_segments, + runtime.clone(), semaphore.clone(), ) .await?; @@ -279,16 +272,17 @@ impl BlockCompactMutator { let res = worker?; parts.extend(res); } - let elapsed_time = SystemTime::now().duration_since(start).unwrap(); + let elapsed_time = start.elapsed().as_millis() as u64; // Status. { let status = format!( "compact: end to build compact parts:{}, cost:{} ms", parts.len(), - elapsed_time.as_millis(), + elapsed_time, ); ctx.set_status_info(&status); log::info!("{}", status); + metrics_inc_compact_block_build_task_milliseconds(elapsed_time); } Ok(parts) } @@ -492,6 +486,7 @@ impl CompactTaskBuilder { &mut self, segment_indices: Vec, compact_segments: Vec>, + runtime: Arc, semaphore: Arc, ) -> Result> { let mut block_idx = 0; @@ -513,7 +508,6 @@ impl CompactTaskBuilder { }) }); - let runtime = GlobalIORuntime::instance(); let join_handlers = runtime .try_spawn_batch_with_owned_semaphore(semaphore.clone(), tasks) .await?;