Skip to content

Commit

Permalink
Fix transform lifetimes (#1724)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Aug 6, 2024
1 parent ff09b14 commit c909d86
Show file tree
Hide file tree
Showing 28 changed files with 96 additions and 88 deletions.
6 changes: 6 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@
Any breaking changes to the `topology.yaml` or `shotover` rust API should be documented here.
This assists us in knowing when to make the next release a breaking release and assists users with making upgrades to new breaking releases.

## 0.5.0

### shotover rust API

`Transform::transform` now takes `&mut Wrapper` instead of `Wrapper`.

## 0.4.0

### shotover rust API
Expand Down
6 changes: 3 additions & 3 deletions custom-transforms-example/src/redis_get_rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ impl Transform for RedisGetRewrite {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
for message in requests_wrapper.requests.iter_mut() {
if let Some(frame) = message.frame() {
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/cassandra/peers_rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ impl Transform for CassandraPeersRewrite {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
// Find the indices of queries to system.peers & system.peers_v2
// we need to know which columns in which CQL queries in which messages have system peers
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -761,9 +761,9 @@ impl Transform for CassandraSinkCluster {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
self.send_message(std::mem::take(&mut requests_wrapper.requests))
.await
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/cassandra/sink_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,9 @@ impl Transform for CassandraSinkSingle {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
self.send_message(std::mem::take(&mut requests_wrapper.requests))
.await
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,9 @@ impl BufferedChain {
}

impl TransformChain {
pub async fn process_request<'a>(
&'a mut self,
wrapper: &'a mut Wrapper<'a>,
pub async fn process_request<'shorter, 'longer: 'shorter>(
&'longer mut self,
wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
let start = Instant::now();
wrapper.reset(&mut self.chain);
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/coalesce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ impl Transform for Coalesce {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
self.buffer.append(&mut requests_wrapper.requests);

Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/debug/force_parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ impl Transform for DebugForceParse {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
for message in &mut requests_wrapper.requests {
if self.parse_requests {
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/debug/log_to_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ impl Transform for DebugLogToFile {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Vec<Message>> {
for message in &requests_wrapper.requests {
self.request_counter += 1;
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/debug/printer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ impl Transform for DebugPrinter {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
for request in &mut requests_wrapper.requests {
info!("Request: {}", request.to_high_level_string());
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/debug/returner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ impl Transform for DebugReturner {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
requests_wrapper
.requests
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ impl Transform for QueryTypeFilter {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
for request in requests_wrapper.requests.iter_mut() {
let filter_out = match &self.filter {
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,9 +339,9 @@ impl Transform for KafkaSinkCluster {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
let mut responses = if requests_wrapper.requests.is_empty() {
// there are no requests, so no point sending any, but we should check for any responses without awaiting
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/kafka/sink_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ impl Transform for KafkaSinkSingle {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
if self.connection.is_none() {
let codec = KafkaCodecBuilder::new(Direction::Sink, "KafkaSinkSingle".to_owned());
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/load_balance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ impl Transform for ConnectionBalanceAndPool {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
if self.active_connection.is_none() {
let mut all_connections = self.all_connections.lock().await;
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/loopback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ impl Transform for Loopback {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
// This transform ultimately doesnt make a lot of sense semantically
// but make a vague attempt to follow transform invariants anyway.
Expand Down
12 changes: 7 additions & 5 deletions shotover/src/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ impl<'a> Clone for Wrapper<'a> {
}
}

impl<'a> Wrapper<'a> {
impl<'shorter, 'longer: 'shorter> Wrapper<'longer> {
fn take(&mut self) -> Self {
Wrapper {
requests: std::mem::take(&mut self.requests),
Expand All @@ -192,7 +192,7 @@ impl<'a> Wrapper<'a> {
/// the execution time of the [Transform::transform] function as a metrics latency histogram.
///
/// The result of calling the next transform is then provided as a response.
pub async fn call_next_transform(&'a mut self) -> Result<Messages> {
pub async fn call_next_transform(&'shorter mut self) -> Result<Messages> {
let TransformAndMetrics {
transform,
transform_total,
Expand Down Expand Up @@ -266,7 +266,7 @@ impl<'a> Wrapper<'a> {
format!("{:?}", messages)
}

pub fn reset(&mut self, transforms: &'a mut [TransformAndMetrics]) {
pub fn reset(&mut self, transforms: &'longer mut [TransformAndMetrics]) {
self.transforms = transforms.iter_mut();
}
}
Expand Down Expand Up @@ -336,8 +336,10 @@ pub trait Transform: Send {
/// * Transform that do call subsquent chains via `requests_wrapper.call_next_transform()` are non-terminating transforms.
///
/// You can have have a transform that is both non-terminating and a sink.
async fn transform<'a>(&'a mut self, requests_wrapper: &'a mut Wrapper<'a>)
-> Result<Messages>;
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages>;

/// Name of the transform used in logs and displayed to the user
fn get_name(&self) -> &'static str;
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/null.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,9 @@ impl Transform for NullSink {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
for request in &mut requests_wrapper.requests {
// reuse the requests to hold the responses to avoid an allocation
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/opensearch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ impl Transform for OpenSearchSinkSingle {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
// Return immediately if we have no messages.
// If we tried to send no messages we would block forever waiting for a reply that will never come.
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/parallel_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,9 @@ impl Transform for ParallelMap {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
let mut results = Vec::with_capacity(requests_wrapper.requests.len());
let mut message_iter = requests_wrapper.requests.drain(..);
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/protect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,9 @@ impl Transform for Protect {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
// encrypt the values included in any INSERT or UPDATE queries
for message in requests_wrapper.requests.iter_mut() {
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/query_counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ impl Transform for QueryCounter {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
for m in &mut requests_wrapper.requests {
match m.frame() {
Expand Down
10 changes: 5 additions & 5 deletions shotover/src/transforms/redis/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,9 +374,9 @@ impl SimpleRedisCache {
}

/// calls the next transform and process the result for caching.
async fn execute_upstream_and_write_to_cache<'a>(
async fn execute_upstream_and_write_to_cache(
&mut self,
requests_wrapper: &'a mut Wrapper<'a>,
requests_wrapper: &mut Wrapper<'_>,
) -> Result<Messages> {
let local_addr = requests_wrapper.local_addr;
let mut request_messages: Vec<_> = requests_wrapper
Expand Down Expand Up @@ -618,9 +618,9 @@ impl Transform for SimpleRedisCache {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
self.read_from_cache(&mut requests_wrapper.requests, requests_wrapper.local_addr)
.await
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/redis/cluster_ports_rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ impl Transform for RedisClusterPortsRewrite {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
for message in requests_wrapper.requests.iter_mut() {
let message_id = message.id();
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/redis/sink_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1017,9 +1017,9 @@ impl Transform for RedisSinkCluster {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
if !self.has_run_init {
self.topology = (*self.shared_topology.read().await).clone();
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/transforms/redis/sink_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ impl Transform for RedisSinkSingle {
NAME
}

async fn transform<'a>(
&'a mut self,
requests_wrapper: &'a mut Wrapper<'a>,
async fn transform<'shorter, 'longer: 'shorter>(
&mut self,
requests_wrapper: &'shorter mut Wrapper<'longer>,
) -> Result<Messages> {
if self.connection.is_none() {
let codec = RedisCodecBuilder::new(Direction::Sink, "RedisSinkSingle".to_owned());
Expand Down
Loading

0 comments on commit c909d86

Please sign in to comment.