Skip to content

Commit

Permalink
Windsock: generate topology.yaml
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Sep 21, 2023
1 parent f9f8d1b commit ccf0586
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 93 deletions.
9 changes: 9 additions & 0 deletions shotover-proxy/benches/windsock/common.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use anyhow::Context;
use shotover::{config::topology::Topology, sources::SourceConfig};
use std::path::Path;

#[derive(Clone, Copy)]
Expand Down Expand Up @@ -31,3 +32,11 @@ pub async fn rewritten_file(path: &Path, find_replace: &[(&str, &str)]) -> Strin
}
text
}

pub fn generate_topology(source: SourceConfig) -> String {
Topology {
sources: vec![source],
}
.serialize()
.unwrap()
}
149 changes: 100 additions & 49 deletions shotover-proxy/benches/windsock/redis.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::{
aws::{Ec2InstanceWithDocker, Ec2InstanceWithShotover, RunningShotover, WindsockAws},
common::{rewritten_file, Shotover},
common::{self, Shotover},
profilers::{self, CloudProfilerRunner, ProfilerRunner},
shotover::shotover_process,
shotover::shotover_process_custom_topology,
};
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
Expand All @@ -13,12 +13,21 @@ use fred::{
};
use itertools::Itertools;
use rustls_pemfile::{certs, Item};
use shotover::{
config::chain::TransformChainConfig,
sources::SourceConfig,
tls::{TlsAcceptorConfig, TlsConnectorConfig},
transforms::{
debug::force_parse::DebugForceEncodeConfig,
redis::{sink_cluster::RedisSinkClusterConfig, sink_single::RedisSinkSingleConfig},
TransformConfig,
},
};
use std::{
collections::HashMap,
fs::File,
io::BufReader,
net::IpAddr,
path::Path,
sync::Arc,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -68,6 +77,81 @@ impl RedisBench {
encryption,
}
}

fn generate_topology_yaml(&self, host_address: String, redis_address: String) -> String {
let certs = "tests/test-configs/redis/tls/certs";
let tls_connector = match self.encryption {
Encryption::Tls => Some(TlsConnectorConfig {
certificate_authority_path: format!("{certs}/localhost_CA.crt"),
certificate_path: Some(format!("{certs}/localhost.crt")),
private_key_path: Some(format!("{certs}/localhost.key")),
verify_hostname: true,
}),
Encryption::None => None,
};
let tls_acceptor = match self.encryption {
Encryption::Tls => Some(TlsAcceptorConfig {
certificate_path: format!("{certs}/localhost.crt"),
private_key_path: format!("{certs}/localhost.key"),
certificate_authority_path: None,
}),
Encryption::None => None,
};

let mut transforms = vec![];
if let Shotover::ForcedMessageParsed = self.shotover {
transforms.push(Box::new(DebugForceEncodeConfig {
encode_requests: true,
encode_responses: true,
}) as Box<dyn TransformConfig>);
}

match self.topology {
RedisTopology::Cluster3 => {
transforms.push(Box::new(RedisSinkClusterConfig {
first_contact_points: vec![redis_address],
direct_destination: None,
tls: tls_connector,
connection_count: None,
connect_timeout_ms: 3000,
}));
}
RedisTopology::Single => {
transforms.push(Box::new(RedisSinkSingleConfig {
address: redis_address,
tls: tls_connector,
connect_timeout_ms: 3000,
}));
}
}

common::generate_topology(SourceConfig::Redis(shotover::sources::redis::RedisConfig {
name: "redis".to_owned(),
listen_addr: host_address,
connection_limit: None,
hard_connection_limit: None,
tls: tls_acceptor,
timeout: None,
chain: TransformChainConfig(transforms),
}))
}

async fn run_aws_shotover(
&self,
instance: Arc<Ec2InstanceWithShotover>,
shotover: Shotover,
redis_ip: String,
) -> Option<RunningShotover> {
let ip = instance.instance.private_ip().to_string();
match shotover {
Shotover::Standard | Shotover::ForcedMessageParsed => {
let topology =
self.generate_topology_yaml(format!("{ip}:6379"), format!("{redis_ip}:6379"));
Some(instance.run_shotover(&topology).await)
}
Shotover::None => None,
}
}
}

