Skip to content

Commit

Permalink
Detect offline shotover nodes for KafkaSinkCluster (#1762)
Browse files Browse the repository at this point in the history
  • Loading branch information
justinweng-instaclustr authored Oct 9, 2024
1 parent 2b11e0c commit 9801ed4
Show file tree
Hide file tree
Showing 24 changed files with 228 additions and 11 deletions.
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ This assists us in knowing when to make the next release a breaking release and
* `Transform::transform` now takes `&mut Wrapper` instead of `Wrapper`.
* `Wrapper` is renamed to `ChainState`.

### topology.yaml

* A new mandatory configuration `check_shotover_peers_delay_ms` is added for `KafkaSinkCluster`. See [transform.md](docs/src/transforms.md) for details on this configuration.

## 0.4.0

### shotover rust API
Expand Down
6 changes: 6 additions & 0 deletions docs/src/transforms.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,12 @@ If SCRAM authentication against the first kafka broker fails, shotover will term
# When a timeout occurs the connection to the client is immediately closed.
# read_timeout: 60
# Shotover will regularly open a TCP connection to each of its peers to check if they are up or down.
# If shotover detects that a peer is down shotover will exclude the down peer from its metadata reports to the client.
# Each peer is checked in a round robin fashion, and `check_shotover_peers_delay_ms` defines the delay in milliseconds before moving on to the next peer to check.
# If the connection cannot be established within connect_timeout_ms, then the peer is considered down.
check_shotover_peers_delay_ms: 3000

# When this field is provided TLS is used when connecting to the remote address.
# Removing this field will disable TLS.
#tls:
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/benches/windsock/kafka/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl KafkaBench {
KafkaTopology::Cluster1 | KafkaTopology::Cluster3 => Box::new(KafkaSinkClusterConfig {
connect_timeout_ms: 3000,
read_timeout: None,
check_shotover_peers_delay_ms: None,
check_shotover_peers_delay_ms: 3000,
first_contact_points: vec![kafka_address],
shotover_nodes: vec![ShotoverNodeConfig {
address: host_address.parse().unwrap(),
Expand Down
103 changes: 94 additions & 9 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,61 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
.expect("Shotover did not shutdown within 10s");
}
}
#[rstest]
#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
// Integration test: starts one shotover instance per rack against a 2-rack kafka
// cluster, kills one shotover node, then asserts the surviving node logs a
// "Shotover peer ... is down" warning for the killed peer.
async fn cluster_2_racks_multi_shotover_with_one_shotover_down(#[case] driver: KafkaDriver) {
    let _docker_compose =
        docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");

    // One shotover instance per rack
    let mut shotovers = vec![];
    for i in 1..3 {
        shotovers.push(
            shotover_process(&format!(
                "tests/test-configs/kafka/cluster-2-racks/topology-rack{i}.yaml"
            ))
            .with_config(&format!(
                "tests/test-configs/shotover-config/config{i}.yaml"
            ))
            .with_log_name(&format!("shotover{i}"))
            .start()
            .await,
        );
    }

    // Wait for check_shotover_peers to start
    // NOTE(review): 15s presumably covers startup plus at least one
    // check_shotover_peers_delay_ms (3000ms) round — confirm against the
    // topology configs if this test flakes.
    tokio::time::sleep(Duration::from_secs(15)).await;

    // Kill one shotover node
    // (index 0 is the rack-1 instance, which listens on localhost:9191 —
    // the address asserted on below)
    tokio::time::timeout(
        Duration::from_secs(10),
        shotovers.remove(0).shutdown_and_then_consume_events(&[]),
    )
    .await
    .expect("Shotover did not shutdown within 10s");

    // Wait for the other shotover node to detect the down node
    tokio::time::sleep(Duration::from_secs(5)).await;

    for shotover in shotovers {
        let events = tokio::time::timeout(
            Duration::from_secs(10),
            shotover.shutdown_and_then_consume_events(&multi_shotover_events(driver)),
        )
        .await
        .expect("Shotover did not shutdown within 10s");

        // Check if the other shotover node detected the killed node
        events.assert_contains(
            &EventMatcher::new()
                .with_level(Level::Warn)
                .with_target("shotover::transforms::kafka::sink_cluster::shotover_node")
                .with_message(r#"Shotover peer localhost:9191 is down"#),
        );
    }
}

#[rstest]
//#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] // CPP driver does not support scram
Expand Down Expand Up @@ -676,21 +731,51 @@ async fn cluster_sasl_plain_multi_shotover(#[case] driver: KafkaDriver) {
}

/// Returns the event matchers for log events that are expected (and therefore
/// tolerated) during multi-shotover tests where no shotover node is
/// deliberately killed.
fn multi_shotover_events(driver: KafkaDriver) -> Vec<EventMatcher> {
    // Shotover nodes can be detected as down during shutdown.
    // We should ignore "Shotover peer ... is down" for multi-shotover tests where shotover nodes are not killed.
    let mut expected_events = vec![
        EventMatcher::new()
            .with_level(Level::Warn)
            .with_target("shotover::transforms::kafka::sink_cluster::shotover_node")
            .with_message(r#"Shotover peer 127.0.0.1:9191 is down"#)
            .with_count(Count::Any),
        EventMatcher::new()
            .with_level(Level::Warn)
            .with_target("shotover::transforms::kafka::sink_cluster::shotover_node")
            .with_message(r#"Shotover peer 127.0.0.1:9192 is down"#)
            .with_count(Count::Any),
        EventMatcher::new()
            .with_level(Level::Warn)
            .with_target("shotover::transforms::kafka::sink_cluster::shotover_node")
            .with_message(r#"Shotover peer 127.0.0.1:9193 is down"#)
            .with_count(Count::Any),
        EventMatcher::new()
            .with_level(Level::Warn)
            .with_target("shotover::transforms::kafka::sink_cluster::shotover_node")
            .with_message(r#"Shotover peer localhost:9191 is down"#)
            .with_count(Count::Any),
        EventMatcher::new()
            .with_level(Level::Warn)
            .with_target("shotover::transforms::kafka::sink_cluster::shotover_node")
            .with_message(r#"Shotover peer localhost:9192 is down"#)
            .with_count(Count::Any),
    ];

    #[allow(irrefutable_let_patterns)]
    if let KafkaDriver::Java = driver {
        // The java driver manages to send requests fast enough that shotover's find_coordinator_of_group method
        // gets a COORDINATOR_NOT_AVAILABLE response from kafka.
        // If the client were connecting directly to kafka it would get this same error, so it doesn't make sense for us to retry on error.
        // Instead we just let shotover pass on the NOT_COORDINATOR error to the client which triggers this warning in the process.
        // So we ignore the warning in this case.
        expected_events.push(
            EventMatcher::new()
                .with_level(Level::Warn)
                .with_target("shotover::transforms::kafka::sink_cluster")
                .with_message(
                    r#"no known coordinator for GroupId("some_group"), routing message to a random broker so that a NOT_COORDINATOR or similar error is returned to the client"#,
                )
                .with_count(Count::Any),
        );
    }

    expected_events
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ sources:
local_shotover_broker_id: 1
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ sources:
local_shotover_broker_id: 2
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ sources:
local_shotover_broker_id: 1
first_contact_points: ["172.16.1.5:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ sources:
local_shotover_broker_id: 1
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ sources:
local_shotover_broker_id: 2
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ sources:
verify_hostname: true
delegation_token_lifetime_seconds: 86400 # 1 day
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ sources:
verify_hostname: true
delegation_token_lifetime_seconds: 15
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ sources:
verify_hostname: true
delegation_token_lifetime_seconds: 15
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ sources:
verify_hostname: true
delegation_token_lifetime_seconds: 15
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
verify_hostname: true
7 changes: 7 additions & 0 deletions shotover/src/transforms/kafka/sink_cluster/kafka_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use sasl::client::mechanisms::Scram;
use sasl::client::Mechanism;
use sasl::common::scram::Sha256;
use sasl::common::ChannelBinding;
use std::fmt::Display;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -278,6 +279,12 @@ impl KafkaAddress {
}
}

impl Display for KafkaAddress {
    /// Renders the address in the conventional `host:port` form.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        out.write_fmt(format_args!("{}:{}", self.host.as_str(), self.port))
    }
}

#[derive(Debug, Clone)]
pub struct KafkaNode {
pub broker_id: BrokerId,
Expand Down
18 changes: 17 additions & 1 deletion shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::frame::kafka::{KafkaFrame, RequestBody, ResponseBody};
use crate::frame::{Frame, MessageType};
use crate::message::{Message, Messages};
use crate::tls::{TlsConnector, TlsConnectorConfig};
use crate::transforms::kafka::sink_cluster::shotover_node::start_shotover_peers_check;
use crate::transforms::{
ChainState, DownChainProtocol, Transform, TransformBuilder, TransformContextBuilder,
UpChainProtocol,
Expand Down Expand Up @@ -68,7 +69,7 @@ pub struct KafkaSinkClusterConfig {
pub local_shotover_broker_id: i32,
pub connect_timeout_ms: u64,
pub read_timeout: Option<u64>,
pub check_shotover_peers_delay_ms: Option<u64>,
pub check_shotover_peers_delay_ms: u64,
pub tls: Option<TlsConnectorConfig>,
pub authorize_scram_over_mtls: Option<AuthorizeScramOverMtlsConfig>,
}
Expand Down Expand Up @@ -113,9 +114,11 @@ impl TransformConfig for KafkaSinkClusterConfig {
first_contact_points?,
&self.authorize_scram_over_mtls,
shotover_nodes,
self.local_shotover_broker_id,
rack,
self.connect_timeout_ms,
self.read_timeout,
self.check_shotover_peers_delay_ms,
tls,
)?))
}
Expand Down Expand Up @@ -153,13 +156,26 @@ impl KafkaSinkClusterBuilder {
first_contact_points: Vec<KafkaAddress>,
authorize_scram_over_mtls: &Option<AuthorizeScramOverMtlsConfig>,
shotover_nodes: Vec<ShotoverNode>,
local_shotover_broker_id: i32,
rack: StrBytes,
connect_timeout_ms: u64,
timeout: Option<u64>,
check_shotover_peers_delay_ms: u64,
tls: Option<TlsConnector>,
) -> Result<KafkaSinkClusterBuilder> {
let read_timeout = timeout.map(Duration::from_secs);
let connect_timeout = Duration::from_millis(connect_timeout_ms);
let shotover_peers = shotover_nodes
.iter()
.filter(|x| x.broker_id.0 != local_shotover_broker_id)
.cloned()
.collect();

start_shotover_peers_check(
shotover_peers,
check_shotover_peers_delay_ms,
connect_timeout,
);

Ok(KafkaSinkClusterBuilder {
first_contact_points,
Expand Down
Loading

0 comments on commit 9801ed4

Please sign in to comment.