Skip to content

Commit

Permalink
Merge branch 'main' into use_const_address_for_flush
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Aug 19, 2024
2 parents c8c6e24 + 17ccad5 commit ac30d66
Showing 1 changed file with 35 additions and 29 deletions.
64 changes: 35 additions & 29 deletions shotover/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::sources::Transport;
use crate::tls::{AcceptError, TlsAcceptor};
use crate::transforms::chain::{TransformChain, TransformChainBuilder};
use crate::transforms::{TransformContextBuilder, TransformContextConfig, Wrapper};
use anyhow::{anyhow, Context, Result};
use anyhow::{anyhow, Result};
use bytes::BytesMut;
use futures::future::join_all;
use futures::{SinkExt, StreamExt};
Expand Down Expand Up @@ -631,7 +631,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
};

let result = self
.process_messages(&client_details, local_addr, in_rx, out_tx, force_run_chain)
.run_loop(&client_details, local_addr, in_rx, out_tx, force_run_chain)
.await;

// Only flush messages if we are shutting down due to application shutdown
Expand Down Expand Up @@ -670,7 +670,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
}
}

async fn process_messages(
async fn run_loop(
&mut self,
client_details: &str,
local_addr: SocketAddr,
Expand All @@ -683,7 +683,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
while !self.shutdown.is_shutdown() {
// While reading a request frame, also listen for the shutdown signal
debug!("Waiting for message {client_details}");
let responses = tokio::select! {
tokio::select! {
biased;
_ = self.shutdown.recv() => {
// If a shutdown signal is received, return from `run`.
Expand All @@ -696,7 +696,9 @@ impl<C: CodecBuilder + 'static> Handler<C> {
requests.extend(x);
}
debug!("A transform in the chain requested that a chain run occur, requests {:?}", requests);
self.process(local_addr, &out_tx, requests).await?
if let Some(_close_reason) = self.send_receive_chain(local_addr, &out_tx, requests).await? {
return Ok(())
}
},
requests = Self::receive_with_timeout(self.timeout, &mut in_rx, client_details) => {
match requests {
Expand All @@ -705,56 +707,60 @@ impl<C: CodecBuilder + 'static> Handler<C> {
requests.extend(x);
}
debug!("Received requests from client {:?}", requests);
self.process(local_addr, &out_tx, requests).await?
}
None => {
// Either we timed out the connection or the client disconnected, so terminate this connection
return Ok(())
if let Some(_close_reason) = self.send_receive_chain(local_addr, &out_tx, requests).await? {
return Ok(())
}
}
// Either we timed out the connection or the client disconnected, so terminate this connection
None => return Ok(()),
}
},
};

// send the result of the process up stream
if !responses.is_empty() {
debug!("sending response to client: {:?}", responses);
if out_tx.send(responses).is_err() {
// the client has disconnected so we should terminate this connection
return Ok(());
}
}
}

Ok(())
}

async fn process(
async fn send_receive_chain(
&mut self,
local_addr: SocketAddr,
out_tx: &mpsc::UnboundedSender<Messages>,
requests: Messages,
) -> Result<Messages> {
) -> Result<Option<CloseReason>> {
self.pending_requests.process_requests(&requests);

let mut wrapper = Wrapper::new_with_addr(requests, local_addr);

match self.chain.process_request(&mut wrapper).await.context(
"Chain failed to send and/or receive messages, the connection will now be closed.",
) {
Ok(x) => {
self.pending_requests.process_responses(&x);
Ok(x)
}
let responses = match self.chain.process_request(&mut wrapper).await {
Ok(x) => x,
Err(err) => {
let err = err.context("Chain failed to send and/or receive messages, the connection will now be closed.");
// The connection is going to be closed once we return Err.
// So first make a best effort attempt of responding to any pending requests with an error response.
out_tx.send(self.pending_requests.to_errors(&err))?;
Err(err)
return Err(err);
}
};
self.pending_requests.process_responses(&responses);

// send the result of the process up stream
if !responses.is_empty() {
debug!("sending response to client: {:?}", responses);
if out_tx.send(responses).is_err() {
// the client has disconnected so we should terminate this connection
return Ok(Some(CloseReason::Generic));
}
}

Ok(None)
}
}

/// Indicates that the connection to the client must be closed.
enum CloseReason {
Generic,
}

/// Listens for the server shutdown signal.
///
/// Shutdown is signaled using a `broadcast::Receiver`. Only a single value is
Expand Down

0 comments on commit ac30d66

Please sign in to comment.