diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs index 9b56e6550f0dd..c33f7ef10e258 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs @@ -233,6 +233,7 @@ where R: Rows output: Arc, output_data: VecDeque, + need_data: bool, } impl MultiSortMergeProcessor @@ -257,11 +258,11 @@ where R: Rows inputs, output, output_data: VecDeque::new(), + need_data: false, }) } } -// #[async_trait::async_trait] impl Processor for MultiSortMergeProcessor where R: Rows + Send + 'static { @@ -298,13 +299,19 @@ where R: Rows + Send + 'static return Ok(Event::Finished); } - Ok(Event::Sync) + if self.need_data { + self.need_data = false; + Ok(Event::NeedData) + } else { + Ok(Event::Sync) + } } fn process(&mut self) -> Result<()> { while let Some(block) = self.merger.next_block()? { self.output_data.push_back(block); } + self.need_data = true; Ok(()) } }