Skip to content

Commit

Permalink
KafkaSinkSingle: remove dest host config, rely solely on dest port
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Nov 20, 2023
1 parent 312f287 commit 1082b2c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 24 deletions.
15 changes: 5 additions & 10 deletions shotover-proxy/benches/windsock/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl KafkaBench {
}
}

fn generate_topology_yaml(&self, host_address: String, kafka_address: String) -> String {
fn generate_topology_yaml(&self, host_address: String) -> String {
let mut transforms = vec![];
if let Shotover::ForcedMessageParsed = self.shotover {
transforms.push(Box::new(DebugForceEncodeConfig {
Expand All @@ -51,7 +51,7 @@ impl KafkaBench {
}

transforms.push(Box::new(KafkaSinkSingleConfig {
address: kafka_address,
destination_port: 9192,
connect_timeout_ms: 3000,
read_timeout: None,
}));
Expand All @@ -70,13 +70,11 @@ impl KafkaBench {
async fn run_aws_shotover(
&self,
instance: Arc<Ec2InstanceWithShotover>,
kafka_ip: String,
) -> Option<crate::aws::RunningShotover> {
let ip = instance.instance.private_ip().to_string();
match self.shotover {
Shotover::Standard | Shotover::ForcedMessageParsed => {
let topology =
self.generate_topology_yaml(format!("{ip}:9092"), format!("{kafka_ip}:9092"));
let topology = self.generate_topology_yaml(format!("{ip}:9092"));
Some(instance.run_shotover(&topology).await)
}
Shotover::None => None,
Expand Down Expand Up @@ -139,7 +137,7 @@ impl Bench for KafkaBench {

let (_, running_shotover) = futures::join!(
run_aws_kafka(kafka_instance),
self.run_aws_shotover(shotover_instance, kafka_ip.clone())
self.run_aws_shotover(shotover_instance)
);

let destination_ip = if running_shotover.is_some() {
Expand Down Expand Up @@ -172,10 +170,7 @@ impl Bench for KafkaBench {
let mut profiler = ProfilerRunner::new(self.name(), profiling);
let shotover = match self.shotover {
Shotover::Standard | Shotover::ForcedMessageParsed => {
let topology_yaml = self.generate_topology_yaml(
"127.0.0.1:9192".to_owned(),
"127.0.0.1:9092".to_owned(),
);
let topology_yaml = self.generate_topology_yaml("127.0.0.1:9192".to_owned());
Some(shotover_process_custom_topology(&topology_yaml, &profiler).await)
}
Shotover::None => None,
Expand Down
24 changes: 10 additions & 14 deletions shotover/src/transforms/kafka/sink_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ use tokio::time::timeout;

#[derive(Serialize, Deserialize, Debug)]
#[serde(deny_unknown_fields)]
/// KafkaSinkSingle is designed solely for the use case of running a shotover instance on the same machine as each kafka instance.
/// The kafka instance and shotover instance must run on seperate ports.
pub struct KafkaSinkSingleConfig {
#[serde(rename = "remote_address")]
pub address: String,
pub destination_port: u16,
pub connect_timeout_ms: u64,
pub read_timeout: Option<u64>,
}
Expand All @@ -32,7 +33,7 @@ use crate::transforms::TransformConfig;
impl TransformConfig for KafkaSinkSingleConfig {
async fn get_builder(&self, chain_name: String) -> Result<Box<dyn TransformBuilder>> {
Ok(Box::new(KafkaSinkSingleBuilder::new(
self.address.clone(),
self.destination_port,
chain_name,
self.connect_timeout_ms,
self.read_timeout,
Expand All @@ -42,28 +43,21 @@ impl TransformConfig for KafkaSinkSingleConfig {

pub struct KafkaSinkSingleBuilder {
// contains address and port
address: String,
address_port: u16,
connect_timeout: Duration,
read_timeout: Option<Duration>,
}

impl KafkaSinkSingleBuilder {
pub fn new(
address: String,
address_port: u16,
_chain_name: String,
connect_timeout_ms: u64,
timeout: Option<u64>,
) -> KafkaSinkSingleBuilder {
let receive_timeout = timeout.map(Duration::from_secs);
let address_port = address
.rsplit(':')
.next()
.and_then(|str| str.parse().ok())
.unwrap_or(9092);

KafkaSinkSingleBuilder {
address,
address_port,
connect_timeout: Duration::from_millis(connect_timeout_ms),
read_timeout: receive_timeout,
Expand All @@ -75,7 +69,6 @@ impl TransformBuilder for KafkaSinkSingleBuilder {
fn build(&self) -> Transforms {
Transforms::KafkaSinkSingle(KafkaSinkSingle {
outbound: None,
address: self.address.clone(),
address_port: self.address_port,
pushed_messages_tx: None,
connect_timeout: self.connect_timeout,
Expand All @@ -93,7 +86,6 @@ impl TransformBuilder for KafkaSinkSingleBuilder {
}

pub struct KafkaSinkSingle {
address: String,
address_port: u16,
outbound: Option<Connection>,
pushed_messages_tx: Option<mpsc::UnboundedSender<Messages>>,
Expand All @@ -106,7 +98,11 @@ impl Transform for KafkaSinkSingle {
async fn transform<'a>(&'a mut self, mut requests_wrapper: Wrapper<'a>) -> Result<Messages> {
if self.outbound.is_none() {
let codec = KafkaCodecBuilder::new(Direction::Sink);
let tcp_stream = tcp::tcp_stream(self.connect_timeout, &self.address).await?;
let tcp_stream = tcp::tcp_stream(
self.connect_timeout,
(requests_wrapper.local_addr.ip(), self.address_port),
)
.await?;
let (rx, tx) = tcp_stream.into_split();
self.outbound = Some(spawn_read_write_tasks(&codec, rx, tx));
}
Expand Down

0 comments on commit 1082b2c

Please sign in to comment.