From ff07208072abbe4f3ce6f418e7b9eda2a0874ee0 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Thu, 31 Oct 2024 16:37:23 +0800 Subject: [PATCH] fix: performance degrade --- .../fuse/src/operations/read/fuse_source.rs | 4 +- .../storages/fuse/src/operations/read_data.rs | 43 +++++++++++-------- .../explain_native/explain_pipeline.test | 3 +- 3 files changed, 28 insertions(+), 22 deletions(-) diff --git a/src/query/storages/fuse/src/operations/read/fuse_source.rs b/src/query/storages/fuse/src/operations/read/fuse_source.rs index 9306c2963a6e..3bd7b22b01de 100644 --- a/src/query/storages/fuse/src/operations/read/fuse_source.rs +++ b/src/query/storages/fuse/src/operations/read/fuse_source.rs @@ -212,14 +212,14 @@ pub fn build_fuse_parquet_source_pipeline( match is_lazy { true => { - let (pipe, source_senders) = build_lazy_source(max_threads, ctx.clone())?; + let (pipe, source_senders) = build_lazy_source(max_io_requests, ctx.clone())?; senders.extend(source_senders); pipeline.add_pipe(pipe); } false => { let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize; let pipe = build_block_source( - max_threads, + max_io_requests, partitions.clone(), batch_size, ctx.clone(), diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index 46e11cfe0caf..9b562fd51287 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -218,29 +218,34 @@ impl FuseTable { let table_schema = self.schema_with_stream(); let push_downs = plan.push_downs.clone(); - GlobalIORuntime::instance().try_spawn(async move { - match table - .prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0) - .await - { - Ok((_, partitions)) => { - let sender_size = senders.len(); - for (i, part) in partitions.partitions.into_iter().enumerate() { - senders[i % sender_size] - .send(Ok(part)) + GlobalIORuntime::instance().try_spawn( + async move { + match table + .prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0) + .await + { + Ok((_, partitions)) => { + let sender_size = senders.len(); + for (i, part) in partitions.partitions.into_iter().enumerate() { + senders[i % sender_size] + .send(Ok(part)) + .await + .map_err(|_e| { + ErrorCode::Internal("Send partition meta failed") + })?; + } + } + Err(err) => { + senders[0] + .send(Err(err)) .await .map_err(|_e| ErrorCode::Internal("Send partition meta failed"))?; } } - Err(err) => { - senders[0] - .send(Err(err)) - .await - .map_err(|_e| ErrorCode::Internal("Send partition meta failed"))?; - } - } - Ok::<_, ErrorCode>(()) - })?; + Ok::<_, ErrorCode>(()) + }, + Some(String::from("PruneSnapshot")), + )?; } Ok(()) diff --git a/tests/sqllogictests/suites/mode/standalone/explain_native/explain_pipeline.test b/tests/sqllogictests/suites/mode/standalone/explain_native/explain_pipeline.test index 35208a8df0ad..929a46c09531 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain_native/explain_pipeline.test +++ b/tests/sqllogictests/suites/mode/standalone/explain_native/explain_pipeline.test @@ -17,7 +17,8 @@ explain pipeline select a from t1 ignore_result ---- EmptySink × 1 NativeDeserializeDataTransform × 1 - SyncReadNativeDataSource × 1 + SyncReadNativeDataTransform × 1 + BlockPartitionSource × 1 statement ok