From 317a5345eb1bbe8483a0dcc61a4c01b5ad7ece71 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Fri, 27 Sep 2024 00:21:10 -0700 Subject: [PATCH] fix: Use the number of rows from underlying arrays instead of logical row count from RecordBatch (#972) --- .../org/apache/comet/vector/NativeUtil.scala | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala index 33af8662f..72472a540 100644 --- a/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala +++ b/common/src/main/scala/org/apache/comet/vector/NativeUtil.scala @@ -92,11 +92,15 @@ class NativeUtil { arrayAddrs: Array[Long], schemaAddrs: Array[Long], batch: ColumnarBatch): Int = { + val numRows = mutable.ArrayBuffer.empty[Int] + (0 until batch.numCols()).foreach { index => batch.column(index) match { case a: CometVector => val valueVector = a.getValueVector + numRows += valueVector.getValueCount + val provider = if (valueVector.getField.getDictionary != null) { a.getDictionaryProvider } else { @@ -120,7 +124,16 @@ class NativeUtil { } } - batch.numRows() + if (numRows.distinct.length > 1) { + throw new SparkException( + s"Number of rows in each column should be the same, but got [${numRows.distinct}]") + } + + // `ColumnarBatch.numRows` might return a different number than the actual number of rows in + // the Arrow arrays. For example, Iceberg column reader will skip deleted rows internally in + // its `CometVector` implementation. The `ColumnarBatch` returned by the reader will report + // logical number of rows which is less than actual number of rows due to row deletion. + numRows.headOption.getOrElse(batch.numRows()) } /**