Skip to content

Commit

Permalink
CassandraPeersRewrite: port to MessageId invariants
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Mar 6, 2024
1 parent bdb02d5 commit 6f800f3
Showing 1 changed file with 14 additions and 18 deletions.
32 changes: 14 additions & 18 deletions shotover/src/transforms/cassandra/peers_rewrite.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::message::{Message, Messages};
use crate::message::{Message, MessageIdMap, Messages};
use crate::transforms::cassandra::peers_rewrite::CassandraOperation::Event;
use crate::transforms::{Transform, TransformBuilder, TransformConfig, Wrapper};
use crate::{
Expand Down Expand Up @@ -38,13 +38,15 @@ impl TransformConfig for CassandraPeersRewriteConfig {
pub struct CassandraPeersRewrite {
port: u16,
peer_table: FQName,
column_names_to_rewrite: MessageIdMap<Vec<Identifier>>,
}

impl CassandraPeersRewrite {
pub fn new(port: u16) -> Self {
CassandraPeersRewrite {
port,
peer_table: FQName::new("system", "peers_v2"),
column_names_to_rewrite: Default::default(),
}
}
}
Expand All @@ -68,27 +70,21 @@ impl Transform for CassandraPeersRewrite {
async fn transform<'a>(&'a mut self, mut requests_wrapper: Wrapper<'a>) -> Result<Messages> {
// Find the indices of queries to system.peers & system.peers_v2
// we need to know which columns in which CQL queries in which messages have system peers
let column_names: Vec<(usize, Vec<Identifier>)> = requests_wrapper
.requests
.iter_mut()
.enumerate()
.filter_map(|(i, m)| {
let sys_peers = extract_native_port_column(&self.peer_table, m);
if sys_peers.is_empty() {
None
} else {
Some((i, sys_peers))
}
})
.collect();
for request in &mut requests_wrapper.requests {
let sys_peers = extract_native_port_column(&self.peer_table, request);
self.column_names_to_rewrite.insert(request.id(), sys_peers);
}

let mut response = requests_wrapper.call_next_transform().await?;
let mut responses = requests_wrapper.call_next_transform().await?;

for (i, name_list) in column_names {
rewrite_port(&mut response[i], &name_list, self.port);
for response in &mut responses {
if let Some(id) = response.request_id() {
let name_list = self.column_names_to_rewrite.remove(&id).unwrap();
rewrite_port(response, &name_list, self.port);
}
}

Ok(response)
Ok(responses)
}

async fn transform_pushed<'a>(
Expand Down

0 comments on commit 6f800f3

Please sign in to comment.