diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs index 347db6cd7..f686c1045 100644 --- a/crates/iceberg/src/arrow/reader.rs +++ b/crates/iceberg/src/arrow/reader.rs @@ -48,7 +48,7 @@ use crate::expr::{BoundPredicate, BoundReference}; use crate::io::{FileIO, FileMetadata, FileRead}; use crate::runtime::spawn; use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream}; -use crate::spec::{Datum, Schema}; +use crate::spec::{DataContentType, Datum, Schema}; use crate::utils::available_parallelism; use crate::{Error, ErrorKind}; @@ -210,12 +210,6 @@ impl ArrowReader { )?; record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask); - // RecordBatchTransformer performs any required transformations on the RecordBatches - // that come back from the file, such as type promotion, default column insertion - // and column re-ordering - let mut record_batch_transformer = - RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids()); - if let Some(batch_size) = batch_size { record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size); } @@ -269,9 +263,22 @@ impl ArrowReader { // to the requester. let mut record_batch_stream = record_batch_stream_builder.build()?; - while let Some(batch) = record_batch_stream.try_next().await? { - tx.send(record_batch_transformer.process_record_batch(batch)) - .await? + // The schema of the xxx file doesn't change, so we don't need to convert the schema. + if matches!(task.data_file_content, DataContentType::PositionDeletes) { + while let Some(batch) = record_batch_stream.try_next().await? { + tx.send(Ok(batch)).await? + } + } else { + // RecordBatchTransformer performs any required transformations on the RecordBatches + // that come back from the file, such as type promotion, default column insertion + // and column re-ordering. + let mut record_batch_transformer = + RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids()); + + while let Some(batch) = record_batch_stream.try_next().await? { + tx.send(record_batch_transformer.process_record_batch(batch)) + .await? + } } Ok(())