Skip to content

Commit

Permalink
Remove Transform::transform_pushed
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Mar 13, 2024
1 parent 1ba1c48 commit 51881af
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 124 deletions.
24 changes: 5 additions & 19 deletions shotover/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -714,17 +714,17 @@ impl<C: CodecBuilder + 'static> Handler<C> {
requests.extend(x);
}
debug!("Received requests from client {:?}", requests);
self.process_forward(client_details, local_addr, &out_tx, requests).await?
self.process(client_details, local_addr, &out_tx, requests).await?
}
None => {
// Either we timed out the connection or the client disconnected, so terminate this connection
return Ok(())
}
}
},
Some(responses) = self.pushed_messages_rx.recv() => {
debug!("Received unrequested responses from destination {:?}", responses);
self.process_backward(client_details, local_addr, responses).await?
Some(_) = self.pushed_messages_rx.recv() => {
debug!("responses pending from destination");
self.process(client_details, local_addr, &out_tx, vec!()).await?
}
_ = self.shutdown.recv() => {
// If a shutdown signal is received, return from `run`.
Expand All @@ -744,7 +744,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
Ok(())
}

async fn process_forward(
async fn process(
&mut self,
client_details: &str,
local_addr: SocketAddr,
Expand Down Expand Up @@ -786,20 +786,6 @@ impl<C: CodecBuilder + 'static> Handler<C> {
}
}
}

async fn process_backward(
&mut self,
client_details: &str,
local_addr: SocketAddr,
responses: Messages,
) -> Result<Messages> {
let wrapper =
Wrapper::new_with_client_details(responses, client_details.to_owned(), local_addr);

self.chain.process_request_rev(wrapper).await.context(
"Chain failed to receive pushed messages/events, the connection will now be closed.",
)
}
}

/// Listens for the server shutdown signal.
Expand Down
26 changes: 8 additions & 18 deletions shotover/src/transforms/cassandra/peers_rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,32 +78,22 @@ impl Transform for CassandraPeersRewrite {
let mut responses = requests_wrapper.call_next_transform().await?;

for response in &mut responses {
if let Some(id) = response.request_id() {
let name_list = self.column_names_to_rewrite.remove(&id).unwrap();
rewrite_port(response, &name_list, self.port);
}
}

Ok(responses)
}

async fn transform_pushed<'a>(
&'a mut self,
mut requests_wrapper: Wrapper<'a>,
) -> Result<Messages> {
for message in &mut requests_wrapper.requests {
if let Some(Frame::Cassandra(frame)) = message.frame() {
if let Some(Frame::Cassandra(frame)) = response.frame() {
if let Event(ServerEvent::StatusChange(StatusChange { addr, .. })) =
&mut frame.operation
{
addr.set_port(self.port);
message.invalidate_cache();
response.invalidate_cache();
}
}

if let Some(id) = response.request_id() {
let name_list = self.column_names_to_rewrite.remove(&id).unwrap();
rewrite_port(response, &name_list, self.port);
}
}

let response = requests_wrapper.call_next_transform_pushed().await?;
Ok(response)
Ok(responses)
}
}

Expand Down
42 changes: 19 additions & 23 deletions shotover/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,25 @@ impl CassandraSinkCluster {
}
}

// remove topology and status change messages since they contain references to real cluster IPs
// TODO: we should be rewriting them not just deleting them.
responses.retain_mut(|message| {
if let Some(Frame::Cassandra(CassandraFrame {
operation: CassandraOperation::Event(event),
..
})) = message.frame()
{
match event {
ServerEvent::TopologyChange(_) => false,
ServerEvent::StatusChange(_) => false,
ServerEvent::SchemaChange(_) => true,
_ => unreachable!(),
}
} else {
true
}
});

Ok(responses)
}

Expand Down Expand Up @@ -721,29 +740,6 @@ impl Transform for CassandraSinkCluster {
self.send_message(requests_wrapper.requests).await
}

