Skip to content

Commit

Permalink
Remove Transform::transform_pushed
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Mar 18, 2024
1 parent 2146fa3 commit 40c2994
Show file tree
Hide file tree
Showing 8 changed files with 32 additions and 211 deletions.
1 change: 0 additions & 1 deletion docs/src/user-guide/observability.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ This interface will serve Prometheus metrics from `/metrics`. The following metr
| `shotover_transform_total_count` | `transform` | [counter](#counter) | Counts the amount of times the `transform` is used |
| `shotover_transform_failures_count` | `transform` | [counter](#counter) | Counts the amount of times the `transform` fails |
| `shotover_transform_latency_seconds` | `transform` | [histogram](#histogram) | The latency for a message batch to go through the `transform` |
| `shotover_transform_pushed_latency_seconds`| `transform` | [histogram](#histogram) | The latency for a pushed message from the DB to go through the `transform`|
| `shotover_chain_total_count` | `chain` | [counter](#counter) | Counts the amount of times `chain` is used |
| `shotover_chain_failures_count` | `chain` | [counter](#counter) | Counts the amount of times `chain` fails |
| `shotover_chain_latency_seconds` | `chain` | [histogram](#histogram) | The latency for running `chain` |
Expand Down
27 changes: 0 additions & 27 deletions shotover-proxy/tests/runner/observability_int_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ async fn test_metrics() {
# TYPE shotover_sink_to_source_latency_seconds summary
# TYPE shotover_transform_failures_count counter
# TYPE shotover_transform_latency_seconds summary
# TYPE shotover_transform_pushed_failures_count counter
# TYPE shotover_transform_pushed_latency_seconds summary
# TYPE shotover_transform_pushed_total_count counter
# TYPE shotover_transform_total_count counter
shotover_available_connections_count{source="redis"}
shotover_chain_failures_count{chain="redis"}
Expand Down Expand Up @@ -68,30 +65,6 @@ shotover_transform_latency_seconds{transform="QueryCounter",quantile="0.95"}
shotover_transform_latency_seconds{transform="QueryCounter",quantile="0.99"}
shotover_transform_latency_seconds{transform="QueryCounter",quantile="0.999"}
shotover_transform_latency_seconds{transform="QueryCounter",quantile="1"}
shotover_transform_pushed_failures_count{transform="NullSink"}
shotover_transform_pushed_failures_count{transform="QueryCounter"}
shotover_transform_pushed_latency_seconds_count{transform="NullSink"}
shotover_transform_pushed_latency_seconds_count{transform="QueryCounter"}
shotover_transform_pushed_latency_seconds_sum{transform="NullSink"}
shotover_transform_pushed_latency_seconds_sum{transform="QueryCounter"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.1"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.5"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.9"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.95"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.99"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.999"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="1"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.1"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.5"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.9"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.95"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.99"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.999"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="1"}
shotover_transform_pushed_total_count{transform="NullSink"}
shotover_transform_pushed_total_count{transform="QueryCounter"}
shotover_transform_total_count{transform="NullSink"}
shotover_transform_total_count{transform="QueryCounter"}
"#;
Expand Down
3 changes: 0 additions & 3 deletions shotover/src/codec/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,6 @@ impl Decoder for RedisDecoder {
type Item = Messages;
type Error = CodecReadError;

// TODO: this duplicates a bunch of logic from sink_single.rs
// As soon as we remove `Transforms::transform_pushed` we can remove the duplication on the RedisSinkSingle side.
// Once thats done we will have pubsub support for both RedisSinkSingle AND RedisSinkCluster. Progress!
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let received_at = Instant::now();
match decode_mut(src)
Expand Down
33 changes: 4 additions & 29 deletions shotover/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,23 +187,17 @@ impl<C: CodecBuilder + 'static> TcpCodecListener<C> {
self.available_connections_gauge
.set(self.limit_connections.available_permits() as f64);

let (pushed_messages_tx, pushed_messages_rx) =
tokio::sync::mpsc::unbounded_channel::<Messages>();

let force_run_chain = Arc::new(Notify::new());
let context = TransformContextBuilder {
force_run_chain: force_run_chain.clone(),
};

let handler = Handler {
chain: self
.chain_builder
.build_with_pushed_messages(pushed_messages_tx, context),
chain: self.chain_builder.build(context),
codec: self.codec.clone(),
shutdown: Shutdown::new(self.trigger_shutdown_rx.clone()),
tls: self.tls.clone(),
timeout: self.timeout,
pushed_messages_rx,
_permit: permit,
};

Expand Down Expand Up @@ -290,7 +284,6 @@ pub struct Handler<C: CodecBuilder> {
shutdown: Shutdown,
/// Timeout in seconds after which to kill an idle connection. No timeout means connections will never be timed out.
timeout: Option<Duration>,
pushed_messages_rx: UnboundedReceiver<Messages>,
_permit: OwnedSemaphorePermit,
}

Expand Down Expand Up @@ -730,17 +723,13 @@ impl<C: CodecBuilder + 'static> Handler<C> {
// This will result in the task terminating.
return Ok(());
}
Some(responses) = self.pushed_messages_rx.recv() => {
debug!("Received unrequested responses from destination {:?}", responses);
self.process_backward(client_details, local_addr, responses).await?
}
() = force_run_chain.notified() => {
let mut requests = vec!();
while let Ok(x) = in_rx.try_recv() {
requests.extend(x);
}
debug!("A transform in the chain requested that a chain run occur, requests {:?}", requests);
self.process_forward(client_details, local_addr, &out_tx, requests).await?
self.process(client_details, local_addr, &out_tx, requests).await?
},
requests = Self::receive_with_timeout(self.timeout, &mut in_rx, client_details) => {
match requests {
Expand All @@ -749,7 +738,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
requests.extend(x);
}
debug!("Received requests from client {:?}", requests);
self.process_forward(client_details, local_addr, &out_tx, requests).await?
self.process(client_details, local_addr, &out_tx, requests).await?
}
None => {
// Either we timed out the connection or the client disconnected, so terminate this connection
Expand All @@ -770,7 +759,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
Ok(())
}

async fn process_forward(
async fn process(
&mut self,
client_details: &str,
local_addr: SocketAddr,
Expand Down Expand Up @@ -812,20 +801,6 @@ impl<C: CodecBuilder + 'static> Handler<C> {
}
}
}

async fn process_backward(
&mut self,
client_details: &str,
local_addr: SocketAddr,
responses: Messages,
) -> Result<Messages> {
let wrapper =
Wrapper::new_with_client_details(responses, client_details.to_owned(), local_addr);

self.chain.process_request_rev(wrapper).await.context(
"Chain failed to receive pushed messages/events, the connection will now be closed.",
)
}
}

/// Listens for the server shutdown signal.
Expand Down
26 changes: 8 additions & 18 deletions shotover/src/transforms/cassandra/peers_rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,32 +80,22 @@ impl Transform for CassandraPeersRewrite {
let mut responses = requests_wrapper.call_next_transform().await?;

for response in &mut responses {
if let Some(id) = response.request_id() {
let name_list = self.column_names_to_rewrite.remove(&id).unwrap();
rewrite_port(response, &name_list, self.port);
}
}

Ok(responses)
}

async fn transform_pushed<'a>(
&'a mut self,
mut requests_wrapper: Wrapper<'a>,
) -> Result<Messages> {
for message in &mut requests_wrapper.requests {
if let Some(Frame::Cassandra(frame)) = message.frame() {
if let Some(Frame::Cassandra(frame)) = response.frame() {
if let Event(ServerEvent::StatusChange(StatusChange { addr, .. })) =
&mut frame.operation
{
addr.set_port(self.port);
message.invalidate_cache();
response.invalidate_cache();
}
}

if let Some(id) = response.request_id() {
let name_list = self.column_names_to_rewrite.remove(&id).unwrap();
rewrite_port(response, &name_list, self.port);
}
}

let response = requests_wrapper.call_next_transform_pushed().await?;
Ok(response)
Ok(responses)
}
}

Expand Down
42 changes: 19 additions & 23 deletions shotover/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,25 @@ impl CassandraSinkCluster {
}
}

// remove topology and status change messages since they contain references to real cluster IPs
// TODO: we should be rewriting them not just deleting them.
responses.retain_mut(|message| {
if let Some(Frame::Cassandra(CassandraFrame {
operation: CassandraOperation::Event(event),
..
})) = message.frame()
{
match event {
ServerEvent::TopologyChange(_) => false,
ServerEvent::StatusChange(_) => false,
ServerEvent::SchemaChange(_) => true,
_ => unreachable!(),
}
} else {
true
}
});

Ok(responses)
}

Expand Down Expand Up @@ -722,29 +741,6 @@ impl Transform for CassandraSinkCluster {
self.send_message(requests_wrapper.requests).await
}

async fn transform_pushed<'a>(
&'a mut self,
mut requests_wrapper: Wrapper<'a>,
) -> Result<Messages> {
requests_wrapper.requests.retain_mut(|message| {
if let Some(Frame::Cassandra(CassandraFrame {
operation: CassandraOperation::Event(event),
..
})) = message.frame()
{
match event {
ServerEvent::TopologyChange(_) => false,
ServerEvent::StatusChange(_) => false,
ServerEvent::SchemaChange(_) => true,
_ => unreachable!(),
}
} else {
true
}
});
requests_wrapper.call_next_transform_pushed().await
}

fn set_pushed_messages_tx(&mut self, pushed_messages_tx: mpsc::UnboundedSender<Messages>) {
self.connection_factory
.set_pushed_messages_tx(pushed_messages_tx);
Expand Down
63 changes: 1 addition & 62 deletions shotover/src/transforms/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,32 +173,13 @@ impl TransformChain {
histogram!("shotover_chain_latency_seconds", "chain" => self.name, "client_details" => client_details).record(start.elapsed());
result
}

pub async fn process_request_rev(&mut self, mut wrapper: Wrapper<'_>) -> Result<Messages> {
let start = Instant::now();
wrapper.reset_rev(&mut self.chain);

self.chain_batch_size.record(wrapper.requests.len() as f64);
let client_details = wrapper.client_details.to_owned();
let result = wrapper.call_next_transform_pushed().await;
self.chain_total.increment(1);
if result.is_err() {
self.chain_failures.increment(1);
}

histogram!("shotover_chain_latency_seconds", "chain" => self.name, "client_details" => client_details).record(start.elapsed());
result
}
}

pub struct TransformAndMetrics {
pub transform: Box<dyn Transform>,
pub transform_total: Counter,
pub transform_failures: Counter,
pub transform_latency: Histogram,
pub transform_pushed_total: Counter,
pub transform_pushed_failures: Counter,
pub transform_pushed_latency: Histogram,
}

impl TransformAndMetrics {
Expand All @@ -209,9 +190,6 @@ impl TransformAndMetrics {
transform_total: Counter::noop(),
transform_failures: Counter::noop(),
transform_latency: Histogram::noop(),
transform_pushed_total: Counter::noop(),
transform_pushed_failures: Counter::noop(),
transform_pushed_latency: Histogram::noop(),
}
}
}
Expand All @@ -226,12 +204,6 @@ pub struct TransformBuilderAndMetrics {
transform_failures: Counter,
#[derivative(Debug = "ignore")]
transform_latency: Histogram,
#[derivative(Debug = "ignore")]
transform_pushed_total: Counter,
#[derivative(Debug = "ignore")]
transform_pushed_failures: Counter,
#[derivative(Debug = "ignore")]
transform_pushed_latency: Histogram,
}

impl TransformBuilderAndMetrics {
Expand All @@ -241,9 +213,6 @@ impl TransformBuilderAndMetrics {
transform_total: self.transform_total.clone(),
transform_failures: self.transform_failures.clone(),
transform_latency: self.transform_latency.clone(),
transform_pushed_total: self.transform_pushed_total.clone(),
transform_pushed_failures: self.transform_pushed_failures.clone(),
transform_pushed_latency: self.transform_pushed_latency.clone(),
}
}
}
Expand All @@ -269,9 +238,6 @@ impl TransformChainBuilder {
transform_total: counter!("shotover_transform_total_count", "transform" => builder.get_name()),
transform_failures: counter!("shotover_transform_failures_count", "transform" => builder.get_name()),
transform_latency: histogram!("shotover_transform_latency_seconds", "transform" => builder.get_name()),
transform_pushed_total: counter!("shotover_transform_pushed_total_count", "transform" => builder.get_name()),
transform_pushed_failures: counter!("shotover_transform_pushed_failures_count", "transform" => builder.get_name()),
transform_pushed_latency: histogram!("shotover_transform_pushed_latency_seconds", "transform" => builder.get_name()),
builder,
}
).collect();
Expand Down Expand Up @@ -403,7 +369,7 @@ impl TransformChainBuilder {
}
}

/// Clone the chain while adding a producer for the pushed messages channel
/// Build the chain
pub fn build(&self, context: TransformContextBuilder) -> TransformChain {
let chain = self
.chain
Expand All @@ -419,33 +385,6 @@ impl TransformChainBuilder {
chain_batch_size: self.chain_batch_size.clone(),
}
}

/// Clone the chain while adding a producer for the pushed messages channel
pub fn build_with_pushed_messages(
&self,
pushed_messages_tx: mpsc::UnboundedSender<Messages>,
context: TransformContextBuilder,
) -> TransformChain {
let chain = self
.chain
.iter()
.map(|x| {
let mut transform = x.build(context.clone());
transform
.transform
.set_pushed_messages_tx(pushed_messages_tx.clone());
transform
})
.collect();

TransformChain {
name: self.name,
chain,
chain_total: self.chain_total.clone(),
chain_failures: self.chain_failures.clone(),
chain_batch_size: self.chain_batch_size.clone(),
}
}
}

#[cfg(test)]
Expand Down
Loading

0 comments on commit 40c2994

Please sign in to comment.