diff --git a/shotover/src/server.rs b/shotover/src/server.rs index 2eef16403..4af185a94 100644 --- a/shotover/src/server.rs +++ b/shotover/src/server.rs @@ -714,7 +714,7 @@ impl Handler { requests.extend(x); } debug!("Received requests from client {:?}", requests); - self.process_forward(client_details, local_addr, &out_tx, requests).await? + self.process(client_details, local_addr, &out_tx, requests).await? } None => { // Either we timed out the connection or the client disconnected, so terminate this connection @@ -722,9 +722,9 @@ impl Handler { } } }, - Some(responses) = self.pushed_messages_rx.recv() => { - debug!("Received unrequested responses from destination {:?}", responses); - self.process_backward(client_details, local_addr, responses).await? + Some(_) = self.pushed_messages_rx.recv() => { + debug!("responses pending from destination"); + self.process(client_details, local_addr, &out_tx, vec!()).await? } _ = self.shutdown.recv() => { // If a shutdown signal is received, return from `run`. @@ -744,7 +744,7 @@ impl Handler { Ok(()) } - async fn process_forward( + async fn process( &mut self, client_details: &str, local_addr: SocketAddr, @@ -786,20 +786,6 @@ impl Handler { } } } - - async fn process_backward( - &mut self, - client_details: &str, - local_addr: SocketAddr, - responses: Messages, - ) -> Result { - let wrapper = - Wrapper::new_with_client_details(responses, client_details.to_owned(), local_addr); - - self.chain.process_request_rev(wrapper).await.context( - "Chain failed to receive pushed messages/events, the connection will now be closed.", - ) - } } /// Listens for the server shutdown signal. diff --git a/shotover/src/transforms/cassandra/peers_rewrite.rs b/shotover/src/transforms/cassandra/peers_rewrite.rs index c887a0d16..68e9893ae 100644 --- a/shotover/src/transforms/cassandra/peers_rewrite.rs +++ b/shotover/src/transforms/cassandra/peers_rewrite.rs @@ -78,32 +78,22 @@ impl Transform for CassandraPeersRewrite { let mut responses = requests_wrapper.call_next_transform().await?; for response in &mut responses { - if let Some(id) = response.request_id() { - let name_list = self.column_names_to_rewrite.remove(&id).unwrap(); - rewrite_port(response, &name_list, self.port); - } - } - - Ok(responses) - } - - async fn transform_pushed<'a>( - &'a mut self, - mut requests_wrapper: Wrapper<'a>, - ) -> Result { - for message in &mut requests_wrapper.requests { - if let Some(Frame::Cassandra(frame)) = message.frame() { + if let Some(Frame::Cassandra(frame)) = response.frame() { if let Event(ServerEvent::StatusChange(StatusChange { addr, .. })) = &mut frame.operation { addr.set_port(self.port); - message.invalidate_cache(); + response.invalidate_cache(); } } + + if let Some(id) = response.request_id() { + let name_list = self.column_names_to_rewrite.remove(&id).unwrap(); + rewrite_port(response, &name_list, self.port); + } } - let response = requests_wrapper.call_next_transform_pushed().await?; - Ok(response) + Ok(responses) } } diff --git a/shotover/src/transforms/cassandra/sink_cluster/mod.rs b/shotover/src/transforms/cassandra/sink_cluster/mod.rs index 5fb4fd222..8c93ec6f9 100644 --- a/shotover/src/transforms/cassandra/sink_cluster/mod.rs +++ b/shotover/src/transforms/cassandra/sink_cluster/mod.rs @@ -526,6 +526,25 @@ impl CassandraSinkCluster { } } + // remove topology and status change messages since they contain references to real cluster IPs + // TODO: we should be rewriting them not just deleting them. + responses.retain_mut(|message| { + if let Some(Frame::Cassandra(CassandraFrame { + operation: CassandraOperation::Event(event), + .. + })) = message.frame() + { + match event { + ServerEvent::TopologyChange(_) => false, + ServerEvent::StatusChange(_) => false, + ServerEvent::SchemaChange(_) => true, + _ => unreachable!(), + } + } else { + true + } + }); + Ok(responses) } @@ -721,29 +740,6 @@ impl Transform for CassandraSinkCluster { self.send_message(requests_wrapper.requests).await } - async fn transform_pushed<'a>( - &'a mut self, - mut requests_wrapper: Wrapper<'a>, - ) -> Result { - requests_wrapper.requests.retain_mut(|message| { - if let Some(Frame::Cassandra(CassandraFrame { - operation: CassandraOperation::Event(event), - .. - })) = message.frame() - { - match event { - ServerEvent::TopologyChange(_) => false, - ServerEvent::StatusChange(_) => false, - ServerEvent::SchemaChange(_) => true, - _ => unreachable!(), - } - } else { - true - } - }); - requests_wrapper.call_next_transform_pushed().await - } - fn set_pushed_messages_tx(&mut self, pushed_messages_tx: mpsc::UnboundedSender) { self.connection_factory .set_pushed_messages_tx(pushed_messages_tx); diff --git a/shotover/src/transforms/chain.rs b/shotover/src/transforms/chain.rs index dd5b798e0..2332e8d95 100644 --- a/shotover/src/transforms/chain.rs +++ b/shotover/src/transforms/chain.rs @@ -171,22 +171,6 @@ impl TransformChain { histogram!("shotover_chain_latency_seconds", "chain" => self.name, "client_details" => client_details).record(start.elapsed()); result } - - pub async fn process_request_rev(&mut self, mut wrapper: Wrapper<'_>) -> Result { - let start = Instant::now(); - wrapper.reset_rev(&mut self.chain); - - self.chain_batch_size.record(wrapper.requests.len() as f64); - let client_details = wrapper.client_details.to_owned(); - let result = wrapper.call_next_transform_pushed().await; - self.chain_total.increment(1); - if result.is_err() { - self.chain_failures.increment(1); - } - - histogram!("shotover_chain_latency_seconds", "chain" => self.name, "client_details" => client_details).record(start.elapsed()); - result - } } pub struct TransformAndMetrics { diff --git a/shotover/src/transforms/mod.rs b/shotover/src/transforms/mod.rs index 034d8cb09..a29240d3d 100644 --- a/shotover/src/transforms/mod.rs +++ b/shotover/src/transforms/mod.rs @@ -170,33 +170,6 @@ impl<'a> Wrapper<'a> { result } - pub async fn call_next_transform_pushed(mut self) -> Result { - let TransformAndMetrics { - transform, - transform_pushed_total, - transform_pushed_failures, - transform_pushed_latency, - .. - } = match self.transforms.next() { - Some(transform) => transform, - None => return Ok(self.requests), - }; - - let transform_name = transform.get_name(); - - let start = Instant::now(); - let result = transform - .transform_pushed(self) - .await - .map_err(|e| e.context(anyhow!("{transform_name} transform failed"))); - transform_pushed_total.increment(1); - if result.is_err() { - transform_pushed_failures.increment(1); - } - transform_pushed_latency.record(start.elapsed()); - result - } - pub fn clone_requests_into_hashmap(&self, destination: &mut MessageIdMap) { for request in &self.requests { destination.insert(request.id(), request.clone()); @@ -333,27 +306,6 @@ pub trait Transform: Send { /// You can have have a transform that is both non-terminating and a sink. async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result; - /// TODO: This method should be removed and integrated with `Transform::transform` once we properly support out of order protocols. - /// - /// This method should be should be implemented by your transform if it is required to process pushed messages (typically events - /// or messages that your source is subscribed to. The wrapper object contains the queries/frames - /// in a [`Vec(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result { - let response = requests_wrapper.call_next_transform_pushed().await?; - Ok(response) - } - fn get_name(&self) -> &'static str; fn set_pushed_messages_tx(&mut self, _pushed_messages_tx: mpsc::UnboundedSender) {} diff --git a/shotover/src/transforms/redis/sink_single.rs b/shotover/src/transforms/redis/sink_single.rs index 5329127bf..ade0840ce 100644 --- a/shotover/src/transforms/redis/sink_single.rs +++ b/shotover/src/transforms/redis/sink_single.rs @@ -1,3 +1,5 @@ +// TODO: refer to https://github.com/shotover/shotover-proxy/pull/645/files + use crate::frame::{Frame, RedisFrame}; use crate::message::{Message, Messages}; use crate::tcp;