async fn transform_pushed<'a>(
&'a mut self,
mut requests_wrapper: Wrapper<'a>,
) -> Result<Messages> {
requests_wrapper.requests.retain_mut(|message| {
if let Some(Frame::Cassandra(CassandraFrame {
operation: CassandraOperation::Event(event),
..
})) = message.frame()
{
match event {
ServerEvent::TopologyChange(_) => false,
ServerEvent::StatusChange(_) => false,
ServerEvent::SchemaChange(_) => true,
_ => unreachable!(),
}
} else {
true
}
});
requests_wrapper.call_next_transform_pushed().await
}

fn set_pushed_messages_tx(&mut self, pushed_messages_tx: mpsc::UnboundedSender<Messages>) {
self.connection_factory
.set_pushed_messages_tx(pushed_messages_tx);
Expand Down
16 changes: 0 additions & 16 deletions shotover/src/transforms/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,22 +171,6 @@ impl TransformChain {
histogram!("shotover_chain_latency_seconds", "chain" => self.name, "client_details" => client_details).record(start.elapsed());
result
}

pub async fn process_request_rev(&mut self, mut wrapper: Wrapper<'_>) -> Result<Messages> {
let start = Instant::now();
wrapper.reset_rev(&mut self.chain);

self.chain_batch_size.record(wrapper.requests.len() as f64);
let client_details = wrapper.client_details.to_owned();
let result = wrapper.call_next_transform_pushed().await;
self.chain_total.increment(1);
if result.is_err() {
self.chain_failures.increment(1);
}

histogram!("shotover_chain_latency_seconds", "chain" => self.name, "client_details" => client_details).record(start.elapsed());
result
}
}

pub struct TransformAndMetrics {
Expand Down
48 changes: 0 additions & 48 deletions shotover/src/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,33 +170,6 @@ impl<'a> Wrapper<'a> {
result
}

pub async fn call_next_transform_pushed(mut self) -> Result<Messages> {
let TransformAndMetrics {
transform,
transform_pushed_total,
transform_pushed_failures,
transform_pushed_latency,
..
} = match self.transforms.next() {
Some(transform) => transform,
None => return Ok(self.requests),
};

let transform_name = transform.get_name();

let start = Instant::now();
let result = transform
.transform_pushed(self)
.await
.map_err(|e| e.context(anyhow!("{transform_name} transform failed")));
transform_pushed_total.increment(1);
if result.is_err() {
transform_pushed_failures.increment(1);
}
transform_pushed_latency.record(start.elapsed());
result
}

pub fn clone_requests_into_hashmap(&self, destination: &mut MessageIdMap<Message>) {
for request in &self.requests {
destination.insert(request.id(), request.clone());
Expand Down Expand Up @@ -333,27 +306,6 @@ pub trait Transform: Send {
/// You can have have a transform that is both non-terminating and a sink.
async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result<Messages>;

/// TODO: This method should be removed and integrated with `Transform::transform` once we properly support out of order protocols.
///
/// This method should be should be implemented by your transform if it is required to process pushed messages (typically events
/// or messages that your source is subscribed to. The wrapper object contains the queries/frames
/// in a [`Vec<Message`](crate::message::Message).
///
/// This transform method is not the same request/response model as the other transform method.
/// This method processes one pushed message before sending it in reverse on the chain back to the source.
///
/// You can modify the messages in the wrapper struct to achieve your own designs. Your transform can
/// also modify the response from `requests_wrapper.call_next_transform_pushed` if it needs to. As long as the message
/// carries on through the chain, it will function correctly. You are able to add or remove messages as this method is not expecting
/// request/response pairs.
///
/// # Invariants
/// * _Non-terminating_ - Your `transform_pushed` method should not be terminating as the messages should get passed back to the source, where they will terminate.
async fn transform_pushed<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result<Messages> {
let response = requests_wrapper.call_next_transform_pushed().await?;
Ok(response)
}

fn get_name(&self) -> &'static str;

fn set_pushed_messages_tx(&mut self, _pushed_messages_tx: mpsc::UnboundedSender<Messages>) {}
Expand Down
2 changes: 2 additions & 0 deletions shotover/src/transforms/redis/sink_single.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// TODO: refer to https://github.com/shotover/shotover-proxy/pull/645/files

use crate::frame::{Frame, RedisFrame};
use crate::message::{Message, Messages};
use crate::tcp;
Expand Down

0 comments on commit 51881af

Please sign in to comment.