From f214b76c49c632ef002f38db84a22dbec3c162ed Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 23 Jan 2024 21:58:00 +1100 Subject: [PATCH 1/2] Fix parsing multiple windsock profilers (#1429) --- windsock/src/cli.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/windsock/src/cli.rs b/windsock/src/cli.rs index 8182302f7..ddd1f6b29 100644 --- a/windsock/src/cli.rs +++ b/windsock/src/cli.rs @@ -32,7 +32,7 @@ pub struct Args { /// Instruct benches to profile the application under test with the specified profilers. /// Benches that do not support the specified profilers will be skipped. - #[clap(long, verbatim_doc_comment)] + #[clap(long, verbatim_doc_comment, value_delimiter = ',')] pub profilers: Vec, /// How long in seconds to run each bench for. From 4621efae9d722f36dbeccb47a3de2a86263d1161 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 23 Jan 2024 22:24:16 +1100 Subject: [PATCH 2/2] refactor Handler::process_messages (#1430) --- shotover/src/server.rs | 124 +++++++++++++++++++++-------------------- 1 file changed, 65 insertions(+), 59 deletions(-) diff --git a/shotover/src/server.rs b/shotover/src/server.rs index 8bcc5aa36..9d81e3585 100644 --- a/shotover/src/server.rs +++ b/shotover/src/server.rs @@ -697,22 +697,21 @@ impl Handler { while !self.shutdown.is_shutdown() { // While reading a request frame, also listen for the shutdown signal debug!("Waiting for message {client_details}"); - let mut reverse_chain = false; - - let messages = tokio::select! { + let responses = tokio::select! { requests = Self::receive_with_timeout(self.timeout, &mut in_rx, client_details) => { + debug!("Received requests from client {:?}", requests); match requests { - Some(requests) => requests, + Some(requests) => self.process_forward(client_details, local_addr, &out_tx, requests).await?, None => { // Either we timed out the connection or the client disconnected, so terminate this connection return Ok(()) } } }, - Some(res) = self.pushed_messages_rx.recv() => { - reverse_chain = true; - res - }, + Some(responses) = self.pushed_messages_rx.recv() => { + debug!("Received unrequested responses from destination {:?}", responses); + self.process_backward(client_details, local_addr, responses).await? + } _ = self.shutdown.recv() => { // If a shutdown signal is received, return from `run`. // This will result in the task terminating. @@ -720,65 +719,72 @@ impl Handler { } }; - debug!("Received raw messages {:?}", messages); - - let mut error_report_messages = if reverse_chain { - // Avoid allocating for reverse chains as we dont make use of this value in that case - vec![] - } else { - // This clone should be cheap as cloning a Message that has never had `.frame()` - // called should result in no new allocations. - messages.clone() - }; + debug!("sending response to client: {:?}", responses); + // send the result of the process up stream + if out_tx.send(responses).is_err() { + // the client has disconnected so we should terminate this connection + return Ok(()); + } + } - let wrapper = - Wrapper::new_with_client_details(messages, client_details.to_owned(), local_addr); + Ok(()) + } - let modified_messages = if reverse_chain { - self.chain - .process_request_rev(wrapper) - .await - .context("Chain failed to receive pushed messages/events, the connection will now be closed.")? - } else { - match self - .chain - .process_request(wrapper) - .await - .context("Chain failed to send and/or receive messages, the connection will now be closed.") - { - Ok(x) => x, - Err(err) => { - // An internal error occured and we need to terminate the connection because we can no - // longer make any gaurantees about the state its in. - // However before we do that we need to return errors for all the messages in this batch for two reasons: - // * Poorly programmed clients may hang forever waiting for a response - // * We want to give the user a hint as to what went wrong - // + they might not know to check the shotover logs - // + they may not be able to correlate which error in the shotover logs corresponds to their failed message - for m in &mut error_report_messages { - #[allow(clippy::single_match)] - match m.to_error_response(format!("Internal shotover (or custom transform) bug: {err:?}")) { - Ok(new_m) => *m = new_m, - Err(_) => { - // If we cant produce an error then nothing we can do, just continue on and close the connection. - } - } + async fn process_forward( + &mut self, + client_details: &str, + local_addr: SocketAddr, + out_tx: &mpsc::UnboundedSender, + requests: Messages, + ) -> Result { + // This clone should be cheap as cloning requests Message that has never had `.frame()` + // called should result in no new allocations. + let mut error_report_messages = requests.clone(); + + let wrapper = + Wrapper::new_with_client_details(requests, client_details.to_owned(), local_addr); + + match self.chain.process_request(wrapper).await.context( + "Chain failed to send and/or receive messages, the connection will now be closed.", + ) { + Ok(x) => Ok(x), + Err(err) => { + // An internal error occured and we need to terminate the connection because we can no + // longer make any gaurantees about the state its in. + // However before we do that we need to return errors for all the messages in this batch for two reasons: + // * Poorly programmed clients may hang forever waiting for a response + // * We want to give the user a hint as to what went wrong + // + they might not know to check the shotover logs + // + they may not be able to correlate which error in the shotover logs corresponds to their failed message + for m in &mut error_report_messages { + #[allow(clippy::single_match)] + match m.to_error_response(format!( + "Internal shotover (or custom transform) bug: {err:?}" + )) { + Ok(new_m) => *m = new_m, + Err(_) => { + // If we cant produce an error then nothing we can do, just continue on and close the connection. } - out_tx.send(error_report_messages)?; - return Err(err); } } - }; - - debug!("sending message: {:?}", modified_messages); - // send the result of the process up stream - if out_tx.send(modified_messages).is_err() { - // the client has disconnected so we should terminate this connection - return Ok(()); + out_tx.send(error_report_messages)?; + Err(err) } } + } - Ok(()) + async fn process_backward( + &mut self, + client_details: &str, + local_addr: SocketAddr, + responses: Messages, + ) -> Result { + let wrapper = + Wrapper::new_with_client_details(responses, client_details.to_owned(), local_addr); + + self.chain.process_request_rev(wrapper).await.context( + "Chain failed to receive pushed messages/events, the connection will now be closed.", + ) } }