Skip to content

Commit

Permalink
Allow disabling shotover peers check
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Oct 15, 2024
1 parent 5a41409 commit d0aaa6c
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 8 deletions.
1 change: 1 addition & 0 deletions docs/src/transforms.md
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ If SCRAM authentication against the first kafka broker fails, shotover will term
# If shotover detects that a peer is down shotover will exclude the down peer from its metadata reports to the client.
# Each peer is checked in a round robin fashion and this `check_shotover_peers_delay_ms` field defines the milliseconds delay taken before moving onto the next peer to check.
# If the connection cannot be established within connect_timeout_ms, then the peer is considered down.
# If this field is not provided, checking of shotover nodes state will be disabled and no outgoing TCP connections to peers will be made.
check_shotover_peers_delay_ms: 3000

# When this field is provided TLS is used when connecting to the remote address.
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/benches/windsock/kafka/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl KafkaBench {
KafkaTopology::Cluster1 | KafkaTopology::Cluster3 => Box::new(KafkaSinkClusterConfig {
connect_timeout_ms: 3000,
read_timeout: None,
check_shotover_peers_delay_ms: 3000,
check_shotover_peers_delay_ms: Some(3000),
first_contact_points: vec![kafka_address],
shotover_nodes: vec![ShotoverNodeConfig {
address: host_address.parse().unwrap(),
Expand Down
16 changes: 9 additions & 7 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ pub struct KafkaSinkClusterConfig {
pub local_shotover_broker_id: i32,
pub connect_timeout_ms: u64,
pub read_timeout: Option<u64>,
pub check_shotover_peers_delay_ms: u64,
pub check_shotover_peers_delay_ms: Option<u64>,
pub tls: Option<TlsConnectorConfig>,
pub authorize_scram_over_mtls: Option<AuthorizeScramOverMtlsConfig>,
}
Expand Down Expand Up @@ -165,7 +165,7 @@ impl KafkaSinkClusterBuilder {
rack: StrBytes,
connect_timeout_ms: u64,
timeout: Option<u64>,
check_shotover_peers_delay_ms: u64,
check_shotover_peers_delay_ms: Option<u64>,
tls: Option<TlsConnector>,
) -> Result<KafkaSinkClusterBuilder> {
let read_timeout = timeout.map(Duration::from_secs);
Expand All @@ -176,11 +176,13 @@ impl KafkaSinkClusterBuilder {
.cloned()
.collect();

start_shotover_peers_check(
shotover_peers,
check_shotover_peers_delay_ms,
connect_timeout,
);
if let Some(check_shotover_peers_delay_ms) = check_shotover_peers_delay_ms {
start_shotover_peers_check(
shotover_peers,
check_shotover_peers_delay_ms,
connect_timeout,
);
}

Ok(KafkaSinkClusterBuilder {
first_contact_points,
Expand Down

0 comments on commit d0aaa6c

Please sign in to comment.