Skip to content

Commit

Permalink
Remove Transform::transform_pushed
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Apr 4, 2024
1 parent 00e3229 commit 3348474
Show file tree
Hide file tree
Showing 6 changed files with 5 additions and 170 deletions.
1 change: 0 additions & 1 deletion docs/src/user-guide/observability.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ This interface will serve Prometheus metrics from `/metrics`. The following metr
| `shotover_transform_total_count` | `transform` | [counter](#counter) | Counts the amount of times the `transform` is used |
| `shotover_transform_failures_count` | `transform` | [counter](#counter) | Counts the amount of times the `transform` fails |
| `shotover_transform_latency_seconds` | `transform` | [histogram](#histogram) | The latency for a message batch to go through the `transform` |
| `shotover_transform_pushed_latency_seconds`| `transform` | [histogram](#histogram) | The latency for a pushed message from the DB to go through the `transform`|
| `shotover_chain_total_count` | `chain` | [counter](#counter) | Counts the amount of times `chain` is used |
| `shotover_chain_failures_count` | `chain` | [counter](#counter) | Counts the amount of times `chain` fails |
| `shotover_chain_latency_seconds` | `chain` | [histogram](#histogram) | The latency for running `chain` |
Expand Down
27 changes: 0 additions & 27 deletions shotover-proxy/tests/runner/observability_int_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ async fn test_metrics() {
# TYPE shotover_sink_to_source_latency_seconds summary
# TYPE shotover_transform_failures_count counter
# TYPE shotover_transform_latency_seconds summary
# TYPE shotover_transform_pushed_failures_count counter
# TYPE shotover_transform_pushed_latency_seconds summary
# TYPE shotover_transform_pushed_total_count counter
# TYPE shotover_transform_total_count counter
shotover_available_connections_count{source="redis"}
shotover_chain_failures_count{chain="redis"}
Expand Down Expand Up @@ -68,30 +65,6 @@ shotover_transform_latency_seconds{transform="QueryCounter",quantile="0.95"}
shotover_transform_latency_seconds{transform="QueryCounter",quantile="0.99"}
shotover_transform_latency_seconds{transform="QueryCounter",quantile="0.999"}
shotover_transform_latency_seconds{transform="QueryCounter",quantile="1"}
shotover_transform_pushed_failures_count{transform="NullSink"}
shotover_transform_pushed_failures_count{transform="QueryCounter"}
shotover_transform_pushed_latency_seconds_count{transform="NullSink"}
shotover_transform_pushed_latency_seconds_count{transform="QueryCounter"}
shotover_transform_pushed_latency_seconds_sum{transform="NullSink"}
shotover_transform_pushed_latency_seconds_sum{transform="QueryCounter"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.1"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.5"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.9"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.95"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.99"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.999"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="1"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.1"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.5"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.9"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.95"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.99"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.999"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="1"}
shotover_transform_pushed_total_count{transform="NullSink"}
shotover_transform_pushed_total_count{transform="QueryCounter"}
shotover_transform_total_count{transform="NullSink"}
shotover_transform_total_count{transform="QueryCounter"}
"#;
Expand Down
3 changes: 0 additions & 3 deletions shotover/src/codec/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,6 @@ impl Decoder for RedisDecoder {
type Item = Messages;
type Error = CodecReadError;

// TODO: this duplicates a bunch of logic from sink_single.rs
// As soon as we remove `Transforms::transform_pushed` we can remove the duplication on the RedisSinkSingle side.
// Once thats done we will have pubsub support for both RedisSinkSingle AND RedisSinkCluster. Progress!
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let received_at = Instant::now();
match decode_bytes_mut(src)
Expand Down
33 changes: 4 additions & 29 deletions shotover/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,23 +187,17 @@ impl<C: CodecBuilder + 'static> TcpCodecListener<C> {
self.available_connections_gauge
.set(self.limit_connections.available_permits() as f64);

let (pushed_messages_tx, pushed_messages_rx) =
tokio::sync::mpsc::unbounded_channel::<Messages>();

let force_run_chain = Arc::new(Notify::new());
let context = TransformContextBuilder {
force_run_chain: force_run_chain.clone(),
};

let handler = Handler {
chain: self
.chain_builder
.build_with_pushed_messages(pushed_messages_tx, context),
chain: self.chain_builder.build(context),
codec: self.codec.clone(),
shutdown: Shutdown::new(self.trigger_shutdown_rx.clone()),
tls: self.tls.clone(),
timeout: self.timeout,
pushed_messages_rx,
_permit: permit,
};

Expand Down Expand Up @@ -290,7 +284,6 @@ pub struct Handler<C: CodecBuilder> {
shutdown: Shutdown,
/// Timeout in seconds after which to kill an idle connection. No timeout means connections will never be timed out.
timeout: Option<Duration>,
pushed_messages_rx: UnboundedReceiver<Messages>,
_permit: OwnedSemaphorePermit,
}

Expand Down Expand Up @@ -686,17 +679,13 @@ impl<C: CodecBuilder + 'static> Handler<C> {
// This will result in the task terminating.
return Ok(());
}
Some(responses) = self.pushed_messages_rx.recv() => {
debug!("Received unrequested responses from destination {:?}", responses);
self.process_backward(client_details, local_addr, responses).await?
}
() = force_run_chain.notified() => {
let mut requests = vec!();
while let Ok(x) = in_rx.try_recv() {
requests.extend(x);
}
debug!("A transform in the chain requested that a chain run occur, requests {:?}", requests);
self.process_forward(client_details, local_addr, &out_tx, requests).await?
self.process(client_details, local_addr, &out_tx, requests).await?
},
requests = Self::receive_with_timeout(self.timeout, &mut in_rx, client_details) => {
match requests {
Expand All @@ -705,7 +694,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
requests.extend(x);
}
debug!("Received requests from client {:?}", requests);
self.process_forward(client_details, local_addr, &out_tx, requests).await?
self.process(client_details, local_addr, &out_tx, requests).await?
}
None => {
// Either we timed out the connection or the client disconnected, so terminate this connection
Expand All @@ -728,7 +717,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
Ok(())
}

async fn process_forward(
async fn process(
&mut self,
client_details: &str,
local_addr: SocketAddr,
Expand Down Expand Up @@ -770,20 +759,6 @@ impl<C: CodecBuilder + 'static> Handler<C> {
}
}
}

async fn process_backward(
&mut self,
client_details: &str,
local_addr: SocketAddr,
responses: Messages,
) -> Result<Messages> {
let wrapper =
Wrapper::new_with_client_details(responses, client_details.to_owned(), local_addr);

self.chain.process_request_rev(wrapper).await.context(
"Chain failed to receive pushed messages/events, the connection will now be closed.",
)
}
}

/// Listens for the server shutdown signal.
Expand Down
63 changes: 1 addition & 62 deletions shotover/src/transforms/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,32 +173,13 @@ impl TransformChain {
histogram!("shotover_chain_latency_seconds", "chain" => self.name, "client_details" => client_details).record(start.elapsed());
result
}

pub async fn process_request_rev(&mut self, mut wrapper: Wrapper<'_>) -> Result<Messages> {
let start = Instant::now();
wrapper.reset_rev(&mut self.chain);

self.chain_batch_size.record(wrapper.requests.len() as f64);
let client_details = wrapper.client_details.to_owned();
let result = wrapper.call_next_transform_pushed().await;
self.chain_total.increment(1);
if result.is_err() {
self.chain_failures.increment(1);
}

histogram!("shotover_chain_latency_seconds", "chain" => self.name, "client_details" => client_details).record(start.elapsed());
result
}
}

pub struct TransformAndMetrics {
pub transform: Box<dyn Transform>,
pub transform_total: Counter,
pub transform_failures: Counter,
pub transform_latency: Histogram,
pub transform_pushed_total: Counter,
pub transform_pushed_failures: Counter,
pub transform_pushed_latency: Histogram,
}

impl TransformAndMetrics {
Expand All @@ -209,9 +190,6 @@ impl TransformAndMetrics {
transform_total: Counter::noop(),
transform_failures: Counter::noop(),
transform_latency: Histogram::noop(),
transform_pushed_total: Counter::noop(),
transform_pushed_failures: Counter::noop(),
transform_pushed_latency: Histogram::noop(),
}
}
}
Expand All @@ -226,12 +204,6 @@ pub struct TransformBuilderAndMetrics {
transform_failures: Counter,
#[derivative(Debug = "ignore")]
transform_latency: Histogram,
#[derivative(Debug = "ignore")]
transform_pushed_total: Counter,
#[derivative(Debug = "ignore")]
transform_pushed_failures: Counter,
#[derivative(Debug = "ignore")]
transform_pushed_latency: Histogram,
}

impl TransformBuilderAndMetrics {
Expand All @@ -241,9 +213,6 @@ impl TransformBuilderAndMetrics {
transform_total: self.transform_total.clone(),
transform_failures: self.transform_failures.clone(),
transform_latency: self.transform_latency.clone(),
transform_pushed_total: self.transform_pushed_total.clone(),
transform_pushed_failures: self.transform_pushed_failures.clone(),
transform_pushed_latency: self.transform_pushed_latency.clone(),
}
}
}
Expand All @@ -269,9 +238,6 @@ impl TransformChainBuilder {
transform_total: counter!("shotover_transform_total_count", "transform" => builder.get_name()),
transform_failures: counter!("shotover_transform_failures_count", "transform" => builder.get_name()),
transform_latency: histogram!("shotover_transform_latency_seconds", "transform" => builder.get_name()),
transform_pushed_total: counter!("shotover_transform_pushed_total_count", "transform" => builder.get_name()),
transform_pushed_failures: counter!("shotover_transform_pushed_failures_count", "transform" => builder.get_name()),
transform_pushed_latency: histogram!("shotover_transform_pushed_latency_seconds", "transform" => builder.get_name()),
builder,
}
).collect();
Expand Down Expand Up @@ -403,7 +369,7 @@ impl TransformChainBuilder {
}
}

/// Clone the chain while adding a producer for the pushed messages channel
/// Build the chain
pub fn build(&self, context: TransformContextBuilder) -> TransformChain {
let chain = self
.chain
Expand All @@ -419,33 +385,6 @@ impl TransformChainBuilder {
chain_batch_size: self.chain_batch_size.clone(),
}
}

/// Clone the chain while adding a producer for the pushed messages channel
pub fn build_with_pushed_messages(
&self,
pushed_messages_tx: mpsc::UnboundedSender<Messages>,
context: TransformContextBuilder,
) -> TransformChain {
let chain = self
.chain
.iter()
.map(|x| {
let mut transform = x.build(context.clone());
transform
.transform
.set_pushed_messages_tx(pushed_messages_tx.clone());
transform
})
.collect();

TransformChain {
name: self.name,
chain,
chain_total: self.chain_total.clone(),
chain_failures: self.chain_failures.clone(),
chain_batch_size: self.chain_batch_size.clone(),
}
}
}

