Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: replace max_io_request #12997

Merged
merged 10 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/query/ee/src/storages/fuse/io/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ where
}

// 1. Get all the snapshot by chunks, save all the segments location.
let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize;
let max_threads = ctx.get_settings().get_max_threads()? as usize;

let start = Instant::now();
let mut count = 1;
Expand All @@ -59,7 +59,7 @@ where
root_snapshot_lite.segments.iter().for_each(|location| {
segments.push(location.to_owned());
});
for chunk in snapshot_files.chunks(max_io_requests) {
for chunk in snapshot_files.chunks(max_threads) {
// Since we want to get all the snapshot referenced files, so set `ignore_timestamp` true
let results = snapshots_io
.read_snapshot_lite_extends(chunk, root_snapshot_lite.clone(), true)
Expand Down
6 changes: 3 additions & 3 deletions src/query/service/src/pipelines/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ impl PipelineBuilder {
self.main_pipeline.add_pipe(builder.finalize());
}

let max_io_request = self.ctx.get_settings().get_max_storage_io_requests()?;
let max_io_request = self.ctx.get_settings().get_max_threads()?;
let io_request_semaphore = Arc::new(Semaphore::new(max_io_request as usize));

let pipe_items = vec![
Expand Down Expand Up @@ -606,7 +606,7 @@ impl PipelineBuilder {
.add_pipe(Pipe::create(1, segment_partition_num, vec![
broadcast_processor.into_pipe_item(),
]));
let max_io_request = self.ctx.get_settings().get_max_storage_io_requests()?;
let max_io_request = self.ctx.get_settings().get_max_threads()?;
let io_request_semaphore = Arc::new(Semaphore::new(max_io_request as usize));

let merge_into_operation_aggregators = table.merge_into_mutators(
Expand Down Expand Up @@ -687,7 +687,7 @@ impl PipelineBuilder {
// setup the dummy transform
pipe_items.push(serialize_segment_transform.into_pipe_item());

let max_io_request = self.ctx.get_settings().get_max_storage_io_requests()?;
let max_io_request = self.ctx.get_settings().get_max_threads()?;
let io_request_semaphore = Arc::new(Semaphore::new(max_io_request as usize));

// setup the merge into operation aggregators
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,11 +388,10 @@ async fn generage_segments(
}

let threads_nums = ctx.get_settings().get_max_threads()? as usize;
let permit_nums = ctx.get_settings().get_max_storage_io_requests()? as usize;
execute_futures_in_parallel(
dantengsky marked this conversation as resolved.
Show resolved Hide resolved
tasks,
threads_nums,
permit_nums,
threads_nums,
"fuse-write-segments-worker".to_owned(),
)
.await?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,13 +659,13 @@ impl CompactSegmentTestFixture {

let schema = TestFixture::default_table_schema();
let fuse_segment_io = SegmentsIO::create(self.ctx.clone(), data_accessor.clone(), schema);
let max_io_requests = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
let max_theads = self.ctx.get_settings().get_max_threads()? as usize;

let segment_writer = SegmentWriter::new(data_accessor, location_gen);
let seg_acc = SegmentCompactor::new(
block_per_seg,
cluster_key_id,
max_io_requests,
max_theads,
&fuse_segment_io,
segment_writer.clone(),
);
Expand Down Expand Up @@ -705,7 +705,6 @@ impl CompactSegmentTestFixture {
let location_gen = TableMetaLocationGenerator::with_prefix("test/".to_owned());
let data_accessor = ctx.get_data_operator()?.operator();
let threads_nums = ctx.get_settings().get_max_threads()? as usize;
let permit_nums = ctx.get_settings().get_max_storage_io_requests()? as usize;

let mut tasks = vec![];
for (num_blocks, rows_per_block) in block_num_of_segments
Expand Down Expand Up @@ -788,7 +787,7 @@ impl CompactSegmentTestFixture {
let res = execute_futures_in_parallel(
tasks,
threads_nums,
permit_nums,
threads_nums,
"fuse-write-segments-worker".to_owned(),
)
.await?
Expand Down
4 changes: 2 additions & 2 deletions src/query/storages/fuse/src/io/files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ impl Files {
});

let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;

execute_futures_in_parallel(
tasks,
threads_nums,
permit_nums,
threads_nums,
"batch-remove-files-worker".to_owned(),
)
.await?;
Expand Down
25 changes: 0 additions & 25 deletions src/query/storages/fuse/src/io/segments.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,29 +130,4 @@ impl SegmentsIO {
}
Ok(())
}

// TODO use batch_meta_writer
#[async_backtrace::framed]
pub async fn write_segments(&self, segments: Vec<SerializedSegment>) -> Result<()> {
let mut iter = segments.into_iter();
let tasks = std::iter::from_fn(move || {
iter.next().map(|segment| {
Self::write_segment(self.operator.clone(), segment)
.in_span(Span::enter_with_local_parent("write_segment"))
})
});

let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
execute_futures_in_parallel(
tasks,
threads_nums,
permit_nums,
"write-segments-worker".to_owned(),
)
.await?
.into_iter()
.collect::<Result<Vec<_>>>()?;
Ok(())
}
}
14 changes: 7 additions & 7 deletions src/query/storages/fuse/src/io/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,11 @@ impl SnapshotsIO {
});

let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;

execute_futures_in_parallel(
tasks,
threads_nums,
permit_nums,
threads_nums,
"fuse-req-snapshots-worker".to_owned(),
)
.await
Expand Down Expand Up @@ -173,12 +173,12 @@ impl SnapshotsIO {
}

// 1. Get all the snapshot by chunks.
let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize;
let max_threads = ctx.get_settings().get_max_threads()? as usize;
let mut snapshot_lites = Vec::with_capacity(snapshot_files.len());

let start = Instant::now();
let mut count = 0;
for chunk in snapshot_files.chunks(max_io_requests) {
for chunk in snapshot_files.chunks(max_threads) {
let results = self
.read_snapshot_lites(chunk, min_snapshot_timestamp)
.await?;
Expand Down Expand Up @@ -311,12 +311,12 @@ impl SnapshotsIO {
});

let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;

execute_futures_in_parallel(
tasks,
threads_nums,
permit_nums,
"fuse-req-snapshots-worker".to_owned(),
threads_nums,
"fuse-req-snsrc/query/storages/fuse/src/io/snapshots.rsmapshots-worker".to_owned(),
JackTan25 marked this conversation as resolved.
Show resolved Hide resolved
)
.await
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ impl TableMutationAggregator {

let mut replaced_segments = HashMap::new();
let mut merged_statistics = Statistics::default();
let chunk_size = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
let chunk_size = self.ctx.get_settings().get_max_threads()? as usize;
let segment_indices = self.mutations.keys().cloned().collect::<Vec<_>>();
for chunk in segment_indices.chunks(chunk_size) {
let results = self.partial_apply(chunk.to_vec()).await?;
Expand Down Expand Up @@ -373,11 +373,11 @@ impl TableMutationAggregator {
}

let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;

execute_futures_in_parallel(
tasks,
threads_nums,
permit_nums,
threads_nums,
"fuse-req-segments-worker".to_owned(),
)
.await?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,11 @@ impl ReclusterAggregator {
}

let threads_nums = self.ctx.get_settings().get_max_threads()? as usize;
let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;

execute_futures_in_parallel(
tasks,
threads_nums,
permit_nums,
threads_nums,
"fuse-write-segments-worker".to_owned(),
)
.await?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,11 +265,11 @@ impl ReclusterMutator {
});

let thread_nums = self.ctx.get_settings().get_max_threads()? as usize;
let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize;

let blocks = execute_futures_in_parallel(
tasks,
thread_nums,
permit_nums,
thread_nums,
"convert-segments-worker".to_owned(),
)
.await?
Expand Down
10 changes: 5 additions & 5 deletions src/query/storages/fuse/src/operations/recluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,12 +328,12 @@ impl FuseTable {
mut segment_locs: Vec<SegmentLocation>,
) -> Result<Vec<(SegmentLocation, Arc<CompactSegmentInfo>)>> {
let max_concurrency = {
let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize;
let v = std::cmp::max(max_io_requests, 10);
if v > max_io_requests {
let max_threads = ctx.get_settings().get_max_threads()? as usize;
let v = std::cmp::max(max_threads, 10);
if v > max_threads {
warn!(
"max_storage_io_requests setting is too low {}, increased to {}",
max_io_requests, v
"max_threads setting is too low {}, increased to {}",
max_threads, v
)
}
v
Expand Down