Skip to content

Commit

Permalink
feat: API for collecting statistics/index for metadata of a parquet f…
Browse files Browse the repository at this point in the history
…ile + tests (#10537)

* test: some tests to write data to a parquet file and read its metadata

* feat: API to convert parquet stats to arrow stats

* Refine statistics extraction API and tests

* Implement null counts

* port test

* test: add more tests for the arrow statistics

* chore: fix format and test output

* chore: rename test helpers

* chore: Apply suggestions from code review

Co-authored-by: Andrew Lamb <[email protected]>

* Apply suggestions from code review

* Apply suggestions from code review

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
NGA-TRAN and alamb authored May 20, 2024
1 parent 439726f commit b716c09
Show file tree
Hide file tree
Showing 4 changed files with 812 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ mod statistics;

pub use metrics::ParquetFileMetrics;
pub use schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper};
pub use statistics::{RequestedStatistics, StatisticsConverter};

/// Execution plan for scanning one or more Parquet partitions
#[derive(Debug, Clone)]
Expand Down Expand Up @@ -482,7 +483,6 @@ struct ParquetOpener {
impl FileOpener for ParquetOpener {
fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
let file_range = file_meta.range.clone();

let file_metrics = ParquetFileMetrics::new(
self.partition_index,
file_meta.location().as_ref(),
Expand Down
156 changes: 153 additions & 3 deletions datafusion/core/src/datasource/physical_plan/parquet/statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@
// TODO: potentially move this to arrow-rs: https://github.com/apache/arrow-rs/issues/4328

use arrow::{array::ArrayRef, datatypes::DataType};
use arrow_array::new_empty_array;
use arrow_schema::{FieldRef, Schema};
use datafusion_common::{Result, ScalarValue};
use arrow_array::{new_empty_array, new_null_array, UInt64Array};
use arrow_schema::{Field, FieldRef, Schema};
use datafusion_common::{
internal_datafusion_err, internal_err, plan_err, Result, ScalarValue,
};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::statistics::Statistics as ParquetStatistics;
use parquet::schema::types::SchemaDescriptor;
use std::sync::Arc;

// Convert the bytes array to i128.
// The endian of the input bytes array must be big-endian.
Expand Down Expand Up @@ -210,6 +214,152 @@ fn collect_scalars<I: Iterator<Item = Option<ScalarValue>>>(
}
}

/// What type of statistics should be extracted?
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum RequestedStatistics {
/// Minimum Value
Min,
/// Maximum Value
Max,
/// Null Count, returned as a [`UInt64Array`])
NullCount,
}

/// Extracts Parquet statistics as Arrow arrays
///
/// This is used to convert Parquet statistics to Arrow arrays, with proper type
/// conversions. This information can be used for pruning parquet files or row
/// groups based on the statistics embedded in parquet files
///
/// # Schemas
///
/// The schema of the parquet file and the arrow schema are used to convert the
/// underlying statistics value (stored as a parquet value) into the
/// corresponding Arrow value. For example, Decimals are stored as binary in
/// parquet files.
///
/// The parquet_schema and arrow _schema do not have to be identical (for
/// example, the columns may be in different orders and one or the other schemas
/// may have additional columns). The function [`parquet_column`] is used to
/// match the column in the parquet file to the column in the arrow schema.
///
/// # Multiple parquet files
///
/// This API is designed to support efficiently extracting statistics from
/// multiple parquet files (hence why the parquet schema is passed in as an
/// argument). This is useful when building an index for a directory of parquet
/// files.
///
#[derive(Debug)]
pub struct StatisticsConverter<'a> {
/// The name of the column to extract statistics for
column_name: &'a str,
/// The type of statistics to extract
statistics_type: RequestedStatistics,
/// The arrow schema of the query
arrow_schema: &'a Schema,
/// The field (with data type) of the column in the arrow schema
arrow_field: &'a Field,
}

impl<'a> StatisticsConverter<'a> {
/// Returns a [`UInt64Array`] with counts for each row group
///
/// The returned array has no nulls, and has one value for each row group.
/// Each value is the number of rows in the row group.
pub fn row_counts(metadata: &ParquetMetaData) -> Result<UInt64Array> {
let row_groups = metadata.row_groups();
let mut builder = UInt64Array::builder(row_groups.len());
for row_group in row_groups {
let row_count = row_group.num_rows();
let row_count: u64 = row_count.try_into().map_err(|e| {
internal_datafusion_err!(
"Parquet row count {row_count} too large to convert to u64: {e}"
)
})?;
builder.append_value(row_count);
}
Ok(builder.finish())
}

/// create an new statistics converter
pub fn try_new(
column_name: &'a str,
statistics_type: RequestedStatistics,
arrow_schema: &'a Schema,
) -> Result<Self> {
// ensure the requested column is in the arrow schema
let Some((_idx, arrow_field)) = arrow_schema.column_with_name(column_name) else {
return plan_err!(
"Column '{}' not found in schema for statistics conversion",
column_name
);
};

Ok(Self {
column_name,
statistics_type,
arrow_schema,
arrow_field,
})
}

/// extract the statistics from a parquet file, given the parquet file's metadata
///
/// The returned array contains 1 value for each row group in the parquet
/// file in order
///
/// Each value is either
/// * the requested statistics type for the column
/// * a null value, if the statistics can not be extracted
///
/// Note that a null value does NOT mean the min or max value was actually
/// `null` it means it the requested statistic is unknown
///
/// Reasons for not being able to extract the statistics include:
/// * the column is not present in the parquet file
/// * statistics for the column are not present in the row group
/// * the stored statistic value can not be converted to the requested type
pub fn extract(&self, metadata: &ParquetMetaData) -> Result<ArrayRef> {
let data_type = self.arrow_field.data_type();
let num_row_groups = metadata.row_groups().len();

let parquet_schema = metadata.file_metadata().schema_descr();
let row_groups = metadata.row_groups();

// find the column in the parquet schema, if not, return a null array
let Some((parquet_idx, matched_field)) =
parquet_column(parquet_schema, self.arrow_schema, self.column_name)
else {
// column was in the arrow schema but not in the parquet schema, so return a null array
return Ok(new_null_array(data_type, num_row_groups));
};

// sanity check that matching field matches the arrow field
if matched_field.as_ref() != self.arrow_field {
return internal_err!(
"Matched column '{:?}' does not match original matched column '{:?}'",
matched_field,
self.arrow_field
);
}

// Get an iterator over the column statistics
let iter = row_groups
.iter()
.map(|x| x.column(parquet_idx).statistics());

match self.statistics_type {
RequestedStatistics::Min => min_statistics(data_type, iter),
RequestedStatistics::Max => max_statistics(data_type, iter),
RequestedStatistics::NullCount => {
let null_counts = iter.map(|stats| stats.map(|s| s.null_count()));
Ok(Arc::new(UInt64Array::from_iter(null_counts)))
}
}
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
Loading

0 comments on commit b716c09

Please sign in to comment.