From 8007ab574585669fbb803c0c2e7d24bbb333285c Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Mon, 21 Oct 2024 09:36:30 +1100 Subject: [PATCH] Split ShotoverNode::address into address_for_peers and address_for_client --- docs/src/transforms.md | 10 +++++++--- shotover-proxy/benches/windsock/kafka/bench.rs | 3 ++- .../kafka/cluster-1-rack/topology-single.yaml | 3 ++- .../kafka/cluster-1-rack/topology1.yaml | 9 ++++++--- .../kafka/cluster-1-rack/topology2.yaml | 9 ++++++--- .../kafka/cluster-1-rack/topology3.yaml | 9 ++++++--- .../kafka/cluster-2-racks/topology-rack1.yaml | 6 ++++-- .../kafka/cluster-2-racks/topology-rack2.yaml | 6 ++++-- .../kafka/cluster-3-racks/topology-rack1.yaml | 9 ++++++--- .../kafka/cluster-3-racks/topology-rack2.yaml | 9 ++++++--- .../kafka/cluster-3-racks/topology-rack3.yaml | 9 ++++++--- .../kafka/cluster-mtls/topology.yaml | 3 ++- .../cluster-sasl-plain/topology-single.yaml | 3 ++- .../kafka/cluster-sasl-plain/topology1.yaml | 9 ++++++--- .../kafka/cluster-sasl-plain/topology2.yaml | 9 ++++++--- .../kafka/cluster-sasl-plain/topology3.yaml | 9 ++++++--- .../topology-single.yaml | 3 ++- .../topology1.yaml | 9 ++++++--- .../topology2.yaml | 9 ++++++--- .../topology3.yaml | 9 ++++++--- .../cluster-sasl-scram/topology-single.yaml | 3 ++- .../kafka/cluster-tls/topology.yaml | 3 ++- .../src/transforms/kafka/sink_cluster/mod.rs | 14 +++++++------- .../kafka/sink_cluster/shotover_node.rs | 18 ++++++++++++------ 24 files changed, 120 insertions(+), 63 deletions(-) diff --git a/docs/src/transforms.md b/docs/src/transforms.md index 126fb0e2b..ffba4a09b 100644 --- a/docs/src/transforms.md +++ b/docs/src/transforms.md @@ -261,10 +261,14 @@ If SCRAM authentication against the first kafka broker fails, shotover will term # A list of every Shotover node that will be proxying to the same kafka cluster. # This field should be identical for all Shotover nodes proxying to the same kafka cluster. shotover_nodes: - # Address of the Shotover node. + # Address of the Shotover node that is reported to the kafka clients. # This is usually the same address as the Shotover source that is connected to this sink. - # But it may be different if you want Shotover to report a different address. - - address: "127.0.0.1:9092" + # But it may be different if you want Shotover to report a different address to its clients. + - address_for_client: "127.0.0.1:9092" + # Address of the shotover node as used to check for peers that are up. + # This is usually the same address as the Shotover source that is connected to this sink. + # But it may be different if you want Shotover to connect to its peers via a different address. + address_for_peers: "127.0.0.1:9092" # The rack the Shotover node will report as and route messages to. # For performance reasons, the Shotover node should be physically located in this rack. rack: "rack0" diff --git a/shotover-proxy/benches/windsock/kafka/bench.rs b/shotover-proxy/benches/windsock/kafka/bench.rs index 4f0a524b4..920f1f3e6 100644 --- a/shotover-proxy/benches/windsock/kafka/bench.rs +++ b/shotover-proxy/benches/windsock/kafka/bench.rs @@ -100,7 +100,8 @@ impl KafkaBench { check_shotover_peers_delay_ms: Some(3000), first_contact_points: vec![kafka_address], shotover_nodes: vec![ShotoverNodeConfig { - address: host_address.parse().unwrap(), + address_for_clients: host_address.parse().unwrap(), + address_for_peers: host_address.parse().unwrap(), rack: "rack1".into(), broker_id: 0, }], diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology-single.yaml index f8981a0cd..254100803 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology-single.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology-single.yaml @@ -6,7 +6,8 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_client: "127.0.0.1:9192" rack: "rack0" broker_id: 0 local_shotover_broker_id: 0 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology1.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology1.yaml index 720561055..6d19b3567 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology1.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology1.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9191" + - address_for_peers: "127.0.0.1:9191" + address_for_client: "127.0.0.1:9191" rack: "rack0" broker_id: 0 - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_client: "127.0.0.1:9192" rack: "rack0" broker_id: 1 - - address: "127.0.0.1:9193" + - address_for_peers: "127.0.0.1:9193" + address_for_client: "127.0.0.1:9193" rack: "rack0" broker_id: 2 local_shotover_broker_id: 0 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology2.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology2.yaml index 8678ddd60..9ec76f63c 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology2.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology2.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9191" + - address_for_peers: "127.0.0.1:9191" + address_for_client: "127.0.0.1:9191" rack: "rack0" broker_id: 0 - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_client: "127.0.0.1:9192" rack: "rack0" broker_id: 1 - - address: "127.0.0.1:9193" + - address_for_peers: "127.0.0.1:9193" + address_for_client: "127.0.0.1:9193" rack: "rack0" broker_id: 2 local_shotover_broker_id: 1 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology3.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology3.yaml index 7d8bfae08..daf869b6a 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology3.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology3.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9191" + - address_for_peers: "127.0.0.1:9191" + address_for_client: "127.0.0.1:9191" rack: "rack0" broker_id: 0 - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_client: "127.0.0.1:9192" rack: "rack0" broker_id: 1 - - address: "127.0.0.1:9193" + - address_for_peers: "127.0.0.1:9193" + address_for_client: "127.0.0.1:9193" rack: "rack0" broker_id: 2 local_shotover_broker_id: 2 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack1.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack1.yaml index c74a25d44..33ad2d280 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack1.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack1.yaml @@ -6,10 +6,12 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "localhost:9191" + - address_for_peers: "localhost:9191" + address_for_client: "localhost:9191" rack: "rack1" broker_id: 0 - - address: "localhost:9192" + - address_for_peers: "localhost:9192" + address_for_client: "localhost:9192" rack: "rack2" broker_id: 1 local_shotover_broker_id: 0 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack2.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack2.yaml index 2291e6d42..d69a81b6e 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack2.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/topology-rack2.yaml @@ -6,10 +6,12 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "localhost:9191" + - address_for_peers: "localhost:9191" + address_for_client: "localhost:9191" rack: "rack1" broker_id: 0 - - address: "localhost:9192" + - address_for_peers: "localhost:9192" + address_for_client: "localhost:9192" rack: "rack2" broker_id: 1 local_shotover_broker_id: 1 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack1.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack1.yaml index 194e80241..976ad5bf2 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack1.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack1.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "localhost:9191" + - address_for_peers: "localhost:9191" + address_for_client: "localhost:9191" rack: "rack1" broker_id: 0 - - address: "localhost:9192" + - address_for_peers: "localhost:9192" + address_for_client: "localhost:9192" rack: "rack2" broker_id: 1 - - address: "localhost:9193" + - address_for_peers: "localhost:9193" + address_for_client: "localhost:9193" rack: "rack3" broker_id: 2 local_shotover_broker_id: 0 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack2.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack2.yaml index 611282f92..cf47fc4f0 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack2.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack2.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "localhost:9191" + - address_for_peers: "localhost:9191" + address_for_client: "localhost:9191" rack: "rack1" broker_id: 0 - - address: "localhost:9192" + - address_for_peers: "localhost:9192" + address_for_client: "localhost:9192" rack: "rack2" broker_id: 1 - - address: "localhost:9193" + - address_for_peers: "localhost:9193" + address_for_client: "localhost:9193" rack: "rack3" broker_id: 2 local_shotover_broker_id: 1 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack3.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack3.yaml index ae8c6c9b8..cf86fd15d 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack3.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-3-racks/topology-rack3.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "localhost:9191" + - address_for_peers: "localhost:9191" + address_for_client: "localhost:9191" rack: "rack1" broker_id: 0 - - address: "localhost:9192" + - address_for_peers: "localhost:9192" + address_for_client: "localhost:9192" rack: "rack2" broker_id: 1 - - address: "localhost:9193" + - address_for_peers: "localhost:9193" + address_for_client: "localhost:9193" rack: "rack3" broker_id: 2 local_shotover_broker_id: 2 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-mtls/topology.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-mtls/topology.yaml index f7df4c2b3..a84a51e84 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-mtls/topology.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-mtls/topology.yaml @@ -9,7 +9,8 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9192" + - address_for_peers: "localhost:9192" + address_for_client: "localhost:9192" rack: "rack0" broker_id: 0 local_shotover_broker_id: 0 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology-single.yaml index f8981a0cd..254100803 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology-single.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology-single.yaml @@ -6,7 +6,8 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_client: "127.0.0.1:9192" rack: "rack0" broker_id: 0 local_shotover_broker_id: 0 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology1.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology1.yaml index 720561055..6d19b3567 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology1.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology1.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9191" + - address_for_peers: "127.0.0.1:9191" + address_for_client: "127.0.0.1:9191" rack: "rack0" broker_id: 0 - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_client: "127.0.0.1:9192" rack: "rack0" broker_id: 1 - - address: "127.0.0.1:9193" + - address_for_peers: "127.0.0.1:9193" + address_for_client: "127.0.0.1:9193" rack: "rack0" broker_id: 2 local_shotover_broker_id: 0 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology2.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology2.yaml index 8678ddd60..9ec76f63c 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology2.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology2.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9191" + - address_for_peers: "127.0.0.1:9191" + address_for_client: "127.0.0.1:9191" rack: "rack0" broker_id: 0 - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_client: "127.0.0.1:9192" rack: "rack0" broker_id: 1 - - address: "127.0.0.1:9193" + - address_for_peers: "127.0.0.1:9193" + address_for_client: "127.0.0.1:9193" rack: "rack0" broker_id: 2 local_shotover_broker_id: 1 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology3.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology3.yaml index 7d8bfae08..daf869b6a 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology3.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-plain/topology3.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9191" + - address_for_peers: "127.0.0.1:9191" + address_for_client: "127.0.0.1:9191" rack: "rack0" broker_id: 0 - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_client: "127.0.0.1:9192" rack: "rack0" broker_id: 1 - - address: "127.0.0.1:9193" + - address_for_peers: "127.0.0.1:9193" + address_for_client: "127.0.0.1:9193" rack: "rack0" broker_id: 2 local_shotover_broker_id: 2 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml index 35d46ac54..6cec5a930 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology-single.yaml @@ -6,7 +6,8 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_client: "127.0.0.1:9192" rack: "rack0" broker_id: 0 local_shotover_broker_id: 0 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology1.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology1.yaml index c7337cadb..3ee711424 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology1.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology1.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9191" + - address_for_peers: "127.0.0.1:9191" + address_for_client: "127.0.0.1:9191" rack: "rack0" broker_id: 0 - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_client: "127.0.0.1:9192" rack: "rack0" broker_id: 1 - - address: "127.0.0.1:9193" + - address_for_peers: "127.0.0.1:9193" + address_for_client: "127.0.0.1:9193" rack: "rack0" broker_id: 2 local_shotover_broker_id: 0 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology2.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology2.yaml index fc7c5627f..c1f31e17f 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology2.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology2.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9191" + - address_for_peers: "127.0.0.1:9191" + address_for_client: "127.0.0.1:9191" rack: "rack0" broker_id: 0 - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_client: "127.0.0.1:9192" rack: "rack0" broker_id: 1 - - address: "127.0.0.1:9193" + - address_for_peers: "127.0.0.1:9193" + address_for_client: "127.0.0.1:9193" rack: "rack0" broker_id: 2 local_shotover_broker_id: 1 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology3.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology3.yaml index 5d69bb456..e519b30e2 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology3.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram-over-mtls/topology3.yaml @@ -6,13 +6,16 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9191" + - address_for_peers: "127.0.0.1:9191" + address_for_client: "127.0.0.1:9191" rack: "rack0" broker_id: 0 - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_client: "127.0.0.1:9192" rack: "rack0" broker_id: 1 - - address: "127.0.0.1:9193" + - address_for_peers: "127.0.0.1:9193" + address_for_client: "127.0.0.1:9193" rack: "rack0" broker_id: 2 local_shotover_broker_id: 2 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram/topology-single.yaml index f8981a0cd..254100803 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram/topology-single.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-sasl-scram/topology-single.yaml @@ -6,7 +6,8 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_client: "127.0.0.1:9192" rack: "rack0" broker_id: 0 local_shotover_broker_id: 0 diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml index 2cdee5dcc..c0591a4e7 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-tls/topology.yaml @@ -9,7 +9,8 @@ sources: chain: - KafkaSinkCluster: shotover_nodes: - - address: "127.0.0.1:9192" + - address_for_peers: "127.0.0.1:9192" + address_for_client: "127.0.0.1:9192" rack: "rack0" broker_id: 0 local_shotover_broker_id: 0 diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index b6801414b..5580a6baf 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -109,7 +109,7 @@ impl TransformConfig for KafkaSinkClusterConfig { if !unique_broker_ids.insert(node.broker_id) { return Err(anyhow::anyhow!( "Duplicate broker_id found in shotover node {}", - node.address + node.address_for_clients )); } } @@ -2644,8 +2644,8 @@ impl KafkaSinkCluster { partition_shotover_nodes_by_rack(&up_shotover_nodes, coordinator_rack); let shotover_node = select_shotover_node_by_hash(shotover_nodes_by_rack, hash); - find_coordinator.host = shotover_node.address.host.clone(); - find_coordinator.port = shotover_node.address.port; + find_coordinator.host = shotover_node.address_for_clients.host.clone(); + find_coordinator.port = shotover_node.address_for_clients.port; find_coordinator.node_id = shotover_node.broker_id; } } else { @@ -2674,8 +2674,8 @@ impl KafkaSinkCluster { partition_shotover_nodes_by_rack(&up_shotover_nodes, coordinator_rack); let shotover_node = select_shotover_node_by_hash(shotover_nodes_by_rack, hash); - coordinator.host = shotover_node.address.host.clone(); - coordinator.port = shotover_node.address.port; + coordinator.host = shotover_node.address_for_clients.host.clone(); + coordinator.port = shotover_node.address_for_clients.port; coordinator.node_id = shotover_node.broker_id; } } @@ -2705,8 +2705,8 @@ impl KafkaSinkCluster { .map(|shotover_node| { MetadataResponseBroker::default() .with_node_id(shotover_node.broker_id) - .with_host(shotover_node.address.host.clone()) - .with_port(shotover_node.address.port) + .with_host(shotover_node.address_for_clients.host.clone()) + .with_port(shotover_node.address_for_clients.port) .with_rack(Some(shotover_node.rack.clone())) }) .collect(); diff --git a/shotover/src/transforms/kafka/sink_cluster/shotover_node.rs b/shotover/src/transforms/kafka/sink_cluster/shotover_node.rs index 1e330f0e9..deff5f9df 100644 --- a/shotover/src/transforms/kafka/sink_cluster/shotover_node.rs +++ b/shotover/src/transforms/kafka/sink_cluster/shotover_node.rs @@ -14,7 +14,8 @@ use tokio::time::sleep; #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(deny_unknown_fields)] pub struct ShotoverNodeConfig { - pub address: String, + pub address_for_clients: String, + pub address_for_peers: String, pub rack: String, pub broker_id: i32, } @@ -22,7 +23,8 @@ pub struct ShotoverNodeConfig { impl ShotoverNodeConfig { pub(crate) fn build(self) -> anyhow::Result { Ok(ShotoverNode { - address: KafkaAddress::from_str(&self.address)?, + address_for_clients: KafkaAddress::from_str(&self.address_for_clients)?, + address_for_peers: KafkaAddress::from_str(&self.address_for_peers)?, rack: StrBytes::from_string(self.rack), broker_id: BrokerId(self.broker_id), state: Arc::new(AtomicShotoverNodeState::new(ShotoverNodeState::Up)), @@ -32,7 +34,8 @@ impl ShotoverNodeConfig { #[derive(Clone)] pub(crate) struct ShotoverNode { - pub address: KafkaAddress, + pub address_for_clients: KafkaAddress, + pub address_for_peers: KafkaAddress, pub rack: StrBytes, pub broker_id: BrokerId, #[allow(unused)] @@ -99,8 +102,8 @@ async fn check_shotover_peers( let tcp_stream = tcp_stream( connect_timeout, ( - shotover_peer.address.host.as_str(), - shotover_peer.address.port as u16, + shotover_peer.address_for_peers.host.as_str(), + shotover_peer.address_for_peers.port as u16, ), ) .await; @@ -109,7 +112,10 @@ async fn check_shotover_peers( shotover_peer.set_state(ShotoverNodeState::Up); } Err(_) => { - tracing::warn!("Shotover peer {} is down", shotover_peer.address); + tracing::warn!( + "Shotover peer {} is down", + shotover_peer.address_for_clients + ); shotover_peer.set_state(ShotoverNodeState::Down); } }