Skip to content

Commit

Permalink
Merge branch 'main' into allow_reusing_instances
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Feb 7, 2024
2 parents e8e669a + 34208ba commit 05b6dbc
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 24 deletions.
5 changes: 2 additions & 3 deletions shotover/src/transforms/cassandra/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ impl CassandraSinkCluster {
}

if self.keyspaces_rx.has_changed()? {
self.pool.update_keyspaces(&mut self.keyspaces_rx).await;
self.pool.update_keyspaces(&mut self.keyspaces_rx);
}

// CAREFUL: indexes into messages are invalidated here
Expand Down Expand Up @@ -523,8 +523,7 @@ impl CassandraSinkCluster {
}

self.message_rewriter
.rewrite_responses(tables_to_rewrite, &mut responses)
.await?;
.rewrite_responses(tables_to_rewrite, &mut responses)?;

for response in responses.iter_mut() {
if let Some((id, metadata)) = get_prepared_result_message(response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ impl NodePool {
}
}

pub async fn update_keyspaces(&mut self, keyspaces_rx: &mut KeyspaceChanRx) {
pub fn update_keyspaces(&mut self, keyspaces_rx: &mut KeyspaceChanRx) {
let updated_keyspaces = keyspaces_rx.borrow_and_update().clone();
self.keyspace_metadata = updated_keyspaces;
}
Expand Down
25 changes: 12 additions & 13 deletions shotover/src/transforms/cassandra/sink_cluster/rewrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,22 +193,18 @@ impl MessageRewriter {

/// Rewrite responses using the `Vec<TableToRewrite>` returned by rewrite_requests.
/// All extra responses are combined back into the amount of responses expected by the client.
pub async fn rewrite_responses(
pub fn rewrite_responses(
&self,
tables_to_rewrite: Vec<TableToRewrite>,
responses: &mut Vec<Message>,
) -> Result<()> {
for table_to_rewrite in tables_to_rewrite {
self.rewrite_response(table_to_rewrite, responses).await?;
self.rewrite_response(table_to_rewrite, responses)?;
}
Ok(())
}

async fn rewrite_response(
&self,
table: TableToRewrite,
responses: &mut Vec<Message>,
) -> Result<()> {
fn rewrite_response(&self, table: TableToRewrite, responses: &mut Vec<Message>) -> Result<()> {
fn get_warnings(message: &mut Message) -> Vec<String> {
if let Some(Frame::Cassandra(frame)) = message.frame() {
frame.warnings.clone()
Expand Down Expand Up @@ -237,8 +233,7 @@ impl MessageRewriter {
warnings.extend(get_warnings(&mut peers_response));
warnings.extend(get_warnings(local_response));

self.rewrite_table_local(table, local_response, peers_response, warnings)
.await?;
self.rewrite_table_local(table, local_response, peers_response, warnings)?;
local_response.invalidate_cache();
}
}
Expand Down Expand Up @@ -275,8 +270,12 @@ impl MessageRewriter {
Err(MessageParseError::ParseFailure(err)) => return Err(err),
};

self.rewrite_table_peers(table, client_peers_response, nodes, warnings)
.await?;
self.rewrite_table_peers(
table,
client_peers_response,
nodes,
warnings,
)?;
client_peers_response.invalidate_cache();
}
}
Expand Down Expand Up @@ -352,7 +351,7 @@ impl MessageRewriter {
Ok(())
}

async fn rewrite_table_peers(
fn rewrite_table_peers(
&self,
table: TableToRewrite,
peers_response: &mut Message,
Expand Down Expand Up @@ -503,7 +502,7 @@ impl MessageRewriter {
}
}

async fn rewrite_table_local(
fn rewrite_table_local(
&self,
table: TableToRewrite,
local_response: &mut Message,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ mod test_token_aware_router {
)
.unwrap();

router.update_keyspaces(&mut keyspaces_rx).await;
router.update_keyspaces(&mut keyspaces_rx);

router
.add_prepared_result(id.clone(), prepared_metadata())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ mod scatter_transform_tests {
assert_eq!(message.frame().unwrap(), &expected);
}

async fn build_chains(route_map: HashMap<String, TransformChainBuilder>) -> Vec<BufferedChain> {
fn build_chains(route_map: HashMap<String, TransformChainBuilder>) -> Vec<BufferedChain> {
route_map
.into_values()
.map(|x| x.build_buffered(10))
Expand Down Expand Up @@ -343,7 +343,7 @@ mod scatter_transform_tests {
);

let mut tuneable_success_consistency = Box::new(TuneableConsistentencyScatter {
route_map: build_chains(two_of_three).await,
route_map: build_chains(two_of_three),
write_consistency: 2,
read_consistency: 2,
});
Expand All @@ -370,7 +370,7 @@ mod scatter_transform_tests {
);

let mut tuneable_fail_consistency = Box::new(TuneableConsistentencyScatter {
route_map: build_chains(one_of_three).await,
route_map: build_chains(one_of_three),
write_consistency: 2,
read_consistency: 2,
});
Expand Down
2 changes: 1 addition & 1 deletion shotover/src/transforms/protect/key_management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl KeyManager {
async fn get_key(&self, dek: Option<Vec<u8>>, kek_alt: Option<String>) -> Result<KeyMaterial> {
match &self {
KeyManager::AWSKms(aws) => aws.get_key(dek, kek_alt).await,
KeyManager::Local(local) => local.get_key(dek).await,
KeyManager::Local(local) => local.get_key(dek),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion shotover/src/transforms/protect/local_kek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub struct DEKStructure {
}

impl LocalKeyManagement {
pub async fn get_key(&self, dek: Option<Vec<u8>>) -> Result<KeyMaterial> {
pub fn get_key(&self, dek: Option<Vec<u8>>) -> Result<KeyMaterial> {
match dek {
None => {
let plaintext_dek = gen_key();
Expand Down
8 changes: 7 additions & 1 deletion windsock-cloud-docker/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,16 @@ unzip awscliv2.zip
let args = args.join(" ");
let access_key_id = std::env::var("AWS_ACCESS_KEY_ID").unwrap();
let secret_access_key = std::env::var("AWS_SECRET_ACCESS_KEY").unwrap();
let rust_log = std::env::var("RUST_LOG").unwrap_or_default();
container_bash(&format!(
r#"cd shotover-proxy;
source "$HOME/.cargo/env";
AWS_ACCESS_KEY_ID={access_key_id} AWS_SECRET_ACCESS_KEY={secret_access_key} CARGO_TERM_COLOR=always cargo test --target-dir /target --release --bench windsock --no-default-features --features alpha-transforms,rdkafka-driver-tests,{features} -- {args}"#
export RUST_LOG={rust_log}
export AWS_ACCESS_KEY_ID={access_key_id}
export AWS_SECRET_ACCESS_KEY={secret_access_key}
export CARGO_TERM_COLOR=always
cargo test --target-dir /target --release --bench windsock --no-default-features --features alpha-transforms,rdkafka-driver-tests,{features} -- {args}"#
)).await;

// extract windsock results
Expand Down

0 comments on commit 05b6dbc

Please sign in to comment.