Skip to content

Commit

Permalink
Merge branch 'main' into opensearch-dual-write
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros authored Sep 27, 2023
2 parents 04ce914 + a84aced commit ca9d69e
Show file tree
Hide file tree
Showing 19 changed files with 271 additions and 291 deletions.
151 changes: 99 additions & 52 deletions shotover-proxy/benches/windsock/cassandra.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
aws::{Ec2InstanceWithDocker, Ec2InstanceWithShotover, RunningShotover},
common::{rewritten_file, Shotover},
common::{self, Shotover},
profilers::{self, CloudProfilerRunner, ProfilerRunner},
};
use anyhow::Result;
Expand Down Expand Up @@ -28,9 +28,20 @@ use scylla::{
transport::Compression as ScyllaCompression, Session as ScyllaSession,
SessionBuilder as ScyllaSessionBuilder,
};
use shotover::{
config::chain::TransformChainConfig,
sources::SourceConfig,
transforms::{
cassandra::{
sink_cluster::{CassandraSinkClusterConfig, ShotoverNode},
sink_single::CassandraSinkSingleConfig,
},
debug::force_parse::DebugForceEncodeConfig,
TransformConfig,
},
};
use std::{
collections::HashMap,
path::Path,
sync::Arc,
time::{Duration, Instant},
};
Expand All @@ -41,6 +52,7 @@ use test_helpers::{
};
use tokio::sync::mpsc::UnboundedSender;
use tokio_bin_process::bin_path;
use uuid::Uuid;
use windsock::{Bench, BenchParameters, BenchTask, Profiling, Report};

const ROW_COUNT: usize = 1000;
Expand Down Expand Up @@ -365,6 +377,71 @@ impl CassandraBench {
cassandra: 1,
}
}

fn generate_topology_yaml(&self, host_address: String, cassandra_address: String) -> String {
let mut transforms = vec![];
if let Shotover::ForcedMessageParsed = self.shotover {
transforms.push(Box::new(DebugForceEncodeConfig {
encode_requests: true,
encode_responses: true,
}) as Box<dyn TransformConfig>);
}

match self.topology {
Topology::Cluster3 => {
transforms.push(Box::new(CassandraSinkClusterConfig {
first_contact_points: vec![cassandra_address],
tls: None,
connect_timeout_ms: 3000,
local_shotover_host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a".parse().unwrap(),
read_timeout: None,
shotover_nodes: vec![ShotoverNode {
address: host_address.parse().unwrap(),
data_center: "dc1".to_owned(),
rack: "rack1".to_owned(),
host_id: "2dd022d6-2937-4754-89d6-02d2933a8f7a".parse().unwrap(),
}],
}));
}
Topology::Single => {
transforms.push(Box::new(CassandraSinkSingleConfig {
address: cassandra_address,
tls: None,
connect_timeout_ms: 3000,
read_timeout: None,
}));
}
}

common::generate_topology(SourceConfig::Cassandra(
shotover::sources::cassandra::CassandraConfig {
name: "cassandra".to_owned(),
listen_addr: host_address,
connection_limit: None,
hard_connection_limit: None,
tls: None,
timeout: None,
chain: TransformChainConfig(transforms),
transport: None,
},
))
}

async fn run_aws_shotover(
&self,
instance: Arc<Ec2InstanceWithShotover>,
cassandra_ip: String,
) -> Option<RunningShotover> {
let ip = instance.instance.private_ip().to_string();
match self.shotover {
Shotover::Standard | Shotover::ForcedMessageParsed => {
let topology = self
.generate_topology_yaml(format!("{ip}:9042"), format!("{cassandra_ip}:9042"));
Some(instance.run_shotover(&topology).await)
}
Shotover::None => None,
}
}
}

