Skip to content

Commit

Permalink
fix(cluster): close flight channel in drop
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 committed Dec 15, 2023
1 parent 61b4af4 commit 3549a5e
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 7 deletions.
14 changes: 11 additions & 3 deletions src/query/service/src/api/rpc/exchange/exchange_sink_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ impl ExchangeWriterSink {
}
}

impl Drop for ExchangeWriterSink {
fn drop(&mut self) {
self.flight_sender.close();
}
}

#[async_trait::async_trait]
impl AsyncSink for ExchangeWriterSink {
const NAME: &'static str = "ExchangeWriterSink";
Expand Down Expand Up @@ -85,9 +91,11 @@ impl AsyncSink for ExchangeWriterSink {
for packet in serialize_meta.packet {
bytes += packet.bytes_size();
if let Err(error) = self.flight_sender.send(packet).await {
if error.code() == ErrorCode::ABORTED_QUERY {
return Ok(true);
}
self.flight_sender.close();

// if error.code() == ErrorCode::ABORTED_QUERY {
// return Ok(true);
// }

return Err(error);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,12 @@ impl ExchangeSourceReader {
}
}

impl Drop for ExchangeSourceReader {
fn drop(&mut self) {
self.flight_receiver.close();
}
}

#[async_trait::async_trait]
impl Processor for ExchangeSourceReader {
fn name(&self) -> String {
Expand Down
4 changes: 0 additions & 4 deletions src/query/service/src/api/rpc/flight_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,10 +225,6 @@ impl FlightSender {
FlightSender { tx }
}

pub fn is_closed(&self) -> bool {
self.tx.is_closed()
}

#[async_backtrace::framed]
pub async fn send(&self, data: DataPacket) -> Result<()> {
if let Err(_cause) = self.tx.send(Ok(FlightData::try_from(data)?)).await {
Expand Down

0 comments on commit 3549a5e

Please sign in to comment.