Skip to content

Commit

Permalink
replace max_io_request with max_threads
Browse files Browse the repository at this point in the history
  • Loading branch information
JackTan25 committed Sep 25, 2023
1 parent 36376b0 commit 17fe995
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 12 deletions.
4 changes: 2 additions & 2 deletions src/query/ee/src/storages/fuse/io/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ where
}

// 1. Get all the snapshot by chunks, save all the segments location.
let max_io_requests = ctx.get_settings().get_max_threads()? as usize;
let max_threads = ctx.get_settings().get_max_threads()? as usize;

let start = Instant::now();
let mut count = 1;
Expand All @@ -59,7 +59,7 @@ where
root_snapshot_lite.segments.iter().for_each(|location| {
segments.push(location.to_owned());
});
for chunk in snapshot_files.chunks(max_io_requests) {
for chunk in snapshot_files.chunks(max_threads) {
// Since we want to get all the snapshot referenced files, so set `ignore_timestamp` true
let results = snapshots_io
.read_snapshot_lite_extends(chunk, root_snapshot_lite.clone(), true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,13 +659,13 @@ impl CompactSegmentTestFixture {

let schema = TestFixture::default_table_schema();
let fuse_segment_io = SegmentsIO::create(self.ctx.clone(), data_accessor.clone(), schema);
let max_io_requests = self.ctx.get_settings().get_max_threads()? as usize;
let max_theads = self.ctx.get_settings().get_max_threads()? as usize;

let segment_writer = SegmentWriter::new(data_accessor, location_gen);
let seg_acc = SegmentCompactor::new(
block_per_seg,
cluster_key_id,
max_io_requests,
max_theads,
&fuse_segment_io,
segment_writer.clone(),
);
Expand Down
6 changes: 3 additions & 3 deletions src/query/storages/fuse/src/io/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,12 @@ impl SnapshotsIO {
}

// 1. Get all the snapshot by chunks.
let max_io_requests = ctx.get_settings().get_max_threads()? as usize;
let max_threads = ctx.get_settings().get_max_threads()? as usize;
let mut snapshot_lites = Vec::with_capacity(snapshot_files.len());

let start = Instant::now();
let mut count = 0;
for chunk in snapshot_files.chunks(max_io_requests) {
for chunk in snapshot_files.chunks(max_threads) {
let results = self
.read_snapshot_lites(chunk, min_snapshot_timestamp)
.await?;
Expand Down Expand Up @@ -316,7 +316,7 @@ impl SnapshotsIO {
tasks,
threads_nums,
threads_nums,
"fuse-req-snapshots-worker".to_owned(),
"fuse-req-snsrc/query/storages/fuse/src/io/snapshots.rsmapshots-worker".to_owned(),
)
.await
}
Expand Down
10 changes: 5 additions & 5 deletions src/query/storages/fuse/src/operations/recluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,12 +328,12 @@ impl FuseTable {
mut segment_locs: Vec<SegmentLocation>,
) -> Result<Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>> {
let max_concurrency = {
let max_io_requests = ctx.get_settings().get_max_threads()? as usize;
let v = std::cmp::max(max_io_requests, 10);
if v > max_io_requests {
let max_threads = ctx.get_settings().get_max_threads()? as usize;
let v = std::cmp::max(max_threads, 10);
if v > max_threads {
warn!(
"max_storage_io_requests setting is too low {}, increased to {}",
max_io_requests, v
"max_threads setting is too low {}, increased to {}",
max_threads, v
)
}
v
Expand Down

0 comments on commit 17fe995

Please sign in to comment.