Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server.rs - move response handling into .process() #1727

Merged
merged 1 commit into from
Aug 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 35 additions & 29 deletions shotover/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::sources::Transport;
use crate::tls::{AcceptError, TlsAcceptor};
use crate::transforms::chain::{TransformChain, TransformChainBuilder};
use crate::transforms::{TransformContextBuilder, TransformContextConfig, Wrapper};
use anyhow::{anyhow, Context, Result};
use anyhow::{anyhow, Result};
use bytes::BytesMut;
use futures::future::join_all;
use futures::{SinkExt, StreamExt};
Expand Down Expand Up @@ -631,7 +631,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
};

let result = self
.process_messages(&client_details, local_addr, in_rx, out_tx, force_run_chain)
.run_loop(&client_details, local_addr, in_rx, out_tx, force_run_chain)
.await;

// Only flush messages if we are shutting down due to application shutdown
Expand Down Expand Up @@ -670,7 +670,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
}
}

async fn process_messages(
async fn run_loop(
&mut self,
client_details: &str,
local_addr: SocketAddr,
Expand All @@ -683,7 +683,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
while !self.shutdown.is_shutdown() {
// While reading a request frame, also listen for the shutdown signal
debug!("Waiting for message {client_details}");
let responses = tokio::select! {
tokio::select! {
biased;
_ = self.shutdown.recv() => {
// If a shutdown signal is received, return from `run`.
Expand All @@ -696,7 +696,9 @@ impl<C: CodecBuilder + 'static> Handler<C> {
requests.extend(x);
}
debug!("A transform in the chain requested that a chain run occur, requests {:?}", requests);
self.process(local_addr, &out_tx, requests).await?
if let Some(_close_reason) = self.send_receive_chain(local_addr, &out_tx, requests).await? {
return Ok(())
}
},
requests = Self::receive_with_timeout(self.timeout, &mut in_rx, client_details) => {
match requests {
Expand All @@ -705,56 +707,60 @@ impl<C: CodecBuilder + 'static> Handler<C> {
requests.extend(x);
}
debug!("Received requests from client {:?}", requests);
self.process(local_addr, &out_tx, requests).await?
}
None => {
// Either we timed out the connection or the client disconnected, so terminate this connection
return Ok(())
if let Some(_close_reason) = self.send_receive_chain(local_addr, &out_tx, requests).await? {
return Ok(())
}
}
// Either we timed out the connection or the client disconnected, so terminate this connection
None => return Ok(()),
}
},
};

// send the result of the process up stream
if !responses.is_empty() {
debug!("sending response to client: {:?}", responses);
if out_tx.send(responses).is_err() {
// the client has disconnected so we should terminate this connection
return Ok(());
}
}
}

Ok(())
}

async fn process(
async fn send_receive_chain(
&mut self,
local_addr: SocketAddr,
out_tx: &mpsc::UnboundedSender<Messages>,
requests: Messages,
) -> Result<Messages> {
) -> Result<Option<CloseReason>> {
self.pending_requests.process_requests(&requests);

let mut wrapper = Wrapper::new_with_addr(requests, local_addr);

match self.chain.process_request(&mut wrapper).await.context(
"Chain failed to send and/or receive messages, the connection will now be closed.",
) {
Ok(x) => {
self.pending_requests.process_responses(&x);
Ok(x)
}
let responses = match self.chain.process_request(&mut wrapper).await {
Ok(x) => x,
Err(err) => {
let err = err.context("Chain failed to send and/or receive messages, the connection will now be closed.");
// The connection is going to be closed once we return Err.
// So first make a best effort attempt of responding to any pending requests with an error response.
out_tx.send(self.pending_requests.to_errors(&err))?;
Err(err)
return Err(err);
}
};
self.pending_requests.process_responses(&responses);

// send the result of the process up stream
if !responses.is_empty() {
debug!("sending response to client: {:?}", responses);
if out_tx.send(responses).is_err() {
// the client has disconnected so we should terminate this connection
return Ok(Some(CloseReason::Generic));
}
}

Ok(None)
}
}

/// Indicates that the connection to the client must be closed.
enum CloseReason {
Generic,
}

/// Listens for the server shutdown signal.
///
/// Shutdown is signaled using a `broadcast::Receiver`. Only a single value is
Expand Down
Loading