Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros committed Feb 29, 2024
1 parent 7cad772 commit f46ac5c
Showing 1 changed file with 0 additions and 31 deletions.
31 changes: 0 additions & 31 deletions shotover/src/transforms/kafka/sink_cluster/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ use crate::message::Message;
use crate::tcp;
use crate::tls::TlsConnector;
use crate::transforms::util::cluster_connection_pool::{spawn_read_write_tasks, Connection};
use crate::transforms::util::Request;
use anyhow::{anyhow, Result};
use kafka_protocol::messages::BrokerId;
use kafka_protocol::protocol::StrBytes;
use std::time::Duration;
use tokio::io::split;
use tokio::sync::oneshot;

pub struct ConnectionFactory {
tls: Option<TlsConnector>,
Expand Down Expand Up @@ -46,35 +44,6 @@ impl ConnectionFactory {
let tcp_stream = tcp::tcp_stream(self.connect_timeout, address).await?;
let (rx, tx) = tcp_stream.into_split();
let connection = spawn_read_write_tasks(&codec, rx, tx);

if let Some(message) = self.auth_message.as_ref() {
let handshake_msg = self.handshake_message.as_ref().unwrap();

let (tx, rx) = oneshot::channel();
connection
.send(Request {
message: handshake_msg.clone(),
return_chan: Some(tx),
})
.map_err(|_| anyhow!("Failed to send"))?;

let response = rx.await.map_err(|_| anyhow!("Failed to receive"))?;

tracing::info!("Received response {:?}", response);

let (tx, rx) = oneshot::channel();
connection
.send(Request {
message: message.clone(),
return_chan: Some(tx),
})
.map_err(|_| anyhow!("Failed to send"))?;

let response = rx.await.map_err(|_| anyhow!("Failed to receive"))?;

tracing::info!("Received response {:?}", response);
}

Ok(connection)
}
}
Expand Down

0 comments on commit f46ac5c

Please sign in to comment.