diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs index 4c928a3d2d8d..d4cf6864d7e4 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs @@ -1323,7 +1323,9 @@ impl SMJStream { // If join filter exists, `self.output_size` is not accurate as we don't know the exact // number of rows in the output record batch. If streamed row joined with buffered rows, // once join filter is applied, the number of output rows may be more than 1. - if record_batch.num_rows() > self.output_size { + // If `record_batch` is empty, we should reset `self.output_size` to 0. This can happen + // when the join filter is applied and all rows are filtered out. + if record_batch.num_rows() == 0 || record_batch.num_rows() > self.output_size { self.output_size = 0; } else { self.output_size -= record_batch.num_rows(); diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt b/datafusion/sqllogictest/test_files/sort_merge_join.slt index 09a2aa3e7436..7b7e355fa2b5 100644 --- a/datafusion/sqllogictest/test_files/sort_merge_join.slt +++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt @@ -263,5 +263,22 @@ DROP TABLE t1; statement ok DROP TABLE t2; +# Set batch size to 1 for sort merge join to test the scenario where data is spread across multiple batches +statement ok +set datafusion.execution.batch_size = 1; + +query II +SELECT * FROM ( + WITH + t1 AS ( + SELECT 12 a, 12 b + ), + t2 AS ( + SELECT 12 a, 12 b + ) + SELECT t1.* FROM t1 JOIN t2 on t1.a = t2.b WHERE t1.a > t2.b +) ORDER BY 1, 2; +---- + statement ok set datafusion.optimizer.prefer_hash_join = true;