#[async_trait]
Expand Down Expand Up @@ -147,13 +231,9 @@ impl Bench for RedisBench {

redis_instances.run(self.encryption).await;
// unlike other sinks, redis cluster sink needs the redis instance to be already up
let running_shotover = run_aws_shotover(
shotover_instance.clone(),
self.shotover,
redis_ip.clone(),
self.topology,
)
.await;
let running_shotover = self
.run_aws_shotover(shotover_instance.clone(), self.shotover, redis_ip.clone())
.await;

let destination_ip = if running_shotover.is_some() {
format!("redis://{shotover_ip}")
Expand Down Expand Up @@ -185,14 +265,18 @@ impl Bench for RedisBench {
test_helpers::cert::generate_redis_test_certs();

// rediss:// url is not needed to enable TLS because we overwrite the TLS config later on
let address = match (self.topology, self.shotover) {
let client_url = match (self.topology, self.shotover) {
(RedisTopology::Single, Shotover::None) => "redis://127.0.0.1:1111",
(RedisTopology::Cluster3, Shotover::None) => "redis-cluster://172.16.1.2:6379",
(
RedisTopology::Single | RedisTopology::Cluster3,
Shotover::Standard | Shotover::ForcedMessageParsed,
) => "redis://127.0.0.1:6379",
};
let redis_address = match self.topology {
RedisTopology::Single => "127.0.0.1:1111",
RedisTopology::Cluster3 => "172.16.1.2:6379",
};
let config_dir = match (self.topology, self.encryption) {
(RedisTopology::Single, Encryption::None) => "tests/test-configs/redis/passthrough",
(RedisTopology::Cluster3, Encryption::None) => {
Expand All @@ -204,17 +288,16 @@ impl Bench for RedisBench {
let _compose = docker_compose(&format!("{config_dir}/docker-compose.yaml"));
let mut profiler = ProfilerRunner::new(self.name(), profiling);
let shotover = match self.shotover {
Shotover::Standard => {
Some(shotover_process(&format!("{config_dir}/topology.yaml"), &profiler).await)
Shotover::Standard | Shotover::ForcedMessageParsed => {
let topology_yaml = self
.generate_topology_yaml("127.0.0.1:6379".to_owned(), redis_address.to_owned());
Some(shotover_process_custom_topology(&topology_yaml, &profiler).await)
}
Shotover::ForcedMessageParsed => Some(
shotover_process(&format!("{config_dir}/topology-encode.yaml"), &profiler).await,
),
Shotover::None => None,
};
profiler.run(&shotover).await;

self.execute_run(address, &parameters).await;
self.execute_run(client_url, &parameters).await;

if let Some(shotover) = shotover {
shotover
Expand Down Expand Up @@ -370,38 +453,6 @@ impl BenchTask for BenchTaskRedis {
}
}

async fn run_aws_shotover(
instance: Arc<Ec2InstanceWithShotover>,
shotover: Shotover,
redis_ip: String,
topology: RedisTopology,
) -> Option<RunningShotover> {
let config_dir = "tests/test-configs/redis/bench";
let ip = instance.instance.private_ip().to_string();
match shotover {
Shotover::Standard | Shotover::ForcedMessageParsed => {
let clustered = match topology {
RedisTopology::Single => "",
RedisTopology::Cluster3 => "-cluster",
};
let encoded = match shotover {
Shotover::Standard => "",
Shotover::ForcedMessageParsed => "-encode",
Shotover::None => unreachable!(),
};
let topology = rewritten_file(
Path::new(&format!(
"{config_dir}/topology{clustered}{encoded}-cloud.yaml"
)),
&[("HOST_ADDRESS", &ip), ("REDIS_ADDRESS", &redis_ip)],
)
.await;
Some(instance.run_shotover(&topology).await)
}
Shotover::None => None,
}
}

enum RedisCluster {
Single(Arc<Ec2InstanceWithDocker>),
Cluster3 {
Expand Down
15 changes: 15 additions & 0 deletions shotover-proxy/benches/windsock/shotover.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,26 @@
use crate::profilers::ProfilerRunner;
use test_helpers::shotover_process::ShotoverProcessBuilder;
use tokio_bin_process::{bin_path, BinProcess};
use uuid::Uuid;

#[cfg(feature = "rdkafka-driver-tests")]
pub async fn shotover_process(topology_path: &str, profiler: &ProfilerRunner) -> BinProcess {
ShotoverProcessBuilder::new_with_topology(topology_path)
.with_bin(bin_path!("shotover-proxy"))
.with_profile(profiler.shotover_profile())
.start()
.await
}

pub async fn shotover_process_custom_topology(
topology_contents: &str,
profiler: &ProfilerRunner,
) -> BinProcess {
let topology_path = std::env::temp_dir().join(Uuid::new_v4().to_string());
std::fs::write(&topology_path, topology_contents).unwrap();
ShotoverProcessBuilder::new_with_topology(topology_path.to_str().unwrap())
.with_bin(bin_path!("shotover-proxy"))
.with_profile(profiler.shotover_profile())
.start()
.await
}

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

0 comments on commit ccf0586

Please sign in to comment.