Skip to content

Commit

Permalink
Fix event.
Browse files Browse the repository at this point in the history
  • Loading branch information
RinChanNOWWW committed Dec 21, 2023
1 parent 59b47b0 commit 51df83c
Showing 1 changed file with 9 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ where R: Rows
output: Arc<OutputPort>,

output_data: VecDeque<DataBlock>,
need_data: bool,
}

impl<R> MultiSortMergeProcessor<R>
Expand All @@ -257,11 +258,11 @@ where R: Rows
inputs,
output,
output_data: VecDeque::new(),
need_data: false,
})
}
}

// #[async_trait::async_trait]
impl<R> Processor for MultiSortMergeProcessor<R>
where R: Rows + Send + 'static
{
Expand Down Expand Up @@ -298,13 +299,19 @@ where R: Rows + Send + 'static
return Ok(Event::Finished);
}

Ok(Event::Sync)
if self.need_data {
self.need_data = false;
Ok(Event::NeedData)
} else {
Ok(Event::Sync)
}
}

fn process(&mut self) -> Result<()> {
while let Some(block) = self.merger.next_block()? {
self.output_data.push_back(block);
}
self.need_data = true;
Ok(())
}
}

0 comments on commit 51df83c

Please sign in to comment.