Add NodeState to ShotoverNode + some renaming (#1758)
justinweng-instaclustr authored Oct 1, 2024
1 parent 25827de commit eb4fb2f
Showing 7 changed files with 60 additions and 39 deletions.
4 changes: 3 additions & 1 deletion shotover-proxy/benches/windsock/kafka/bench.rs
@@ -14,7 +14,8 @@ use pretty_assertions::assert_eq;
 use shotover::config::chain::TransformChainConfig;
 use shotover::sources::SourceConfig;
 use shotover::transforms::debug::force_parse::DebugForceEncodeConfig;
-use shotover::transforms::kafka::sink_cluster::{KafkaSinkClusterConfig, ShotoverNodeConfig};
+use shotover::transforms::kafka::sink_cluster::shotover_node::ShotoverNodeConfig;
+use shotover::transforms::kafka::sink_cluster::KafkaSinkClusterConfig;
 use shotover::transforms::kafka::sink_single::KafkaSinkSingleConfig;
 use shotover::transforms::TransformConfig;
 use std::sync::Arc;
@@ -96,6 +97,7 @@ impl KafkaBench {
             KafkaTopology::Cluster1 | KafkaTopology::Cluster3 => Box::new(KafkaSinkClusterConfig {
                 connect_timeout_ms: 3000,
                 read_timeout: None,
+                check_shotover_peers_delay_ms: None,
                 first_contact_points: vec![kafka_address],
                 shotover_nodes: vec![ShotoverNodeConfig {
                     address: host_address.parse().unwrap(),
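The bench opts out of the new peer check by setting `check_shotover_peers_delay_ms: None`. As a hedged sketch (not part of this commit), the same config with the check enabled might look as follows; the helper name, addresses, rack label, and 3-second delay are illustrative assumptions, while the field set matches the `KafkaSinkClusterConfig` struct shown in mod.rs below:

use shotover::transforms::kafka::sink_cluster::shotover_node::ShotoverNodeConfig;
use shotover::transforms::kafka::sink_cluster::KafkaSinkClusterConfig;

// Hypothetical helper, not from the commit: a single-node cluster config
// with the new `check_shotover_peers_delay_ms` field populated.
fn example_cluster_config(kafka_address: String, host_address: String) -> KafkaSinkClusterConfig {
    KafkaSinkClusterConfig {
        connect_timeout_ms: 3000,
        read_timeout: None,
        // Some(ms) presumably enables a periodic health check of peer
        // Shotover nodes; None (as in the bench above) opts out.
        check_shotover_peers_delay_ms: Some(3000),
        first_contact_points: vec![kafka_address],
        shotover_nodes: vec![ShotoverNodeConfig {
            address: host_address,
            rack: "rack0".to_string(),
            broker_id: 0,
        }],
        local_shotover_broker_id: 0,
        tls: None,
        authorize_scram_over_mtls: None,
    }
}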
6 changes: 3 additions & 3 deletions shotover/src/transforms/kafka/sink_cluster/connections.rs
@@ -10,7 +10,7 @@ use rand::{rngs::SmallRng, seq::IteratorRandom};
 use std::{collections::HashMap, time::Instant};
 
 use super::{
-    node::{ConnectionFactory, KafkaAddress, KafkaNode, NodeState},
+    kafka_node::{ConnectionFactory, KafkaAddress, KafkaNode, KafkaNodeState},
     scram_over_mtls::{connection::ScramOverMtlsConnection, AuthorizeScramOverMtls},
     SASL_SCRAM_MECHANISMS,
 };
@@ -234,9 +234,9 @@ impl Connections {
 
         // Update the node state according to whether we can currently open a connection.
         let node_state = if connection.is_err() {
-            NodeState::Down
+            KafkaNodeState::Down
         } else {
-            NodeState::Up
+            KafkaNodeState::Up
         };
         nodes
             .iter()
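For readers outside the codebase, here is a minimal, self-contained sketch of the pattern above: a failed connection attempt flips a shared atomic flag, and later routing skips brokers marked down. `Node` here is a simplified stand-in (an `AtomicBool` in place of `Arc<AtomicKafkaNodeState>`), not the crate's type:

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

// Simplified stand-in for KafkaNode.
struct Node {
    broker_id: i32,
    up: Arc<AtomicBool>,
}

impl Node {
    fn is_up(&self) -> bool {
        self.up.load(Ordering::Relaxed)
    }

    fn set_up(&self, up: bool) {
        self.up.store(up, Ordering::Relaxed);
    }
}

fn main() {
    let nodes = vec![
        Node { broker_id: 0, up: Arc::new(AtomicBool::new(true)) },
        Node { broker_id: 1, up: Arc::new(AtomicBool::new(true)) },
    ];

    // A failed connection attempt marks the matching broker down,
    // mirroring the `set_state(KafkaNodeState::Down)` call above.
    nodes.iter().find(|n| n.broker_id == 1).unwrap().set_up(false);

    // Routing can then skip brokers that are currently down.
    let live: Vec<i32> = nodes.iter().filter(|n| n.is_up()).map(|n| n.broker_id).collect();
    assert_eq!(live, vec![0]);
}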
10 changes: 5 additions & 5 deletions shotover/src/transforms/kafka/sink_cluster/{node.rs → kafka_node.rs}
@@ -283,7 +283,7 @@ pub struct KafkaNode {
     pub broker_id: BrokerId,
     pub rack: Option<StrBytes>,
     pub kafka_address: KafkaAddress,
-    state: Arc<AtomicNodeState>,
+    state: Arc<AtomicKafkaNodeState>,
 }
 
 impl KafkaNode {
@@ -292,22 +292,22 @@ impl KafkaNode {
             broker_id,
             kafka_address,
             rack,
-            state: Arc::new(AtomicNodeState::new(NodeState::Up)),
+            state: Arc::new(AtomicKafkaNodeState::new(KafkaNodeState::Up)),
         }
     }
 
     pub fn is_up(&self) -> bool {
-        self.state.load(Ordering::Relaxed) == NodeState::Up
+        self.state.load(Ordering::Relaxed) == KafkaNodeState::Up
     }
 
-    pub fn set_state(&self, state: NodeState) {
+    pub fn set_state(&self, state: KafkaNodeState) {
         self.state.store(state, Ordering::Relaxed)
     }
 }
 
 #[atomic_enum]
 #[derive(PartialEq)]
-pub enum NodeState {
+pub enum KafkaNodeState {
     Up,
     Down,
 }
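The `#[atomic_enum]` attribute (from the atomic_enum crate) generates an `AtomicKafkaNodeState` type with atomic `load`/`store`, which is what lets the state sit behind an `Arc` and be flipped without a lock. A minimal standalone sketch of that mechanism, reusing the enum above:

use atomic_enum::atomic_enum;
use std::sync::atomic::Ordering;

// The macro generates `AtomicKafkaNodeState` alongside this enum.
#[atomic_enum]
#[derive(PartialEq)]
pub enum KafkaNodeState {
    Up,
    Down,
}

fn main() {
    let state = AtomicKafkaNodeState::new(KafkaNodeState::Up);
    state.store(KafkaNodeState::Down, Ordering::Relaxed);
    assert!(state.load(Ordering::Relaxed) == KafkaNodeState::Down);
}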
34 changes: 6 additions & 28 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -11,6 +11,7 @@ use anyhow::{anyhow, Context, Result};
 use async_trait::async_trait;
 use connections::{Connections, Destination};
 use dashmap::DashMap;
+use kafka_node::{ConnectionFactory, KafkaAddress, KafkaNode, KafkaNodeState};
 use kafka_protocol::indexmap::IndexMap;
 use kafka_protocol::messages::fetch_request::FetchTopic;
 use kafka_protocol::messages::fetch_response::LeaderIdAndEpoch as FetchResponseLeaderIdAndEpoch;
@@ -27,7 +28,6 @@ use kafka_protocol::messages::{
 use kafka_protocol::protocol::StrBytes;
 use kafka_protocol::ResponseError;
 use metrics::{counter, Counter};
-use node::{ConnectionFactory, KafkaAddress, KafkaNode, NodeState};
 use rand::rngs::SmallRng;
 use rand::seq::{IteratorRandom, SliceRandom};
 use rand::SeedableRng;
@@ -36,6 +36,7 @@ use scram_over_mtls::{
     OriginalScramState,
 };
 use serde::{Deserialize, Serialize};
+use shotover_node::{ShotoverNode, ShotoverNodeConfig};
 use std::collections::{HashMap, VecDeque};
 use std::hash::Hasher;
 use std::sync::atomic::AtomicI64;
@@ -45,8 +46,9 @@ use tokio::sync::RwLock;
 use uuid::Uuid;
 
 mod connections;
-mod node;
+mod kafka_node;
 mod scram_over_mtls;
+pub mod shotover_node;
 
 const SASL_SCRAM_MECHANISMS: [&str; 2] = ["SCRAM-SHA-256", "SCRAM-SHA-512"];
 
@@ -66,35 +68,11 @@ pub struct KafkaSinkClusterConfig {
     pub local_shotover_broker_id: i32,
     pub connect_timeout_ms: u64,
     pub read_timeout: Option<u64>,
+    pub check_shotover_peers_delay_ms: Option<u64>,
     pub tls: Option<TlsConnectorConfig>,
     pub authorize_scram_over_mtls: Option<AuthorizeScramOverMtlsConfig>,
 }
-
-#[derive(Serialize, Deserialize, Debug, Clone)]
-#[serde(deny_unknown_fields)]
-pub struct ShotoverNodeConfig {
-    pub address: String,
-    pub rack: String,
-    pub broker_id: i32,
-}
-
-impl ShotoverNodeConfig {
-    fn build(self) -> Result<ShotoverNode> {
-        Ok(ShotoverNode {
-            address: KafkaAddress::from_str(&self.address)?,
-            rack: StrBytes::from_string(self.rack),
-            broker_id: BrokerId(self.broker_id),
-        })
-    }
-}
-
-#[derive(Clone)]
-struct ShotoverNode {
-    pub address: KafkaAddress,
-    pub rack: StrBytes,
-    pub broker_id: BrokerId,
-}
 
 const NAME: &str = "KafkaSinkCluster";
 #[typetag::serde(name = "KafkaSinkCluster")]
 #[async_trait(?Send)]
@@ -1268,7 +1246,7 @@ impl KafkaSinkCluster {
                 }
             })
             .unwrap()
-            .set_state(NodeState::Down);
+            .set_state(KafkaNodeState::Down);
 
         // bubble up error
         let request_types: Vec<String> = requests
@@ -1,4 +1,4 @@
-use super::node::{ConnectionFactory, KafkaAddress};
+use super::kafka_node::{ConnectionFactory, KafkaAddress};
 use crate::{
     connection::SinkConnection,
     tls::{TlsConnector, TlsConnectorConfig},
@@ -13,7 +13,7 @@ use kafka_protocol::{
 };
 use rand::{rngs::SmallRng, seq::IteratorRandom};
 
-use crate::transforms::kafka::sink_cluster::node::{ConnectionFactory, KafkaAddress};
+use crate::transforms::kafka::sink_cluster::kafka_node::{ConnectionFactory, KafkaAddress};
 use crate::transforms::kafka::sink_cluster::scram_over_mtls::{DelegationToken, Node};
 use crate::{
     connection::SinkConnection,
41 changes: 41 additions & 0 deletions shotover/src/transforms/kafka/sink_cluster/shotover_node.rs
@@ -0,0 +1,41 @@
+use crate::transforms::kafka::sink_cluster::kafka_node::KafkaAddress;
+use atomic_enum::atomic_enum;
+use kafka_protocol::messages::BrokerId;
+use kafka_protocol::protocol::StrBytes;
+use serde::{Deserialize, Serialize};
+use std::sync::Arc;
+
+#[derive(Serialize, Deserialize, Debug, Clone)]
+#[serde(deny_unknown_fields)]
+pub struct ShotoverNodeConfig {
+    pub address: String,
+    pub rack: String,
+    pub broker_id: i32,
+}
+
+impl ShotoverNodeConfig {
+    pub(crate) fn build(self) -> anyhow::Result<ShotoverNode> {
+        Ok(ShotoverNode {
+            address: KafkaAddress::from_str(&self.address)?,
+            rack: StrBytes::from_string(self.rack),
+            broker_id: BrokerId(self.broker_id),
+            state: Arc::new(AtomicShotoverNodeState::new(ShotoverNodeState::Up)),
+        })
+    }
+}
+
+#[derive(Clone)]
+pub(crate) struct ShotoverNode {
+    pub address: KafkaAddress,
+    pub rack: StrBytes,
+    pub broker_id: BrokerId,
+    #[allow(unused)]
+    state: Arc<AtomicShotoverNodeState>,
+}
+
+#[atomic_enum]
+#[derive(PartialEq)]
+pub(crate) enum ShotoverNodeState {
+    Up,
+    Down,
+}
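A hedged usage sketch for the new module (crate-internal, since `build` and `ShotoverNode` are `pub(crate)`; the address and rack values are placeholders):

use crate::transforms::kafka::sink_cluster::shotover_node::ShotoverNodeConfig;

fn example() -> anyhow::Result<()> {
    // Parses "host:port" into a KafkaAddress; the node starts in the Up state.
    let node = ShotoverNodeConfig {
        address: "127.0.0.1:9092".to_string(),
        rack: "rack-a".to_string(),
        broker_id: 0,
    }
    .build()?;

    assert_eq!(node.broker_id.0, 0);
    Ok(())
}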
