Remove Transform::transform_pushed #1524

Merged · 2 commits · Apr 18, 2024
1 change: 0 additions & 1 deletion docs/src/user-guide/observability.md
@@ -7,7 +7,6 @@ This interface will serve Prometheus metrics from `/metrics`. The following metr
| `shotover_transform_total_count` | `transform` | [counter](#counter) | Counts the amount of times the `transform` is used |
| `shotover_transform_failures_count` | `transform` | [counter](#counter) | Counts the amount of times the `transform` fails |
| `shotover_transform_latency_seconds` | `transform` | [histogram](#histogram) | The latency for a message batch to go through the `transform` |
| `shotover_transform_pushed_latency_seconds`| `transform` | [histogram](#histogram) | The latency for a pushed message from the DB to go through the `transform`|
| `shotover_chain_total_count` | `chain` | [counter](#counter) | Counts the amount of times `chain` is used |
| `shotover_chain_failures_count` | `chain` | [counter](#counter) | Counts the amount of times `chain` fails |
| `shotover_chain_latency_seconds` | `chain` | [histogram](#histogram) | The latency for running `chain` |
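The pushed-message histogram is the only row dropped from this table; the surviving per-transform metrics are ordinary `metrics`-crate counters and histograms labelled by transform name, as the `chain.rs` hunk later in this diff shows. A minimal sketch of recording against the same metric names, assuming the handle-based `metrics` macro API used elsewhere in this diff (the free function and its parameters are illustrative, not shotover's actual API):

```rust
use std::time::Instant;

use metrics::{counter, histogram};

// Illustrative helper: shotover creates these handles once per transform
// (see the counter!/histogram! calls in the chain.rs hunk below) rather than
// looking them up on every request batch.
fn record_transform_run(transform: &'static str, start: Instant, failed: bool) {
    counter!("shotover_transform_total_count", "transform" => transform).increment(1);
    if failed {
        counter!("shotover_transform_failures_count", "transform" => transform).increment(1);
    }
    histogram!("shotover_transform_latency_seconds", "transform" => transform)
        .record(start.elapsed());
}
```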
27 changes: 0 additions & 27 deletions shotover-proxy/tests/runner/observability_int_tests.rs
@@ -18,9 +18,6 @@ async fn test_metrics() {
# TYPE shotover_sink_to_source_latency_seconds summary
# TYPE shotover_transform_failures_count counter
# TYPE shotover_transform_latency_seconds summary
# TYPE shotover_transform_pushed_failures_count counter
# TYPE shotover_transform_pushed_latency_seconds summary
# TYPE shotover_transform_pushed_total_count counter
# TYPE shotover_transform_total_count counter
shotover_available_connections_count{source="redis"}
shotover_chain_failures_count{chain="redis"}
@@ -68,30 +65,6 @@ shotover_transform_latency_seconds{transform="QueryCounter",quantile="0.95"}
shotover_transform_latency_seconds{transform="QueryCounter",quantile="0.99"}
shotover_transform_latency_seconds{transform="QueryCounter",quantile="0.999"}
shotover_transform_latency_seconds{transform="QueryCounter",quantile="1"}
shotover_transform_pushed_failures_count{transform="NullSink"}
shotover_transform_pushed_failures_count{transform="QueryCounter"}
shotover_transform_pushed_latency_seconds_count{transform="NullSink"}
shotover_transform_pushed_latency_seconds_count{transform="QueryCounter"}
shotover_transform_pushed_latency_seconds_sum{transform="NullSink"}
shotover_transform_pushed_latency_seconds_sum{transform="QueryCounter"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.1"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.5"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.9"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.95"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.99"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="0.999"}
shotover_transform_pushed_latency_seconds{transform="NullSink",quantile="1"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.1"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.5"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.9"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.95"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.99"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="0.999"}
shotover_transform_pushed_latency_seconds{transform="QueryCounter",quantile="1"}
shotover_transform_pushed_total_count{transform="NullSink"}
shotover_transform_pushed_total_count{transform="QueryCounter"}
shotover_transform_total_count{transform="NullSink"}
shotover_transform_total_count{transform="QueryCounter"}
"#;
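For orientation, the expected-output block above is what the integration test compares the scraped `/metrics` body against once the `shotover_transform_pushed_*` series are gone. A rough sketch of that style of check (the port, the use of `reqwest`, and the function name are assumptions; the real test uses shotover's own test helpers):

```rust
// Hypothetical helper: fetch the metrics endpoint and check that every
// non-empty expected line appears as a prefix of some line in the output.
async fn assert_metrics_contain(expected: &str) {
    let body = reqwest::get("http://localhost:9001/metrics")
        .await
        .unwrap()
        .text()
        .await
        .unwrap();
    for line in expected.lines().map(str::trim).filter(|l| !l.is_empty()) {
        assert!(
            body.lines().any(|got| got.starts_with(line)),
            "metrics output is missing expected line: {line}"
        );
    }
}
```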
3 changes: 0 additions & 3 deletions shotover/src/codec/redis.rs
@@ -95,9 +95,6 @@ impl Decoder for RedisDecoder {
type Item = Messages;
type Error = CodecReadError;

// TODO: this duplicates a bunch of logic from sink_single.rs
// As soon as we remove `Transforms::transform_pushed` we can remove the duplication on the RedisSinkSingle side.
// Once thats done we will have pubsub support for both RedisSinkSingle AND RedisSinkCluster. Progress!
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
let received_at = Instant::now();
match decode_bytes_mut(src)
31 changes: 4 additions & 27 deletions shotover/src/server.rs
@@ -187,9 +187,6 @@ impl<C: CodecBuilder + 'static> TcpCodecListener<C> {
self.available_connections_gauge
.set(self.limit_connections.available_permits() as f64);

let (pushed_messages_tx, pushed_messages_rx) =
tokio::sync::mpsc::unbounded_channel::<Messages>();

let client_details = stream
.peer_addr()
.map(|p| p.ip().to_string())
@@ -203,14 +200,11 @@ impl<C: CodecBuilder + 'static> TcpCodecListener<C> {
};

let handler = Handler {
chain: self
.chain_builder
.build_with_pushed_messages(pushed_messages_tx, context),
chain: self.chain_builder.build(context),
codec: self.codec.clone(),
shutdown: Shutdown::new(self.trigger_shutdown_rx.clone()),
tls: self.tls.clone(),
timeout: self.timeout,
pushed_messages_rx,
_permit: permit,
};

@@ -300,7 +294,6 @@ pub struct Handler<C: CodecBuilder> {
shutdown: Shutdown,
/// Timeout in seconds after which to kill an idle connection. No timeout means connections will never be timed out.
timeout: Option<Duration>,
pushed_messages_rx: UnboundedReceiver<Messages>,
_permit: OwnedSemaphorePermit,
}

@@ -691,17 +684,13 @@ impl<C: CodecBuilder + 'static> Handler<C> {
// This will result in the task terminating.
return Ok(());
}
Some(responses) = self.pushed_messages_rx.recv() => {
debug!("Received unrequested responses from destination {:?}", responses);
self.process_backward(local_addr, responses).await?
}
() = force_run_chain.notified() => {
let mut requests = vec!();
while let Ok(x) = in_rx.try_recv() {
requests.extend(x);
}
debug!("A transform in the chain requested that a chain run occur, requests {:?}", requests);
self.process_forward(local_addr, &out_tx, requests).await?
self.process(local_addr, &out_tx, requests).await?
},
requests = Self::receive_with_timeout(self.timeout, &mut in_rx, client_details) => {
match requests {
@@ -710,7 +699,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
requests.extend(x);
}
debug!("Received requests from client {:?}", requests);
self.process_forward(local_addr, &out_tx, requests).await?
self.process(local_addr, &out_tx, requests).await?
}
None => {
// Either we timed out the connection or the client disconnected, so terminate this connection
@@ -733,7 +722,7 @@ impl<C: CodecBuilder + 'static> Handler<C> {
Ok(())
}

async fn process_forward(
async fn process(
&mut self,
local_addr: SocketAddr,
out_tx: &mpsc::UnboundedSender<Messages>,
@@ -773,18 +762,6 @@ impl<C: CodecBuilder + 'static> Handler<C> {
}
}
}

async fn process_backward(
&mut self,
local_addr: SocketAddr,
responses: Messages,
) -> Result<Messages> {
let wrapper = Wrapper::new_with_addr(responses, local_addr);

self.chain.process_request_rev(wrapper).await.context(
"Chain failed to receive pushed messages/events, the connection will now be closed.",
)
}
}

/// Listens for the server shutdown signal.
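With the `pushed_messages_rx` arm removed, the handler's `select!` loop is left with two sources of work: a `force_run_chain` notification from a transform and batches of client requests. A stripped-down sketch of that shape (shutdown handling, codecs, and the response channel are omitted; `Messages` is a stand-in type, not shotover's):

```rust
use std::sync::Arc;

use tokio::sync::{mpsc, Notify};

// Stand-in for shotover's message batches.
type Messages = Vec<String>;

async fn connection_loop(
    force_run_chain: Arc<Notify>,
    mut in_rx: mpsc::UnboundedReceiver<Messages>,
) {
    loop {
        tokio::select! {
            // A transform asked for the chain to run; drain whatever requests
            // are already buffered and process them.
            () = force_run_chain.notified() => {
                let mut requests = vec![];
                while let Ok(batch) = in_rx.try_recv() {
                    requests.extend(batch);
                }
                process(requests).await;
            }
            // Normal path: a batch of client requests arrived.
            batch = in_rx.recv() => {
                match batch {
                    Some(mut requests) => {
                        while let Ok(more) = in_rx.try_recv() {
                            requests.extend(more);
                        }
                        process(requests).await;
                    }
                    // Client disconnected (or timed out upstream): end the task.
                    None => return,
                }
            }
        }
    }
}

async fn process(requests: Messages) {
    // In shotover this runs the batch through the transform chain and writes
    // the responses back to the client; here it is just a placeholder.
    let _ = requests;
}
```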
67 changes: 1 addition & 66 deletions shotover/src/transforms/chain.rs
@@ -173,31 +173,13 @@ impl TransformChain {
self.chain_latency_seconds.record(start.elapsed());
result
}

pub async fn process_request_rev(&mut self, mut wrapper: Wrapper<'_>) -> Result<Messages> {
let start = Instant::now();
wrapper.reset_rev(&mut self.chain);

self.chain_batch_size.record(wrapper.requests.len() as f64);
let result = wrapper.call_next_transform_pushed().await;
self.chain_total.increment(1);
if result.is_err() {
self.chain_failures.increment(1);
}

self.chain_latency_seconds.record(start.elapsed());
result
}
}

pub struct TransformAndMetrics {
pub transform: Box<dyn Transform>,
pub transform_total: Counter,
pub transform_failures: Counter,
pub transform_latency: Histogram,
pub transform_pushed_total: Counter,
pub transform_pushed_failures: Counter,
pub transform_pushed_latency: Histogram,
}

impl TransformAndMetrics {
@@ -208,9 +190,6 @@ impl TransformAndMetrics {
transform_total: Counter::noop(),
transform_failures: Counter::noop(),
transform_latency: Histogram::noop(),
transform_pushed_total: Counter::noop(),
transform_pushed_failures: Counter::noop(),
transform_pushed_latency: Histogram::noop(),
}
}
}
@@ -225,12 +204,6 @@ pub struct TransformBuilderAndMetrics {
transform_failures: Counter,
#[derivative(Debug = "ignore")]
transform_latency: Histogram,
#[derivative(Debug = "ignore")]
transform_pushed_total: Counter,
#[derivative(Debug = "ignore")]
transform_pushed_failures: Counter,
#[derivative(Debug = "ignore")]
transform_pushed_latency: Histogram,
}

impl TransformBuilderAndMetrics {
@@ -240,9 +213,6 @@ impl TransformBuilderAndMetrics {
transform_total: self.transform_total.clone(),
transform_failures: self.transform_failures.clone(),
transform_latency: self.transform_latency.clone(),
transform_pushed_total: self.transform_pushed_total.clone(),
transform_pushed_failures: self.transform_pushed_failures.clone(),
transform_pushed_latency: self.transform_pushed_latency.clone(),
}
}
}
@@ -268,9 +238,6 @@ impl TransformChainBuilder {
transform_total: counter!("shotover_transform_total_count", "transform" => builder.get_name()),
transform_failures: counter!("shotover_transform_failures_count", "transform" => builder.get_name()),
transform_latency: histogram!("shotover_transform_latency_seconds", "transform" => builder.get_name()),
transform_pushed_total: counter!("shotover_transform_pushed_total_count", "transform" => builder.get_name()),
transform_pushed_failures: counter!("shotover_transform_pushed_failures_count", "transform" => builder.get_name()),
transform_pushed_latency: histogram!("shotover_transform_pushed_latency_seconds", "transform" => builder.get_name()),
builder,
}
).collect();
@@ -403,7 +370,7 @@ impl TransformChainBuilder {
}
}

/// Clone the chain while adding a producer for the pushed messages channel
/// Build the chain
pub fn build(&self, context: TransformContextBuilder) -> TransformChain {
let chain = self
.chain
@@ -424,38 +391,6 @@ impl TransformChainBuilder {
),
}
}

/// Clone the chain while adding a producer for the pushed messages channel
pub fn build_with_pushed_messages(
&self,
pushed_messages_tx: mpsc::UnboundedSender<Messages>,
context: TransformContextBuilder,
) -> TransformChain {
let chain = self
.chain
.iter()
.map(|x| {
let mut transform = x.build(context.clone());
transform
.transform
.set_pushed_messages_tx(pushed_messages_tx.clone());
transform
})
.collect();

TransformChain {
name: self.name,
chain,
chain_total: self.chain_total.clone(),
chain_failures: self.chain_failures.clone(),
chain_batch_size: self.chain_batch_size.clone(),
chain_latency_seconds: histogram!(
"shotover_chain_latency_seconds",
"chain" => self.name,
"client_details" => context.client_details
),
}
}
}

#[cfg(test)]
48 changes: 0 additions & 48 deletions shotover/src/transforms/mod.rs
@@ -191,33 +191,6 @@ impl<'a> Wrapper<'a> {
result
}

pub async fn call_next_transform_pushed(mut self) -> Result<Messages> {
let TransformAndMetrics {
transform,
transform_pushed_total,
transform_pushed_failures,
transform_pushed_latency,
..
} = match self.transforms.next() {
Some(transform) => transform,
None => return Ok(self.requests),
};

let transform_name = transform.get_name();

let start = Instant::now();
let result = transform
.transform_pushed(self)
.await
.map_err(|e| e.context(anyhow!("{transform_name} transform failed")));
transform_pushed_total.increment(1);
if result.is_err() {
transform_pushed_failures.increment(1);
}
transform_pushed_latency.record(start.elapsed());
result
}

pub fn clone_requests_into_hashmap(&self, destination: &mut MessageIdMap<Message>) {
for request in &self.requests {
destination.insert(request.id(), request.clone());
@@ -337,27 +310,6 @@ pub trait Transform: Send {
/// You can have a transform that is both non-terminating and a sink.
async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result<Messages>;

/// TODO: This method should be removed and integrated with `Transform::transform` once we properly support out of order protocols.
///
/// This method should be should be implemented by your transform if it is required to process pushed messages (typically events
/// or messages that your source is subscribed to. The wrapper object contains the queries/frames
/// in a [`Vec<Message`](crate::message::Message).
///
/// This transform method is not the same request/response model as the other transform method.
/// This method processes one pushed message before sending it in reverse on the chain back to the source.
///
/// You can modify the messages in the wrapper struct to achieve your own designs. Your transform can
/// also modify the response from `requests_wrapper.call_next_transform_pushed` if it needs to. As long as the message
/// carries on through the chain, it will function correctly. You are able to add or remove messages as this method is not expecting
/// request/response pairs.
///
/// # Invariants
/// * _Non-terminating_ - Your `transform_pushed` method should not be terminating as the messages should get passed back to the source, where they will terminate.
async fn transform_pushed<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result<Messages> {
let response = requests_wrapper.call_next_transform_pushed().await?;
Ok(response)
}

fn get_name(&self) -> &'static str;

fn set_pushed_messages_tx(&mut self, _pushed_messages_tx: mpsc::UnboundedSender<Messages>) {}
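The trait now has a single transform path. A minimal sketch of a pass-through implementation against the surviving API (module paths, the `#[async_trait]` attribute, and `call_next_transform` as the forward counterpart of the removed `call_next_transform_pushed` are assumptions about the surrounding crate, not taken from this diff):

```rust
use anyhow::Result;
use async_trait::async_trait;

use crate::message::Messages;
use crate::transforms::{Transform, Wrapper};

struct Passthrough;

#[async_trait]
impl Transform for Passthrough {
    fn get_name(&self) -> &'static str {
        "Passthrough"
    }

    async fn transform<'a>(&'a mut self, requests_wrapper: Wrapper<'a>) -> Result<Messages> {
        // Hand the batch to the next transform and return its responses
        // unchanged; there is no separate pushed-message path any more.
        requests_wrapper.call_next_transform().await
    }
}
```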