Skip to content

Commit

Permalink
fix(delete): fix position delete (#8)
Browse files Browse the repository at this point in the history
* fix position

* fix comm

* fix comm
  • Loading branch information
xxhZs authored and ZENOTME committed Nov 29, 2024
1 parent 3d7dfdc commit aa3a909
Showing 1 changed file with 17 additions and 10 deletions.
27 changes: 17 additions & 10 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ use crate::expr::{BoundPredicate, BoundReference};
use crate::io::{FileIO, FileMetadata, FileRead};
use crate::runtime::spawn;
use crate::scan::{ArrowRecordBatchStream, FileScanTask, FileScanTaskStream};
use crate::spec::{Datum, Schema};
use crate::spec::{DataContentType, Datum, Schema};
use crate::utils::available_parallelism;
use crate::{Error, ErrorKind};

Expand Down Expand Up @@ -210,12 +210,6 @@ impl ArrowReader {
)?;
record_batch_stream_builder = record_batch_stream_builder.with_projection(projection_mask);

// RecordBatchTransformer performs any required transformations on the RecordBatches
// that come back from the file, such as type promotion, default column insertion
// and column re-ordering
let mut record_batch_transformer =
RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());

if let Some(batch_size) = batch_size {
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);
}
Expand Down Expand Up @@ -269,9 +263,22 @@ impl ArrowReader {
// to the requester.
let mut record_batch_stream = record_batch_stream_builder.build()?;

while let Some(batch) = record_batch_stream.try_next().await? {
tx.send(record_batch_transformer.process_record_batch(batch))
.await?
// The schema of a position delete file doesn't change, so we don't need to convert the schema.
if matches!(task.data_file_content, DataContentType::PositionDeletes) {
while let Some(batch) = record_batch_stream.try_next().await? {
tx.send(Ok(batch)).await?
}
} else {
// RecordBatchTransformer performs any required transformations on the RecordBatches
// that come back from the file, such as type promotion, default column insertion
// and column re-ordering.
let mut record_batch_transformer =
RecordBatchTransformer::build(task.schema_ref(), task.project_field_ids());

while let Some(batch) = record_batch_stream.try_next().await? {
tx.send(record_batch_transformer.process_record_batch(batch))
.await?
}
}

Ok(())
Expand Down

0 comments on commit aa3a909

Please sign in to comment.