Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect offline shotover nodes for KafkaSinkCluster #1762

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
d979776
Move shotover node and add nodestate field
justinweng-instaclustr Sep 30, 2024
132ff42
renaming
justinweng-instaclustr Sep 30, 2024
d7f553a
Merge branch 'main' into handle-offline-shotover-nodes
justinweng-instaclustr Sep 30, 2024
f45682c
change shotovernode visibility
justinweng-instaclustr Oct 1, 2024
e4aa6a8
Merge branch 'refs/heads/main' into handle-offline-shotover-nodes
justinweng-instaclustr Oct 1, 2024
6a57857
add background task to check shotover peers state
justinweng-instaclustr Oct 3, 2024
1b90587
Merge branch 'refs/heads/main' into handle-offline-shotover-nodes
justinweng-instaclustr Oct 3, 2024
e41a1ec
fix configs
justinweng-instaclustr Oct 3, 2024
d8a08ee
add random delay
justinweng-instaclustr Oct 3, 2024
5fd897c
update test case
justinweng-instaclustr Oct 3, 2024
98c71d8
fix test
justinweng-instaclustr Oct 3, 2024
f9be00d
add doco about new config field
justinweng-instaclustr Oct 3, 2024
44bbfb2
Merge branch 'refs/heads/main' into handle-offline-shotover-nodes
justinweng-instaclustr Oct 4, 2024
76efc1c
revert to use kafkaaddress and add error handling
justinweng-instaclustr Oct 4, 2024
c6c63c2
update test
justinweng-instaclustr Oct 4, 2024
57a8a03
fix log format
justinweng-instaclustr Oct 4, 2024
ff16443
fix infinite loop when shotover_peers is empty
justinweng-instaclustr Oct 8, 2024
1ae95b3
add new config for tests
justinweng-instaclustr Oct 8, 2024
9c9ec2e
update changelog
justinweng-instaclustr Oct 8, 2024
db16134
address comments
justinweng-instaclustr Oct 8, 2024
eeb8f54
add expected warn logs
justinweng-instaclustr Oct 9, 2024
74c41d3
add expected warn logs
justinweng-instaclustr Oct 9, 2024
b1e7742
Merge branch 'main' into handle-offline-shotover-nodes
justinweng-instaclustr Oct 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ This assists us in knowing when to make the next release a breaking release and
* `Transform::transform` now takes `&mut Wrapper` instead of `Wrapper`.
* `Wrapper` is renamed to ChainState.

### topology.yaml

* A new mandatory configuration `check_shotover_peers_delay_ms` is added for `KafkaSinkCluster`. See [transform.md](docs/src/transforms.md) for details on this configuration.

## 0.4.0

### shotover rust API
Expand Down
6 changes: 6 additions & 0 deletions docs/src/transforms.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,12 @@ If SCRAM authentication against the first kafka broker fails, shotover will term
# When a timeout occurs the connection to the client is immediately closed.
# read_timeout: 60

# Shotover will regularly open a TCP connection to each of its peers to check if they are up or down.
# If shotover detects that a peer is down shotover will exclude the down peer from its metadata reports to the client.
# Each peer is checked in a round robin fashion and this `check_shotover_peers_delay_ms` field defines the milliseconds delay taken before moving onto the next peer to check.
# If the connection cannot be established within connect_timeout_ms, then the peer is considered down.
justinweng-instaclustr marked this conversation as resolved.
Show resolved Hide resolved
check_shotover_peers_delay_ms: 3000

