Skip to content

Commit

Permalink
Remove unnecessary state from readers; Remove Root from Deserialize b…
Browse files Browse the repository at this point in the history
…ounds so can deserialize foreign structs; Test #[derive(Deserialize)]; Better honour TripletIter API
  • Loading branch information
alecmocatta committed Jan 25, 2019
1 parent db6379d commit 71f1e3b
Show file tree
Hide file tree
Showing 20 changed files with 714 additions and 748 deletions.
64 changes: 27 additions & 37 deletions rust/parquet/parquet_derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#![recursion_limit = "200"]
#![recursion_limit = "300"]

extern crate proc_macro;
extern crate proc_macro2;
Expand Down Expand Up @@ -75,7 +75,7 @@ fn impl_struct(
.push(syn::parse2(quote! { <#ident as Deserialize>::Schema: Debug }).unwrap());
}

let field_renames1 = fields
let field_renames = fields
.iter()
.map(|field| {
let mut rename = None;
Expand Down Expand Up @@ -113,28 +113,18 @@ fn impl_struct(
}))
})
.collect::<Result<Vec<_>, _>>()?;
let field_renames2 = field_renames1.clone();
let field_renames3 = field_renames1.clone();
let field_renames1 = &field_renames;
let field_renames2 = &field_renames;

let field_names1 = fields.iter().map(|field| field.ident.as_ref().unwrap());
let field_names2 = fields.iter().map(|field| field.ident.as_ref().unwrap());
let field_names3 = fields.iter().map(|field| field.ident.as_ref().unwrap());
let field_names4 = fields.iter().map(|field| field.ident.as_ref().unwrap());
let field_names5 = fields.iter().map(|field| field.ident.as_ref().unwrap());
let field_names6 = fields.iter().map(|field| field.ident.as_ref().unwrap());
let field_names7 = fields.iter().map(|field| field.ident.as_ref().unwrap());
let field_names8 = fields.iter().map(|field| field.ident.as_ref().unwrap());
let field_names9 = fields.iter().map(|field| field.ident.as_ref().unwrap());
let field_names10 = fields.iter().map(|field| field.ident.as_ref().unwrap());
let field_names11 = fields.iter().map(|field| field.ident.as_ref().unwrap());
let field_names12 = fields.iter().map(|field| field.ident.as_ref().unwrap());
let field_names13 = fields.iter().map(|field| field.ident.as_ref().unwrap());
let field_names14 = fields.iter().map(|field| field.ident.as_ref().unwrap());
let field_names = fields
.iter()
.map(|field| field.ident.as_ref().unwrap())
.collect::<Vec<_>>();
let field_names1 = &field_names;
let field_names2 = &field_names;

let field_types1 = fields.iter().map(|field| &field.ty);
let field_types2 = fields.iter().map(|field| &field.ty);
let field_types3 = fields.iter().map(|field| &field.ty);
let field_types4 = fields.iter().map(|field| &field.ty);
let field_types = fields.iter().map(|field| &field.ty).collect::<Vec<_>>();
let field_types1 = &field_types;

let name1 = iter::repeat(name).take(fields.len());

Expand All @@ -154,7 +144,7 @@ fn impl_struct(
impl #impl_generics Debug for #schema_name #ty_generics #where_clause_with_debug {
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_struct(stringify!(#schema_name))
#(.field(stringify!(#field_names2), &self.#field_names3))*
#(.field(stringify!(#field_names1), &self.#field_names2))*
.finish()
}
}
Expand All @@ -167,34 +157,34 @@ fn impl_struct(
}
}
struct #reader_name #impl_generics #where_clause {
#(#field_names4: <#field_types2 as Deserialize>::Reader,)*
#(#field_names1: <#field_types1 as Deserialize>::Reader,)*
}
impl #impl_generics Reader for #reader_name #ty_generics #where_clause {
type Item = #name #ty_generics;

