diff --git a/shotover-proxy/benches/windsock/kafka/bench.rs b/shotover-proxy/benches/windsock/kafka/bench.rs index e916dd7cb..1939a4137 100644 --- a/shotover-proxy/benches/windsock/kafka/bench.rs +++ b/shotover-proxy/benches/windsock/kafka/bench.rs @@ -14,7 +14,8 @@ use pretty_assertions::assert_eq; use shotover::config::chain::TransformChainConfig; use shotover::sources::SourceConfig; use shotover::transforms::debug::force_parse::DebugForceEncodeConfig; -use shotover::transforms::kafka::sink_cluster::{KafkaSinkClusterConfig, ShotoverNodeConfig}; +use shotover::transforms::kafka::sink_cluster::shotover_node::ShotoverNodeConfig; +use shotover::transforms::kafka::sink_cluster::KafkaSinkClusterConfig; use shotover::transforms::kafka::sink_single::KafkaSinkSingleConfig; use shotover::transforms::TransformConfig; use std::sync::Arc; @@ -96,6 +97,7 @@ impl KafkaBench { KafkaTopology::Cluster1 | KafkaTopology::Cluster3 => Box::new(KafkaSinkClusterConfig { connect_timeout_ms: 3000, read_timeout: None, + check_shotover_peers_delay_ms: None, first_contact_points: vec![kafka_address], shotover_nodes: vec![ShotoverNodeConfig { address: host_address.parse().unwrap(), diff --git a/shotover/src/transforms/kafka/sink_cluster/connections.rs b/shotover/src/transforms/kafka/sink_cluster/connections.rs index 79554871c..97d71c4a5 100644 --- a/shotover/src/transforms/kafka/sink_cluster/connections.rs +++ b/shotover/src/transforms/kafka/sink_cluster/connections.rs @@ -10,7 +10,7 @@ use rand::{rngs::SmallRng, seq::IteratorRandom}; use std::{collections::HashMap, time::Instant}; use super::{ - node::{ConnectionFactory, KafkaAddress, KafkaNode, NodeState}, + kafka_node::{ConnectionFactory, KafkaAddress, KafkaNode, KafkaNodeState}, scram_over_mtls::{connection::ScramOverMtlsConnection, AuthorizeScramOverMtls}, SASL_SCRAM_MECHANISMS, }; @@ -234,9 +234,9 @@ impl Connections { // Update the node state according to whether we can currently open a connection. 
let node_state = if connection.is_err() { - NodeState::Down + KafkaNodeState::Down } else { - NodeState::Up + KafkaNodeState::Up }; nodes .iter() diff --git a/shotover/src/transforms/kafka/sink_cluster/node.rs b/shotover/src/transforms/kafka/sink_cluster/kafka_node.rs similarity index 97% rename from shotover/src/transforms/kafka/sink_cluster/node.rs rename to shotover/src/transforms/kafka/sink_cluster/kafka_node.rs index 76e1dc471..d795b9228 100644 --- a/shotover/src/transforms/kafka/sink_cluster/node.rs +++ b/shotover/src/transforms/kafka/sink_cluster/kafka_node.rs @@ -283,7 +283,7 @@ pub struct KafkaNode { pub broker_id: BrokerId, pub rack: Option<StrBytes>, pub kafka_address: KafkaAddress, - state: Arc<AtomicNodeState>, + state: Arc<AtomicKafkaNodeState>, } impl KafkaNode { @@ -292,22 +292,22 @@ broker_id, kafka_address, rack, - state: Arc::new(AtomicNodeState::new(NodeState::Up)), + state: Arc::new(AtomicKafkaNodeState::new(KafkaNodeState::Up)), } } pub fn is_up(&self) -> bool { - self.state.load(Ordering::Relaxed) == NodeState::Up + self.state.load(Ordering::Relaxed) == KafkaNodeState::Up } - pub fn set_state(&self, state: NodeState) { + pub fn set_state(&self, state: KafkaNodeState) { self.state.store(state, Ordering::Relaxed) } } #[atomic_enum] #[derive(PartialEq)] -pub enum NodeState { +pub enum KafkaNodeState { Up, Down, } diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 2e59c3b3d..4d4a027fa 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -11,6 +11,7 @@ use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; use connections::{Connections, Destination}; use dashmap::DashMap; +use kafka_node::{ConnectionFactory, KafkaAddress, KafkaNode, KafkaNodeState}; use kafka_protocol::indexmap::IndexMap; use kafka_protocol::messages::fetch_request::FetchTopic; use kafka_protocol::messages::fetch_response::LeaderIdAndEpoch as
FetchResponseLeaderIdAndEpoch; @@ -27,7 +28,6 @@ use kafka_protocol::messages::{ use kafka_protocol::protocol::StrBytes; use kafka_protocol::ResponseError; use metrics::{counter, Counter}; -use node::{ConnectionFactory, KafkaAddress, KafkaNode, NodeState}; use rand::rngs::SmallRng; use rand::seq::{IteratorRandom, SliceRandom}; use rand::SeedableRng; @@ -36,6 +36,7 @@ use scram_over_mtls::{ OriginalScramState, }; use serde::{Deserialize, Serialize}; +use shotover_node::{ShotoverNode, ShotoverNodeConfig}; use std::collections::{HashMap, VecDeque}; use std::hash::Hasher; use std::sync::atomic::AtomicI64; @@ -45,8 +46,9 @@ use tokio::sync::RwLock; use uuid::Uuid; mod connections; -mod node; +mod kafka_node; mod scram_over_mtls; +pub mod shotover_node; const SASL_SCRAM_MECHANISMS: [&str; 2] = ["SCRAM-SHA-256", "SCRAM-SHA-512"]; @@ -66,35 +68,11 @@ pub struct KafkaSinkClusterConfig { pub local_shotover_broker_id: i32, pub connect_timeout_ms: u64, pub read_timeout: Option<u64>, + pub check_shotover_peers_delay_ms: Option<u64>, pub tls: Option<TlsConnectorConfig>, pub authorize_scram_over_mtls: Option<AuthorizeScramOverMtlsConfig>, } -#[derive(Serialize, Deserialize, Debug, Clone)] -#[serde(deny_unknown_fields)] -pub struct ShotoverNodeConfig { - pub address: String, - pub rack: String, - pub broker_id: i32, -} -impl ShotoverNodeConfig { - fn build(self) -> Result<ShotoverNode> { - Ok(ShotoverNode { - address: KafkaAddress::from_str(&self.address)?, - rack: StrBytes::from_string(self.rack), - broker_id: BrokerId(self.broker_id), - }) - } -} -#[derive(Clone)] -struct ShotoverNode { - pub address: KafkaAddress, - pub rack: StrBytes, - pub broker_id: BrokerId, -} - const NAME: &str = "KafkaSinkCluster"; #[typetag::serde(name = "KafkaSinkCluster")] #[async_trait(?Send)] @@ -1268,7 +1246,7 @@ impl KafkaSinkCluster { } }) .unwrap() - .set_state(NodeState::Down); + .set_state(KafkaNodeState::Down); // bubble up error let request_types: Vec<String> = requests diff --git a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs
b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs index 745ea523a..7fc3186c8 100644 --- a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs +++ b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls.rs @@ -1,4 +1,4 @@ -use super::node::{ConnectionFactory, KafkaAddress}; +use super::kafka_node::{ConnectionFactory, KafkaAddress}; use crate::{ connection::SinkConnection, tls::{TlsConnector, TlsConnectorConfig}, diff --git a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/create_token.rs b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/create_token.rs index 212ec725e..c31909eac 100644 --- a/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/create_token.rs +++ b/shotover/src/transforms/kafka/sink_cluster/scram_over_mtls/create_token.rs @@ -13,7 +13,7 @@ use kafka_protocol::{ }; use rand::{rngs::SmallRng, seq::IteratorRandom}; -use crate::transforms::kafka::sink_cluster::node::{ConnectionFactory, KafkaAddress}; +use crate::transforms::kafka::sink_cluster::kafka_node::{ConnectionFactory, KafkaAddress}; use crate::transforms::kafka::sink_cluster::scram_over_mtls::{DelegationToken, Node}; use crate::{ connection::SinkConnection, diff --git a/shotover/src/transforms/kafka/sink_cluster/shotover_node.rs b/shotover/src/transforms/kafka/sink_cluster/shotover_node.rs new file mode 100644 index 000000000..fb449e6ca --- /dev/null +++ b/shotover/src/transforms/kafka/sink_cluster/shotover_node.rs @@ -0,0 +1,41 @@ +use crate::transforms::kafka::sink_cluster::kafka_node::KafkaAddress; +use atomic_enum::atomic_enum; +use kafka_protocol::messages::BrokerId; +use kafka_protocol::protocol::StrBytes; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(deny_unknown_fields)] +pub struct ShotoverNodeConfig { + pub address: String, + pub rack: String, + pub broker_id: i32, +} + +impl ShotoverNodeConfig { + pub(crate) fn build(self) -> anyhow::Result<ShotoverNode> { + 
Ok(ShotoverNode { + address: KafkaAddress::from_str(&self.address)?, + rack: StrBytes::from_string(self.rack), + broker_id: BrokerId(self.broker_id), + state: Arc::new(AtomicShotoverNodeState::new(ShotoverNodeState::Up)), + }) + } +} + +#[derive(Clone)] +pub(crate) struct ShotoverNode { + pub address: KafkaAddress, + pub rack: StrBytes, + pub broker_id: BrokerId, + #[allow(unused)] + state: Arc<AtomicShotoverNodeState>, +} + +#[atomic_enum] +#[derive(PartialEq)] +pub(crate) enum ShotoverNodeState { + Up, + Down, +}