# When this field is provided TLS is used when connecting to the remote address.
# Removing this field will disable TLS.
#tls:
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/benches/windsock/kafka/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl KafkaBench {
KafkaTopology::Cluster1 | KafkaTopology::Cluster3 => Box::new(KafkaSinkClusterConfig {
connect_timeout_ms: 3000,
read_timeout: None,
check_shotover_peers_delay_ms: None,
check_shotover_peers_delay_ms: 3000,
first_contact_points: vec![kafka_address],
shotover_nodes: vec![ShotoverNodeConfig {
address: host_address.parse().unwrap(),
Expand Down
103 changes: 94 additions & 9 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,61 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
.expect("Shotover did not shutdown within 10s");
}
}
#[rstest]
#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_2_racks_multi_shotover_with_one_shotover_down(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose.yaml");

// One shotover instance per rack
let mut shotovers = vec![];
for i in 1..3 {
shotovers.push(
shotover_process(&format!(
"tests/test-configs/kafka/cluster-2-racks/topology-rack{i}.yaml"
))
.with_config(&format!(
"tests/test-configs/shotover-config/config{i}.yaml"
))
.with_log_name(&format!("shotover{i}"))
.start()
.await,
);
}

// Wait for check_shotover_peers to start
tokio::time::sleep(Duration::from_secs(15)).await;

// Kill one shotover node
tokio::time::timeout(
Duration::from_secs(10),
shotovers.remove(0).shutdown_and_then_consume_events(&[]),
)
.await
.expect("Shotover did not shutdown within 10s");

// Wait for the other shotover node to detect the down node
tokio::time::sleep(Duration::from_secs(5)).await;

for shotover in shotovers {
let events = tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&multi_shotover_events(driver)),
)
.await
.expect("Shotover did not shutdown within 10s");

// Check if the other shotover node detected the killed node
events.assert_contains(
justinweng-instaclustr marked this conversation as resolved.
Show resolved Hide resolved
&EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover::transforms::kafka::sink_cluster::shotover_node")
.with_message(r#"Shotover peer localhost:9191 is down"#),
);
}
}

#[rstest]
//#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] // CPP driver does not support scram
Expand Down Expand Up @@ -676,21 +731,51 @@ async fn cluster_sasl_plain_multi_shotover(#[case] driver: KafkaDriver) {
}

fn multi_shotover_events(driver: KafkaDriver) -> Vec<EventMatcher> {
// Shotover nodes can be detected as down during shutdown.
// We should ignore "Shotover peer ... is down" for multi-shotover tests where shotover nodes are not killed.
let mut expected_events = vec![
EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover::transforms::kafka::sink_cluster::shotover_node")
.with_message(r#"Shotover peer 127.0.0.1:9191 is down"#)
.with_count(Count::Any),
EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover::transforms::kafka::sink_cluster::shotover_node")
.with_message(r#"Shotover peer 127.0.0.1:9192 is down"#)
.with_count(Count::Any),
EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover::transforms::kafka::sink_cluster::shotover_node")
.with_message(r#"Shotover peer 127.0.0.1:9193 is down"#)
.with_count(Count::Any),
EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover::transforms::kafka::sink_cluster::shotover_node")
.with_message(r#"Shotover peer localhost:9191 is down"#)
.with_count(Count::Any),
EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover::transforms::kafka::sink_cluster::shotover_node")
.with_message(r#"Shotover peer localhost:9192 is down"#)
.with_count(Count::Any),
];

#[allow(irrefutable_let_patterns)]
if let KafkaDriver::Java = driver {
// The java driver manages to send requests fast enough that shotover's find_coordinator_of_group method
// gets a COORDINATOR_NOT_AVAILABLE response from kafka.
// If the client were connecting directly to kafka it would get this same error, so it doesnt make sense for us to retry on error.
// Instead we just let shotover pass on the NOT_COORDINATOR error to the client which triggers this warning in the process.
// So we ignore the warning in this case.
vec![EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover::transforms::kafka::sink_cluster")
.with_message(
r#"no known coordinator for GroupId("some_group"), routing message to a random broker so that a NOT_COORDINATOR or similar error is returned to the client"#,
)
.with_count(Count::Any)]
} else {
vec![]
expected_events.push(EventMatcher::new()
.with_level(Level::Warn)
.with_target("shotover::transforms::kafka::sink_cluster")
.with_message(
r#"no known coordinator for GroupId("some_group"), routing message to a random broker so that a NOT_COORDINATOR or similar error is returned to the client"#,
)
.with_count(Count::Any));
}

expected_events
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ sources:
local_shotover_broker_id: 1
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ sources:
local_shotover_broker_id: 2
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ sources:
local_shotover_broker_id: 1
first_contact_points: ["172.16.1.5:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ sources:
local_shotover_broker_id: 1
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ sources:
local_shotover_broker_id: 2
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ sources:
verify_hostname: true
delegation_token_lifetime_seconds: 86400 # 1 day
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ sources:
verify_hostname: true
delegation_token_lifetime_seconds: 15
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ sources:
verify_hostname: true
delegation_token_lifetime_seconds: 15
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ sources:
verify_hostname: true
delegation_token_lifetime_seconds: 15
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
certificate_path: "tests/test-configs/kafka/tls/certs/localhost.crt"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ sources:
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
check_shotover_peers_delay_ms: 3000
tls:
certificate_authority_path: "tests/test-configs/kafka/tls/certs/localhost_CA.crt"
verify_hostname: true
7 changes: 7 additions & 0 deletions shotover/src/transforms/kafka/sink_cluster/kafka_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use sasl::client::mechanisms::Scram;
use sasl::client::Mechanism;
use sasl::common::scram::Sha256;
use sasl::common::ChannelBinding;
use std::fmt::Display;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -278,6 +279,12 @@ impl KafkaAddress {
}
}

impl Display for KafkaAddress {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}:{}", self.host.as_str(), self.port)
}
}

#[derive(Debug, Clone)]
pub struct KafkaNode {
pub broker_id: BrokerId,
Expand Down
18 changes: 17 additions & 1 deletion shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::frame::kafka::{KafkaFrame, RequestBody, ResponseBody};
use crate::frame::{Frame, MessageType};
use crate::message::{Message, Messages};
use crate::tls::{TlsConnector, TlsConnectorConfig};
use crate::transforms::kafka::sink_cluster::shotover_node::start_shotover_peers_check;
use crate::transforms::{
ChainState, DownChainProtocol, Transform, TransformBuilder, TransformContextBuilder,
UpChainProtocol,
Expand Down Expand Up @@ -68,7 +69,7 @@ pub struct KafkaSinkClusterConfig {
pub local_shotover_broker_id: i32,
pub connect_timeout_ms: u64,
pub read_timeout: Option<u64>,
pub check_shotover_peers_delay_ms: Option<u64>,
pub check_shotover_peers_delay_ms: u64,
pub tls: Option<TlsConnectorConfig>,
pub authorize_scram_over_mtls: Option<AuthorizeScramOverMtlsConfig>,
}
Expand Down Expand Up @@ -113,9 +114,11 @@ impl TransformConfig for KafkaSinkClusterConfig {
first_contact_points?,
&self.authorize_scram_over_mtls,
shotover_nodes,
self.local_shotover_broker_id,
rack,
self.connect_timeout_ms,
self.read_timeout,
self.check_shotover_peers_delay_ms,
tls,
)?))
}
Expand Down Expand Up @@ -153,13 +156,26 @@ impl KafkaSinkClusterBuilder {
first_contact_points: Vec<KafkaAddress>,
authorize_scram_over_mtls: &Option<AuthorizeScramOverMtlsConfig>,
shotover_nodes: Vec<ShotoverNode>,
local_shotover_broker_id: i32,
rack: StrBytes,
connect_timeout_ms: u64,
timeout: Option<u64>,
check_shotover_peers_delay_ms: u64,
tls: Option<TlsConnector>,
) -> Result<KafkaSinkClusterBuilder> {
let read_timeout = timeout.map(Duration::from_secs);
let connect_timeout = Duration::from_millis(connect_timeout_ms);
let shotover_peers = shotover_nodes
.iter()
.filter(|x| x.broker_id.0 != local_shotover_broker_id)
.cloned()
.collect();

start_shotover_peers_check(
shotover_peers,
check_shotover_peers_delay_ms,
connect_timeout,
);

Ok(KafkaSinkClusterBuilder {
first_contact_points,
Expand Down
Loading
Loading