Skip to content

Commit

Permalink
fix: sender on finish unexpected close
Browse files Browse the repository at this point in the history
  • Loading branch information
dqhl76 committed Nov 19, 2024
1 parent c2e9dd4 commit 713be70
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ impl AsyncSource for BlockPartitionReceiverSource {
#[async_backtrace::framed]
async fn generate(&mut self) -> Result<Option<DataBlock>> {
match self.meta_receiver.recv().await {
Ok(Ok(part)) => Ok(Some(DataBlock::empty_with_meta(
Ok(Ok(part)) => dbg!(Ok(Some(DataBlock::empty_with_meta(
BlockPartitionMeta::create(vec![part]),
))),
)))),
Ok(Err(e)) => Err(
// The error is occurred in pruning process
e,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::pruning_pipeline::block_prune_result_meta::BlockPruneResult;
use crate::FuseBlockPartInfo;

pub struct SendPartInfoSink {
sender: Sender<Result<PartInfoPtr>>,
sender: Option<Sender<Result<PartInfoPtr>>>,
push_downs: Option<PushDownInfo>,
top_k: Option<(TopK, Scalar)>,
schema: TableSchemaRef,
Expand All @@ -41,7 +41,7 @@ impl SendPartInfoSink {
Ok(ProcessorPtr::create(AsyncSinker::create(
input,
SendPartInfoSink {
sender,
sender: Some(sender),
push_downs,
top_k,
schema,
Expand All @@ -55,7 +55,7 @@ impl AsyncSink for SendPartInfoSink {
const NAME: &'static str = "SendPartInfoSink";

async fn on_finish(&mut self) -> Result<()> {
self.sender.close();
drop(self.sender.take());
Ok(())
}

Expand All @@ -74,11 +74,14 @@ impl AsyncSink for SendPartInfoSink {
}
},
};

for info in info_ptr {
let _ = self.sender.send(Ok(info)).await;
if let Some(sender) = &self.sender {
let _ = dbg!(sender.send(Ok(info)).await);
}
}

return Ok(true);
return Ok(false);
}
}
Err(ErrorCode::Internal(
Expand Down

0 comments on commit 713be70

Please sign in to comment.