From a1bcbea0e667aa87a4ba36a33d07ade0f9d8678c Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Mon, 28 Oct 2024 15:58:57 +0800 Subject: [PATCH 01/11] save --- src/query/catalog/src/plan/partition.rs | 36 +- .../operations/read/block_partition_meta.rs | 63 ++++ .../read/block_partition_receiver_source.rs | 61 ++++ .../operations/read/block_partition_source.rs | 60 ++++ .../fuse/src/operations/read/fuse_source.rs | 231 +++++++++---- .../storages/fuse/src/operations/read/mod.rs | 10 +- .../read/native_data_source_reader.rs | 316 ----------------- .../read/native_data_transform_reader.rs | 276 +++++++++++++++ .../read/parquet_data_source_reader.rs | 325 ------------------ .../read/parquet_data_transform_reader.rs | 295 ++++++++++++++++ .../storages/fuse/src/operations/read_data.rs | 64 ++-- 11 files changed, 954 insertions(+), 783 deletions(-) create mode 100644 src/query/storages/fuse/src/operations/read/block_partition_meta.rs create mode 100644 src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs create mode 100644 src/query/storages/fuse/src/operations/read/block_partition_source.rs delete mode 100644 src/query/storages/fuse/src/operations/read/native_data_source_reader.rs create mode 100644 src/query/storages/fuse/src/operations/read/native_data_transform_reader.rs delete mode 100644 src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs create mode 100644 src/query/storages/fuse/src/operations/read/parquet_data_transform_reader.rs diff --git a/src/query/catalog/src/plan/partition.rs b/src/query/catalog/src/plan/partition.rs index 2c0f8ffe6f59..3278c22d3123 100644 --- a/src/query/catalog/src/plan/partition.rs +++ b/src/query/catalog/src/plan/partition.rs @@ -301,37 +301,10 @@ impl StealablePartitions { self.disable_steal = true; } - pub fn steal_one(&self, idx: usize) -> Option { + pub fn steal(&self, idx: usize, max_size: usize) -> Option> { let mut partitions = self.partitions.write(); if partitions.is_empty() { - return self.ctx.get_partition(); - } - - let idx = if idx >= partitions.len() { - idx % partitions.len() - } else { - idx - }; - - for step in 0..partitions.len() { - let index = (idx + step) % partitions.len(); - if !partitions[index].is_empty() { - return partitions[index].pop_front(); - } - - if self.disable_steal { - break; - } - } - - drop(partitions); - self.ctx.get_partition() - } - - pub fn steal(&self, idx: usize, max_size: usize) -> Vec { - let mut partitions = self.partitions.write(); - if partitions.is_empty() { - return self.ctx.get_partitions(max_size); + return None; } let idx = if idx >= partitions.len() { @@ -346,7 +319,7 @@ impl StealablePartitions { if !partitions[index].is_empty() { let ps = &mut partitions[index]; let size = ps.len().min(max_size); - return ps.drain(..size).collect(); + return Some(ps.drain(..size).collect()); } if self.disable_steal { @@ -355,7 +328,8 @@ impl StealablePartitions { } drop(partitions); - self.ctx.get_partitions(max_size) + + None } } diff --git a/src/query/storages/fuse/src/operations/read/block_partition_meta.rs b/src/query/storages/fuse/src/operations/read/block_partition_meta.rs new file mode 100644 index 000000000000..f38379276d65 --- /dev/null +++ b/src/query/storages/fuse/src/operations/read/block_partition_meta.rs @@ -0,0 +1,63 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use databend_common_catalog::plan::PartInfoPtr;
+use databend_common_expression::BlockMetaInfo;
+use databend_common_expression::BlockMetaInfoPtr;
+
+pub struct BlockPartitionMeta {
+    pub part_ptr: Vec<PartInfoPtr>,
+}
+
+impl BlockPartitionMeta {
+    pub fn create(part_ptr: Vec<PartInfoPtr>) -> BlockMetaInfoPtr {
+        Box::new(BlockPartitionMeta { part_ptr })
+    }
+}
+
+impl Debug for BlockPartitionMeta {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("BlockPartitionMeta")
+            .field("part_ptr", &self.part_ptr)
+            .finish()
+    }
+}
+
+#[typetag::serde(name = "block_partition_meta")]
+impl BlockMetaInfo for BlockPartitionMeta {
+    fn equals(&self, _info: &Box<dyn BlockMetaInfo>) -> bool {
+        unimplemented!("Unimplemented equals BlockPartitionMeta")
+    }
+
+    fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
+        unimplemented!("Unimplemented clone_self BlockPartitionMeta")
+    }
+}
+
+impl serde::Serialize for BlockPartitionMeta {
+    fn serialize<S>(&self, _: S) -> std::result::Result<S::Ok, S::Error>
+    where S: serde::Serializer {
+        unimplemented!("Unimplemented serialize BlockPartitionMeta")
+    }
+}
+
+impl<'de> serde::Deserialize<'de> for BlockPartitionMeta {
+    fn deserialize<D>(_: D) -> std::result::Result<Self, D::Error>
+    where D: serde::Deserializer<'de> {
+        unimplemented!("Unimplemented deserialize BlockPartitionMeta")
+    }
+}
diff --git a/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs b/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs
new file mode 100644
index 000000000000..fe3f940c2ea0
--- /dev/null
+++ b/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs
@@ -0,0 +1,61 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::sync::Arc;
+
+use async_channel::Receiver;
+use databend_common_catalog::plan::PartInfoPtr;
+use databend_common_catalog::table_context::TableContext;
+use databend_common_expression::DataBlock;
+use databend_common_pipeline_core::processors::OutputPort;
+use databend_common_pipeline_core::processors::ProcessorPtr;
+use databend_common_pipeline_sources::AsyncSource;
+use databend_common_pipeline_sources::AsyncSourcer;
+use log::info;
+
+use crate::operations::read::block_partition_meta::BlockPartitionMeta;
+
+pub struct BlockPartitionReceiverSource {
+    pub meta_receiver: Receiver<PartInfoPtr>,
+}
+
+impl BlockPartitionReceiverSource {
+    pub fn create(
+        ctx: Arc<dyn TableContext>,
+        receiver: Receiver<PartInfoPtr>,
+        output_port: Arc<OutputPort>,
+    ) -> databend_common_exception::Result<ProcessorPtr> {
+        AsyncSourcer::create(ctx, output_port, Self {
+            meta_receiver: receiver,
+        })
+    }
+}
+
+#[async_trait::async_trait]
+impl AsyncSource for BlockPartitionReceiverSource {
+    const NAME: &'static str = "BlockPartitionReceiverSource";
+
+    #[async_backtrace::framed]
+    async fn generate(&mut self) -> databend_common_exception::Result<Option<DataBlock>> {
+        match self.meta_receiver.recv().await {
+            Ok(part) => Ok(Some(DataBlock::empty_with_meta(
+                BlockPartitionMeta::create(vec![part]),
+            ))),
+            Err(_) => {
+                // The channel is closed, we should return None to stop generating
+                Ok(None)
+            }
+        }
+    }
+}
diff --git a/src/query/storages/fuse/src/operations/read/block_partition_source.rs b/src/query/storages/fuse/src/operations/read/block_partition_source.rs
new file mode 100644
index 000000000000..42b9ac41e48e
--- /dev/null
+++ b/src/query/storages/fuse/src/operations/read/block_partition_source.rs
@@ -0,0 +1,60 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +use std::sync::Arc; + +use databend_common_catalog::plan::StealablePartitions; +use databend_common_catalog::table_context::TableContext; +use databend_common_expression::DataBlock; +use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::processors::ProcessorPtr; +use databend_common_pipeline_sources::SyncSource; +use databend_common_pipeline_sources::SyncSourcer; + +use crate::operations::read::block_partition_meta::BlockPartitionMeta; + +pub struct BlockPartitionSource { + id: usize, + partitions: StealablePartitions, + max_batch_size: usize, +} + +impl BlockPartitionSource { + pub fn create( + id: usize, + partitions: StealablePartitions, + max_batch_size: usize, + ctx: Arc, + output_port: Arc, + ) -> databend_common_exception::Result { + SyncSourcer::create(ctx, output_port, BlockPartitionSource { + id, + partitions, + max_batch_size, + }) + } +} + +impl SyncSource for BlockPartitionSource { + const NAME: &'static str = "BlockPartitionSource"; + + fn generate(&mut self) -> databend_common_exception::Result> { + match self.partitions.steal(self.id, self.max_batch_size) { + None => Ok(None), + Some(parts) => Ok(Some(DataBlock::empty_with_meta( + BlockPartitionMeta::create(parts), + ))), + } + } +} diff --git a/src/query/storages/fuse/src/operations/read/fuse_source.rs b/src/query/storages/fuse/src/operations/read/fuse_source.rs index 814e62378c82..b1f58e7fe07b 100644 --- a/src/query/storages/fuse/src/operations/read/fuse_source.rs +++ b/src/query/storages/fuse/src/operations/read/fuse_source.rs @@ -15,6 +15,7 @@ use std::collections::VecDeque; use std::sync::Arc; +use async_channel::Sender; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfoPtr; use databend_common_catalog::plan::PartInfoType; @@ -24,6 +25,7 @@ use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_expression::TableSchema; use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::Pipe; use databend_common_pipeline_core::Pipeline; use databend_common_pipeline_core::SourcePipeBuilder; use log::info; @@ -32,10 +34,12 @@ use crate::fuse_part::FuseBlockPartInfo; use crate::io::AggIndexReader; use crate::io::BlockReader; use crate::io::VirtualColumnReader; +use crate::operations::read::block_partition_receiver_source::BlockPartitionReceiverSource; +use crate::operations::read::block_partition_source::BlockPartitionSource; +use crate::operations::read::native_data_transform_reader::ReadNativeDataTransform; +use crate::operations::read::parquet_data_transform_reader::ReadParquetDataTransform; use crate::operations::read::DeserializeDataTransform; use crate::operations::read::NativeDeserializeDataTransform; -use crate::operations::read::ReadNativeDataSource; -use crate::operations::read::ReadParquetDataSource; #[allow(clippy::too_many_arguments)] pub fn build_fuse_native_source_pipeline( @@ -49,7 +53,8 @@ pub fn build_fuse_native_source_pipeline( mut max_io_requests: usize, index_reader: Arc>, virtual_reader: Arc>, -) -> Result<()> { + is_lazy: bool, +) -> Result>> { (max_threads, max_io_requests) = adjust_threads_and_request(true, max_threads, max_io_requests, plan); @@ -57,8 +62,7 @@ pub fn build_fuse_native_source_pipeline( max_threads = max_threads.min(16); max_io_requests = max_io_requests.min(16); } - - let mut source_builder = SourcePipeBuilder::create(); + let mut senders = vec![]; match block_reader.support_blocking_api() { true => { @@ 
-68,25 +72,29 @@ pub fn build_fuse_native_source_pipeline( if topk.is_some() { partitions.disable_steal(); } - - for i in 0..max_threads { - let output = OutputPort::create(); - source_builder.add_source( - output.clone(), - ReadNativeDataSource::::create( - i, - plan.table_index, - ctx.clone(), - table_schema.clone(), - output, - block_reader.clone(), - partitions.clone(), - index_reader.clone(), - virtual_reader.clone(), - )?, - ); + match is_lazy { + true => { + let (pipe, source_senders) = build_lazy_source(max_threads, ctx.clone())?; + senders.extend(source_senders); + pipeline.add_pipe(pipe); + } + false => { + let pipe = build_block_source(max_threads, partitions.clone(), 1, ctx.clone())?; + pipeline.add_pipe(pipe); + } } - pipeline.add_pipe(source_builder.finalize()); + pipeline.add_transform(|input, output| { + ReadNativeDataTransform::::create( + plan.table_index, + ctx.clone(), + table_schema.clone(), + block_reader.clone(), + index_reader.clone(), + virtual_reader.clone(), + input, + output, + ) + })?; } false => { let partitions = dispatch_partitions(ctx.clone(), plan, max_io_requests); @@ -96,24 +104,37 @@ pub fn build_fuse_native_source_pipeline( partitions.disable_steal(); } - for i in 0..max_io_requests { - let output = OutputPort::create(); - source_builder.add_source( - output.clone(), - ReadNativeDataSource::::create( - i, - plan.table_index, - ctx.clone(), - table_schema.clone(), - output, - block_reader.clone(), + match is_lazy { + true => { + let (pipe, source_senders) = build_lazy_source(max_io_requests, ctx.clone())?; + senders.extend(source_senders); + pipeline.add_pipe(pipe); + } + false => { + let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize; + let pipe = build_block_source( + max_io_requests, partitions.clone(), - index_reader.clone(), - virtual_reader.clone(), - )?, - ); + batch_size, + ctx.clone(), + )?; + pipeline.add_pipe(pipe); + } } - pipeline.add_pipe(source_builder.finalize()); + + pipeline.add_transform(|input, output| { + ReadNativeDataTransform::::create( + plan.table_index, + ctx.clone(), + table_schema.clone(), + block_reader.clone(), + index_reader.clone(), + virtual_reader.clone(), + input, + output, + ) + })?; + pipeline.try_resize(max_threads)?; } }; @@ -131,7 +152,9 @@ pub fn build_fuse_native_source_pipeline( ) })?; - pipeline.try_resize(max_threads) + pipeline.try_resize(max_threads)?; + + Ok(senders) } #[allow(clippy::too_many_arguments)] @@ -145,35 +168,41 @@ pub fn build_fuse_parquet_source_pipeline( mut max_io_requests: usize, index_reader: Arc>, virtual_reader: Arc>, -) -> Result<()> { + is_lazy: bool, +) -> Result>> { (max_threads, max_io_requests) = adjust_threads_and_request(false, max_threads, max_io_requests, plan); - let mut source_builder = SourcePipeBuilder::create(); + let mut senders = vec![]; match block_reader.support_blocking_api() { true => { let partitions = dispatch_partitions(ctx.clone(), plan, max_threads); let partitions = StealablePartitions::new(partitions, ctx.clone()); - for i in 0..max_threads { - let output = OutputPort::create(); - source_builder.add_source( - output.clone(), - ReadParquetDataSource::::create( - i, - plan.table_index, - ctx.clone(), - table_schema.clone(), - output, - block_reader.clone(), - partitions.clone(), - index_reader.clone(), - virtual_reader.clone(), - )?, - ); + match is_lazy { + true => { + let (pipe, source_senders) = build_lazy_source(max_threads, ctx.clone())?; + senders.extend(source_senders); + pipeline.add_pipe(pipe); + } + false => { + let pipe = 
build_block_source(max_threads, partitions.clone(), 1, ctx.clone())?; + pipeline.add_pipe(pipe); + } } - pipeline.add_pipe(source_builder.finalize()); + pipeline.add_transform(|input, output| { + ReadParquetDataTransform::::create( + plan.table_index, + ctx.clone(), + table_schema.clone(), + block_reader.clone(), + index_reader.clone(), + virtual_reader.clone(), + input, + output, + ) + })?; } false => { info!("read block data adjust max io requests:{}", max_io_requests); @@ -181,24 +210,37 @@ pub fn build_fuse_parquet_source_pipeline( let partitions = dispatch_partitions(ctx.clone(), plan, max_io_requests); let partitions = StealablePartitions::new(partitions, ctx.clone()); - for i in 0..max_io_requests { - let output = OutputPort::create(); - source_builder.add_source( - output.clone(), - ReadParquetDataSource::::create( - i, - plan.table_index, - ctx.clone(), - table_schema.clone(), - output, - block_reader.clone(), + match is_lazy { + true => { + let (pipe, source_senders) = build_lazy_source(max_threads, ctx.clone())?; + senders.extend(source_senders); + pipeline.add_pipe(pipe); + } + false => { + let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize; + let pipe = build_block_source( + max_threads, partitions.clone(), - index_reader.clone(), - virtual_reader.clone(), - )?, - ); + batch_size, + ctx.clone(), + )?; + pipeline.add_pipe(pipe); + } } - pipeline.add_pipe(source_builder.finalize()); + + pipeline.add_transform(|input, output| { + ReadParquetDataTransform::::create( + plan.table_index, + ctx.clone(), + table_schema.clone(), + block_reader.clone(), + index_reader.clone(), + virtual_reader.clone(), + input, + output, + ) + })?; + pipeline.try_resize(std::cmp::min(max_threads, max_io_requests))?; info!( @@ -219,7 +261,9 @@ pub fn build_fuse_parquet_source_pipeline( index_reader.clone(), virtual_reader.clone(), ) - }) + })?; + + Ok(senders) } pub fn dispatch_partitions( @@ -294,3 +338,38 @@ pub fn adjust_threads_and_request( } (max_threads, max_io_requests) } + +pub fn build_lazy_source( + max_threads: usize, + ctx: Arc, +) -> Result<(Pipe, Vec>)> { + let mut source_builder = SourcePipeBuilder::create(); + let mut senders = Vec::with_capacity(max_threads); + for _i in 0..max_threads { + let (tx, rx) = async_channel::bounded(1); + let output = OutputPort::create(); + source_builder.add_source( + output.clone(), + BlockPartitionReceiverSource::create(ctx.clone(), rx, output)?, + ); + senders.push(tx); + } + Ok((source_builder.finalize(), senders)) +} + +pub fn build_block_source( + max_threads: usize, + partitions: StealablePartitions, + max_batch: usize, + ctx: Arc, +) -> Result { + let mut source_builder = SourcePipeBuilder::create(); + for i in 0..max_threads { + let output = OutputPort::create(); + source_builder.add_source( + output.clone(), + BlockPartitionSource::create(i, partitions.clone(), max_batch, ctx.clone(), output)?, + ) + } + Ok(source_builder.finalize()) +} diff --git a/src/query/storages/fuse/src/operations/read/mod.rs b/src/query/storages/fuse/src/operations/read/mod.rs index dd27f3c0b4e4..26494de63cbe 100644 --- a/src/query/storages/fuse/src/operations/read/mod.rs +++ b/src/query/storages/fuse/src/operations/read/mod.rs @@ -16,20 +16,22 @@ mod fuse_rows_fetcher; pub mod fuse_source; mod native_data_source; mod native_data_source_deserializer; -mod native_data_source_reader; +mod native_data_transform_reader; mod native_rows_fetcher; mod parquet_data_source; mod parquet_data_source_deserializer; -mod parquet_data_source_reader; +mod 
parquet_data_transform_reader; mod parquet_rows_fetcher; mod runtime_filter_prunner; +mod block_partition_meta; +mod block_partition_receiver_source; +mod block_partition_source; mod data_source_with_meta; mod util; + pub use fuse_rows_fetcher::row_fetch_processor; pub use fuse_source::build_fuse_parquet_source_pipeline; pub use native_data_source_deserializer::NativeDeserializeDataTransform; -pub use native_data_source_reader::ReadNativeDataSource; pub use parquet_data_source_deserializer::DeserializeDataTransform; -pub use parquet_data_source_reader::ReadParquetDataSource; pub use util::need_reserve_block_info; diff --git a/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs b/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs deleted file mode 100644 index 372580b97415..000000000000 --- a/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs +++ /dev/null @@ -1,316 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::any::Any; -use std::sync::Arc; - -use databend_common_catalog::plan::PartInfoPtr; -use databend_common_catalog::plan::StealablePartitions; -use databend_common_catalog::table_context::TableContext; -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; -use databend_common_expression::DataBlock; -use databend_common_expression::FunctionContext; -use databend_common_expression::TableSchema; -use databend_common_pipeline_core::processors::Event; -use databend_common_pipeline_core::processors::OutputPort; -use databend_common_pipeline_core::processors::Processor; -use databend_common_pipeline_core::processors::ProcessorPtr; -use databend_common_pipeline_sources::SyncSource; -use databend_common_pipeline_sources::SyncSourcer; -use databend_common_sql::IndexType; -use log::debug; - -use super::native_data_source::NativeDataSource; -use crate::io::AggIndexReader; -use crate::io::BlockReader; -use crate::io::TableMetaLocationGenerator; -use crate::io::VirtualColumnReader; -use crate::operations::read::data_source_with_meta::DataSourceWithMeta; -use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner; -use crate::FuseBlockPartInfo; - -pub struct ReadNativeDataSource { - func_ctx: FunctionContext, - id: usize, - finished: bool, - batch_size: usize, - block_reader: Arc, - - output: Arc, - output_data: Option<(Vec, Vec)>, - partitions: StealablePartitions, - - index_reader: Arc>, - virtual_reader: Arc>, - - table_schema: Arc, - table_index: IndexType, -} - -impl ReadNativeDataSource { - pub fn create( - id: usize, - table_index: IndexType, - ctx: Arc, - table_schema: Arc, - output: Arc, - block_reader: Arc, - partitions: StealablePartitions, - index_reader: Arc>, - virtual_reader: Arc>, - ) -> Result { - let batch_size = ctx.get_settings().get_storage_fetch_part_num()? 
as usize; - let func_ctx = ctx.get_function_context()?; - SyncSourcer::create(ctx.clone(), output.clone(), ReadNativeDataSource:: { - func_ctx, - id, - output, - batch_size, - block_reader, - finished: false, - output_data: None, - partitions, - index_reader, - virtual_reader, - table_schema, - table_index, - }) - } -} - -impl ReadNativeDataSource { - pub fn create( - id: usize, - table_index: IndexType, - ctx: Arc, - table_schema: Arc, - output: Arc, - block_reader: Arc, - partitions: StealablePartitions, - index_reader: Arc>, - virtual_reader: Arc>, - ) -> Result { - let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize; - let func_ctx = ctx.get_function_context()?; - Ok(ProcessorPtr::create(Box::new(ReadNativeDataSource::< - false, - > { - func_ctx, - id, - output, - batch_size, - block_reader, - finished: false, - output_data: None, - partitions, - index_reader, - virtual_reader, - table_schema, - table_index, - }))) - } -} - -impl SyncSource for ReadNativeDataSource { - const NAME: &'static str = "SyncReadNativeDataSource"; - - fn generate(&mut self) -> Result> { - match self.partitions.steal_one(self.id) { - None => Ok(None), - Some(part) => { - let mut filters = self - .partitions - .ctx - .get_inlist_runtime_filter_with_id(self.table_index); - filters.extend( - self.partitions - .ctx - .get_min_max_runtime_filter_with_id(self.table_index), - ); - if runtime_filter_pruner( - self.table_schema.clone(), - &part, - &filters, - &self.func_ctx, - )? { - return Ok(Some(DataBlock::empty())); - } - if let Some(index_reader) = self.index_reader.as_ref() { - let fuse_part = FuseBlockPartInfo::from_part(&part)?; - let loc = - TableMetaLocationGenerator::gen_agg_index_location_from_block_location( - &fuse_part.location, - index_reader.index_id(), - ); - if let Some(data) = index_reader.sync_read_native_data(&loc) { - // Read from aggregating index. - return Ok(Some(DataBlock::empty_with_meta( - DataSourceWithMeta::create(vec![part.clone()], vec![ - NativeDataSource::AggIndex(data), - ]), - ))); - } - } - - if let Some(virtual_reader) = self.virtual_reader.as_ref() { - let fuse_part = FuseBlockPartInfo::from_part(&part)?; - let loc = - TableMetaLocationGenerator::gen_virtual_block_location(&fuse_part.location); - - // If virtual column file exists, read the data from the virtual columns directly. 
- if let Some((mut virtual_source_data, ignore_column_ids)) = - virtual_reader.sync_read_native_data(&loc) - { - let mut source_data = self - .block_reader - .sync_read_native_columns_data(&part, &ignore_column_ids)?; - source_data.append(&mut virtual_source_data); - return Ok(Some(DataBlock::empty_with_meta( - DataSourceWithMeta::create(vec![part.clone()], vec![ - NativeDataSource::Normal(source_data), - ]), - ))); - } - } - - Ok(Some(DataBlock::empty_with_meta( - DataSourceWithMeta::create(vec![part.clone()], vec![NativeDataSource::Normal( - self.block_reader - .sync_read_native_columns_data(&part, &None)?, - )]), - ))) - } - } - } -} - -#[async_trait::async_trait] -impl Processor for ReadNativeDataSource { - fn name(&self) -> String { - String::from("AsyncReadNativeDataSource") - } - - fn as_any(&mut self) -> &mut dyn Any { - self - } - - fn event(&mut self) -> Result { - if self.finished { - self.output.finish(); - return Ok(Event::Finished); - } - - if self.output.is_finished() { - return Ok(Event::Finished); - } - - if !self.output.can_push() { - return Ok(Event::NeedConsume); - } - - if let Some((part, data)) = self.output_data.take() { - let output = DataBlock::empty_with_meta(DataSourceWithMeta::create(part, data)); - self.output.push_data(Ok(output)); - // return Ok(Event::NeedConsume); - } - - Ok(Event::Async) - } - - #[async_backtrace::framed] - async fn async_process(&mut self) -> Result<()> { - let parts = self.partitions.steal(self.id, self.batch_size); - - if !parts.is_empty() { - let mut chunks = Vec::with_capacity(parts.len()); - let mut filters = self - .partitions - .ctx - .get_inlist_runtime_filter_with_id(self.table_index); - filters.extend( - self.partitions - .ctx - .get_min_max_runtime_filter_with_id(self.table_index), - ); - let mut native_part_infos = Vec::with_capacity(parts.len()); - for part in parts.into_iter() { - if runtime_filter_pruner( - self.table_schema.clone(), - &part, - &filters, - &self.func_ctx, - )? { - continue; - } - - native_part_infos.push(part.clone()); - let block_reader = self.block_reader.clone(); - let index_reader = self.index_reader.clone(); - let virtual_reader = self.virtual_reader.clone(); - let ctx = self.partitions.ctx.clone(); - chunks.push(async move { - let handler = databend_common_base::runtime::spawn(async move { - let fuse_part = FuseBlockPartInfo::from_part(&part)?; - if let Some(index_reader) = index_reader.as_ref() { - let loc = - TableMetaLocationGenerator::gen_agg_index_location_from_block_location( - &fuse_part.location, - index_reader.index_id(), - ); - if let Some(data) = index_reader.read_native_data(&loc).await { - // Read from aggregating index. - return Ok::<_, ErrorCode>(NativeDataSource::AggIndex(data)); - } - } - - if let Some(virtual_reader) = virtual_reader.as_ref() { - let loc = TableMetaLocationGenerator::gen_virtual_block_location( - &fuse_part.location, - ); - - // If virtual column file exists, read the data from the virtual columns directly. 
- if let Some((mut virtual_source_data, ignore_column_ids)) = - virtual_reader.read_native_data(&loc).await - { - let mut source_data = block_reader - .async_read_native_columns_data(&part, &ctx, &ignore_column_ids) - .await?; - source_data.append(&mut virtual_source_data); - return Ok(NativeDataSource::Normal(source_data)); - } - } - - Ok(NativeDataSource::Normal( - block_reader - .async_read_native_columns_data(&part, &ctx, &None) - .await?, - )) - }); - handler.await.unwrap() - }); - } - - debug!("ReadNativeDataSource parts: {}", chunks.len()); - self.output_data = Some(( - native_part_infos, - futures::future::try_join_all(chunks).await?, - )); - return Ok(()); - } - - self.finished = true; - Ok(()) - } -} diff --git a/src/query/storages/fuse/src/operations/read/native_data_transform_reader.rs b/src/query/storages/fuse/src/operations/read/native_data_transform_reader.rs new file mode 100644 index 000000000000..40a5966cb713 --- /dev/null +++ b/src/query/storages/fuse/src/operations/read/native_data_transform_reader.rs @@ -0,0 +1,276 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::BlockMetaInfoDowncast; +use databend_common_expression::DataBlock; +use databend_common_expression::FunctionContext; +use databend_common_expression::TableSchema; +use databend_common_pipeline_core::processors::InputPort; +use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::processors::ProcessorPtr; +use databend_common_pipeline_transforms::processors::AsyncTransform; +use databend_common_pipeline_transforms::processors::AsyncTransformer; +use databend_common_pipeline_transforms::processors::Transform; +use databend_common_pipeline_transforms::processors::Transformer; +use databend_common_sql::IndexType; +use log::debug; + +use super::native_data_source::NativeDataSource; +use crate::io::AggIndexReader; +use crate::io::BlockReader; +use crate::io::TableMetaLocationGenerator; +use crate::io::VirtualColumnReader; +use crate::operations::read::block_partition_meta::BlockPartitionMeta; +use crate::operations::read::data_source_with_meta::DataSourceWithMeta; +use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner; +use crate::FuseBlockPartInfo; + +pub struct ReadNativeDataTransform { + func_ctx: FunctionContext, + block_reader: Arc, + + index_reader: Arc>, + virtual_reader: Arc>, + + table_schema: Arc, + table_index: IndexType, + context: Arc, +} + +impl ReadNativeDataTransform { + pub fn create( + table_index: IndexType, + ctx: Arc, + table_schema: Arc, + block_reader: Arc, + index_reader: Arc>, + virtual_reader: Arc>, + input: Arc, + output: Arc, + ) -> Result { + let func_ctx = ctx.get_function_context()?; + Ok(ProcessorPtr::create(Transformer::create( + input, + output, + ReadNativeDataTransform:: { + func_ctx, 
+ block_reader, + index_reader, + virtual_reader, + table_schema, + table_index, + context: ctx, + }, + ))) + } +} + +impl ReadNativeDataTransform { + pub fn create( + table_index: IndexType, + ctx: Arc, + table_schema: Arc, + block_reader: Arc, + index_reader: Arc>, + virtual_reader: Arc>, + input: Arc, + output: Arc, + ) -> Result { + let func_ctx = ctx.get_function_context()?; + Ok(ProcessorPtr::create(AsyncTransformer::create( + input, + output, + ReadNativeDataTransform:: { + func_ctx, + block_reader, + index_reader, + virtual_reader, + table_schema, + table_index, + context: ctx, + }, + ))) + } +} + +impl Transform for ReadNativeDataTransform { + const NAME: &'static str = "SyncReadNativeDataTransform"; + + fn transform(&mut self, data: DataBlock) -> Result { + if let Some(meta) = data.get_meta() { + if let Some(block_part_meta) = BlockPartitionMeta::downcast_ref_from(meta) { + let mut partitions = block_part_meta.part_ptr.clone(); + debug_assert!(partitions.len() == 1); + let part = partitions.pop().unwrap(); + let mut filters = self + .context + .get_inlist_runtime_filter_with_id(self.table_index); + filters.extend( + self.context + .get_min_max_runtime_filter_with_id(self.table_index), + ); + if runtime_filter_pruner( + self.table_schema.clone(), + &part, + &filters, + &self.func_ctx, + )? { + return Ok(DataBlock::empty()); + } + if let Some(index_reader) = self.index_reader.as_ref() { + let fuse_part = FuseBlockPartInfo::from_part(&part)?; + let loc = + TableMetaLocationGenerator::gen_agg_index_location_from_block_location( + &fuse_part.location, + index_reader.index_id(), + ); + if let Some(data) = index_reader.sync_read_native_data(&loc) { + // Read from aggregating index. + return Ok(DataBlock::empty_with_meta(DataSourceWithMeta::create( + vec![part.clone()], + vec![NativeDataSource::AggIndex(data)], + ))); + } + } + + if let Some(virtual_reader) = self.virtual_reader.as_ref() { + let fuse_part = FuseBlockPartInfo::from_part(&part)?; + let loc = + TableMetaLocationGenerator::gen_virtual_block_location(&fuse_part.location); + + // If virtual column file exists, read the data from the virtual columns directly. 
+ if let Some((mut virtual_source_data, ignore_column_ids)) = + virtual_reader.sync_read_native_data(&loc) + { + let mut source_data = self + .block_reader + .sync_read_native_columns_data(&part, &ignore_column_ids)?; + source_data.append(&mut virtual_source_data); + return Ok(DataBlock::empty_with_meta(DataSourceWithMeta::create( + vec![part.clone()], + vec![NativeDataSource::Normal(source_data)], + ))); + } + } + + return Ok(DataBlock::empty_with_meta(DataSourceWithMeta::create( + vec![part.clone()], + vec![NativeDataSource::Normal( + self.block_reader + .sync_read_native_columns_data(&part, &None)?, + )], + ))); + } + } + Err(ErrorCode::Internal( + "ReadNativeDataTransform get wrong meta data", + )) + } +} + +#[async_trait::async_trait] +impl AsyncTransform for ReadNativeDataTransform { + const NAME: &'static str = "AsyncReadNativeDataTransform"; + + #[async_backtrace::framed] + async fn transform(&mut self, data: DataBlock) -> Result { + if let Some(meta) = data.get_meta() { + if let Some(block_part_meta) = BlockPartitionMeta::downcast_ref_from(meta) { + let parts = block_part_meta.part_ptr.clone(); + if !parts.is_empty() { + let mut chunks = Vec::with_capacity(parts.len()); + let mut filters = self + .context + .get_inlist_runtime_filter_with_id(self.table_index); + filters.extend( + self.context + .get_min_max_runtime_filter_with_id(self.table_index), + ); + let mut native_part_infos = Vec::with_capacity(parts.len()); + for part in parts.into_iter() { + if runtime_filter_pruner( + self.table_schema.clone(), + &part, + &filters, + &self.func_ctx, + )? { + continue; + } + + native_part_infos.push(part.clone()); + let block_reader = self.block_reader.clone(); + let index_reader = self.index_reader.clone(); + let virtual_reader = self.virtual_reader.clone(); + let ctx = self.context.clone(); + chunks.push(async move { + let handler = databend_common_base::runtime::spawn(async move { + let fuse_part = FuseBlockPartInfo::from_part(&part)?; + if let Some(index_reader) = index_reader.as_ref() { + let loc = + TableMetaLocationGenerator::gen_agg_index_location_from_block_location( + &fuse_part.location, + index_reader.index_id(), + ); + if let Some(data) = index_reader.read_native_data(&loc).await { + // Read from aggregating index. + return Ok::<_, ErrorCode>(NativeDataSource::AggIndex(data)); + } + } + + if let Some(virtual_reader) = virtual_reader.as_ref() { + let loc = TableMetaLocationGenerator::gen_virtual_block_location( + &fuse_part.location, + ); + + // If virtual column file exists, read the data from the virtual columns directly. 
+ if let Some((mut virtual_source_data, ignore_column_ids)) = + virtual_reader.read_native_data(&loc).await + { + let mut source_data = block_reader + .async_read_native_columns_data(&part, &ctx, &ignore_column_ids) + .await?; + source_data.append(&mut virtual_source_data); + return Ok(NativeDataSource::Normal(source_data)); + } + } + + Ok(NativeDataSource::Normal( + block_reader + .async_read_native_columns_data(&part, &ctx, &None) + .await?, + )) + }); + handler.await.unwrap() + }); + } + + debug!("ReadNativeDataSource parts: {}", chunks.len()); + return Ok(DataBlock::empty_with_meta(DataSourceWithMeta::create( + native_part_infos, + futures::future::try_join_all(chunks).await?, + ))); + } + } + } + + Err(ErrorCode::Internal( + "AsyncReadNativeDataTransform get wrong meta data", + )) + } +} diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs deleted file mode 100644 index 69ef41844dfb..000000000000 --- a/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs +++ /dev/null @@ -1,325 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::any::Any; -use std::sync::Arc; - -use databend_common_catalog::plan::PartInfoPtr; -use databend_common_catalog::plan::StealablePartitions; -use databend_common_catalog::table_context::TableContext; -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; -use databend_common_expression::DataBlock; -use databend_common_expression::FunctionContext; -use databend_common_expression::TableSchema; -use databend_common_pipeline_core::processors::Event; -use databend_common_pipeline_core::processors::OutputPort; -use databend_common_pipeline_core::processors::Processor; -use databend_common_pipeline_core::processors::ProcessorPtr; -use databend_common_pipeline_sources::SyncSource; -use databend_common_pipeline_sources::SyncSourcer; -use databend_common_sql::IndexType; -use databend_storages_common_io::ReadSettings; -use log::debug; - -use super::parquet_data_source::ParquetDataSource; -use crate::fuse_part::FuseBlockPartInfo; -use crate::io::AggIndexReader; -use crate::io::BlockReader; -use crate::io::TableMetaLocationGenerator; -use crate::io::VirtualColumnReader; -use crate::operations::read::data_source_with_meta::DataSourceWithMeta; -use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner; - -pub struct ReadParquetDataSource { - func_ctx: FunctionContext, - id: usize, - table_index: IndexType, - finished: bool, - batch_size: usize, - block_reader: Arc, - - output: Arc, - output_data: Option<(Vec, Vec)>, - partitions: StealablePartitions, - - index_reader: Arc>, - virtual_reader: Arc>, - - table_schema: Arc, -} - -impl ReadParquetDataSource { - #[allow(clippy::too_many_arguments)] - pub fn create( - id: usize, - table_index: IndexType, - ctx: Arc, - table_schema: Arc, - output: Arc, - block_reader: Arc, - partitions: 
StealablePartitions, - index_reader: Arc>, - virtual_reader: Arc>, - ) -> Result { - let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize; - let func_ctx = ctx.get_function_context()?; - if BLOCKING_IO { - SyncSourcer::create(ctx.clone(), output.clone(), ReadParquetDataSource:: { - func_ctx, - id, - table_index, - output, - batch_size, - block_reader, - finished: false, - output_data: None, - partitions, - index_reader, - virtual_reader, - table_schema, - }) - } else { - Ok(ProcessorPtr::create(Box::new(ReadParquetDataSource::< - false, - > { - func_ctx, - id, - table_index, - output, - batch_size, - block_reader, - finished: false, - output_data: None, - partitions, - index_reader, - virtual_reader, - table_schema, - }))) - } - } -} - -impl SyncSource for ReadParquetDataSource { - const NAME: &'static str = "SyncReadParquetDataSource"; - - fn generate(&mut self) -> Result> { - match self.partitions.steal_one(self.id) { - None => Ok(None), - Some(part) => { - let mut filters = self - .partitions - .ctx - .get_inlist_runtime_filter_with_id(self.table_index); - filters.extend( - self.partitions - .ctx - .get_min_max_runtime_filter_with_id(self.table_index), - ); - if runtime_filter_pruner( - self.table_schema.clone(), - &part, - &filters, - &self.func_ctx, - )? { - return Ok(Some(DataBlock::empty())); - } - - if let Some(index_reader) = self.index_reader.as_ref() { - let fuse_part = FuseBlockPartInfo::from_part(&part)?; - let loc = - TableMetaLocationGenerator::gen_agg_index_location_from_block_location( - &fuse_part.location, - index_reader.index_id(), - ); - if let Some(data) = index_reader.sync_read_parquet_data_by_merge_io( - &ReadSettings::from_ctx(&self.partitions.ctx)?, - &loc, - ) { - // Read from aggregating index. - return Ok(Some(DataBlock::empty_with_meta( - DataSourceWithMeta::create(vec![part.clone()], vec![ - ParquetDataSource::AggIndex(data), - ]), - ))); - } - } - - // If virtual column file exists, read the data from the virtual columns directly. 
- let virtual_source = if let Some(virtual_reader) = self.virtual_reader.as_ref() { - let fuse_part = FuseBlockPartInfo::from_part(&part)?; - let loc = - TableMetaLocationGenerator::gen_virtual_block_location(&fuse_part.location); - - virtual_reader.sync_read_parquet_data_by_merge_io( - &ReadSettings::from_ctx(&self.partitions.ctx)?, - &loc, - ) - } else { - None - }; - let ignore_column_ids = if let Some(virtual_source) = &virtual_source { - &virtual_source.ignore_column_ids - } else { - &None - }; - - let source = self.block_reader.sync_read_columns_data_by_merge_io( - &ReadSettings::from_ctx(&self.partitions.ctx)?, - &part, - ignore_column_ids, - )?; - - Ok(Some(DataBlock::empty_with_meta( - DataSourceWithMeta::create(vec![part], vec![ParquetDataSource::Normal(( - source, - virtual_source, - ))]), - ))) - } - } - } -} - -#[async_trait::async_trait] -impl Processor for ReadParquetDataSource { - fn name(&self) -> String { - String::from("AsyncReadParquetDataSource") - } - - fn as_any(&mut self) -> &mut dyn Any { - self - } - - fn event(&mut self) -> Result { - if self.finished { - self.output.finish(); - return Ok(Event::Finished); - } - - if self.output.is_finished() { - return Ok(Event::Finished); - } - - if !self.output.can_push() { - return Ok(Event::NeedConsume); - } - - if let Some((part, data)) = self.output_data.take() { - let output = DataBlock::empty_with_meta(DataSourceWithMeta::create(part, data)); - - self.output.push_data(Ok(output)); - // return Ok(Event::NeedConsume); - } - - Ok(Event::Async) - } - - #[async_backtrace::framed] - async fn async_process(&mut self) -> Result<()> { - let parts = self.partitions.steal(self.id, self.batch_size); - if !parts.is_empty() { - let mut chunks = Vec::with_capacity(parts.len()); - let mut filters = self - .partitions - .ctx - .get_inlist_runtime_filter_with_id(self.table_index); - filters.extend( - self.partitions - .ctx - .get_min_max_runtime_filter_with_id(self.table_index), - ); - let mut fuse_part_infos = Vec::with_capacity(parts.len()); - for part in parts.into_iter() { - if runtime_filter_pruner( - self.table_schema.clone(), - &part, - &filters, - &self.func_ctx, - )? { - continue; - } - - fuse_part_infos.push(part.clone()); - let block_reader = self.block_reader.clone(); - let settings = ReadSettings::from_ctx(&self.partitions.ctx)?; - let index_reader = self.index_reader.clone(); - let virtual_reader = self.virtual_reader.clone(); - - chunks.push(async move { - databend_common_base::runtime::spawn(async move { - let part = FuseBlockPartInfo::from_part(&part)?; - - if let Some(index_reader) = index_reader.as_ref() { - let loc = - TableMetaLocationGenerator::gen_agg_index_location_from_block_location( - &part.location, - index_reader.index_id(), - ); - if let Some(data) = index_reader - .read_parquet_data_by_merge_io(&settings, &loc) - .await - { - // Read from aggregating index. - return Ok::<_, ErrorCode>(ParquetDataSource::AggIndex(data)); - } - } - - // If virtual column file exists, read the data from the virtual columns directly. 
- let virtual_source = if let Some(virtual_reader) = virtual_reader.as_ref() { - let loc = TableMetaLocationGenerator::gen_virtual_block_location( - &part.location, - ); - - virtual_reader - .read_parquet_data_by_merge_io(&settings, &loc) - .await - } else { - None - }; - - let ignore_column_ids = if let Some(virtual_source) = &virtual_source { - &virtual_source.ignore_column_ids - } else { - &None - }; - - let source = block_reader - .read_columns_data_by_merge_io( - &settings, - &part.location, - &part.columns_meta, - ignore_column_ids, - ) - .await?; - - Ok(ParquetDataSource::Normal((source, virtual_source))) - }) - .await - .unwrap() - }); - } - - debug!("ReadParquetDataSource parts: {}", chunks.len()); - self.output_data = Some(( - fuse_part_infos, - futures::future::try_join_all(chunks).await?, - )); - return Ok(()); - } - - self.finished = true; - Ok(()) - } -} diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_transform_reader.rs b/src/query/storages/fuse/src/operations/read/parquet_data_transform_reader.rs new file mode 100644 index 000000000000..661c09a7c5fa --- /dev/null +++ b/src/query/storages/fuse/src/operations/read/parquet_data_transform_reader.rs @@ -0,0 +1,295 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::sync::Arc; + +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::BlockMetaInfoDowncast; +use databend_common_expression::DataBlock; +use databend_common_expression::FunctionContext; +use databend_common_expression::TableSchema; +use databend_common_pipeline_core::processors::InputPort; +use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::processors::ProcessorPtr; +use databend_common_pipeline_transforms::processors::AsyncTransform; +use databend_common_pipeline_transforms::processors::AsyncTransformer; +use databend_common_pipeline_transforms::processors::Transform; +use databend_common_pipeline_transforms::processors::Transformer; +use databend_common_sql::IndexType; +use databend_storages_common_io::ReadSettings; +use log::debug; + +use super::parquet_data_source::ParquetDataSource; +use crate::fuse_part::FuseBlockPartInfo; +use crate::io::AggIndexReader; +use crate::io::BlockReader; +use crate::io::TableMetaLocationGenerator; +use crate::io::VirtualColumnReader; +use crate::operations::read::block_partition_meta::BlockPartitionMeta; +use crate::operations::read::data_source_with_meta::DataSourceWithMeta; +use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner; + +pub struct ReadParquetDataTransform { + func_ctx: FunctionContext, + block_reader: Arc, + + index_reader: Arc>, + virtual_reader: Arc>, + + table_schema: Arc, + table_index: IndexType, + context: Arc, +} + +impl ReadParquetDataTransform { + pub fn create( + table_index: IndexType, + ctx: Arc, + table_schema: Arc, + block_reader: Arc, + index_reader: Arc>, + virtual_reader: Arc>, + input: Arc, + output: Arc, + ) -> Result { + let func_ctx = ctx.get_function_context()?; + Ok(ProcessorPtr::create(Transformer::create( + input, + output, + ReadParquetDataTransform:: { + func_ctx, + block_reader, + index_reader, + virtual_reader, + table_schema, + table_index, + context: ctx, + }, + ))) + } +} + +impl ReadParquetDataTransform { + pub fn create( + table_index: IndexType, + ctx: Arc, + table_schema: Arc, + block_reader: Arc, + index_reader: Arc>, + virtual_reader: Arc>, + input: Arc, + output: Arc, + ) -> Result { + let func_ctx = ctx.get_function_context()?; + Ok(ProcessorPtr::create(AsyncTransformer::create( + input, + output, + ReadParquetDataTransform:: { + func_ctx, + block_reader, + index_reader, + virtual_reader, + table_schema, + table_index, + context: ctx, + }, + ))) + } +} + +impl Transform for ReadParquetDataTransform { + const NAME: &'static str = "ReadParquetDataTransform"; + + fn transform(&mut self, data: DataBlock) -> Result { + if let Some(meta) = data.get_meta() { + if let Some(block_part_meta) = BlockPartitionMeta::downcast_ref_from(meta) { + let mut partitions = block_part_meta.part_ptr.clone(); + debug_assert!(partitions.len() == 1); + let part = partitions.pop().unwrap(); + let mut filters = self + .context + .get_inlist_runtime_filter_with_id(self.table_index); + filters.extend( + self.context + .get_min_max_runtime_filter_with_id(self.table_index), + ); + if runtime_filter_pruner( + self.table_schema.clone(), + &part, + &filters, + &self.func_ctx, + )? 
{ + return Ok(DataBlock::empty()); + } + + if let Some(index_reader) = self.index_reader.as_ref() { + let fuse_part = FuseBlockPartInfo::from_part(&part)?; + let loc = + TableMetaLocationGenerator::gen_agg_index_location_from_block_location( + &fuse_part.location, + index_reader.index_id(), + ); + if let Some(data) = index_reader.sync_read_parquet_data_by_merge_io( + &ReadSettings::from_ctx(&self.context)?, + &loc, + ) { + // Read from aggregating index. + return Ok(DataBlock::empty_with_meta(DataSourceWithMeta::create( + vec![part.clone()], + vec![ParquetDataSource::AggIndex(data)], + ))); + } + } + + // If virtual column file exists, read the data from the virtual columns directly. + let virtual_source = if let Some(virtual_reader) = self.virtual_reader.as_ref() { + let fuse_part = FuseBlockPartInfo::from_part(&part)?; + let loc = + TableMetaLocationGenerator::gen_virtual_block_location(&fuse_part.location); + + virtual_reader.sync_read_parquet_data_by_merge_io( + &ReadSettings::from_ctx(&self.context)?, + &loc, + ) + } else { + None + }; + let ignore_column_ids = if let Some(virtual_source) = &virtual_source { + &virtual_source.ignore_column_ids + } else { + &None + }; + + let source = self.block_reader.sync_read_columns_data_by_merge_io( + &ReadSettings::from_ctx(&self.context)?, + &part, + ignore_column_ids, + )?; + + return Ok(DataBlock::empty_with_meta(DataSourceWithMeta::create( + vec![part], + vec![ParquetDataSource::Normal((source, virtual_source))], + ))); + } + } + Err(ErrorCode::Internal( + "ReadParquetDataTransform get wrong meta data", + )) + } +} + +#[async_trait::async_trait] +impl AsyncTransform for ReadParquetDataTransform { + const NAME: &'static str = "AsyncReadParquetDataTransform"; + + async fn transform(&mut self, data: DataBlock) -> Result { + if let Some(meta) = data.get_meta() { + if let Some(block_part_meta) = BlockPartitionMeta::downcast_ref_from(meta) { + let parts = block_part_meta.part_ptr.clone(); + if !parts.is_empty() { + let mut chunks = Vec::with_capacity(parts.len()); + let mut filters = self + .context + .get_inlist_runtime_filter_with_id(self.table_index); + filters.extend( + self.context + .get_min_max_runtime_filter_with_id(self.table_index), + ); + let mut fuse_part_infos = Vec::with_capacity(parts.len()); + for part in parts.into_iter() { + if runtime_filter_pruner( + self.table_schema.clone(), + &part, + &filters, + &self.func_ctx, + )? { + continue; + } + + fuse_part_infos.push(part.clone()); + let block_reader = self.block_reader.clone(); + let settings = ReadSettings::from_ctx(&self.context)?; + let index_reader = self.index_reader.clone(); + let virtual_reader = self.virtual_reader.clone(); + + chunks.push(async move { + databend_common_base::runtime::spawn(async move { + let part = FuseBlockPartInfo::from_part(&part)?; + + if let Some(index_reader) = index_reader.as_ref() { + let loc = + TableMetaLocationGenerator::gen_agg_index_location_from_block_location( + &part.location, + index_reader.index_id(), + ); + if let Some(data) = index_reader + .read_parquet_data_by_merge_io(&settings, &loc) + .await + { + // Read from aggregating index. + return Ok::<_, ErrorCode>(ParquetDataSource::AggIndex(data)); + } + } + + // If virtual column file exists, read the data from the virtual columns directly. 
+ let virtual_source = if let Some(virtual_reader) = virtual_reader.as_ref() { + let loc = TableMetaLocationGenerator::gen_virtual_block_location( + &part.location, + ); + + virtual_reader + .read_parquet_data_by_merge_io(&settings, &loc) + .await + } else { + None + }; + + let ignore_column_ids = if let Some(virtual_source) = &virtual_source { + &virtual_source.ignore_column_ids + } else { + &None + }; + + let source = block_reader + .read_columns_data_by_merge_io( + &settings, + &part.location, + &part.columns_meta, + ignore_column_ids, + ) + .await?; + + Ok(ParquetDataSource::Normal((source, virtual_source))) + }) + .await + .unwrap() + }); + } + + debug!("ReadParquetDataSource parts: {}", chunks.len()); + return Ok(DataBlock::empty_with_meta(DataSourceWithMeta::create( + fuse_part_infos, + futures::future::try_join_all(chunks).await?, + ))); + } + } + } + + Err(ErrorCode::Internal( + "AsyncReadParquetDataSource get wrong meta data", + )) + } +} diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index 43ce8b4e4051..b3f1f21ca3c7 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -14,19 +14,24 @@ use std::sync::Arc; +use async_channel::Sender; use databend_common_base::runtime::Runtime; +use databend_common_base::runtime::TrySpawn; use databend_common_catalog::plan::DataSourcePlan; +use databend_common_catalog::plan::PartInfoPtr; use databend_common_catalog::plan::Projection; use databend_common_catalog::plan::PushDownInfo; use databend_common_catalog::plan::TopK; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_pipeline_core::Pipeline; use databend_common_pipeline_transforms::processors::TransformPipelineHelper; use databend_common_sql::evaluator::BlockOperator; use databend_common_sql::evaluator::CompoundBlockOperator; +use log::info; use crate::io::AggIndexReader; use crate::io::BlockReader; @@ -151,33 +156,7 @@ impl FuseTable { }); } } - if !lazy_init_segments.is_empty() { - let table = self.clone(); - let table_schema = self.schema_with_stream(); - let push_downs = plan.push_downs.clone(); - let query_ctx = ctx.clone(); - - // TODO: need refactor - pipeline.set_on_init(move || { - let table = table.clone(); - let table_schema = table_schema.clone(); - let ctx = query_ctx.clone(); - let push_downs = push_downs.clone(); - // let lazy_init_segments = lazy_init_segments.clone(); - - let partitions = Runtime::with_worker_threads(2, None)?.block_on(async move { - let (_statistics, partitions) = table - .prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0) - .await?; - - Result::<_>::Ok(partitions) - })?; - - query_ctx.set_partitions(partitions)?; - Ok(()) - }); - } - + let is_lazy = !lazy_init_segments.is_empty(); let block_reader = self.build_block_reader(ctx.clone(), plan, put_cache)?; let max_io_requests = self.adjust_io_request(&ctx)?; @@ -219,8 +198,7 @@ impl FuseTable { }) .transpose()?, ); - - self.build_fuse_source_pipeline( + let senders = self.build_fuse_source_pipeline( ctx.clone(), pipeline, self.storage_format, @@ -230,10 +208,31 @@ impl FuseTable { max_io_requests, index_reader, virtual_reader, + is_lazy, )?; // replace the column which has data mask if needed - self.apply_data_mask_policy_if_needed(ctx, plan, 
pipeline)?; + self.apply_data_mask_policy_if_needed(ctx.clone(), plan, pipeline)?; + + if is_lazy { + let table = self.clone(); + let table_schema = self.schema_with_stream(); + let push_downs = plan.push_downs.clone(); + + Runtime::with_worker_threads(2, Some("PruneWorker".to_string()))?.spawn(async move { + let (_statistics, partitions) = table + .prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0) + .await?; + let sender_size = senders.len(); + for (i, part) in partitions.partitions.into_iter().enumerate() { + senders[i % sender_size] + .send(part) + .await + .map_err(|_e| ErrorCode::Internal("partition senders send failed"))?; + } + Ok::<_, ErrorCode>(()) + }); + } Ok(()) } @@ -250,7 +249,8 @@ impl FuseTable { max_io_requests: usize, index_reader: Arc>, virtual_reader: Arc>, - ) -> Result<()> { + is_lazy: bool, + ) -> Result>> { let max_threads = ctx.get_settings().get_max_threads()? as usize; let table_schema = self.schema_with_stream(); match storage_format { @@ -265,6 +265,7 @@ impl FuseTable { max_io_requests, index_reader, virtual_reader, + is_lazy, ), FuseStorageFormat::Parquet => build_fuse_parquet_source_pipeline( ctx, @@ -276,6 +277,7 @@ impl FuseTable { max_io_requests, index_reader, virtual_reader, + is_lazy, ), } } From 407cfbbd7959d182ad173377cca14b4fa9f9714b Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Tue, 29 Oct 2024 13:47:46 +0800 Subject: [PATCH 02/11] fix: fix skip empty block --- .../operations/read/block_partition_receiver_source.rs | 2 +- src/query/storages/fuse/src/operations/read_data.rs | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs b/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs index fe3f940c2ea0..78056c244a3e 100644 --- a/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs +++ b/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs @@ -22,7 +22,6 @@ use databend_common_pipeline_core::processors::OutputPort; use databend_common_pipeline_core::processors::ProcessorPtr; use databend_common_pipeline_sources::AsyncSource; use databend_common_pipeline_sources::AsyncSourcer; -use log::info; use crate::operations::read::block_partition_meta::BlockPartitionMeta; @@ -45,6 +44,7 @@ impl BlockPartitionReceiverSource { #[async_trait::async_trait] impl AsyncSource for BlockPartitionReceiverSource { const NAME: &'static str = "BlockPartitionReceiverSource"; + const SKIP_EMPTY_DATA_BLOCK: bool = false; #[async_backtrace::framed] async fn generate(&mut self) -> databend_common_exception::Result> { diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index b3f1f21ca3c7..acb2d51b381c 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use async_channel::Sender; -use databend_common_base::runtime::Runtime; +use databend_common_base::runtime::{GlobalIORuntime}; use databend_common_base::runtime::TrySpawn; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfoPtr; @@ -219,7 +219,8 @@ impl FuseTable { let table_schema = self.schema_with_stream(); let push_downs = plan.push_downs.clone(); - Runtime::with_worker_threads(2, Some("PruneWorker".to_string()))?.spawn(async move { + GlobalIORuntime::instance().spawn( async move{ + info!("Pruning lazy 
partitions"); let (_statistics, partitions) = table .prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0) .await?; @@ -227,8 +228,7 @@ impl FuseTable { for (i, part) in partitions.partitions.into_iter().enumerate() { senders[i % sender_size] .send(part) - .await - .map_err(|_e| ErrorCode::Internal("partition senders send failed"))?; + .await.map_err(|_e| ErrorCode::Internal("Send partition meta failed"))?; } Ok::<_, ErrorCode>(()) }); From b2948fe9fc6e8eb7e414bd3822dd8a62264fc209 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Tue, 29 Oct 2024 13:56:28 +0800 Subject: [PATCH 03/11] fix: ci test --- .../read/block_partition_receiver_source.rs | 10 ++++-- .../fuse/src/operations/read/fuse_source.rs | 6 ++-- .../read/parquet_data_transform_reader.rs | 2 +- .../storages/fuse/src/operations/read_data.rs | 35 ++++++++++++------- .../standalone/explain/explain_pipeline.test | 3 +- .../suites/mode/standalone/explain/sort.test | 12 ++++--- .../mode/standalone/explain/window.test | 30 ++++++++++------ 7 files changed, 63 insertions(+), 35 deletions(-) diff --git a/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs b/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs index 78056c244a3e..1e818712f125 100644 --- a/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs +++ b/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs @@ -26,13 +26,13 @@ use databend_common_pipeline_sources::AsyncSourcer; use crate::operations::read::block_partition_meta::BlockPartitionMeta; pub struct BlockPartitionReceiverSource { - pub meta_receiver: Receiver, + pub meta_receiver: Receiver>, } impl BlockPartitionReceiverSource { pub fn create( ctx: Arc, - receiver: Receiver, + receiver: Receiver>, output_port: Arc, ) -> databend_common_exception::Result { AsyncSourcer::create(ctx, output_port, Self { @@ -49,9 +49,13 @@ impl AsyncSource for BlockPartitionReceiverSource { #[async_backtrace::framed] async fn generate(&mut self) -> databend_common_exception::Result> { match self.meta_receiver.recv().await { - Ok(part) => Ok(Some(DataBlock::empty_with_meta( + Ok(Ok(part)) => Ok(Some(DataBlock::empty_with_meta( BlockPartitionMeta::create(vec![part]), ))), + Ok(Err(e)) => Err( + // The error is occurred in pruning process + e, + ), Err(_) => { // The channel is closed, we should return None to stop generating Ok(None) diff --git a/src/query/storages/fuse/src/operations/read/fuse_source.rs b/src/query/storages/fuse/src/operations/read/fuse_source.rs index b1f58e7fe07b..9306c2963a6e 100644 --- a/src/query/storages/fuse/src/operations/read/fuse_source.rs +++ b/src/query/storages/fuse/src/operations/read/fuse_source.rs @@ -54,7 +54,7 @@ pub fn build_fuse_native_source_pipeline( index_reader: Arc>, virtual_reader: Arc>, is_lazy: bool, -) -> Result>> { +) -> Result>>> { (max_threads, max_io_requests) = adjust_threads_and_request(true, max_threads, max_io_requests, plan); @@ -169,7 +169,7 @@ pub fn build_fuse_parquet_source_pipeline( index_reader: Arc>, virtual_reader: Arc>, is_lazy: bool, -) -> Result>> { +) -> Result>>> { (max_threads, max_io_requests) = adjust_threads_and_request(false, max_threads, max_io_requests, plan); @@ -342,7 +342,7 @@ pub fn adjust_threads_and_request( pub fn build_lazy_source( max_threads: usize, ctx: Arc, -) -> Result<(Pipe, Vec>)> { +) -> Result<(Pipe, Vec>>)> { let mut source_builder = SourcePipeBuilder::create(); let mut senders = Vec::with_capacity(max_threads); for _i in 
0..max_threads { diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_transform_reader.rs b/src/query/storages/fuse/src/operations/read/parquet_data_transform_reader.rs index 661c09a7c5fa..2c616f5e6c79 100644 --- a/src/query/storages/fuse/src/operations/read/parquet_data_transform_reader.rs +++ b/src/query/storages/fuse/src/operations/read/parquet_data_transform_reader.rs @@ -111,7 +111,7 @@ impl ReadParquetDataTransform { } impl Transform for ReadParquetDataTransform { - const NAME: &'static str = "ReadParquetDataTransform"; + const NAME: &'static str = "SyncReadParquetDataTransform"; fn transform(&mut self, data: DataBlock) -> Result { if let Some(meta) = data.get_meta() { diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index acb2d51b381c..46e11cfe0caf 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -15,7 +15,7 @@ use std::sync::Arc; use async_channel::Sender; -use databend_common_base::runtime::{GlobalIORuntime}; +use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::TrySpawn; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfoPtr; @@ -31,7 +31,6 @@ use databend_common_pipeline_core::Pipeline; use databend_common_pipeline_transforms::processors::TransformPipelineHelper; use databend_common_sql::evaluator::BlockOperator; use databend_common_sql::evaluator::CompoundBlockOperator; -use log::info; use crate::io::AggIndexReader; use crate::io::BlockReader; @@ -219,19 +218,29 @@ impl FuseTable { let table_schema = self.schema_with_stream(); let push_downs = plan.push_downs.clone(); - GlobalIORuntime::instance().spawn( async move{ - info!("Pruning lazy partitions"); - let (_statistics, partitions) = table + GlobalIORuntime::instance().try_spawn(async move { + match table .prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0) - .await?; - let sender_size = senders.len(); - for (i, part) in partitions.partitions.into_iter().enumerate() { - senders[i % sender_size] - .send(part) - .await.map_err(|_e| ErrorCode::Internal("Send partition meta failed"))?; + .await + { + Ok((_, partitions)) => { + let sender_size = senders.len(); + for (i, part) in partitions.partitions.into_iter().enumerate() { + senders[i % sender_size] + .send(Ok(part)) + .await + .map_err(|_e| ErrorCode::Internal("Send partition meta failed"))?; + } + } + Err(err) => { + senders[0] + .send(Err(err)) + .await + .map_err(|_e| ErrorCode::Internal("Send partition meta failed"))?; + } } Ok::<_, ErrorCode>(()) - }); + })?; } Ok(()) @@ -250,7 +259,7 @@ impl FuseTable { index_reader: Arc>, virtual_reader: Arc>, is_lazy: bool, - ) -> Result>> { + ) -> Result>>> { let max_threads = ctx.get_settings().get_max_threads()? 
as usize; let table_schema = self.schema_with_stream(); match storage_format { diff --git a/tests/sqllogictests/suites/mode/standalone/explain/explain_pipeline.test b/tests/sqllogictests/suites/mode/standalone/explain/explain_pipeline.test index bddfc8bf5c8c..34cef56ea051 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/explain_pipeline.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/explain_pipeline.test @@ -17,7 +17,8 @@ explain pipeline select a from t1 ignore_result ---- EmptySink × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 statement ok diff --git a/tests/sqllogictests/suites/mode/standalone/explain/sort.test b/tests/sqllogictests/suites/mode/standalone/explain/sort.test index a501abeb3f79..c8bd25e26f1e 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/sort.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/sort.test @@ -82,7 +82,8 @@ CompoundBlockOperator(Project) × 1 SortPartialTransform × 4 Merge to Resize × 4 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # Sort spilling @@ -101,7 +102,8 @@ CompoundBlockOperator(Project) × 1 SortPartialTransform × 4 Merge to Resize × 4 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 statement ok set sort_spilling_memory_ratio = 0; @@ -119,7 +121,8 @@ CompoundBlockOperator(Project) × 1 Merge to Resize × 4 CompoundBlockOperator(Map) × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # Sort spilling @@ -139,7 +142,8 @@ CompoundBlockOperator(Project) × 1 Merge to Resize × 4 CompoundBlockOperator(Map) × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 statement ok drop table if exists t1; diff --git a/tests/sqllogictests/suites/mode/standalone/explain/window.test b/tests/sqllogictests/suites/mode/standalone/explain/window.test index 97f9e3c49360..971b3dd1dccc 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/window.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/window.test @@ -60,7 +60,8 @@ CompoundBlockOperator(Project) × 1 ShuffleMergePartition × 1 ShufflePartition × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # Enable sort spilling @@ -81,7 +82,8 @@ CompoundBlockOperator(Project) × 1 ShuffleMergePartition × 1 ShufflePartition × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 statement ok @@ -368,7 +370,8 @@ CompoundBlockOperator(Project) × 1 ShuffleMergePartition × 1 ShufflePartition × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # Enable sort spilling statement ok @@ -385,7 +388,8 @@ CompoundBlockOperator(Project) × 1 ShuffleMergePartition × 1 ShufflePartition × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # Disable sort spilling @@ -403,7 +407,8 @@ CompoundBlockOperator(Project) × 1 ShuffleMergePartition × 1 ShufflePartition × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # rows frame single window 
(can push down limit) query T @@ -416,7 +421,8 @@ CompoundBlockOperator(Project) × 1 ShuffleMergePartition × 1 ShufflePartition × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # rows frame single window (can not push down limit) query T @@ -429,7 +435,8 @@ CompoundBlockOperator(Project) × 1 ShuffleMergePartition × 1 ShufflePartition × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # rows frame multi window (can not push down limit) query T @@ -448,7 +455,8 @@ CompoundBlockOperator(Project) × 1 ShuffleMergePartition × 1 ShufflePartition × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # row fetch with window function(pipeline explain) query T @@ -468,7 +476,8 @@ CompoundBlockOperator(Project) × 1 TransformFilter × 1 AddInternalColumnsTransform × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # row fetch with window function(plan explain) query T @@ -529,7 +538,8 @@ CompoundBlockOperator(Project) × 1 ShuffleMergePartition × 1 ShufflePartition × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # same order multi window query T From ff07208072abbe4f3ce6f418e7b9eda2a0874ee0 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Thu, 31 Oct 2024 16:37:23 +0800 Subject: [PATCH 04/11] fix: performance degrade --- .../fuse/src/operations/read/fuse_source.rs | 4 +- .../storages/fuse/src/operations/read_data.rs | 43 +++++++++++-------- .../explain_native/explain_pipeline.test | 3 +- 3 files changed, 28 insertions(+), 22 deletions(-) diff --git a/src/query/storages/fuse/src/operations/read/fuse_source.rs b/src/query/storages/fuse/src/operations/read/fuse_source.rs index 9306c2963a6e..3bd7b22b01de 100644 --- a/src/query/storages/fuse/src/operations/read/fuse_source.rs +++ b/src/query/storages/fuse/src/operations/read/fuse_source.rs @@ -212,14 +212,14 @@ pub fn build_fuse_parquet_source_pipeline( match is_lazy { true => { - let (pipe, source_senders) = build_lazy_source(max_threads, ctx.clone())?; + let (pipe, source_senders) = build_lazy_source(max_io_requests, ctx.clone())?; senders.extend(source_senders); pipeline.add_pipe(pipe); } false => { let batch_size = ctx.get_settings().get_storage_fetch_part_num()? 
as usize; let pipe = build_block_source( - max_threads, + max_io_requests, partitions.clone(), batch_size, ctx.clone(), diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index 46e11cfe0caf..9b562fd51287 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -218,29 +218,34 @@ impl FuseTable { let table_schema = self.schema_with_stream(); let push_downs = plan.push_downs.clone(); - GlobalIORuntime::instance().try_spawn(async move { - match table - .prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0) - .await - { - Ok((_, partitions)) => { - let sender_size = senders.len(); - for (i, part) in partitions.partitions.into_iter().enumerate() { - senders[i % sender_size] - .send(Ok(part)) + GlobalIORuntime::instance().try_spawn( + async move { + match table + .prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0) + .await + { + Ok((_, partitions)) => { + let sender_size = senders.len(); + for (i, part) in partitions.partitions.into_iter().enumerate() { + senders[i % sender_size] + .send(Ok(part)) + .await + .map_err(|_e| { + ErrorCode::Internal("Send partition meta failed") + })?; + } + } + Err(err) => { + senders[0] + .send(Err(err)) .await .map_err(|_e| ErrorCode::Internal("Send partition meta failed"))?; } } - Err(err) => { - senders[0] - .send(Err(err)) - .await - .map_err(|_e| ErrorCode::Internal("Send partition meta failed"))?; - } - } - Ok::<_, ErrorCode>(()) - })?; + Ok::<_, ErrorCode>(()) + }, + Some(String::from("PruneSnapshot")), + )?; } Ok(()) diff --git a/tests/sqllogictests/suites/mode/standalone/explain_native/explain_pipeline.test b/tests/sqllogictests/suites/mode/standalone/explain_native/explain_pipeline.test index 35208a8df0ad..929a46c09531 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain_native/explain_pipeline.test +++ b/tests/sqllogictests/suites/mode/standalone/explain_native/explain_pipeline.test @@ -17,7 +17,8 @@ explain pipeline select a from t1 ignore_result ---- EmptySink × 1 NativeDeserializeDataTransform × 1 - SyncReadNativeDataSource × 1 + SyncReadNativeDataTransform × 1 + BlockPartitionSource × 1 statement ok From 70fa287aa5e8ea510d1ff5f832866ad404cf3802 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Fri, 1 Nov 2024 11:35:25 +0800 Subject: [PATCH 05/11] chore: change channel to a 1 sender - n receivers pattern --- .../fuse/src/operations/read/fuse_source.rs | 66 ++++++++----------- .../storages/fuse/src/operations/read_data.rs | 38 ++++++----- 2 files changed, 48 insertions(+), 56 deletions(-) diff --git a/src/query/storages/fuse/src/operations/read/fuse_source.rs b/src/query/storages/fuse/src/operations/read/fuse_source.rs index 3bd7b22b01de..214df13b9f8c 100644 --- a/src/query/storages/fuse/src/operations/read/fuse_source.rs +++ b/src/query/storages/fuse/src/operations/read/fuse_source.rs @@ -15,7 +15,7 @@ use std::collections::VecDeque; use std::sync::Arc; -use async_channel::Sender; +use async_channel::Receiver; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfoPtr; use databend_common_catalog::plan::PartInfoType; @@ -53,8 +53,8 @@ pub fn build_fuse_native_source_pipeline( mut max_io_requests: usize, index_reader: Arc>, virtual_reader: Arc>, - is_lazy: bool, -) -> Result>>> { + receiver: Option>>, +) -> Result<()> { (max_threads, max_io_requests) = adjust_threads_and_request(true, max_threads, max_io_requests, plan); 
@@ -62,7 +62,6 @@ pub fn build_fuse_native_source_pipeline( max_threads = max_threads.min(16); max_io_requests = max_io_requests.min(16); } - let mut senders = vec![]; match block_reader.support_blocking_api() { true => { @@ -72,13 +71,12 @@ pub fn build_fuse_native_source_pipeline( if topk.is_some() { partitions.disable_steal(); } - match is_lazy { - true => { - let (pipe, source_senders) = build_lazy_source(max_threads, ctx.clone())?; - senders.extend(source_senders); + match receiver { + Some(rx) => { + let pipe = build_receiver_source(max_threads, ctx.clone(), rx.clone())?; pipeline.add_pipe(pipe); } - false => { + None => { let pipe = build_block_source(max_threads, partitions.clone(), 1, ctx.clone())?; pipeline.add_pipe(pipe); } @@ -103,14 +101,12 @@ pub fn build_fuse_native_source_pipeline( if topk.is_some() { partitions.disable_steal(); } - - match is_lazy { - true => { - let (pipe, source_senders) = build_lazy_source(max_io_requests, ctx.clone())?; - senders.extend(source_senders); + match receiver { + Some(rx) => { + let pipe = build_receiver_source(max_io_requests, ctx.clone(), rx.clone())?; pipeline.add_pipe(pipe); } - false => { + None => { let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize; let pipe = build_block_source( max_io_requests, @@ -154,7 +150,7 @@ pub fn build_fuse_native_source_pipeline( pipeline.try_resize(max_threads)?; - Ok(senders) + Ok(()) } #[allow(clippy::too_many_arguments)] @@ -168,25 +164,22 @@ pub fn build_fuse_parquet_source_pipeline( mut max_io_requests: usize, index_reader: Arc>, virtual_reader: Arc>, - is_lazy: bool, -) -> Result>>> { + receiver: Option>>, +) -> Result<()> { (max_threads, max_io_requests) = adjust_threads_and_request(false, max_threads, max_io_requests, plan); - let mut senders = vec![]; - match block_reader.support_blocking_api() { true => { let partitions = dispatch_partitions(ctx.clone(), plan, max_threads); let partitions = StealablePartitions::new(partitions, ctx.clone()); - match is_lazy { - true => { - let (pipe, source_senders) = build_lazy_source(max_threads, ctx.clone())?; - senders.extend(source_senders); + match receiver { + Some(rx) => { + let pipe = build_receiver_source(max_threads, ctx.clone(), rx.clone())?; pipeline.add_pipe(pipe); } - false => { + None => { let pipe = build_block_source(max_threads, partitions.clone(), 1, ctx.clone())?; pipeline.add_pipe(pipe); } @@ -210,13 +203,12 @@ pub fn build_fuse_parquet_source_pipeline( let partitions = dispatch_partitions(ctx.clone(), plan, max_io_requests); let partitions = StealablePartitions::new(partitions, ctx.clone()); - match is_lazy { - true => { - let (pipe, source_senders) = build_lazy_source(max_io_requests, ctx.clone())?; - senders.extend(source_senders); + match receiver { + Some(rx) => { + let pipe = build_receiver_source(max_io_requests, ctx.clone(), rx.clone())?; pipeline.add_pipe(pipe); } - false => { + None => { let batch_size = ctx.get_settings().get_storage_fetch_part_num()? 
as usize; let pipe = build_block_source( max_io_requests, @@ -263,7 +255,7 @@ pub fn build_fuse_parquet_source_pipeline( ) })?; - Ok(senders) + Ok(()) } pub fn dispatch_partitions( @@ -339,22 +331,20 @@ pub fn adjust_threads_and_request( (max_threads, max_io_requests) } -pub fn build_lazy_source( +pub fn build_receiver_source( max_threads: usize, ctx: Arc, -) -> Result<(Pipe, Vec>>)> { + rx: Receiver>, +) -> Result { let mut source_builder = SourcePipeBuilder::create(); - let mut senders = Vec::with_capacity(max_threads); for _i in 0..max_threads { - let (tx, rx) = async_channel::bounded(1); let output = OutputPort::create(); source_builder.add_source( output.clone(), - BlockPartitionReceiverSource::create(ctx.clone(), rx, output)?, + BlockPartitionReceiverSource::create(ctx.clone(), rx.clone(), output)?, ); - senders.push(tx); } - Ok((source_builder.finalize(), senders)) + Ok(source_builder.finalize()) } pub fn build_block_source( diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index 9b562fd51287..b8db5c20f0e9 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -14,7 +14,7 @@ use std::sync::Arc; -use async_channel::Sender; +use async_channel::Receiver; use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::TrySpawn; use databend_common_catalog::plan::DataSourcePlan; @@ -155,7 +155,12 @@ impl FuseTable { }); } } - let is_lazy = !lazy_init_segments.is_empty(); + let (tx, rx) = if !lazy_init_segments.is_empty() { + let (tx, rx) = async_channel::bounded(1); + (Some(tx), Some(rx)) + } else { + (None, None) + }; let block_reader = self.build_block_reader(ctx.clone(), plan, put_cache)?; let max_io_requests = self.adjust_io_request(&ctx)?; @@ -197,7 +202,8 @@ impl FuseTable { }) .transpose()?, ); - let senders = self.build_fuse_source_pipeline( + + self.build_fuse_source_pipeline( ctx.clone(), pipeline, self.storage_format, @@ -207,13 +213,13 @@ impl FuseTable { max_io_requests, index_reader, virtual_reader, - is_lazy, + rx, )?; // replace the column which has data mask if needed self.apply_data_mask_policy_if_needed(ctx.clone(), plan, pipeline)?; - if is_lazy { + if let Some(sender) = tx { let table = self.clone(); let table_schema = self.schema_with_stream(); let push_downs = plan.push_downs.clone(); @@ -225,18 +231,14 @@ impl FuseTable { .await { Ok((_, partitions)) => { - let sender_size = senders.len(); - for (i, part) in partitions.partitions.into_iter().enumerate() { - senders[i % sender_size] - .send(Ok(part)) - .await - .map_err(|_e| { - ErrorCode::Internal("Send partition meta failed") - })?; + for part in partitions.partitions { + sender.send(Ok(part)).await.map_err(|_e| { + ErrorCode::Internal("Send partition meta failed") + })?; } } Err(err) => { - senders[0] + sender .send(Err(err)) .await .map_err(|_e| ErrorCode::Internal("Send partition meta failed"))?; @@ -263,8 +265,8 @@ impl FuseTable { max_io_requests: usize, index_reader: Arc>, virtual_reader: Arc>, - is_lazy: bool, - ) -> Result>>> { + receiver: Option>>, + ) -> Result<()> { let max_threads = ctx.get_settings().get_max_threads()? 
as usize; let table_schema = self.schema_with_stream(); match storage_format { @@ -279,7 +281,7 @@ impl FuseTable { max_io_requests, index_reader, virtual_reader, - is_lazy, + receiver, ), FuseStorageFormat::Parquet => build_fuse_parquet_source_pipeline( ctx, @@ -291,7 +293,7 @@ impl FuseTable { max_io_requests, index_reader, virtual_reader, - is_lazy, + receiver, ), } } From f84317b8b2f0b332975cff569d00c411eed83d04 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Fri, 1 Nov 2024 11:47:11 +0800 Subject: [PATCH 06/11] chore: catch up main --- .../operations/read/block_partition_meta.rs | 27 +++---------------- 1 file changed, 4 insertions(+), 23 deletions(-) diff --git a/src/query/storages/fuse/src/operations/read/block_partition_meta.rs b/src/query/storages/fuse/src/operations/read/block_partition_meta.rs index f38379276d65..96f4594b2ba0 100644 --- a/src/query/storages/fuse/src/operations/read/block_partition_meta.rs +++ b/src/query/storages/fuse/src/operations/read/block_partition_meta.rs @@ -16,6 +16,7 @@ use std::fmt::Debug; use std::fmt::Formatter; use databend_common_catalog::plan::PartInfoPtr; +use databend_common_expression::local_block_meta_serde; use databend_common_expression::BlockMetaInfo; use databend_common_expression::BlockMetaInfoPtr; @@ -37,27 +38,7 @@ impl Debug for BlockPartitionMeta { } } -#[typetag::serde(name = "block_partition_meta")] -impl BlockMetaInfo for BlockPartitionMeta { - fn equals(&self, _info: &Box) -> bool { - unimplemented!("Unimplemented equals CompactSegmentMeta") - } - - fn clone_self(&self) -> Box { - unimplemented!("Unimplemented clone_self CompactSegmentMeta") - } -} +local_block_meta_serde!(BlockPartitionMeta); -impl serde::Serialize for BlockPartitionMeta { - fn serialize(&self, _: S) -> std::result::Result - where S: serde::Serializer { - unimplemented!("Unimplemented serialize CompactSegmentMeta") - } -} - -impl<'de> serde::Deserialize<'de> for BlockPartitionMeta { - fn deserialize(_: D) -> std::result::Result - where D: serde::Deserializer<'de> { - unimplemented!("Unimplemented deserialize CompactSegmentMeta") - } -} +#[typetag::serde(name = "block_partition_meta")] +impl BlockMetaInfo for BlockPartitionMeta {} From 4ec340c3cc89ed301c8d16ffe207782e43c7dfb4 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Fri, 1 Nov 2024 13:50:14 +0800 Subject: [PATCH 07/11] chore: remove useless path prefix --- .../operations/read/block_partition_receiver_source.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs b/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs index 1e818712f125..9922d1651c63 100644 --- a/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs +++ b/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use async_channel::Receiver; use databend_common_catalog::plan::PartInfoPtr; use databend_common_catalog::table_context::TableContext; +use databend_common_exception::Result; use databend_common_expression::DataBlock; use databend_common_pipeline_core::processors::OutputPort; use databend_common_pipeline_core::processors::ProcessorPtr; @@ -26,15 +27,15 @@ use databend_common_pipeline_sources::AsyncSourcer; use crate::operations::read::block_partition_meta::BlockPartitionMeta; pub struct BlockPartitionReceiverSource { - pub meta_receiver: Receiver>, + pub meta_receiver: Receiver>, } impl BlockPartitionReceiverSource { pub fn 
create( ctx: Arc, - receiver: Receiver>, + receiver: Receiver>, output_port: Arc, - ) -> databend_common_exception::Result { + ) -> Result { AsyncSourcer::create(ctx, output_port, Self { meta_receiver: receiver, }) @@ -47,7 +48,7 @@ impl AsyncSource for BlockPartitionReceiverSource { const SKIP_EMPTY_DATA_BLOCK: bool = false; #[async_backtrace::framed] - async fn generate(&mut self) -> databend_common_exception::Result> { + async fn generate(&mut self) -> Result> { match self.meta_receiver.recv().await { Ok(Ok(part)) => Ok(Some(DataBlock::empty_with_meta( BlockPartitionMeta::create(vec![part]), From 7f60492a055373de6ff21cd160f7b5ec54636d39 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Fri, 1 Nov 2024 14:51:17 +0800 Subject: [PATCH 08/11] chore: apply review suggestion to avoid using globalIORuntime --- src/query/storages/fuse/src/fuse_table.rs | 9 ++++ .../storages/fuse/src/operations/read_data.rs | 53 ++++++++----------- 2 files changed, 32 insertions(+), 30 deletions(-) diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 6089bba69293..13fe228b40e5 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -23,6 +23,7 @@ use std::sync::Arc; use chrono::Duration; use chrono::TimeDelta; +use databend_common_base::runtime::Runtime; use databend_common_catalog::catalog::StorageDescription; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartStatistics; @@ -132,6 +133,8 @@ pub struct FuseTable { // If this is set, reading from fuse_table should only returns the increment blocks pub(crate) changes_desc: Option, + + pub(crate) runtime: Arc, } impl FuseTable { @@ -224,6 +227,11 @@ impl FuseTable { let meta_location_generator = TableMetaLocationGenerator::with_prefix(storage_prefix).with_part_prefix(part_prefix); + let runtime = Arc::new(Runtime::with_worker_threads( + 2, + Some(String::from("PruneSnapshot")), + )?); + Ok(Box::new(FuseTable { table_info, meta_location_generator, @@ -235,6 +243,7 @@ impl FuseTable { table_compression: table_compression.as_str().try_into()?, table_type, changes_desc: None, + runtime, })) } diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index b8db5c20f0e9..2077e667b8b3 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -15,7 +15,6 @@ use std::sync::Arc; use async_channel::Receiver; -use databend_common_base::runtime::GlobalIORuntime; use databend_common_base::runtime::TrySpawn; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfoPtr; @@ -155,12 +154,7 @@ impl FuseTable { }); } } - let (tx, rx) = if !lazy_init_segments.is_empty() { - let (tx, rx) = async_channel::bounded(1); - (Some(tx), Some(rx)) - } else { - (None, None) - }; + let block_reader = self.build_block_reader(ctx.clone(), plan, put_cache)?; let max_io_requests = self.adjust_io_request(&ctx)?; @@ -203,6 +197,13 @@ impl FuseTable { .transpose()?, ); + let (tx, rx) = if !lazy_init_segments.is_empty() { + let (tx, rx) = async_channel::bounded(max_io_requests); + (Some(tx), Some(rx)) + } else { + (None, None) + }; + self.build_fuse_source_pipeline( ctx.clone(), pipeline, @@ -223,31 +224,23 @@ impl FuseTable { let table = self.clone(); let table_schema = self.schema_with_stream(); let push_downs = plan.push_downs.clone(); - - GlobalIORuntime::instance().try_spawn( - async move { - match 
table - .prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0) - .await - { - Ok((_, partitions)) => { - for part in partitions.partitions { - sender.send(Ok(part)).await.map_err(|_e| { - ErrorCode::Internal("Send partition meta failed") - })?; - } - } - Err(err) => { - sender - .send(Err(err)) - .await - .map_err(|_e| ErrorCode::Internal("Send partition meta failed"))?; + self.runtime.spawn(async move { + match table + .prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0) + .await + { + Ok((_, partitions)) => { + for part in partitions.partitions { + // ignore the error, the sql may be killed or early stop + let _ = sender.send(Ok(part)).await; } } - Ok::<_, ErrorCode>(()) - }, - Some(String::from("PruneSnapshot")), - )?; + Err(err) => { + let _ = sender.send(Err(err)).await; + } + } + Ok::<_, ErrorCode>(()) + }); } Ok(()) From c6f3fd1a432aeb668c2303cec1d3a1b202019877 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Fri, 1 Nov 2024 14:58:42 +0800 Subject: [PATCH 09/11] chore: apply review suggestion to spawn in pipeline init stage --- .../storages/fuse/src/operations/read_data.rs | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index 2077e667b8b3..c384849ac011 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -224,23 +224,26 @@ impl FuseTable { let table = self.clone(); let table_schema = self.schema_with_stream(); let push_downs = plan.push_downs.clone(); - self.runtime.spawn(async move { - match table - .prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0) - .await - { - Ok((_, partitions)) => { - for part in partitions.partitions { - // ignore the error, the sql may be killed or early stop - let _ = sender.send(Ok(part)).await; + pipeline.set_on_init(move || { + table.runtime.clone().spawn(async move { + match table + .prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0) + .await + { + Ok((_, partitions)) => { + for part in partitions.partitions { + // ignore the error, the sql may be killed or early stop + let _ = sender.send(Ok(part)).await; + } + } + Err(err) => { + let _ = sender.send(Err(err)).await; } } - Err(err) => { - let _ = sender.send(Err(err)).await; - } - } - Ok::<_, ErrorCode>(()) - }); + Ok::<_, ErrorCode>(()) + }); + Ok(()) + }) } Ok(()) From 99f6a9fdd26fcbe36d4ebe3a5e8535f86fd81bbe Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Fri, 1 Nov 2024 16:41:29 +0800 Subject: [PATCH 10/11] try to find problem --- src/query/catalog/src/table_context.rs | 3 ++ src/query/service/src/sessions/query_ctx.rs | 5 +++ src/query/storages/fuse/src/fuse_table.rs | 9 ---- .../storages/fuse/src/operations/read_data.rs | 42 ++++++++++++------- 4 files changed, 34 insertions(+), 25 deletions(-) diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 631c0a493b90..a7c0997aaa64 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -24,6 +24,7 @@ use std::time::SystemTime; use dashmap::DashMap; use databend_common_base::base::Progress; use databend_common_base::base::ProgressValues; +use databend_common_base::runtime::Runtime; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_exception::ResultExt; @@ -384,4 +385,6 @@ pub trait TableContext: Send + 
Sync { fn is_temp_table(&self, catalog_name: &str, database_name: &str, table_name: &str) -> bool; fn get_shared_settings(&self) -> Arc; + + fn get_runtime(&self) -> Result>; } diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 375d7ae9e2dc..bdf4b7866eda 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -36,6 +36,7 @@ use databend_common_base::base::Progress; use databend_common_base::base::ProgressValues; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; +use databend_common_base::runtime::Runtime; use databend_common_base::runtime::TrySpawn; use databend_common_base::JoinHandle; use databend_common_catalog::catalog::CATALOG_DEFAULT; @@ -1448,6 +1449,10 @@ impl TableContext for QueryContext { .lock() .is_temp_table(database_name, table_name) } + + fn get_runtime(&self) -> Result> { + self.shared.try_get_runtime() + } } impl TrySpawn for QueryContext { diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 13fe228b40e5..6089bba69293 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -23,7 +23,6 @@ use std::sync::Arc; use chrono::Duration; use chrono::TimeDelta; -use databend_common_base::runtime::Runtime; use databend_common_catalog::catalog::StorageDescription; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartStatistics; @@ -133,8 +132,6 @@ pub struct FuseTable { // If this is set, reading from fuse_table should only returns the increment blocks pub(crate) changes_desc: Option, - - pub(crate) runtime: Arc, } impl FuseTable { @@ -227,11 +224,6 @@ impl FuseTable { let meta_location_generator = TableMetaLocationGenerator::with_prefix(storage_prefix).with_part_prefix(part_prefix); - let runtime = Arc::new(Runtime::with_worker_threads( - 2, - Some(String::from("PruneSnapshot")), - )?); - Ok(Box::new(FuseTable { table_info, meta_location_generator, @@ -243,7 +235,6 @@ impl FuseTable { table_compression: table_compression.as_str().try_into()?, table_type, changes_desc: None, - runtime, })) } diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index c384849ac011..92157e39e9cd 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -225,25 +225,35 @@ impl FuseTable { let table_schema = self.schema_with_stream(); let push_downs = plan.push_downs.clone(); pipeline.set_on_init(move || { - table.runtime.clone().spawn(async move { - match table - .prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0) - .await - { - Ok((_, partitions)) => { - for part in partitions.partitions { - // ignore the error, the sql may be killed or early stop - let _ = sender.send(Ok(part)).await; + ctx.get_runtime()?.try_spawn( + async move { + match table + .prune_snapshot_blocks( + ctx, + push_downs, + table_schema, + lazy_init_segments, + 0, + ) + .await + { + Ok((_, partitions)) => { + for part in partitions.partitions { + // ignore the error, the sql may be killed or early stop + let _ = sender.send(Ok(part)).await; + } + } + Err(err) => { + let _ = sender.send(Err(err)).await; } } - Err(err) => { - let _ = sender.send(Err(err)).await; - } - } - Ok::<_, ErrorCode>(()) - }); + Ok::<_, ErrorCode>(()) + }, + None, + )?; + Ok(()) - }) + }); 
} Ok(()) From 260735bc388884ae0cd200ad7e07c63564907321 Mon Sep 17 00:00:00 2001 From: Liuqing Yue Date: Fri, 1 Nov 2024 16:56:27 +0800 Subject: [PATCH 11/11] clippy --- src/query/service/tests/it/sql/exec/get_table_bind_test.rs | 5 +++++ .../service/tests/it/storages/fuse/operations/commit.rs | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index a6a9d08a1957..6b4aaf93c707 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -22,6 +22,7 @@ use dashmap::DashMap; use databend_common_base::base::tokio; use databend_common_base::base::Progress; use databend_common_base::base::ProgressValues; +use databend_common_base::runtime::Runtime; use databend_common_catalog::catalog::Catalog; use databend_common_catalog::cluster_info::Cluster; use databend_common_catalog::database::Database; @@ -1000,6 +1001,10 @@ impl TableContext for CtxDelegation { fn is_temp_table(&self, _catalog_name: &str, _database_name: &str, _table_name: &str) -> bool { false } + + fn get_runtime(&self) -> Result> { + todo!() + } } #[tokio::test(flavor = "multi_thread")] diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index d10f809569b0..181e42e6ca76 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -21,6 +21,7 @@ use dashmap::DashMap; use databend_common_base::base::tokio; use databend_common_base::base::Progress; use databend_common_base::base::ProgressValues; +use databend_common_base::runtime::Runtime; use databend_common_catalog::catalog::Catalog; use databend_common_catalog::cluster_info::Cluster; use databend_common_catalog::database::Database; @@ -886,6 +887,10 @@ impl TableContext for CtxDelegation { fn is_temp_table(&self, _catalog_name: &str, _database_name: &str, _table_name: &str) -> bool { false } + + fn get_runtime(&self) -> Result> { + todo!() + } } #[derive(Clone, Debug)]
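
For reference, the partition-dispatch pattern these patches converge on (a single pruning task sending Result-wrapped partitions over one bounded async channel to several receiver sources, see PATCH 05 and 08) can be sketched in isolation. The sketch below is illustrative only and is not the Databend implementation: it assumes just the async-channel and tokio crates, and the Partition type and String error stand in for PartInfoPtr and ErrorCode.

// Illustrative sketch of the 1-sender / N-receivers channel pattern
// (assumed crates: async-channel 2.x, tokio 1.x with rt + macros; types are placeholders).
use async_channel::{bounded, Receiver};

#[derive(Debug, Clone)]
struct Partition(u64); // stand-in for PartInfoPtr

async fn receiver_source(id: usize, rx: Receiver<Result<Partition, String>>) {
    // Each source owns a clone of the same receiver; the channel load-balances
    // parts across whichever source polls next, mirroring BlockPartitionReceiverSource.
    while let Ok(item) = rx.recv().await {
        match item {
            Ok(part) => println!("source {id} got {part:?}"),
            Err(e) => {
                // An Err payload carries a pruning failure into the pipeline.
                eprintln!("source {id} got pruning error: {e}");
                break;
            }
        }
    }
    // recv() returning Err means the sender side closed: no more partitions.
}

#[tokio::main]
async fn main() {
    let (tx, rx) = bounded::<Result<Partition, String>>(4);

    // N receiver sources share clones of one receiver handle.
    let workers: Vec<_> = (0..3)
        .map(|id| tokio::spawn(receiver_source(id, rx.clone())))
        .collect();

    // One pruning task feeds all of them, then drops the sender to close the channel.
    tokio::spawn(async move {
        for i in 0..10 {
            if tx.send(Ok(Partition(i))).await.is_err() {
                break; // all receivers are gone (query killed / early stop)
            }
        }
    });

    for w in workers {
        let _ = w.await;
    }
}

In the patches themselves the payload is a Result<PartInfoPtr>, each received part is wrapped into a DataBlock via BlockPartitionMeta::create by BlockPartitionReceiverSource, and the channel capacity is max_io_requests rather than the fixed bound used in this sketch.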