Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect offline shotover nodes for KafkaSinkCluster #1762

Merged
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
d979776
Move shotover node and add nodestate field
justinweng-instaclustr Sep 30, 2024
132ff42
renaming
justinweng-instaclustr Sep 30, 2024
d7f553a
Merge branch 'main' into handle-offline-shotover-nodes
justinweng-instaclustr Sep 30, 2024
f45682c
change shotovernode visibility
justinweng-instaclustr Oct 1, 2024
e4aa6a8
Merge branch 'refs/heads/main' into handle-offline-shotover-nodes
justinweng-instaclustr Oct 1, 2024
6a57857
add background task to check shotover peers state
justinweng-instaclustr Oct 3, 2024
1b90587
Merge branch 'refs/heads/main' into handle-offline-shotover-nodes
justinweng-instaclustr Oct 3, 2024
e41a1ec
fix configs
justinweng-instaclustr Oct 3, 2024
d8a08ee
add random delay
justinweng-instaclustr Oct 3, 2024
5fd897c
update test case
justinweng-instaclustr Oct 3, 2024
98c71d8
fix test
justinweng-instaclustr Oct 3, 2024
f9be00d
add doco about new config field
justinweng-instaclustr Oct 3, 2024
44bbfb2
Merge branch 'refs/heads/main' into handle-offline-shotover-nodes
justinweng-instaclustr Oct 4, 2024
76efc1c
revert to use kafkaaddress and add error handling
justinweng-instaclustr Oct 4, 2024
c6c63c2
update test
justinweng-instaclustr Oct 4, 2024
57a8a03
fix log format
justinweng-instaclustr Oct 4, 2024
ff16443
fix infinite loop when shotover_peers is empty
justinweng-instaclustr Oct 8, 2024
1ae95b3
add new config for tests
justinweng-instaclustr Oct 8, 2024
9c9ec2e
update changelog
justinweng-instaclustr Oct 8, 2024
db16134
address comments
justinweng-instaclustr Oct 8, 2024
eeb8f54
add expected warn logs
justinweng-instaclustr Oct 9, 2024
74c41d3
add expected warn logs
justinweng-instaclustr Oct 9, 2024
b1e7742
Merge branch 'main' into handle-offline-shotover-nodes
justinweng-instaclustr Oct 9, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ This assists us in knowing when to make the next release a breaking release and
* `Transform::transform` now takes `&mut Wrapper` instead of `Wrapper`.
* `Wrapper` is renamed to ChainState.

### topology.yaml

* A new mandatory configuration `check_shotover_peers_delay_ms` is added for `KafkaSinkCluster`. See [transform.md](docs/src/transforms.md) for details on this configuration.

## 0.4.0

### shotover rust API
Expand Down
5 changes: 5 additions & 0 deletions docs/src/transforms.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,11 @@ If SCRAM authentication against the first kafka broker fails, shotover will term
# When a timeout occurs the connection to the client is immediately closed.
# read_timeout: 60

# Delay in milliseconds before the local shotover node checks its next peer's node state.
# The local shotover node will try to establish a TCP connection with the next peer in the list.
# If the connection cannot be established within connect_timeout_ms, then the peer is considered down.
justinweng-instaclustr marked this conversation as resolved.
Show resolved Hide resolved
check_shotover_peers_delay_ms: 3000

