From 4ffd197aca86c384feabaa33478daeb0d37578fe Mon Sep 17 00:00:00 2001 From: JackTan25 <60096118+JackTan25@users.noreply.github.com> Date: Mon, 25 Sep 2023 17:48:25 +0800 Subject: [PATCH] chore: replace max_io_request (#12997) * replace max_io_request * fix check * replace max_io_request with max_threads * remove write_segments * fix typo * use factor 4 when execute_futures_in_parallel advised by zhyass * rename * rename * add factor * use factor as 2 --- .../ee/src/storages/fuse/io/snapshots.rs | 4 +-- .../service/src/pipelines/pipeline_builder.rs | 12 ++++----- .../operations/mutation/recluster_mutator.rs | 3 +-- .../mutation/segments_compact_mutator.rs | 7 +++--- src/query/storages/fuse/src/io/files.rs | 4 +-- src/query/storages/fuse/src/io/segments.rs | 25 ------------------- src/query/storages/fuse/src/io/snapshots.rs | 12 ++++----- .../transform_mutation_aggregator.rs | 6 ++--- .../mutation/recluster_aggregator.rs | 4 +-- .../operations/mutation/recluster_mutator.rs | 4 +-- .../storages/fuse/src/operations/recluster.rs | 12 ++++----- 11 files changed, 33 insertions(+), 60 deletions(-) diff --git a/src/query/ee/src/storages/fuse/io/snapshots.rs b/src/query/ee/src/storages/fuse/io/snapshots.rs index f5fafb9bec53..ad23ceeafd3b 100644 --- a/src/query/ee/src/storages/fuse/io/snapshots.rs +++ b/src/query/ee/src/storages/fuse/io/snapshots.rs @@ -49,7 +49,7 @@ where } // 1. Get all the snapshot by chunks, save all the segments location. - let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize; + let max_threads = ctx.get_settings().get_max_threads()? as usize; let start = Instant::now(); let mut count = 1; @@ -59,7 +59,7 @@ where root_snapshot_lite.segments.iter().for_each(|location| { segments.push(location.to_owned()); }); - for chunk in snapshot_files.chunks(max_io_requests) { + for chunk in snapshot_files.chunks(max_threads) { // Since we want to get all the snapshot referenced files, so set `ignore_timestamp` true let results = snapshots_io .read_snapshot_lite_extends(chunk, root_snapshot_lite.clone(), true) diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index b7b912f989eb..57b8a65e2f2a 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -526,8 +526,8 @@ impl PipelineBuilder { self.main_pipeline.add_pipe(builder.finalize()); } - let max_io_request = self.ctx.get_settings().get_max_storage_io_requests()?; - let io_request_semaphore = Arc::new(Semaphore::new(max_io_request as usize)); + let max_threads = self.ctx.get_settings().get_max_threads()?; + let io_request_semaphore = Arc::new(Semaphore::new(max_threads as usize)); let pipe_items = vec![ table.matched_mutator( @@ -606,8 +606,8 @@ impl PipelineBuilder { .add_pipe(Pipe::create(1, segment_partition_num, vec![ broadcast_processor.into_pipe_item(), ])); - let max_io_request = self.ctx.get_settings().get_max_storage_io_requests()?; - let io_request_semaphore = Arc::new(Semaphore::new(max_io_request as usize)); + let max_threads = self.ctx.get_settings().get_max_threads()?; + let io_request_semaphore = Arc::new(Semaphore::new(max_threads as usize)); let merge_into_operation_aggregators = table.merge_into_mutators( self.ctx.clone(), @@ -687,8 +687,8 @@ impl PipelineBuilder { // setup the dummy transform pipe_items.push(serialize_segment_transform.into_pipe_item()); - let max_io_request = self.ctx.get_settings().get_max_storage_io_requests()?; - let io_request_semaphore = Arc::new(Semaphore::new(max_io_request as usize)); + let max_threads = self.ctx.get_settings().get_max_threads()?; + let io_request_semaphore = Arc::new(Semaphore::new(max_threads as usize)); // setup the merge into operation aggregators let mut merge_into_operation_aggregators = table.merge_into_mutators( diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs index 6db6433e60c6..a81a3527d96d 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/recluster_mutator.rs @@ -388,11 +388,10 @@ async fn generage_segments( } let threads_nums = ctx.get_settings().get_max_threads()? as usize; - let permit_nums = ctx.get_settings().get_max_storage_io_requests()? as usize; execute_futures_in_parallel( tasks, threads_nums, - permit_nums, + threads_nums * 2, "fuse-write-segments-worker".to_owned(), ) .await? diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs index 9a34e3b548e7..0ffdb2bcaa2a 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/segments_compact_mutator.rs @@ -659,13 +659,13 @@ impl CompactSegmentTestFixture { let schema = TestFixture::default_table_schema(); let fuse_segment_io = SegmentsIO::create(self.ctx.clone(), data_accessor.clone(), schema); - let max_io_requests = self.ctx.get_settings().get_max_storage_io_requests()? as usize; + let max_theads = self.ctx.get_settings().get_max_threads()? as usize; let segment_writer = SegmentWriter::new(data_accessor, location_gen); let seg_acc = SegmentCompactor::new( block_per_seg, cluster_key_id, - max_io_requests, + max_theads, &fuse_segment_io, segment_writer.clone(), ); @@ -705,7 +705,6 @@ impl CompactSegmentTestFixture { let location_gen = TableMetaLocationGenerator::with_prefix("test/".to_owned()); let data_accessor = ctx.get_data_operator()?.operator(); let threads_nums = ctx.get_settings().get_max_threads()? as usize; - let permit_nums = ctx.get_settings().get_max_storage_io_requests()? as usize; let mut tasks = vec![]; for (num_blocks, rows_per_block) in block_num_of_segments @@ -788,7 +787,7 @@ impl CompactSegmentTestFixture { let res = execute_futures_in_parallel( tasks, threads_nums, - permit_nums, + threads_nums * 2, "fuse-write-segments-worker".to_owned(), ) .await? diff --git a/src/query/storages/fuse/src/io/files.rs b/src/query/storages/fuse/src/io/files.rs index d161c4d08900..6a62d4120c5c 100644 --- a/src/query/storages/fuse/src/io/files.rs +++ b/src/query/storages/fuse/src/io/files.rs @@ -54,11 +54,11 @@ impl Files { }); let threads_nums = self.ctx.get_settings().get_max_threads()? as usize; - let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize; + execute_futures_in_parallel( tasks, threads_nums, - permit_nums, + threads_nums * 2, "batch-remove-files-worker".to_owned(), ) .await?; diff --git a/src/query/storages/fuse/src/io/segments.rs b/src/query/storages/fuse/src/io/segments.rs index 2d5b40cec313..a8989149d5f8 100644 --- a/src/query/storages/fuse/src/io/segments.rs +++ b/src/query/storages/fuse/src/io/segments.rs @@ -130,29 +130,4 @@ impl SegmentsIO { } Ok(()) } - - // TODO use batch_meta_writer - #[async_backtrace::framed] - pub async fn write_segments(&self, segments: Vec) -> Result<()> { - let mut iter = segments.into_iter(); - let tasks = std::iter::from_fn(move || { - iter.next().map(|segment| { - Self::write_segment(self.operator.clone(), segment) - .in_span(Span::enter_with_local_parent("write_segment")) - }) - }); - - let threads_nums = self.ctx.get_settings().get_max_threads()? as usize; - let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize; - execute_futures_in_parallel( - tasks, - threads_nums, - permit_nums, - "write-segments-worker".to_owned(), - ) - .await? - .into_iter() - .collect::>>()?; - Ok(()) - } } diff --git a/src/query/storages/fuse/src/io/snapshots.rs b/src/query/storages/fuse/src/io/snapshots.rs index c2b14087ae0a..b0ea1e487188 100644 --- a/src/query/storages/fuse/src/io/snapshots.rs +++ b/src/query/storages/fuse/src/io/snapshots.rs @@ -141,11 +141,11 @@ impl SnapshotsIO { }); let threads_nums = self.ctx.get_settings().get_max_threads()? as usize; - let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize; + execute_futures_in_parallel( tasks, threads_nums, - permit_nums, + threads_nums * 2, "fuse-req-snapshots-worker".to_owned(), ) .await @@ -173,12 +173,12 @@ impl SnapshotsIO { } // 1. Get all the snapshot by chunks. - let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize; + let max_threads = ctx.get_settings().get_max_threads()? as usize; let mut snapshot_lites = Vec::with_capacity(snapshot_files.len()); let start = Instant::now(); let mut count = 0; - for chunk in snapshot_files.chunks(max_io_requests) { + for chunk in snapshot_files.chunks(max_threads) { let results = self .read_snapshot_lites(chunk, min_snapshot_timestamp) .await?; @@ -311,11 +311,11 @@ impl SnapshotsIO { }); let threads_nums = self.ctx.get_settings().get_max_threads()? as usize; - let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize; + execute_futures_in_parallel( tasks, threads_nums, - permit_nums, + threads_nums * 2, "fuse-req-snapshots-worker".to_owned(), ) .await diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs index e073d2510eb0..b648d643aaef 100644 --- a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs +++ b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs @@ -229,7 +229,7 @@ impl TableMutationAggregator { let mut replaced_segments = HashMap::new(); let mut merged_statistics = Statistics::default(); - let chunk_size = self.ctx.get_settings().get_max_storage_io_requests()? as usize; + let chunk_size = self.ctx.get_settings().get_max_threads()? as usize; let segment_indices = self.mutations.keys().cloned().collect::>(); for chunk in segment_indices.chunks(chunk_size) { let results = self.partial_apply(chunk.to_vec()).await?; @@ -373,11 +373,11 @@ impl TableMutationAggregator { } let threads_nums = self.ctx.get_settings().get_max_threads()? as usize; - let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize; + execute_futures_in_parallel( tasks, threads_nums, - permit_nums, + threads_nums * 2, "fuse-req-segments-worker".to_owned(), ) .await? diff --git a/src/query/storages/fuse/src/operations/mutation/recluster_aggregator.rs b/src/query/storages/fuse/src/operations/mutation/recluster_aggregator.rs index c9e9de1a6bd9..926017dd320a 100644 --- a/src/query/storages/fuse/src/operations/mutation/recluster_aggregator.rs +++ b/src/query/storages/fuse/src/operations/mutation/recluster_aggregator.rs @@ -193,11 +193,11 @@ impl ReclusterAggregator { } let threads_nums = self.ctx.get_settings().get_max_threads()? as usize; - let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize; + execute_futures_in_parallel( tasks, threads_nums, - permit_nums, + threads_nums * 2, "fuse-write-segments-worker".to_owned(), ) .await? diff --git a/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs index c37e49abd69f..a6335d49f9e5 100644 --- a/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs @@ -265,11 +265,11 @@ impl ReclusterMutator { }); let thread_nums = self.ctx.get_settings().get_max_threads()? as usize; - let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize; + let blocks = execute_futures_in_parallel( tasks, thread_nums, - permit_nums, + thread_nums * 2, "convert-segments-worker".to_owned(), ) .await? diff --git a/src/query/storages/fuse/src/operations/recluster.rs b/src/query/storages/fuse/src/operations/recluster.rs index 363fa953c77a..f3136b3a634b 100644 --- a/src/query/storages/fuse/src/operations/recluster.rs +++ b/src/query/storages/fuse/src/operations/recluster.rs @@ -138,7 +138,7 @@ impl FuseTable { let selected_segs = ReclusterMutator::select_segments( &compact_segments, block_per_seg, - max_threads * 4, + max_threads * 2, default_cluster_key_id, )?; // select the blocks with the highest depth. @@ -328,12 +328,12 @@ impl FuseTable { mut segment_locs: Vec, ) -> Result)>> { let max_concurrency = { - let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize; - let v = std::cmp::max(max_io_requests, 10); - if v > max_io_requests { + let max_threads = ctx.get_settings().get_max_threads()? as usize; + let v = std::cmp::max(max_threads, 10); + if v > max_threads { warn!( - "max_storage_io_requests setting is too low {}, increased to {}", - max_io_requests, v + "max_threads setting is too low {}, increased to {}", + max_threads, v ) } v