diff --git a/docs/src/user-guide/observability.md b/docs/src/user-guide/observability.md
index b26fb717d..60802b0c3 100644
--- a/docs/src/user-guide/observability.md
+++ b/docs/src/user-guide/observability.md
@@ -7,7 +7,6 @@ This interface will serve Prometheus metrics from `/metrics`. The following metr
 | `shotover_transform_total_count` | `transform` | [counter](#counter) | Counts the amount of times the `transform` is used |
 | `shotover_transform_failures_count` | `transform` | [counter](#counter) | Counts the amount of times the `transform` fails |
 | `shotover_transform_latency_seconds` | `transform` | [histogram](#histogram) | The latency for a message batch to go through the `transform` |
-| `shotover_transform_pushed_latency_seconds`| `transform` | [histogram](#histogram) | The latency for a pushed message from the DB to go through the `transform`|
 | `shotover_chain_total_count` | `chain` | [counter](#counter) | Counts the amount of times `chain` is used |
 | `shotover_chain_failures_count` | `chain` | [counter](#counter) | Counts the amount of times `chain` fails |
 | `shotover_chain_latency_seconds` | `chain` | [histogram](#histogram) | The latency for running `chain` |
diff --git a/shotover-proxy/tests/runner/observability_int_tests.rs b/shotover-proxy/tests/runner/observability_int_tests.rs
index f3e679d69..4ea54a591 100644
--- a/shotover-proxy/tests/runner/observability_int_tests.rs
+++ b/shotover-proxy/tests/runner/observability_int_tests.rs
@@ -18,9 +18,6 @@ async fn test_metrics() {
 # TYPE shotover_sink_to_source_latency_seconds summary
 # TYPE shotover_transform_failures_count counter
 # TYPE shotover_transform_latency_seconds summary
-# TYPE shotover_transform_pushed_failures_count counter
-# TYPE shotover_transform_pushed_latency_seconds summary
-# TYPE shotover_transform_pushed_total_count counter
 # TYPE shotover_transform_total_count counter
 shotover_available_connections_count{source="redis"}
 shotover_chain_failures_count{chain="redis"}
@@ -68,30 +65,6 @@ shotover_transform_latency_seconds{transform="QueryCounter",quantile="0.95"}
 shotover_transform_latency_seconds{transform="QueryCounter",quantile="0.99"}
 shotover_transform_latency_seconds{transform="QueryCounter",quantile="0.999"}
 shotover_transform_latency_seconds{transform="QueryCounter",quantile="1"}
-shotover_transform_pushed_failures_count{transform="NullSink"}
-shotover_transform_pushed_failures_count{transform="QueryCounter"}
-shotover_transform_pushed_latency_seconds_count{transform="NullSink"}
-shotover_transform_pushed_latency_seconds_count{transform="QueryCounter"}
-shotover_transform_pushed_latency_seconds_sum{transform="NullSink"}
-shotover_transform_pushed_latency_seconds_sum{transform="QueryCounter"}
-shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0"}
-shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.1"}
-shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.5"}
-shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.9"}
-shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.95"}
-shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.99"}
-shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.999"}
-shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="1"}
-shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0"}
-shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.1"}
-shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.5"}
-shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.9"}
-shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.95"}
-shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.99"}
-shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.999"}
-shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="1"}
-shotover_transform_pushed_total_count{transform="NullSink"}
-shotover_transform_pushed_total_count{transform="QueryCounter"}
 shotover_transform_total_count{transform="NullSink"}
 shotover_transform_total_count{transform="QueryCounter"}
 "#;
diff --git a/shotover/src/codec/redis.rs b/shotover/src/codec/redis.rs
index e56fc6479..33a4445e5 100644
--- a/shotover/src/codec/redis.rs
+++ b/shotover/src/codec/redis.rs
@@ -95,9 +95,6 @@ impl Decoder for RedisDecoder {
     type Item = Messages;
     type Error = CodecReadError;
 
-    // TODO: this duplicates a bunch of logic from sink_single.rs
-    // As soon as we remove `Transforms::transform_pushed` we can remove the duplication on the RedisSinkSingle side.
-    // Once thats done we will have pubsub support for both RedisSinkSingle AND RedisSinkCluster. Progress!
     fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
         let received_at = Instant::now();
         match decode_bytes_mut(src)
diff --git a/shotover/src/server.rs b/shotover/src/server.rs
index e33de446f..059190e41 100644
--- a/shotover/src/server.rs
+++ b/shotover/src/server.rs
@@ -187,9 +187,6 @@ impl TcpCodecListener {
             self.available_connections_gauge
                 .set(self.limit_connections.available_permits() as f64);
 
-            let (pushed_messages_tx, pushed_messages_rx) =
-                tokio::sync::mpsc::unbounded_channel::<Messages>();
-
             let client_details = stream
                 .peer_addr()
                 .map(|p| p.ip().to_string())
@@ -203,14 +200,11 @@ impl TcpCodecListener {
             };
 
             let handler = Handler {
-                chain: self
-                    .chain_builder
-                    .build_with_pushed_messages(pushed_messages_tx, context),
+                chain: self.chain_builder.build(context),
                 codec: self.codec.clone(),
                 shutdown: Shutdown::new(self.trigger_shutdown_rx.clone()),
                 tls: self.tls.clone(),
                 timeout: self.timeout,
-                pushed_messages_rx,
                 _permit: permit,
             };
 
@@ -300,7 +294,6 @@ pub struct Handler {
     shutdown: Shutdown,
     /// Timeout in seconds after which to kill an idle connection. No timeout means connections will never be timed out.
     timeout: Option<u64>,
-    pushed_messages_rx: UnboundedReceiver<Messages>,
     _permit: OwnedSemaphorePermit,
 }
 
@@ -691,17 +684,13 @@ impl Handler {
                    // This will result in the task terminating.
                    return Ok(());
                }
-                Some(responses) = self.pushed_messages_rx.recv() => {
-                    debug!("Received unrequested responses from destination {:?}", responses);
-                    self.process_backward(local_addr, responses).await?
-                }
                () = force_run_chain.notified() => {
                    let mut requests = vec!();
                    while let Ok(x) = in_rx.try_recv() {
                        requests.extend(x);
                    }
                    debug!("A transform in the chain requested that a chain run occur, requests {:?}", requests);
-                    self.process_forward(local_addr, &out_tx, requests).await?
+                    self.process(local_addr, &out_tx, requests).await?
                },
                requests = Self::receive_with_timeout(self.timeout, &mut in_rx, client_details) => {
                    match requests {
@@ -710,7 +699,7 @@ impl Handler {
                                requests.extend(x);
                            }
                            debug!("Received requests from client {:?}", requests);
-                            self.process_forward(local_addr, &out_tx, requests).await?
+                            self.process(local_addr, &out_tx, requests).await?
                        }
                        None => {
                            // Either we timed out the connection or the client disconnected, so terminate this connection
@@ -733,7 +722,7 @@ impl Handler {
         Ok(())
     }
 
-    async fn process_forward(
+    async fn process(
         &mut self,
         local_addr: SocketAddr,
         out_tx: &mpsc::UnboundedSender<Messages>,
@@ -773,18 +762,6 @@ impl Handler {
             }
         }
     }
-
-    async fn process_backward(
-        &mut self,
-        local_addr: SocketAddr,
-        responses: Messages,
-    ) -> Result<Messages> {
-        let wrapper = Wrapper::new_with_addr(responses, local_addr);
-
-        self.chain.process_request_rev(wrapper).await.context(
-            "Chain failed to receive pushed messages/events, the connection will now be closed.",
-        )
-    }
 }
 
 /// Listens for the server shutdown signal.
diff --git a/shotover/src/transforms/chain.rs b/shotover/src/transforms/chain.rs
index 63de8d756..44c12dc57 100644
--- a/shotover/src/transforms/chain.rs
+++ b/shotover/src/transforms/chain.rs
@@ -173,21 +173,6 @@ impl TransformChain {
         self.chain_latency_seconds.record(start.elapsed());
         result
     }
-
-    pub async fn process_request_rev(&mut self, mut wrapper: Wrapper<'_>) -> Result<Messages> {
-        let start = Instant::now();
-        wrapper.reset_rev(&mut self.chain);
-
-        self.chain_batch_size.record(wrapper.requests.len() as f64);
-        let result = wrapper.call_next_transform_pushed().await;
-        self.chain_total.increment(1);
-        if result.is_err() {
-            self.chain_failures.increment(1);
-        }
-
-        self.chain_latency_seconds.record(start.elapsed());
-        result
-    }
 }
 
 pub struct TransformAndMetrics {
     pub transform: Box<dyn Transform>,
     pub transform_total: Counter,
     pub transform_failures: Counter,
     pub transform_latency: Histogram,
-    pub transform_pushed_total: Counter,
-    pub transform_pushed_failures: Counter,
-    pub transform_pushed_latency: Histogram,
 }
 
 impl TransformAndMetrics {
@@ -208,9 +190,6 @@ impl TransformAndMetrics {
            transform_total: Counter::noop(),
            transform_failures: Counter::noop(),
            transform_latency: Histogram::noop(),
-            transform_pushed_total: Counter::noop(),
-            transform_pushed_failures: Counter::noop(),
-            transform_pushed_latency: Histogram::noop(),
        }
    }
 }
@@ -225,12 +204,6 @@ pub struct TransformBuilderAndMetrics {
     transform_failures: Counter,
     #[derivative(Debug = "ignore")]
     transform_latency: Histogram,
-    #[derivative(Debug = "ignore")]
-    transform_pushed_total: Counter,
-    #[derivative(Debug = "ignore")]
-    transform_pushed_failures: Counter,
-    #[derivative(Debug = "ignore")]
-    transform_pushed_latency: Histogram,
 }
 
 impl TransformBuilderAndMetrics {
@@ -240,9 +213,6 @@ impl TransformBuilderAndMetrics {
            transform_total: self.transform_total.clone(),
            transform_failures: self.transform_failures.clone(),
            transform_latency: self.transform_latency.clone(),
-            transform_pushed_total: self.transform_pushed_total.clone(),
-            transform_pushed_failures: self.transform_pushed_failures.clone(),
-            transform_pushed_latency: self.transform_pushed_latency.clone(),
        }
    }
 }
@@ -268,9 +238,6 @@ impl TransformChainBuilder {
                    transform_total: counter!("shotover_transform_total_count", "transform" => builder.get_name()),
                    transform_failures: counter!("shotover_transform_failures_count", "transform" => builder.get_name()),
                    transform_latency: histogram!("shotover_transform_latency_seconds", "transform" => builder.get_name()),
-                    transform_pushed_total: counter!("shotover_transform_pushed_total_count", "transform" => builder.get_name()),
-                    transform_pushed_failures: counter!("shotover_transform_pushed_failures_count", "transform" => builder.get_name()),
-                    transform_pushed_latency: histogram!("shotover_transform_pushed_latency_seconds", "transform" => builder.get_name()),
                    builder,
                }
            ).collect();
@@ -403,7 +370,7 @@ impl TransformChainBuilder {
        }
    }
 
-    /// Clone the chain while adding a producer for the pushed messages channel
+    /// Build the chain
     pub fn build(&self, context: TransformContextBuilder) -> TransformChain {
         let chain = self
             .chain
@@ -424,38 +391,6 @@ impl TransformChainBuilder {
            ),
        }
    }
-
-    /// Clone the chain while adding a producer for the pushed messages channel
-    pub fn build_with_pushed_messages(
-        &self,
-        pushed_messages_tx: mpsc::UnboundedSender<Messages>,
-        context: TransformContextBuilder,
-    ) -> TransformChain {
-        let chain = self
-            .chain
-            .iter()
-            .map(|x| {
-                let mut transform = x.build(context.clone());
-                transform
-                    .transform
-                    .set_pushed_messages_tx(pushed_messages_tx.clone());
-                transform
-            })
-            .collect();
-
-        TransformChain {
-            name: self.name,
-            chain,
-            chain_total: self.chain_total.clone(),
-            chain_failures: self.chain_failures.clone(),
-            chain_batch_size: self.chain_batch_size.clone(),
-            chain_latency_seconds: histogram!(
-                "shotover_chain_latency_seconds",
-                "chain" => self.name,
-                "client_details" => context.client_details
-            ),
-        }
-    }
 }
 
 #[cfg(test)]
diff --git a/shotover/src/transforms/mod.rs b/shotover/src/transforms/mod.rs
index b4f913e6c..bc9cea4bb 100644
--- a/shotover/src/transforms/mod.rs
+++ b/shotover/src/transforms/mod.rs
@@ -191,33 +191,6 @@ impl<'a> Wrapper<'a> {
         result
     }
 
-    pub async fn call_next_transform_pushed(mut self) -> Result<Messages> {
-        let TransformAndMetrics {
-            transform,
-            transform_pushed_total,
-            transform_pushed_failures,
-            transform_pushed_latency,
-            ..
-        } = match self.transforms.next() {
-            Some(transform) => transform,
-            None => return Ok(self.requests),
-        };
-
-        let transform_name = transform.get_name();
-
-        let start = Instant::now();
-        let result = transform
-            .transform_pushed(self)
-            .await
-            .map_err(|e| e.context(anyhow!("{transform_name} transform failed")));
-        transform_pushed_total.increment(1);
-        if result.is_err() {
-            transform_pushed_failures.increment(1);
-        }
-        transform_pushed_latency.record(start.elapsed());
-        result
-    }
-
     pub fn clone_requests_into_hashmap(&self, destination: &mut MessageIdMap<Message>) {
         for request in &self.requests {
             destination.insert(request.id(), request.clone());
         }
     }
@@ -337,27 +310,6 @@ pub trait Transform: Send {
     /// You can have have a transform that is both non-terminating and a sink.
     async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result<Messages>;
 
-    /// TODO: This method should be removed and integrated with `Transform::transform` once we properly support out of order protocols.
-    ///
-    /// This method should be should be implemented by your transform if it is required to process pushed messages (typically events
-    /// or messages that your source is subscribed to. The wrapper object contains the queries/frames
-    /// in a [`Vec<Message>`](crate::message::Message).
-    async fn transform_pushed<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result<Messages> {
-        let response = requests_wrapper.call_next_transform_pushed().await?;
-        Ok(response)
-    }
-
     fn get_name(&self) -> &'static str;
 
     fn set_pushed_messages_tx(&mut self, _pushed_messages_tx: mpsc::UnboundedSender<Messages>) {}
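
With `Transform::transform_pushed`, `Wrapper::call_next_transform_pushed`, and `build_with_pushed_messages` gone, a transform only has the ordinary request path left to implement. The sketch below is not part of this PR: the `LogPassthrough` type, the import paths, and the `#[async_trait]` attribute are assumptions for illustration; only the `get_name` and `transform` signatures are taken from the trait context lines above.

```rust
use anyhow::Result;
use async_trait::async_trait;
use shotover::message::Messages;
use shotover::transforms::{Transform, Wrapper};

// Hypothetical transform that does nothing except pass the batch along,
// showing the post-removal trait surface: one request/response path, no
// separate pushed-message path.
struct LogPassthrough;

#[async_trait]
impl Transform for LogPassthrough {
    fn get_name(&self) -> &'static str {
        "LogPassthrough"
    }

    async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result<Messages> {
        // Hand the batch to the next transform in the chain and return its responses.
        requests_wrapper.call_next_transform().await
    }
}
```

Per the TODO removed from `codec/redis.rs`, messages pushed by the database (e.g. Redis pubsub) are now expected to surface through the normal decode/response flow rather than through a second, reversed chain traversal.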