diff --git a/Cargo.toml b/Cargo.toml index be69dde5f..aca6cbaf7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,9 @@ reqwest = "0.12.0" redis = { version = "0.24.0", features = ["tokio-comp", "cluster"] } cdrs-tokio = "8.0" cassandra-protocol = "3.0" -tracing = "0.1.15" +# https://docs.rs/tracing/latest/tracing/level_filters/index.html#compile-time-filters +# `trace` level is considered development only, and may contain sensitive data, do not include it in release builds. +tracing = { version = "0.1.15", features = ["release_max_level_debug"] } tracing-subscriber = { version = "0.3.1", features = ["env-filter", "json"] } tracing-appender = "0.2.0" serde_json = "1.0" diff --git a/shotover/src/codec/cassandra.rs b/shotover/src/codec/cassandra.rs index 995ace34d..c8fa86c78 100644 --- a/shotover/src/codec/cassandra.rs +++ b/shotover/src/codec/cassandra.rs @@ -642,8 +642,7 @@ impl Decoder for CassandraDecoder { Err(CodecReadError::Parser(anyhow!(msg))) } err => Err(CodecReadError::Parser(anyhow!( - "Failed to parse frame {:?}", - err + "Failed to parse frame {err:?}" ))), } } diff --git a/shotover/src/codec/kafka.rs b/shotover/src/codec/kafka.rs index f310deeab..b1685b801 100644 --- a/shotover/src/codec/kafka.rs +++ b/shotover/src/codec/kafka.rs @@ -297,7 +297,9 @@ impl Encoder for KafkaEncoder { }) => { dst.extend_from_slice(&body.auth_bytes); } - _ => unreachable!("not expected {frame:?}"), + _ => unreachable!( + "Expected kafka sasl authenticate request or response but was not" + ), } Ok(()) } else { diff --git a/shotover/src/connection.rs b/shotover/src/connection.rs index 9a01fedf6..47079c8ce 100644 --- a/shotover/src/connection.rs +++ b/shotover/src/connection.rs @@ -385,8 +385,8 @@ async fn reader_task { - if let Err(err) = out_tx.send(messages) { - error!("Failed to send RespondAndThenCloseConnection message: {:?}", err); + if out_tx.send(messages).is_err() { + error!("Failed to send RespondAndThenCloseConnection message"); } return Err(ConnectionError::ShotoverClosed); } diff --git a/shotover/src/frame/cassandra.rs b/shotover/src/frame/cassandra.rs index ff522ee07..b476c1bc7 100644 --- a/shotover/src/frame/cassandra.rs +++ b/shotover/src/frame/cassandra.rs @@ -459,6 +459,26 @@ impl CassandraFrame { } } } +pub(crate) fn operation_name(operation: &CassandraOperation) -> &'static str { + match operation { + CassandraOperation::Query { .. } => "Query", + CassandraOperation::Result(_) => "Result", + CassandraOperation::Error(_) => "Error", + CassandraOperation::Prepare(_) => "Prepare", + CassandraOperation::Execute(_) => "Execute", + CassandraOperation::Register(_) => "Register", + CassandraOperation::Event(_) => "Event", + CassandraOperation::Batch(_) => "Batch", + CassandraOperation::Startup(_) => "Startup", + CassandraOperation::Ready(_) => "Ready", + CassandraOperation::Authenticate(_) => "Authenticate", + CassandraOperation::Options(_) => "Options", + CassandraOperation::Supported(_) => "Supported", + CassandraOperation::AuthChallenge(_) => "AuthChallenge", + CassandraOperation::AuthResponse(_) => "AuthResponse", + CassandraOperation::AuthSuccess(_) => "AuthSuccess", + } +} #[derive(PartialEq, Debug, Clone)] pub enum CassandraOperation { diff --git a/shotover/src/frame/kafka.rs b/shotover/src/frame/kafka.rs index e393190f1..8b4200fec 100644 --- a/shotover/src/frame/kafka.rs +++ b/shotover/src/frame/kafka.rs @@ -44,7 +44,7 @@ impl Display for KafkaFrame { header.unknown_tagged_fields )?; } - write!(f, " {:?}", body)?; + write!(f, " {body:?}")?; } KafkaFrame::Response { version, @@ -63,7 +63,7 @@ impl Display for KafkaFrame { header.unknown_tagged_fields )?; } - write!(f, " {body:?}",)?; + write!(f, " {body:?}")?; } } Ok(()) diff --git a/shotover/src/frame/mod.rs b/shotover/src/frame/mod.rs index 098a4a59b..cfe57bc03 100644 --- a/shotover/src/frame/mod.rs +++ b/shotover/src/frame/mod.rs @@ -237,14 +237,14 @@ impl Display for Frame { fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult { match self { #[cfg(feature = "cassandra")] - Frame::Cassandra(frame) => write!(f, "Cassandra {}", frame), + Frame::Cassandra(frame) => write!(f, "Cassandra {frame}"), #[cfg(feature = "valkey")] - Frame::Valkey(frame) => write!(f, "Valkey {:?}", frame), + Frame::Valkey(frame) => write!(f, "Valkey {frame:?}"), #[cfg(feature = "kafka")] - Frame::Kafka(frame) => write!(f, "Kafka {}", frame), + Frame::Kafka(frame) => write!(f, "Kafka {frame}"), Frame::Dummy => write!(f, "Shotover internal dummy message"), #[cfg(feature = "opensearch")] - Frame::OpenSearch(frame) => write!(f, "OpenSearch: {:?}", frame), + Frame::OpenSearch(frame) => write!(f, "OpenSearch: {frame:?}"), } } } diff --git a/shotover/src/frame/valkey.rs b/shotover/src/frame/valkey.rs index d70d0ede1..ef17008b7 100644 --- a/shotover/src/frame/valkey.rs +++ b/shotover/src/frame/valkey.rs @@ -27,10 +27,7 @@ pub fn valkey_query_name(frame: &ValkeyFrame) -> Option { return Some(query_type); } Err(err) => { - tracing::error!( - "Failed to convert valkey bulkstring to string, err: {:?}", - err - ) + tracing::error!("Failed to convert valkey bulkstring to string, err: {err:?}") } } } diff --git a/shotover/src/message/mod.rs b/shotover/src/message/mod.rs index 1845c91d4..a9de459d9 100644 --- a/shotover/src/message/mod.rs +++ b/shotover/src/message/mod.rs @@ -553,7 +553,7 @@ impl Message { message_type, }) = &self.inner { - format!("Unparseable {:?} message {:?}", message_type, bytes) + format!("Unparseable {message_type:?} message {bytes:?}") } else { unreachable!("self.frame() failed so MessageInner must still be RawBytes") } diff --git a/shotover/src/server.rs b/shotover/src/server.rs index 43d495f82..6c619091b 100644 --- a/shotover/src/server.rs +++ b/shotover/src/server.rs @@ -26,8 +26,8 @@ use tokio_tungstenite::tungstenite::{ protocol::Message as WsMessage, }; use tokio_util::codec::{Decoder, Encoder, FramedRead, FramedWrite}; -use tracing::Instrument; use tracing::{debug, error, warn}; +use tracing::{trace, Instrument}; pub struct TcpCodecListener { chain_builder: TransformChainBuilder, @@ -358,13 +358,13 @@ async fn spawn_websocket_read_write_tasks< Err(CodecReadError::RespondAndThenCloseConnection(messages)) => { if let Err(err) = out_tx.send(messages) { // TODO we need to send a close message to the client - error!("Failed to send RespondAndThenCloseConnection message: {:?}", err); + error!("Failed to send RespondAndThenCloseConnection message: {err}"); } return; } Err(CodecReadError::Parser(err)) => { // TODO we need to send a close message to the client, protocol error - warn!("failed to decode message: {:?}", err); + warn!("failed to decode message: {err:?}"); return; } Err(CodecReadError::Io(_err)) => { @@ -480,12 +480,12 @@ pub fn spawn_read_write_tasks< } Err(CodecReadError::RespondAndThenCloseConnection(messages)) => { if let Err(err) = out_tx.send(messages) { - error!("Failed to send RespondAndThenCloseConnection message: {:?}", err); + error!("Failed to send RespondAndThenCloseConnection message: {err}"); } return; } Err(CodecReadError::Parser(err)) => { - warn!("failed to decode message: {:?}", err); + warn!("failed to decode message: {err:?}"); return; } Err(CodecReadError::Io(err)) => { @@ -494,7 +494,7 @@ pub fn spawn_read_write_tasks< // We shouldnt report that as a warning because its common for clients to do // that for performance reasons. if !matches!(err.kind(), ErrorKind::UnexpectedEof) { - warn!("failed to receive message on tcp stream: {:?}", err); + warn!("failed to receive message on tcp stream: {err:?}"); } return; } @@ -699,7 +699,7 @@ impl Handler { while let Ok(x) = in_rx.try_recv() { requests.extend(x); } - debug!("A transform in the chain requested that a chain run occur, requests {:?}", requests); + debug!("running transform chain because a transform in the chain requested that a chain run occur"); if let Some(close_reason) = self.send_receive_chain(local_addr, &out_tx, requests).await? { return Ok(close_reason) } @@ -710,7 +710,7 @@ impl Handler { while let Ok(x) = in_rx.try_recv() { requests.extend(x); } - debug!("Received requests from client {:?}", requests); + debug!("running transform chain because requests received from client"); if let Some(close_reason) = self.send_receive_chain(local_addr, &out_tx, requests).await? { return Ok(close_reason) } @@ -731,6 +731,7 @@ impl Handler { out_tx: &mpsc::UnboundedSender, requests: Messages, ) -> Result> { + trace!("running transform chain with requests: {requests:?}"); let mut wrapper = ChainState::new_with_addr(requests, local_addr); self.pending_requests.process_requests(&wrapper.requests); @@ -748,7 +749,8 @@ impl Handler { // send the result of the process up stream if !responses.is_empty() { - debug!("sending response to client: {:?}", responses); + debug!("sending {} responses to client", responses.len()); + trace!("sending response to client: {responses:?}"); if out_tx.send(responses).is_err() { // the client has disconnected so we should terminate this connection return Ok(Some(CloseReason::ClientClosed)); diff --git a/shotover/src/transforms/cassandra/sink_cluster/mod.rs b/shotover/src/transforms/cassandra/sink_cluster/mod.rs index 772794848..7e9e695d5 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/mod.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/mod.rs @@ -559,7 +559,7 @@ impl CassandraSinkCluster { } Err(GetReplicaErr::NoPreparedMetadata) => { let id = execute.id.clone(); - tracing::info!("forcing re-prepare on {:?}", id); + tracing::info!("forcing re-prepare on {id:?}"); // this shotover node doesn't have the metadata. // send an unprepared error in response to force // the client to reprepare the query @@ -632,7 +632,7 @@ impl CassandraSinkCluster { self.set_control_connection(connection, address); } tracing::info!( - "Control connection finalized against node at: {:?}", + "Control connection finalized against node at: {}", self.control_connection_address.unwrap() ); diff --git a/shotover/src/transforms/cassandra/sink_cluster/rewrite.rs b/shotover/src/transforms/cassandra/sink_cluster/rewrite.rs index e181deff6..451be5095 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/rewrite.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/rewrite.rs @@ -1,6 +1,7 @@ use super::node::ConnectionFactory; use super::node_pool::NodePool; use super::ShotoverNode; +use crate::frame::cassandra::operation_name; use crate::frame::{CassandraFrame, CassandraOperation, CassandraResult, Frame}; use crate::message::{Message, MessageIdMap, Messages}; use crate::{ @@ -350,9 +351,16 @@ impl MessageRewriter { CassandraOperation::Error(_), .. })) => None, - other => { - tracing::error!("Response to Prepare query was not a Prepared, was instead: {other:?}"); - warnings.push(format!("Shotover: Response to Prepare query was not a Prepared, was instead: {other:?}")); + Some(Frame::Cassandra(CassandraFrame { operation, .. })) => { + let operation_name = operation_name(operation); + tracing::error!("Response to Prepare query was not a Prepared, was instead: {operation_name}"); + warnings.push(format!("Shotover: Response to Prepare query was not a Prepared, was instead: {operation_name}")); + None + } + Some(_) => unreachable!("Response to prepare was not cassandra message"), + None => { + tracing::error!("Response to Prepare query was not parseable"); + warnings.push("Shotover: Response to Prepare query was not parseable".to_owned()); None } }) @@ -366,12 +374,15 @@ impl MessageRewriter { output }); - tracing::error!( - "Nodes did not return the same response to PREPARE statement {err_str}" + tracing::warn!( + "Nodes did not return the same response to PREPARE statement" + ); + tracing::trace!( + "Nodes did not return the same response to PREPARE statement:{err_str}" ); warnings.push(format!( - "Shotover: Nodes did not return the same response to PREPARE statement {err_str}" - )); + "Shotover: Nodes did not return the same response to PREPARE statement {err_str}" + )); } } @@ -544,10 +555,7 @@ impl MessageRewriter { } Ok(()) } else { - Err(anyhow!( - "Failed to parse system.local response {:?}", - peers_response - )) + Err(anyhow!("Failed to parse system.local response")) } } @@ -670,10 +678,7 @@ impl MessageRewriter { } Ok(()) } else { - Err(anyhow!( - "Failed to parse system.local response {:?}", - local_response - )) + Err(anyhow!("Failed to parse system.local response")) } } } @@ -831,13 +836,13 @@ fn parse_system_nodes(mut response: Message) -> Result, MessagePar "system.local returned error: {error:?}", ))), operation => Err(MessageParseError::ParseFailure(anyhow!( - "system.local returned unexpected cassandra operation: {operation:?}", + "system.local returned unexpected cassandra operation: {:?}", + operation_name(operation) ))), } } else { Err(MessageParseError::ParseFailure(anyhow!( - "Failed to parse system.local response {:?}", - response + "Failed to parse system.local response" ))) } } diff --git a/shotover/src/transforms/cassandra/sink_cluster/topology.rs b/shotover/src/transforms/cassandra/sink_cluster/topology.rs index 0027c8b50..85767da47 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/topology.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/topology.rs @@ -2,6 +2,7 @@ use super::node::{CassandraNode, ConnectionFactory}; use super::node_pool::KeyspaceMetadata; use super::KeyspaceChanTx; use crate::connection::SinkConnection; +use crate::frame::cassandra::operation_name; use crate::frame::{ cassandra::{parse_statement_single, Tracing}, value::GenericValue, @@ -93,7 +94,7 @@ async fn topology_task_process( register_for_topology_and_status_events(&mut connection, version).await?; tracing::info!( - "Topology task control connection finalized against node at: {:?}", + "Topology task control connection finalized against node at: {}", connection_info.address ); @@ -208,7 +209,7 @@ async fn register_for_topology_and_status_events( if let Some(Frame::Cassandra(CassandraFrame { operation, .. })) = response.frame() { match operation { CassandraOperation::Ready(_) => Ok(()), - operation => Err(anyhow!("Expected Cassandra to respond to a Register with a Ready. Instead it responded with {:?}", operation)) + operation => Err(anyhow!("Expected Cassandra to respond to a Register with a Ready. Instead it responded with {}", operation_name(operation))) } } else { Err(anyhow!("Failed to parse cassandra message")) @@ -231,7 +232,10 @@ async fn fetch_current_nodes( } mod system_keyspaces { - use crate::transforms::cassandra::sink_cluster::node_pool::ReplicationStrategy; + use crate::{ + frame::cassandra::operation_name, + transforms::cassandra::sink_cluster::node_pool::ReplicationStrategy, + }; use super::*; use std::str::FromStr; @@ -272,8 +276,8 @@ mod system_keyspaces { .map(|row| build_keyspace(row, data_center)) .collect(), operation => Err(anyhow!( - "keyspace query returned unexpected cassandra operation: {:?}", - operation + "keyspace query returned unexpected cassandra operation: {}", + operation_name(operation) )), } } else { @@ -463,15 +467,12 @@ mod system_local { }) .collect(), operation => Err(anyhow!( - "system.peers returned unexpected cassandra operation: {:?}", - operation + "system.peers returned unexpected cassandra operation: {}", + operation_name(operation) )), } } else { - Err(anyhow!( - "Failed to parse system.local response {:?}", - response - )) + Err(anyhow!("Failed to parse system.local response")) } } } @@ -604,14 +605,13 @@ mod system_peers { }) .collect(), operation => Err(anyhow!( - "system.peers or system.peers_v2 returned unexpected cassandra operation: {:?}", - operation + "system.peers or system.peers_v2 returned unexpected cassandra operation: {}", + operation_name(operation) )), } } else { Err(anyhow!( - "Failed to parse system.peers or system.peers_v2 response {:?}", - response + "Failed to parse system.peers or system.peers_v2 response", )) } } diff --git a/shotover/src/transforms/chain.rs b/shotover/src/transforms/chain.rs index 35a9b2e91..9fdd18ebd 100644 --- a/shotover/src/transforms/chain.rs +++ b/shotover/src/transforms/chain.rs @@ -96,7 +96,7 @@ impl BufferedChain { chain_state.flush, one_tx, )) - .map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e)) + .map_err(|e| anyhow!("Couldn't send message to wrapped chain {e}")) .await? } Some(timeout) => { @@ -110,7 +110,7 @@ impl BufferedChain { ), Duration::from_micros(timeout), ) - .map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e)) + .map_err(|e| anyhow!("Couldn't send message to wrapped chain {e}")) .await? } } @@ -137,7 +137,7 @@ impl BufferedChain { chain_state.requests, chain_state.local_addr, )) - .map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e)) + .map_err(|e| anyhow!("Couldn't send message to wrapped chain {e}")) .await? } Some(timeout) => { @@ -149,7 +149,7 @@ impl BufferedChain { ), Duration::from_micros(timeout), ) - .map_err(|e| anyhow!("Couldn't send message to wrapped chain {:?}", e)) + .map_err(|e| anyhow!("Couldn't send message to wrapped chain {e}")) .await? } } diff --git a/shotover/src/transforms/debug/log_to_file.rs b/shotover/src/transforms/debug/log_to_file.rs index 70711dff9..1fc734a24 100644 --- a/shotover/src/transforms/debug/log_to_file.rs +++ b/shotover/src/transforms/debug/log_to_file.rs @@ -115,7 +115,7 @@ impl Transform for DebugLogToFile { } async fn log_message(message: &Message, path: &Path) -> Result<()> { - info!("Logged message to {:?}", path); + info!("Logged message to {path:?}"); match message.clone().into_encodable() { Encodable::Bytes(bytes) => { tokio::fs::write(path, bytes) diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 62a4a8e8e..8619a7cc1 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -841,9 +841,8 @@ impl KafkaSinkCluster { { Ok(node) => { tracing::debug!( - "Storing group_to_coordinator_broker metadata, group {:?} -> broker {}", - group.0, - node.broker_id.0 + "Storing group_to_coordinator_broker metadata, group {group:?} -> broker {:?}", + node.broker_id ); self.group_to_coordinator_broker .insert(group, node.broker_id); @@ -866,9 +865,8 @@ impl KafkaSinkCluster { { Ok(node) => { tracing::debug!( - "Storing transaction_to_coordinator_broker metadata, transaction {:?} -> broker {}", - transaction.0, - node.broker_id.0 + "Storing transaction_to_coordinator_broker metadata, transaction {transaction:?} -> broker {:?}", + node.broker_id ); self.transaction_to_coordinator_broker .insert(transaction, node.broker_id); @@ -1200,8 +1198,6 @@ The connection to the client has been closed." Some(Frame::Kafka(KafkaFrame::Request { header, .. })) => { let request_type = format!("{:?}", ApiKey::try_from(header.request_api_key).unwrap()); - // remove Key postfix, since its not part of the actual message name which is confusing. - let request_type = request_type.trim_end_matches("Key"); tracing::warn!("Routing for request of type {request_type:?} has not been implemented yet."); self.route_to_random_broker(request) } @@ -1267,7 +1263,7 @@ The connection to the client has been closed." ty: PendingRequestTy::Other, combine_responses: 1, }); - tracing::debug!("Routing request to single broker {:?}", destination.0); + tracing::debug!("Routing request to single broker {destination:?}"); } else { // The request has been split so it may be delivered to multiple destinations. // We must generate a unique request for each destination. @@ -1988,9 +1984,11 @@ The connection to the client has been closed." "Unexpected server error from FindCoordinator {err}" ))), }, - other => Err(anyhow!( - "Unexpected response returned to findcoordinator request {other:?}" + Some(Frame::Kafka(_)) => Err(anyhow!( + "Unexpected response returned to findcoordinator request" ))?, + None => Err(anyhow!("Response to FindCoordinator could not be parsed"))?, + _ => unreachable!("response to FindCoordinator was not a kafka response"), } } @@ -3018,8 +3016,7 @@ The connection to the client has been closed." // The broker doesnt know who the new leader is, clear the entire topic. self.topic_by_name.remove(topic_name); tracing::info!( - "Produce response included error NOT_LEADER_OR_FOLLOWER and so cleared topic {:?}", - topic_name, + "Produce response included error NOT_LEADER_OR_FOLLOWER and so cleared topic {topic_name:?}" ); break; } @@ -3430,9 +3427,7 @@ The connection to the client has been closed." }; tracing::debug!( - "Routing request relating to group id {:?} to broker {}", - group_id.0, - destination.0 + "Routing request relating to group id {group_id:?} to broker {destination:?}", ); self.pending_requests.push_back(PendingRequest { @@ -3458,9 +3453,7 @@ The connection to the client has been closed." }; tracing::debug!( - "Routing request relating to transaction id {:?} to broker {}", - transaction_id.0, - destination.0 + "Routing request relating to transaction id {transaction_id:?} to broker {destination:?}" ); self.pending_requests.push_back(PendingRequest { diff --git a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs index 7fc3186c8..dee825eb8 100644 --- a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs +++ b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs @@ -125,7 +125,7 @@ async fn task( &username, delegation_token_lifetime ).await - .with_context(|| format!("Failed to recreate delegation token for {:?}", username))?; + .with_context(|| format!("Failed to recreate delegation token for {username:?}"))?; username_to_token.insert(username.clone(), token); recreate_queue.push(username.clone()); diff --git a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/create_token.rs b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/create_token.rs index 543a0a174..2b9442e8c 100644 --- a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/create_token.rs +++ b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/create_token.rs @@ -98,9 +98,11 @@ async fn find_new_brokers(nodes: &mut Vec, rng: &mut SmallRng) -> Result<( nodes.extend(new_nodes); Ok(()) } - other => Err(anyhow!( - "Unexpected message returned to metadata request {other:?}" - )), + Some(Frame::Kafka(_)) => Err(anyhow!( + "Unexpected response returned to findcoordinator request" + ))?, + None => Err(anyhow!("Response to FindCoordinator could not be parsed"))?, + _ => unreachable!("response to FindCoordinator was not a kafka response"), } } @@ -152,9 +154,13 @@ async fn create_delegation_token_for_user( Ok(response) } } - response => Err(anyhow!( - "Unexpected response to CreateDelegationToken {response:?}" - )), + Some(Frame::Kafka(_)) => Err(anyhow!( + "Unexpected response returned to CreateDelegationToken request" + ))?, + None => Err(anyhow!( + "Response to CreateDelegationToken could not be parsed" + ))?, + _ => unreachable!("response to CreateDelegationToken was not a kafka response"), } } @@ -253,28 +259,32 @@ async fn is_delegation_token_ready( }, ))])?; let mut response = connection.recv().await?.pop().unwrap(); - if let Some(Frame::Kafka(KafkaFrame::Response { - body: ResponseBody::DescribeDelegationToken(response), - .. - })) = response.frame() - { - if let Some(err) = ResponseError::try_from_code(response.error_code) { - return Err(anyhow!( - "Kafka's response to DescribeDelegationToken was an error: {err}" - )); - } - if response - .tokens - .iter() - .any(|x| x.hmac == create_response.hmac && x.token_id == create_response.token_id) - { - Ok(true) - } else { - Ok(false) + match response.frame() { + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::DescribeDelegationToken(response), + .. + })) => { + if let Some(err) = ResponseError::try_from_code(response.error_code) { + return Err(anyhow!( + "Kafka's response to DescribeDelegationToken was an error: {err}" + )); + } + if response + .tokens + .iter() + .any(|x| x.hmac == create_response.hmac && x.token_id == create_response.token_id) + { + Ok(true) + } else { + Ok(false) + } } - } else { - Err(anyhow!( - "Unexpected response to CreateDelegationToken {response:?}" - )) + Some(Frame::Kafka(_)) => Err(anyhow!( + "Unexpected response returned to DescribeDelegationToken request" + ))?, + None => Err(anyhow!( + "Response to DescribeDelegationToken could not be parsed" + ))?, + _ => unreachable!("response to DescribeDelegationToken was not a kafka response"), } } diff --git a/shotover/src/transforms/protect/crypto.rs b/shotover/src/transforms/protect/crypto.rs index 49f15dc9f..e95fcbf67 100644 --- a/shotover/src/transforms/protect/crypto.rs +++ b/shotover/src/transforms/protect/crypto.rs @@ -51,7 +51,7 @@ pub async fn decrypt( ) -> Result { let bytes = match value { GenericValue::Bytes(bytes) => bytes, - _ => bail!("expected varchar to decrypt but was {:?}", value), + _ => bail!("expected varchar to decrypt but was not varchar"), }; let protected: Protected = bincode::deserialize(bytes)?; diff --git a/shotover/src/transforms/tee.rs b/shotover/src/transforms/tee.rs index 427a948dd..1bac9729c 100644 --- a/shotover/src/transforms/tee.rs +++ b/shotover/src/transforms/tee.rs @@ -574,8 +574,7 @@ async fn put_result_source( "regular-chain" => ResultSource::RegularChain, _ => { return Err(HttpServerError(anyhow!( - r"Invalid value for result source: {:?}, should be 'tee-chain' or 'regular-chain'", - new_result_source + r"Invalid value for result source: {new_result_source:?}, should be 'tee-chain' or 'regular-chain'" ))); } }; diff --git a/shotover/src/transforms/util/cluster_connection_pool.rs b/shotover/src/transforms/util/cluster_connection_pool.rs index 444e9fce9..744fb2b8e 100644 --- a/shotover/src/transforms/util/cluster_connection_pool.rs +++ b/shotover/src/transforms/util/cluster_connection_pool.rs @@ -90,10 +90,7 @@ impl, T: Token> ConnectionPool, connection_count: usize, ) -> Result, ConnectionError> { - debug!( - "getting {} pool connections to {} with token: {:?}", - connection_count, address, token - ); + debug!("getting {connection_count} pool connections to {address} with token: {token:?}",); let mut lanes = self.lanes.lock().await; let lane = lanes.entry(token.clone()).or_default(); @@ -205,7 +202,7 @@ pub fn spawn_read_write_tasks< tokio::spawn(async move { tokio::select! { result = tx_process(dummy_request_tx, stream_tx, out_rx, return_tx, encoder) => if let Err(e) = result { - trace!("connection write-closed with error: {:?}", e); + trace!("connection write-closed with error: {e:?}"); } else { trace!("connection write-closed gracefully"); }, @@ -218,7 +215,7 @@ pub fn spawn_read_write_tasks< tokio::spawn( async move { if let Err(e) = rx_process(dummy_request_rx, stream_rx, return_rx, decoder).await { - trace!("connection read-closed with error: {:?}", e); + trace!("connection read-closed with error: {e:?}"); } else { trace!("connection read-closed gracefully"); } diff --git a/shotover/src/transforms/valkey/cluster_ports_rewrite.rs b/shotover/src/transforms/valkey/cluster_ports_rewrite.rs index 048849a6e..2b9a22af1 100644 --- a/shotover/src/transforms/valkey/cluster_ports_rewrite.rs +++ b/shotover/src/transforms/valkey/cluster_ports_rewrite.rs @@ -137,9 +137,9 @@ fn rewrite_port_slot(frame: &mut Frame, new_port: u16) -> Result<()> { [ValkeyFrame::BulkString(_ip), ValkeyFrame::Integer(port), ..] => { *port = new_port.into(); } - _ => bail!("expected host-port in slot map but was: {:?}", frame), + _ => bail!("expected slot to start with bulkstring followed by integer, but was something else"), }, - _ => bail!("unexpected value in slot map: {:?}", frame), + _ => bail!("non array value in slot map"), } } }; diff --git a/shotover/src/transforms/valkey/sink_cluster.rs b/shotover/src/transforms/valkey/sink_cluster.rs index fc7313834..96439cb2f 100644 --- a/shotover/src/transforms/valkey/sink_cluster.rs +++ b/shotover/src/transforms/valkey/sink_cluster.rs @@ -221,7 +221,7 @@ impl ValkeySinkCluster { let command = match message.frame() { Some(Frame::Valkey(ValkeyFrame::Array(ref command))) => command, None => bail!("Failed to parse valkey frame"), - message => bail!("syntax error: bad command: {message:?}"), + _ => bail!("Invalid redis command, must be an array but was not"), }; let routing_info = RoutingInfo::for_command_frame(command)?; @@ -357,12 +357,12 @@ impl ValkeySinkCluster { self.connection_pool .new_unpooled_connection(address, token) .map_err(move |err| { - trace!("error fetching slot map from {}: {:?}", address, err); + trace!("error fetching slot map from {address}: {err:?}"); TransformError::from(err) }) .and_then(get_topology_from_node) .map_ok(move |slots| { - trace!("fetched slot map from {}: {:?}", address, slots); + trace!("fetched slot map from {address}: {slots:?}"); slots }), ); @@ -429,7 +429,7 @@ impl ValkeySinkCluster { } Err(e) => { // Intentional debug! Some errors should be silently passed through. - debug!("failed to connect to {}: {:?}", node, e); + debug!("failed to connect to {node}: {e:?}"); errors.push(e.into()); } } @@ -599,7 +599,7 @@ impl ValkeySinkCluster { let command = match message.frame() { Some(Frame::Valkey(ValkeyFrame::Array(ref command))) => command, None => bail!("Failed to parse valkey frame"), - message => bail!("syntax error: bad command: {message:?}"), + _ => bail!("syntax error: bad command"), }; let mut args = command.iter().skip(1).rev().map(|f| match f { @@ -952,9 +952,9 @@ async fn get_topology_from_node( ValkeyFrame::Error(message) => Err(TransformError::Upstream(ValkeyError::from_message( &message, ))), - frame => Err(TransformError::Protocol(format!( - "unexpected response for cluster slots: {frame:?}" - ))), + _ => Err(TransformError::Protocol( + "unexpected response for cluster slots to be an array but was not".to_owned(), + )), } } @@ -993,7 +993,7 @@ async fn receive_frame_response(receiver: oneshot::Receiver) -> Result match response?.frame() { Some(Frame::Valkey(frame)) => Ok(frame.take()), None => Err(anyhow!("Failed to parse valkey frame")), - response => Err(anyhow!("Unexpected valkey response: {response:?}")), + _ => Err(anyhow!("Expected valkey response but was not valkey.")), } } @@ -1070,7 +1070,6 @@ impl Transform for ValkeySinkCluster { while let Some(s) = responses.next().await { let original = requests.pop().unwrap(); - trace!("Got resp {:?}", s); let Response { response } = s.or_else(|e| -> Result { Ok(Response { response: Ok(Message::from_frame(Frame::Valkey(ValkeyFrame::Error( @@ -1184,13 +1183,13 @@ impl Authenticator for ValkeyAuthenticator { trace!("authenticated upstream as user: {:?}", token.username); Ok(()) } - ValkeyFrame::SimpleString(s) => Err(TransformError::Protocol(format!( - "expected OK but got: {s:?}" - ))), + ValkeyFrame::SimpleString(_) => Err(TransformError::Protocol( + "expected auth response to be \"OK\" but was a different SimpleString".to_owned(), + )), ValkeyFrame::Error(e) => Err(TransformError::Upstream(ValkeyError::from_message(&e))), - f => Err(TransformError::Protocol(format!( - "unexpected response type: {f:?}" - ))), + _ => Err(TransformError::Protocol( + "Expected auth response to be a SimpleString but was something else".to_owned(), + )), } } }