Skip to content

Commit

Permalink
chore: replace max_io_request (#12997)
Browse files Browse the repository at this point in the history
* replace max_io_request

* fix check

* replace max_io_request with max_threads

* remove write_segments

* fix typo

* use a factor of 4 for execute_futures_in_parallel, as advised by zhyass

* rename

* rename

* add factor

* use factor as 2
  • Loading branch information
JackTan25 authored Sep 25, 2023
1 parent d8d06be commit 4ffd197
Show file tree
Hide file tree
Showing 11 changed files with 33 additions and 60 deletions.
4 changes: 2 additions & 2 deletions src/query/ee/src/storages/fuse/io/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ where
}

// 1. Get all the snapshots in chunks, and save all the segment locations.
let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize;
let max_threads = ctx.get_settings().get_max_threads()? as usize;

let start = Instant::now();
let mut count = 1;
Expand All @@ -59,7 +59,7 @@ where
root_snapshot_lite.segments.iter().for_each(|location| {
segments.push(location.to_owned());
});
for chunk in snapshot_files.chunks(max_io_requests) {
for chunk in snapshot_files.chunks(max_threads) {
// Since we want all the files referenced by snapshots, set `ignore_timestamp` to true
let results = snapshots_io
.read_snapshot_lite_extends(chunk, root_snapshot_lite.clone(), true)
Expand Down
12 changes: 6 additions & 6 deletions src/query/service/src/pipelines/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,8 +526,8 @@ impl PipelineBuilder {
self.main_pipeline.add_pipe(builder.finalize());
}

let max_io_request = self.ctx.get_settings().get_max_storage_io_requests()?;
let io_request_semaphore = Arc::new(Semaphore::new(max_io_request as usize));
let max_threads = self.ctx.get_settings().get_max_threads()?;
let io_request_semaphore = Arc::new(Semaphore::new(max_threads as usize));

let pipe_items = vec![
table.matched_mutator(
Expand Down Expand Up @@ -606,8 +606,8 @@ impl PipelineBuilder {
.add_pipe(Pipe::create(1, segment_partition_num, vec![
broadcast_processor.into_pipe_item(),
]));
let max_io_request = self.ctx.get_settings().get_max_storage_io_requests()?;
let io_request_semaphore = Arc::new(Semaphore::new(max_io_request as usize));
let max_threads = self.ctx.get_settings().get_max_threads()?;
let io_request_semaphore = Arc::new(Semaphore::new(max_threads as usize));

let merge_into_operation_aggregators = table.merge_into_mutators(
self.ctx.clone(),
Expand Down Expand Up @@ -687,8 +687,8 @@ impl PipelineBuilder {
// setup the dummy transform
pipe_items.push(serialize_segment_transform.into_pipe_item());

let max_io_request = self.ctx.get_settings().get_max_storage_io_requests()?;
let io_request_semaphore = Arc::new(Semaphore::new(max_io_request as usize));
let max_threads = self.ctx.get_settings().get_max_threads()?;
let io_request_semaphore = Arc::new(Semaphore::new(max_threads as usize));

// setup the merge into operation aggregators
let mut merge_into_operation_aggregators = table.merge_into_mutators(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,11 +388,10 @@ async fn generage_segments(
}

let threads_nums = ctx.get_settings().get_max_threads()? as usize;
let permit_nums = ctx.get_settings().get_max_storage_io_requests()? as usize;
execute_futures_in_parallel(
tasks,
threads_nums,
permit_nums,
threads_nums * 2,
"fuse-write-segments-worker".to_owned(),
)
.await?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,13 +659,13 @@ impl CompactSegmentTestFixture {

let schema = TestFixture::default_table_schema();
let fuse_segment_io = SegmentsIO::create(self.ctx.clone(), data_accessor.clone(), schema);
let max_io_requests = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
let max_theads = self.ctx.get_settings().get_max_threads()? as usize;

let segment_writer = SegmentWriter::new(data_accessor, location_gen);
let seg_acc = SegmentCompactor::new(
block_per_seg,
cluster_key_id,
max_io_requests,
max_theads,
&fuse_segment_io,
segment_writer.clone(),
);
Expand Down Expand Up @@ -705,7 +705,6 @@ impl CompactSegmentTestFixture {
let location_gen = TableMetaLocationGenerator::with_prefix("test/".to_owned());
let data_accessor = ctx.get_data_operator()?.operator();
let threads_nums = ctx.get_settings().get_max_threads()? as usize;
let permit_nums = ctx.get_settings().get_max_storage_io_requests()? as usize;

let mut tasks = vec![];
for (num_blocks, rows_per_block) in block_num_of_segments
Expand Down Expand Up @@ -788,7 +787,7 @@ impl CompactSegmentTestFixture {
let res = execute_futures_in_parallel(
tasks,
threads_nums,
permit_nums,
threads_nums * 2,
"fuse-write-segments-worker".to_owned(),
)
.await?
Expand Down
4 changes: 2 additions & 2 deletions src/query/storages/fuse/src/io/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ impl Files {
});

let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;

execute_futures_in_parallel(
tasks,
threads_nums,
permit_nums,
threads_nums * 2,
"batch-remove-files-worker".to_owned(),
)
.await?;
Expand Down
25 changes: 0 additions & 25 deletions src/query/storages/fuse/src/io/segments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,29 +130,4 @@ impl SegmentsIO {
}
Ok(())
}

// TODO use batch_meta_writer
#[async_backtrace::framed]
pub async fn write_segments(&self, segments: Vec<SerializedSegment>) -> Result<()> {
let mut iter = segments.into_iter();
let tasks = std::iter::from_fn(move || {
iter.next().map(|segment| {
Self::write_segment(self.operator.clone(), segment)
.in_span(Span::enter_with_local_parent("write_segment"))
})
});

let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
execute_futures_in_parallel(
tasks,
threads_nums,
permit_nums,
"write-segments-worker".to_owned(),
)
.await?
.into_iter()
.collect::<Result<Vec<_>>>()?;
Ok(())
}
}
12 changes: 6 additions & 6 deletions src/query/storages/fuse/src/io/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,11 @@ impl SnapshotsIO {
});

let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;

execute_futures_in_parallel(
tasks,
threads_nums,
permit_nums,
threads_nums * 2,
"fuse-req-snapshots-worker".to_owned(),
)
.await
Expand Down Expand Up @@ -173,12 +173,12 @@ impl SnapshotsIO {
}

// 1. Get all the snapshots in chunks.
let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize;
let max_threads = ctx.get_settings().get_max_threads()? as usize;
let mut snapshot_lites = Vec::with_capacity(snapshot_files.len());

let start = Instant::now();
let mut count = 0;
for chunk in snapshot_files.chunks(max_io_requests) {
for chunk in snapshot_files.chunks(max_threads) {
let results = self
.read_snapshot_lites(chunk, min_snapshot_timestamp)
.await?;
Expand Down Expand Up @@ -311,11 +311,11 @@ impl SnapshotsIO {
});

let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;

execute_futures_in_parallel(
tasks,
threads_nums,
permit_nums,
threads_nums * 2,
"fuse-req-snapshots-worker".to_owned(),
)
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl TableMutationAggregator {

let mut replaced_segments = HashMap::new();
let mut merged_statistics = Statistics::default();
let chunk_size = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
let chunk_size = self.ctx.get_settings().get_max_threads()? as usize;
let segment_indices = self.mutations.keys().cloned().collect::<Vec<_>>();
for chunk in segment_indices.chunks(chunk_size) {
let results = self.partial_apply(chunk.to_vec()).await?;
Expand Down Expand Up @@ -373,11 +373,11 @@ impl TableMutationAggregator {
}

let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;

execute_futures_in_parallel(
tasks,
threads_nums,
permit_nums,
threads_nums * 2,
"fuse-req-segments-worker".to_owned(),
)
.await?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,11 @@ impl ReclusterAggregator {
}

let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;

execute_futures_in_parallel(
tasks,
threads_nums,
permit_nums,
threads_nums * 2,
"fuse-write-segments-worker".to_owned(),
)
.await?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,11 @@ impl ReclusterMutator {
});

let thread_nums = self.ctx.get_settings().get_max_threads()? as usize;
let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;

let blocks = execute_futures_in_parallel(
tasks,
thread_nums,
permit_nums,
thread_nums * 2,
"convert-segments-worker".to_owned(),
)
.await?
Expand Down
12 changes: 6 additions & 6 deletions src/query/storages/fuse/src/operations/recluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ impl FuseTable {
let selected_segs = ReclusterMutator::select_segments(
&compact_segments,
block_per_seg,
max_threads * 4,
max_threads * 2,
default_cluster_key_id,
)?;
// select the blocks with the highest depth.
Expand Down Expand Up @@ -328,12 +328,12 @@ impl FuseTable {
mut segment_locs: Vec<SegmentLocation>,
) -> Result<Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>> {
let max_concurrency = {
let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize;
let v = std::cmp::max(max_io_requests, 10);
if v > max_io_requests {
let max_threads = ctx.get_settings().get_max_threads()? as usize;
let v = std::cmp::max(max_threads, 10);
if v > max_threads {
warn!(
"max_storage_io_requests setting is too low {}, increased to {}",
max_io_requests, v
"max_threads setting is too low {}, increased to {}",
max_threads, v
)
}
v
Expand Down

1 comment on commit 4ffd197

@vercel
Copy link

@vercel vercel bot commented on 4ffd197 Sep 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.