# When this field is provided TLS is used when connecting to the remote address.
# Removing this field will disable TLS.
#tls:
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/benches/windsock/kafka/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl KafkaBench {
KafkaTopology::Cluster1 | KafkaTopology::Cluster3 => Box::new(KafkaSinkClusterConfig {
connect_timeout_ms: 3000,
read_timeout: None,
check_shotover_peers_delay_ms: None,
check_shotover_peers_delay_ms: 3000,
first_contact_points: vec![kafka_address],
shotover_nodes: vec![ShotoverNodeConfig {
address: host_address.parse().unwrap(),
Expand Down
58 changes: 58 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,64 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
.expect("Shotover did not shutdown within 10s");
}
}
#[rstest]
#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_2_racks_multi_shotover_with_one_shotover_down(#[case] _driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");

// One shotover instance per rack
let mut shotovers = vec![];
for i in 1..3 {
shotovers.push(
shotover_process(&format!(
"tests/test-configs/kafka/cluster-2-racks/topology-rack{i}.yaml"
))
.with_config(&format!(
"tests/test-configs/shotover-config/config{i}.yaml"
))
.with_log_name(&format!("shotover{i}"))
.start()
.await,
);
}

// Wait for check_shotover_peers to start
tokio::time::sleep(Duration::from_secs(15)).await;

// Kill one shotover node
tokio::time::timeout(
Duration::from_secs(10),
shotovers.remove(0).shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");

// Wait for the other shotover node to detect the down node
tokio::time::sleep(Duration::from_secs(5)).await;

for shotover in shotovers {
let events = tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&[EventMatcher::new()
rukai marked this conversation as resolved.
Show resolved Hide resolved
.with_level(Level::Warn)
.with_target("shotover::transforms::kafka::sink_cluster::shotover_node")
.with_message(r#"Shotover peer localhost:9191 is down"#)
.with_count(Count::Any)]), // with count > 0 is not supported
)
.await
.expect("Shotover did not shutdown within 10s");

events.assert_contains(
justinweng-instaclustr marked this conversation as resolved.
Show resolved Hide resolved
&EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover::transforms::kafka::sink_cluster::shotover_node")
.with_message(r#"Shotover peer localhost:9191 is down"#),
);
}
}

#[rstest]
//#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] // CPP driver does not support scram
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ sources:
local_shotover_broker_id: 1
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ sources:
local_shotover_broker_id: 2
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ sources:
local_shotover_broker_id: 1
first_contact_points: ["172.16.1.5:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ sources:
local_shotover_broker_id: 1
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ sources:
local_shotover_broker_id: 2
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ sources:
verify_hostname: true
delegation_token_lifetime_seconds: 86400 # 1 day
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ sources:
verify_hostname: true
delegation_token_lifetime_seconds: 15
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ sources:
verify_hostname: true
delegation_token_lifetime_seconds: 15
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ sources:
verify_hostname: true
delegation_token_lifetime_seconds: 15
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
verify_hostname: true
1 change: 1 addition & 0 deletions shotover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ dashmap = { version = "6.0.0", optional = true }
atoi = { version = "2.0.0", optional = true }
fnv = "1.0.7"
sasl = { version = "0.5.1", optional = true, default-features = false, features = ["scram"] }
rand_core = "0.6.4"

[dev-dependencies]
criterion = { version = "2.6.0", features = ["async_tokio"], package = "codspeed-criterion-compat" }
Expand Down
7 changes: 7 additions & 0 deletions shotover/src/transforms/kafka/sink_cluster/kafka_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use sasl::client::mechanisms::Scram;
use sasl::client::Mechanism;
use sasl::common::scram::Sha256;
use sasl::common::ChannelBinding;
use std::fmt::Display;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -278,6 +279,12 @@ impl KafkaAddress {
}
}

impl Display for KafkaAddress {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}:{}", self.host.as_str(), self.port)
}
}

#[derive(Debug, Clone)]
pub struct KafkaNode {
pub broker_id: BrokerId,
Expand Down
18 changes: 17 additions & 1 deletion shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::frame::kafka::{KafkaFrame, RequestBody, ResponseBody};
use crate::frame::{Frame, MessageType};
use crate::message::{Message, Messages};
use crate::tls::{TlsConnector, TlsConnectorConfig};
use crate::transforms::kafka::sink_cluster::shotover_node::start_shotover_peers_check;
use crate::transforms::{
ChainState, DownChainProtocol, Transform, TransformBuilder, TransformContextBuilder,
UpChainProtocol,
Expand Down Expand Up @@ -68,7 +69,7 @@ pub struct KafkaSinkClusterConfig {
pub local_shotover_broker_id: i32,
pub connect_timeout_ms: u64,
pub read_timeout: Option<u64>,
pub check_shotover_peers_delay_ms: Option<u64>,
pub check_shotover_peers_delay_ms: u64,
pub tls: Option<TlsConnectorConfig>,
pub authorize_scram_over_mtls: Option<AuthorizeScramOverMtlsConfig>,
}
Expand Down Expand Up @@ -113,9 +114,11 @@ impl TransformConfig for KafkaSinkClusterConfig {
first_contact_points?,
&self.authorize_scram_over_mtls,
shotover_nodes,
self.local_shotover_broker_id,
rack,
self.connect_timeout_ms,
self.read_timeout,
self.check_shotover_peers_delay_ms,
tls,
)?))
}
Expand Down Expand Up @@ -153,13 +156,26 @@ impl KafkaSinkClusterBuilder {
first_contact_points: Vec<KafkaAddress>,
authorize_scram_over_mtls: &Option<AuthorizeScramOverMtlsConfig>,
shotover_nodes: Vec<ShotoverNode>,
local_shotover_broker_id: i32,
rack: StrBytes,
connect_timeout_ms: u64,
timeout: Option<u64>,
check_shotover_peers_delay_ms: u64,
tls: Option<TlsConnector>,
) -> Result<KafkaSinkClusterBuilder> {
let read_timeout = timeout.map(Duration::from_secs);
let connect_timeout = Duration::from_millis(connect_timeout_ms);
let shotover_peers = shotover_nodes
.iter()
.filter(|x| x.broker_id.0 != local_shotover_broker_id)
.cloned()
.collect();

start_shotover_peers_check(
shotover_peers,
check_shotover_peers_delay_ms,
connect_timeout_ms,
);

Ok(KafkaSinkClusterBuilder {
first_contact_points,
Expand Down
83 changes: 83 additions & 0 deletions shotover/src/transforms/kafka/sink_cluster/shotover_node.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
use crate::tcp::tcp_stream;
use crate::transforms::kafka::sink_cluster::kafka_node::KafkaAddress;
use atomic_enum::atomic_enum;
use kafka_protocol::messages::BrokerId;
use kafka_protocol::protocol::StrBytes;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use rand_core::Error;
justinweng-instaclustr marked this conversation as resolved.
Show resolved Hide resolved
use serde::{Deserialize, Serialize};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;

#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(deny_unknown_fields)]
Expand Down Expand Up @@ -33,9 +40,85 @@ pub(crate) struct ShotoverNode {
state: Arc<AtomicShotoverNodeState>,
}

impl ShotoverNode {
#![allow(unused)]
pub(crate) fn is_up(&self) -> bool {
self.state.load(Ordering::Relaxed) == ShotoverNodeState::Up
}

pub(crate) fn set_state(&self, state: ShotoverNodeState) {
self.state.store(state, Ordering::Relaxed)
}
}

#[atomic_enum]
#[derive(PartialEq)]
pub(crate) enum ShotoverNodeState {
Up,
Down,
}

pub(crate) fn start_shotover_peers_check(
shotover_peers: Vec<ShotoverNode>,
check_shotover_peers_delay_ms: u64,
connect_timeout_ms: u64,
) {
if !shotover_peers.is_empty() {
tokio::spawn(async move {
// Wait for all shotover nodes to start
sleep(Duration::from_secs(10)).await;
loop {
match check_shotover_peers(
&shotover_peers,
check_shotover_peers_delay_ms,
connect_timeout_ms,
)
.await
{
Ok(_) => {}
Err(err) => {
tracing::error!(
"Restarting the shotover peers check due to error: {err:?}"
);
}
};
}
});
}
}

async fn check_shotover_peers(
shotover_peers: &[ShotoverNode],
check_shotover_peers_delay_ms: u64,
connect_timeout_ms: u64,
justinweng-instaclustr marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<(), Error> {
let mut shotover_peers_cycle = shotover_peers.iter().cycle();
let mut rng = StdRng::from_rng(rand::thread_rng())?;
let check_shotover_peers_delay_ms = check_shotover_peers_delay_ms as i64;
loop {
if let Some(shotover_peer) = shotover_peers_cycle.next() {
let tcp_stream = tcp_stream(
justinweng-instaclustr marked this conversation as resolved.
Show resolved Hide resolved
Duration::from_millis(connect_timeout_ms),
&shotover_peer.address.to_string().as_str(),
)
.await;
match tcp_stream {
Ok(_) => {
shotover_peer.set_state(ShotoverNodeState::Up);
}
Err(_) => {
tracing::warn!(
"Shotover peer {} is down",
shotover_peer.address.to_string()
justinweng-instaclustr marked this conversation as resolved.
Show resolved Hide resolved
);
shotover_peer.set_state(ShotoverNodeState::Down);
}
}
let random_delay = (check_shotover_peers_delay_ms
+ rng.gen_range(
-check_shotover_peers_delay_ms / 10..check_shotover_peers_delay_ms / 10,
)) as u64;
sleep(Duration::from_millis(random_delay)).await;
}
}
}
Loading