Skip to content

Commit

Permalink
Revert "reorder record batch"
Browse files Browse the repository at this point in the history
This reverts commit 66d7c4a.
  • Loading branch information
chenzl25 authored and ZENOTME committed Nov 29, 2024
1 parent fb24ad0 commit 3d7dfdc
Showing 1 changed file with 4 additions and 23 deletions.
27 changes: 4 additions & 23 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,7 @@ impl ArrowReader {

// Create a projection mask for the batch stream to select which columns in the
// Parquet file that we want in the response
// Since Parquet projection mask will lose the order of the columns, we need to reorder.
let (projection_mask, reorder) = Self::get_arrow_projection_mask(
let projection_mask = Self::get_arrow_projection_mask(
&task.project_field_ids,
&task.schema,
record_batch_stream_builder.parquet_schema(),
Expand Down Expand Up @@ -271,11 +270,6 @@ impl ArrowReader {
let mut record_batch_stream = record_batch_stream_builder.build()?;

while let Some(batch) = record_batch_stream.try_next().await? {
let batch = if let Some(reorder) = reorder.as_ref() {
batch.project(&reorder).expect("must be able to reorder")
} else {
batch
};
tx.send(record_batch_transformer.process_record_batch(batch))
.await?
}
Expand Down Expand Up @@ -304,9 +298,9 @@ impl ArrowReader {
iceberg_schema_of_task: &Schema,
parquet_schema: &SchemaDescriptor,
arrow_schema: &ArrowSchemaRef,
) -> Result<(ProjectionMask, Option<Vec<usize>>)> {
) -> Result<ProjectionMask> {
if field_ids.is_empty() {
Ok((ProjectionMask::all(), None))
Ok(ProjectionMask::all())
} else {
// Build the map between field id and column index in Parquet schema.
let mut column_map = HashMap::new();
Expand Down Expand Up @@ -364,20 +358,7 @@ impl ArrowReader {
));
}
}

// projection mask is order by indices
let mut mask_indices = indices.clone();
mask_indices.sort_by_key(|&x| x);
// try to reorder the mask_indices to indices
let reorder = indices
.iter()
.map(|idx| mask_indices.iter().position(|&i| i == *idx).unwrap())
.collect::<Vec<_>>();

Ok((
ProjectionMask::roots(parquet_schema, indices),
Some(reorder),
))
Ok(ProjectionMask::roots(parquet_schema, indices))
}
}

Expand Down

0 comments on commit 3d7dfdc

Please sign in to comment.