fn read(&mut self) -> Result<Self::Item, ParquetError> {
fn read(&mut self, def_level: i16, rep_level: i16) -> Result<Self::Item, ParquetError> {
Result::Ok(#name {
#(#field_names5: self.#field_names6.read()?,)*
#(#field_names1: self.#field_names2.read(def_level, rep_level)?,)*
})
}
fn advance_columns(&mut self) -> Result<(), ParquetError> {
#(self.#field_names7.advance_columns()?;)*
#(self.#field_names1.advance_columns()?;)*
Result::Ok(())
}
fn has_next(&self) -> bool {
#(if true { self.#field_names8.has_next() } else)*
#(if true { self.#field_names1.has_next() } else)*
{
true
}
}
fn current_def_level(&self) -> i16 {
#(if true { self.#field_names9.current_def_level() } else)*
#(if true { self.#field_names1.current_def_level() } else)*
{
panic!("Current definition level: empty group reader")
}
}
fn current_rep_level(&self) -> i16 {
#(if true { self.#field_names10.current_rep_level() } else)*
#(if true { self.#field_names1.current_rep_level() } else)*
{
panic!("Current repetition level: empty group reader")
}
Expand All @@ -204,23 +194,23 @@ fn impl_struct(
type Schema = #schema_name #ty_generics;
type Reader = #reader_name #ty_generics;

fn parse(schema: &Type) -> Result<(String,Self::Schema),ParquetError> {
if schema.is_group() && !schema.is_schema() && schema.get_basic_info().repetition() == Repetition::REQUIRED {
fn parse(schema: &Type, repetition: Option<Repetition>) -> Result<(String, Self::Schema), ParquetError> {
if schema.is_group() && repetition == Some(Repetition::REQUIRED) {
let fields = schema.get_fields().iter().map(|field|(field.name(),field)).collect::<HashMap<_,_>>();
let schema_ = #schema_name{
#(#field_names11: fields.get(#field_renames1).ok_or(ParquetError::General(format!("Struct {} missing field {}", stringify!(#name1), #field_renames2))).and_then(|x|<#field_types3 as Deserialize>::parse(&**x))?.1,)*
#(#field_names1: fields.get(#field_renames1).ok_or(ParquetError::General(format!("Struct {} missing field {}", stringify!(#name1), #field_renames2))).and_then(|x|<#field_types1 as Deserialize>::parse(&**x, Some(x.get_basic_info().repetition())))?.1,)*
};
return Result::Ok((schema.name().to_owned(), schema_))
}
Result::Err(ParquetError::General(format!("Struct {}", stringify!(#name))))
}
fn reader(schema: &Self::Schema, mut path: &mut Vec<String>, curr_def_level: i16, curr_rep_level: i16, paths: &mut HashMap<ColumnPath, (ColumnDescPtr,ColumnReader)>, batch_size: usize) -> Self::Reader {
fn reader(schema: &Self::Schema, mut path: &mut Vec<String>, def_level: i16, rep_level: i16, paths: &mut HashMap<ColumnPath, (ColumnDescPtr,ColumnReader)>, batch_size: usize) -> Self::Reader {
#(
path.push(#field_renames3.to_owned());
let #field_names12 = <#field_types4 as Deserialize>::reader(&schema.#field_names13, path, curr_def_level, curr_rep_level, paths, batch_size);
path.push(#field_renames1.to_owned());
let #field_names1 = <#field_types1 as Deserialize>::reader(&schema.#field_names2, path, def_level, rep_level, paths, batch_size);
path.pop().unwrap();
)*
#reader_name { #(#field_names14,)* }
#reader_name { #(#field_names1,)* }
}
}
};
Expand Down
34 changes: 23 additions & 11 deletions rust/parquet/src/file/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub trait FileReader {
/// full file schema is assumed.
fn get_row_iter<T>(&self, projection: Option<SchemaType>) -> Result<RowIter<Self, T>>
where
Root<T>: Deserialize,
T: Deserialize,
Self: Sized;
}

Expand All @@ -86,12 +86,17 @@ pub trait RowGroupReader {
/// Get value reader for the `i`th column chunk.
fn get_column_reader(&self, i: usize) -> Result<ColumnReader>;

// /// Get iterator of `Row`s from this row group.
// ///
// /// Projected schema can be a subset of or equal to the file schema, when it is None,
// /// full file schema is assumed.
// fn get_row_iter<T>(&self, projection: Option<SchemaType>) -> Result<RowIter<Self,T>>
// where Root<T>: Deserialize, Self: Sized;
/// Get iterator of `Row`s from this row group.
///
/// Projected schema can be a subset of or equal to the file schema, when it is None,
/// full file schema is assumed.
fn get_row_iter<T>(
&self,
projection: Option<SchemaType>,
) -> Result<RowIter<SerializedFileReader<std::fs::File>, T>>
where
T: Deserialize,
Self: Sized;
}

// ----------------------------------------------------------------------
Expand Down Expand Up @@ -275,7 +280,7 @@ impl<R: 'static + ParquetReader> FileReader for SerializedFileReader<R> {

fn get_row_iter<T>(&self, projection: Option<SchemaType>) -> Result<RowIter<Self, T>>
where
Root<T>: Deserialize,
T: Deserialize,
Self: Sized,
{
RowIter::from_file(projection, self)
Expand Down Expand Up @@ -390,9 +395,16 @@ impl<R: 'static + ParquetReader> RowGroupReader for SerializedRowGroupReader<R>
Ok(col_reader)
}

// fn get_row_iter<T>(&self, projection: Option<SchemaType>) -> Result<RowIter<Self,T>>
// where Root<T>: Deserialize, Self: Sized { RowIter::from_row_group(projection,
// self) }
fn get_row_iter<T>(
&self,
projection: Option<SchemaType>,
) -> Result<RowIter<SerializedFileReader<std::fs::File>, T>>
where
T: Deserialize,
Self: Sized,
{
RowIter::from_row_group(projection, self)
}
}

/// A serialized implementation for Parquet [`PageReader`].
Expand Down
3 changes: 1 addition & 2 deletions rust/parquet/src/file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -926,8 +926,7 @@ mod tests {
assert_eq!(reader.num_row_groups(), data.len());
for i in 0..reader.num_row_groups() {
let row_group_reader = reader.get_row_group(i).unwrap();
// let iter =
// row_group_reader.get_row_iter::<Row>(None).unwrap();
// let iter = row_group_reader.get_row_iter::<Row>(None).unwrap();
let iter =
crate::record::reader::RowIter::<SerializedFileReader<File>, Row>::from_row_group(
None,
Expand Down
9 changes: 6 additions & 3 deletions rust/parquet/src/record/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,17 @@ pub trait Deserialize: Sized {
type Reader: Reader<Item = Self>;

/// Parse a [`Type`] into `Self::Schema`.
fn parse(schema: &Type) -> Result<(String, Self::Schema), ParquetError>;
fn parse(
schema: &Type,
repetition: Option<Repetition>,
) -> Result<(String, Self::Schema), ParquetError>;

/// Builds tree of readers for the specified schema recursively.
fn reader(
schema: &Self::Schema,
path: &mut Vec<String>,
curr_def_level: i16,
curr_rep_level: i16,
def_level: i16,
rep_level: i16,
paths: &mut HashMap<ColumnPath, (ColumnDescPtr, ColumnReader)>,
batch_size: usize,
) -> Self::Reader;
Expand Down
Loading

0 comments on commit 71f1e3b

Please sign in to comment.