diff --git a/src/query/catalog/src/plan/partition.rs b/src/query/catalog/src/plan/partition.rs index 2c0f8ffe6f59..3278c22d3123 100644 --- a/src/query/catalog/src/plan/partition.rs +++ b/src/query/catalog/src/plan/partition.rs @@ -301,37 +301,10 @@ impl StealablePartitions { self.disable_steal = true; } - pub fn steal_one(&self, idx: usize) -> Option { + pub fn steal(&self, idx: usize, max_size: usize) -> Option> { let mut partitions = self.partitions.write(); if partitions.is_empty() { - return self.ctx.get_partition(); - } - - let idx = if idx >= partitions.len() { - idx % partitions.len() - } else { - idx - }; - - for step in 0..partitions.len() { - let index = (idx + step) % partitions.len(); - if !partitions[index].is_empty() { - return partitions[index].pop_front(); - } - - if self.disable_steal { - break; - } - } - - drop(partitions); - self.ctx.get_partition() - } - - pub fn steal(&self, idx: usize, max_size: usize) -> Vec { - let mut partitions = self.partitions.write(); - if partitions.is_empty() { - return self.ctx.get_partitions(max_size); + return None; } let idx = if idx >= partitions.len() { @@ -346,7 +319,7 @@ impl StealablePartitions { if !partitions[index].is_empty() { let ps = &mut partitions[index]; let size = ps.len().min(max_size); - return ps.drain(..size).collect(); + return Some(ps.drain(..size).collect()); } if self.disable_steal { @@ -355,7 +328,8 @@ impl StealablePartitions { } drop(partitions); - self.ctx.get_partitions(max_size) + + None } } diff --git a/src/query/catalog/src/table_context.rs b/src/query/catalog/src/table_context.rs index 631c0a493b90..a7c0997aaa64 100644 --- a/src/query/catalog/src/table_context.rs +++ b/src/query/catalog/src/table_context.rs @@ -24,6 +24,7 @@ use std::time::SystemTime; use dashmap::DashMap; use databend_common_base::base::Progress; use databend_common_base::base::ProgressValues; +use databend_common_base::runtime::Runtime; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_exception::ResultExt; @@ -384,4 +385,6 @@ pub trait TableContext: Send + Sync { fn is_temp_table(&self, catalog_name: &str, database_name: &str, table_name: &str) -> bool; fn get_shared_settings(&self) -> Arc; + + fn get_runtime(&self) -> Result>; } diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index 375d7ae9e2dc..bdf4b7866eda 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -36,6 +36,7 @@ use databend_common_base::base::Progress; use databend_common_base::base::ProgressValues; use databend_common_base::runtime::profile::Profile; use databend_common_base::runtime::profile::ProfileStatisticsName; +use databend_common_base::runtime::Runtime; use databend_common_base::runtime::TrySpawn; use databend_common_base::JoinHandle; use databend_common_catalog::catalog::CATALOG_DEFAULT; @@ -1448,6 +1449,10 @@ impl TableContext for QueryContext { .lock() .is_temp_table(database_name, table_name) } + + fn get_runtime(&self) -> Result> { + self.shared.try_get_runtime() + } } impl TrySpawn for QueryContext { diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index a6a9d08a1957..6b4aaf93c707 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -22,6 +22,7 @@ use dashmap::DashMap; use databend_common_base::base::tokio; use 
databend_common_base::base::Progress; use databend_common_base::base::ProgressValues; +use databend_common_base::runtime::Runtime; use databend_common_catalog::catalog::Catalog; use databend_common_catalog::cluster_info::Cluster; use databend_common_catalog::database::Database; @@ -1000,6 +1001,10 @@ impl TableContext for CtxDelegation { fn is_temp_table(&self, _catalog_name: &str, _database_name: &str, _table_name: &str) -> bool { false } + + fn get_runtime(&self) -> Result> { + todo!() + } } #[tokio::test(flavor = "multi_thread")] diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index d10f809569b0..181e42e6ca76 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -21,6 +21,7 @@ use dashmap::DashMap; use databend_common_base::base::tokio; use databend_common_base::base::Progress; use databend_common_base::base::ProgressValues; +use databend_common_base::runtime::Runtime; use databend_common_catalog::catalog::Catalog; use databend_common_catalog::cluster_info::Cluster; use databend_common_catalog::database::Database; @@ -886,6 +887,10 @@ impl TableContext for CtxDelegation { fn is_temp_table(&self, _catalog_name: &str, _database_name: &str, _table_name: &str) -> bool { false } + + fn get_runtime(&self) -> Result> { + todo!() + } } #[derive(Clone, Debug)] diff --git a/src/query/storages/fuse/src/operations/read/block_partition_meta.rs b/src/query/storages/fuse/src/operations/read/block_partition_meta.rs new file mode 100644 index 000000000000..96f4594b2ba0 --- /dev/null +++ b/src/query/storages/fuse/src/operations/read/block_partition_meta.rs @@ -0,0 +1,44 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
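The partition.rs hunk above removes steal_one entirely and changes steal so that it returns None once every queue has been drained, instead of falling back to the context's own partition list. Below is a minimal, self-contained sketch of that revised work-stealing behaviour using plain std types rather than the Databend structs; the generic element type stands in for the PartInfoPtr values the real queues hold.

use std::collections::VecDeque;
use std::sync::{Arc, RwLock};

#[derive(Clone)]
struct StealableQueues<T> {
    queues: Arc<RwLock<Vec<VecDeque<T>>>>,
    disable_steal: bool,
}

impl<T> StealableQueues<T> {
    // Walk the per-worker queues starting at `idx`, take up to `max_size`
    // items from the first non-empty queue, and return None when nothing
    // is left anywhere (no fallback to a context).
    fn steal(&self, idx: usize, max_size: usize) -> Option<Vec<T>> {
        let mut queues = self.queues.write().unwrap();
        if queues.is_empty() {
            return None;
        }
        let idx = idx % queues.len();
        for step in 0..queues.len() {
            let index = (idx + step) % queues.len();
            if !queues[index].is_empty() {
                let size = queues[index].len().min(max_size);
                return Some(queues[index].drain(..size).collect());
            }
            // With stealing disabled (e.g. when a TopK is pushed down),
            // only the worker's own queue is consulted.
            if self.disable_steal {
                break;
            }
        }
        None
    }
}

fn main() {
    let q = StealableQueues {
        queues: Arc::new(RwLock::new(vec![VecDeque::from(vec![1, 2, 3]), VecDeque::new()])),
        disable_steal: false,
    };
    assert_eq!(q.steal(1, 2), Some(vec![1, 2])); // worker 1 steals from queue 0
    assert_eq!(q.steal(0, 2), Some(vec![3]));
    assert_eq!(q.steal(0, 2), None); // exhausted: the caller can finish
}

A caller such as the BlockPartitionSource introduced later in this diff can then treat None as the end of input and let the pipeline finish.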
+ +use std::fmt::Debug; +use std::fmt::Formatter; + +use databend_common_catalog::plan::PartInfoPtr; +use databend_common_expression::local_block_meta_serde; +use databend_common_expression::BlockMetaInfo; +use databend_common_expression::BlockMetaInfoPtr; + +pub struct BlockPartitionMeta { + pub part_ptr: Vec, +} + +impl BlockPartitionMeta { + pub fn create(part_ptr: Vec) -> BlockMetaInfoPtr { + Box::new(BlockPartitionMeta { part_ptr }) + } +} + +impl Debug for BlockPartitionMeta { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("BlockPartitionMeta") + .field("part_ptr", &self.part_ptr) + .finish() + } +} + +local_block_meta_serde!(BlockPartitionMeta); + +#[typetag::serde(name = "block_partition_meta")] +impl BlockMetaInfo for BlockPartitionMeta {} diff --git a/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs b/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs new file mode 100644 index 000000000000..9922d1651c63 --- /dev/null +++ b/src/query/storages/fuse/src/operations/read/block_partition_receiver_source.rs @@ -0,0 +1,66 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use async_channel::Receiver; +use databend_common_catalog::plan::PartInfoPtr; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::Result; +use databend_common_expression::DataBlock; +use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::processors::ProcessorPtr; +use databend_common_pipeline_sources::AsyncSource; +use databend_common_pipeline_sources::AsyncSourcer; + +use crate::operations::read::block_partition_meta::BlockPartitionMeta; + +pub struct BlockPartitionReceiverSource { + pub meta_receiver: Receiver>, +} + +impl BlockPartitionReceiverSource { + pub fn create( + ctx: Arc, + receiver: Receiver>, + output_port: Arc, + ) -> Result { + AsyncSourcer::create(ctx, output_port, Self { + meta_receiver: receiver, + }) + } +} + +#[async_trait::async_trait] +impl AsyncSource for BlockPartitionReceiverSource { + const NAME: &'static str = "BlockPartitionReceiverSource"; + const SKIP_EMPTY_DATA_BLOCK: bool = false; + + #[async_backtrace::framed] + async fn generate(&mut self) -> Result> { + match self.meta_receiver.recv().await { + Ok(Ok(part)) => Ok(Some(DataBlock::empty_with_meta( + BlockPartitionMeta::create(vec![part]), + ))), + Ok(Err(e)) => Err( + // The error is occurred in pruning process + e, + ), + Err(_) => { + // The channel is closed, we should return None to stop generating + Ok(None) + } + } + } +} diff --git a/src/query/storages/fuse/src/operations/read/block_partition_source.rs b/src/query/storages/fuse/src/operations/read/block_partition_source.rs new file mode 100644 index 000000000000..42b9ac41e48e --- /dev/null +++ b/src/query/storages/fuse/src/operations/read/block_partition_source.rs @@ -0,0 +1,60 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache 
License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_catalog::plan::StealablePartitions; +use databend_common_catalog::table_context::TableContext; +use databend_common_expression::DataBlock; +use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::processors::ProcessorPtr; +use databend_common_pipeline_sources::SyncSource; +use databend_common_pipeline_sources::SyncSourcer; + +use crate::operations::read::block_partition_meta::BlockPartitionMeta; + +pub struct BlockPartitionSource { + id: usize, + partitions: StealablePartitions, + max_batch_size: usize, +} + +impl BlockPartitionSource { + pub fn create( + id: usize, + partitions: StealablePartitions, + max_batch_size: usize, + ctx: Arc, + output_port: Arc, + ) -> databend_common_exception::Result { + SyncSourcer::create(ctx, output_port, BlockPartitionSource { + id, + partitions, + max_batch_size, + }) + } +} + +impl SyncSource for BlockPartitionSource { + const NAME: &'static str = "BlockPartitionSource"; + + fn generate(&mut self) -> databend_common_exception::Result> { + match self.partitions.steal(self.id, self.max_batch_size) { + None => Ok(None), + Some(parts) => Ok(Some(DataBlock::empty_with_meta( + BlockPartitionMeta::create(parts), + ))), + } + } +} diff --git a/src/query/storages/fuse/src/operations/read/fuse_source.rs b/src/query/storages/fuse/src/operations/read/fuse_source.rs index 814e62378c82..214df13b9f8c 100644 --- a/src/query/storages/fuse/src/operations/read/fuse_source.rs +++ b/src/query/storages/fuse/src/operations/read/fuse_source.rs @@ -15,6 +15,7 @@ use std::collections::VecDeque; use std::sync::Arc; +use async_channel::Receiver; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfoPtr; use databend_common_catalog::plan::PartInfoType; @@ -24,6 +25,7 @@ use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_expression::TableSchema; use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::Pipe; use databend_common_pipeline_core::Pipeline; use databend_common_pipeline_core::SourcePipeBuilder; use log::info; @@ -32,10 +34,12 @@ use crate::fuse_part::FuseBlockPartInfo; use crate::io::AggIndexReader; use crate::io::BlockReader; use crate::io::VirtualColumnReader; +use crate::operations::read::block_partition_receiver_source::BlockPartitionReceiverSource; +use crate::operations::read::block_partition_source::BlockPartitionSource; +use crate::operations::read::native_data_transform_reader::ReadNativeDataTransform; +use crate::operations::read::parquet_data_transform_reader::ReadParquetDataTransform; use crate::operations::read::DeserializeDataTransform; use crate::operations::read::NativeDeserializeDataTransform; -use crate::operations::read::ReadNativeDataSource; -use crate::operations::read::ReadParquetDataSource; #[allow(clippy::too_many_arguments)] pub fn build_fuse_native_source_pipeline( @@ -49,6 +53,7 @@ pub fn 
build_fuse_native_source_pipeline( mut max_io_requests: usize, index_reader: Arc>, virtual_reader: Arc>, + receiver: Option>>, ) -> Result<()> { (max_threads, max_io_requests) = adjust_threads_and_request(true, max_threads, max_io_requests, plan); @@ -58,8 +63,6 @@ pub fn build_fuse_native_source_pipeline( max_io_requests = max_io_requests.min(16); } - let mut source_builder = SourcePipeBuilder::create(); - match block_reader.support_blocking_api() { true => { let partitions = dispatch_partitions(ctx.clone(), plan, max_threads); @@ -68,25 +71,28 @@ pub fn build_fuse_native_source_pipeline( if topk.is_some() { partitions.disable_steal(); } - - for i in 0..max_threads { - let output = OutputPort::create(); - source_builder.add_source( - output.clone(), - ReadNativeDataSource::::create( - i, - plan.table_index, - ctx.clone(), - table_schema.clone(), - output, - block_reader.clone(), - partitions.clone(), - index_reader.clone(), - virtual_reader.clone(), - )?, - ); + match receiver { + Some(rx) => { + let pipe = build_receiver_source(max_threads, ctx.clone(), rx.clone())?; + pipeline.add_pipe(pipe); + } + None => { + let pipe = build_block_source(max_threads, partitions.clone(), 1, ctx.clone())?; + pipeline.add_pipe(pipe); + } } - pipeline.add_pipe(source_builder.finalize()); + pipeline.add_transform(|input, output| { + ReadNativeDataTransform::::create( + plan.table_index, + ctx.clone(), + table_schema.clone(), + block_reader.clone(), + index_reader.clone(), + virtual_reader.clone(), + input, + output, + ) + })?; } false => { let partitions = dispatch_partitions(ctx.clone(), plan, max_io_requests); @@ -95,25 +101,36 @@ pub fn build_fuse_native_source_pipeline( if topk.is_some() { partitions.disable_steal(); } - - for i in 0..max_io_requests { - let output = OutputPort::create(); - source_builder.add_source( - output.clone(), - ReadNativeDataSource::::create( - i, - plan.table_index, - ctx.clone(), - table_schema.clone(), - output, - block_reader.clone(), + match receiver { + Some(rx) => { + let pipe = build_receiver_source(max_io_requests, ctx.clone(), rx.clone())?; + pipeline.add_pipe(pipe); + } + None => { + let batch_size = ctx.get_settings().get_storage_fetch_part_num()? 
as usize; + let pipe = build_block_source( + max_io_requests, partitions.clone(), - index_reader.clone(), - virtual_reader.clone(), - )?, - ); + batch_size, + ctx.clone(), + )?; + pipeline.add_pipe(pipe); + } } - pipeline.add_pipe(source_builder.finalize()); + + pipeline.add_transform(|input, output| { + ReadNativeDataTransform::::create( + plan.table_index, + ctx.clone(), + table_schema.clone(), + block_reader.clone(), + index_reader.clone(), + virtual_reader.clone(), + input, + output, + ) + })?; + pipeline.try_resize(max_threads)?; } }; @@ -131,7 +148,9 @@ pub fn build_fuse_native_source_pipeline( ) })?; - pipeline.try_resize(max_threads) + pipeline.try_resize(max_threads)?; + + Ok(()) } #[allow(clippy::too_many_arguments)] @@ -145,35 +164,38 @@ pub fn build_fuse_parquet_source_pipeline( mut max_io_requests: usize, index_reader: Arc>, virtual_reader: Arc>, + receiver: Option>>, ) -> Result<()> { (max_threads, max_io_requests) = adjust_threads_and_request(false, max_threads, max_io_requests, plan); - let mut source_builder = SourcePipeBuilder::create(); - match block_reader.support_blocking_api() { true => { let partitions = dispatch_partitions(ctx.clone(), plan, max_threads); let partitions = StealablePartitions::new(partitions, ctx.clone()); - for i in 0..max_threads { - let output = OutputPort::create(); - source_builder.add_source( - output.clone(), - ReadParquetDataSource::::create( - i, - plan.table_index, - ctx.clone(), - table_schema.clone(), - output, - block_reader.clone(), - partitions.clone(), - index_reader.clone(), - virtual_reader.clone(), - )?, - ); + match receiver { + Some(rx) => { + let pipe = build_receiver_source(max_threads, ctx.clone(), rx.clone())?; + pipeline.add_pipe(pipe); + } + None => { + let pipe = build_block_source(max_threads, partitions.clone(), 1, ctx.clone())?; + pipeline.add_pipe(pipe); + } } - pipeline.add_pipe(source_builder.finalize()); + pipeline.add_transform(|input, output| { + ReadParquetDataTransform::::create( + plan.table_index, + ctx.clone(), + table_schema.clone(), + block_reader.clone(), + index_reader.clone(), + virtual_reader.clone(), + input, + output, + ) + })?; } false => { info!("read block data adjust max io requests:{}", max_io_requests); @@ -181,24 +203,36 @@ pub fn build_fuse_parquet_source_pipeline( let partitions = dispatch_partitions(ctx.clone(), plan, max_io_requests); let partitions = StealablePartitions::new(partitions, ctx.clone()); - for i in 0..max_io_requests { - let output = OutputPort::create(); - source_builder.add_source( - output.clone(), - ReadParquetDataSource::::create( - i, - plan.table_index, - ctx.clone(), - table_schema.clone(), - output, - block_reader.clone(), + match receiver { + Some(rx) => { + let pipe = build_receiver_source(max_io_requests, ctx.clone(), rx.clone())?; + pipeline.add_pipe(pipe); + } + None => { + let batch_size = ctx.get_settings().get_storage_fetch_part_num()? 
as usize; + let pipe = build_block_source( + max_io_requests, partitions.clone(), - index_reader.clone(), - virtual_reader.clone(), - )?, - ); + batch_size, + ctx.clone(), + )?; + pipeline.add_pipe(pipe); + } } - pipeline.add_pipe(source_builder.finalize()); + + pipeline.add_transform(|input, output| { + ReadParquetDataTransform::::create( + plan.table_index, + ctx.clone(), + table_schema.clone(), + block_reader.clone(), + index_reader.clone(), + virtual_reader.clone(), + input, + output, + ) + })?; + pipeline.try_resize(std::cmp::min(max_threads, max_io_requests))?; info!( @@ -219,7 +253,9 @@ pub fn build_fuse_parquet_source_pipeline( index_reader.clone(), virtual_reader.clone(), ) - }) + })?; + + Ok(()) } pub fn dispatch_partitions( @@ -294,3 +330,36 @@ pub fn adjust_threads_and_request( } (max_threads, max_io_requests) } + +pub fn build_receiver_source( + max_threads: usize, + ctx: Arc, + rx: Receiver>, +) -> Result { + let mut source_builder = SourcePipeBuilder::create(); + for _i in 0..max_threads { + let output = OutputPort::create(); + source_builder.add_source( + output.clone(), + BlockPartitionReceiverSource::create(ctx.clone(), rx.clone(), output)?, + ); + } + Ok(source_builder.finalize()) +} + +pub fn build_block_source( + max_threads: usize, + partitions: StealablePartitions, + max_batch: usize, + ctx: Arc, +) -> Result { + let mut source_builder = SourcePipeBuilder::create(); + for i in 0..max_threads { + let output = OutputPort::create(); + source_builder.add_source( + output.clone(), + BlockPartitionSource::create(i, partitions.clone(), max_batch, ctx.clone(), output)?, + ) + } + Ok(source_builder.finalize()) +} diff --git a/src/query/storages/fuse/src/operations/read/mod.rs b/src/query/storages/fuse/src/operations/read/mod.rs index dd27f3c0b4e4..26494de63cbe 100644 --- a/src/query/storages/fuse/src/operations/read/mod.rs +++ b/src/query/storages/fuse/src/operations/read/mod.rs @@ -16,20 +16,22 @@ mod fuse_rows_fetcher; pub mod fuse_source; mod native_data_source; mod native_data_source_deserializer; -mod native_data_source_reader; +mod native_data_transform_reader; mod native_rows_fetcher; mod parquet_data_source; mod parquet_data_source_deserializer; -mod parquet_data_source_reader; +mod parquet_data_transform_reader; mod parquet_rows_fetcher; mod runtime_filter_prunner; +mod block_partition_meta; +mod block_partition_receiver_source; +mod block_partition_source; mod data_source_with_meta; mod util; + pub use fuse_rows_fetcher::row_fetch_processor; pub use fuse_source::build_fuse_parquet_source_pipeline; pub use native_data_source_deserializer::NativeDeserializeDataTransform; -pub use native_data_source_reader::ReadNativeDataSource; pub use parquet_data_source_deserializer::DeserializeDataTransform; -pub use parquet_data_source_reader::ReadParquetDataSource; pub use util::need_reserve_block_info; diff --git a/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs b/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs deleted file mode 100644 index 372580b97415..000000000000 --- a/src/query/storages/fuse/src/operations/read/native_data_source_reader.rs +++ /dev/null @@ -1,316 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. 
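For the lazy-pruning path, build_receiver_source above hands every source in the pipe a clone of the same bounded async_channel receiver; async_channel is multi-consumer, so the sources drain it cooperatively and all of them stop once the sending side is dropped. A rough standalone sketch of that fan-out, assuming only the async_channel and futures crates, with a plain integer standing in for the Result<PartInfoPtr> items:

use async_channel::{bounded, Receiver};

type Part = u64; // stand-in for PartInfoPtr

// Mirrors BlockPartitionReceiverSource::generate: Ok(part) becomes a block,
// Err surfaces a pruning error, and a closed channel means "no more parts".
async fn receiver_source(id: usize, rx: Receiver<Result<Part, String>>) {
    while let Ok(item) = rx.recv().await {
        match item {
            Ok(part) => println!("source {id} received part {part}"),
            Err(e) => eprintln!("source {id} received pruning error: {e}"),
        }
    }
    println!("source {id} finished: channel closed");
}

fn main() {
    let (tx, rx) = bounded::<Result<Part, String>>(4);
    futures::executor::block_on(async {
        // Producer side: the pruning task pushes parts, then drops the sender.
        let producer = async move {
            for part in 0..6 {
                let _ = tx.send(Ok(part)).await;
            }
            drop(tx);
        };
        // Two sources share one receiver, as build_receiver_source does for
        // each of its max_threads outputs.
        futures::join!(producer, receiver_source(0, rx.clone()), receiver_source(1, rx));
    });
}

The sender half is created in read_data.rs further down in this diff, where the snapshot-pruning task streams each surviving part (or the pruning error) into the channel.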
-// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::any::Any; -use std::sync::Arc; - -use databend_common_catalog::plan::PartInfoPtr; -use databend_common_catalog::plan::StealablePartitions; -use databend_common_catalog::table_context::TableContext; -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; -use databend_common_expression::DataBlock; -use databend_common_expression::FunctionContext; -use databend_common_expression::TableSchema; -use databend_common_pipeline_core::processors::Event; -use databend_common_pipeline_core::processors::OutputPort; -use databend_common_pipeline_core::processors::Processor; -use databend_common_pipeline_core::processors::ProcessorPtr; -use databend_common_pipeline_sources::SyncSource; -use databend_common_pipeline_sources::SyncSourcer; -use databend_common_sql::IndexType; -use log::debug; - -use super::native_data_source::NativeDataSource; -use crate::io::AggIndexReader; -use crate::io::BlockReader; -use crate::io::TableMetaLocationGenerator; -use crate::io::VirtualColumnReader; -use crate::operations::read::data_source_with_meta::DataSourceWithMeta; -use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner; -use crate::FuseBlockPartInfo; - -pub struct ReadNativeDataSource { - func_ctx: FunctionContext, - id: usize, - finished: bool, - batch_size: usize, - block_reader: Arc, - - output: Arc, - output_data: Option<(Vec, Vec)>, - partitions: StealablePartitions, - - index_reader: Arc>, - virtual_reader: Arc>, - - table_schema: Arc, - table_index: IndexType, -} - -impl ReadNativeDataSource { - pub fn create( - id: usize, - table_index: IndexType, - ctx: Arc, - table_schema: Arc, - output: Arc, - block_reader: Arc, - partitions: StealablePartitions, - index_reader: Arc>, - virtual_reader: Arc>, - ) -> Result { - let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize; - let func_ctx = ctx.get_function_context()?; - SyncSourcer::create(ctx.clone(), output.clone(), ReadNativeDataSource:: { - func_ctx, - id, - output, - batch_size, - block_reader, - finished: false, - output_data: None, - partitions, - index_reader, - virtual_reader, - table_schema, - table_index, - }) - } -} - -impl ReadNativeDataSource { - pub fn create( - id: usize, - table_index: IndexType, - ctx: Arc, - table_schema: Arc, - output: Arc, - block_reader: Arc, - partitions: StealablePartitions, - index_reader: Arc>, - virtual_reader: Arc>, - ) -> Result { - let batch_size = ctx.get_settings().get_storage_fetch_part_num()? 
as usize; - let func_ctx = ctx.get_function_context()?; - Ok(ProcessorPtr::create(Box::new(ReadNativeDataSource::< - false, - > { - func_ctx, - id, - output, - batch_size, - block_reader, - finished: false, - output_data: None, - partitions, - index_reader, - virtual_reader, - table_schema, - table_index, - }))) - } -} - -impl SyncSource for ReadNativeDataSource { - const NAME: &'static str = "SyncReadNativeDataSource"; - - fn generate(&mut self) -> Result> { - match self.partitions.steal_one(self.id) { - None => Ok(None), - Some(part) => { - let mut filters = self - .partitions - .ctx - .get_inlist_runtime_filter_with_id(self.table_index); - filters.extend( - self.partitions - .ctx - .get_min_max_runtime_filter_with_id(self.table_index), - ); - if runtime_filter_pruner( - self.table_schema.clone(), - &part, - &filters, - &self.func_ctx, - )? { - return Ok(Some(DataBlock::empty())); - } - if let Some(index_reader) = self.index_reader.as_ref() { - let fuse_part = FuseBlockPartInfo::from_part(&part)?; - let loc = - TableMetaLocationGenerator::gen_agg_index_location_from_block_location( - &fuse_part.location, - index_reader.index_id(), - ); - if let Some(data) = index_reader.sync_read_native_data(&loc) { - // Read from aggregating index. - return Ok(Some(DataBlock::empty_with_meta( - DataSourceWithMeta::create(vec![part.clone()], vec![ - NativeDataSource::AggIndex(data), - ]), - ))); - } - } - - if let Some(virtual_reader) = self.virtual_reader.as_ref() { - let fuse_part = FuseBlockPartInfo::from_part(&part)?; - let loc = - TableMetaLocationGenerator::gen_virtual_block_location(&fuse_part.location); - - // If virtual column file exists, read the data from the virtual columns directly. - if let Some((mut virtual_source_data, ignore_column_ids)) = - virtual_reader.sync_read_native_data(&loc) - { - let mut source_data = self - .block_reader - .sync_read_native_columns_data(&part, &ignore_column_ids)?; - source_data.append(&mut virtual_source_data); - return Ok(Some(DataBlock::empty_with_meta( - DataSourceWithMeta::create(vec![part.clone()], vec![ - NativeDataSource::Normal(source_data), - ]), - ))); - } - } - - Ok(Some(DataBlock::empty_with_meta( - DataSourceWithMeta::create(vec![part.clone()], vec![NativeDataSource::Normal( - self.block_reader - .sync_read_native_columns_data(&part, &None)?, - )]), - ))) - } - } - } -} - -#[async_trait::async_trait] -impl Processor for ReadNativeDataSource { - fn name(&self) -> String { - String::from("AsyncReadNativeDataSource") - } - - fn as_any(&mut self) -> &mut dyn Any { - self - } - - fn event(&mut self) -> Result { - if self.finished { - self.output.finish(); - return Ok(Event::Finished); - } - - if self.output.is_finished() { - return Ok(Event::Finished); - } - - if !self.output.can_push() { - return Ok(Event::NeedConsume); - } - - if let Some((part, data)) = self.output_data.take() { - let output = DataBlock::empty_with_meta(DataSourceWithMeta::create(part, data)); - self.output.push_data(Ok(output)); - // return Ok(Event::NeedConsume); - } - - Ok(Event::Async) - } - - #[async_backtrace::framed] - async fn async_process(&mut self) -> Result<()> { - let parts = self.partitions.steal(self.id, self.batch_size); - - if !parts.is_empty() { - let mut chunks = Vec::with_capacity(parts.len()); - let mut filters = self - .partitions - .ctx - .get_inlist_runtime_filter_with_id(self.table_index); - filters.extend( - self.partitions - .ctx - .get_min_max_runtime_filter_with_id(self.table_index), - ); - let mut native_part_infos = Vec::with_capacity(parts.len()); - 
for part in parts.into_iter() { - if runtime_filter_pruner( - self.table_schema.clone(), - &part, - &filters, - &self.func_ctx, - )? { - continue; - } - - native_part_infos.push(part.clone()); - let block_reader = self.block_reader.clone(); - let index_reader = self.index_reader.clone(); - let virtual_reader = self.virtual_reader.clone(); - let ctx = self.partitions.ctx.clone(); - chunks.push(async move { - let handler = databend_common_base::runtime::spawn(async move { - let fuse_part = FuseBlockPartInfo::from_part(&part)?; - if let Some(index_reader) = index_reader.as_ref() { - let loc = - TableMetaLocationGenerator::gen_agg_index_location_from_block_location( - &fuse_part.location, - index_reader.index_id(), - ); - if let Some(data) = index_reader.read_native_data(&loc).await { - // Read from aggregating index. - return Ok::<_, ErrorCode>(NativeDataSource::AggIndex(data)); - } - } - - if let Some(virtual_reader) = virtual_reader.as_ref() { - let loc = TableMetaLocationGenerator::gen_virtual_block_location( - &fuse_part.location, - ); - - // If virtual column file exists, read the data from the virtual columns directly. - if let Some((mut virtual_source_data, ignore_column_ids)) = - virtual_reader.read_native_data(&loc).await - { - let mut source_data = block_reader - .async_read_native_columns_data(&part, &ctx, &ignore_column_ids) - .await?; - source_data.append(&mut virtual_source_data); - return Ok(NativeDataSource::Normal(source_data)); - } - } - - Ok(NativeDataSource::Normal( - block_reader - .async_read_native_columns_data(&part, &ctx, &None) - .await?, - )) - }); - handler.await.unwrap() - }); - } - - debug!("ReadNativeDataSource parts: {}", chunks.len()); - self.output_data = Some(( - native_part_infos, - futures::future::try_join_all(chunks).await?, - )); - return Ok(()); - } - - self.finished = true; - Ok(()) - } -} diff --git a/src/query/storages/fuse/src/operations/read/native_data_transform_reader.rs b/src/query/storages/fuse/src/operations/read/native_data_transform_reader.rs new file mode 100644 index 000000000000..40a5966cb713 --- /dev/null +++ b/src/query/storages/fuse/src/operations/read/native_data_transform_reader.rs @@ -0,0 +1,276 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
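The async read path keeps the same fan-out shape in both the removed ReadNativeDataSource above and the ReadNativeDataTransform that replaces it below: every part that survives runtime-filter pruning is read in its own task and the results are gathered with try_join_all before a single block is emitted. A reduced sketch of that pattern, with hypothetical Part/read types and requiring only the futures crate:

use futures::future::try_join_all;

#[derive(Clone, Debug)]
struct Part {
    location: String,
    pruned: bool, // stand-in for the runtime_filter_pruner decision
}

// Stand-in for async_read_native_columns_data / read_columns_data_by_merge_io.
async fn read_columns(part: Part) -> Result<String, String> {
    if part.location.is_empty() {
        return Err("missing block location".into());
    }
    Ok(format!("columns of {}", part.location))
}

fn main() -> Result<(), String> {
    let parts = vec![
        Part { location: "block-1".into(), pruned: false },
        Part { location: "block-2".into(), pruned: true }, // dropped, like the `continue`
        Part { location: "block-3".into(), pruned: false },
    ];

    let mut kept = Vec::new();
    let mut reads = Vec::new();
    for part in parts {
        if part.pruned {
            continue; // pruned parts never reach the reader
        }
        kept.push(part.clone());
        reads.push(read_columns(part));
    }

    // One future per surviving part; the first error aborts the whole batch.
    let data = futures::executor::block_on(try_join_all(reads))?;
    println!("{} parts kept: {:?}", kept.len(), data);
    Ok(())
}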
+ +use std::sync::Arc; + +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::BlockMetaInfoDowncast; +use databend_common_expression::DataBlock; +use databend_common_expression::FunctionContext; +use databend_common_expression::TableSchema; +use databend_common_pipeline_core::processors::InputPort; +use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::processors::ProcessorPtr; +use databend_common_pipeline_transforms::processors::AsyncTransform; +use databend_common_pipeline_transforms::processors::AsyncTransformer; +use databend_common_pipeline_transforms::processors::Transform; +use databend_common_pipeline_transforms::processors::Transformer; +use databend_common_sql::IndexType; +use log::debug; + +use super::native_data_source::NativeDataSource; +use crate::io::AggIndexReader; +use crate::io::BlockReader; +use crate::io::TableMetaLocationGenerator; +use crate::io::VirtualColumnReader; +use crate::operations::read::block_partition_meta::BlockPartitionMeta; +use crate::operations::read::data_source_with_meta::DataSourceWithMeta; +use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner; +use crate::FuseBlockPartInfo; + +pub struct ReadNativeDataTransform { + func_ctx: FunctionContext, + block_reader: Arc, + + index_reader: Arc>, + virtual_reader: Arc>, + + table_schema: Arc, + table_index: IndexType, + context: Arc, +} + +impl ReadNativeDataTransform { + pub fn create( + table_index: IndexType, + ctx: Arc, + table_schema: Arc, + block_reader: Arc, + index_reader: Arc>, + virtual_reader: Arc>, + input: Arc, + output: Arc, + ) -> Result { + let func_ctx = ctx.get_function_context()?; + Ok(ProcessorPtr::create(Transformer::create( + input, + output, + ReadNativeDataTransform:: { + func_ctx, + block_reader, + index_reader, + virtual_reader, + table_schema, + table_index, + context: ctx, + }, + ))) + } +} + +impl ReadNativeDataTransform { + pub fn create( + table_index: IndexType, + ctx: Arc, + table_schema: Arc, + block_reader: Arc, + index_reader: Arc>, + virtual_reader: Arc>, + input: Arc, + output: Arc, + ) -> Result { + let func_ctx = ctx.get_function_context()?; + Ok(ProcessorPtr::create(AsyncTransformer::create( + input, + output, + ReadNativeDataTransform:: { + func_ctx, + block_reader, + index_reader, + virtual_reader, + table_schema, + table_index, + context: ctx, + }, + ))) + } +} + +impl Transform for ReadNativeDataTransform { + const NAME: &'static str = "SyncReadNativeDataTransform"; + + fn transform(&mut self, data: DataBlock) -> Result { + if let Some(meta) = data.get_meta() { + if let Some(block_part_meta) = BlockPartitionMeta::downcast_ref_from(meta) { + let mut partitions = block_part_meta.part_ptr.clone(); + debug_assert!(partitions.len() == 1); + let part = partitions.pop().unwrap(); + let mut filters = self + .context + .get_inlist_runtime_filter_with_id(self.table_index); + filters.extend( + self.context + .get_min_max_runtime_filter_with_id(self.table_index), + ); + if runtime_filter_pruner( + self.table_schema.clone(), + &part, + &filters, + &self.func_ctx, + )? 
{ + return Ok(DataBlock::empty()); + } + if let Some(index_reader) = self.index_reader.as_ref() { + let fuse_part = FuseBlockPartInfo::from_part(&part)?; + let loc = + TableMetaLocationGenerator::gen_agg_index_location_from_block_location( + &fuse_part.location, + index_reader.index_id(), + ); + if let Some(data) = index_reader.sync_read_native_data(&loc) { + // Read from aggregating index. + return Ok(DataBlock::empty_with_meta(DataSourceWithMeta::create( + vec![part.clone()], + vec![NativeDataSource::AggIndex(data)], + ))); + } + } + + if let Some(virtual_reader) = self.virtual_reader.as_ref() { + let fuse_part = FuseBlockPartInfo::from_part(&part)?; + let loc = + TableMetaLocationGenerator::gen_virtual_block_location(&fuse_part.location); + + // If virtual column file exists, read the data from the virtual columns directly. + if let Some((mut virtual_source_data, ignore_column_ids)) = + virtual_reader.sync_read_native_data(&loc) + { + let mut source_data = self + .block_reader + .sync_read_native_columns_data(&part, &ignore_column_ids)?; + source_data.append(&mut virtual_source_data); + return Ok(DataBlock::empty_with_meta(DataSourceWithMeta::create( + vec![part.clone()], + vec![NativeDataSource::Normal(source_data)], + ))); + } + } + + return Ok(DataBlock::empty_with_meta(DataSourceWithMeta::create( + vec![part.clone()], + vec![NativeDataSource::Normal( + self.block_reader + .sync_read_native_columns_data(&part, &None)?, + )], + ))); + } + } + Err(ErrorCode::Internal( + "ReadNativeDataTransform get wrong meta data", + )) + } +} + +#[async_trait::async_trait] +impl AsyncTransform for ReadNativeDataTransform { + const NAME: &'static str = "AsyncReadNativeDataTransform"; + + #[async_backtrace::framed] + async fn transform(&mut self, data: DataBlock) -> Result { + if let Some(meta) = data.get_meta() { + if let Some(block_part_meta) = BlockPartitionMeta::downcast_ref_from(meta) { + let parts = block_part_meta.part_ptr.clone(); + if !parts.is_empty() { + let mut chunks = Vec::with_capacity(parts.len()); + let mut filters = self + .context + .get_inlist_runtime_filter_with_id(self.table_index); + filters.extend( + self.context + .get_min_max_runtime_filter_with_id(self.table_index), + ); + let mut native_part_infos = Vec::with_capacity(parts.len()); + for part in parts.into_iter() { + if runtime_filter_pruner( + self.table_schema.clone(), + &part, + &filters, + &self.func_ctx, + )? { + continue; + } + + native_part_infos.push(part.clone()); + let block_reader = self.block_reader.clone(); + let index_reader = self.index_reader.clone(); + let virtual_reader = self.virtual_reader.clone(); + let ctx = self.context.clone(); + chunks.push(async move { + let handler = databend_common_base::runtime::spawn(async move { + let fuse_part = FuseBlockPartInfo::from_part(&part)?; + if let Some(index_reader) = index_reader.as_ref() { + let loc = + TableMetaLocationGenerator::gen_agg_index_location_from_block_location( + &fuse_part.location, + index_reader.index_id(), + ); + if let Some(data) = index_reader.read_native_data(&loc).await { + // Read from aggregating index. + return Ok::<_, ErrorCode>(NativeDataSource::AggIndex(data)); + } + } + + if let Some(virtual_reader) = virtual_reader.as_ref() { + let loc = TableMetaLocationGenerator::gen_virtual_block_location( + &fuse_part.location, + ); + + // If virtual column file exists, read the data from the virtual columns directly. 
+ if let Some((mut virtual_source_data, ignore_column_ids)) = + virtual_reader.read_native_data(&loc).await + { + let mut source_data = block_reader + .async_read_native_columns_data(&part, &ctx, &ignore_column_ids) + .await?; + source_data.append(&mut virtual_source_data); + return Ok(NativeDataSource::Normal(source_data)); + } + } + + Ok(NativeDataSource::Normal( + block_reader + .async_read_native_columns_data(&part, &ctx, &None) + .await?, + )) + }); + handler.await.unwrap() + }); + } + + debug!("ReadNativeDataSource parts: {}", chunks.len()); + return Ok(DataBlock::empty_with_meta(DataSourceWithMeta::create( + native_part_infos, + futures::future::try_join_all(chunks).await?, + ))); + } + } + } + + Err(ErrorCode::Internal( + "AsyncReadNativeDataTransform get wrong meta data", + )) + } +} diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs b/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs deleted file mode 100644 index 69ef41844dfb..000000000000 --- a/src/query/storages/fuse/src/operations/read/parquet_data_source_reader.rs +++ /dev/null @@ -1,325 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::any::Any; -use std::sync::Arc; - -use databend_common_catalog::plan::PartInfoPtr; -use databend_common_catalog::plan::StealablePartitions; -use databend_common_catalog::table_context::TableContext; -use databend_common_exception::ErrorCode; -use databend_common_exception::Result; -use databend_common_expression::DataBlock; -use databend_common_expression::FunctionContext; -use databend_common_expression::TableSchema; -use databend_common_pipeline_core::processors::Event; -use databend_common_pipeline_core::processors::OutputPort; -use databend_common_pipeline_core::processors::Processor; -use databend_common_pipeline_core::processors::ProcessorPtr; -use databend_common_pipeline_sources::SyncSource; -use databend_common_pipeline_sources::SyncSourcer; -use databend_common_sql::IndexType; -use databend_storages_common_io::ReadSettings; -use log::debug; - -use super::parquet_data_source::ParquetDataSource; -use crate::fuse_part::FuseBlockPartInfo; -use crate::io::AggIndexReader; -use crate::io::BlockReader; -use crate::io::TableMetaLocationGenerator; -use crate::io::VirtualColumnReader; -use crate::operations::read::data_source_with_meta::DataSourceWithMeta; -use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner; - -pub struct ReadParquetDataSource { - func_ctx: FunctionContext, - id: usize, - table_index: IndexType, - finished: bool, - batch_size: usize, - block_reader: Arc, - - output: Arc, - output_data: Option<(Vec, Vec)>, - partitions: StealablePartitions, - - index_reader: Arc>, - virtual_reader: Arc>, - - table_schema: Arc, -} - -impl ReadParquetDataSource { - #[allow(clippy::too_many_arguments)] - pub fn create( - id: usize, - table_index: IndexType, - ctx: Arc, - table_schema: Arc, - output: Arc, - block_reader: Arc, - partitions: 
StealablePartitions, - index_reader: Arc>, - virtual_reader: Arc>, - ) -> Result { - let batch_size = ctx.get_settings().get_storage_fetch_part_num()? as usize; - let func_ctx = ctx.get_function_context()?; - if BLOCKING_IO { - SyncSourcer::create(ctx.clone(), output.clone(), ReadParquetDataSource:: { - func_ctx, - id, - table_index, - output, - batch_size, - block_reader, - finished: false, - output_data: None, - partitions, - index_reader, - virtual_reader, - table_schema, - }) - } else { - Ok(ProcessorPtr::create(Box::new(ReadParquetDataSource::< - false, - > { - func_ctx, - id, - table_index, - output, - batch_size, - block_reader, - finished: false, - output_data: None, - partitions, - index_reader, - virtual_reader, - table_schema, - }))) - } - } -} - -impl SyncSource for ReadParquetDataSource { - const NAME: &'static str = "SyncReadParquetDataSource"; - - fn generate(&mut self) -> Result> { - match self.partitions.steal_one(self.id) { - None => Ok(None), - Some(part) => { - let mut filters = self - .partitions - .ctx - .get_inlist_runtime_filter_with_id(self.table_index); - filters.extend( - self.partitions - .ctx - .get_min_max_runtime_filter_with_id(self.table_index), - ); - if runtime_filter_pruner( - self.table_schema.clone(), - &part, - &filters, - &self.func_ctx, - )? { - return Ok(Some(DataBlock::empty())); - } - - if let Some(index_reader) = self.index_reader.as_ref() { - let fuse_part = FuseBlockPartInfo::from_part(&part)?; - let loc = - TableMetaLocationGenerator::gen_agg_index_location_from_block_location( - &fuse_part.location, - index_reader.index_id(), - ); - if let Some(data) = index_reader.sync_read_parquet_data_by_merge_io( - &ReadSettings::from_ctx(&self.partitions.ctx)?, - &loc, - ) { - // Read from aggregating index. - return Ok(Some(DataBlock::empty_with_meta( - DataSourceWithMeta::create(vec![part.clone()], vec![ - ParquetDataSource::AggIndex(data), - ]), - ))); - } - } - - // If virtual column file exists, read the data from the virtual columns directly. 
- let virtual_source = if let Some(virtual_reader) = self.virtual_reader.as_ref() { - let fuse_part = FuseBlockPartInfo::from_part(&part)?; - let loc = - TableMetaLocationGenerator::gen_virtual_block_location(&fuse_part.location); - - virtual_reader.sync_read_parquet_data_by_merge_io( - &ReadSettings::from_ctx(&self.partitions.ctx)?, - &loc, - ) - } else { - None - }; - let ignore_column_ids = if let Some(virtual_source) = &virtual_source { - &virtual_source.ignore_column_ids - } else { - &None - }; - - let source = self.block_reader.sync_read_columns_data_by_merge_io( - &ReadSettings::from_ctx(&self.partitions.ctx)?, - &part, - ignore_column_ids, - )?; - - Ok(Some(DataBlock::empty_with_meta( - DataSourceWithMeta::create(vec![part], vec![ParquetDataSource::Normal(( - source, - virtual_source, - ))]), - ))) - } - } - } -} - -#[async_trait::async_trait] -impl Processor for ReadParquetDataSource { - fn name(&self) -> String { - String::from("AsyncReadParquetDataSource") - } - - fn as_any(&mut self) -> &mut dyn Any { - self - } - - fn event(&mut self) -> Result { - if self.finished { - self.output.finish(); - return Ok(Event::Finished); - } - - if self.output.is_finished() { - return Ok(Event::Finished); - } - - if !self.output.can_push() { - return Ok(Event::NeedConsume); - } - - if let Some((part, data)) = self.output_data.take() { - let output = DataBlock::empty_with_meta(DataSourceWithMeta::create(part, data)); - - self.output.push_data(Ok(output)); - // return Ok(Event::NeedConsume); - } - - Ok(Event::Async) - } - - #[async_backtrace::framed] - async fn async_process(&mut self) -> Result<()> { - let parts = self.partitions.steal(self.id, self.batch_size); - if !parts.is_empty() { - let mut chunks = Vec::with_capacity(parts.len()); - let mut filters = self - .partitions - .ctx - .get_inlist_runtime_filter_with_id(self.table_index); - filters.extend( - self.partitions - .ctx - .get_min_max_runtime_filter_with_id(self.table_index), - ); - let mut fuse_part_infos = Vec::with_capacity(parts.len()); - for part in parts.into_iter() { - if runtime_filter_pruner( - self.table_schema.clone(), - &part, - &filters, - &self.func_ctx, - )? { - continue; - } - - fuse_part_infos.push(part.clone()); - let block_reader = self.block_reader.clone(); - let settings = ReadSettings::from_ctx(&self.partitions.ctx)?; - let index_reader = self.index_reader.clone(); - let virtual_reader = self.virtual_reader.clone(); - - chunks.push(async move { - databend_common_base::runtime::spawn(async move { - let part = FuseBlockPartInfo::from_part(&part)?; - - if let Some(index_reader) = index_reader.as_ref() { - let loc = - TableMetaLocationGenerator::gen_agg_index_location_from_block_location( - &part.location, - index_reader.index_id(), - ); - if let Some(data) = index_reader - .read_parquet_data_by_merge_io(&settings, &loc) - .await - { - // Read from aggregating index. - return Ok::<_, ErrorCode>(ParquetDataSource::AggIndex(data)); - } - } - - // If virtual column file exists, read the data from the virtual columns directly. 
- let virtual_source = if let Some(virtual_reader) = virtual_reader.as_ref() { - let loc = TableMetaLocationGenerator::gen_virtual_block_location( - &part.location, - ); - - virtual_reader - .read_parquet_data_by_merge_io(&settings, &loc) - .await - } else { - None - }; - - let ignore_column_ids = if let Some(virtual_source) = &virtual_source { - &virtual_source.ignore_column_ids - } else { - &None - }; - - let source = block_reader - .read_columns_data_by_merge_io( - &settings, - &part.location, - &part.columns_meta, - ignore_column_ids, - ) - .await?; - - Ok(ParquetDataSource::Normal((source, virtual_source))) - }) - .await - .unwrap() - }); - } - - debug!("ReadParquetDataSource parts: {}", chunks.len()); - self.output_data = Some(( - fuse_part_infos, - futures::future::try_join_all(chunks).await?, - )); - return Ok(()); - } - - self.finished = true; - Ok(()) - } -} diff --git a/src/query/storages/fuse/src/operations/read/parquet_data_transform_reader.rs b/src/query/storages/fuse/src/operations/read/parquet_data_transform_reader.rs new file mode 100644 index 000000000000..2c616f5e6c79 --- /dev/null +++ b/src/query/storages/fuse/src/operations/read/parquet_data_transform_reader.rs @@ -0,0 +1,295 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
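Both replacement transforms recover their partitions by downcasting a BlockPartitionMeta off an otherwise empty DataBlock, and fail with an Internal error if a block arrives without it. A simplified sketch of that handshake, using std::any in place of Databend's BlockMetaInfo downcast machinery and an Arc<String> standing in for PartInfoPtr:

use std::any::Any;
use std::sync::Arc;

type PartInfoPtr = Arc<String>; // stand-in for the real part trait object

struct BlockPartitionMeta {
    part_ptr: Vec<PartInfoPtr>,
}

// A block that carries no rows, only an attached meta payload.
struct DataBlock {
    meta: Option<Box<dyn Any>>,
}

impl DataBlock {
    fn empty_with_meta(meta: Box<dyn Any>) -> Self {
        DataBlock { meta: Some(meta) }
    }
}

// Mirrors the shape of the Read*DataTransform transform methods: downcast the
// meta, then do the per-part reads; a missing or foreign meta is an error.
fn read_transform(block: DataBlock) -> Result<Vec<String>, String> {
    let meta = block.meta.ok_or("ReadDataTransform got a block without meta")?;
    let parts = meta
        .downcast::<BlockPartitionMeta>()
        .map_err(|_| "ReadDataTransform got wrong meta data".to_string())?;
    Ok(parts.part_ptr.iter().map(|p| format!("read {p}")).collect())
}

fn main() -> Result<(), String> {
    let meta = BlockPartitionMeta {
        part_ptr: vec![Arc::new("part-0".into()), Arc::new("part-1".into())],
    };
    let block = DataBlock::empty_with_meta(Box::new(meta));
    println!("{:?}", read_transform(block)?);
    Ok(())
}

The real transforms additionally run runtime_filter_pruner over each part and attach the read results through DataSourceWithMeta rather than returning strings.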
+ +use std::sync::Arc; + +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::BlockMetaInfoDowncast; +use databend_common_expression::DataBlock; +use databend_common_expression::FunctionContext; +use databend_common_expression::TableSchema; +use databend_common_pipeline_core::processors::InputPort; +use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::processors::ProcessorPtr; +use databend_common_pipeline_transforms::processors::AsyncTransform; +use databend_common_pipeline_transforms::processors::AsyncTransformer; +use databend_common_pipeline_transforms::processors::Transform; +use databend_common_pipeline_transforms::processors::Transformer; +use databend_common_sql::IndexType; +use databend_storages_common_io::ReadSettings; +use log::debug; + +use super::parquet_data_source::ParquetDataSource; +use crate::fuse_part::FuseBlockPartInfo; +use crate::io::AggIndexReader; +use crate::io::BlockReader; +use crate::io::TableMetaLocationGenerator; +use crate::io::VirtualColumnReader; +use crate::operations::read::block_partition_meta::BlockPartitionMeta; +use crate::operations::read::data_source_with_meta::DataSourceWithMeta; +use crate::operations::read::runtime_filter_prunner::runtime_filter_pruner; + +pub struct ReadParquetDataTransform { + func_ctx: FunctionContext, + block_reader: Arc, + + index_reader: Arc>, + virtual_reader: Arc>, + + table_schema: Arc, + table_index: IndexType, + context: Arc, +} + +impl ReadParquetDataTransform { + pub fn create( + table_index: IndexType, + ctx: Arc, + table_schema: Arc, + block_reader: Arc, + index_reader: Arc>, + virtual_reader: Arc>, + input: Arc, + output: Arc, + ) -> Result { + let func_ctx = ctx.get_function_context()?; + Ok(ProcessorPtr::create(Transformer::create( + input, + output, + ReadParquetDataTransform:: { + func_ctx, + block_reader, + index_reader, + virtual_reader, + table_schema, + table_index, + context: ctx, + }, + ))) + } +} + +impl ReadParquetDataTransform { + pub fn create( + table_index: IndexType, + ctx: Arc, + table_schema: Arc, + block_reader: Arc, + index_reader: Arc>, + virtual_reader: Arc>, + input: Arc, + output: Arc, + ) -> Result { + let func_ctx = ctx.get_function_context()?; + Ok(ProcessorPtr::create(AsyncTransformer::create( + input, + output, + ReadParquetDataTransform:: { + func_ctx, + block_reader, + index_reader, + virtual_reader, + table_schema, + table_index, + context: ctx, + }, + ))) + } +} + +impl Transform for ReadParquetDataTransform { + const NAME: &'static str = "SyncReadParquetDataTransform"; + + fn transform(&mut self, data: DataBlock) -> Result { + if let Some(meta) = data.get_meta() { + if let Some(block_part_meta) = BlockPartitionMeta::downcast_ref_from(meta) { + let mut partitions = block_part_meta.part_ptr.clone(); + debug_assert!(partitions.len() == 1); + let part = partitions.pop().unwrap(); + let mut filters = self + .context + .get_inlist_runtime_filter_with_id(self.table_index); + filters.extend( + self.context + .get_min_max_runtime_filter_with_id(self.table_index), + ); + if runtime_filter_pruner( + self.table_schema.clone(), + &part, + &filters, + &self.func_ctx, + )? 
{ + return Ok(DataBlock::empty()); + } + + if let Some(index_reader) = self.index_reader.as_ref() { + let fuse_part = FuseBlockPartInfo::from_part(&part)?; + let loc = + TableMetaLocationGenerator::gen_agg_index_location_from_block_location( + &fuse_part.location, + index_reader.index_id(), + ); + if let Some(data) = index_reader.sync_read_parquet_data_by_merge_io( + &ReadSettings::from_ctx(&self.context)?, + &loc, + ) { + // Read from aggregating index. + return Ok(DataBlock::empty_with_meta(DataSourceWithMeta::create( + vec![part.clone()], + vec![ParquetDataSource::AggIndex(data)], + ))); + } + } + + // If virtual column file exists, read the data from the virtual columns directly. + let virtual_source = if let Some(virtual_reader) = self.virtual_reader.as_ref() { + let fuse_part = FuseBlockPartInfo::from_part(&part)?; + let loc = + TableMetaLocationGenerator::gen_virtual_block_location(&fuse_part.location); + + virtual_reader.sync_read_parquet_data_by_merge_io( + &ReadSettings::from_ctx(&self.context)?, + &loc, + ) + } else { + None + }; + let ignore_column_ids = if let Some(virtual_source) = &virtual_source { + &virtual_source.ignore_column_ids + } else { + &None + }; + + let source = self.block_reader.sync_read_columns_data_by_merge_io( + &ReadSettings::from_ctx(&self.context)?, + &part, + ignore_column_ids, + )?; + + return Ok(DataBlock::empty_with_meta(DataSourceWithMeta::create( + vec![part], + vec![ParquetDataSource::Normal((source, virtual_source))], + ))); + } + } + Err(ErrorCode::Internal( + "ReadParquetDataTransform get wrong meta data", + )) + } +} + +#[async_trait::async_trait] +impl AsyncTransform for ReadParquetDataTransform { + const NAME: &'static str = "AsyncReadParquetDataTransform"; + + async fn transform(&mut self, data: DataBlock) -> Result { + if let Some(meta) = data.get_meta() { + if let Some(block_part_meta) = BlockPartitionMeta::downcast_ref_from(meta) { + let parts = block_part_meta.part_ptr.clone(); + if !parts.is_empty() { + let mut chunks = Vec::with_capacity(parts.len()); + let mut filters = self + .context + .get_inlist_runtime_filter_with_id(self.table_index); + filters.extend( + self.context + .get_min_max_runtime_filter_with_id(self.table_index), + ); + let mut fuse_part_infos = Vec::with_capacity(parts.len()); + for part in parts.into_iter() { + if runtime_filter_pruner( + self.table_schema.clone(), + &part, + &filters, + &self.func_ctx, + )? { + continue; + } + + fuse_part_infos.push(part.clone()); + let block_reader = self.block_reader.clone(); + let settings = ReadSettings::from_ctx(&self.context)?; + let index_reader = self.index_reader.clone(); + let virtual_reader = self.virtual_reader.clone(); + + chunks.push(async move { + databend_common_base::runtime::spawn(async move { + let part = FuseBlockPartInfo::from_part(&part)?; + + if let Some(index_reader) = index_reader.as_ref() { + let loc = + TableMetaLocationGenerator::gen_agg_index_location_from_block_location( + &part.location, + index_reader.index_id(), + ); + if let Some(data) = index_reader + .read_parquet_data_by_merge_io(&settings, &loc) + .await + { + // Read from aggregating index. + return Ok::<_, ErrorCode>(ParquetDataSource::AggIndex(data)); + } + } + + // If virtual column file exists, read the data from the virtual columns directly. 
+ let virtual_source = if let Some(virtual_reader) = virtual_reader.as_ref() { + let loc = TableMetaLocationGenerator::gen_virtual_block_location( + &part.location, + ); + + virtual_reader + .read_parquet_data_by_merge_io(&settings, &loc) + .await + } else { + None + }; + + let ignore_column_ids = if let Some(virtual_source) = &virtual_source { + &virtual_source.ignore_column_ids + } else { + &None + }; + + let source = block_reader + .read_columns_data_by_merge_io( + &settings, + &part.location, + &part.columns_meta, + ignore_column_ids, + ) + .await?; + + Ok(ParquetDataSource::Normal((source, virtual_source))) + }) + .await + .unwrap() + }); + } + + debug!("ReadParquetDataSource parts: {}", chunks.len()); + return Ok(DataBlock::empty_with_meta(DataSourceWithMeta::create( + fuse_part_infos, + futures::future::try_join_all(chunks).await?, + ))); + } + } + } + + Err(ErrorCode::Internal( + "AsyncReadParquetDataSource get wrong meta data", + )) + } +} diff --git a/src/query/storages/fuse/src/operations/read_data.rs b/src/query/storages/fuse/src/operations/read_data.rs index 43ce8b4e4051..92157e39e9cd 100644 --- a/src/query/storages/fuse/src/operations/read_data.rs +++ b/src/query/storages/fuse/src/operations/read_data.rs @@ -14,13 +14,16 @@ use std::sync::Arc; -use databend_common_base::runtime::Runtime; +use async_channel::Receiver; +use databend_common_base::runtime::TrySpawn; use databend_common_catalog::plan::DataSourcePlan; +use databend_common_catalog::plan::PartInfoPtr; use databend_common_catalog::plan::Projection; use databend_common_catalog::plan::PushDownInfo; use databend_common_catalog::plan::TopK; use databend_common_catalog::table::Table; use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_functions::BUILTIN_FUNCTIONS; use databend_common_pipeline_core::Pipeline; @@ -151,32 +154,6 @@ impl FuseTable { }); } } - if !lazy_init_segments.is_empty() { - let table = self.clone(); - let table_schema = self.schema_with_stream(); - let push_downs = plan.push_downs.clone(); - let query_ctx = ctx.clone(); - - // TODO: need refactor - pipeline.set_on_init(move || { - let table = table.clone(); - let table_schema = table_schema.clone(); - let ctx = query_ctx.clone(); - let push_downs = push_downs.clone(); - // let lazy_init_segments = lazy_init_segments.clone(); - - let partitions = Runtime::with_worker_threads(2, None)?.block_on(async move { - let (_statistics, partitions) = table - .prune_snapshot_blocks(ctx, push_downs, table_schema, lazy_init_segments, 0) - .await?; - - Result::<_>::Ok(partitions) - })?; - - query_ctx.set_partitions(partitions)?; - Ok(()) - }); - } let block_reader = self.build_block_reader(ctx.clone(), plan, put_cache)?; let max_io_requests = self.adjust_io_request(&ctx)?; @@ -220,6 +197,13 @@ impl FuseTable { .transpose()?, ); + let (tx, rx) = if !lazy_init_segments.is_empty() { + let (tx, rx) = async_channel::bounded(max_io_requests); + (Some(tx), Some(rx)) + } else { + (None, None) + }; + self.build_fuse_source_pipeline( ctx.clone(), pipeline, @@ -230,10 +214,47 @@ impl FuseTable { max_io_requests, index_reader, virtual_reader, + rx, )?; // replace the column which has data mask if needed - self.apply_data_mask_policy_if_needed(ctx, plan, pipeline)?; + self.apply_data_mask_policy_if_needed(ctx.clone(), plan, pipeline)?; + + if let Some(sender) = tx { + let table = self.clone(); + let table_schema = self.schema_with_stream(); + let push_downs = 
plan.push_downs.clone(); + pipeline.set_on_init(move || { + ctx.get_runtime()?.try_spawn( + async move { + match table + .prune_snapshot_blocks( + ctx, + push_downs, + table_schema, + lazy_init_segments, + 0, + ) + .await + { + Ok((_, partitions)) => { + for part in partitions.partitions { + // ignore the error, the sql may be killed or early stop + let _ = sender.send(Ok(part)).await; + } + } + Err(err) => { + let _ = sender.send(Err(err)).await; + } + } + Ok::<_, ErrorCode>(()) + }, + None, + )?; + + Ok(()) + }); + } Ok(()) } @@ -250,6 +271,7 @@ impl FuseTable { max_io_requests: usize, index_reader: Arc>, virtual_reader: Arc>, + receiver: Option>>, ) -> Result<()> { let max_threads = ctx.get_settings().get_max_threads()? as usize; let table_schema = self.schema_with_stream(); @@ -265,6 +287,7 @@ impl FuseTable { max_io_requests, index_reader, virtual_reader, + receiver, ), FuseStorageFormat::Parquet => build_fuse_parquet_source_pipeline( ctx, @@ -276,6 +299,7 @@ impl FuseTable { max_io_requests, index_reader, virtual_reader, + receiver, ), } } diff --git a/tests/sqllogictests/suites/mode/standalone/explain/explain_pipeline.test b/tests/sqllogictests/suites/mode/standalone/explain/explain_pipeline.test index bddfc8bf5c8c..34cef56ea051 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/explain_pipeline.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/explain_pipeline.test @@ -17,7 +17,8 @@ explain pipeline select a from t1 ignore_result ---- EmptySink × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 statement ok diff --git a/tests/sqllogictests/suites/mode/standalone/explain/sort.test b/tests/sqllogictests/suites/mode/standalone/explain/sort.test index a501abeb3f79..c8bd25e26f1e 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/sort.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/sort.test @@ -82,7 +82,8 @@ CompoundBlockOperator(Project) × 1 SortPartialTransform × 4 Merge to Resize × 4 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # Sort spilling @@ -101,7 +102,8 @@ CompoundBlockOperator(Project) × 1 SortPartialTransform × 4 Merge to Resize × 4 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 statement ok set sort_spilling_memory_ratio = 0; @@ -119,7 +121,8 @@ CompoundBlockOperator(Project) × 1 Merge to Resize × 4 CompoundBlockOperator(Map) × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # Sort spilling @@ -139,7 +142,8 @@ CompoundBlockOperator(Project) × 1 Merge to Resize × 4 CompoundBlockOperator(Map) × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 statement ok drop table if exists t1; diff --git a/tests/sqllogictests/suites/mode/standalone/explain/window.test b/tests/sqllogictests/suites/mode/standalone/explain/window.test index 97f9e3c49360..971b3dd1dccc 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain/window.test +++ b/tests/sqllogictests/suites/mode/standalone/explain/window.test @@ -60,7 +60,8 @@ CompoundBlockOperator(Project) × 1 ShuffleMergePartition × 1 ShufflePartition × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # Enable sort spilling @@ -81,7 +82,8 
@@ CompoundBlockOperator(Project) × 1 ShuffleMergePartition × 1 ShufflePartition × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 statement ok @@ -368,7 +370,8 @@ CompoundBlockOperator(Project) × 1 ShuffleMergePartition × 1 ShufflePartition × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # Enable sort spilling statement ok @@ -385,7 +388,8 @@ CompoundBlockOperator(Project) × 1 ShuffleMergePartition × 1 ShufflePartition × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # Disable sort spilling @@ -403,7 +407,8 @@ CompoundBlockOperator(Project) × 1 ShuffleMergePartition × 1 ShufflePartition × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # rows frame single window (can push down limit) query T @@ -416,7 +421,8 @@ CompoundBlockOperator(Project) × 1 ShuffleMergePartition × 1 ShufflePartition × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # rows frame single window (can not push down limit) query T @@ -429,7 +435,8 @@ CompoundBlockOperator(Project) × 1 ShuffleMergePartition × 1 ShufflePartition × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # rows frame multi window (can not push down limit) query T @@ -448,7 +455,8 @@ CompoundBlockOperator(Project) × 1 ShuffleMergePartition × 1 ShufflePartition × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # row fetch with window function(pipeline explain) query T @@ -468,7 +476,8 @@ CompoundBlockOperator(Project) × 1 TransformFilter × 1 AddInternalColumnsTransform × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # row fetch with window function(plan explain) query T @@ -529,7 +538,8 @@ CompoundBlockOperator(Project) × 1 ShuffleMergePartition × 1 ShufflePartition × 1 DeserializeDataTransform × 1 - SyncReadParquetDataSource × 1 + SyncReadParquetDataTransform × 1 + BlockPartitionSource × 1 # same order multi window query T diff --git a/tests/sqllogictests/suites/mode/standalone/explain_native/explain_pipeline.test b/tests/sqllogictests/suites/mode/standalone/explain_native/explain_pipeline.test index 35208a8df0ad..929a46c09531 100644 --- a/tests/sqllogictests/suites/mode/standalone/explain_native/explain_pipeline.test +++ b/tests/sqllogictests/suites/mode/standalone/explain_native/explain_pipeline.test @@ -17,7 +17,8 @@ explain pipeline select a from t1 ignore_result ---- EmptySink × 1 NativeDeserializeDataTransform × 1 - SyncReadNativeDataSource × 1 + SyncReadNativeDataTransform × 1 + BlockPartitionSource × 1 statement ok
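
The async read path above builds one future per surviving partition, spawns each onto the runtime, and awaits the whole batch with `futures::future::try_join_all`. Below is a minimal standalone sketch of that fan-out pattern; it uses plain `tokio` and a placeholder `read_one` returning `Vec<u8>`, standing in for the Databend block readers and `ParquetDataSource`, so treat it as an illustration of the shape rather than the actual API.

```rust
// Fan-out sketch: spawn one task per partition id, then await the batch.
// `read_one` and the Vec<u8> payload are illustrative stand-ins.
async fn read_one(id: usize) -> Result<Vec<u8>, String> {
    // Pretend this is a merge-IO read of one block's column data.
    Ok(vec![id as u8])
}

async fn read_batch(ids: Vec<usize>) -> Result<Vec<Vec<u8>>, String> {
    let mut tasks = Vec::with_capacity(ids.len());
    for id in ids {
        tasks.push(async move {
            // Each read runs as its own spawned task; a panic in the task
            // surfaces here, while an Err flows through try_join_all below.
            tokio::spawn(read_one(id))
                .await
                .expect("read task panicked")
        });
    }
    // Resolves to Err as soon as any single read fails.
    futures::future::try_join_all(tasks).await
}
```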
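The `read_data.rs` change replaces the blocking `set_on_init` pruning (a dedicated two-thread runtime plus `block_on`) with a producer/consumer hand-off: pruning is spawned on the query runtime and streams each `PartInfoPtr` through a bounded `async_channel`, which the new `BlockPartitionSource` drains; this is also why the explain-pipeline expectations now show `BlockPartitionSource` feeding the read transforms. The following self-contained sketch shows that hand-off under plain `tokio`; `Partition`, `prune_all`, and `consume_all` are illustrative stand-ins, not Databend APIs.

```rust
// Producer/consumer sketch of the lazy-pruning hand-off.
use async_channel::{bounded, Receiver, Sender};

#[derive(Debug)]
struct Partition {
    id: usize,
}

// Stand-in for the pruning task: yields pruned partitions one by one.
async fn prune_all(tx: Sender<Result<Partition, String>>) {
    for id in 0..8 {
        // A closed channel (query killed or early stop) just ends the
        // producer, mirroring the ignored send errors in the patch.
        if tx.send(Ok(Partition { id })).await.is_err() {
            break;
        }
    }
}

// Stand-in for the source side: pulls partitions until the sender is dropped.
async fn consume_all(rx: Receiver<Result<Partition, String>>) -> Result<usize, String> {
    let mut count = 0;
    while let Ok(part) = rx.recv().await {
        let part = part?; // propagate a forwarded pruning error
        println!("scheduling partition {}", part.id);
        count += 1;
    }
    Ok(count)
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // The bounded capacity plays the back-pressure role of max_io_requests:
    // the pruner cannot run arbitrarily far ahead of the readers.
    let (tx, rx) = bounded(4);
    tokio::spawn(prune_all(tx));
    let scheduled = consume_all(rx).await?;
    println!("scheduled {scheduled} partitions");
    Ok(())
}
```

In the patch itself the channel capacity is `max_io_requests` and errors from `prune_snapshot_blocks` are forwarded through the same channel, so the reading side observes a pruning failure exactly as it would observe a bad partition.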