Skip to content

Commit

Permalink
Split ShotoverNode::address into address_for_peers and address_for_cl…
Browse files Browse the repository at this point in the history
…ient
  • Loading branch information
rukai committed Oct 20, 2024
1 parent 26550b8 commit 8007ab5
Show file tree
Hide file tree
Showing 24 changed files with 120 additions and 63 deletions.
10 changes: 7 additions & 3 deletions docs/src/transforms.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,10 +261,14 @@ If SCRAM authentication against the first kafka broker fails, shotover will term
# A list of every Shotover node that will be proxying to the same kafka cluster.
# This field should be identical for all Shotover nodes proxying to the same kafka cluster.
shotover_nodes:
# Address of the Shotover node.
# Address of the Shotover node that is reported to the kafka clients.
# This is usually the same address as the Shotover source that is connected to this sink.
# But it may be different if you want Shotover to report a different address.
- address: "127.0.0.1:9092"
# But it may be different if you want Shotover to report a different address to its clients.
- address_for_client: "127.0.0.1:9092"
# Address of the shotover node as used to check for peers that are up.
# This is usually the same address as the Shotover source that is connected to this sink.
# But it may be different if you want Shotover to connect to its peers via a different address.
address_for_peers: "127.0.0.1:9092"
# The rack the Shotover node will report as and route messages to.
# For performance reasons, the Shotover node should be physically located in this rack.
rack: "rack0"
Expand Down
3 changes: 2 additions & 1 deletion shotover-proxy/benches/windsock/kafka/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ impl KafkaBench {
check_shotover_peers_delay_ms: Some(3000),
first_contact_points: vec![kafka_address],
shotover_nodes: vec![ShotoverNodeConfig {
address: host_address.parse().unwrap(),
address_for_clients: host_address.parse().unwrap(),
address_for_peers: host_address.parse().unwrap(),
rack: "rack1".into(),
broker_id: 0,
}],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
local_shotover_broker_id: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9191"
- address_for_peers: "127.0.0.1:9191"
address_for_client: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
- address_for_peers: "127.0.0.1:9193"
address_for_client: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
local_shotover_broker_id: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9191"
- address_for_peers: "127.0.0.1:9191"
address_for_client: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
- address_for_peers: "127.0.0.1:9193"
address_for_client: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
local_shotover_broker_id: 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9191"
- address_for_peers: "127.0.0.1:9191"
address_for_client: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
- address_for_peers: "127.0.0.1:9193"
address_for_client: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
local_shotover_broker_id: 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "localhost:9191"
- address_for_peers: "localhost:9191"
address_for_client: "localhost:9191"
rack: "rack1"
broker_id: 0
- address: "localhost:9192"
- address_for_peers: "localhost:9192"
address_for_client: "localhost:9192"
rack: "rack2"
broker_id: 1
local_shotover_broker_id: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "localhost:9191"
- address_for_peers: "localhost:9191"
address_for_client: "localhost:9191"
rack: "rack1"
broker_id: 0
- address: "localhost:9192"
- address_for_peers: "localhost:9192"
address_for_client: "localhost:9192"
rack: "rack2"
broker_id: 1
local_shotover_broker_id: 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "localhost:9191"
- address_for_peers: "localhost:9191"
address_for_client: "localhost:9191"
rack: "rack1"
broker_id: 0
- address: "localhost:9192"
- address_for_peers: "localhost:9192"
address_for_client: "localhost:9192"
rack: "rack2"
broker_id: 1
- address: "localhost:9193"
- address_for_peers: "localhost:9193"
address_for_client: "localhost:9193"
rack: "rack3"
broker_id: 2
local_shotover_broker_id: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "localhost:9191"
- address_for_peers: "localhost:9191"
address_for_client: "localhost:9191"
rack: "rack1"
broker_id: 0
- address: "localhost:9192"
- address_for_peers: "localhost:9192"
address_for_client: "localhost:9192"
rack: "rack2"
broker_id: 1
- address: "localhost:9193"
- address_for_peers: "localhost:9193"
address_for_client: "localhost:9193"
rack: "rack3"
broker_id: 2
local_shotover_broker_id: 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "localhost:9191"
- address_for_peers: "localhost:9191"
address_for_client: "localhost:9191"
rack: "rack1"
broker_id: 0
- address: "localhost:9192"
- address_for_peers: "localhost:9192"
address_for_client: "localhost:9192"
rack: "rack2"
broker_id: 1
- address: "localhost:9193"
- address_for_peers: "localhost:9193"
address_for_client: "localhost:9193"
rack: "rack3"
broker_id: 2
local_shotover_broker_id: 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9192"
- address_for_peers: "localhost:9192"
address_for_client: "localhost:9192"
rack: "rack0"
broker_id: 0
local_shotover_broker_id: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
local_shotover_broker_id: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9191"
- address_for_peers: "127.0.0.1:9191"
address_for_client: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
- address_for_peers: "127.0.0.1:9193"
address_for_client: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
local_shotover_broker_id: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9191"
- address_for_peers: "127.0.0.1:9191"
address_for_client: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
- address_for_peers: "127.0.0.1:9193"
address_for_client: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
local_shotover_broker_id: 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9191"
- address_for_peers: "127.0.0.1:9191"
address_for_client: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
- address_for_peers: "127.0.0.1:9193"
address_for_client: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
local_shotover_broker_id: 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
local_shotover_broker_id: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9191"
- address_for_peers: "127.0.0.1:9191"
address_for_client: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
- address_for_peers: "127.0.0.1:9193"
address_for_client: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
local_shotover_broker_id: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9191"
- address_for_peers: "127.0.0.1:9191"
address_for_client: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
- address_for_peers: "127.0.0.1:9193"
address_for_client: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
local_shotover_broker_id: 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9191"
- address_for_peers: "127.0.0.1:9191"
address_for_client: "127.0.0.1:9191"
rack: "rack0"
broker_id: 0
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 1
- address: "127.0.0.1:9193"
- address_for_peers: "127.0.0.1:9193"
address_for_client: "127.0.0.1:9193"
rack: "rack0"
broker_id: 2
local_shotover_broker_id: 2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
local_shotover_broker_id: 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ sources:
chain:
- KafkaSinkCluster:
shotover_nodes:
- address: "127.0.0.1:9192"
- address_for_peers: "127.0.0.1:9192"
address_for_client: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
local_shotover_broker_id: 0
Expand Down
14 changes: 7 additions & 7 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl TransformConfig for KafkaSinkClusterConfig {
if !unique_broker_ids.insert(node.broker_id) {
return Err(anyhow::anyhow!(
"Duplicate broker_id found in shotover node {}",
node.address
node.address_for_clients
));
}
}
Expand Down Expand Up @@ -2644,8 +2644,8 @@ impl KafkaSinkCluster {
partition_shotover_nodes_by_rack(&up_shotover_nodes, coordinator_rack);
let shotover_node = select_shotover_node_by_hash(shotover_nodes_by_rack, hash);

find_coordinator.host = shotover_node.address.host.clone();
find_coordinator.port = shotover_node.address.port;
find_coordinator.host = shotover_node.address_for_clients.host.clone();
find_coordinator.port = shotover_node.address_for_clients.port;
find_coordinator.node_id = shotover_node.broker_id;
}
} else {
Expand Down Expand Up @@ -2674,8 +2674,8 @@ impl KafkaSinkCluster {
partition_shotover_nodes_by_rack(&up_shotover_nodes, coordinator_rack);
let shotover_node = select_shotover_node_by_hash(shotover_nodes_by_rack, hash);

coordinator.host = shotover_node.address.host.clone();
coordinator.port = shotover_node.address.port;
coordinator.host = shotover_node.address_for_clients.host.clone();
coordinator.port = shotover_node.address_for_clients.port;
coordinator.node_id = shotover_node.broker_id;
}
}
Expand Down Expand Up @@ -2705,8 +2705,8 @@ impl KafkaSinkCluster {
.map(|shotover_node| {
MetadataResponseBroker::default()
.with_node_id(shotover_node.broker_id)
.with_host(shotover_node.address.host.clone())
.with_port(shotover_node.address.port)
.with_host(shotover_node.address_for_clients.host.clone())
.with_port(shotover_node.address_for_clients.port)
.with_rack(Some(shotover_node.rack.clone()))
})
.collect();
Expand Down
Loading

0 comments on commit 8007ab5

Please sign in to comment.