From 540132ba0405b489c1f59cd4fc2f7d1ecc9afa49 Mon Sep 17 00:00:00 2001 From: Conor Date: Fri, 22 Sep 2023 19:26:25 +1000 Subject: [PATCH 1/4] add rstest_reuse (#1345) --- Cargo.lock | 13 ++++++++++++ shotover-proxy/Cargo.toml | 1 + .../tests/cassandra_int_tests/mod.rs | 20 ++++++++----------- shotover-proxy/tests/lib.rs | 3 +++ 4 files changed, 25 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a36f111a6..deb79459d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3873,6 +3873,18 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "rstest_reuse" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88530b681abe67924d42cca181d070e3ac20e0740569441a9e35a7cedd2b34a4" +dependencies = [ + "quote", + "rand 0.8.5", + "rustc_version", + "syn 2.0.36", +] + [[package]] name = "russh" version = "0.38.0" @@ -4460,6 +4472,7 @@ dependencies = [ "redis-protocol", "regex", "rstest", + "rstest_reuse", "rustls-pemfile", "scylla", "serde", diff --git a/shotover-proxy/Cargo.toml b/shotover-proxy/Cargo.toml index 9a4a3ef91..7870ab944 100644 --- a/shotover-proxy/Cargo.toml +++ b/shotover-proxy/Cargo.toml @@ -18,6 +18,7 @@ tokio.workspace = true tracing.workspace = true clap.workspace = true rstest = "0.18.0" +rstest_reuse = "0.6.0" cassandra-cpp = { version = "2.0.0", default-features = false } test-helpers = { path = "../test-helpers" } redis.workspace = true diff --git a/shotover-proxy/tests/cassandra_int_tests/mod.rs b/shotover-proxy/tests/cassandra_int_tests/mod.rs index 02b036ba5..7c25eefbf 100644 --- a/shotover-proxy/tests/cassandra_int_tests/mod.rs +++ b/shotover-proxy/tests/cassandra_int_tests/mod.rs @@ -7,6 +7,7 @@ use cdrs_tokio::frame::events::{ use futures::future::join_all; use futures::Future; use rstest::rstest; +use rstest_reuse::{self, *}; #[cfg(feature = "cassandra-cpp-driver-tests")] use test_helpers::connection::cassandra::CassandraDriver::Datastax; use test_helpers::connection::cassandra::Compression; @@ -58,10 +59,14 @@ where timestamp::test(&connection).await; } +#[template] #[rstest] #[case::cdrs(CdrsTokio)] #[cfg_attr(feature = "cassandra-cpp-driver-tests", case::datastax(Datastax))] #[case::scylla(Scylla)] +fn all_cassandra_drivers(#[case] driver: CassandraDriver) {} + +#[apply(all_cassandra_drivers)] #[tokio::test(flavor = "multi_thread")] async fn passthrough_standard(#[case] driver: CassandraDriver) { let _compose = docker_compose("tests/test-configs/cassandra/passthrough/docker-compose.yaml"); @@ -78,10 +83,7 @@ async fn passthrough_standard(#[case] driver: CassandraDriver) { } #[cfg(feature = "alpha-transforms")] -#[rstest] -#[case::cdrs(CdrsTokio)] -#[cfg_attr(feature = "cassandra-cpp-driver-tests", case::datastax(Datastax))] -#[case::scylla(Scylla)] +#[apply(all_cassandra_drivers)] #[tokio::test(flavor = "multi_thread")] async fn passthrough_encode(#[case] driver: CassandraDriver) { let _compose = docker_compose("tests/test-configs/cassandra/passthrough/docker-compose.yaml"); @@ -172,10 +174,7 @@ async fn cluster_single_rack_v3(#[case] driver: CassandraDriver) { cluster::single_rack_v3::test_topology_task(None).await; } -#[rstest] -#[case::cdrs(CdrsTokio)] -#[cfg_attr(feature = "cassandra-cpp-driver-tests", case::datastax(Datastax))] -#[case::scylla(Scylla)] +#[apply(all_cassandra_drivers)] #[tokio::test(flavor = "multi_thread")] async fn cluster_single_rack_v4(#[case] driver: CassandraDriver) { let mut compose = docker_compose("tests/test-configs/cassandra/cluster-v4/docker-compose.yaml"); @@ -329,10 +328,7 @@ async fn source_tls_and_cluster_tls(#[case] driver: CassandraDriver) { cluster::single_rack_v4::test_topology_task(Some(ca_cert), None).await; } -#[rstest] -#[case::cdrs(CdrsTokio)] -#[cfg_attr(feature = "cassandra-cpp-driver-tests", case::datastax(Datastax))] -#[case::scylla(Scylla)] +#[apply(all_cassandra_drivers)] #[tokio::test(flavor = "multi_thread")] async fn cassandra_redis_cache(#[case] driver: CassandraDriver) { let _compose = docker_compose("tests/test-configs/cassandra/redis-cache/docker-compose.yaml"); diff --git a/shotover-proxy/tests/lib.rs b/shotover-proxy/tests/lib.rs index be984d876..48568237b 100644 --- a/shotover-proxy/tests/lib.rs +++ b/shotover-proxy/tests/lib.rs @@ -1,3 +1,6 @@ +#[allow(clippy::single_component_path_imports)] +use rstest_reuse; + use test_helpers::shotover_process::ShotoverProcessBuilder; use tokio_bin_process::bin_path; From c3d0615650d9f7bca3a1e81eabe60001f515b5c1 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Fri, 22 Sep 2023 20:32:42 +1000 Subject: [PATCH 2/4] Windsock: redis benches - generate topology.yaml (#1332) --- shotover-proxy/benches/windsock/common.rs | 9 ++ shotover-proxy/benches/windsock/redis.rs | 149 ++++++++++++------ shotover-proxy/benches/windsock/shotover.rs | 15 ++ .../redis/bench/topology-cloud.yaml | 9 -- .../redis/bench/topology-cluster-cloud.yaml | 10 -- .../bench/topology-cluster-encode-cloud.yaml | 13 -- .../redis/bench/topology-encode-cloud.yaml | 12 -- 7 files changed, 124 insertions(+), 93 deletions(-) delete mode 100644 shotover-proxy/tests/test-configs/redis/bench/topology-cloud.yaml delete mode 100644 shotover-proxy/tests/test-configs/redis/bench/topology-cluster-cloud.yaml delete mode 100644 shotover-proxy/tests/test-configs/redis/bench/topology-cluster-encode-cloud.yaml delete mode 100644 shotover-proxy/tests/test-configs/redis/bench/topology-encode-cloud.yaml diff --git a/shotover-proxy/benches/windsock/common.rs b/shotover-proxy/benches/windsock/common.rs index cf505bfea..8547f2269 100644 --- a/shotover-proxy/benches/windsock/common.rs +++ b/shotover-proxy/benches/windsock/common.rs @@ -1,4 +1,5 @@ use anyhow::Context; +use shotover::{config::topology::Topology, sources::SourceConfig}; use std::path::Path; #[derive(Clone, Copy)] @@ -31,3 +32,11 @@ pub async fn rewritten_file(path: &Path, find_replace: &[(&str, &str)]) -> Strin } text } + +pub fn generate_topology(source: SourceConfig) -> String { + Topology { + sources: vec![source], + } + .serialize() + .unwrap() +} diff --git a/shotover-proxy/benches/windsock/redis.rs b/shotover-proxy/benches/windsock/redis.rs index 404e5af3c..147ba8a38 100644 --- a/shotover-proxy/benches/windsock/redis.rs +++ b/shotover-proxy/benches/windsock/redis.rs @@ -1,8 +1,8 @@ use crate::{ aws::{Ec2InstanceWithDocker, Ec2InstanceWithShotover, RunningShotover, WindsockAws}, - common::{rewritten_file, Shotover}, + common::{self, Shotover}, profilers::{self, CloudProfilerRunner, ProfilerRunner}, - shotover::shotover_process, + shotover::shotover_process_custom_topology, }; use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; @@ -13,12 +13,21 @@ use fred::{ }; use itertools::Itertools; use rustls_pemfile::{certs, Item}; +use shotover::{ + config::chain::TransformChainConfig, + sources::SourceConfig, + tls::{TlsAcceptorConfig, TlsConnectorConfig}, + transforms::{ + debug::force_parse::DebugForceEncodeConfig, + redis::{sink_cluster::RedisSinkClusterConfig, sink_single::RedisSinkSingleConfig}, + TransformConfig, + }, +}; use std::{ collections::HashMap, fs::File, io::BufReader, net::IpAddr, - path::Path, sync::Arc, time::{Duration, Instant}, }; @@ -68,6 +77,81 @@ impl RedisBench { encryption, } } + + fn generate_topology_yaml(&self, host_address: String, redis_address: String) -> String { + let certs = "tests/test-configs/redis/tls/certs"; + let tls_connector = match self.encryption { + Encryption::Tls => Some(TlsConnectorConfig { + certificate_authority_path: format!("{certs}/localhost_CA.crt"), + certificate_path: Some(format!("{certs}/localhost.crt")), + private_key_path: Some(format!("{certs}/localhost.key")), + verify_hostname: true, + }), + Encryption::None => None, + }; + let tls_acceptor = match self.encryption { + Encryption::Tls => Some(TlsAcceptorConfig { + certificate_path: format!("{certs}/localhost.crt"), + private_key_path: format!("{certs}/localhost.key"), + certificate_authority_path: None, + }), + Encryption::None => None, + }; + + let mut transforms = vec![]; + if let Shotover::ForcedMessageParsed = self.shotover { + transforms.push(Box::new(DebugForceEncodeConfig { + encode_requests: true, + encode_responses: true, + }) as Box); + } + + match self.topology { + RedisTopology::Cluster3 => { + transforms.push(Box::new(RedisSinkClusterConfig { + first_contact_points: vec![redis_address], + direct_destination: None, + tls: tls_connector, + connection_count: None, + connect_timeout_ms: 3000, + })); + } + RedisTopology::Single => { + transforms.push(Box::new(RedisSinkSingleConfig { + address: redis_address, + tls: tls_connector, + connect_timeout_ms: 3000, + })); + } + } + + common::generate_topology(SourceConfig::Redis(shotover::sources::redis::RedisConfig { + name: "redis".to_owned(), + listen_addr: host_address, + connection_limit: None, + hard_connection_limit: None, + tls: tls_acceptor, + timeout: None, + chain: TransformChainConfig(transforms), + })) + } + + async fn run_aws_shotover( + &self, + instance: Arc, + shotover: Shotover, + redis_ip: String, + ) -> Option { + let ip = instance.instance.private_ip().to_string(); + match shotover { + Shotover::Standard | Shotover::ForcedMessageParsed => { + let topology = + self.generate_topology_yaml(format!("{ip}:6379"), format!("{redis_ip}:6379")); + Some(instance.run_shotover(&topology).await) + } + Shotover::None => None, + } + } } #[async_trait] @@ -147,13 +231,9 @@ impl Bench for RedisBench { redis_instances.run(self.encryption).await; // unlike other sinks, redis cluster sink needs the redis instance to be already up - let running_shotover = run_aws_shotover( - shotover_instance.clone(), - self.shotover, - redis_ip.clone(), - self.topology, - ) - .await; + let running_shotover = self + .run_aws_shotover(shotover_instance.clone(), self.shotover, redis_ip.clone()) + .await; let destination_ip = if running_shotover.is_some() { format!("redis://{shotover_ip}") @@ -185,7 +265,7 @@ impl Bench for RedisBench { test_helpers::cert::generate_redis_test_certs(); // rediss:// url is not needed to enable TLS because we overwrite the TLS config later on - let address = match (self.topology, self.shotover) { + let client_url = match (self.topology, self.shotover) { (RedisTopology::Single, Shotover::None) => "redis://127.0.0.1:1111", (RedisTopology::Cluster3, Shotover::None) => "redis-cluster://172.16.1.2:6379", ( @@ -193,6 +273,10 @@ impl Bench for RedisBench { Shotover::Standard | Shotover::ForcedMessageParsed, ) => "redis://127.0.0.1:6379", }; + let redis_address = match self.topology { + RedisTopology::Single => "127.0.0.1:1111", + RedisTopology::Cluster3 => "172.16.1.2:6379", + }; let config_dir = match (self.topology, self.encryption) { (RedisTopology::Single, Encryption::None) => "tests/test-configs/redis/passthrough", (RedisTopology::Cluster3, Encryption::None) => { @@ -204,17 +288,16 @@ impl Bench for RedisBench { let _compose = docker_compose(&format!("{config_dir}/docker-compose.yaml")); let mut profiler = ProfilerRunner::new(self.name(), profiling); let shotover = match self.shotover { - Shotover::Standard => { - Some(shotover_process(&format!("{config_dir}/topology.yaml"), &profiler).await) + Shotover::Standard | Shotover::ForcedMessageParsed => { + let topology_yaml = self + .generate_topology_yaml("127.0.0.1:6379".to_owned(), redis_address.to_owned()); + Some(shotover_process_custom_topology(&topology_yaml, &profiler).await) } - Shotover::ForcedMessageParsed => Some( - shotover_process(&format!("{config_dir}/topology-encode.yaml"), &profiler).await, - ), Shotover::None => None, }; profiler.run(&shotover).await; - self.execute_run(address, ¶meters).await; + self.execute_run(client_url, ¶meters).await; if let Some(shotover) = shotover { shotover @@ -370,38 +453,6 @@ impl BenchTask for BenchTaskRedis { } } -async fn run_aws_shotover( - instance: Arc, - shotover: Shotover, - redis_ip: String, - topology: RedisTopology, -) -> Option { - let config_dir = "tests/test-configs/redis/bench"; - let ip = instance.instance.private_ip().to_string(); - match shotover { - Shotover::Standard | Shotover::ForcedMessageParsed => { - let clustered = match topology { - RedisTopology::Single => "", - RedisTopology::Cluster3 => "-cluster", - }; - let encoded = match shotover { - Shotover::Standard => "", - Shotover::ForcedMessageParsed => "-encode", - Shotover::None => unreachable!(), - }; - let topology = rewritten_file( - Path::new(&format!( - "{config_dir}/topology{clustered}{encoded}-cloud.yaml" - )), - &[("HOST_ADDRESS", &ip), ("REDIS_ADDRESS", &redis_ip)], - ) - .await; - Some(instance.run_shotover(&topology).await) - } - Shotover::None => None, - } -} - enum RedisCluster { Single(Arc), Cluster3 { diff --git a/shotover-proxy/benches/windsock/shotover.rs b/shotover-proxy/benches/windsock/shotover.rs index ea53392f6..9ee7e2805 100644 --- a/shotover-proxy/benches/windsock/shotover.rs +++ b/shotover-proxy/benches/windsock/shotover.rs @@ -1,7 +1,9 @@ use crate::profilers::ProfilerRunner; use test_helpers::shotover_process::ShotoverProcessBuilder; use tokio_bin_process::{bin_path, BinProcess}; +use uuid::Uuid; +#[cfg(feature = "rdkafka-driver-tests")] pub async fn shotover_process(topology_path: &str, profiler: &ProfilerRunner) -> BinProcess { ShotoverProcessBuilder::new_with_topology(topology_path) .with_bin(bin_path!("shotover-proxy")) @@ -9,3 +11,16 @@ pub async fn shotover_process(topology_path: &str, profiler: &ProfilerRunner) -> .start() .await } + +pub async fn shotover_process_custom_topology( + topology_contents: &str, + profiler: &ProfilerRunner, +) -> BinProcess { + let topology_path = std::env::temp_dir().join(Uuid::new_v4().to_string()); + std::fs::write(&topology_path, topology_contents).unwrap(); + ShotoverProcessBuilder::new_with_topology(topology_path.to_str().unwrap()) + .with_bin(bin_path!("shotover-proxy")) + .with_profile(profiler.shotover_profile()) + .start() + .await +} diff --git a/shotover-proxy/tests/test-configs/redis/bench/topology-cloud.yaml b/shotover-proxy/tests/test-configs/redis/bench/topology-cloud.yaml deleted file mode 100644 index 1b5503d04..000000000 --- a/shotover-proxy/tests/test-configs/redis/bench/topology-cloud.yaml +++ /dev/null @@ -1,9 +0,0 @@ ---- -sources: - - Redis: - name: "redis" - listen_addr: "HOST_ADDRESS:6379" - chain: - - RedisSinkSingle: - remote_address: "REDIS_ADDRESS:6379" - connect_timeout_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/redis/bench/topology-cluster-cloud.yaml b/shotover-proxy/tests/test-configs/redis/bench/topology-cluster-cloud.yaml deleted file mode 100644 index 5e7dfb78a..000000000 --- a/shotover-proxy/tests/test-configs/redis/bench/topology-cluster-cloud.yaml +++ /dev/null @@ -1,10 +0,0 @@ ---- -sources: - - Redis: - name: "redis" - listen_addr: "HOST_ADDRESS:6379" - chain: - - RedisSinkCluster: - first_contact_points: - - "REDIS_ADDRESS:6379" - connect_timeout_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/redis/bench/topology-cluster-encode-cloud.yaml b/shotover-proxy/tests/test-configs/redis/bench/topology-cluster-encode-cloud.yaml deleted file mode 100644 index 60f301802..000000000 --- a/shotover-proxy/tests/test-configs/redis/bench/topology-cluster-encode-cloud.yaml +++ /dev/null @@ -1,13 +0,0 @@ ---- -sources: - - Redis: - name: "redis" - listen_addr: "HOST_ADDRESS:6379" - chain: - - DebugForceEncode: - encode_requests: true - encode_responses: true - - RedisSinkCluster: - first_contact_points: - - "REDIS_ADDRESS:6379" - connect_timeout_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/redis/bench/topology-encode-cloud.yaml b/shotover-proxy/tests/test-configs/redis/bench/topology-encode-cloud.yaml deleted file mode 100644 index 8928f7b5b..000000000 --- a/shotover-proxy/tests/test-configs/redis/bench/topology-encode-cloud.yaml +++ /dev/null @@ -1,12 +0,0 @@ ---- -sources: - - Redis: - name: "redis" - listen_addr: "HOST_ADDRESS:6379" - chain: - - DebugForceEncode: - encode_requests: true - encode_responses: true - - RedisSinkSingle: - remote_address: "REDIS_ADDRESS:6379" - connect_timeout_ms: 3000 From b82a450d2c2c4af9d962093133c54720c002cca7 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Mon, 25 Sep 2023 21:14:39 +1000 Subject: [PATCH 3/4] windsock: redis cloud benches startup redis + shotover concurrently (#1348) --- shotover-proxy/benches/windsock/redis.rs | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/shotover-proxy/benches/windsock/redis.rs b/shotover-proxy/benches/windsock/redis.rs index 147ba8a38..d024dda17 100644 --- a/shotover-proxy/benches/windsock/redis.rs +++ b/shotover-proxy/benches/windsock/redis.rs @@ -139,11 +139,10 @@ impl RedisBench { async fn run_aws_shotover( &self, instance: Arc, - shotover: Shotover, redis_ip: String, ) -> Option { let ip = instance.instance.private_ip().to_string(); - match shotover { + match self.shotover { Shotover::Standard | Shotover::ForcedMessageParsed => { let topology = self.generate_topology_yaml(format!("{ip}:6379"), format!("{redis_ip}:6379")); @@ -229,11 +228,10 @@ impl Bench for RedisBench { let redis_ip = redis_instances.private_ips()[0].to_string(); let shotover_ip = shotover_instance.instance.private_ip().to_string(); - redis_instances.run(self.encryption).await; - // unlike other sinks, redis cluster sink needs the redis instance to be already up - let running_shotover = self - .run_aws_shotover(shotover_instance.clone(), self.shotover, redis_ip.clone()) - .await; + let (_, running_shotover) = futures::join!( + redis_instances.run(self.encryption), + self.run_aws_shotover(shotover_instance.clone(), redis_ip.clone()) + ); let destination_ip = if running_shotover.is_some() { format!("redis://{shotover_ip}") From a84aced26ed818fce142b25166b7eb56f8c6956e Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Wed, 27 Sep 2023 05:56:02 +1000 Subject: [PATCH 4/4] windsock: kafka and cassandra benches generate their topology.yaml (#1349) --- shotover-proxy/benches/windsock/cassandra.rs | 151 ++++++++++++------ shotover-proxy/benches/windsock/common.rs | 13 -- shotover-proxy/benches/windsock/kafka.rs | 90 +++++++---- shotover-proxy/benches/windsock/shotover.rs | 9 -- .../cassandra/bench/topology-cloud.yaml | 9 -- .../bench/topology-cluster-cloud.yaml | 15 -- .../bench/topology-cluster-encode-cloud.yaml | 18 --- .../bench/topology-encode-cloud.yaml | 12 -- .../kafka/bench/topology-cloud.yaml | 9 -- .../kafka/bench/topology-encode-cloud.yaml | 12 -- .../kafka/bench/topology-encode.yaml | 12 -- .../test-configs/kafka/bench/topology.yaml | 9 -- windsock-cloud-docker/src/main.rs | 2 +- 13 files changed, 156 insertions(+), 205 deletions(-) delete mode 100644 shotover-proxy/tests/test-configs/cassandra/bench/topology-cloud.yaml delete mode 100644 shotover-proxy/tests/test-configs/cassandra/bench/topology-cluster-cloud.yaml delete mode 100644 shotover-proxy/tests/test-configs/cassandra/bench/topology-cluster-encode-cloud.yaml delete mode 100644 shotover-proxy/tests/test-configs/cassandra/bench/topology-encode-cloud.yaml delete mode 100644 shotover-proxy/tests/test-configs/kafka/bench/topology-cloud.yaml delete mode 100644 shotover-proxy/tests/test-configs/kafka/bench/topology-encode-cloud.yaml delete mode 100644 shotover-proxy/tests/test-configs/kafka/bench/topology-encode.yaml delete mode 100644 shotover-proxy/tests/test-configs/kafka/bench/topology.yaml diff --git a/shotover-proxy/benches/windsock/cassandra.rs b/shotover-proxy/benches/windsock/cassandra.rs index 3da61a33f..eceb452ac 100644 --- a/shotover-proxy/benches/windsock/cassandra.rs +++ b/shotover-proxy/benches/windsock/cassandra.rs @@ -1,6 +1,6 @@ use crate::{ aws::{Ec2InstanceWithDocker, Ec2InstanceWithShotover, RunningShotover}, - common::{rewritten_file, Shotover}, + common::{self, Shotover}, profilers::{self, CloudProfilerRunner, ProfilerRunner}, }; use anyhow::Result; @@ -28,9 +28,20 @@ use scylla::{ transport::Compression as ScyllaCompression, Session as ScyllaSession, SessionBuilder as ScyllaSessionBuilder, }; +use shotover::{ + config::chain::TransformChainConfig, + sources::SourceConfig, + transforms::{ + cassandra::{ + sink_cluster::{CassandraSinkClusterConfig, ShotoverNode}, + sink_single::CassandraSinkSingleConfig, + }, + debug::force_parse::DebugForceEncodeConfig, + TransformConfig, + }, +}; use std::{ collections::HashMap, - path::Path, sync::Arc, time::{Duration, Instant}, }; @@ -41,6 +52,7 @@ use test_helpers::{ }; use tokio::sync::mpsc::UnboundedSender; use tokio_bin_process::bin_path; +use uuid::Uuid; use windsock::{Bench, BenchParameters, BenchTask, Profiling, Report}; const ROW_COUNT: usize = 1000; @@ -365,6 +377,71 @@ impl CassandraBench { cassandra: 1, } } + + fn generate_topology_yaml(&self, host_address: String, cassandra_address: String) -> String { + let mut transforms = vec![]; + if let Shotover::ForcedMessageParsed = self.shotover { + transforms.push(Box::new(DebugForceEncodeConfig { + encode_requests: true, + encode_responses: true, + }) as Box); + } + + match self.topology { + Topology::Cluster3 => { + transforms.push(Box::new(CassandraSinkClusterConfig { + first_contact_points: vec![cassandra_address], + tls: None, + connect_timeout_ms: 3000, + local_shotover_host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a".parse().unwrap(), + read_timeout: None, + shotover_nodes: vec![ShotoverNode { + address: host_address.parse().unwrap(), + data_center: "dc1".to_owned(), + rack: "rack1".to_owned(), + host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a".parse().unwrap(), + }], + })); + } + Topology::Single => { + transforms.push(Box::new(CassandraSinkSingleConfig { + address: cassandra_address, + tls: None, + connect_timeout_ms: 3000, + read_timeout: None, + })); + } + } + + common::generate_topology(SourceConfig::Cassandra( + shotover::sources::cassandra::CassandraConfig { + name: "cassandra".to_owned(), + listen_addr: host_address, + connection_limit: None, + hard_connection_limit: None, + tls: None, + timeout: None, + chain: TransformChainConfig(transforms), + transport: None, + }, + )) + } + + async fn run_aws_shotover( + &self, + instance: Arc, + cassandra_ip: String, + ) -> Option { + let ip = instance.instance.private_ip().to_string(); + match self.shotover { + Shotover::Standard | Shotover::ForcedMessageParsed => { + let topology = self + .generate_topology_yaml(format!("{ip}:9042"), format!("{cassandra_ip}:9042")); + Some(instance.run_shotover(&topology).await) + } + Shotover::None => None, + } + } } #[async_trait] @@ -488,12 +565,7 @@ impl Bench for CassandraBench { let (_, running_shotover) = futures::join!( run_aws_cassandra(cassandra_nodes, self.topology.clone()), - run_aws_shotover( - shotover_instance.clone(), - self.shotover, - cassandra_ip.clone(), - self.topology.clone(), - ) + self.run_aws_shotover(shotover_instance.clone(), cassandra_ip.clone(),) ); let destination_ip = if running_shotover.is_some() { @@ -525,10 +597,8 @@ impl Bench for CassandraBench { let address = match (&self.topology, &self.shotover) { (Topology::Single, Shotover::None) => "127.0.0.1:9043", - (Topology::Single, Shotover::Standard) => "127.0.0.1:9042", (Topology::Cluster3, Shotover::None) => "172.16.1.2:9044", - (Topology::Cluster3, Shotover::Standard) => "127.0.0.1:9042", - (_, Shotover::ForcedMessageParsed) => todo!(), + (_, Shotover::Standard | Shotover::ForcedMessageParsed) => "127.0.0.1:9042", }; let config_dir = match &self.topology { Topology::Single => "tests/test-configs/cassandra/passthrough", @@ -546,16 +616,26 @@ impl Bench for CassandraBench { panic!("Mocked cassandra database does not provide a clustered mode") } }; + let cassandra_address = match &self.topology { + Topology::Single => "127.0.0.1:9043".to_owned(), + Topology::Cluster3 => "172.16.1.2:9044".to_owned(), + }; let mut profiler = ProfilerRunner::new(self.name(), profiling); let shotover = match self.shotover { - Shotover::Standard => Some( - ShotoverProcessBuilder::new_with_topology(&format!("{config_dir}/topology.yaml")) - .with_bin(bin_path!("shotover-proxy")) - .with_profile(profiler.shotover_profile()) - .with_cores(core_count.shotover as u32) - .start() - .await, - ), + Shotover::Standard => { + let topology_contents = + self.generate_topology_yaml(address.to_owned(), cassandra_address); + let topology_path = std::env::temp_dir().join(Uuid::new_v4().to_string()); + std::fs::write(&topology_path, topology_contents).unwrap(); + Some( + ShotoverProcessBuilder::new_with_topology(topology_path.to_str().unwrap()) + .with_bin(bin_path!("shotover-proxy")) + .with_profile(profiler.shotover_profile()) + .with_cores(core_count.shotover as u32) + .start() + .await, + ) + } Shotover::None => None, Shotover::ForcedMessageParsed => todo!(), }; @@ -627,39 +707,6 @@ impl Bench for CassandraBench { self.operation.run(&session, reporter, parameters).await; } } - -async fn run_aws_shotover( - instance: Arc, - shotover: Shotover, - cassandra_ip: String, - topology: Topology, -) -> Option { - let config_dir = "tests/test-configs/cassandra/bench"; - let ip = instance.instance.private_ip().to_string(); - match shotover { - Shotover::Standard | Shotover::ForcedMessageParsed => { - let encoded = match shotover { - Shotover::Standard => "", - Shotover::ForcedMessageParsed => "-encode", - Shotover::None => unreachable!(), - }; - let clustered = match topology { - Topology::Single => "", - Topology::Cluster3 => "-cluster", - }; - let topology = rewritten_file( - Path::new(&format!( - "{config_dir}/topology{clustered}{encoded}-cloud.yaml" - )), - &[("HOST_ADDRESS", &ip), ("CASSANDRA_ADDRESS", &cassandra_ip)], - ) - .await; - Some(instance.run_shotover(&topology).await) - } - Shotover::None => None, - } -} - async fn run_aws_cassandra(nodes: Vec, topology: Topology) { match topology { Topology::Cluster3 => run_aws_cassandra_cluster(nodes).await, diff --git a/shotover-proxy/benches/windsock/common.rs b/shotover-proxy/benches/windsock/common.rs index 8547f2269..1fd01728b 100644 --- a/shotover-proxy/benches/windsock/common.rs +++ b/shotover-proxy/benches/windsock/common.rs @@ -1,6 +1,4 @@ -use anyhow::Context; use shotover::{config::topology::Topology, sources::SourceConfig}; -use std::path::Path; #[derive(Clone, Copy)] pub enum Shotover { @@ -22,17 +20,6 @@ impl Shotover { } } -pub async fn rewritten_file(path: &Path, find_replace: &[(&str, &str)]) -> String { - let mut text = tokio::fs::read_to_string(path) - .await - .with_context(|| format!("Failed to read from {path:?}")) - .unwrap(); - for (find, replace) in find_replace { - text = text.replace(find, replace); - } - text -} - pub fn generate_topology(source: SourceConfig) -> String { Topology { sources: vec![source], diff --git a/shotover-proxy/benches/windsock/kafka.rs b/shotover-proxy/benches/windsock/kafka.rs index 2a40b1281..50421a38b 100644 --- a/shotover-proxy/benches/windsock/kafka.rs +++ b/shotover-proxy/benches/windsock/kafka.rs @@ -1,12 +1,16 @@ use crate::aws::{Ec2InstanceWithDocker, Ec2InstanceWithShotover}; -use crate::common::{rewritten_file, Shotover}; +use crate::common::{self, Shotover}; use crate::profilers::{self, CloudProfilerRunner, ProfilerRunner}; -use crate::shotover::shotover_process; +use crate::shotover::shotover_process_custom_topology; use anyhow::Result; use async_trait::async_trait; use aws_throwaway::Ec2Instance; use futures::StreamExt; -use std::path::Path; +use shotover::config::chain::TransformChainConfig; +use shotover::sources::SourceConfig; +use shotover::transforms::debug::force_parse::DebugForceEncodeConfig; +use shotover::transforms::kafka::sink_single::KafkaSinkSingleConfig; +use shotover::transforms::TransformConfig; use std::sync::Arc; use std::{collections::HashMap, time::Duration}; use test_helpers::docker_compose::docker_compose; @@ -36,6 +40,48 @@ impl KafkaBench { message_size, } } + + fn generate_topology_yaml(&self, host_address: String, kafka_address: String) -> String { + let mut transforms = vec![]; + if let Shotover::ForcedMessageParsed = self.shotover { + transforms.push(Box::new(DebugForceEncodeConfig { + encode_requests: true, + encode_responses: true, + }) as Box); + } + + transforms.push(Box::new(KafkaSinkSingleConfig { + address: kafka_address, + connect_timeout_ms: 3000, + read_timeout: None, + })); + + common::generate_topology(SourceConfig::Kafka(shotover::sources::kafka::KafkaConfig { + name: "kafka".to_owned(), + listen_addr: host_address, + connection_limit: None, + hard_connection_limit: None, + tls: None, + timeout: None, + chain: TransformChainConfig(transforms), + })) + } + + async fn run_aws_shotover( + &self, + instance: Arc, + kafka_ip: String, + ) -> Option { + let ip = instance.instance.private_ip().to_string(); + match self.shotover { + Shotover::Standard | Shotover::ForcedMessageParsed => { + let topology = + self.generate_topology_yaml(format!("{ip}:9092"), format!("{kafka_ip}:9092")); + Some(instance.run_shotover(&topology).await) + } + Shotover::None => None, + } + } } #[async_trait] @@ -93,7 +139,7 @@ impl Bench for KafkaBench { let (_, running_shotover) = futures::join!( run_aws_kafka(kafka_instance), - run_aws_shotover(shotover_instance, self.shotover, kafka_ip.clone()) + self.run_aws_shotover(shotover_instance, kafka_ip.clone()) ); let destination_ip = if running_shotover.is_some() { @@ -125,13 +171,14 @@ impl Bench for KafkaBench { let mut profiler = ProfilerRunner::new(self.name(), profiling); let shotover = match self.shotover { - Shotover::Standard => { - Some(shotover_process(&format!("{config_dir}/topology.yaml"), &profiler).await) + Shotover::Standard | Shotover::ForcedMessageParsed => { + let topology_yaml = self.generate_topology_yaml( + "127.0.0.1:9192".to_owned(), + "127.0.0.1:9092".to_owned(), + ); + Some(shotover_process_custom_topology(&topology_yaml, &profiler).await) } Shotover::None => None, - Shotover::ForcedMessageParsed => Some( - shotover_process(&format!("{config_dir}/topology-encode.yaml"), &profiler).await, - ), }; let broker_address = match self.shotover { @@ -216,31 +263,6 @@ impl Bench for KafkaBench { } } -async fn run_aws_shotover( - instance: Arc, - shotover: Shotover, - kafka_ip: String, -) -> Option { - let config_dir = "tests/test-configs/kafka/bench"; - let ip = instance.instance.private_ip().to_string(); - match shotover { - Shotover::Standard | Shotover::ForcedMessageParsed => { - let encoded = match shotover { - Shotover::Standard => "", - Shotover::ForcedMessageParsed => "-encode", - Shotover::None => unreachable!(), - }; - let topology = rewritten_file( - Path::new(&format!("{config_dir}/topology{encoded}-cloud.yaml")), - &[("HOST_ADDRESS", &ip), ("KAFKA_ADDRESS", &kafka_ip)], - ) - .await; - Some(instance.run_shotover(&topology).await) - } - Shotover::None => None, - } -} - async fn run_aws_kafka(instance: Arc) { let ip = instance.instance.private_ip().to_string(); instance diff --git a/shotover-proxy/benches/windsock/shotover.rs b/shotover-proxy/benches/windsock/shotover.rs index 9ee7e2805..0f6c5adc2 100644 --- a/shotover-proxy/benches/windsock/shotover.rs +++ b/shotover-proxy/benches/windsock/shotover.rs @@ -3,15 +3,6 @@ use test_helpers::shotover_process::ShotoverProcessBuilder; use tokio_bin_process::{bin_path, BinProcess}; use uuid::Uuid; -#[cfg(feature = "rdkafka-driver-tests")] -pub async fn shotover_process(topology_path: &str, profiler: &ProfilerRunner) -> BinProcess { - ShotoverProcessBuilder::new_with_topology(topology_path) - .with_bin(bin_path!("shotover-proxy")) - .with_profile(profiler.shotover_profile()) - .start() - .await -} - pub async fn shotover_process_custom_topology( topology_contents: &str, profiler: &ProfilerRunner, diff --git a/shotover-proxy/tests/test-configs/cassandra/bench/topology-cloud.yaml b/shotover-proxy/tests/test-configs/cassandra/bench/topology-cloud.yaml deleted file mode 100644 index 0b2bcb060..000000000 --- a/shotover-proxy/tests/test-configs/cassandra/bench/topology-cloud.yaml +++ /dev/null @@ -1,9 +0,0 @@ ---- -sources: - - Cassandra: - name: "cassandra" - listen_addr: "HOST_ADDRESS:9042" - chain: - - CassandraSinkSingle: - remote_address: "CASSANDRA_ADDRESS:9042" - connect_timeout_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/cassandra/bench/topology-cluster-cloud.yaml b/shotover-proxy/tests/test-configs/cassandra/bench/topology-cluster-cloud.yaml deleted file mode 100644 index 10c563872..000000000 --- a/shotover-proxy/tests/test-configs/cassandra/bench/topology-cluster-cloud.yaml +++ /dev/null @@ -1,15 +0,0 @@ ---- -sources: - - Cassandra: - name: "cassandra" - listen_addr: "HOST_ADDRESS:9042" - chain: - - CassandraSinkCluster: - first_contact_points: ["CASSANDRA_ADDRESS:9042"] - local_shotover_host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a" - shotover_nodes: - - address: "HOST_ADDRESS:9042" - data_center: "dc1" - rack: "rack1" - host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a" - connect_timeout_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/cassandra/bench/topology-cluster-encode-cloud.yaml b/shotover-proxy/tests/test-configs/cassandra/bench/topology-cluster-encode-cloud.yaml deleted file mode 100644 index 1d8077d55..000000000 --- a/shotover-proxy/tests/test-configs/cassandra/bench/topology-cluster-encode-cloud.yaml +++ /dev/null @@ -1,18 +0,0 @@ ---- -sources: - - Cassandra: - name: "cassandra" - listen_addr: "HOST_ADDRESS:9042" - chain: - - DebugForceEncode: - encode_requests: true - encode_responses: true - - CassandraSinkCluster: - first_contact_points: ["CASSANDRA_ADDRESS:9042"] - local_shotover_host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a" - shotover_nodes: - - address: "HOST_ADDRESS:9042" - data_center: "dc1" - rack: "rack1" - host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a" - connect_timeout_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/cassandra/bench/topology-encode-cloud.yaml b/shotover-proxy/tests/test-configs/cassandra/bench/topology-encode-cloud.yaml deleted file mode 100644 index e9fa8ec63..000000000 --- a/shotover-proxy/tests/test-configs/cassandra/bench/topology-encode-cloud.yaml +++ /dev/null @@ -1,12 +0,0 @@ ---- -sources: - - Cassandra: - name: "cassandra" - listen_addr: "HOST_ADDRESS:9042" - chain: - - DebugForceEncode: - encode_requests: true - encode_responses: true - - CassandraSinkSingle: - remote_address: "CASSANDRA_ADDRESS:9042" - connect_timeout_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/bench/topology-cloud.yaml b/shotover-proxy/tests/test-configs/kafka/bench/topology-cloud.yaml deleted file mode 100644 index 8fdcf443b..000000000 --- a/shotover-proxy/tests/test-configs/kafka/bench/topology-cloud.yaml +++ /dev/null @@ -1,9 +0,0 @@ ---- -sources: - - Kafka: - name: "kafka" - listen_addr: "HOST_ADDRESS:9092" - chain: - - KafkaSinkSingle: - remote_address: "KAFKA_ADDRESS:9092" - connect_timeout_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/bench/topology-encode-cloud.yaml b/shotover-proxy/tests/test-configs/kafka/bench/topology-encode-cloud.yaml deleted file mode 100644 index 2edd8dc9f..000000000 --- a/shotover-proxy/tests/test-configs/kafka/bench/topology-encode-cloud.yaml +++ /dev/null @@ -1,12 +0,0 @@ ---- -sources: - - Kafka: - name: "kafka" - listen_addr: "HOST_ADDRESS:9092" - chain: - - DebugForceEncode: - encode_requests: true - encode_responses: true - - KafkaSinkSingle: - remote_address: "KAFKA_ADDRESS:9092" - connect_timeout_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/bench/topology-encode.yaml b/shotover-proxy/tests/test-configs/kafka/bench/topology-encode.yaml deleted file mode 100644 index 218d3b508..000000000 --- a/shotover-proxy/tests/test-configs/kafka/bench/topology-encode.yaml +++ /dev/null @@ -1,12 +0,0 @@ ---- -sources: - - Kafka: - name: "kafka" - listen_addr: "127.0.0.1:9192" - chain: - - DebugForceEncode: - encode_requests: true - encode_responses: true - - KafkaSinkSingle: - remote_address: "127.0.0.1:9092" - connect_timeout_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/bench/topology.yaml b/shotover-proxy/tests/test-configs/kafka/bench/topology.yaml deleted file mode 100644 index bdeae01cb..000000000 --- a/shotover-proxy/tests/test-configs/kafka/bench/topology.yaml +++ /dev/null @@ -1,9 +0,0 @@ ---- -sources: - - Kafka: - name: "kafka" - listen_addr: "127.0.0.1:9192" - chain: - - KafkaSinkSingle: - remote_address: "127.0.0.1:9092" - connect_timeout_ms: 3000 diff --git a/windsock-cloud-docker/src/main.rs b/windsock-cloud-docker/src/main.rs index 523322fad..b92430433 100644 --- a/windsock-cloud-docker/src/main.rs +++ b/windsock-cloud-docker/src/main.rs @@ -60,7 +60,7 @@ fn main() { container_bash(&format!( r#"cd shotover-proxy; source "$HOME/.cargo/env"; -AWS_ACCESS_KEY_ID={access_key_id} AWS_SECRET_ACCESS_KEY={secret_access_key} CARGO_TERM_COLOR=always cargo test --target-dir /target --release --bench windsock --features alpha-transforms -- {args}"# +AWS_ACCESS_KEY_ID={access_key_id} AWS_SECRET_ACCESS_KEY={secret_access_key} CARGO_TERM_COLOR=always cargo test --target-dir /target --release --bench windsock --features alpha-transforms --features rdkafka-driver-tests -- {args}"# )); // extract windsock results