#[cfg(test)]
Expand Down
48 changes: 0 additions & 48 deletions shotover/src/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,33 +193,6 @@ impl<'a> Wrapper<'a> {
result
}

pub async fn call_next_transform_pushed(mut self) -> Result<Messages> {
let TransformAndMetrics {
transform,
transform_pushed_total,
transform_pushed_failures,
transform_pushed_latency,
..
} = match self.transforms.next() {
Some(transform) => transform,
None => return Ok(self.requests),
};

let transform_name = transform.get_name();

let start = Instant::now();
let result = transform
.transform_pushed(self)
.await
.map_err(|e| e.context(anyhow!("{transform_name} transform failed")));
transform_pushed_total.increment(1);
if result.is_err() {
transform_pushed_failures.increment(1);
}
transform_pushed_latency.record(start.elapsed());
result
}

pub fn clone_requests_into_hashmap(&self, destination: &mut MessageIdMap<Message>) {
for request in &self.requests {
destination.insert(request.id(), request.clone());
Expand Down Expand Up @@ -356,27 +329,6 @@ pub trait Transform: Send {
/// You can have have a transform that is both non-terminating and a sink.
async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result<Messages>;

/// TODO: This method should be removed and integrated with `Transform::transform` once we properly support out of order protocols.
///
/// This method should be should be implemented by your transform if it is required to process pushed messages (typically events
/// or messages that your source is subscribed to. The wrapper object contains the queries/frames
/// in a [`Vec<Message`](crate::message::Message).
///
/// This transform method is not the same request/response model as the other transform method.
/// This method processes one pushed message before sending it in reverse on the chain back to the source.
///
/// You can modify the messages in the wrapper struct to achieve your own designs. Your transform can
/// also modify the response from `requests_wrapper.call_next_transform_pushed` if it needs to. As long as the message
/// carries on through the chain, it will function correctly. You are able to add or remove messages as this method is not expecting
/// request/response pairs.
///
/// # Invariants
/// * _Non-terminating_ - Your `transform_pushed` method should not be terminating as the messages should get passed back to the source, where they will terminate.
async fn transform_pushed<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result<Messages> {
let response = requests_wrapper.call_next_transform_pushed().await?;
Ok(response)
}

fn get_name(&self) -> &'static str;

fn set_pushed_messages_tx(&mut self, _pushed_messages_tx: mpsc::UnboundedSender<Messages>) {}
Expand Down

0 comments on commit 3348474

Please sign in to comment.