diff --git a/changelog.md b/changelog.md index 5ce164e72..ba786f75c 100644 --- a/changelog.md +++ b/changelog.md @@ -10,6 +10,10 @@ This assists us in knowing when to make the next release a breaking release and * `Transform::transform` now takes `&mut Wrapper` instead of `Wrapper`. * `Wrapper` is renamed to ChainState. +### topology.yaml + +* A new mandatory configuration `check_shotover_peers_delay_ms` is added for `KafkaSinkCluster`. See [transform.md](docs/src/transforms.md) for details on this configuration. + ## 0.4.0 ### shotover rust API diff --git a/docs/src/transforms.md b/docs/src/transforms.md index db4174323..ad5879dc4 100644 --- a/docs/src/transforms.md +++ b/docs/src/transforms.md @@ -293,6 +293,12 @@ If SCRAM authentication against the first kafka broker fails, shotover will term # When a timeout occurs the connection to the client is immediately closed. # read_timeout: 60 + # Shotover will regularly open a TCP connection to each of its peers to check if they are up or down. + # If shotover detects that a peer is down shotover will exclude the down peer from its metadata reports to the client. + # Each peer is checked in a round robin fashion and this `check_shotover_peers_delay_ms` field defines the milliseconds delay taken before moving onto the next peer to check. + # If the connection cannot be established within connect_timeout_ms, then the peer is considered down. + check_shotover_peers_delay_ms: 3000 + # When this field is provided TLS is used when connecting to the remote address. # Removing this field will disable TLS. #tls: diff --git a/shotover-proxy/benches/windsock/kafka/bench.rs b/shotover-proxy/benches/windsock/kafka/bench.rs index 1939a4137..d71ed660a 100644 --- a/shotover-proxy/benches/windsock/kafka/bench.rs +++ b/shotover-proxy/benches/windsock/kafka/bench.rs @@ -97,7 +97,7 @@ impl KafkaBench { KafkaTopology::Cluster1 | KafkaTopology::Cluster3 => Box::new(KafkaSinkClusterConfig { connect_timeout_ms: 3000, read_timeout: None, - check_shotover_peers_delay_ms: None, + check_shotover_peers_delay_ms: 3000, first_contact_points: vec![kafka_address], shotover_nodes: vec![ShotoverNodeConfig { address: host_address.parse().unwrap(), diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs index e0376adf7..46d4d9f67 100644 --- a/shotover-proxy/tests/kafka_int_tests/mod.rs +++ b/shotover-proxy/tests/kafka_int_tests/mod.rs @@ -400,6 +400,61 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) { .expect("Shotover did not shutdown within 10s"); } } +#[rstest] +#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] +#[case::java(KafkaDriver::Java)] +#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear +async fn cluster_2_racks_multi_shotover_with_one_shotover_down(#[case] driver: KafkaDriver) { + let _docker_compose = + docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml"); + + // One shotover instance per rack + let mut shotovers = vec![]; + for i in 1..3 { + shotovers.push( + shotover_process(&format!( + "tests/test-configs/kafka/cluster-2-racks/topology-rack{i}.yaml" + )) + .with_config(&format!( + "tests/test-configs/shotover-config/config{i}.yaml" + )) + .with_log_name(&format!("shotover{i}")) + .start() + .await, + ); + } + + // Wait for check_shotover_peers to start + tokio::time::sleep(Duration::from_secs(15)).await; + + // Kill one shotover node + tokio::time::timeout( + Duration::from_secs(10), + shotovers.remove(0).shutdown_and_then_consume_events(&[]), + ) + .await + .expect("Shotover did not shutdown within 10s"); + + // Wait for the other shotover node to detect the down node + tokio::time::sleep(Duration::from_secs(5)).await; + + for shotover in shotovers { + let events = tokio::time::timeout( + Duration::from_secs(10), + shotover.shutdown_and_then_consume_events(&multi_shotover_events(driver)), + ) + .await + .expect("Shotover did not shutdown within 10s"); + + // Check if the other shotover node detected the killed node + events.assert_contains( + &EventMatcher::new() + .with_level(Level::Warn) + .with_target("shotover::transforms::kafka::sink_cluster::shotover_node") + .with_message(r#"Shotover peer localhost:9191 is down"#), + ); + } +} #[rstest] //#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] // CPP driver does not support scram @@ -676,6 +731,36 @@ async fn cluster_sasl_plain_multi_shotover(#[case] driver: KafkaDriver) { } fn multi_shotover_events(driver: KafkaDriver) -> Vec { + // Shotover nodes can be detected as down during shutdown. + // We should ignore "Shotover peer ... is down" for multi-shotover tests where shotover nodes are not killed. + let mut expected_events = vec![ + EventMatcher::new() + .with_level(Level::Warn) + .with_target("shotover::transforms::kafka::sink_cluster::shotover_node") + .with_message(r#"Shotover peer 127.0.0.1:9191 is down"#) + .with_count(Count::Any), + EventMatcher::new() + .with_level(Level::Warn) + .with_target("shotover::transforms::kafka::sink_cluster::shotover_node") + .with_message(r#"Shotover peer 127.0.0.1:9192 is down"#) + .with_count(Count::Any), + EventMatcher::new() + .with_level(Level::Warn) + .with_target("shotover::transforms::kafka::sink_cluster::shotover_node") + .with_message(r#"Shotover peer 127.0.0.1:9193 is down"#) + .with_count(Count::Any), + EventMatcher::new() + .with_level(Level::Warn) + .with_target("shotover::transforms::kafka::sink_cluster::shotover_node") + .with_message(r#"Shotover peer localhost:9191 is down"#) + .with_count(Count::Any), + EventMatcher::new() + .with_level(Level::Warn) + .with_target("shotover::transforms::kafka::sink_cluster::shotover_node") + .with_message(r#"Shotover peer localhost:9192 is down"#) + .with_count(Count::Any), + ]; + #[allow(irrefutable_let_patterns)] if let KafkaDriver::Java = driver { // The java driver manages to send requests fast enough that shotover's find_coordinator_of_group method @@ -683,14 +768,14 @@ fn multi_shotover_events(driver: KafkaDriver) -> Vec { // If the client were connecting directly to kafka it would get this same error, so it doesnt make sense for us to retry on error. // Instead we just let shotover pass on the NOT_COORDINATOR error to the client which triggers this warning in the process. // So we ignore the warning in this case. - vec![EventMatcher::new() - .with_level(Level::Warn) - .with_target("shotover::transforms::kafka::sink_cluster") - .with_message( - r#"no known coordinator for GroupId("some_group"), routing message to a random broker so that a NOT_COORDINATOR or similar error is returned to the client"#, - ) - .with_count(Count::Any)] - } else { - vec![] + expected_events.push(EventMatcher::new() + .with_level(Level::Warn) + .with_target("shotover::transforms::kafka::sink_cluster") + .with_message( + r#"no known coordinator for GroupId("some_group"), routing message to a random broker so that a NOT_COORDINATOR or similar error is returned to the client"#, + ) + .with_count(Count::Any)); } + + expected_events } diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology-single.yaml index 70545b5a1..f8981a0cd 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology-single.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology-single.yaml @@ -12,3 +12,4 @@ sources: local_shotover_broker_id: 0 first_contact_points: ["172.16.1.2:9092"] connect_timeout_ms: 3000 + check_shotover_peers_delay_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology1.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology1.yaml index c0cfc0191..720561055 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology1.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology1.yaml @@ -18,3 +18,4 @@ sources: local_shotover_broker_id: 0 first_contact_points: ["172.16.1.2:9092"] connect_timeout_ms: 3000 + check_shotover_peers_delay_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology2.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology2.yaml index d66c887b1..8678ddd60 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology2.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology2.yaml @@ -18,3 +18,4 @@ sources: local_shotover_broker_id: 1 first_contact_points: ["172.16.1.2:9092"] connect_timeout_ms: 3000 + check_shotover_peers_delay_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology3.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology3.yaml index c3e5ad277..7d8bfae08 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology3.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology3.yaml @@ -18,3 +18,4 @@ sources: local_shotover_broker_id: 2 first_contact_points: ["172.16.1.2:9092"] connect_timeout_ms: 3000 + check_shotover_peers_delay_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack1.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack1.yaml index 84c411ec9..c74a25d44 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack1.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack1.yaml @@ -15,3 +15,4 @@ sources: local_shotover_broker_id: 0 first_contact_points: ["172.16.1.2:9092"] connect_timeout_ms: 3000 + check_shotover_peers_delay_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack2.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack2.yaml index 51a359925..2291e6d42 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack2.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack2.yaml @@ -15,3 +15,4 @@ sources: local_shotover_broker_id: 1 first_contact_points: ["172.16.1.5:9092"] connect_timeout_ms: 3000 + check_shotover_peers_delay_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-mtls/topology.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-mtls/topology.yaml index 7c54d3bbc..f7df4c2b3 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-mtls/topology.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-mtls/topology.yaml @@ -15,6 +15,7 @@ sources: local_shotover_broker_id: 0 first_contact_points: ["172.16.1.2:9092"] connect_timeout_ms: 3000 + check_shotover_peers_delay_ms: 3000 tls: certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt" certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt" diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology-single.yaml index 70545b5a1..f8981a0cd 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology-single.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology-single.yaml @@ -12,3 +12,4 @@ sources: local_shotover_broker_id: 0 first_contact_points: ["172.16.1.2:9092"] connect_timeout_ms: 3000 + check_shotover_peers_delay_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology1.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology1.yaml index c0cfc0191..720561055 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology1.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology1.yaml @@ -18,3 +18,4 @@ sources: local_shotover_broker_id: 0 first_contact_points: ["172.16.1.2:9092"] connect_timeout_ms: 3000 + check_shotover_peers_delay_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology2.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology2.yaml index d66c887b1..8678ddd60 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology2.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology2.yaml @@ -18,3 +18,4 @@ sources: local_shotover_broker_id: 1 first_contact_points: ["172.16.1.2:9092"] connect_timeout_ms: 3000 + check_shotover_peers_delay_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology3.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology3.yaml index c3e5ad277..7d8bfae08 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology3.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology3.yaml @@ -18,3 +18,4 @@ sources: local_shotover_broker_id: 2 first_contact_points: ["172.16.1.2:9092"] connect_timeout_ms: 3000 + check_shotover_peers_delay_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml index 5cc8eca20..35d46ac54 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml @@ -20,6 +20,7 @@ sources: verify_hostname: true delegation_token_lifetime_seconds: 86400 # 1 day connect_timeout_ms: 3000 + check_shotover_peers_delay_ms: 3000 tls: certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt" certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt" diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology1.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology1.yaml index e03c78923..c7337cadb 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology1.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology1.yaml @@ -27,6 +27,7 @@ sources: verify_hostname: true delegation_token_lifetime_seconds: 15 connect_timeout_ms: 3000 + check_shotover_peers_delay_ms: 3000 tls: certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt" certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt" diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology2.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology2.yaml index 84e827027..fc7c5627f 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology2.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology2.yaml @@ -27,6 +27,7 @@ sources: verify_hostname: true delegation_token_lifetime_seconds: 15 connect_timeout_ms: 3000 + check_shotover_peers_delay_ms: 3000 tls: certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt" certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt" diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology3.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology3.yaml index ce8129637..5d69bb456 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology3.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology3.yaml @@ -27,6 +27,7 @@ sources: verify_hostname: true delegation_token_lifetime_seconds: 15 connect_timeout_ms: 3000 + check_shotover_peers_delay_ms: 3000 tls: certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt" certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt" diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram/topology-single.yaml index 70545b5a1..f8981a0cd 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram/topology-single.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram/topology-single.yaml @@ -12,3 +12,4 @@ sources: local_shotover_broker_id: 0 first_contact_points: ["172.16.1.2:9092"] connect_timeout_ms: 3000 + check_shotover_peers_delay_ms: 3000 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml index facdcfa60..2cdee5dcc 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml @@ -15,6 +15,7 @@ sources: local_shotover_broker_id: 0 first_contact_points: ["172.16.1.2:9092"] connect_timeout_ms: 3000 + check_shotover_peers_delay_ms: 3000 tls: certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt" verify_hostname: true diff --git a/shotover/src/transforms/kafka/sink_cluster/kafka_node.rs b/shotover/src/transforms/kafka/sink_cluster/kafka_node.rs index d795b9228..9c06dc9f0 100644 --- a/shotover/src/transforms/kafka/sink_cluster/kafka_node.rs +++ b/shotover/src/transforms/kafka/sink_cluster/kafka_node.rs @@ -17,6 +17,7 @@ use sasl::client::mechanisms::Scram; use sasl::client::Mechanism; use sasl::common::scram::Sha256; use sasl::common::ChannelBinding; +use std::fmt::Display; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; @@ -278,6 +279,12 @@ impl KafkaAddress { } } +impl Display for KafkaAddress { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}:{}", self.host.as_str(), self.port) + } +} + #[derive(Debug, Clone)] pub struct KafkaNode { pub broker_id: BrokerId, diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 59a8bd5d1..b022cbadc 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -2,6 +2,7 @@ use crate::frame::kafka::{KafkaFrame, RequestBody, ResponseBody}; use crate::frame::{Frame, MessageType}; use crate::message::{Message, Messages}; use crate::tls::{TlsConnector, TlsConnectorConfig}; +use crate::transforms::kafka::sink_cluster::shotover_node::start_shotover_peers_check; use crate::transforms::{ ChainState, DownChainProtocol, Transform, TransformBuilder, TransformContextBuilder, UpChainProtocol, @@ -68,7 +69,7 @@ pub struct KafkaSinkClusterConfig { pub local_shotover_broker_id: i32, pub connect_timeout_ms: u64, pub read_timeout: Option, - pub check_shotover_peers_delay_ms: Option, + pub check_shotover_peers_delay_ms: u64, pub tls: Option, pub authorize_scram_over_mtls: Option, } @@ -113,9 +114,11 @@ impl TransformConfig for KafkaSinkClusterConfig { first_contact_points?, &self.authorize_scram_over_mtls, shotover_nodes, + self.local_shotover_broker_id, rack, self.connect_timeout_ms, self.read_timeout, + self.check_shotover_peers_delay_ms, tls, )?)) } @@ -153,13 +156,26 @@ impl KafkaSinkClusterBuilder { first_contact_points: Vec, authorize_scram_over_mtls: &Option, shotover_nodes: Vec, + local_shotover_broker_id: i32, rack: StrBytes, connect_timeout_ms: u64, timeout: Option, + check_shotover_peers_delay_ms: u64, tls: Option, ) -> Result { let read_timeout = timeout.map(Duration::from_secs); let connect_timeout = Duration::from_millis(connect_timeout_ms); + let shotover_peers = shotover_nodes + .iter() + .filter(|x| x.broker_id.0 != local_shotover_broker_id) + .cloned() + .collect(); + + start_shotover_peers_check( + shotover_peers, + check_shotover_peers_delay_ms, + connect_timeout, + ); Ok(KafkaSinkClusterBuilder { first_contact_points, diff --git a/shotover/src/transforms/kafka/sink_cluster/shotover_node.rs b/shotover/src/transforms/kafka/sink_cluster/shotover_node.rs index fb449e6ca..1e330f0e9 100644 --- a/shotover/src/transforms/kafka/sink_cluster/shotover_node.rs +++ b/shotover/src/transforms/kafka/sink_cluster/shotover_node.rs @@ -1,9 +1,15 @@ +use crate::tcp::tcp_stream; use crate::transforms::kafka::sink_cluster::kafka_node::KafkaAddress; use atomic_enum::atomic_enum; use kafka_protocol::messages::BrokerId; use kafka_protocol::protocol::StrBytes; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; use serde::{Deserialize, Serialize}; +use std::sync::atomic::Ordering; use std::sync::Arc; +use std::time::Duration; +use tokio::time::sleep; #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] @@ -33,9 +39,85 @@ pub(crate) struct ShotoverNode { state: Arc, } +impl ShotoverNode { + #![allow(unused)] + pub(crate) fn is_up(&self) -> bool { + self.state.load(Ordering::Relaxed) == ShotoverNodeState::Up + } + + pub(crate) fn set_state(&self, state: ShotoverNodeState) { + self.state.store(state, Ordering::Relaxed) + } +} + #[atomic_enum] #[derive(PartialEq)] pub(crate) enum ShotoverNodeState { Up, Down, } + +pub(crate) fn start_shotover_peers_check( + shotover_peers: Vec, + check_shotover_peers_delay_ms: u64, + connect_timeout: Duration, +) { + if !shotover_peers.is_empty() { + tokio::spawn(async move { + // Wait for all shotover nodes to start + sleep(Duration::from_secs(10)).await; + loop { + match check_shotover_peers( + &shotover_peers, + check_shotover_peers_delay_ms, + connect_timeout, + ) + .await + { + Ok(_) => {} + Err(err) => { + tracing::error!( + "Restarting the shotover peers check due to error: {err:?}" + ); + } + }; + } + }); + } +} + +async fn check_shotover_peers( + shotover_peers: &[ShotoverNode], + check_shotover_peers_delay_ms: u64, + connect_timeout: Duration, +) -> Result<(), anyhow::Error> { + let mut shotover_peers_cycle = shotover_peers.iter().cycle(); + let mut rng = StdRng::from_rng(rand::thread_rng())?; + let check_shotover_peers_delay_ms = check_shotover_peers_delay_ms as i64; + loop { + if let Some(shotover_peer) = shotover_peers_cycle.next() { + let tcp_stream = tcp_stream( + connect_timeout, + ( + shotover_peer.address.host.as_str(), + shotover_peer.address.port as u16, + ), + ) + .await; + match tcp_stream { + Ok(_) => { + shotover_peer.set_state(ShotoverNodeState::Up); + } + Err(_) => { + tracing::warn!("Shotover peer {} is down", shotover_peer.address); + shotover_peer.set_state(ShotoverNodeState::Down); + } + } + let random_delay = (check_shotover_peers_delay_ms + + rng.gen_range( + -check_shotover_peers_delay_ms / 10..check_shotover_peers_delay_ms / 10, + )) as u64; + sleep(Duration::from_millis(random_delay)).await; + } + } +}