Skip to content

Commit

Permalink
refactor Handler::process_messages (#1430)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Jan 23, 2024
1 parent f214b76 commit 4621efa
Showing 1 changed file with 65 additions and 59 deletions.
124 changes: 65 additions & 59 deletions shotover/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,88 +697,94 @@ impl<C: CodecBuilder + 'static> Handler<C> {
while !self.shutdown.is_shutdown() {
// While reading a request frame, also listen for the shutdown signal
debug!("Waiting for message {client_details}");
let mut reverse_chain = false;

let messages = tokio::select! {
let responses = tokio::select! {
requests = Self::receive_with_timeout(self.timeout, &mut in_rx, client_details) => {
debug!("Received requests from client {:?}", requests);
match requests {
Some(requests) => requests,
Some(requests) => self.process_forward(client_details, local_addr, &out_tx, requests).await?,
None => {
// Either we timed out the connection or the client disconnected, so terminate this connection
return Ok(())
}
}
},
Some(res) = self.pushed_messages_rx.recv() => {
reverse_chain = true;
res
},
Some(responses) = self.pushed_messages_rx.recv() => {
debug!("Received unrequested responses from destination {:?}", responses);
self.process_backward(client_details, local_addr, responses).await?
}
_ = self.shutdown.recv() => {
// If a shutdown signal is received, return from `run`.
// This will result in the task terminating.
return Ok(());
}
};

debug!("Received raw messages {:?}", messages);

let mut error_report_messages = if reverse_chain {
// Avoid allocating for reverse chains as we dont make use of this value in that case
vec![]
} else {
// This clone should be cheap as cloning a Message that has never had `.frame()`
// called should result in no new allocations.
messages.clone()
};
debug!("sending response to client: {:?}", responses);
// send the result of the process up stream
if out_tx.send(responses).is_err() {
// the client has disconnected so we should terminate this connection
return Ok(());
}
}

let wrapper =
Wrapper::new_with_client_details(messages, client_details.to_owned(), local_addr);
Ok(())
}

let modified_messages = if reverse_chain {
self.chain
.process_request_rev(wrapper)
.await
.context("Chain failed to receive pushed messages/events, the connection will now be closed.")?
} else {
match self
.chain
.process_request(wrapper)
.await
.context("Chain failed to send and/or receive messages, the connection will now be closed.")
{
Ok(x) => x,
Err(err) => {
// An internal error occured and we need to terminate the connection because we can no
// longer make any gaurantees about the state its in.
// However before we do that we need to return errors for all the messages in this batch for two reasons:
// * Poorly programmed clients may hang forever waiting for a response
// * We want to give the user a hint as to what went wrong
// + they might not know to check the shotover logs
// + they may not be able to correlate which error in the shotover logs corresponds to their failed message
for m in &mut error_report_messages {
#[allow(clippy::single_match)]
match m.to_error_response(format!("Internal shotover (or custom transform) bug: {err:?}")) {
Ok(new_m) => *m = new_m,
Err(_) => {
// If we cant produce an error then nothing we can do, just continue on and close the connection.
}
}
async fn process_forward(
&mut self,
client_details: &str,
local_addr: SocketAddr,
out_tx: &mpsc::UnboundedSender<Messages>,
requests: Messages,
) -> Result<Messages> {
// This clone should be cheap as cloning requests Message that has never had `.frame()`
// called should result in no new allocations.
let mut error_report_messages = requests.clone();

let wrapper =
Wrapper::new_with_client_details(requests, client_details.to_owned(), local_addr);

match self.chain.process_request(wrapper).await.context(
"Chain failed to send and/or receive messages, the connection will now be closed.",
) {
Ok(x) => Ok(x),
Err(err) => {
// An internal error occured and we need to terminate the connection because we can no
// longer make any gaurantees about the state its in.
// However before we do that we need to return errors for all the messages in this batch for two reasons:
// * Poorly programmed clients may hang forever waiting for a response
// * We want to give the user a hint as to what went wrong
// + they might not know to check the shotover logs
// + they may not be able to correlate which error in the shotover logs corresponds to their failed message
for m in &mut error_report_messages {
#[allow(clippy::single_match)]
match m.to_error_response(format!(
"Internal shotover (or custom transform) bug: {err:?}"
)) {
Ok(new_m) => *m = new_m,
Err(_) => {
// If we cant produce an error then nothing we can do, just continue on and close the connection.
}
out_tx.send(error_report_messages)?;
return Err(err);
}
}
};

debug!("sending message: {:?}", modified_messages);
// send the result of the process up stream
if out_tx.send(modified_messages).is_err() {
// the client has disconnected so we should terminate this connection
return Ok(());
out_tx.send(error_report_messages)?;
Err(err)
}
}
}

Ok(())
async fn process_backward(
&mut self,
client_details: &str,
local_addr: SocketAddr,
responses: Messages,
) -> Result<Messages> {
let wrapper =
Wrapper::new_with_client_details(responses, client_details.to_owned(), local_addr);

self.chain.process_request_rev(wrapper).await.context(
"Chain failed to receive pushed messages/events, the connection will now be closed.",
)
}
}

Expand Down

0 comments on commit 4621efa

Please sign in to comment.