Skip to content

Commit

Permalink
fix: performance degrade
Browse files Browse the repository at this point in the history
  • Loading branch information
dqhl76 committed Oct 31, 2024
1 parent b2948fe commit ff07208
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 22 deletions.
4 changes: 2 additions & 2 deletions src/query/storages/fuse/src/operations/read/fuse_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,14 +212,14 @@ pub fn build_fuse_parquet_source_pipeline(

match is_lazy {
true => {
let (pipe, source_senders) = build_lazy_source(max_threads, ctx.clone())?;
let (pipe, source_senders) = build_lazy_source(max_io_requests, ctx.clone())?;
senders.extend(source_senders);
pipeline.add_pipe(pipe);
}
false => {
let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize;
let pipe = build_block_source(
max_threads,
max_io_requests,
partitions.clone(),
batch_size,
ctx.clone(),
Expand Down
43 changes: 24 additions & 19 deletions src/query/storages/fuse/src/operations/read_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,29 +218,34 @@ impl FuseTable {
let table_schema = self.schema_with_stream();
let push_downs = plan.push_downs.clone();

GlobalIORuntime::instance().try_spawn(async move {
match table
.prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0)
.await
{
Ok((_, partitions)) => {
let sender_size = senders.len();
for (i, part) in partitions.partitions.into_iter().enumerate() {
senders[i % sender_size]
.send(Ok(part))
GlobalIORuntime::instance().try_spawn(
async move {
match table
.prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0)
.await
{
Ok((_, partitions)) => {
let sender_size = senders.len();
for (i, part) in partitions.partitions.into_iter().enumerate() {
senders[i % sender_size]
.send(Ok(part))
.await
.map_err(|_e| {
ErrorCode::Internal("Send partition meta failed")
})?;
}
}
Err(err) => {
senders[0]
.send(Err(err))
.await
.map_err(|_e| ErrorCode::Internal("Send partition meta failed"))?;
}
}
Err(err) => {
senders[0]
.send(Err(err))
.await
.map_err(|_e| ErrorCode::Internal("Send partition meta failed"))?;
}
}
Ok::<_, ErrorCode>(())
})?;
Ok::<_, ErrorCode>(())
},
Some(String::from("PruneSnapshot")),
)?;
}

Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ explain pipeline select a from t1 ignore_result
----
EmptySink × 1
NativeDeserializeDataTransform × 1
SyncReadNativeDataSource × 1
SyncReadNativeDataTransform × 1
BlockPartitionSource × 1


statement ok
Expand Down

0 comments on commit ff07208

Please sign in to comment.