#[async_trait]
Expand Down Expand Up @@ -488,12 +565,7 @@ impl Bench for CassandraBench {

let (_, running_shotover) = futures::join!(
run_aws_cassandra(cassandra_nodes, self.topology.clone()),
run_aws_shotover(
shotover_instance.clone(),
self.shotover,
cassandra_ip.clone(),
self.topology.clone(),
)
self.run_aws_shotover(shotover_instance.clone(), cassandra_ip.clone(),)
);

let destination_ip = if running_shotover.is_some() {
Expand Down Expand Up @@ -525,10 +597,8 @@ impl Bench for CassandraBench {

let address = match (&self.topology, &self.shotover) {
(Topology::Single, Shotover::None) => "127.0.0.1:9043",
(Topology::Single, Shotover::Standard) => "127.0.0.1:9042",
(Topology::Cluster3, Shotover::None) => "172.16.1.2:9044",
(Topology::Cluster3, Shotover::Standard) => "127.0.0.1:9042",
(_, Shotover::ForcedMessageParsed) => todo!(),
(_, Shotover::Standard | Shotover::ForcedMessageParsed) => "127.0.0.1:9042",
};
let config_dir = match &self.topology {
Topology::Single => "tests/test-configs/cassandra/passthrough",
Expand All @@ -546,16 +616,26 @@ impl Bench for CassandraBench {
panic!("Mocked cassandra database does not provide a clustered mode")
}
};
let cassandra_address = match &self.topology {
Topology::Single => "127.0.0.1:9043".to_owned(),
Topology::Cluster3 => "172.16.1.2:9044".to_owned(),
};
let mut profiler = ProfilerRunner::new(self.name(), profiling);
let shotover = match self.shotover {
Shotover::Standard => Some(
ShotoverProcessBuilder::new_with_topology(&format!("{config_dir}/topology.yaml"))
.with_bin(bin_path!("shotover-proxy"))
.with_profile(profiler.shotover_profile())
.with_cores(core_count.shotover as u32)
.start()
.await,
),
Shotover::Standard => {
let topology_contents =
self.generate_topology_yaml(address.to_owned(), cassandra_address);
let topology_path = std::env::temp_dir().join(Uuid::new_v4().to_string());
std::fs::write(&topology_path, topology_contents).unwrap();
Some(
ShotoverProcessBuilder::new_with_topology(topology_path.to_str().unwrap())
.with_bin(bin_path!("shotover-proxy"))
.with_profile(profiler.shotover_profile())
.with_cores(core_count.shotover as u32)
.start()
.await,
)
}
Shotover::None => None,
Shotover::ForcedMessageParsed => todo!(),
};
Expand Down Expand Up @@ -627,39 +707,6 @@ impl Bench for CassandraBench {
self.operation.run(&session, reporter, parameters).await;
}
}

async fn run_aws_shotover(
instance: Arc<Ec2InstanceWithShotover>,
shotover: Shotover,
cassandra_ip: String,
topology: Topology,
) -> Option<RunningShotover> {
let config_dir = "tests/test-configs/cassandra/bench";
let ip = instance.instance.private_ip().to_string();
match shotover {
Shotover::Standard | Shotover::ForcedMessageParsed => {
let encoded = match shotover {
Shotover::Standard => "",
Shotover::ForcedMessageParsed => "-encode",
Shotover::None => unreachable!(),
};
let clustered = match topology {
Topology::Single => "",
Topology::Cluster3 => "-cluster",
};
let topology = rewritten_file(
Path::new(&format!(
"{config_dir}/topology{clustered}{encoded}-cloud.yaml"
)),
&[("HOST_ADDRESS", &ip), ("CASSANDRA_ADDRESS", &cassandra_ip)],
)
.await;
Some(instance.run_shotover(&topology).await)
}
Shotover::None => None,
}
}

async fn run_aws_cassandra(nodes: Vec<AwsNodeInfo>, topology: Topology) {
match topology {
Topology::Cluster3 => run_aws_cassandra_cluster(nodes).await,
Expand Down
16 changes: 6 additions & 10 deletions shotover-proxy/benches/windsock/common.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use anyhow::Context;
use std::path::Path;
use shotover::{config::topology::Topology, sources::SourceConfig};

#[derive(Clone, Copy)]
pub enum Shotover {
Expand All @@ -21,13 +20,10 @@ impl Shotover {
}
}

pub async fn rewritten_file(path: &Path, find_replace: &[(&str, &str)]) -> String {
let mut text = tokio::fs::read_to_string(path)
.await
.with_context(|| format!("Failed to read from {path:?}"))
.unwrap();
for (find, replace) in find_replace {
text = text.replace(find, replace);
pub fn generate_topology(source: SourceConfig) -> String {
Topology {
sources: vec![source],
}
text
.serialize()
.unwrap()
}
90 changes: 56 additions & 34 deletions shotover-proxy/benches/windsock/kafka.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
use crate::aws::{Ec2InstanceWithDocker, Ec2InstanceWithShotover};
use crate::common::{rewritten_file, Shotover};
use crate::common::{self, Shotover};
use crate::profilers::{self, CloudProfilerRunner, ProfilerRunner};
use crate::shotover::shotover_process;
use crate::shotover::shotover_process_custom_topology;
use anyhow::Result;
use async_trait::async_trait;
use aws_throwaway::Ec2Instance;
use futures::StreamExt;
use std::path::Path;
use shotover::config::chain::TransformChainConfig;
use shotover::sources::SourceConfig;
use shotover::transforms::debug::force_parse::DebugForceEncodeConfig;
use shotover::transforms::kafka::sink_single::KafkaSinkSingleConfig;
use shotover::transforms::TransformConfig;
use std::sync::Arc;
use std::{collections::HashMap, time::Duration};
use test_helpers::docker_compose::docker_compose;
Expand Down Expand Up @@ -36,6 +40,48 @@ impl KafkaBench {
message_size,
}
}

fn generate_topology_yaml(&self, host_address: String, kafka_address: String) -> String {
let mut transforms = vec![];
if let Shotover::ForcedMessageParsed = self.shotover {
transforms.push(Box::new(DebugForceEncodeConfig {
encode_requests: true,
encode_responses: true,
}) as Box<dyn TransformConfig>);
}

transforms.push(Box::new(KafkaSinkSingleConfig {
address: kafka_address,
connect_timeout_ms: 3000,
read_timeout: None,
}));

common::generate_topology(SourceConfig::Kafka(shotover::sources::kafka::KafkaConfig {
name: "kafka".to_owned(),
listen_addr: host_address,
connection_limit: None,
hard_connection_limit: None,
tls: None,
timeout: None,
chain: TransformChainConfig(transforms),
}))
}

async fn run_aws_shotover(
&self,
instance: Arc<Ec2InstanceWithShotover>,
kafka_ip: String,
) -> Option<crate::aws::RunningShotover> {
let ip = instance.instance.private_ip().to_string();
match self.shotover {
Shotover::Standard | Shotover::ForcedMessageParsed => {
let topology =
self.generate_topology_yaml(format!("{ip}:9092"), format!("{kafka_ip}:9092"));
Some(instance.run_shotover(&topology).await)
}
Shotover::None => None,
}
}
}

#[async_trait]
Expand Down Expand Up @@ -93,7 +139,7 @@ impl Bench for KafkaBench {

let (_, running_shotover) = futures::join!(
run_aws_kafka(kafka_instance),
run_aws_shotover(shotover_instance, self.shotover, kafka_ip.clone())
self.run_aws_shotover(shotover_instance, kafka_ip.clone())
);

let destination_ip = if running_shotover.is_some() {
Expand Down Expand Up @@ -125,13 +171,14 @@ impl Bench for KafkaBench {

let mut profiler = ProfilerRunner::new(self.name(), profiling);
let shotover = match self.shotover {
Shotover::Standard => {
Some(shotover_process(&format!("{config_dir}/topology.yaml"), &profiler).await)
Shotover::Standard | Shotover::ForcedMessageParsed => {
let topology_yaml = self.generate_topology_yaml(
"127.0.0.1:9192".to_owned(),
"127.0.0.1:9092".to_owned(),
);
Some(shotover_process_custom_topology(&topology_yaml, &profiler).await)
}
Shotover::None => None,
Shotover::ForcedMessageParsed => Some(
shotover_process(&format!("{config_dir}/topology-encode.yaml"), &profiler).await,
),
};

let broker_address = match self.shotover {
Expand Down Expand Up @@ -216,31 +263,6 @@ impl Bench for KafkaBench {
}
}

async fn run_aws_shotover(
instance: Arc<Ec2InstanceWithShotover>,
shotover: Shotover,
kafka_ip: String,
) -> Option<crate::aws::RunningShotover> {
let config_dir = "tests/test-configs/kafka/bench";
let ip = instance.instance.private_ip().to_string();
match shotover {
Shotover::Standard | Shotover::ForcedMessageParsed => {
let encoded = match shotover {
Shotover::Standard => "",
Shotover::ForcedMessageParsed => "-encode",
Shotover::None => unreachable!(),
};
let topology = rewritten_file(
Path::new(&format!("{config_dir}/topology{encoded}-cloud.yaml")),
&[("HOST_ADDRESS", &ip), ("KAFKA_ADDRESS", &kafka_ip)],
)
.await;
Some(instance.run_shotover(&topology).await)
}
Shotover::None => None,
}
}

async fn run_aws_kafka(instance: Arc<Ec2InstanceWithDocker>) {
let ip = instance.instance.private_ip().to_string();
instance
Expand Down
Loading

0 comments on commit ca9d69e

Please sign in to comment.