diff --git a/ci/scripts/e2e-iceberg-sink-v2-test.sh b/ci/scripts/e2e-iceberg-sink-v2-test.sh index c039c625aa213..bcb530ae9fdd9 100755 --- a/ci/scripts/e2e-iceberg-sink-v2-test.sh +++ b/ci/scripts/e2e-iceberg-sink-v2-test.sh @@ -46,7 +46,9 @@ poetry run python main.py -t ./test_case/range_partition_append_only.toml poetry run python main.py -t ./test_case/range_partition_upsert.toml poetry run python main.py -t ./test_case/append_only_with_checkpoint_interval.toml poetry run python main.py -t ./test_case/iceberg_select_empty_table.toml -poetry run python main.py -t ./test_case/iceberg_source_eq_delete.toml +poetry run python main.py -t ./test_case/iceberg_source_equality_delete.toml +poetry run python main.py -t ./test_case/iceberg_source_position_delete.toml +poetry run python main.py -t ./test_case/iceberg_source_all_delete.toml echo "--- Kill cluster" diff --git a/e2e_test/iceberg/test_case/iceberg_source_all_delete.slt b/e2e_test/iceberg/test_case/iceberg_source_all_delete.slt new file mode 100644 index 0000000000000..6b4f984dabdc7 --- /dev/null +++ b/e2e_test/iceberg/test_case/iceberg_source_all_delete.slt @@ -0,0 +1,101 @@ +# Deletions in a single commit are position delete, deletions across multiple commits are equality delete. sink_decouple = default(true), so we'll commit every 10s. 
+statement ok +set streaming_parallelism=4; + +statement ok +CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar); + +statement ok +CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1; + +statement ok +CREATE SINK sink1 AS select * from mv1 WITH ( + connector = 'iceberg', + type = 'upsert', + database.name = 'demo_db', + table.name = 'test_all_delete', + catalog.name = 'demo', + catalog.type = 'storage', + warehouse.path = 's3a://hummock001/iceberg-data', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + create_table_if_not_exists = 'true', + commit_checkpoint_interval = 5, + primary_key = 'i1,i2', +); + +statement ok +INSERT INTO s1 (i1, i2, i3) +SELECT s, s::text, s::text FROM generate_series(1, 10000) s; + +statement ok +flush + +statement ok +DELETE FROM s1 +WHERE i1 IN ( + SELECT s + FROM generate_series(1, 10000, 2) s +); + +sleep 10s + +statement ok +CREATE SOURCE iceberg_t1_source +WITH ( + connector = 'iceberg', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + catalog.type = 'storage', + warehouse.path = 's3a://hummock001/iceberg-data', + database.name = 'demo_db', + table.name = 'test_all_delete', +); + +statement ok +DELETE FROM s1 +WHERE i1 IN ( + SELECT s + FROM generate_series(1, 10000, 3) s +); + +statement ok +flush + +sleep 15s + +query I +select * from iceberg_t1_source order by i1 limit 5; +---- +2 2 2 +6 6 6 +8 8 8 +12 12 12 +14 14 14 + +query I +select * from iceberg_t1_source order by i1 desc limit 5; +---- +9998 9998 9998 +9996 9996 9996 +9992 9992 9992 +9990 9990 9990 +9986 9986 9986 + +query I +select count(*) from iceberg_t1_source +---- +3333 + +statement ok +DROP SINK sink1; + +statement ok +DROP SOURCE iceberg_t1_source; + +statement ok +DROP TABLE s1 cascade; \ No newline at end of file diff --git a/e2e_test/iceberg/test_case/iceberg_source_all_delete.toml 
b/e2e_test/iceberg/test_case/iceberg_source_all_delete.toml new file mode 100644 index 0000000000000..d1611a7fdce69 --- /dev/null +++ b/e2e_test/iceberg/test_case/iceberg_source_all_delete.toml @@ -0,0 +1,11 @@ +init_sqls = [ + 'CREATE SCHEMA IF NOT EXISTS demo_db', + 'DROP TABLE IF EXISTS demo_db.test_all_delete', +] + +slt = 'test_case/iceberg_source_all_delete.slt' + +drop_sqls = [ + 'DROP TABLE IF EXISTS demo_db.test_all_delete', + 'DROP SCHEMA IF EXISTS demo_db', +] \ No newline at end of file diff --git a/e2e_test/iceberg/test_case/iceberg_source_eq_delete.toml b/e2e_test/iceberg/test_case/iceberg_source_eq_delete.toml deleted file mode 100644 index 6e49ca949f501..0000000000000 --- a/e2e_test/iceberg/test_case/iceberg_source_eq_delete.toml +++ /dev/null @@ -1,11 +0,0 @@ -init_sqls = [ - 'CREATE SCHEMA IF NOT EXISTS demo_db', - 'DROP TABLE IF EXISTS demo_db.t1', -] - -slt = 'test_case/iceberg_source_eq_delete.slt' - -drop_sqls = [ - 'DROP TABLE IF EXISTS demo_db.t1', - 'DROP SCHEMA IF EXISTS demo_db', -] \ No newline at end of file diff --git a/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt b/e2e_test/iceberg/test_case/iceberg_source_equality_delete.slt similarity index 88% rename from e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt rename to e2e_test/iceberg/test_case/iceberg_source_equality_delete.slt index 820776fb7e773..975ced803d1f2 100644 --- a/e2e_test/iceberg/test_case/iceberg_source_eq_delete.slt +++ b/e2e_test/iceberg/test_case/iceberg_source_equality_delete.slt @@ -1,3 +1,4 @@ +# Deletions in a single commit are position delete, deletions across multiple commits are equality delete. sink_decouple = false, so we'll commit every 1s. 
statement ok set sink_decouple = false; @@ -15,7 +16,7 @@ CREATE SINK sink1 AS select * from mv1 WITH ( connector = 'iceberg', type = 'upsert', database.name = 'demo_db', - table.name = 't1', + table.name = 'test_equality_delete', catalog.name = 'demo', catalog.type = 'storage', warehouse.path = 's3a://icebergdata/demo', @@ -58,7 +59,7 @@ WITH ( catalog.type = 'storage', warehouse.path = 's3a://icebergdata/demo', database.name = 'demo_db', - table.name = 't1', + table.name = 'test_equality_delete', ); query I diff --git a/e2e_test/iceberg/test_case/iceberg_source_equality_delete.toml b/e2e_test/iceberg/test_case/iceberg_source_equality_delete.toml new file mode 100644 index 0000000000000..e28b4d406a297 --- /dev/null +++ b/e2e_test/iceberg/test_case/iceberg_source_equality_delete.toml @@ -0,0 +1,11 @@ +init_sqls = [ + 'CREATE SCHEMA IF NOT EXISTS demo_db', + 'DROP TABLE IF EXISTS demo_db.test_equality_delete', +] + +slt = 'test_case/iceberg_source_equality_delete.slt' + +drop_sqls = [ + 'DROP TABLE IF EXISTS demo_db.test_equality_delete', + 'DROP SCHEMA IF EXISTS demo_db', +] \ No newline at end of file diff --git a/e2e_test/iceberg/test_case/iceberg_source_position_delete.slt b/e2e_test/iceberg/test_case/iceberg_source_position_delete.slt new file mode 100644 index 0000000000000..40d665d7144e3 --- /dev/null +++ b/e2e_test/iceberg/test_case/iceberg_source_position_delete.slt @@ -0,0 +1,89 @@ +# Deletions in a single commit are position delete, deletions across multiple commits are equality delete. sink_decouple = default(true), so we'll commit every 10s. 
+statement ok +set streaming_parallelism=4; + +statement ok +CREATE TABLE s1 (i1 int, i2 varchar, i3 varchar); + +statement ok +CREATE MATERIALIZED VIEW mv1 AS SELECT * FROM s1; + +statement ok +CREATE SINK sink1 AS select * from mv1 WITH ( + connector = 'iceberg', + type = 'upsert', + database.name = 'demo_db', + table.name = 'test_position_delete', + catalog.name = 'demo', + catalog.type = 'storage', + warehouse.path = 's3a://icebergdata/demo', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + create_table_if_not_exists = 'true', + commit_checkpoint_interval = 5, + primary_key = 'i1,i2', +); + +statement ok +INSERT INTO s1 (i1, i2, i3) +SELECT s, s::text, s::text FROM generate_series(1, 10000) s; + +statement ok +flush + +statement ok +DELETE FROM s1 +WHERE i1 IN ( + SELECT s + FROM generate_series(1, 10000, 2) s +); + +sleep 15s + +statement ok +CREATE SOURCE iceberg_t1_source +WITH ( + connector = 'iceberg', + s3.endpoint = 'http://127.0.0.1:9301', + s3.region = 'us-east-1', + s3.access.key = 'hummockadmin', + s3.secret.key = 'hummockadmin', + catalog.type = 'storage', + warehouse.path = 's3a://icebergdata/demo', + database.name = 'demo_db', + table.name = 'test_position_delete', +); + +query I +select * from iceberg_t1_source order by i1 limit 5; +---- +2 2 2 +4 4 4 +6 6 6 +8 8 8 +10 10 10 + +query I +select * from iceberg_t1_source order by i1 desc limit 5; +---- +10000 10000 10000 +9998 9998 9998 +9996 9996 9996 +9994 9994 9994 +9992 9992 9992 + +query I +select count(*) from iceberg_t1_source +---- +5000 + +statement ok +DROP SINK sink1; + +statement ok +DROP SOURCE iceberg_t1_source; + +statement ok +DROP TABLE s1 cascade; diff --git a/e2e_test/iceberg/test_case/iceberg_source_position_delete.toml b/e2e_test/iceberg/test_case/iceberg_source_position_delete.toml new file mode 100644 index 0000000000000..398d6546ffa3f --- /dev/null +++ 
b/e2e_test/iceberg/test_case/iceberg_source_position_delete.toml @@ -0,0 +1,11 @@ +init_sqls = [ + 'CREATE SCHEMA IF NOT EXISTS demo_db', + 'DROP TABLE IF EXISTS demo_db.test_position_delete', +] + +slt = 'test_case/iceberg_source_position_delete.slt' + +drop_sqls = [ + 'DROP TABLE IF EXISTS demo_db.test_position_delete', + 'DROP SCHEMA IF EXISTS demo_db', +] \ No newline at end of file diff --git a/src/batch/src/executor/iceberg_scan.rs b/src/batch/src/executor/iceberg_scan.rs index 2f67d8ce005aa..6272384435db9 100644 --- a/src/batch/src/executor/iceberg_scan.rs +++ b/src/batch/src/executor/iceberg_scan.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use core::ops::BitAnd; use std::collections::HashMap; use std::mem; @@ -19,12 +20,13 @@ use futures_async_stream::try_stream; use futures_util::stream::StreamExt; use iceberg::scan::FileScanTask; use iceberg::spec::TableMetadata; +use iceberg::table::Table; use itertools::Itertools; use risingwave_common::array::arrow::IcebergArrowConvert; use risingwave_common::bitmap::Bitmap; use risingwave_common::catalog::{Field, Schema}; use risingwave_common::row::{OwnedRow, Row}; -use risingwave_common::types::DataType; +use risingwave_common::types::{DataType, ScalarRefImpl}; use risingwave_common::util::iter_util::ZipEqFast; use risingwave_connector::sink::iceberg::IcebergConfig; use risingwave_connector::source::iceberg::{IcebergProperties, IcebergSplit}; @@ -37,13 +39,16 @@ use crate::error::BatchError; use crate::executor::{DataChunk, Executor}; use crate::task::BatchTaskContext; +static POSITION_DELETE_FILE_FILE_PATH_INDEX: usize = 0; +static POSITION_DELETE_FILE_POS: usize = 1; pub struct IcebergScanExecutor { iceberg_config: IcebergConfig, #[allow(dead_code)] snapshot_id: Option, table_meta: TableMetadata, data_file_scan_tasks: Vec, - eq_delete_file_scan_tasks: Vec, + equality_delete_file_scan_tasks: Vec, + position_delete_file_scan_tasks: 
Vec, batch_size: usize, schema: Schema, identity: String, @@ -69,7 +74,8 @@ impl IcebergScanExecutor { snapshot_id: Option, table_meta: TableMetadata, data_file_scan_tasks: Vec, - eq_delete_file_scan_tasks: Vec, + equality_delete_file_scan_tasks: Vec, + position_delete_file_scan_tasks: Vec, batch_size: usize, schema: Schema, identity: String, @@ -79,7 +85,8 @@ impl IcebergScanExecutor { snapshot_id, table_meta, data_file_scan_tasks, - eq_delete_file_scan_tasks, + equality_delete_file_scan_tasks, + position_delete_file_scan_tasks, batch_size, schema, identity, @@ -95,77 +102,29 @@ impl IcebergScanExecutor { let data_types = self.schema.data_types(); let executor_schema_names = self.schema.names(); - let mut eq_delete_file_scan_tasks_map: HashMap = HashMap::default(); - let eq_delete_file_scan_tasks = mem::take(&mut self.eq_delete_file_scan_tasks); - - // Build hash map for equality delete files - // Currently, all equality delete files have the same schema which is guaranteed by `IcebergSplitEnumerator`. 
- let mut eq_delete_ids: Option> = None; - for eq_delete_file_scan_task in eq_delete_file_scan_tasks { - let mut sequence_number = eq_delete_file_scan_task.sequence_number; - - if eq_delete_ids.is_none() { - eq_delete_ids = Some(eq_delete_file_scan_task.project_field_ids.clone()); - } else { - debug_assert_eq!( - eq_delete_ids.as_ref().unwrap(), - &eq_delete_file_scan_task.project_field_ids - ); - } - - let reader = table - .reader_builder() - .with_batch_size(self.batch_size) - .build(); - let delete_file_scan_stream = tokio_stream::once(Ok(eq_delete_file_scan_task)); - - let mut delete_record_batch_stream = reader - .read(Box::pin(delete_file_scan_stream)) - .map_err(BatchError::Iceberg)?; - - while let Some(record_batch) = delete_record_batch_stream.next().await { - let record_batch = record_batch.map_err(BatchError::Iceberg)?; - - let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; - for row in chunk.rows() { - let entry = eq_delete_file_scan_tasks_map - .entry(row.to_owned_row()) - .or_default(); - *entry = *entry.max(&mut sequence_number); - } - } - } - let data_file_scan_tasks = mem::take(&mut self.data_file_scan_tasks); + let mut position_delete_filter = PositionDeleteFilter::new( + mem::take(&mut self.position_delete_file_scan_tasks), + &data_file_scan_tasks, + &table, + self.batch_size, + ) + .await?; + let mut equality_delete_filter = EqualityDeleteFilter::new( + mem::take(&mut self.equality_delete_file_scan_tasks), + &table, + self.batch_size, + executor_schema_names, + ) + .await?; + // Delete rows in the data file that need to be deleted by map for data_file_scan_task in data_file_scan_tasks { + let data_file_path = data_file_scan_task.data_file_path.clone(); let data_sequence_number = data_file_scan_task.sequence_number; - let data_chunk_column_names: Vec<_> = data_file_scan_task - .project_field_ids - .iter() - .filter_map(|id| { - data_file_scan_task - .schema - .name_by_field_id(*id) - .map(|name| name.to_string()) - }) - 
.collect(); - - // eq_delete_column_idxes are used to fetch equality delete columns from data files. - let eq_delete_column_idxes = eq_delete_ids.as_ref().map(|eq_delete_ids| { - eq_delete_ids - .iter() - .map(|eq_delete_id| { - data_file_scan_task - .project_field_ids - .iter() - .position(|project_field_id| eq_delete_id == project_field_id) - .expect("eq_delete_id not found in delete_equality_ids") - }) - .collect_vec() - }); + equality_delete_filter.apply_data_file_scan_task(&data_file_scan_task); let reader = table .reader_builder() @@ -175,54 +134,21 @@ impl IcebergScanExecutor { let mut record_batch_stream = reader .read(Box::pin(file_scan_stream)) - .map_err(BatchError::Iceberg)?; + .map_err(BatchError::Iceberg)? + .enumerate(); - while let Some(record_batch) = record_batch_stream.next().await { + while let Some((index, record_batch)) = record_batch_stream.next().await { let record_batch = record_batch.map_err(BatchError::Iceberg)?; let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; - let chunk = match eq_delete_column_idxes.as_ref() { - Some(delete_column_ids) => { - let visibility = Bitmap::from_iter( - // Project with the schema of the delete file - chunk.project(delete_column_ids).rows().map(|row_ref| { - let row = row_ref.to_owned_row(); - if let Some(delete_sequence_number) = - eq_delete_file_scan_tasks_map.get(&row) - && delete_sequence_number > &data_sequence_number - { - // delete_sequence_number > data_sequence_number means the delete file is written later than data file, - // so it needs to be deleted - false - } else { - true - } - }), - ) - .clone(); - // Keep the schema consistent(chunk and executor) - // Filter out (equality delete) columns that are not in the executor schema - let data = chunk - .columns() - .iter() - .zip_eq_fast(&data_chunk_column_names) - .filter_map(|(array, columns)| { - if executor_schema_names.contains(columns) { - Some(array.clone()) - } else { - None - } - }) - .collect_vec(); - let chunk = 
DataChunk::new(data, visibility); - debug_assert_eq!(chunk.data_types(), data_types); - chunk - } - // If there is no delete file, the data file is directly output - None => chunk, - }; + // position delete + let chunk = position_delete_filter.filter(&data_file_path, chunk, index); + // equality delete + let chunk = equality_delete_filter.filter(chunk, data_sequence_number)?; + assert_eq!(chunk.data_types(), data_types); yield chunk; } + position_delete_filter.remove_file_path(&data_file_path); } } } @@ -282,7 +208,12 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder { split.table_meta.deserialize(), split.files.into_iter().map(|x| x.deserialize()).collect(), split - .eq_delete_files + .equality_delete_files + .into_iter() + .map(|x| x.deserialize()) + .collect(), + split + .position_delete_files .into_iter() .map(|x| x.deserialize()) .collect(), @@ -295,3 +226,243 @@ impl BoxedExecutorBuilder for IcebergScanExecutorBuilder { } } } + +struct PositionDeleteFilter { + // Delete columns pos for each file path, false means this column needs to be deleted, value is divided by batch size + position_delete_file_path_pos_map: HashMap>>, +} + +impl PositionDeleteFilter { + async fn new( + position_delete_file_scan_tasks: Vec, + data_file_scan_tasks: &[FileScanTask], + table: &Table, + batch_size: usize, + ) -> crate::error::Result { + let mut position_delete_file_path_pos_map: HashMap>> = + HashMap::default(); + let data_file_path_set = data_file_scan_tasks + .iter() + .map(|data_file_scan_task| data_file_scan_task.data_file_path.as_ref()) + .collect::>(); + + let position_delete_file_scan_stream = { + #[try_stream] + async move { + for position_delete_file_scan_task in position_delete_file_scan_tasks { + yield position_delete_file_scan_task; + } + } + }; + + let reader = table.reader_builder().with_batch_size(batch_size).build(); + + let mut record_batch_stream = reader + .read(Box::pin(position_delete_file_scan_stream)) + .map_err(BatchError::Iceberg)?; + + 
while let Some(record_batch) = record_batch_stream.next().await { + let record_batch = record_batch.map_err(BatchError::Iceberg)?; + let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; + for row in chunk.rows() { + // The schema is fixed. `0` must be `file_path`, `1` must be `pos`. + if let Some(ScalarRefImpl::Utf8(file_path)) = + row.datum_at(POSITION_DELETE_FILE_FILE_PATH_INDEX) + && let Some(ScalarRefImpl::Int64(pos)) = row.datum_at(POSITION_DELETE_FILE_POS) + { + if !data_file_path_set.contains(file_path) { + continue; + } + let entry = position_delete_file_path_pos_map + .entry(file_path.to_string()) + .or_default(); + // Split `pos` by `batch_size`, because the data file will also be split by `batch_size` + let delete_vec_index = pos as usize / batch_size; + let delete_vec_pos = pos as usize % batch_size; + let delete_vec = entry + .entry(delete_vec_index) + .or_insert(vec![true; batch_size]); + delete_vec[delete_vec_pos] = false; + } else { + bail!("position delete `file_path` and `pos` must be string and int64") + } + } + } + Ok(Self { + position_delete_file_path_pos_map, + }) + } + + fn filter(&self, data_file_path: &str, mut chunk: DataChunk, index: usize) -> DataChunk { + chunk = chunk.compact(); + if let Some(position_delete_bool_iter) = self + .position_delete_file_path_pos_map + .get(data_file_path) + .and_then(|delete_vecs| delete_vecs.get(&index)) + { + // Some chunks are less than `batch_size`, so we need to be truncate to ensure that the bitmap length is consistent + let position_delete_bool_iter = if position_delete_bool_iter.len() > chunk.capacity() { + &position_delete_bool_iter[..chunk.capacity()] + } else { + position_delete_bool_iter + }; + let new_visibility = Bitmap::from_bool_slice(position_delete_bool_iter); + chunk.set_visibility(chunk.visibility().bitand(new_visibility)); + chunk + } else { + chunk + } + } + + fn remove_file_path(&mut self, file_path: &str) { + 
self.position_delete_file_path_pos_map.remove(file_path); + } +} + +struct EqualityDeleteFilter { + // The `seq_num` corresponding to each row in the equality delete file + equality_delete_rows_seq_num_map: HashMap, + // The field ids of the equality delete columns + equality_delete_ids: Option>, + // In chunk, the indexes of the equality delete columns + equality_delete_column_idxes: Option>, + // The schema of the data file, which is the intersection of the output schema and the equality delete columns + data_chunk_column_names: Option>, + // Column names for the output schema so that columns can be trimmed after filter + executor_schema_names: Vec, +} + +impl EqualityDeleteFilter { + async fn new( + equality_delete_file_scan_tasks: Vec, + table: &Table, + batch_size: usize, + executor_schema_names: Vec, + ) -> crate::error::Result { + let mut equality_delete_rows_seq_num_map: HashMap = HashMap::default(); + + // Build hash map for equality delete files + // Currently, all equality delete files have the same schema which is guaranteed by `IcebergSplitEnumerator`. 
+ let mut equality_delete_ids: Option> = None; + for equality_delete_file_scan_task in equality_delete_file_scan_tasks { + let mut sequence_number = equality_delete_file_scan_task.sequence_number; + + if equality_delete_ids.is_none() { + equality_delete_ids = + Some(equality_delete_file_scan_task.project_field_ids.clone()); + } else { + debug_assert_eq!( + equality_delete_ids.as_ref().unwrap(), + &equality_delete_file_scan_task.project_field_ids + ); + } + + let reader = table.reader_builder().with_batch_size(batch_size).build(); + let delete_file_scan_stream = tokio_stream::once(Ok(equality_delete_file_scan_task)); + + let mut delete_record_batch_stream = reader + .read(Box::pin(delete_file_scan_stream)) + .map_err(BatchError::Iceberg)?; + + while let Some(record_batch) = delete_record_batch_stream.next().await { + let record_batch = record_batch.map_err(BatchError::Iceberg)?; + + let chunk = IcebergArrowConvert.chunk_from_record_batch(&record_batch)?; + for row in chunk.rows() { + let entry = equality_delete_rows_seq_num_map + .entry(row.to_owned_row()) + .or_default(); + *entry = *entry.max(&mut sequence_number); + } + } + } + Ok(Self { + equality_delete_rows_seq_num_map, + equality_delete_ids, + equality_delete_column_idxes: None, + data_chunk_column_names: None, + executor_schema_names, + }) + } + + fn apply_data_file_scan_task(&mut self, data_file_scan_task: &FileScanTask) { + if let Some(equality_delete_ids) = &self.equality_delete_ids { + self.data_chunk_column_names = Some( + data_file_scan_task + .project_field_ids + .iter() + .filter_map(|id| { + data_file_scan_task + .schema + .name_by_field_id(*id) + .map(|name| name.to_string()) + }) + .collect(), + ); + // eq_delete_column_idxes are used to fetch equality delete columns from data files. 
+ self.equality_delete_column_idxes = Some( + equality_delete_ids + .iter() + .map(|equality_delete_id| { + data_file_scan_task + .project_field_ids + .iter() + .position(|project_field_id| equality_delete_id == project_field_id) + .expect("equality_delete_id not found in delete_equality_ids") + }) + .collect_vec(), + ); + } + } + + fn filter( + &self, + mut chunk: DataChunk, + data_sequence_number: i64, + ) -> crate::error::Result { + chunk = chunk.compact(); + match self.equality_delete_column_idxes.as_ref() { + Some(delete_column_ids) => { + let new_visibility = Bitmap::from_iter( + // Project with the schema of the delete file + chunk.project(delete_column_ids).rows().map(|row_ref| { + let row = row_ref.to_owned_row(); + if let Some(delete_sequence_number) = + self.equality_delete_rows_seq_num_map.get(&row) + && delete_sequence_number > &data_sequence_number + { + // delete_sequence_number > data_sequence_number means the delete file is written later than data file, + // so it needs to be deleted + false + } else { + true + } + }), + ) + .clone(); + let Some(ref data_chunk_column_names) = self.data_chunk_column_names else { + bail!("data_chunk_column_names is not set") + }; + + // Keep the schema consistent(chunk and executor) + // Filter out (equality delete) columns that are not in the executor schema + let (data, old_visibility) = chunk.into_parts_v2(); + let data = data + .iter() + .zip_eq_fast(data_chunk_column_names) + .filter_map(|(array, columns)| { + if self.executor_schema_names.contains(columns) { + Some(array.clone()) + } else { + None + } + }) + .collect_vec(); + let chunk = DataChunk::new(data, old_visibility.bitand(new_visibility)); + Ok(chunk) + } + // If there is no delete file, the data file is directly output + None => Ok(chunk), + } + } +} diff --git a/src/connector/src/source/iceberg/mod.rs b/src/connector/src/source/iceberg/mod.rs index 845ffb66804d3..0b271d7e4875a 100644 --- a/src/connector/src/source/iceberg/mod.rs +++ 
b/src/connector/src/source/iceberg/mod.rs @@ -145,7 +145,8 @@ pub struct IcebergSplit { pub snapshot_id: i64, pub table_meta: TableMetadataJsonStr, pub files: Vec, - pub eq_delete_files: Vec, + pub equality_delete_files: Vec, + pub position_delete_files: Vec, } impl SplitMetaData for IcebergSplit { @@ -214,10 +215,11 @@ impl IcebergSplitEnumerator { // If there is no snapshot, we will return a mock `IcebergSplit` with empty files. return Ok(vec![IcebergSplit { split_id: 0, - snapshot_id: 0, // unused + snapshot_id: 0, table_meta: TableMetadataJsonStr::serialize(table.metadata()), files: vec![], - eq_delete_files: vec![], + equality_delete_files: vec![], + position_delete_files: vec![], }]); } @@ -255,11 +257,12 @@ impl IcebergSplitEnumerator { current_snapshot.unwrap().snapshot_id() } }; - let require_names = Self::get_require_field_names(&table, snapshot_id, schema).await?; - let mut data_files = vec![]; - let mut eq_delete_files = vec![]; + let require_names = Self::get_require_field_names(&table, snapshot_id, &schema).await?; + let mut position_delete_files = vec![]; + let mut data_files = vec![]; + let mut equality_delete_files = vec![]; let scan = table .scan() .snapshot_id(snapshot_id) @@ -278,10 +281,11 @@ impl IcebergSplitEnumerator { } iceberg::spec::DataContentType::EqualityDeletes => { task.project_field_ids = task.equality_ids.clone(); - eq_delete_files.push(IcebergFileScanTaskJsonStr::serialize(&task)); + equality_delete_files.push(IcebergFileScanTaskJsonStr::serialize(&task)); } iceberg::spec::DataContentType::PositionDeletes => { - bail!("Position delete file is not supported") + task.project_field_ids = Vec::default(); + position_delete_files.push(IcebergFileScanTaskJsonStr::serialize(&task)); } } } @@ -301,7 +305,9 @@ impl IcebergSplitEnumerator { snapshot_id, table_meta: table_meta.clone(), files: data_files[start..end].to_vec(), - eq_delete_files: eq_delete_files.clone(), + // Todo: Can be divided by position to prevent the delete file from being 
read multiple times + equality_delete_files: equality_delete_files.clone(), + position_delete_files: position_delete_files.clone(), }; splits.push(split); } @@ -316,10 +322,13 @@ .collect_vec()) } + /// The required field names are the intersection of the output schema and the equality delete columns. + /// This method will ensure that the order of the columns in the output schema remains unchanged, + /// after which there is no need to reorder, just delete the equality delete columns. async fn get_require_field_names( table: &Table, snapshot_id: i64, - rw_schema: Schema, + rw_schema: &Schema, ) -> ConnectorResult> { let scan = table .scan()