From 60f8781478b268883293caf9b2b70d9cf7ca4dad Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 5 Jun 2024 12:38:02 -0600 Subject: [PATCH] simplify code in CometExecIterator and avoid some small overhead --- .../org/apache/comet/CometExecIterator.scala | 41 +++++++------------ 1 file changed, 14 insertions(+), 27 deletions(-) diff --git a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala index b3604c9e0..89225c0d6 100644 --- a/spark/src/main/scala/org/apache/comet/CometExecIterator.scala +++ b/spark/src/main/scala/org/apache/comet/CometExecIterator.scala @@ -67,20 +67,6 @@ class CometExecIterator( private var currentBatch: ColumnarBatch = null private var closed: Boolean = false - private def executeNative(): ExecutionState = { - val result = nativeLib.executePlan(plan) - - val flag = result(0) - if (flag == -1) EOF - else if (flag == 1) { - val numRows = result(1) - val addresses = result.slice(2, result.length) - Batch(numRows = numRows.toInt, addresses = addresses) - } else { - throw new IllegalStateException(s"Invalid native flag: $flag") - } - } - /** * Creates a new configuration map to be passed to the native side. */ @@ -110,21 +96,22 @@ class CometExecIterator( result } - /** Execution result from Comet native */ - trait ExecutionState - - /** A new batch is available */ - case class Batch(numRows: Int, addresses: Array[Long]) extends ExecutionState - - /** The execution is finished - no more batch */ - case object EOF extends ExecutionState - def getNextBatch(): Option[ColumnarBatch] = { - executeNative() match { - case EOF => None - case Batch(numRows, addresses) => + // we execute the native plan each time we need another output batch and this could + // result in multiple input batches being processed + val result = nativeLib.executePlan(plan) + + result(0) match { + case -1 => + // EOF + None + case 1 => + val numRows = result(1) + val addresses = result.slice(2, result.length) val cometVectors = nativeUtil.importVector(addresses) - Some(new ColumnarBatch(cometVectors.toArray, numRows)) + Some(new ColumnarBatch(cometVectors.toArray, numRows.toInt)) + case flag => + throw new IllegalStateException(s"Invalid native flag: $flag") } }