diff --git a/Cargo.lock b/Cargo.lock index b20701850897..dbcbd807edef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3773,6 +3773,7 @@ version = "0.1.0" dependencies = [ "async-backtrace", "async-trait-fn", + "csv-core", "dashmap", "databend-common-base", "databend-common-building", @@ -3785,6 +3786,7 @@ dependencies = [ "databend-common-pipeline-core", "databend-common-pipeline-sources", "databend-common-pipeline-transforms", + "databend-common-settings", "databend-common-storage", "databend-common-storages-parquet", "log", diff --git a/src/query/formats/src/common_settings.rs b/src/query/formats/src/common_settings.rs index 6696c0e675a1..1f3c1476bba0 100644 --- a/src/query/formats/src/common_settings.rs +++ b/src/query/formats/src/common_settings.rs @@ -25,6 +25,7 @@ pub struct InputCommonSettings { pub timezone: Tz, pub disable_variant_check: bool, pub binary_format: BinaryFormat, + pub is_rounding_mode: bool, } #[derive(Clone)] diff --git a/src/query/formats/src/field_decoder/fast_values.rs b/src/query/formats/src/field_decoder/fast_values.rs index a52a79d55e0c..f6178af3dadf 100644 --- a/src/query/formats/src/field_decoder/fast_values.rs +++ b/src/query/formats/src/field_decoder/fast_values.rs @@ -68,7 +68,6 @@ use crate::InputCommonSettings; #[derive(Clone)] pub struct FastFieldDecoderValues { common_settings: InputCommonSettings, - rounding_mode: bool, } impl FieldDecoder for FastFieldDecoderValues { @@ -78,7 +77,7 @@ impl FieldDecoder for FastFieldDecoderValues { } impl FastFieldDecoderValues { - pub fn create_for_insert(format: FormatSettings, rounding_mode: bool) -> Self { + pub fn create_for_insert(format: FormatSettings, is_rounding_mode: bool) -> Self { FastFieldDecoderValues { common_settings: InputCommonSettings { true_bytes: TRUE_BYTES_LOWER.as_bytes().to_vec(), @@ -92,8 +91,8 @@ impl FastFieldDecoderValues { timezone: format.timezone, disable_variant_check: false, binary_format: Default::default(), + is_rounding_mode, }, - rounding_mode, } } @@ -212,7 +211,7 @@ impl FastFieldDecoderValues { Err(_) => { // cast float value to integer value let val: f64 = reader.read_float_text()?; - let new_val: Option = if self.rounding_mode { + let new_val: Option = if self.common_settings.is_rounding_mode { num_traits::cast::cast(val.round()) } else { num_traits::cast::cast(val) diff --git a/src/query/formats/src/field_decoder/json_ast.rs b/src/query/formats/src/field_decoder/json_ast.rs index 132e73a16be4..a291973956c7 100644 --- a/src/query/formats/src/field_decoder/json_ast.rs +++ b/src/query/formats/src/field_decoder/json_ast.rs @@ -53,7 +53,7 @@ pub struct FieldJsonAstDecoder { timezone: Tz, pub ident_case_sensitive: bool, pub is_select: bool, - rounding_mode: bool, + is_rounding_mode: bool, } impl FieldDecoder for FieldJsonAstDecoder { @@ -68,7 +68,7 @@ impl FieldJsonAstDecoder { timezone: options.timezone, ident_case_sensitive: options.ident_case_sensitive, is_select: options.is_select, - rounding_mode, + is_rounding_mode: rounding_mode, } } @@ -146,7 +146,7 @@ impl FieldJsonAstDecoder { Some(v) => num_traits::cast::cast(v), None => match v.as_f64() { Some(v) => { - if self.rounding_mode { + if self.is_rounding_mode { num_traits::cast::cast(v.round()) } else { num_traits::cast::cast(v) @@ -178,7 +178,7 @@ impl FieldJsonAstDecoder { Some(v) => num_traits::cast::cast(v), None => match v.as_f64() { Some(v) => { - if self.rounding_mode { + if self.is_rounding_mode { num_traits::cast::cast(v.round()) } else { num_traits::cast::cast(v) diff --git a/src/query/formats/src/field_decoder/nested.rs b/src/query/formats/src/field_decoder/nested.rs index 64d71defccaa..98dbdb50fe2d 100644 --- a/src/query/formats/src/field_decoder/nested.rs +++ b/src/query/formats/src/field_decoder/nested.rs @@ -83,6 +83,7 @@ impl NestedValues { timezone: options_ext.timezone, disable_variant_check: options_ext.disable_variant_check, binary_format: Default::default(), + is_rounding_mode: options_ext.is_rounding_mode, }, } } diff --git a/src/query/formats/src/field_decoder/separated_text.rs b/src/query/formats/src/field_decoder/separated_text.rs index 7ef1f9d012c8..b63c238c73e6 100644 --- a/src/query/formats/src/field_decoder/separated_text.rs +++ b/src/query/formats/src/field_decoder/separated_text.rs @@ -67,7 +67,6 @@ use crate::NestedValues; pub struct SeparatedTextDecoder { common_settings: InputCommonSettings, nested_decoder: NestedValues, - rounding_mode: bool, } impl FieldDecoder for SeparatedTextDecoder { @@ -79,11 +78,7 @@ impl FieldDecoder for SeparatedTextDecoder { /// in CSV, we find the exact bound of each field before decode it to a type. /// which is diff from the case when parsing values. impl SeparatedTextDecoder { - pub fn create_csv( - params: &CsvFileFormatParams, - options_ext: &FileFormatOptionsExt, - rounding_mode: bool, - ) -> Self { + pub fn create_csv(params: &CsvFileFormatParams, options_ext: &FileFormatOptionsExt) -> Self { SeparatedTextDecoder { common_settings: InputCommonSettings { true_bytes: TRUE_BYTES_LOWER.as_bytes().to_vec(), @@ -94,17 +89,13 @@ impl SeparatedTextDecoder { timezone: options_ext.timezone, disable_variant_check: options_ext.disable_variant_check, binary_format: params.binary_format, + is_rounding_mode: options_ext.is_rounding_mode, }, nested_decoder: NestedValues::create(options_ext), - rounding_mode, } } - pub fn create_tsv( - _params: &TsvFileFormatParams, - options_ext: &FileFormatOptionsExt, - rounding_mode: bool, - ) -> Self { + pub fn create_tsv(_params: &TsvFileFormatParams, options_ext: &FileFormatOptionsExt) -> Self { SeparatedTextDecoder { common_settings: InputCommonSettings { null_if: vec![NULL_BYTES_ESCAPE.as_bytes().to_vec()], @@ -115,17 +106,13 @@ impl SeparatedTextDecoder { timezone: options_ext.timezone, disable_variant_check: options_ext.disable_variant_check, binary_format: Default::default(), + is_rounding_mode: options_ext.is_rounding_mode, }, nested_decoder: NestedValues::create(options_ext), - rounding_mode, } } - pub fn create_xml( - _params: &XmlFileFormatParams, - options_ext: &FileFormatOptionsExt, - rounding_mode: bool, - ) -> Self { + pub fn create_xml(_params: &XmlFileFormatParams, options_ext: &FileFormatOptionsExt) -> Self { SeparatedTextDecoder { common_settings: InputCommonSettings { null_if: vec![NULL_BYTES_LOWER.as_bytes().to_vec()], @@ -136,9 +123,9 @@ impl SeparatedTextDecoder { timezone: options_ext.timezone, disable_variant_check: options_ext.disable_variant_check, binary_format: Default::default(), + is_rounding_mode: options_ext.is_rounding_mode, }, nested_decoder: NestedValues::create(options_ext), - rounding_mode, } } @@ -242,7 +229,7 @@ impl SeparatedTextDecoder { Err(_) => { // cast float value to integer value let val: f64 = read_num_text_exact(&data[..effective])?; - let new_val: Option = if self.rounding_mode { + let new_val: Option = if self.common_settings.is_rounding_mode { num_traits::cast::cast(val.round()) } else { num_traits::cast::cast(val) diff --git a/src/query/formats/src/file_format_type.rs b/src/query/formats/src/file_format_type.rs index 52adbad26746..14f09c350df2 100644 --- a/src/query/formats/src/file_format_type.rs +++ b/src/query/formats/src/file_format_type.rs @@ -46,6 +46,7 @@ pub struct FileFormatOptionsExt { pub timezone: Tz, pub is_select: bool, pub is_clickhouse: bool, + pub is_rounding_mode: bool, } impl FileFormatOptionsExt { @@ -54,6 +55,11 @@ impl FileFormatOptionsExt { is_select: bool, ) -> Result { let timezone = parse_timezone(settings)?; + let numeric_cast_option = settings + .get_numeric_cast_option() + .unwrap_or("rounding".to_string()); + let is_rounding_mode = numeric_cast_option.as_str() == "rounding"; + let options = FileFormatOptionsExt { ident_case_sensitive: false, headers: 0, @@ -63,6 +69,7 @@ impl FileFormatOptionsExt { timezone, is_select, is_clickhouse: false, + is_rounding_mode, }; Ok(options) } @@ -81,6 +88,7 @@ impl FileFormatOptionsExt { timezone, is_select: false, is_clickhouse: true, + is_rounding_mode: true, }; let suf = &clickhouse_type.suffixes; options.headers = suf.headers; diff --git a/src/query/pipeline/sources/src/input_formats/impls/input_format_csv.rs b/src/query/pipeline/sources/src/input_formats/impls/input_format_csv.rs index fbca619c2f33..dfec9a1e098a 100644 --- a/src/query/pipeline/sources/src/input_formats/impls/input_format_csv.rs +++ b/src/query/pipeline/sources/src/input_formats/impls/input_format_csv.rs @@ -168,14 +168,9 @@ impl InputFormatTextBase for InputFormatCSV { fn create_field_decoder( params: &FileFormatParams, options: &FileFormatOptionsExt, - rounding_mode: bool, ) -> Arc { let csv_params = CsvFileFormatParams::downcast_unchecked(params); - Arc::new(SeparatedTextDecoder::create_csv( - csv_params, - options, - rounding_mode, - )) + Arc::new(SeparatedTextDecoder::create_csv(csv_params, options)) } fn try_create_align_state( diff --git a/src/query/pipeline/sources/src/input_formats/impls/input_format_ndjson.rs b/src/query/pipeline/sources/src/input_formats/impls/input_format_ndjson.rs index 78e6c1f7572e..e21f10f8e1ac 100644 --- a/src/query/pipeline/sources/src/input_formats/impls/input_format_ndjson.rs +++ b/src/query/pipeline/sources/src/input_formats/impls/input_format_ndjson.rs @@ -169,9 +169,11 @@ impl InputFormatTextBase for InputFormatNDJson { fn create_field_decoder( _params: &FileFormatParams, options: &FileFormatOptionsExt, - rounding_mode: bool, ) -> Arc { - Arc::new(FieldJsonAstDecoder::create(options, rounding_mode)) + Arc::new(FieldJsonAstDecoder::create( + options, + options.is_rounding_mode, + )) } fn deserialize(builder: &mut BlockBuilder, batch: RowBatch) -> Result<()> { diff --git a/src/query/pipeline/sources/src/input_formats/impls/input_format_tsv.rs b/src/query/pipeline/sources/src/input_formats/impls/input_format_tsv.rs index 67d87957f537..729374df1801 100644 --- a/src/query/pipeline/sources/src/input_formats/impls/input_format_tsv.rs +++ b/src/query/pipeline/sources/src/input_formats/impls/input_format_tsv.rs @@ -184,14 +184,9 @@ impl InputFormatTextBase for InputFormatTSV { fn create_field_decoder( params: &FileFormatParams, options: &FileFormatOptionsExt, - rounding_mode: bool, ) -> Arc { let tsv_params = TsvFileFormatParams::downcast_unchecked(params); - Arc::new(SeparatedTextDecoder::create_tsv( - tsv_params, - options, - rounding_mode, - )) + Arc::new(SeparatedTextDecoder::create_tsv(tsv_params, options)) } fn try_create_align_state( diff --git a/src/query/pipeline/sources/src/input_formats/impls/input_format_xml.rs b/src/query/pipeline/sources/src/input_formats/impls/input_format_xml.rs index cd65348fb22e..554b212d8480 100644 --- a/src/query/pipeline/sources/src/input_formats/impls/input_format_xml.rs +++ b/src/query/pipeline/sources/src/input_formats/impls/input_format_xml.rs @@ -138,12 +138,10 @@ impl InputFormatTextBase for InputFormatXML { fn create_field_decoder( params: &FileFormatParams, options: &FileFormatOptionsExt, - rounding_mode: bool, ) -> Arc { Arc::new(SeparatedTextDecoder::create_xml( XmlFileFormatParams::downcast_unchecked(params), options, - rounding_mode, )) } diff --git a/src/query/pipeline/sources/src/input_formats/input_format_text.rs b/src/query/pipeline/sources/src/input_formats/input_format_text.rs index 27f59d035b24..c882f25ad219 100644 --- a/src/query/pipeline/sources/src/input_formats/input_format_text.rs +++ b/src/query/pipeline/sources/src/input_formats/input_format_text.rs @@ -344,7 +344,6 @@ pub trait InputFormatTextBase: Sized + Send + Sync + 'static { fn create_field_decoder( params: &FileFormatParams, options: &FileFormatOptionsExt, - rounding_mode: bool, ) -> Arc; fn deserialize(builder: &mut BlockBuilder, batch: RowBatch) -> Result<()>; @@ -578,17 +577,8 @@ impl BlockBuilder { ) }) .collect(); - - let numeric_cast_option = ctx - .settings - .get_numeric_cast_option() - .unwrap_or("rounding".to_string()); - let rounding_mode = numeric_cast_option.as_str() == "rounding"; - let field_decoder = T::create_field_decoder( - &ctx.file_format_params, - &ctx.file_format_options_ext, - rounding_mode, - ); + let field_decoder = + T::create_field_decoder(&ctx.file_format_params, &ctx.file_format_options_ext); let projection = ctx.projection.clone(); BlockBuilder { diff --git a/src/query/pipeline/sources/src/input_formats/mod.rs b/src/query/pipeline/sources/src/input_formats/mod.rs index 19fe57d22b0f..60970c21436c 100644 --- a/src/query/pipeline/sources/src/input_formats/mod.rs +++ b/src/query/pipeline/sources/src/input_formats/mod.rs @@ -13,7 +13,7 @@ // limitations under the License. mod beyond_end_reader; -mod error_utils; +pub mod error_utils; mod impls; mod input_context; mod input_format; diff --git a/src/query/storages/stage/Cargo.toml b/src/query/storages/stage/Cargo.toml index ad91440ae2c1..b189644f78d3 100644 --- a/src/query/storages/stage/Cargo.toml +++ b/src/query/storages/stage/Cargo.toml @@ -22,11 +22,13 @@ databend-common-meta-app = { path = "../../../meta/app" } databend-common-pipeline-core = { path = "../../pipeline/core" } databend-common-pipeline-sources = { path = "../../pipeline/sources" } databend-common-pipeline-transforms = { path = "../../pipeline/transforms" } +databend-common-settings = { path = "../../settings" } databend-common-storage = { path = "../../../common/storage" } databend-common-storages-parquet = { path = "../parquet" } async-backtrace = { workspace = true } async-trait = { workspace = true } +csv-core = "0.1.11" dashmap = { workspace = true } log = { workspace = true } opendal = { workspace = true } diff --git a/src/query/storages/stage/src/input_context.rs b/src/query/storages/stage/src/input_context.rs new file mode 100644 index 000000000000..9ea0c6653178 --- /dev/null +++ b/src/query/storages/stage/src/input_context.rs @@ -0,0 +1,145 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::io::Read; +use std::sync::Arc; + +use dashmap::DashMap; +use databend_common_catalog::plan::DataSourcePlan; +use databend_common_catalog::plan::PartInfo; +use databend_common_catalog::plan::PartStatistics; +use databend_common_catalog::plan::Partitions; +use databend_common_catalog::plan::PartitionsShuffleKind; +use databend_common_catalog::plan::Projection; +use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::plan::StageTableInfo; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::Result; +use databend_common_expression::TableSchemaRefExt; +use databend_common_pipeline_core::Pipeline; +use databend_common_pipeline_sources::input_formats::InputContext; +use databend_common_pipeline_sources::input_formats::SplitInfo; +use databend_common_storage::STDIN_FD; +use log::debug; +use opendal::Scheme; + +use crate::StageTable; + +impl StageTable { + pub async fn read_partition_old( + &self, + ctx: &Arc, + ) -> Result<(PartStatistics, Partitions)> { + let stage_info = &self.table_info; + // User set the files. + let files = if let Some(files) = &stage_info.files_to_copy { + files.clone() + } else { + StageTable::list_files(stage_info, None).await? + }; + let format = InputContext::get_input_format(&stage_info.stage_info.file_format_params)?; + let operator = StageTable::get_op(&stage_info.stage_info)?; + let splits = format + .get_splits( + files, + &stage_info.stage_info, + &operator, + &ctx.get_settings(), + ) + .await?; + + let partitions = splits + .into_iter() + .map(|v| { + let part_info: Box = Box::new((*v).clone()); + Arc::new(part_info) + }) + .collect::>(); + Ok(( + PartStatistics::default(), + Partitions::create_nolazy(PartitionsShuffleKind::Seq, partitions), + )) + } + + pub(crate) fn read_data_old( + &self, + ctx: Arc, + plan: &DataSourcePlan, + pipeline: &mut Pipeline, + stage_table_info: &StageTableInfo, + ) -> Result<()> { + let projection = if let Some(PushDownInfo { + projection: Some(Projection::Columns(columns)), + .. + }) = &plan.push_downs + { + Some(columns.clone()) + } else { + None + }; + + let mut splits = vec![]; + for part in &plan.parts.partitions { + if let Some(split) = part.as_any().downcast_ref::() { + splits.push(Arc::new(split.clone())); + } + } + + // Build copy pipeline. + let settings = ctx.get_settings(); + let fields = stage_table_info + .schema + .fields() + .iter() + .filter(|f| f.computed_expr().is_none()) + .cloned() + .collect::>(); + let schema = TableSchemaRefExt::create(fields); + let stage_info = stage_table_info.stage_info.clone(); + let operator = StageTable::get_op(&stage_table_info.stage_info)?; + let compact_threshold = self.get_block_compact_thresholds_with_default(); + let on_error_map = ctx.get_on_error_map().unwrap_or_else(|| { + let m = Arc::new(DashMap::new()); + ctx.set_on_error_map(m.clone()); + m + }); + + // inject stdin to memory + if operator.info().scheme() == Scheme::Memory { + let mut buffer = vec![]; + std::io::stdin().lock().read_to_end(&mut buffer)?; + + let bop = operator.blocking(); + bop.write(STDIN_FD, buffer)?; + } + + let input_ctx = Arc::new(InputContext::try_create_from_copy( + ctx.clone(), + operator, + settings, + schema, + stage_info, + splits, + ctx.get_scan_progress(), + compact_threshold, + on_error_map, + self.table_info.is_select, + projection, + self.table_info.default_values.clone(), + )?); + debug!("start copy splits feeder in {}", ctx.get_cluster().local_id); + input_ctx.format.exec_copy(input_ctx.clone(), pipeline)?; + Ok(()) + } +} diff --git a/src/query/storages/stage/src/lib.rs b/src/query/storages/stage/src/lib.rs index b2df49ac17e7..744dfc38b6cc 100644 --- a/src/query/storages/stage/src/lib.rs +++ b/src/query/storages/stage/src/lib.rs @@ -12,9 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +#![feature(impl_trait_in_assoc_type)] #![allow(clippy::uninlined_format_args)] mod append; +mod input_context; +mod one_file_partition; +mod read; mod stage_table; pub use stage_table::StageTable; diff --git a/src/query/storages/stage/src/one_file_partition.rs b/src/query/storages/stage/src/one_file_partition.rs new file mode 100644 index 000000000000..3a8af15ee4ad --- /dev/null +++ b/src/query/storages/stage/src/one_file_partition.rs @@ -0,0 +1,58 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::hash::DefaultHasher; +use std::hash::Hash; +use std::hash::Hasher; + +use databend_common_catalog::plan::PartInfo; +use databend_common_catalog::plan::PartInfoPtr; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; + +#[derive(serde::Serialize, serde::Deserialize, Clone, Eq, PartialEq)] +pub struct OneFilePartition { + pub path: String, + pub size: usize, +} + +#[typetag::serde(name = "text_part")] +impl PartInfo for OneFilePartition { + fn as_any(&self) -> &dyn Any { + self + } + + fn equals(&self, info: &Box) -> bool { + info.as_any() + .downcast_ref::() + .is_some_and(|other| self == other) + } + + fn hash(&self) -> u64 { + let mut s = DefaultHasher::new(); + self.path.hash(&mut s); + s.finish() + } +} + +impl OneFilePartition { + pub fn from_part(info: &PartInfoPtr) -> Result<&OneFilePartition> { + info.as_any() + .downcast_ref::() + .ok_or_else(|| { + ErrorCode::Internal("Cannot downcast from PartInfo to TextFilePartition.") + }) + } +} diff --git a/src/query/storages/stage/src/read/error_handler.rs b/src/query/storages/stage/src/read/error_handler.rs new file mode 100644 index 000000000000..eae62d1bffa5 --- /dev/null +++ b/src/query/storages/stage/src/read/error_handler.rs @@ -0,0 +1,71 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::atomic::AtomicU64; +use std::sync::atomic::Ordering; +use std::sync::Arc; + +use dashmap::DashMap; +use databend_common_exception::Result; +use databend_common_expression::ColumnBuilder; +use databend_common_meta_app::principal::OnErrorMode; +use databend_common_pipeline_core::InputError; +use databend_common_storage::FileParseError; +use databend_common_storage::FileStatus; + +pub struct ErrorHandler { + pub on_error_mode: OnErrorMode, + pub on_error_count: AtomicU64, + pub on_error_map: Option>>>, +} + +impl ErrorHandler { + pub fn on_error( + &self, + e: FileParseError, + columns: Option<(&mut [ColumnBuilder], usize)>, + file_status: &mut FileStatus, + file_path: &str, + line: usize, + ) -> Result<()> { + if let Some((columns, num_rows)) = columns { + columns.iter_mut().for_each(|c| { + // the whole record is invalid, so we need to pop all the values + // not necessary if this function returns error, still do it for code simplicity + if c.len() > num_rows { + c.pop().expect("must success"); + assert_eq!(c.len(), num_rows); + } + }); + } + + match &self.on_error_mode { + OnErrorMode::Continue => { + file_status.add_error(e, line); + Ok(()) + } + OnErrorMode::AbortNum(abort_num) => { + if *abort_num <= 1 + || self.on_error_count.fetch_add(1, Ordering::Relaxed) >= *abort_num - 1 + { + Err(e.to_error_code(&self.on_error_mode, file_path, line)) + } else { + Ok(()) + } + } + _ => Err(e.to_error_code(&self.on_error_mode, file_path, line)), + } + } +} diff --git a/src/query/storages/stage/src/read/load_context.rs b/src/query/storages/stage/src/read/load_context.rs new file mode 100644 index 000000000000..924e18ef3113 --- /dev/null +++ b/src/query/storages/stage/src/read/load_context.rs @@ -0,0 +1,79 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::atomic::AtomicU64; +use std::sync::Arc; + +use databend_common_catalog::plan::StageTableInfo; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::Result; +use databend_common_expression::BlockThresholds; +use databend_common_expression::Scalar; +use databend_common_expression::TableSchemaRef; +use databend_common_expression::TableSchemaRefExt; +use databend_common_formats::FileFormatOptionsExt; + +use crate::read::error_handler::ErrorHandler; + +pub struct LoadContext { + pub table_context: Arc, + + pub schema: TableSchemaRef, + pub default_values: Option>, + pub pos_projection: Option>, + + pub file_format_options_ext: FileFormatOptionsExt, + pub block_compact_thresholds: BlockThresholds, + + pub error_handler: ErrorHandler, +} + +impl LoadContext { + pub fn try_create( + ctx: Arc, + stage_table_info: &StageTableInfo, + pos_projection: Option>, + block_compact_thresholds: BlockThresholds, + ) -> Result { + let copy_options = &stage_table_info.stage_info.copy_options; + let settings = ctx.get_settings(); + let is_select = stage_table_info.is_select; + let mut file_format_options_ext = + FileFormatOptionsExt::create_from_settings(&settings, is_select)?; + file_format_options_ext.disable_variant_check = copy_options.disable_variant_check; + let on_error_mode = copy_options.on_error.clone(); + let fields = stage_table_info + .schema + .fields() + .iter() + .filter(|f| f.computed_expr().is_none()) + .cloned() + .collect::>(); + let schema = TableSchemaRefExt::create(fields); + let default_values = stage_table_info.default_values.clone(); + Ok(Self { + table_context: ctx, + block_compact_thresholds, + schema, + default_values, + pos_projection, + file_format_options_ext, + error_handler: ErrorHandler { + on_error_mode, + on_error_count: AtomicU64::new(0), + on_error_map: None, + }, + }) + } +} diff --git a/src/query/storages/stage/src/read/mod.rs b/src/query/storages/stage/src/read/mod.rs new file mode 100644 index 000000000000..b0aed6d8c799 --- /dev/null +++ b/src/query/storages/stage/src/read/mod.rs @@ -0,0 +1,17 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod error_handler; +mod load_context; +pub mod row_based; diff --git a/src/query/storages/stage/src/read/row_based/batch.rs b/src/query/storages/stage/src/read/row_based/batch.rs new file mode 100644 index 000000000000..406009fd972d --- /dev/null +++ b/src/query/storages/stage/src/read/row_based/batch.rs @@ -0,0 +1,92 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_expression::BlockMetaInfo; + +#[derive(serde::Serialize, serde::Deserialize, Debug)] +pub struct BytesBatch { + pub data: Vec, + + pub path: String, + pub offset: usize, + pub is_eof: bool, +} + +impl BytesBatch { + pub fn meta(&self) -> Self { + Self { + data: vec![], + path: self.path.clone(), + offset: self.offset, + is_eof: self.is_eof, + } + } +} + +#[typetag::serde(name = "raw_batch")] +impl BlockMetaInfo for BytesBatch { + fn equals(&self, _info: &Box) -> bool { + unreachable!("RawBatch as BlockMetaInfo is not expected to be compared.") + } + + fn clone_self(&self) -> Box { + unreachable!("RawBatch as BlockMetaInfo is not expected to be cloned.") + } +} + +#[derive(serde::Serialize, serde::Deserialize, Debug, Default)] +pub struct RowBatch { + /// row[i] starts at row_ends[i-1] and ends at row_ends[i] + /// has num_fields[i] fields + /// field[j] starts at field_ends[i-1][j] and ends at field_ends[i-1][j] + pub data: Vec, + pub row_ends: Vec, + pub field_ends: Vec, + pub num_fields: Vec, + + pub path: String, + pub offset: usize, + // start from 0 + pub start_row_id: usize, +} + +impl RowBatch { + pub fn new(raw: &BytesBatch, start_row_id: usize) -> Self { + Self { + path: raw.path.clone(), + offset: raw.offset, + start_row_id, + ..Default::default() + } + } + + pub fn rows(&self) -> usize { + self.row_ends.len() + } + + pub fn size(&self) -> usize { + self.data.len() + } +} + +#[typetag::serde(name = "row_batch")] +impl BlockMetaInfo for RowBatch { + fn equals(&self, _info: &Box) -> bool { + unreachable!("RowBatch as BlockMetaInfo is not expected to be compared.") + } + + fn clone_self(&self) -> Box { + unreachable!("RowBatch as BlockMetaInfo is not expected to be cloned.") + } +} diff --git a/src/query/storages/stage/src/read/row_based/format.rs b/src/query/storages/stage/src/read/row_based/format.rs new file mode 100644 index 000000000000..93539f9eef19 --- /dev/null +++ b/src/query/storages/stage/src/read/row_based/format.rs @@ -0,0 +1,58 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_exception::Result; +use databend_common_expression::Column; +use databend_common_expression::DataBlock; +use databend_common_meta_app::principal::FileFormatParams; +use databend_common_storage::FileStatus; + +use super::batch::BytesBatch; +use super::batch::RowBatch; +use super::processors::BlockBuilderState; +use crate::read::load_context::LoadContext; +use crate::read::row_based::formats::CsvInputFormat; + +pub trait SeparatorState: Send + Sync { + fn append(&mut self, batch: BytesBatch) -> Result<(Vec, FileStatus)>; +} + +pub trait RowDecoder: Send + Sync { + fn add(&self, block_builder: &mut BlockBuilderState, batch: RowBatch) + -> Result>; + + fn flush(&self, columns: Vec, _num_rows: usize) -> Vec { + columns + } +} + +pub trait RowBasedFileFormat: Sync + Send { + fn try_create_separator( + &self, + load_ctx: Arc, + path: &str, + ) -> Result>; + fn try_create_decoder(&self, load_ctx: Arc) -> Result>; +} + +pub fn create_row_based_file_format(params: &FileFormatParams) -> Arc { + match params { + FileFormatParams::Csv(p) => Arc::new(CsvInputFormat { params: p.clone() }), + _ => { + unreachable!("Unsupported row based file format") + } + } +} diff --git a/src/query/storages/stage/src/read/row_based/formats/csv/block_builder.rs b/src/query/storages/stage/src/read/row_based/formats/csv/block_builder.rs new file mode 100644 index 000000000000..fb20454d13b4 --- /dev/null +++ b/src/query/storages/stage/src/read/row_based/formats/csv/block_builder.rs @@ -0,0 +1,197 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_exception::Result; +use databend_common_expression::types::string::StringColumnBuilder; +use databend_common_expression::Column; +use databend_common_expression::ColumnBuilder; +use databend_common_expression::DataBlock; +use databend_common_expression::TableDataType; +use databend_common_formats::SeparatedTextDecoder; +use databend_common_meta_app::principal::EmptyFieldAs; +use databend_common_pipeline_sources::input_formats::error_utils::get_decode_error_by_pos; +use databend_common_storage::FileParseError; + +use crate::read::load_context::LoadContext; +use crate::read::row_based::batch::RowBatch; +use crate::read::row_based::format::RowDecoder; +use crate::read::row_based::formats::csv::CsvInputFormat; +use crate::read::row_based::processors::BlockBuilderState; + +pub struct CsvDecoder { + pub load_context: Arc, + pub fmt: CsvInputFormat, + pub field_decoder: SeparatedTextDecoder, +} + +impl CsvDecoder { + pub fn create(fmt: CsvInputFormat, load_context: Arc) -> Self { + let field_decoder = + SeparatedTextDecoder::create_csv(&fmt.params, &load_context.file_format_options_ext); + Self { + load_context, + fmt, + field_decoder, + } + } + + fn read_column( + &self, + builder: &mut ColumnBuilder, + col_data: &[u8], + column_index: usize, + ) -> std::result::Result<(), FileParseError> { + let empty_filed_as = &self.fmt.params.empty_field_as; + if col_data.is_empty() { + match &self.load_context.default_values { + None => { + // query + builder.push_default(); + } + Some(values) => { + let field = &self.load_context.schema.fields()[column_index]; + // copy + match empty_filed_as { + EmptyFieldAs::FieldDefault => { + builder.push(values[column_index].as_ref()); + } + EmptyFieldAs::Null => { + if !matches!(field.data_type, TableDataType::Nullable(_)) { + return Err(FileParseError::ColumnEmptyError { + column_index, + column_name: field.name().to_owned(), + column_type: field.data_type.to_string(), + empty_field_as: empty_filed_as.to_string(), + }); + } + builder.push_default(); + } + EmptyFieldAs::String => { + if !matches!(field.data_type.remove_nullable(), TableDataType::String) { + let field = &self.load_context.schema.fields()[column_index]; + return Err(FileParseError::ColumnEmptyError { + column_index, + column_name: field.name().to_owned(), + column_type: field.data_type.to_string(), + empty_field_as: empty_filed_as.to_string(), + }); + } + + builder.push_default(); + } + } + } + } + return Ok(()); + } + self.field_decoder + .read_field(builder, col_data) + .map_err(|e| { + get_decode_error_by_pos( + column_index, + &self.load_context.schema, + &e.message(), + col_data, + ) + }) + } + + fn read_row( + &self, + buf: &[u8], + columns: &mut [ColumnBuilder], + field_ends: &[usize], + ) -> std::result::Result<(), FileParseError> { + if let Some(columns_to_read) = &self.load_context.pos_projection { + for c in columns_to_read { + if *c >= field_ends.len() { + columns[*c].push_default(); + } else { + let field_start = if *c == 0 { 0 } else { field_ends[c - 1] }; + let field_end = field_ends[*c]; + let col_data = &buf[field_start..field_end]; + self.read_column(&mut columns[*c], col_data, *c)?; + } + } + } else { + let mut field_start = 0; + for (c, column) in columns.iter_mut().enumerate() { + let field_end = field_ends[c]; + let col_data = &buf[field_start..field_end]; + self.read_column(column, col_data, c)?; + field_start = field_end; + } + } + Ok(()) + } +} + +impl RowDecoder for CsvDecoder { + fn add(&self, state: &mut BlockBuilderState, batch: RowBatch) -> Result> { + let columns = &mut state.mutable_columns; + let mut start = 0usize; + let mut field_end_idx = 0; + for (i, end) in batch.row_ends.iter().enumerate() { + let num_fields = batch.num_fields[i]; + let buf = &batch.data[start..*end]; + if let Err(e) = self.read_row( + buf, + columns, + &batch.field_ends[field_end_idx..field_end_idx + num_fields], + ) { + self.load_context.error_handler.on_error( + e, + Some((columns, state.num_rows)), + &mut state.file_status, + &batch.path, + i + batch.start_row_id, + )? + } else { + state.num_rows += 1; + state.file_status.num_rows_loaded += 1; + } + start = *end; + field_end_idx += num_fields; + } + Ok(vec![]) + } + + fn flush(&self, columns: Vec, num_rows: usize) -> Vec { + if let Some(projection) = &self.load_context.pos_projection { + let empty_strings = Column::String( + StringColumnBuilder { + need_estimated: false, + data: vec![], + offsets: vec![0; num_rows + 1], + } + .build(), + ); + columns + .into_iter() + .enumerate() + .map(|(i, c)| { + if projection.contains(&i) { + c + } else { + empty_strings.clone() + } + }) + .collect::>() + } else { + columns + } + } +} diff --git a/src/query/storages/stage/src/read/row_based/formats/csv/format.rs b/src/query/storages/stage/src/read/row_based/formats/csv/format.rs new file mode 100644 index 000000000000..898b3b009b8d --- /dev/null +++ b/src/query/storages/stage/src/read/row_based/formats/csv/format.rs @@ -0,0 +1,49 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_exception::Result; +use databend_common_meta_app::principal::CsvFileFormatParams; + +use crate::read::load_context::LoadContext; +use crate::read::row_based::format::RowBasedFileFormat; +use crate::read::row_based::format::RowDecoder; +use crate::read::row_based::format::SeparatorState; +use crate::read::row_based::formats::csv::block_builder::CsvDecoder; +use crate::read::row_based::formats::csv::separator::CsvReader; + +pub struct Position { + pub path: String, + pub rows: usize, + pub offset: usize, +} +#[derive(Clone)] +pub struct CsvInputFormat { + pub(crate) params: CsvFileFormatParams, +} + +impl RowBasedFileFormat for CsvInputFormat { + fn try_create_separator( + &self, + load_ctx: Arc, + path: &str, + ) -> Result> { + Ok(Box::new(CsvReader::try_create(load_ctx, path, self)?)) + } + + fn try_create_decoder(&self, load_ctx: Arc) -> Result> { + Ok(Arc::new(CsvDecoder::create(self.clone(), load_ctx.clone()))) + } +} diff --git a/src/query/storages/stage/src/read/row_based/formats/csv/mod.rs b/src/query/storages/stage/src/read/row_based/formats/csv/mod.rs new file mode 100644 index 000000000000..6b4e544d0040 --- /dev/null +++ b/src/query/storages/stage/src/read/row_based/formats/csv/mod.rs @@ -0,0 +1,19 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod block_builder; +mod format; +mod separator; + +pub use format::CsvInputFormat; diff --git a/src/query/storages/stage/src/read/row_based/formats/csv/separator.rs b/src/query/storages/stage/src/read/row_based/formats/csv/separator.rs new file mode 100644 index 000000000000..f8eff4174a22 --- /dev/null +++ b/src/query/storages/stage/src/read/row_based/formats/csv/separator.rs @@ -0,0 +1,313 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::mem; +use std::sync::Arc; + +use csv_core::ReadRecordResult; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_formats::RecordDelimiter; +use databend_common_storage::FileParseError; +use databend_common_storage::FileStatus; +use log::debug; + +use crate::read::load_context::LoadContext; +use crate::read::row_based::batch::BytesBatch; +use crate::read::row_based::batch::RowBatch; +use crate::read::row_based::format::SeparatorState; +use crate::read::row_based::formats::csv::format::Position; +use crate::read::row_based::formats::csv::CsvInputFormat; + +pub const MAX_CSV_COLUMNS: usize = 1000; + +pub struct CsvReader { + load_ctx: Arc, + + // select $1, $2, $3 .. from csv + projection: Option>, + error_on_column_count_mismatch: bool, + num_fields: usize, + + reader: csv_core::Reader, + pos: Position, + rows_to_skip: usize, + + // remain from last read batch + out: Vec, + + // field_end[..n_end] store the output of each call to reader.read_record() + // it may belong to part of a row. + // flush to RowBatch when a complete row is read + field_ends: Vec, + n_end: usize, +} + +impl SeparatorState for CsvReader { + fn append(&mut self, batch: BytesBatch) -> Result<(Vec, FileStatus)> { + self.separate(batch) + } +} + +enum ReadRecordOutput { + Record { num_fields: usize, bytes: usize }, + RecordSkipped, + PartialRecord { bytes: usize }, +} + +impl CsvReader { + pub fn try_create( + load_ctx: Arc, + path: &str, + format: &CsvInputFormat, + ) -> Result { + let escape = if format.params.escape.is_empty() { + None + } else { + Some(format.params.escape.as_bytes()[0]) + }; + let reader = csv_core::ReaderBuilder::new() + .delimiter(format.params.field_delimiter.as_bytes()[0]) + .quote(format.params.quote.as_bytes()[0]) + .escape(escape) + .terminator(match format.params.record_delimiter.as_str().try_into()? { + RecordDelimiter::Crlf => csv_core::Terminator::CRLF, + RecordDelimiter::Any(v) => csv_core::Terminator::Any(v), + }) + .build(); + let projection = load_ctx.pos_projection.clone(); + let max_fields = match &projection { + Some(p) => p.iter().copied().max().unwrap_or(1), + None => load_ctx.schema.num_fields(), + } + MAX_CSV_COLUMNS; + + let num_fields = load_ctx.schema.fields().len(); + Ok(Self { + load_ctx, + projection, + error_on_column_count_mismatch: format.params.error_on_column_count_mismatch, + num_fields, + reader, + pos: Position { + path: path.to_string(), + rows: 0, + offset: 0, + }, + rows_to_skip: format.params.headers as usize, + field_ends: vec![0; max_fields], + out: vec![], + n_end: 0, + }) + } + + fn read_record( + &mut self, + input: &[u8], + output: &mut [u8], + file_status: &mut FileStatus, + ) -> Result<(ReadRecordOutput, usize)> { + let (result, n_in, n_out, n_end) = + self.reader + .read_record(input, output, &mut self.field_ends[self.n_end..]); + self.n_end += n_end; + // shadow the n_end return from reader to avoid misuse + let n_end = self.n_end; + + match result { + ReadRecordResult::InputEmpty => { + if input.is_empty() { + Err(ErrorCode::BadBytes("unexpected eof")) + } else { + self.pos.offset += n_in; + Ok((ReadRecordOutput::PartialRecord { bytes: n_out }, n_in)) + } + } + ReadRecordResult::OutputFull => Err(self.error_output_full()), + ReadRecordResult::OutputEndsFull => Err(self.error_output_ends_full()), + ReadRecordResult::Record => { + let output = { + if self.projection.is_some() { + // select $1, $2, $3 .. from csv, not check num of fields here + ReadRecordOutput::Record { + num_fields: n_end, + bytes: n_out, + } + } else { + // copy + if !self.error_on_column_count_mismatch { + // cut or patch to num_fields + + if self.n_end < self.num_fields { + // support we expect 4 fields but got row with only 2 columns : "1,2\n" + // here we pretend we read "1,2,,\n" + debug_assert!(self.n_end > 0); + let end = self.field_ends[n_end - 1]; + for i in n_end..self.num_fields { + self.field_ends[i] = end; + } + } + ReadRecordOutput::Record { + num_fields: self.num_fields, + bytes: n_out, + } + } else { + // check num of fields strictly + + if let Err(e) = self.check_num_field() { + self.load_ctx.error_handler.on_error( + e, + None, + file_status, + &self.pos.path, + self.pos.rows, + )?; + ReadRecordOutput::RecordSkipped + } else { + ReadRecordOutput::Record { + num_fields: self.num_fields, + bytes: n_out, + } + } + } + } + }; + self.pos.offset += n_in; + self.pos.rows += 1; + self.n_end = 0; + Ok((output, n_in)) + } + ReadRecordResult::End => { + if !input.is_empty() { + Err(ErrorCode::BadBytes("unexpected eof")) + } else { + Ok((ReadRecordOutput::PartialRecord { bytes: n_out }, n_in)) + } + } + } + } + + fn separate(&mut self, batch: BytesBatch) -> Result<(Vec, FileStatus)> { + let need_flush = batch.is_eof; + let mut buf_in = &batch.data[..]; + let size_in = buf_in.len(); + let mut file_status = FileStatus::default(); + let mut buf_out = vec![0u8; buf_in.len()]; + while self.rows_to_skip > 0 { + let (res, n_in) = self.read_record(buf_in, &mut buf_out, &mut file_status)?; + buf_in = &buf_in[n_in..]; + if matches!(res, ReadRecordOutput::Record { .. }) { + self.rows_to_skip -= 1; + } + } + + let mut buf_out_pos = 0usize; + let mut buf_out_row_end: usize = 0; + + let last_batch_remain_len = self.out.len(); + + let mut row_batch = RowBatch::new(&batch, self.pos.rows); + + while !buf_in.is_empty() || need_flush { + let (res, n_in) = + self.read_record(buf_in, &mut buf_out[buf_out_pos..], &mut file_status)?; + match res { + ReadRecordOutput::Record { num_fields, bytes } => { + buf_out_pos += bytes; + row_batch.num_fields.push(num_fields); + row_batch + .field_ends + .extend_from_slice(&self.field_ends[..num_fields]); + row_batch.row_ends.push(last_batch_remain_len + buf_out_pos); + buf_out_row_end = buf_out_pos; + } + ReadRecordOutput::PartialRecord { bytes } => { + buf_out_pos += bytes; + } + _ => {} + } + if buf_in.is_empty() { + break; + } else { + buf_in = &buf_in[n_in..]; + } + } + + buf_out.truncate(buf_out_pos); + + if row_batch.row_ends.is_empty() { + debug!( + "csv aligner: {} + {} bytes => 0 rows", + self.out.len(), + size_in, + ); + self.out.extend_from_slice(&buf_out); + Ok((vec![], file_status)) + } else { + let last_remain = mem::take(&mut self.out); + + self.out.extend_from_slice(&buf_out[buf_out_row_end..]); + debug!( + "csv aligner: {} + {} bytes => {} rows + {} bytes remain", + last_remain.len(), + size_in, + row_batch.row_ends.len(), + self.out.len() + ); + + buf_out.truncate(buf_out_row_end); + row_batch.data = if last_remain.is_empty() { + buf_out + } else { + [last_remain, buf_out].concat() + }; + + Ok((vec![row_batch], file_status)) + } + } + + fn check_num_field(&self) -> std::result::Result<(), FileParseError> { + let expected = self.num_fields; + let found = self.n_end; + if found < expected + || found > expected + 1 + || (found == expected + 1 && self.field_ends[expected] != self.field_ends[expected - 1]) + { + Err(FileParseError::NumberOfColumnsMismatch { + table: expected, + file: found, + }) + } else { + Ok(()) + } + } + + fn error_output_full(&self) -> ErrorCode { + ErrorCode::BadBytes("Bug: CSV Reader return output longer then input.") + } + + fn error_output_ends_full(&self) -> ErrorCode { + if self.projection.is_some() { + ErrorCode::BadBytes(format!( + "too many fields, expect {}, got more than {}", + self.num_fields, + self.field_ends.len() + )) + } else { + ErrorCode::BadBytes(format!( + "select from CSV allow at most {} fields", + MAX_CSV_COLUMNS + )) + } + } +} diff --git a/src/query/storages/stage/src/read/row_based/formats/mod.rs b/src/query/storages/stage/src/read/row_based/formats/mod.rs new file mode 100644 index 000000000000..0afa1f1cee88 --- /dev/null +++ b/src/query/storages/stage/src/read/row_based/formats/mod.rs @@ -0,0 +1,17 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod csv; + +pub use csv::CsvInputFormat; diff --git a/src/query/storages/stage/src/read/row_based/mod.rs b/src/query/storages/stage/src/read/row_based/mod.rs new file mode 100644 index 000000000000..388312ae8be7 --- /dev/null +++ b/src/query/storages/stage/src/read/row_based/mod.rs @@ -0,0 +1,21 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod batch; +mod format; +mod formats; +mod processors; +mod read_pipeline; + +pub use read_pipeline::RowBasedReadPipelineBuilder; diff --git a/src/query/storages/stage/src/read/row_based/processors/block_builder.rs b/src/query/storages/stage/src/read/row_based/processors/block_builder.rs new file mode 100644 index 000000000000..bb269cb379f8 --- /dev/null +++ b/src/query/storages/stage/src/read/row_based/processors/block_builder.rs @@ -0,0 +1,154 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::mem; +use std::sync::Arc; + +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::Result; +use databend_common_expression::BlockMetaInfoDowncast; +use databend_common_expression::Column; +use databend_common_expression::ColumnBuilder; +use databend_common_expression::DataBlock; +use databend_common_pipeline_transforms::processors::AccumulatingTransform; +use databend_common_storage::FileStatus; +use log::debug; + +use crate::read::load_context::LoadContext; +use crate::read::row_based::batch::RowBatch; +use crate::read::row_based::format::RowBasedFileFormat; +use crate::read::row_based::format::RowDecoder; + +pub struct BlockBuilderState { + pub mutable_columns: Vec, + pub num_rows: usize, + pub file_status: FileStatus, + pub file_name: String, +} + +impl BlockBuilderState { + fn create(ctx: Arc) -> Self { + let columns = ctx + .schema + .fields() + .iter() + .map(|f| { + ColumnBuilder::with_capacity_hint( + &f.data_type().into(), + // todo(youngsofun): calculate the capacity based on the memory and schema + 1024, + false, + ) + }) + .collect(); + + BlockBuilderState { + mutable_columns: columns, + num_rows: 0, + file_status: Default::default(), + file_name: "".to_string(), + } + } + + fn take_columns(&mut self, on_finish: bool) -> Result> { + // todo(youngsofun): calculate the capacity according to last batch + let capacity = if on_finish { 0 } else { 1024 }; + self.num_rows = 0; + self.file_name = "".to_string(); + Ok(self + .mutable_columns + .iter_mut() + .map(|col| { + let empty_builder = + ColumnBuilder::with_capacity_hint(&col.data_type(), capacity, false); + std::mem::replace(col, empty_builder).build() + }) + .collect()) + } + + fn flush_status(&mut self, ctx: &Arc) -> Result<()> { + let file_status = mem::take(&mut self.file_status); + ctx.add_file_status(&self.file_name, file_status) + } + + fn memory_size(&self) -> usize { + self.mutable_columns.iter().map(|x| x.memory_size()).sum() + } +} + +pub struct BlockBuilder { + pub ctx: Arc, + pub state: BlockBuilderState, + pub decoder: Arc, +} + +impl BlockBuilder { + pub fn create(ctx: Arc, fmt: &Arc) -> Result { + let state = BlockBuilderState::create(ctx.clone()); + let decoder = fmt.try_create_decoder(ctx.clone())?; + Ok(BlockBuilder { + ctx, + state, + decoder, + }) + } + + pub fn flush_block(&mut self, on_finish: bool) -> Result> { + let num_rows = self.state.num_rows; + let columns = self.state.take_columns(on_finish)?; + if columns.is_empty() || num_rows == 0 { + Ok(vec![]) + } else { + let columns = self.decoder.flush(columns, num_rows); + Ok(vec![DataBlock::new_from_columns(columns)]) + } + } + pub fn try_flush_block_by_memory(&mut self) -> Result> { + let mem = self.state.memory_size(); + debug!( + "chunk builder added new batch: row {} size {}", + self.state.num_rows, mem + ); + if self.state.num_rows >= self.ctx.block_compact_thresholds.min_rows_per_block + || mem > self.ctx.block_compact_thresholds.max_bytes_per_block + { + self.flush_block(false) + } else { + Ok(vec![]) + } + } +} + +impl AccumulatingTransform for BlockBuilder { + const NAME: &'static str = "BlockBuilder"; + + fn transform(&mut self, data: DataBlock) -> Result> { + let batch = data + .get_owned_meta() + .and_then(RowBatch::downcast_from) + .unwrap(); + if self.state.file_name != batch.path { + self.state.file_name = batch.path.clone(); + } + let mut blocks = self.decoder.add(&mut self.state, batch)?; + self.state.flush_status(&self.ctx.table_context)?; + let more = self.try_flush_block_by_memory()?; + blocks.extend(more); + Ok(blocks) + } + + fn on_finish(&mut self, _output: bool) -> Result> { + self.flush_block(true) + } +} diff --git a/src/query/storages/stage/src/read/row_based/processors/decompressor.rs b/src/query/storages/stage/src/read/row_based/processors/decompressor.rs new file mode 100644 index 000000000000..fb6b4bb7f880 --- /dev/null +++ b/src/query/storages/stage/src/read/row_based/processors/decompressor.rs @@ -0,0 +1,110 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_compress::CompressAlgorithm; +use databend_common_compress::DecompressDecoder; +use databend_common_compress::DecompressState; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::BlockMetaInfoDowncast; +use databend_common_expression::DataBlock; +use databend_common_pipeline_transforms::processors::AccumulatingTransform; + +use crate::read::load_context::LoadContext; +use crate::read::row_based::batch::BytesBatch; + +pub struct Decompressor { + #[allow(dead_code)] + ctx: Arc, + algo: Option, + decompressor: Option<(DecompressDecoder, usize)>, + path: Option, +} + +impl Decompressor { + pub fn try_create(ctx: Arc, algo: Option) -> Result { + Ok(Decompressor { + ctx, + algo, + path: None, + decompressor: None, + }) + } + + fn new_file(&mut self, path: String) { + assert!(self.decompressor.is_none()); + let algo = if let Some(algo) = &self.algo { + Some(algo.to_owned()) + } else { + CompressAlgorithm::from_path(&path) + }; + self.path = Some(path); + + if let Some(algo) = algo { + let decompressor = DecompressDecoder::new(algo); + self.decompressor = Some((decompressor, 0)); + } else { + self.decompressor = None; + } + } +} + +impl AccumulatingTransform for Decompressor { + const NAME: &'static str = "Decompressor"; + + fn transform(&mut self, data: DataBlock) -> Result> { + let batch = data + .get_owned_meta() + .and_then(BytesBatch::downcast_from) + .unwrap(); + match &self.path { + None => self.new_file(batch.path.clone()), + Some(path) => { + if path != &batch.path { + self.new_file(batch.path.clone()) + } + } + } + + if let Some((de, offset)) = &mut self.decompressor { + let mut data = de.decompress_batch(&batch.data)?; + if batch.is_eof { + let mut end = de.decompress_batch(&[])?; + data.append(&mut end); + let state = de.state(); + if !matches!(state, DecompressState::Done) { + return Err(ErrorCode::BadBytes(format!( + "decompressor state is {:?} after decompressing all data", + state + ))); + } + } + let new_batch = Box::new(BytesBatch { + data, + path: batch.path.clone(), + offset: *offset, + is_eof: batch.is_eof, + }); + *offset += batch.data.len(); + if batch.is_eof { + self.decompressor = None; + } + Ok(vec![DataBlock::empty_with_meta(new_batch)]) + } else { + Ok(vec![DataBlock::empty_with_meta(Box::new(batch))]) + } + } +} diff --git a/src/query/storages/stage/src/read/row_based/processors/mod.rs b/src/query/storages/stage/src/read/row_based/processors/mod.rs new file mode 100644 index 000000000000..72949cc1289f --- /dev/null +++ b/src/query/storages/stage/src/read/row_based/processors/mod.rs @@ -0,0 +1,24 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod block_builder; +mod decompressor; +mod reader; +mod separator; + +pub use block_builder::BlockBuilder; +pub use block_builder::BlockBuilderState; +pub use decompressor::Decompressor; +pub use reader::BytesReader; +pub use separator::Separator; diff --git a/src/query/storages/stage/src/read/row_based/processors/reader.rs b/src/query/storages/stage/src/read/row_based/processors/reader.rs new file mode 100644 index 000000000000..1fb9c47fa337 --- /dev/null +++ b/src/query/storages/stage/src/read/row_based/processors/reader.rs @@ -0,0 +1,131 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp::min; +use std::sync::Arc; + +use databend_common_base::base::tokio::io::AsyncRead; +use databend_common_base::base::tokio::io::AsyncReadExt; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::DataBlock; +use databend_common_pipeline_sources::AsyncSource; +use log::debug; +use opendal::Operator; + +use crate::one_file_partition::OneFilePartition; +use crate::read::row_based::batch::BytesBatch; + +struct FileState { + file: OneFilePartition, + offset: usize, +} +pub struct BytesReader { + table_ctx: Arc, + op: Operator, + read_batch_size: usize, + file_state: Option, +} + +impl BytesReader { + pub fn try_create( + table_ctx: Arc, + op: Operator, + read_batch_size: usize, + ) -> Result { + Ok(Self { + table_ctx, + op, + read_batch_size, + file_state: None, + }) + } + + pub async fn read_batch(&mut self) -> Result { + if let Some(state) = &mut self.file_state { + let end = min(self.read_batch_size + state.offset, state.file.size); + let mut reader = self + .op + .reader_with(&state.file.path) + .range((state.offset as u64)..(end as u64)) + .await?; + + let mut buffer = vec![0u8; end - state.offset]; + let n = read_full(&mut reader, &mut buffer[0..]).await?; + if n == 0 { + return Err(ErrorCode::BadBytes(format!( + "Unexpected EOF {} expect {} bytes, read only {} bytes.", + state.file.path, state.file.size, state.offset + ))); + }; + buffer.truncate(n); + + debug!("read {} bytes", n); + let offset = state.offset; + state.offset += n; + let is_eof = state.offset == state.file.size; + let batch = Box::new(BytesBatch { + data: buffer, + path: state.file.path.clone(), + offset, + is_eof, + }); + if is_eof { + self.file_state = None; + } + Ok(DataBlock::empty_with_meta(batch)) + } else { + unreachable!("file state is none") + } + } +} + +#[async_trait::async_trait] +impl AsyncSource for BytesReader { + const NAME: &'static str = "TextFileReader"; + + const SKIP_EMPTY_DATA_BLOCK: bool = false; + + #[async_trait::unboxed_simple] + async fn generate(&mut self) -> Result> { + if self.file_state.is_none() { + let part = match self.table_ctx.get_partition() { + Some(part) => part, + None => return Ok(None), + }; + let file = OneFilePartition::from_part(&part)?.clone(); + self.file_state = Some(FileState { file, offset: 0 }) + } + match self.read_batch().await { + Ok(block) => Ok(Some(block)), + Err(e) => Err(e), + } + } +} + +#[async_backtrace::framed] +pub async fn read_full(reader: &mut R, buf: &mut [u8]) -> Result { + let mut buf = &mut buf[0..]; + let mut n = 0; + while !buf.is_empty() { + let read = reader.read(buf).await?; + if read == 0 { + break; + } + n += read; + buf = &mut buf[read..] + } + Ok(n) +} diff --git a/src/query/storages/stage/src/read/row_based/processors/separator.rs b/src/query/storages/stage/src/read/row_based/processors/separator.rs new file mode 100644 index 000000000000..7cde62b9ab75 --- /dev/null +++ b/src/query/storages/stage/src/read/row_based/processors/separator.rs @@ -0,0 +1,106 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_base::base::ProgressValues; +use databend_common_exception::Result; +use databend_common_expression::BlockMetaInfoDowncast; +use databend_common_expression::DataBlock; +use databend_common_pipeline_core::processors::Profile; +use databend_common_pipeline_core::processors::ProfileStatisticsName; +use databend_common_pipeline_transforms::processors::AccumulatingTransform; + +use crate::read::load_context::LoadContext; +use crate::read::row_based::batch::BytesBatch; +use crate::read::row_based::format::RowBasedFileFormat; +use crate::read::row_based::format::SeparatorState; + +pub struct Separator { + pub ctx: Arc, + pub state: Option>, + pub format: Arc, +} + +impl Separator { + pub fn try_create(ctx: Arc, format: Arc) -> Result { + Ok(Separator { + ctx, + format, + state: None, + }) + } +} + +impl AccumulatingTransform for Separator { + const NAME: &'static str = "Separator"; + + fn transform(&mut self, data: DataBlock) -> Result> { + let batch = data + .get_owned_meta() + .and_then(BytesBatch::downcast_from) + .unwrap(); + + if self.state.is_none() { + self.state = Some( + self.format + .try_create_separator(self.ctx.clone(), &batch.path)?, + ); + } + + if let Some(state) = &mut self.state { + let mut process_values = ProgressValues { rows: 0, bytes: 0 }; + + process_values.bytes += batch.data.len(); + let batch_meta = batch.meta(); + let (row_batches, file_status) = state.append(batch)?; + let row_batches = row_batches + .into_iter() + .filter(|b| b.rows() > 0 || b.size() > 0) + .collect::>(); + if batch_meta.is_eof { + if file_status.error.is_some() { + self.ctx + .table_context + .get_copy_status() + .add_chunk(&batch_meta.path, file_status); + } + self.state = None; + } + if row_batches.is_empty() { + Ok(vec![]) + } else { + for b in row_batches.iter() { + process_values.rows += b.rows(); + } + Profile::record_usize_profile( + ProfileStatisticsName::ScanBytes, + process_values.bytes, + ); + self.ctx + .table_context + .get_scan_progress() + .incr(&process_values); + + let blocks = row_batches + .into_iter() + .map(|b| DataBlock::empty_with_meta(Box::new(b))) + .collect::>(); + Ok(blocks) + } + } else { + unreachable!("state should be Some") + } + } +} diff --git a/src/query/storages/stage/src/read/row_based/read_pipeline.rs b/src/query/storages/stage/src/read/row_based/read_pipeline.rs new file mode 100644 index 000000000000..802f0b6b4076 --- /dev/null +++ b/src/query/storages/stage/src/read/row_based/read_pipeline.rs @@ -0,0 +1,140 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_catalog::plan::DataSourcePlan; +use databend_common_catalog::plan::Projection; +use databend_common_catalog::plan::PushDownInfo; +use databend_common_catalog::plan::StageTableInfo; +use databend_common_catalog::table_context::TableContext; +use databend_common_exception::Result; +use databend_common_expression::BlockThresholds; +use databend_common_meta_app::principal::StageFileCompression; +use databend_common_pipeline_core::processors::ProcessorPtr; +use databend_common_pipeline_core::Pipeline; +use databend_common_pipeline_sources::input_formats::InputContext; +use databend_common_pipeline_sources::AsyncSourcer; +use databend_common_pipeline_sources::EmptySource; +use databend_common_pipeline_transforms::processors::AccumulatingTransformer; +use databend_common_settings::Settings; +use databend_common_storage::init_stage_operator; + +use crate::read::load_context::LoadContext; +use crate::read::row_based::format::create_row_based_file_format; +use crate::read::row_based::processors::BlockBuilder; +use crate::read::row_based::processors::BytesReader; +use crate::read::row_based::processors::Decompressor; +use crate::read::row_based::processors::Separator; + +pub struct RowBasedReadPipelineBuilder<'a> { + pub(crate) stage_table_info: &'a StageTableInfo, + pub(crate) compact_threshold: BlockThresholds, +} + +impl RowBasedReadPipelineBuilder<'_> { + fn build_read_stage_source( + &self, + ctx: Arc, + pipeline: &mut Pipeline, + settings: &Settings, + num_threads: usize, + ) -> Result<()> { + let operator = init_stage_operator(&self.stage_table_info.stage_info)?; + let batch_size = settings.get_input_read_buffer_size()? as usize; + pipeline.add_source( + |output| { + let reader = BytesReader::try_create(ctx.clone(), operator.clone(), batch_size)?; + AsyncSourcer::create(ctx.clone(), output, reader) + }, + num_threads, + )?; + Ok(()) + } + pub fn read_data( + &self, + ctx: Arc, + plan: &DataSourcePlan, + pipeline: &mut Pipeline, + ) -> Result<()> { + if plan.parts.is_empty() { + pipeline.add_source(EmptySource::create, 1)?; + return Ok(()); + }; + let pos_projection = if let Some(PushDownInfo { + projection: Some(Projection::Columns(columns)), + .. + }) = &plan.push_downs + { + Some(columns.clone()) + } else { + None + }; + let settings = ctx.get_settings(); + ctx.set_partitions(plan.parts.clone())?; + let threads = std::cmp::min(settings.get_max_threads()? as usize, plan.parts.len()); + self.build_read_stage_source(ctx.clone(), pipeline, &settings, threads)?; + + let format = + create_row_based_file_format(&self.stage_table_info.stage_info.file_format_params); + + let load_ctx = Arc::new(LoadContext::try_create( + ctx.clone(), + self.stage_table_info, + pos_projection, + self.compact_threshold, + )?); + + match self + .stage_table_info + .stage_info + .file_format_params + .compression() + { + StageFileCompression::None => {} + compression => { + let algo = InputContext::get_compression_alg_copy(compression, "")?; + pipeline.add_transform(|input, output| { + let transformer = Decompressor::try_create(load_ctx.clone(), algo)?; + Ok(ProcessorPtr::create(AccumulatingTransformer::create( + input, + output, + transformer, + ))) + })?; + } + } + + pipeline.add_transform(|input, output| { + let transformer = Separator::try_create(load_ctx.clone(), format.clone())?; + Ok(ProcessorPtr::create(AccumulatingTransformer::create( + input, + output, + transformer, + ))) + })?; + + pipeline.try_resize(threads)?; + + pipeline.add_transform(|input, output| { + let transformer = BlockBuilder::create(load_ctx.clone(), &format)?; + Ok(ProcessorPtr::create(AccumulatingTransformer::create( + input, + output, + transformer, + ))) + })?; + Ok(()) + } +} diff --git a/src/query/storages/stage/src/stage_table.rs b/src/query/storages/stage/src/stage_table.rs index 2f00bc875cb3..bdd13b968e00 100644 --- a/src/query/storages/stage/src/stage_table.rs +++ b/src/query/storages/stage/src/stage_table.rs @@ -13,18 +13,15 @@ // limitations under the License. use std::any::Any; -use std::io::Read; use std::ops::Deref; use std::sync::Arc; -use dashmap::DashMap; use databend_common_catalog::plan::DataSourceInfo; use databend_common_catalog::plan::DataSourcePlan; use databend_common_catalog::plan::PartInfo; use databend_common_catalog::plan::PartStatistics; use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::PartitionsShuffleKind; -use databend_common_catalog::plan::Projection; use databend_common_catalog::plan::PushDownInfo; use databend_common_catalog::plan::StageTableInfo; use databend_common_catalog::table::AppendMode; @@ -33,22 +30,19 @@ use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::BlockThresholds; -use databend_common_expression::TableSchemaRefExt; use databend_common_meta_app::principal::FileFormatParams; use databend_common_meta_app::principal::StageInfo; use databend_common_meta_app::schema::TableInfo; use databend_common_pipeline_core::Pipeline; -use databend_common_pipeline_sources::input_formats::InputContext; -use databend_common_pipeline_sources::input_formats::SplitInfo; use databend_common_storage::init_stage_operator; use databend_common_storage::StageFileInfo; -use databend_common_storage::STDIN_FD; use databend_common_storages_parquet::ParquetTableForCopy; -use log::debug; use opendal::Operator; -use opendal::Scheme; use parking_lot::Mutex; +use crate::one_file_partition::OneFilePartition; +use crate::read::row_based::RowBasedReadPipelineBuilder; + /// TODO: we need to track the data metrics in stage table. pub struct StageTable { pub(crate) table_info: StageTableInfo, @@ -92,10 +86,48 @@ impl StageTable { stage_info.list_files(max_files).await } - fn get_block_compact_thresholds_with_default(&self) -> BlockThresholds { + pub fn get_block_compact_thresholds_with_default(&self) -> BlockThresholds { let guard = self.block_compact_threshold.lock(); guard.deref().unwrap_or_default() } + + pub async fn read_partitions_simple( + &self, + stage_table_info: &StageTableInfo, + ) -> Result<(PartStatistics, Partitions)> { + let files = if let Some(files) = &stage_table_info.files_to_copy { + files.clone() + } else { + StageTable::list_files(stage_table_info, None).await? + }; + let size = files.iter().map(|f| f.size as usize).sum(); + let statistics = PartStatistics { + snapshot: None, + read_rows: size, + read_bytes: size, + partitions_scanned: files.len(), + partitions_total: files.len(), + is_exact: false, + pruning_stats: Default::default(), + }; + + let partitions = files + .into_iter() + .map(|v| { + let part = OneFilePartition { + path: v.path.clone(), + size: v.size as usize, + }; + let part_info: Box = Box::new(part); + Arc::new(part_info) + }) + .collect::>(); + + Ok(( + statistics, + Partitions::create_nolazy(PartitionsShuffleKind::Seq, partitions), + )) + } } #[async_trait::async_trait] @@ -120,41 +152,14 @@ impl Table for StageTable { _push_downs: Option, _dry_run: bool, ) -> Result<(PartStatistics, Partitions)> { - let stage_info = &self.table_info; - if matches!( - stage_info.stage_info.file_format_params, - FileFormatParams::Parquet(_) - ) { - return ParquetTableForCopy::do_read_partitions(stage_info, ctx, _push_downs).await; + let stage_table_info = &self.table_info; + match stage_table_info.stage_info.file_format_params { + FileFormatParams::Parquet(_) => { + ParquetTableForCopy::do_read_partitions(stage_table_info, ctx, _push_downs).await + } + FileFormatParams::Csv(_) => self.read_partitions_simple(stage_table_info).await, + _ => self.read_partition_old(&ctx).await, } - // User set the files. - let files = if let Some(files) = &stage_info.files_to_copy { - files.clone() - } else { - StageTable::list_files(stage_info, None).await? - }; - let format = InputContext::get_input_format(&stage_info.stage_info.file_format_params)?; - let operator = StageTable::get_op(&stage_info.stage_info)?; - let splits = format - .get_splits( - files, - &stage_info.stage_info, - &operator, - &ctx.get_settings(), - ) - .await?; - - let partitions = splits - .into_iter() - .map(|v| { - let part_info: Box = Box::new((*v).clone()); - Arc::new(part_info) - }) - .collect::>(); - Ok(( - PartStatistics::default(), - Partitions::create_nolazy(PartitionsShuffleKind::Seq, partitions), - )) } fn read_data( @@ -170,76 +175,20 @@ impl Table for StageTable { } else { return Err(ErrorCode::Internal("")); }; - - if matches!( - stage_table_info.stage_info.file_format_params, - FileFormatParams::Parquet(_) - ) { - return ParquetTableForCopy::do_read_data(ctx, plan, pipeline, _put_cache); - } - - let projection = if let Some(PushDownInfo { - projection: Some(Projection::Columns(columns)), - .. - }) = &plan.push_downs - { - Some(columns.clone()) - } else { - None - }; - - let mut splits = vec![]; - for part in &plan.parts.partitions { - if let Some(split) = part.as_any().downcast_ref::() { - splits.push(Arc::new(split.clone())); + match stage_table_info.stage_info.file_format_params { + FileFormatParams::Parquet(_) => { + ParquetTableForCopy::do_read_data(ctx, plan, pipeline, _put_cache) } + FileFormatParams::Csv(_) => { + let compact_threshold = self.get_block_compact_thresholds_with_default(); + RowBasedReadPipelineBuilder { + stage_table_info, + compact_threshold, + } + .read_data(ctx, plan, pipeline) + } + _ => self.read_data_old(ctx, plan, pipeline, stage_table_info), } - - // Build copy pipeline. - let settings = ctx.get_settings(); - let fields = stage_table_info - .schema - .fields() - .iter() - .filter(|f| f.computed_expr().is_none()) - .cloned() - .collect::>(); - let schema = TableSchemaRefExt::create(fields); - let stage_info = stage_table_info.stage_info.clone(); - let operator = StageTable::get_op(&stage_table_info.stage_info)?; - let compact_threshold = self.get_block_compact_thresholds_with_default(); - let on_error_map = ctx.get_on_error_map().unwrap_or_else(|| { - let m = Arc::new(DashMap::new()); - ctx.set_on_error_map(m.clone()); - m - }); - - // inject stdin to memory - if operator.info().scheme() == Scheme::Memory { - let mut buffer = vec![]; - std::io::stdin().lock().read_to_end(&mut buffer)?; - - let bop = operator.blocking(); - bop.write(STDIN_FD, buffer)?; - } - - let input_ctx = Arc::new(InputContext::try_create_from_copy( - ctx.clone(), - operator, - settings, - schema, - stage_info, - splits, - ctx.get_scan_progress(), - compact_threshold, - on_error_map, - self.table_info.is_select, - projection, - self.table_info.default_values.clone(), - )?); - debug!("start copy splits feeder in {}", ctx.get_cluster().local_id); - input_ctx.format.exec_copy(input_ctx.clone(), pipeline)?; - Ok(()) } fn append_data( diff --git a/tests/sqllogictests/suites/stage/formats/csv/csv_empty.test b/tests/sqllogictests/suites/stage/formats/csv/csv_empty.test index 675d5788a81d..6517ebda8bf9 100644 --- a/tests/sqllogictests/suites/stage/formats/csv/csv_empty.test +++ b/tests/sqllogictests/suites/stage/formats/csv/csv_empty.test @@ -44,7 +44,7 @@ copy into ints from @data/csv/empty/ file_format = (type = CSV) force=true on_er csv/empty/empty1.csv 0 1 Empty value for column 3 (c3 Int32), when option empty_field_as = NULL 1 csv/empty/empty2.csv 0 1 Empty value for column 3 (c3 Int32), when option empty_field_as = NULL 1 csv/empty/empty3.csv 1 0 NULL NULL -csv/empty/empty4.csv 0 1 Empty value for column 3 (c3 Int32), when option empty_field_as = NULL 2 +csv/empty/empty4.csv 0 1 Empty value for column 3 (c3 Int32), when option empty_field_as = NULL 1 query select * from ints order by c0,c1,c2,c3,c4; @@ -60,7 +60,7 @@ copy into ints from @data/csv/empty/ file_format = (type = CSV empty_field_as = csv/empty/empty1.csv 0 1 Empty value for column 1 (c1 Int32 NULL), when option empty_field_as = STRING 1 csv/empty/empty2.csv 0 1 Empty value for column 3 (c3 Int32), when option empty_field_as = STRING 1 csv/empty/empty3.csv 1 0 NULL NULL -csv/empty/empty4.csv 0 1 Empty value for column 0 (c0 Int32 NULL), when option empty_field_as = STRING 2 +csv/empty/empty4.csv 0 1 Empty value for column 0 (c0 Int32 NULL), when option empty_field_as = STRING 1 query select * from ints order by c0,c1,c2,c3,c4; @@ -99,7 +99,7 @@ copy into strings from @data/csv/empty/ file_format = (type = CSV) force=true o csv/empty/empty1.csv 0 1 Empty value for column 3 (c3 String), when option empty_field_as = NULL 1 csv/empty/empty2.csv 0 1 Empty value for column 3 (c3 String), when option empty_field_as = NULL 1 csv/empty/empty3.csv 1 0 NULL NULL -csv/empty/empty4.csv 0 1 Empty value for column 3 (c3 String), when option empty_field_as = NULL 2 +csv/empty/empty4.csv 0 1 Empty value for column 3 (c3 String), when option empty_field_as = NULL 1 query select * from strings order by c0,c1,c2,c3,c4; @@ -115,7 +115,7 @@ copy into strings from @data/csv/empty/ file_format = (type = CSV empty_field_as csv/empty/empty1.csv 1 0 NULL NULL csv/empty/empty2.csv 1 0 NULL NULL csv/empty/empty3.csv 1 0 NULL NULL -csv/empty/empty4.csv 0 1 Empty value for column 0 (c0 Int32 NULL), when option empty_field_as = STRING 2 +csv/empty/empty4.csv 0 1 Empty value for column 0 (c0 Int32 NULL), when option empty_field_as = STRING 1 query select * from strings order by c0,c1,c2,c3,c4;