diff --git a/rust/parquet/parquet_derive/src/lib.rs b/rust/parquet/parquet_derive/src/lib.rs index a76895743be0d..73eea85b3a4e3 100644 --- a/rust/parquet/parquet_derive/src/lib.rs +++ b/rust/parquet/parquet_derive/src/lib.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -#![recursion_limit = "200"] +#![recursion_limit = "300"] extern crate proc_macro; extern crate proc_macro2; @@ -75,7 +75,7 @@ fn impl_struct( .push(syn::parse2(quote! { <#ident as Deserialize>::Schema: Debug }).unwrap()); } - let field_renames1 = fields + let field_renames = fields .iter() .map(|field| { let mut rename = None; @@ -113,28 +113,18 @@ fn impl_struct( })) }) .collect::, _>>()?; - let field_renames2 = field_renames1.clone(); - let field_renames3 = field_renames1.clone(); + let field_renames1 = &field_renames; + let field_renames2 = &field_renames; - let field_names1 = fields.iter().map(|field| field.ident.as_ref().unwrap()); - let field_names2 = fields.iter().map(|field| field.ident.as_ref().unwrap()); - let field_names3 = fields.iter().map(|field| field.ident.as_ref().unwrap()); - let field_names4 = fields.iter().map(|field| field.ident.as_ref().unwrap()); - let field_names5 = fields.iter().map(|field| field.ident.as_ref().unwrap()); - let field_names6 = fields.iter().map(|field| field.ident.as_ref().unwrap()); - let field_names7 = fields.iter().map(|field| field.ident.as_ref().unwrap()); - let field_names8 = fields.iter().map(|field| field.ident.as_ref().unwrap()); - let field_names9 = fields.iter().map(|field| field.ident.as_ref().unwrap()); - let field_names10 = fields.iter().map(|field| field.ident.as_ref().unwrap()); - let field_names11 = fields.iter().map(|field| field.ident.as_ref().unwrap()); - let field_names12 = fields.iter().map(|field| field.ident.as_ref().unwrap()); - let field_names13 = fields.iter().map(|field| field.ident.as_ref().unwrap()); - let field_names14 = fields.iter().map(|field| field.ident.as_ref().unwrap()); + let field_names = fields + .iter() + .map(|field| field.ident.as_ref().unwrap()) + .collect::>(); + let field_names1 = &field_names; + let field_names2 = &field_names; - let field_types1 = fields.iter().map(|field| &field.ty); - let field_types2 = fields.iter().map(|field| &field.ty); - let field_types3 = fields.iter().map(|field| &field.ty); - let field_types4 = fields.iter().map(|field| &field.ty); + let field_types = fields.iter().map(|field| &field.ty).collect::>(); + let field_types1 = &field_types; let name1 = iter::repeat(name).take(fields.len()); @@ -154,7 +144,7 @@ fn impl_struct( impl #impl_generics Debug for #schema_name #ty_generics #where_clause_with_debug { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { f.debug_struct(stringify!(#schema_name)) - #(.field(stringify!(#field_names2), &self.#field_names3))* + #(.field(stringify!(#field_names1), &self.#field_names2))* .finish() } } @@ -167,34 +157,34 @@ fn impl_struct( } } struct #reader_name #impl_generics #where_clause { - #(#field_names4: <#field_types2 as Deserialize>::Reader,)* + #(#field_names1: <#field_types1 as Deserialize>::Reader,)* } impl #impl_generics Reader for #reader_name #ty_generics #where_clause { type Item = #name #ty_generics; - fn read(&mut self) -> Result { + fn read(&mut self, def_level: i16, rep_level: i16) -> Result { Result::Ok(#name { - #(#field_names5: self.#field_names6.read()?,)* + #(#field_names1: self.#field_names2.read(def_level, rep_level)?,)* }) } fn advance_columns(&mut self) -> Result<(), ParquetError> { - #(self.#field_names7.advance_columns()?;)* + #(self.#field_names1.advance_columns()?;)* Result::Ok(()) } fn has_next(&self) -> bool { - #(if true { self.#field_names8.has_next() } else)* + #(if true { self.#field_names1.has_next() } else)* { true } } fn current_def_level(&self) -> i16 { - #(if true { self.#field_names9.current_def_level() } else)* + #(if true { self.#field_names1.current_def_level() } else)* { panic!("Current definition level: empty group reader") } } fn current_rep_level(&self) -> i16 { - #(if true { self.#field_names10.current_rep_level() } else)* + #(if true { self.#field_names1.current_rep_level() } else)* { panic!("Current repetition level: empty group reader") } @@ -204,23 +194,23 @@ fn impl_struct( type Schema = #schema_name #ty_generics; type Reader = #reader_name #ty_generics; - fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { - if schema.is_group() && !schema.is_schema() && schema.get_basic_info().repetition() == Repetition::REQUIRED { + fn parse(schema: &Type, repetition: Option) -> Result<(String, Self::Schema), ParquetError> { + if schema.is_group() && repetition == Some(Repetition::REQUIRED) { let fields = schema.get_fields().iter().map(|field|(field.name(),field)).collect::>(); let schema_ = #schema_name{ - #(#field_names11: fields.get(#field_renames1).ok_or(ParquetError::General(format!("Struct {} missing field {}", stringify!(#name1), #field_renames2))).and_then(|x|<#field_types3 as Deserialize>::parse(&**x))?.1,)* + #(#field_names1: fields.get(#field_renames1).ok_or(ParquetError::General(format!("Struct {} missing field {}", stringify!(#name1), #field_renames2))).and_then(|x|<#field_types1 as Deserialize>::parse(&**x, Some(x.get_basic_info().repetition())))?.1,)* }; return Result::Ok((schema.name().to_owned(), schema_)) } Result::Err(ParquetError::General(format!("Struct {}", stringify!(#name)))) } - fn reader(schema: &Self::Schema, mut path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap, batch_size: usize) -> Self::Reader { + fn reader(schema: &Self::Schema, mut path: &mut Vec, def_level: i16, rep_level: i16, paths: &mut HashMap, batch_size: usize) -> Self::Reader { #( - path.push(#field_renames3.to_owned()); - let #field_names12 = <#field_types4 as Deserialize>::reader(&schema.#field_names13, path, curr_def_level, curr_rep_level, paths, batch_size); + path.push(#field_renames1.to_owned()); + let #field_names1 = <#field_types1 as Deserialize>::reader(&schema.#field_names2, path, def_level, rep_level, paths, batch_size); path.pop().unwrap(); )* - #reader_name { #(#field_names14,)* } + #reader_name { #(#field_names1,)* } } } }; diff --git a/rust/parquet/src/file/reader.rs b/rust/parquet/src/file/reader.rs index c1f12972da20e..5c8a2a8a54e10 100644 --- a/rust/parquet/src/file/reader.rs +++ b/rust/parquet/src/file/reader.rs @@ -67,7 +67,7 @@ pub trait FileReader { /// full file schema is assumed. fn get_row_iter(&self, projection: Option) -> Result> where - Root: Deserialize, + T: Deserialize, Self: Sized; } @@ -86,12 +86,17 @@ pub trait RowGroupReader { /// Get value reader for the `i`th column chunk. fn get_column_reader(&self, i: usize) -> Result; - // /// Get iterator of `Row`s from this row group. - // /// - // /// Projected schema can be a subset of or equal to the file schema, when it is None, - // /// full file schema is assumed. - // fn get_row_iter(&self, projection: Option) -> Result> - // where Root: Deserialize, Self: Sized; + /// Get iterator of `Row`s from this row group. + /// + /// Projected schema can be a subset of or equal to the file schema, when it is None, + /// full file schema is assumed. + fn get_row_iter( + &self, + projection: Option, + ) -> Result, T>> + where + T: Deserialize, + Self: Sized; } // ---------------------------------------------------------------------- @@ -275,7 +280,7 @@ impl FileReader for SerializedFileReader { fn get_row_iter(&self, projection: Option) -> Result> where - Root: Deserialize, + T: Deserialize, Self: Sized, { RowIter::from_file(projection, self) @@ -390,9 +395,16 @@ impl RowGroupReader for SerializedRowGroupReader Ok(col_reader) } - // fn get_row_iter(&self, projection: Option) -> Result> - // where Root: Deserialize, Self: Sized { RowIter::from_row_group(projection, - // self) } + fn get_row_iter( + &self, + projection: Option, + ) -> Result, T>> + where + T: Deserialize, + Self: Sized, + { + RowIter::from_row_group(projection, self) + } } /// A serialized implementation for Parquet [`PageReader`]. diff --git a/rust/parquet/src/file/writer.rs b/rust/parquet/src/file/writer.rs index 04d628bb78051..bf72829470e59 100644 --- a/rust/parquet/src/file/writer.rs +++ b/rust/parquet/src/file/writer.rs @@ -926,8 +926,7 @@ mod tests { assert_eq!(reader.num_row_groups(), data.len()); for i in 0..reader.num_row_groups() { let row_group_reader = reader.get_row_group(i).unwrap(); - // let iter = - // row_group_reader.get_row_iter::(None).unwrap(); + // let iter = row_group_reader.get_row_iter::(None).unwrap(); let iter = crate::record::reader::RowIter::, Row>::from_row_group( None, diff --git a/rust/parquet/src/record/mod.rs b/rust/parquet/src/record/mod.rs index 4287274f99eb6..2a475dc9ce2a1 100644 --- a/rust/parquet/src/record/mod.rs +++ b/rust/parquet/src/record/mod.rs @@ -70,14 +70,17 @@ pub trait Deserialize: Sized { type Reader: Reader; /// Parse a [`Type`] into `Self::Schema`. - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError>; + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError>; /// Builds tree of readers for the specified schema recursively. fn reader( schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader; diff --git a/rust/parquet/src/record/reader.rs b/rust/parquet/src/record/reader.rs index c1d32707efcf5..1ef6a2f1c5545 100644 --- a/rust/parquet/src/record/reader.rs +++ b/rust/parquet/src/record/reader.rs @@ -39,7 +39,7 @@ const DEFAULT_BATCH_SIZE: usize = 1024; pub trait Reader { type Item; - fn read(&mut self) -> Result; + fn read(&mut self, def_level: i16, rep_level: i16) -> Result; fn advance_columns(&mut self) -> Result<()>; fn has_next(&self) -> bool; fn current_def_level(&self) -> i16; @@ -53,10 +53,10 @@ where { type Item = A::Item; - fn read(&mut self) -> Result { + fn read(&mut self, def_level: i16, rep_level: i16) -> Result { match self { - sum::Sum2::A(ref mut reader) => reader.read(), - sum::Sum2::B(ref mut reader) => reader.read(), + sum::Sum2::A(ref mut reader) => reader.read(def_level, rep_level), + sum::Sum2::B(ref mut reader) => reader.read(def_level, rep_level), } } @@ -97,11 +97,11 @@ where { type Item = A::Item; - fn read(&mut self) -> Result { + fn read(&mut self, def_level: i16, rep_level: i16) -> Result { match self { - sum::Sum3::A(ref mut reader) => reader.read(), - sum::Sum3::B(ref mut reader) => reader.read(), - sum::Sum3::C(ref mut reader) => reader.read(), + sum::Sum3::A(ref mut reader) => reader.read(def_level, rep_level), + sum::Sum3::B(ref mut reader) => reader.read(def_level, rep_level), + sum::Sum3::C(ref mut reader) => reader.read(def_level, rep_level), } } @@ -144,7 +144,7 @@ pub struct BoolReader { impl Reader for BoolReader { type Item = bool; - fn read(&mut self) -> Result { + fn read(&mut self, def_level: i16, rep_level: i16) -> Result { self.column.read() } @@ -171,7 +171,7 @@ pub struct I32Reader { impl Reader for I32Reader { type Item = i32; - fn read(&mut self) -> Result { + fn read(&mut self, def_level: i16, rep_level: i16) -> Result { self.column.read() } @@ -198,7 +198,7 @@ pub struct I64Reader { impl Reader for I64Reader { type Item = i64; - fn read(&mut self) -> Result { + fn read(&mut self, def_level: i16, rep_level: i16) -> Result { self.column.read() } @@ -225,7 +225,7 @@ pub struct I96Reader { impl Reader for I96Reader { type Item = Int96; - fn read(&mut self) -> Result { + fn read(&mut self, def_level: i16, rep_level: i16) -> Result { self.column.read() } @@ -252,7 +252,7 @@ pub struct F32Reader { impl Reader for F32Reader { type Item = f32; - fn read(&mut self) -> Result { + fn read(&mut self, def_level: i16, rep_level: i16) -> Result { self.column.read() } @@ -279,7 +279,7 @@ pub struct F64Reader { impl Reader for F64Reader { type Item = f64; - fn read(&mut self) -> Result { + fn read(&mut self, def_level: i16, rep_level: i16) -> Result { self.column.read() } @@ -306,7 +306,7 @@ pub struct ByteArrayReader { impl Reader for ByteArrayReader { type Item = Vec; - fn read(&mut self) -> Result { + fn read(&mut self, def_level: i16, rep_level: i16) -> Result { self.column.read().map(|data| data.data().to_owned()) } @@ -333,7 +333,7 @@ pub struct FixedLenByteArrayReader { impl Reader for FixedLenByteArrayReader { type Item = Vec; - fn read(&mut self) -> Result { + fn read(&mut self, def_level: i16, rep_level: i16) -> Result { self.column.read().map(|data| data.data().to_owned()) } @@ -361,9 +361,10 @@ pub struct OptionReader { impl Reader for OptionReader { type Item = Option; - fn read(&mut self) -> Result { + fn read(&mut self, def_level: i16, rep_level: i16) -> Result { + assert_eq!(def_level, self.def_level); if self.reader.current_def_level() > self.def_level { - self.reader.read().map(Some) + self.reader.read(def_level + 1, rep_level).map(Some) } else { self.reader.advance_columns().map(|()| None) } @@ -394,11 +395,12 @@ pub struct RepeatedReader { impl Reader for RepeatedReader { type Item = Vec; - fn read(&mut self) -> Result { + fn read(&mut self, def_level: i16, rep_level: i16) -> Result { + assert_eq!((def_level, rep_level), (self.def_level, self.rep_level)); let mut elements = Vec::new(); loop { if self.reader.current_def_level() > self.def_level { - elements.push(self.reader.read()?); + elements.push(self.reader.read(def_level + 1, rep_level + 1)?); } else { self.reader.advance_columns()?; // If the current definition level is equal to the definition level of this @@ -442,11 +444,15 @@ pub struct KeyValueReader { impl Reader for KeyValueReader { type Item = Vec<(K::Item, V::Item)>; - fn read(&mut self) -> Result { + fn read(&mut self, def_level: i16, rep_level: i16) -> Result { + assert_eq!((def_level, rep_level), (self.def_level, self.rep_level)); let mut pairs = Vec::new(); loop { if self.keys_reader.current_def_level() > self.def_level { - pairs.push((self.keys_reader.read()?, self.values_reader.read()?)); + pairs.push(( + self.keys_reader.read(def_level + 1, rep_level + 1)?, + self.values_reader.read(def_level + 1, rep_level + 1)?, + )); } else { self.keys_reader.advance_columns()?; self.values_reader.advance_columns()?; @@ -487,17 +493,16 @@ impl Reader for KeyValueReader { } pub struct GroupReader { - pub(super) def_level: i16, pub(super) readers: Vec, pub(super) fields: Rc>, } impl Reader for GroupReader { type Item = Group; - fn read(&mut self) -> Result { + fn read(&mut self, def_level: i16, rep_level: i16) -> Result { let mut fields = Vec::new(); for reader in self.readers.iter_mut() { - fields.push(reader.read()?); + fields.push(reader.read(def_level, rep_level)?); } Ok(Group(fields, self.fields.clone())) } @@ -560,34 +565,44 @@ pub enum ValueReader { impl Reader for ValueReader { type Item = Value; - fn read(&mut self) -> Result { + fn read(&mut self, def_level: i16, rep_level: i16) -> Result { match self { - ValueReader::Bool(ref mut reader) => reader.read().map(Value::Bool), - ValueReader::U8(ref mut reader) => reader.read().map(Value::U8), - ValueReader::I8(ref mut reader) => reader.read().map(Value::I8), - ValueReader::U16(ref mut reader) => reader.read().map(Value::U16), - ValueReader::I16(ref mut reader) => reader.read().map(Value::I16), - ValueReader::U32(ref mut reader) => reader.read().map(Value::U32), - ValueReader::I32(ref mut reader) => reader.read().map(Value::I32), - ValueReader::U64(ref mut reader) => reader.read().map(Value::U64), - ValueReader::I64(ref mut reader) => reader.read().map(Value::I64), - ValueReader::F32(ref mut reader) => reader.read().map(Value::F32), - ValueReader::F64(ref mut reader) => reader.read().map(Value::F64), - ValueReader::Date(ref mut reader) => reader.read().map(Value::Date), - ValueReader::Time(ref mut reader) => reader.read().map(Value::Time), - ValueReader::Timestamp(ref mut reader) => reader.read().map(Value::Timestamp), - ValueReader::Decimal(ref mut reader) => reader.read().map(Value::Decimal), - ValueReader::Array(ref mut reader) => reader.read().map(Value::Array), - ValueReader::Bson(ref mut reader) => reader.read().map(Value::Bson), - ValueReader::String(ref mut reader) => reader.read().map(Value::String), - ValueReader::Json(ref mut reader) => reader.read().map(Value::Json), - ValueReader::Enum(ref mut reader) => reader.read().map(Value::Enum), - ValueReader::List(ref mut reader) => reader.read().map(Value::List), - ValueReader::Map(ref mut reader) => reader.read().map(Value::Map), - ValueReader::Group(ref mut reader) => reader.read().map(Value::Group), - ValueReader::Option(ref mut reader) => { - reader.read().map(|x| Value::Option(Box::new(x))) + ValueReader::Bool(ref mut reader) => reader.read(def_level, rep_level).map(Value::Bool), + ValueReader::U8(ref mut reader) => reader.read(def_level, rep_level).map(Value::U8), + ValueReader::I8(ref mut reader) => reader.read(def_level, rep_level).map(Value::I8), + ValueReader::U16(ref mut reader) => reader.read(def_level, rep_level).map(Value::U16), + ValueReader::I16(ref mut reader) => reader.read(def_level, rep_level).map(Value::I16), + ValueReader::U32(ref mut reader) => reader.read(def_level, rep_level).map(Value::U32), + ValueReader::I32(ref mut reader) => reader.read(def_level, rep_level).map(Value::I32), + ValueReader::U64(ref mut reader) => reader.read(def_level, rep_level).map(Value::U64), + ValueReader::I64(ref mut reader) => reader.read(def_level, rep_level).map(Value::I64), + ValueReader::F32(ref mut reader) => reader.read(def_level, rep_level).map(Value::F32), + ValueReader::F64(ref mut reader) => reader.read(def_level, rep_level).map(Value::F64), + ValueReader::Date(ref mut reader) => reader.read(def_level, rep_level).map(Value::Date), + ValueReader::Time(ref mut reader) => reader.read(def_level, rep_level).map(Value::Time), + ValueReader::Timestamp(ref mut reader) => { + reader.read(def_level, rep_level).map(Value::Timestamp) + } + ValueReader::Decimal(ref mut reader) => { + reader.read(def_level, rep_level).map(Value::Decimal) + } + ValueReader::Array(ref mut reader) => { + reader.read(def_level, rep_level).map(Value::Array) + } + ValueReader::Bson(ref mut reader) => reader.read(def_level, rep_level).map(Value::Bson), + ValueReader::String(ref mut reader) => { + reader.read(def_level, rep_level).map(Value::String) + } + ValueReader::Json(ref mut reader) => reader.read(def_level, rep_level).map(Value::Json), + ValueReader::Enum(ref mut reader) => reader.read(def_level, rep_level).map(Value::Enum), + ValueReader::List(ref mut reader) => reader.read(def_level, rep_level).map(Value::List), + ValueReader::Map(ref mut reader) => reader.read(def_level, rep_level).map(Value::Map), + ValueReader::Group(ref mut reader) => { + reader.read(def_level, rep_level).map(Value::Group) } + ValueReader::Option(ref mut reader) => reader + .read(def_level, rep_level) + .map(|x| Value::Option(Box::new(x))), } } @@ -715,8 +730,8 @@ where { type Item = Root; - fn read(&mut self) -> Result { - self.0.read().map(Root) + fn read(&mut self, def_level: i16, rep_level: i16) -> Result { + self.0.read(def_level, rep_level).map(Root) } fn advance_columns(&mut self) -> Result<()> { @@ -746,8 +761,8 @@ where { type Item = T; - fn read(&mut self) -> Result { - self.0.read().and_then(|x| { + fn read(&mut self, def_level: i16, rep_level: i16) -> Result { + self.0.read(def_level, rep_level).and_then(|x| { x.try_into() .map_err(|err| ParquetError::General(err.description().to_owned())) }) @@ -777,8 +792,8 @@ where { type Item = T; - fn read(&mut self) -> Result { - self.0.read().and_then(&mut self.1) + fn read(&mut self, def_level: i16, rep_level: i16) -> Result { + self.0.read(def_level, rep_level).and_then(&mut self.1) } fn advance_columns(&mut self) -> Result<()> { @@ -810,7 +825,7 @@ where pub struct RowIter<'a, R, T> where R: FileReader, - Root: Deserialize, + T: Deserialize, { descr: SchemaDescPtr, // tree_builder: TreeBuilder, @@ -824,7 +839,7 @@ where impl<'a, R, T> RowIter<'a, R, T> where R: FileReader, - Root: Deserialize, + T: Deserialize, { /// Creates row iterator for all row groups in a file. pub fn from_file(proj: Option, reader: &'a R) -> Result { @@ -834,7 +849,7 @@ where let file_schema = reader.metadata().file_metadata().schema_descr_ptr(); let file_schema = file_schema.root_schema(); - let schema = as Deserialize>::parse(file_schema) + let schema = as Deserialize>::parse(file_schema, None) .map_err(|err| { // let schema: Type = as Deserialize>::render("", & as // Deserialize>::placeholder()); @@ -871,7 +886,7 @@ where let file_schema = row_group_reader.metadata().schema_descr_ptr(); let file_schema = file_schema.root_schema(); - let schema = as Deserialize>::parse(file_schema) + let schema = as Deserialize>::parse(file_schema, None) .map_err(|err| { // let schema: Type = as Deserialize>::render("", & as // Deserialize>::placeholder()); @@ -945,7 +960,7 @@ where impl<'a, R, T> Iterator for RowIter<'a, R, T> where R: FileReader, - Root: Deserialize, + T: Deserialize, { type Item = T; @@ -1006,7 +1021,7 @@ where /// Internal row iterator for a reader. struct ReaderIter where - Root: Deserialize, + T: Deserialize, { root_reader: as Deserialize>::Reader, records_left: u64, @@ -1015,7 +1030,7 @@ where impl ReaderIter where - Root: Deserialize, + T: Deserialize, { fn new(mut root_reader: as Deserialize>::Reader, num_records: u64) -> Self { // Prepare root reader by advancing all column vectors @@ -1030,14 +1045,14 @@ where impl Iterator for ReaderIter where - Root: Deserialize, + T: Deserialize, { type Item = T; fn next(&mut self) -> Option { if self.records_left > 0 { self.records_left -= 1; - Some(self.root_reader.read().unwrap().0) + Some(self.root_reader.read(0, 0).unwrap().0) } else { None } @@ -2070,7 +2085,7 @@ mod tests { fn test_file_reader_rows(file_name: &str, schema: Option) -> Result> where - Root: Deserialize, + T: Deserialize, { let file = get_test_file(file_name); let file_reader: SerializedFileReader<_> = SerializedFileReader::new(file)?; diff --git a/rust/parquet/src/record/schemas.rs b/rust/parquet/src/record/schemas.rs index b3c9655de3927..2dfa3f51518c1 100644 --- a/rust/parquet/src/record/schemas.rs +++ b/rust/parquet/src/record/schemas.rs @@ -1385,7 +1385,7 @@ where fn from_str(s: &str) -> Result { parse_message_type(s) .and_then(|x| { - as Deserialize>::parse(&x).map_err(|err| { + as Deserialize>::parse(&x, None).map_err(|err| { // let schema: Type = as Deserialize>::render("", & as // Deserialize>::placeholder()); let mut b = Vec::new(); diff --git a/rust/parquet/src/record/triplet.rs b/rust/parquet/src/record/triplet.rs index fb4c398accc1d..30594a98233d6 100644 --- a/rust/parquet/src/record/triplet.rs +++ b/rust/parquet/src/record/triplet.rs @@ -17,6 +17,7 @@ use std::mem; +use crate::basic::Repetition; use crate::column::reader::{get_typed_column_reader, ColumnReader, ColumnReaderImpl}; use crate::data_type::*; use crate::errors::{ParquetError, Result}; @@ -25,44 +26,44 @@ use crate::record::{ types::Value, Deserialize, }; -use crate::schema::types::{ColumnDescPtr, ColumnPath}; +use crate::schema::types::{ColumnDescPtr, ColumnPath, Type}; /// High level API wrapper on column reader. /// Provides per-element access for each primitive column. -pub struct TripletIter(ValueReader); +pub struct TripletIter { + def_level: i16, + rep_level: i16, + reader: ValueReader, +} impl TripletIter { /// Creates new triplet for column reader pub fn new(descr: ColumnDescPtr, reader: ColumnReader, batch_size: usize) -> Result { - let schema = Value::parse(descr.self_type())?.1; - let mut def = descr.max_def_level(); - let mut rep = descr.max_rep_level(); - match descr.self_type().get_basic_info().repetition() { - crate::basic::Repetition::REQUIRED => (), - crate::basic::Repetition::OPTIONAL => def -= 1, - crate::basic::Repetition::REPEATED => { - def -= 1; - rep -= 1 - } - } + let schema = descr.self_type(); + let schema = Value::parse(&schema, Some(Repetition::REQUIRED))?.1; + let (def_level, rep_level) = (descr.max_def_level(), descr.max_rep_level()); let reader = Value::reader( &schema, &mut vec![], - def, - rep, + def_level, + rep_level, &mut vec![(ColumnPath::new(vec![]), (descr.clone(), reader))] .into_iter() .collect(), batch_size, ); - Ok(TripletIter(reader)) + Ok(TripletIter { + def_level, + rep_level, + reader, + }) } /// Invokes underlying typed triplet iterator to buffer current value. /// Should be called once - either before `is_null` or `current_value`. #[inline] pub fn advance_columns(&mut self) -> Result<()> { - self.0.advance_columns() + self.reader.advance_columns() } /// Provides check on values/levels left without invoking the underlying typed triplet @@ -71,39 +72,45 @@ impl TripletIter { /// It is always in sync with `advance_columns` method. #[inline] pub fn has_next(&self) -> bool { - self.0.has_next() + self.reader.has_next() } /// Returns current definition level for a leaf triplet iterator #[inline] pub fn current_def_level(&self) -> i16 { - self.0.current_def_level() + self.reader.current_def_level() } - // /// Returns max definition level for a leaf triplet iterator - // #[inline] - // pub fn max_def_level(&self) -> i16 { triplet_enum_func!(self, max_def_level, ref) } + /// Returns max definition level for a leaf triplet iterator + #[inline] + pub fn max_def_level(&self) -> i16 { + self.def_level + } /// Returns current repetition level for a leaf triplet iterator #[inline] pub fn current_rep_level(&self) -> i16 { - self.0.current_rep_level() + self.reader.current_rep_level() } - // /// Returns max repetition level for a leaf triplet iterator - // #[inline] - // pub fn max_rep_level(&self) -> i16 { triplet_enum_func!(self, max_rep_level, ref) } + /// Returns max repetition level for a leaf triplet iterator + #[inline] + pub fn max_rep_level(&self) -> i16 { + self.rep_level + } - // /// Returns true, if current value is null. - // /// Based on the fact that for non-null value current definition level - // /// equals to max definition level. - // #[inline] - // pub fn is_null(&self) -> bool { self.current_def_level() < self.max_def_level() } + /// Returns true, if current value is null. + /// Based on the fact that for non-null value current definition level + /// equals to max definition level. + #[inline] + pub fn is_null(&self) -> bool { + self.current_def_level() < self.max_def_level() + } /// Updates non-null value for current row. pub fn read(&mut self) -> Result { - // assert!(!self.is_null(), "Value is null"); - self.0.read() + assert!(!self.is_null(), "Value is null"); + self.reader.read(self.def_level, self.rep_level) } } @@ -113,8 +120,8 @@ pub struct TypedTripletIter { reader: ColumnReaderImpl, batch_size: usize, // type properties - max_def_level: i16, - max_rep_level: i16, + def_level: i16, + rep_level: i16, // values and levels values: Vec, def_levels: Option>, @@ -131,19 +138,19 @@ impl TypedTripletIter { /// Creates new typed triplet iterator based on provided column reader. /// Use batch size to specify the amount of values to buffer from column reader. pub fn new( - max_def_level: i16, - max_rep_level: i16, + def_level: i16, + rep_level: i16, batch_size: usize, column_reader: ColumnReader, ) -> Self { assert_ne!(batch_size, 0, "Expected positive batch size"); - let def_levels = if max_def_level == 0 { + let def_levels = if def_level == 0 { None } else { Some(vec![0; batch_size]) }; - let rep_levels = if max_rep_level == 0 { + let rep_levels = if rep_level == 0 { None } else { Some(vec![0; batch_size]) @@ -152,8 +159,8 @@ impl TypedTripletIter { Self { reader: get_typed_column_reader(column_reader), batch_size, - max_def_level, - max_rep_level, + def_level, + rep_level, values: vec![T::T::default(); batch_size], def_levels, rep_levels, @@ -166,13 +173,13 @@ impl TypedTripletIter { /// Returns maximum definition level for the triplet iterator (leaf column). #[inline] pub fn max_def_level(&self) -> i16 { - self.max_def_level + self.def_level } /// Returns maximum repetition level for the triplet iterator (leaf column). #[inline] pub fn max_rep_level(&self) -> i16 { - self.max_rep_level + self.rep_level } /// Returns current value, advancing the iterator. @@ -180,9 +187,9 @@ impl TypedTripletIter { pub fn read(&mut self) -> Result { assert_eq!( self.current_def_level(), - self.max_def_level, + self.def_level, "Cannot extract value, max definition level: {}, current level: {}", - self.max_def_level, + self.def_level, self.current_def_level() ); let ret = mem::replace(&mut self.values[self.curr_triplet_index], T::T::default()); @@ -195,7 +202,7 @@ impl TypedTripletIter { pub fn current_def_level(&self) -> i16 { match self.def_levels { Some(ref vec) => vec[self.curr_triplet_index], - None => self.max_def_level, + None => self.def_level, } } @@ -205,7 +212,7 @@ impl TypedTripletIter { pub fn current_rep_level(&self) -> i16 { match self.rep_levels { Some(ref vec) => vec[self.curr_triplet_index], - None => self.max_rep_level, + None => self.rep_level, } } @@ -262,7 +269,7 @@ impl TypedTripletIter { let mut idx = values_read; let def_levels = self.def_levels.as_ref().unwrap(); for i in 0..levels_read { - if def_levels[levels_read - i - 1] == self.max_def_level { + if def_levels[levels_read - i - 1] == self.def_level { idx -= 1; // This is done to avoid usize becoming a negative value self.values.swap(levels_read - i - 1, idx); } @@ -308,16 +315,7 @@ mod tests { #[test] fn test_triplet_null_column() { let path = vec!["b_struct", "b_c_int"]; - let values = vec![ - Value::Option(Box::new(None)), - Value::Option(Box::new(None)), - Value::Option(Box::new(None)), - Value::Option(Box::new(None)), - Value::Option(Box::new(None)), - Value::Option(Box::new(None)), - Value::Option(Box::new(None)), - Value::Option(Box::new(None)), - ]; + let values = vec![]; let def_levels = vec![1, 1, 1, 1, 1, 1, 1, 1]; let rep_levels = vec![0, 0, 0, 0, 0, 0, 0, 0]; test_triplet_iter( @@ -347,15 +345,7 @@ mod tests { #[test] fn test_triplet_optional_column() { let path = vec!["nested_struct", "A"]; - let values = vec![ - Value::Option(Box::new(Some(Value::I32(1)))), - Value::Option(Box::new(None)), - Value::Option(Box::new(None)), - Value::Option(Box::new(None)), - Value::Option(Box::new(None)), - Value::Option(Box::new(None)), - Value::Option(Box::new(Some(Value::I32(7)))), - ]; + let values = vec![Value::I32(1), Value::I32(7)]; let def_levels = vec![2, 1, 1, 1, 1, 0, 2]; let rep_levels = vec![0, 0, 0, 0, 0, 0, 0]; test_triplet_iter( @@ -371,24 +361,21 @@ mod tests { fn test_triplet_optional_list_column() { let path = vec!["a", "list", "element", "list", "element", "list", "element"]; let values = vec![ - Value::Option(Box::new(Some(Value::String("a".to_string())))), - Value::Option(Box::new(Some(Value::String("b".to_string())))), - Value::Option(Box::new(Some(Value::String("c".to_string())))), - Value::Option(Box::new(None)), - Value::Option(Box::new(Some(Value::String("d".to_string())))), - Value::Option(Box::new(Some(Value::String("a".to_string())))), - Value::Option(Box::new(Some(Value::String("b".to_string())))), - Value::Option(Box::new(Some(Value::String("c".to_string())))), - Value::Option(Box::new(Some(Value::String("d".to_string())))), - Value::Option(Box::new(None)), - Value::Option(Box::new(Some(Value::String("e".to_string())))), - Value::Option(Box::new(Some(Value::String("a".to_string())))), - Value::Option(Box::new(Some(Value::String("b".to_string())))), - Value::Option(Box::new(Some(Value::String("c".to_string())))), - Value::Option(Box::new(Some(Value::String("d".to_string())))), - Value::Option(Box::new(Some(Value::String("e".to_string())))), - Value::Option(Box::new(None)), - Value::Option(Box::new(Some(Value::String("f".to_string())))), + Value::String("a".to_string()), + Value::String("b".to_string()), + Value::String("c".to_string()), + Value::String("d".to_string()), + Value::String("a".to_string()), + Value::String("b".to_string()), + Value::String("c".to_string()), + Value::String("d".to_string()), + Value::String("e".to_string()), + Value::String("a".to_string()), + Value::String("b".to_string()), + Value::String("c".to_string()), + Value::String("d".to_string()), + Value::String("e".to_string()), + Value::String("f".to_string()), ]; let def_levels = vec![7, 7, 7, 4, 7, 7, 7, 7, 7, 4, 7, 7, 7, 7, 7, 7, 4, 7]; let rep_levels = vec![0, 3, 2, 1, 2, 0, 3, 2, 3, 1, 2, 0, 3, 2, 3, 2, 1, 2]; @@ -408,8 +395,6 @@ mod tests { Value::I32(1), Value::I32(2), Value::I32(1), - Value::Option(Box::new(None)), - Value::Option(Box::new(None)), Value::I32(1), Value::I32(3), Value::I32(4), @@ -498,18 +483,17 @@ mod tests { let mut def_levels: Vec = Vec::new(); let mut rep_levels: Vec = Vec::new(); - // assert_eq!(iter.max_def_level(), descr.max_def_level()); - // assert_eq!(iter.max_rep_level(), descr.max_rep_level()); + assert_eq!(iter.max_def_level(), descr.max_def_level()); + assert_eq!(iter.max_rep_level(), descr.max_rep_level()); iter.advance_columns().unwrap(); while iter.has_next() { def_levels.push(iter.current_def_level()); rep_levels.push(iter.current_rep_level()); - if iter.current_def_level() == descr.max_def_level() { + if !iter.is_null() { values.push(iter.read().unwrap()); } else { iter.advance_columns().unwrap(); - values.push(Value::Option(Box::new(None))); } } diff --git a/rust/parquet/src/record/types.rs b/rust/parquet/src/record/types.rs index 224226efe67bd..2c640bd3e3238 100644 --- a/rust/parquet/src/record/types.rs +++ b/rust/parquet/src/record/types.rs @@ -26,8 +26,16 @@ mod time; mod tuple; mod value; -use super::schemas::ValueSchema; -use crate::errors::ParquetError; +use std::{collections::HashMap, marker::PhantomData}; + +use super::schemas::{RootSchema, ValueSchema}; +use super::{reader::RootReader, Deserialize}; +use crate::{ + basic::Repetition, + column::reader::ColumnReader, + errors::ParquetError, + schema::types::{ColumnDescPtr, ColumnPath, Type}, +}; pub use self::{ array::*, decimal::*, group::*, list::*, map::*, numbers::*, option::*, time::*, tuple::*, @@ -47,3 +55,40 @@ where #[derive(Clone, Hash, PartialEq, Eq, Debug)] pub struct Root(pub T); + +impl Deserialize for Root +where + T: Deserialize, +{ + type Reader = RootReader; + type Schema = RootSchema; + + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + assert!(repetition.is_none()); + if schema.is_schema() { + T::parse(schema, Some(Repetition::REQUIRED)) + .map(|(name, schema)| (String::from(""), RootSchema(name, schema, PhantomData))) + } else { + Err(ParquetError::General(format!( + "Can't parse Root {:?}", + schema + ))) + } + } + + fn reader( + schema: &Self::Schema, + path: &mut Vec, + def_level: i16, + rep_level: i16, + paths: &mut HashMap, + batch_size: usize, + ) -> Self::Reader { + RootReader(T::reader( + &schema.1, path, def_level, rep_level, paths, batch_size, + )) + } +} diff --git a/rust/parquet/src/record/types/array.rs b/rust/parquet/src/record/types/array.rs index e17a802bc40c9..09076e09e2b47 100644 --- a/rust/parquet/src/record/types/array.rs +++ b/rust/parquet/src/record/types/array.rs @@ -41,30 +41,30 @@ impl Deserialize for Vec { type Reader = ByteArrayReader; type Schema = VecSchema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - Value::parse(schema).and_then(downcast) + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + Value::parse(schema, repetition).and_then(downcast) } fn reader( _schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { let col_path = ColumnPath::new(path.to_vec()); let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); assert_eq!( - (curr_def_level, curr_rep_level), + (def_level, rep_level), (col_descr.max_def_level(), col_descr.max_rep_level()) ); ByteArrayReader { column: TypedTripletIter::::new( - curr_def_level, - curr_rep_level, - batch_size, - col_reader, + def_level, rep_level, batch_size, col_reader, ), } } @@ -78,15 +78,18 @@ impl Deserialize for Bson { MapReader< as Deserialize>::Reader, fn(Vec) -> Result>; type Schema = BsonSchema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - Value::parse(schema).and_then(downcast) + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + Value::parse(schema, repetition).and_then(downcast) } fn reader( schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { @@ -94,8 +97,8 @@ impl Deserialize for Bson { Vec::::reader( &VecSchema(schema.0), path, - curr_def_level, - curr_rep_level, + def_level, + rep_level, paths, batch_size, ), @@ -119,31 +122,31 @@ impl Deserialize for String { type Reader = MapReader) -> Result>; type Schema = StringSchema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - Value::parse(schema).and_then(downcast) + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + Value::parse(schema, repetition).and_then(downcast) } fn reader( _schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { let col_path = ColumnPath::new(path.to_vec()); let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); assert_eq!( - (curr_def_level, curr_rep_level), + (def_level, rep_level), (col_descr.max_def_level(), col_descr.max_rep_level()) ); MapReader( ByteArrayReader { column: TypedTripletIter::::new( - curr_def_level, - curr_rep_level, - batch_size, - col_reader, + def_level, rep_level, batch_size, col_reader, ), }, |x| { @@ -162,27 +165,23 @@ impl Deserialize for Json { MapReader<::Reader, fn(String) -> Result>; type Schema = JsonSchema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - Value::parse(schema).and_then(downcast) + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + Value::parse(schema, repetition).and_then(downcast) } fn reader( _schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { MapReader( - String::reader( - &StringSchema, - path, - curr_def_level, - curr_rep_level, - paths, - batch_size, - ), + String::reader(&StringSchema, path, def_level, rep_level, paths, batch_size), |x| Ok(Json(x)), ) } @@ -211,27 +210,23 @@ impl Deserialize for Enum { MapReader<::Reader, fn(String) -> Result>; type Schema = EnumSchema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - Value::parse(schema).and_then(downcast) + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + Value::parse(schema, repetition).and_then(downcast) } fn reader( _schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { MapReader( - String::reader( - &StringSchema, - path, - curr_def_level, - curr_rep_level, - paths, - batch_size, - ), + String::reader(&StringSchema, path, def_level, rep_level, paths, batch_size), |x| Ok(Enum(x)), ) } @@ -260,9 +255,12 @@ macro_rules! impl_parquet_deserialize_array { MapReader) -> Result>; type Schema = ArraySchema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { if schema.is_primitive() - && schema.get_basic_info().repetition() == Repetition::REQUIRED + && repetition == Some(Repetition::REQUIRED) && schema.get_physical_type() == PhysicalType::FIXED_LEN_BYTE_ARRAY && schema.get_basic_info().logical_type() == LogicalType::NONE && schema.get_type_length() == $i @@ -278,24 +276,21 @@ macro_rules! impl_parquet_deserialize_array { fn reader( _schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { let col_path = ColumnPath::new(path.to_vec()); let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); assert_eq!( - (curr_def_level, curr_rep_level), + (def_level, rep_level), (col_descr.max_def_level(), col_descr.max_rep_level()) ); MapReader( FixedLenByteArrayReader { column: TypedTripletIter::::new( - curr_def_level, - curr_rep_level, - batch_size, - col_reader, + def_level, rep_level, batch_size, col_reader, ), }, |bytes: Vec<_>| { @@ -319,31 +314,31 @@ macro_rules! impl_parquet_deserialize_array { MapReader) -> Result>; type Schema = ArraySchema<[u8; $i]>; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - <[u8; $i]>::parse(schema) + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + <[u8; $i]>::parse(schema, repetition) } fn reader( _schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { let col_path = ColumnPath::new(path.to_vec()); let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); assert_eq!( - (curr_def_level, curr_rep_level), + (def_level, rep_level), (col_descr.max_def_level(), col_descr.max_rep_level()) ); MapReader( FixedLenByteArrayReader { column: TypedTripletIter::::new( - curr_def_level, - curr_rep_level, - batch_size, - col_reader, + def_level, rep_level, batch_size, col_reader, ), }, |bytes: Vec<_>| { diff --git a/rust/parquet/src/record/types/decimal.rs b/rust/parquet/src/record/types/decimal.rs index 0e116a85488f4..a22e30f82be22 100644 --- a/rust/parquet/src/record/types/decimal.rs +++ b/rust/parquet/src/record/types/decimal.rs @@ -18,6 +18,7 @@ use std::collections::HashMap; use crate::{ + basic::Repetition, column::reader::ColumnReader, data_type::{ByteArrayType, Decimal, FixedLenByteArrayType, Int32Type, Int64Type}, errors::ParquetError, @@ -42,22 +43,25 @@ impl Deserialize for Decimal { >; type Schema = DecimalSchema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - Value::parse(schema).and_then(downcast) + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + Value::parse(schema, repetition).and_then(downcast) } fn reader( schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { let col_path = ColumnPath::new(path.to_vec()); let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); assert_eq!( - (curr_def_level, curr_rep_level), + (def_level, rep_level), (col_descr.max_def_level(), col_descr.max_rep_level()) ); unimplemented!() @@ -65,8 +69,8 @@ impl Deserialize for Decimal { // DecimalSchema::Int32 { precision, scale } => sum::Sum3::A(MapReader( // I32Reader { // column: TypedTripletIter::::new( - // curr_def_level, - // curr_rep_level, + // def_level, + // rep_level, // batch_size, // col_reader, // ), @@ -76,8 +80,8 @@ impl Deserialize for Decimal { // DecimalSchema::Int64 { precision, scale } => sum::Sum3::B(MapReader( // I64Reader { // column: TypedTripletIter::::new( - // curr_def_level, - // curr_rep_level, + // def_level, + // rep_level, // batch_size, // col_reader, // ), @@ -87,8 +91,8 @@ impl Deserialize for Decimal { // DecimalSchema::Array { precision, scale } => sum::Sum3::C(MapReader( // ByteArrayReader { // column: TypedTripletIter::::new( - // curr_def_level, - // curr_rep_level, + // def_level, + // rep_level, // batch_size, // col_reader, // ), diff --git a/rust/parquet/src/record/types/group.rs b/rust/parquet/src/record/types/group.rs index 05b2cf7fb0fcd..928e2d67ed44c 100644 --- a/rust/parquet/src/record/types/group.rs +++ b/rust/parquet/src/record/types/group.rs @@ -46,18 +46,21 @@ impl Deserialize for Group { type Reader = GroupReader; type Schema = GroupSchema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - if schema.is_group() - && !schema.is_schema() - && schema.get_basic_info().repetition() == Repetition::REQUIRED - { + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + if schema.is_group() && repetition == Some(Repetition::REQUIRED) { let mut map = HashMap::new(); let fields = schema .get_fields() .iter() .enumerate() .map(|(i, field)| { - let (name, schema) = ::parse(&**field)?; + let (name, schema) = ::parse( + &**field, + Some(field.get_basic_info().repetition()), + )?; let x = map.insert(name, i); assert!(x.is_none()); Ok(schema) @@ -75,8 +78,8 @@ impl Deserialize for Group { fn reader( schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { @@ -90,73 +93,17 @@ impl Deserialize for Group { .enumerate() .map(|(i, field)| { path.push(names_[i].take().unwrap()); - let ret = Value::reader( - field, - path, - curr_def_level, - curr_rep_level, - paths, - batch_size, - ); + let ret = Value::reader(field, path, def_level, rep_level, paths, batch_size); path.pop().unwrap(); ret }) .collect(); GroupReader { - def_level: curr_def_level, readers, fields: Rc::new(schema.1.clone()), } } } -impl Deserialize for Root { - type Reader = RootReader; - type Schema = RootSchema; - - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - if schema.is_schema() { - let mut map = HashMap::new(); - let fields = schema - .get_fields() - .iter() - .enumerate() - .map(|(i, field)| { - let (name, schema) = ::parse(&**field)?; - let x = map.insert(name, i); - assert!(x.is_none()); - Ok(schema) - }) - .collect::, ParquetError>>()?; - let schema_ = GroupSchema(fields, map); - return Ok(( - String::from(""), - RootSchema(schema.name().to_owned(), schema_, PhantomData), - )); - } - Err(ParquetError::General(format!( - "Can't parse Group {:?}", - schema - ))) - } - - fn reader( - schema: &Self::Schema, - path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, - paths: &mut HashMap, - batch_size: usize, - ) -> Self::Reader { - RootReader(Group::reader( - &schema.1, - path, - curr_def_level, - curr_rep_level, - paths, - batch_size, - )) - } -} impl Group { pub fn get(&self, k: &str) -> Option<&Value> { diff --git a/rust/parquet/src/record/types/list.rs b/rust/parquet/src/record/types/list.rs index 06cae470767b0..bbdf72c261e77 100644 --- a/rust/parquet/src/record/types/list.rs +++ b/rust/parquet/src/record/types/list.rs @@ -67,13 +67,13 @@ pub(super) fn parse_list( }; ListSchema( - T::parse(&*element)?.1, + T::parse(&*element, Some(element.get_basic_info().repetition()))?.1, ListSchemaType::List(list_name, element_name), ) } else { let element_name = sub_schema.name().to_owned(); ListSchema( - T::parse(&*sub_schema)?.1, + T::parse(&*sub_schema, Some(Repetition::REQUIRED))?.1, ListSchemaType::ListCompat(element_name), ) }, @@ -96,27 +96,23 @@ where type Reader = MapReader, fn(Vec) -> Result>; type Schema = ListSchema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - if !schema.is_schema() && schema.get_basic_info().repetition() == Repetition::REQUIRED { + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + if repetition == Some(Repetition::REQUIRED) { return parse_list::(schema).map(|schema2| (schema.name().to_owned(), schema2)); } // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group // nor annotated by `LIST` or `MAP` should be interpreted as a required list of // required elements where the element type is the type of the field. - if schema.get_basic_info().repetition() == Repetition::REPEATED { - let mut schema2: Type = schema.clone(); - let basic_info = match schema2 { - Type::PrimitiveType { - ref mut basic_info, .. - } => basic_info, - Type::GroupType { - ref mut basic_info, .. - } => basic_info, - }; - basic_info.set_repetition(Some(Repetition::REQUIRED)); + if repetition == Some(Repetition::REPEATED) { return Ok(( schema.name().to_owned(), - ListSchema(T::parse(&schema2)?.1, ListSchemaType::Repeated), + ListSchema( + T::parse(&schema, Some(Repetition::REQUIRED))?.1, + ListSchemaType::Repeated, + ), )); } Err(ParquetError::General(String::from( @@ -127,8 +123,8 @@ where fn reader( schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { @@ -143,8 +139,8 @@ where let reader = T::reader( &schema.0, path, - curr_def_level + 1, - curr_rep_level + 1, + def_level + 1, + rep_level + 1, paths, batch_size, ); @@ -152,8 +148,8 @@ where path.pop().unwrap(); RepeatedReader { - def_level: curr_def_level, - rep_level: curr_rep_level, + def_level, + rep_level, reader, } } @@ -162,16 +158,16 @@ where let reader = T::reader( &schema.0, path, - curr_def_level + 1, - curr_rep_level + 1, + def_level + 1, + rep_level + 1, paths, batch_size, ); path.pop().unwrap(); RepeatedReader { - def_level: curr_def_level, - rep_level: curr_rep_level, + def_level, + rep_level, reader, } } @@ -179,14 +175,14 @@ where let reader = T::reader( &schema.0, path, - curr_def_level + 1, - curr_rep_level + 1, + def_level + 1, + rep_level + 1, paths, batch_size, ); RepeatedReader { - def_level: curr_def_level, - rep_level: curr_rep_level, + def_level, + rep_level, reader, } } diff --git a/rust/parquet/src/record/types/map.rs b/rust/parquet/src/record/types/map.rs index 58030b99e54df..79520326183e1 100644 --- a/rust/parquet/src/record/types/map.rs +++ b/rust/parquet/src/record/types/map.rs @@ -44,12 +44,11 @@ pub(super) fn parse_map( { let sub_schema = schema.get_fields().into_iter().nth(0).unwrap(); if sub_schema.is_group() - && !sub_schema.is_schema() && sub_schema.get_basic_info().repetition() == Repetition::REPEATED && sub_schema.get_fields().len() == 2 { let mut fields = sub_schema.get_fields().into_iter(); - let (key, value_) = (fields.next().unwrap(), fields.next().unwrap()); + let (key, value) = (fields.next().unwrap(), fields.next().unwrap()); let key_value_name = if sub_schema.name() == "key_value" { None } else { @@ -60,14 +59,14 @@ pub(super) fn parse_map( } else { Some(key.name().to_owned()) }; - let value_name = if value_.name() == "value" { + let value_name = if value.name() == "value" { None } else { - Some(value_.name().to_owned()) + Some(value.name().to_owned()) }; return Ok(MapSchema( - K::parse(&*key)?.1, - V::parse(&*value_)?.1, + K::parse(&*key, Some(key.get_basic_info().repetition()))?.1, + V::parse(&*value, Some(value.get_basic_info().repetition()))?.1, key_value_name, key_name, value_name, @@ -94,8 +93,11 @@ where >; type Schema = MapSchema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - if !schema.is_schema() && schema.get_basic_info().repetition() == Repetition::REQUIRED { + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + if repetition == Some(Repetition::REQUIRED) { return parse_map::(schema).map(|schema2| (schema.name().to_owned(), schema2)); } Err(ParquetError::General(String::from( @@ -106,8 +108,8 @@ where fn reader( schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { @@ -120,8 +122,8 @@ where let keys_reader = K::reader( &schema.0, path, - curr_def_level + 1, - curr_rep_level + 1, + def_level + 1, + rep_level + 1, paths, batch_size, ); @@ -130,8 +132,8 @@ where let values_reader = V::reader( &schema.1, path, - curr_def_level + 1, - curr_rep_level + 1, + def_level + 1, + rep_level + 1, paths, batch_size, ); @@ -140,8 +142,8 @@ where MapReader( KeyValueReader { - def_level: curr_def_level, - rep_level: curr_rep_level, + def_level, + rep_level, keys_reader, values_reader, }, diff --git a/rust/parquet/src/record/types/numbers.rs b/rust/parquet/src/record/types/numbers.rs index 8d295dc68076e..9694887b0fb61 100644 --- a/rust/parquet/src/record/types/numbers.rs +++ b/rust/parquet/src/record/types/numbers.rs @@ -18,6 +18,7 @@ use std::{collections::HashMap, marker::PhantomData}; use crate::{ + basic::Repetition, column::reader::ColumnReader, data_type::{BoolType, DoubleType, FloatType, Int32Type, Int64Type}, errors::ParquetError, @@ -40,31 +41,29 @@ impl Deserialize for bool { type Reader = BoolReader; type Schema = BoolSchema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - Value::parse(schema).and_then(downcast) + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + Value::parse(schema, repetition).and_then(downcast) } fn reader( _schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { let col_path = ColumnPath::new(path.to_vec()); let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); assert_eq!( - (curr_def_level, curr_rep_level), + (def_level, rep_level), (col_descr.max_def_level(), col_descr.max_rep_level()) ); BoolReader { - column: TypedTripletIter::::new( - curr_def_level, - curr_rep_level, - batch_size, - col_reader, - ), + column: TypedTripletIter::::new(def_level, rep_level, batch_size, col_reader), } } } @@ -73,31 +72,31 @@ impl Deserialize for i8 { type Reader = TryIntoReader; type Schema = I8Schema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - Value::parse(schema).and_then(downcast) + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + Value::parse(schema, repetition).and_then(downcast) } fn reader( _schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { let col_path = ColumnPath::new(path.to_vec()); let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); assert_eq!( - (curr_def_level, curr_rep_level), + (def_level, rep_level), (col_descr.max_def_level(), col_descr.max_rep_level()) ); TryIntoReader( I32Reader { column: TypedTripletIter::::new( - curr_def_level, - curr_rep_level, - batch_size, - col_reader, + def_level, rep_level, batch_size, col_reader, ), }, PhantomData, @@ -108,31 +107,31 @@ impl Deserialize for u8 { type Reader = TryIntoReader; type Schema = U8Schema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - Value::parse(schema).and_then(downcast) + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + Value::parse(schema, repetition).and_then(downcast) } fn reader( _schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { let col_path = ColumnPath::new(path.to_vec()); let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); assert_eq!( - (curr_def_level, curr_rep_level), + (def_level, rep_level), (col_descr.max_def_level(), col_descr.max_rep_level()) ); TryIntoReader( I32Reader { column: TypedTripletIter::::new( - curr_def_level, - curr_rep_level, - batch_size, - col_reader, + def_level, rep_level, batch_size, col_reader, ), }, PhantomData, @@ -144,31 +143,31 @@ impl Deserialize for i16 { type Reader = TryIntoReader; type Schema = I16Schema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - Value::parse(schema).and_then(downcast) + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + Value::parse(schema, repetition).and_then(downcast) } fn reader( _schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { let col_path = ColumnPath::new(path.to_vec()); let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); assert_eq!( - (curr_def_level, curr_rep_level), + (def_level, rep_level), (col_descr.max_def_level(), col_descr.max_rep_level()) ); TryIntoReader( I32Reader { column: TypedTripletIter::::new( - curr_def_level, - curr_rep_level, - batch_size, - col_reader, + def_level, rep_level, batch_size, col_reader, ), }, PhantomData, @@ -179,31 +178,31 @@ impl Deserialize for u16 { type Reader = TryIntoReader; type Schema = U16Schema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - Value::parse(schema).and_then(downcast) + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + Value::parse(schema, repetition).and_then(downcast) } fn reader( _schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { let col_path = ColumnPath::new(path.to_vec()); let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); assert_eq!( - (curr_def_level, curr_rep_level), + (def_level, rep_level), (col_descr.max_def_level(), col_descr.max_rep_level()) ); TryIntoReader( I32Reader { column: TypedTripletIter::::new( - curr_def_level, - curr_rep_level, - batch_size, - col_reader, + def_level, rep_level, batch_size, col_reader, ), }, PhantomData, @@ -215,30 +214,30 @@ impl Deserialize for i32 { type Reader = I32Reader; type Schema = I32Schema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - Value::parse(schema).and_then(downcast) + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + Value::parse(schema, repetition).and_then(downcast) } fn reader( _schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { let col_path = ColumnPath::new(path.to_vec()); let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); assert_eq!( - (curr_def_level, curr_rep_level), + (def_level, rep_level), (col_descr.max_def_level(), col_descr.max_rep_level()) ); I32Reader { column: TypedTripletIter::::new( - curr_def_level, - curr_rep_level, - batch_size, - col_reader, + def_level, rep_level, batch_size, col_reader, ), } } @@ -248,31 +247,31 @@ impl Deserialize for u32 { type Reader = MapReader Result>; type Schema = U32Schema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - Value::parse(schema).and_then(downcast) + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + Value::parse(schema, repetition).and_then(downcast) } fn reader( _schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { let col_path = ColumnPath::new(path.to_vec()); let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); assert_eq!( - (curr_def_level, curr_rep_level), + (def_level, rep_level), (col_descr.max_def_level(), col_descr.max_rep_level()) ); MapReader( I32Reader { column: TypedTripletIter::::new( - curr_def_level, - curr_rep_level, - batch_size, - col_reader, + def_level, rep_level, batch_size, col_reader, ), }, |x| Ok(x as u32), @@ -284,30 +283,30 @@ impl Deserialize for i64 { type Reader = I64Reader; type Schema = I64Schema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - Value::parse(schema).and_then(downcast) + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + Value::parse(schema, repetition).and_then(downcast) } fn reader( _schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { let col_path = ColumnPath::new(path.to_vec()); let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); assert_eq!( - (curr_def_level, curr_rep_level), + (def_level, rep_level), (col_descr.max_def_level(), col_descr.max_rep_level()) ); I64Reader { column: TypedTripletIter::::new( - curr_def_level, - curr_rep_level, - batch_size, - col_reader, + def_level, rep_level, batch_size, col_reader, ), } } @@ -317,31 +316,31 @@ impl Deserialize for u64 { type Reader = MapReader Result>; type Schema = U64Schema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - Value::parse(schema).and_then(downcast) + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + Value::parse(schema, repetition).and_then(downcast) } fn reader( _schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { let col_path = ColumnPath::new(path.to_vec()); let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); assert_eq!( - (curr_def_level, curr_rep_level), + (def_level, rep_level), (col_descr.max_def_level(), col_descr.max_rep_level()) ); MapReader( I64Reader { column: TypedTripletIter::::new( - curr_def_level, - curr_rep_level, - batch_size, - col_reader, + def_level, rep_level, batch_size, col_reader, ), }, |x| Ok(x as u64), @@ -353,30 +352,30 @@ impl Deserialize for f32 { type Reader = F32Reader; type Schema = F32Schema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - Value::parse(schema).and_then(downcast) + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + Value::parse(schema, repetition).and_then(downcast) } fn reader( _schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { let col_path = ColumnPath::new(path.to_vec()); let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); assert_eq!( - (curr_def_level, curr_rep_level), + (def_level, rep_level), (col_descr.max_def_level(), col_descr.max_rep_level()) ); F32Reader { column: TypedTripletIter::::new( - curr_def_level, - curr_rep_level, - batch_size, - col_reader, + def_level, rep_level, batch_size, col_reader, ), } } @@ -385,30 +384,30 @@ impl Deserialize for f64 { type Reader = F64Reader; type Schema = F64Schema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - Value::parse(schema).and_then(downcast) + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + Value::parse(schema, repetition).and_then(downcast) } fn reader( _schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { let col_path = ColumnPath::new(path.to_vec()); let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); assert_eq!( - (curr_def_level, curr_rep_level), + (def_level, rep_level), (col_descr.max_def_level(), col_descr.max_rep_level()) ); F64Reader { column: TypedTripletIter::::new( - curr_def_level, - curr_rep_level, - batch_size, - col_reader, + def_level, rep_level, batch_size, col_reader, ), } } diff --git a/rust/parquet/src/record/types/option.rs b/rust/parquet/src/record/types/option.rs index cece8a8839670..96edb1bf34e04 100644 --- a/rust/parquet/src/record/types/option.rs +++ b/rust/parquet/src/record/types/option.rs @@ -32,24 +32,17 @@ where type Reader = OptionReader; type Schema = OptionSchema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { // ::parse(schema).and_then(|(name, schema)| { // Ok((name, OptionSchema(schema.as_option()?.0.downcast()?))) // }) - if schema.get_basic_info().repetition() == Repetition::OPTIONAL { - let mut schema2: Type = schema.clone(); - let basic_info = match schema2 { - Type::PrimitiveType { - ref mut basic_info, .. - } => basic_info, - Type::GroupType { - ref mut basic_info, .. - } => basic_info, - }; - basic_info.set_repetition(Some(Repetition::REQUIRED)); + if repetition == Some(Repetition::OPTIONAL) { return Ok(( schema.name().to_owned(), - OptionSchema(T::parse(&schema2)?.1), + OptionSchema(T::parse(&schema, Some(Repetition::REQUIRED))?.1), )); } Err(ParquetError::General(String::from( @@ -60,18 +53,18 @@ where fn reader( schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { OptionReader { - def_level: curr_def_level, + def_level, reader: ::reader( &schema.0, path, - curr_def_level + 1, - curr_rep_level, + def_level + 1, + rep_level, paths, batch_size, ), diff --git a/rust/parquet/src/record/types/time.rs b/rust/parquet/src/record/types/time.rs index fb421311fd074..8e1042695d97a 100644 --- a/rust/parquet/src/record/types/time.rs +++ b/rust/parquet/src/record/types/time.rs @@ -18,6 +18,7 @@ use std::{collections::HashMap, convert::TryInto, error::Error, num::TryFromIntError}; use crate::{ + basic::Repetition, column::reader::ColumnReader, data_type::{Int32Type, Int64Type, Int96, Int96Type}, errors::ParquetError, @@ -44,31 +45,31 @@ impl Deserialize for Date { type Reader = MapReader Result>; type Schema = DateSchema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - Value::parse(schema).and_then(downcast) + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + Value::parse(schema, repetition).and_then(downcast) } fn reader( _schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { let col_path = ColumnPath::new(path.to_vec()); let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); assert_eq!( - (curr_def_level, curr_rep_level), + (def_level, rep_level), (col_descr.max_def_level(), col_descr.max_rep_level()) ); MapReader( I32Reader { column: TypedTripletIter::::new( - curr_def_level, - curr_rep_level, - batch_size, - col_reader, + def_level, rep_level, batch_size, col_reader, ), }, |days| Ok(Date(days)), @@ -86,32 +87,32 @@ impl Deserialize for Time { >; type Schema = TimeSchema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - Value::parse(schema).and_then(downcast) + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + Value::parse(schema, repetition).and_then(downcast) } fn reader( schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { let col_path = ColumnPath::new(path.to_vec()); let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); assert_eq!( - (curr_def_level, curr_rep_level), + (def_level, rep_level), (col_descr.max_def_level(), col_descr.max_rep_level()) ); match schema { TimeSchema::Micros => sum::Sum2::A(MapReader( I64Reader { column: TypedTripletIter::::new( - curr_def_level, - curr_rep_level, - batch_size, - col_reader, + def_level, rep_level, batch_size, col_reader, ), }, |micros| Ok(Time(micros)), @@ -119,10 +120,7 @@ impl Deserialize for Time { TimeSchema::Millis => sum::Sum2::B(MapReader( I32Reader { column: TypedTripletIter::::new( - curr_def_level, - curr_rep_level, - batch_size, - col_reader, + def_level, rep_level, batch_size, col_reader, ), }, |millis| Ok(Time(millis as i64 * MICROS_PER_MILLI)), @@ -171,32 +169,32 @@ impl Deserialize for Timestamp { >; type Schema = TimestampSchema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { - Value::parse(schema).and_then(downcast) + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { + Value::parse(schema, repetition).and_then(downcast) } fn reader( schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { let col_path = ColumnPath::new(path.to_vec()); let (col_descr, col_reader) = paths.remove(&col_path).unwrap(); assert_eq!( - (curr_def_level, curr_rep_level), + (def_level, rep_level), (col_descr.max_def_level(), col_descr.max_rep_level()) ); match schema { TimestampSchema::Int96 => sum::Sum3::A(MapReader( I96Reader { column: TypedTripletIter::::new( - curr_def_level, - curr_rep_level, - batch_size, - col_reader, + def_level, rep_level, batch_size, col_reader, ), }, |x| Ok(Timestamp(x)), @@ -204,10 +202,7 @@ impl Deserialize for Timestamp { TimestampSchema::Millis => sum::Sum3::B(MapReader( I64Reader { column: TypedTripletIter::::new( - curr_def_level, - curr_rep_level, - batch_size, - col_reader, + def_level, rep_level, batch_size, col_reader, ), }, |millis| { @@ -231,10 +226,7 @@ impl Deserialize for Timestamp { TimestampSchema::Micros => sum::Sum3::C(MapReader( I64Reader { column: TypedTripletIter::::new( - curr_def_level, - curr_rep_level, - batch_size, - col_reader, + def_level, rep_level, batch_size, col_reader, ), }, |micros| { diff --git a/rust/parquet/src/record/types/tuple.rs b/rust/parquet/src/record/types/tuple.rs index 09ac456ba77a9..d7591bc472621 100644 --- a/rust/parquet/src/record/types/tuple.rs +++ b/rust/parquet/src/record/types/tuple.rs @@ -40,9 +40,9 @@ macro_rules! impl_parquet_deserialize_tuple { impl<$($t,)*> Reader for TupleReader<($($t,)*)> where $($t: Reader,)* { type Item = ($($t::Item,)*); - fn read(&mut self) -> Result { + fn read(&mut self, def_level: i16, rep_level: i16) -> Result { Ok(( - $((self.0).$i.read()?,)* + $((self.0).$i.read(def_level, rep_level)?,)* )) } fn advance_columns(&mut self) -> Result<(), ParquetError> { @@ -113,32 +113,14 @@ macro_rules! impl_parquet_deserialize_tuple { f.write_str("}}") } } - impl<$($t,)*> Deserialize for Root<($($t,)*)> where $($t: Deserialize,)* { - type Schema = RootSchema<($($t,)*),TupleSchema<($((String,$t::Schema,),)*)>>; - type Reader = RootReader>; - - fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { - if schema.is_schema() { - let mut fields = schema.get_fields().iter(); - let schema_ = RootSchema(schema.name().to_owned(), TupleSchema(($(fields.next().ok_or(ParquetError::General(String::from("Group missing field"))).and_then(|x|$t::parse(&**x))?,)*)), PhantomData); - if fields.next().is_none() { - return Ok((String::from(""), schema_)) - } - } - Err(ParquetError::General(format!("Can't parse Tuple {:?}", schema))) - } - fn reader(schema: &Self::Schema, path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap, batch_size: usize) -> Self::Reader { - RootReader(<($($t,)*) as Deserialize>::reader(&schema.1, path, curr_def_level, curr_rep_level, paths, batch_size)) - } - } impl<$($t,)*> Deserialize for ($($t,)*) where $($t: Deserialize,)* { type Schema = TupleSchema<($((String,$t::Schema,),)*)>; type Reader = TupleReader<($($t::Reader,)*)>; - fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> { - if schema.is_group() && !schema.is_schema() && schema.get_basic_info().repetition() == Repetition::REQUIRED { + fn parse(schema: &Type, repetition: Option) -> Result<(String, Self::Schema), ParquetError> { + if schema.is_group() && repetition == Some(Repetition::REQUIRED) { let mut fields = schema.get_fields().iter(); - let schema_ = TupleSchema(($(fields.next().ok_or(ParquetError::General(String::from("Group missing field"))).and_then(|x|$t::parse(&**x))?,)*)); + let schema_ = TupleSchema(($(fields.next().ok_or(ParquetError::General(String::from("Group missing field"))).and_then(|x|$t::parse(&**x, Some(x.get_basic_info().repetition())))?,)*)); if fields.next().is_none() { return Ok((schema.name().to_owned(), schema_)) } @@ -146,11 +128,11 @@ macro_rules! impl_parquet_deserialize_tuple { Err(ParquetError::General(format!("Can't parse Tuple {:?}", schema))) } #[allow(unused_variables)] - fn reader(schema: &Self::Schema, path: &mut Vec, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap, batch_size: usize) -> Self::Reader { + fn reader(schema: &Self::Schema, path: &mut Vec, def_level: i16, rep_level: i16, paths: &mut HashMap, batch_size: usize) -> Self::Reader { $( path.push((schema.0).$i.0.to_owned()); #[allow(non_snake_case)] - let $t = <$t as Deserialize>::reader(&(schema.0).$i.1, path, curr_def_level, curr_rep_level, paths, batch_size); + let $t = <$t as Deserialize>::reader(&(schema.0).$i.1, path, def_level, rep_level, paths, batch_size); path.pop().unwrap(); )*; TupleReader(($($t,)*)) diff --git a/rust/parquet/src/record/types/value.rs b/rust/parquet/src/record/types/value.rs index c715b4d7c5038..3943a28696a4d 100644 --- a/rust/parquet/src/record/types/value.rs +++ b/rust/parquet/src/record/types/value.rs @@ -1162,9 +1162,12 @@ impl Deserialize for Value { type Reader = ValueReader; type Schema = ValueSchema; - fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError> { + fn parse( + schema: &Type, + repetition: Option, + ) -> Result<(String, Self::Schema), ParquetError> { let mut value = None; - if schema.is_primitive() { + if repetition.is_some() && schema.is_primitive() { value = Some( match ( schema.get_physical_type(), @@ -1265,29 +1268,31 @@ impl Deserialize for Value { ); } // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#backward-compatibility-rules - if value.is_none() && !schema.is_schema() { + if repetition.is_some() && value.is_none() { value = parse_list::(schema) .ok() .map(|value| ValueSchema::List(Box::new(value))); } - if value.is_none() && !schema.is_schema() { + if repetition.is_some() && value.is_none() { value = parse_map::(schema) .ok() .map(|value| ValueSchema::Map(Box::new(value))); } - if value.is_none() && schema.is_group() && !schema.is_schema() { + if repetition.is_some() && value.is_none() && schema.is_group() { let mut lookup = HashMap::new(); value = Some(ValueSchema::Group(GroupSchema( schema .get_fields() .iter() .map(|schema| { - Value::parse(&*schema).map(|(name, schema)| { - let x = lookup.insert(name, lookup.len()); - assert!(x.is_none()); - schema - }) + Value::parse(&*schema, Some(schema.get_basic_info().repetition())).map( + |(name, schema)| { + let x = lookup.insert(name, lookup.len()); + assert!(x.is_none()); + schema + }, + ) }) .collect::, _>>()?, lookup, @@ -1295,9 +1300,9 @@ impl Deserialize for Value { } let mut value = value - .ok_or_else(|| ParquetError::General(format!("Can't parse group {:?}", schema)))?; + .ok_or_else(|| ParquetError::General(format!("Can't parse value {:?}", schema)))?; - match schema.get_basic_info().repetition() { + match repetition.unwrap() { Repetition::OPTIONAL => { value = ValueSchema::Option(Box::new(OptionSchema(value))); } @@ -1313,214 +1318,94 @@ impl Deserialize for Value { fn reader( schema: &Self::Schema, path: &mut Vec, - curr_def_level: i16, - curr_rep_level: i16, + def_level: i16, + rep_level: i16, paths: &mut HashMap, batch_size: usize, ) -> Self::Reader { match *schema { ValueSchema::Bool(ref schema) => ValueReader::Bool(::reader( - schema, - path, - curr_def_level, - curr_rep_level, - paths, - batch_size, + schema, path, def_level, rep_level, paths, batch_size, )), ValueSchema::U8(ref schema) => ValueReader::U8(::reader( - schema, - path, - curr_def_level, - curr_rep_level, - paths, - batch_size, + schema, path, def_level, rep_level, paths, batch_size, )), ValueSchema::I8(ref schema) => ValueReader::I8(::reader( - schema, - path, - curr_def_level, - curr_rep_level, - paths, - batch_size, + schema, path, def_level, rep_level, paths, batch_size, )), ValueSchema::U16(ref schema) => ValueReader::U16(::reader( - schema, - path, - curr_def_level, - curr_rep_level, - paths, - batch_size, + schema, path, def_level, rep_level, paths, batch_size, )), ValueSchema::I16(ref schema) => ValueReader::I16(::reader( - schema, - path, - curr_def_level, - curr_rep_level, - paths, - batch_size, + schema, path, def_level, rep_level, paths, batch_size, )), ValueSchema::U32(ref schema) => ValueReader::U32(::reader( - schema, - path, - curr_def_level, - curr_rep_level, - paths, - batch_size, + schema, path, def_level, rep_level, paths, batch_size, )), ValueSchema::I32(ref schema) => ValueReader::I32(::reader( - schema, - path, - curr_def_level, - curr_rep_level, - paths, - batch_size, + schema, path, def_level, rep_level, paths, batch_size, )), ValueSchema::U64(ref schema) => ValueReader::U64(::reader( - schema, - path, - curr_def_level, - curr_rep_level, - paths, - batch_size, + schema, path, def_level, rep_level, paths, batch_size, )), ValueSchema::I64(ref schema) => ValueReader::I64(::reader( - schema, - path, - curr_def_level, - curr_rep_level, - paths, - batch_size, + schema, path, def_level, rep_level, paths, batch_size, )), ValueSchema::F32(ref schema) => ValueReader::F32(::reader( - schema, - path, - curr_def_level, - curr_rep_level, - paths, - batch_size, + schema, path, def_level, rep_level, paths, batch_size, )), ValueSchema::F64(ref schema) => ValueReader::F64(::reader( - schema, - path, - curr_def_level, - curr_rep_level, - paths, - batch_size, + schema, path, def_level, rep_level, paths, batch_size, )), ValueSchema::Date(ref schema) => ValueReader::Date(::reader( - schema, - path, - curr_def_level, - curr_rep_level, - paths, - batch_size, + schema, path, def_level, rep_level, paths, batch_size, )), ValueSchema::Time(ref schema) => ValueReader::Time(