KafkaSinkCluster: add local_shotover_broker_id config field
rukai committed May 21, 2024
1 parent 4c0bf18 commit 01c8c96
Showing 4 changed files with 60 additions and 26 deletions.
61 changes: 39 additions & 22 deletions docs/src/transforms.md
@@ -12,7 +12,7 @@ Every transform chain must have exactly one terminating transform and it must be

### Debug

Debug transforms can be temporarily used to test how your Shotover configuration performs. Dont forget to remove them when you are finished.
Debug transforms can be temporarily used to test how your Shotover configuration performs. Don't forget to remove them when you are finished.

### Implementation Status

@@ -75,11 +75,11 @@ While `system.peers`/`system.peers_v2` will be rewritten to list the configured
# This is usually the same address as the Shotover source that is connected to this sink.
# But it may be different if you want Shotover to report a different address.
- address: "127.0.0.1:9042"
# The data_center this Shotover node will report as and route messages to.
# For performance reasons, Shotover should be physically located in this data_center.
# The data_center the Shotover node will report as and route messages to.
# For performance reasons, the Shotover node should be physically located in this data_center.
data_center: "dc1"
# The rack this Shotover node will report as and route messages to.
# For performance reasons, Shotover should be physically located in this rack.
# The rack the Shotover node will report as and route messages to.
# For performance reasons, the Shotover node should be physically located in this rack.
rack: "rack1"
# The host_id that Shotover will report as.
# Does not affect message routing.
@@ -132,7 +132,7 @@ While `system.peers`/`system.peers_v2` will be rewritten to list the configured
#### Error handling

If Shotover sends a request to a node and never gets a response (maybe the node went down), Shotover will return a Cassandra `Server` error to the client.
This is because the message may or may not have succeded, so only the client can attempt to retry as the retry may involve checking if the original query did in fact complete succesfully.
This is because the message may or may not have succeeded, so only the client can attempt to retry as the retry may involve checking if the original query did in fact complete successfully.

If no nodes are capable of receiving the query then Shotover will return a Cassandra `Overloaded` error indicating that the client should retry the query at some point.
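
As a rough illustration of this error mapping, here is a minimal sketch in Rust; the types are hypothetical stand-ins, not shotover's actual message handling:

```rust
// Hypothetical types: a sketch of the error mapping described above,
// not shotover's actual implementation.
enum RequestOutcome {
    Response(Vec<u8>),
    // The node accepted the request but never responded.
    NodeNeverResponded,
    // No node was available to receive the request at all.
    NoNodesAvailable,
}

enum ClientResult {
    Reply(Vec<u8>),
    // Only the client can safely retry: the original query may have completed.
    ServerError,
    // The request never ran, so the client can simply retry later.
    OverloadedError,
}

fn map_outcome(outcome: RequestOutcome) -> ClientResult {
    match outcome {
        RequestOutcome::Response(body) => ClientResult::Reply(body),
        RequestOutcome::NodeNeverResponded => ClientResult::ServerError,
        RequestOutcome::NoNodesAvailable => ClientResult::OverloadedError,
    }
}
```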

Expand All @@ -141,7 +141,7 @@ And all Cassandra errors will be passed directly back to the client.

#### Metrics

This transfrom emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `CassandraSinkCluster` and `chain` as the name of the chain that this transform is in.
This transform emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `CassandraSinkCluster` and `chain` as the name of the chain that this transform is in.

### CassandraSinkSingle

@@ -176,7 +176,7 @@ No cluster discovery or routing occurs with this transform.
# read_timeout: 60
```

This transfrom emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `CassandraSinkSingle` and `chain` as the name of the chain that this transform is in.
This transform emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `CassandraSinkSingle` and `chain` as the name of the chain that this transform is in.

### CassandraPeersRewrite

@@ -254,6 +254,8 @@ Instead Shotover will pretend to be either a single Kafka node or part of a clus

This is achieved by rewriting the FindCoordinator, Metadata and DescribeCluster messages to contain the nodes in the shotover cluster instead of the kafka cluster.
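
Conceptually the rewrite looks something like the following sketch; the types here are hypothetical stand-ins, not the kafka-protocol types shotover actually operates on:

```rust
// Hypothetical stand-in types; shotover works on real kafka-protocol messages.
struct BrokerEntry {
    node_id: i32,
    host: String,
    port: u16,
    rack: Option<String>,
}

struct ShotoverNode {
    broker_id: i32,
    host: String,
    port: u16,
    rack: String,
}

/// Replace the real kafka brokers in a Metadata response with the configured
/// shotover nodes, so clients only ever learn shotover addresses.
fn rewrite_metadata_brokers(brokers: &mut Vec<BrokerEntry>, shotover_nodes: &[ShotoverNode]) {
    brokers.clear();
    brokers.extend(shotover_nodes.iter().map(|node| BrokerEntry {
        node_id: node.broker_id,
        host: node.host.clone(),
        port: node.port,
        rack: Some(node.rack.clone()),
    }));
}
```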

#### SASL SCRAM

By default KafkaSinkCluster does not support SASL SCRAM authentication; if a client attempts to use SCRAM it will appear as if it's not enabled in the server.
SCRAM cannot be supported normally as it is protected against replaying of auth messages, preventing shotover from opening multiple outgoing connections.
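
To see why replay is impossible, note that the client's proof is an HMAC over the whole auth exchange, which includes a server nonce freshly generated per connection. A simplified sketch, assuming the `hmac` and `sha2` crates (the message format is abbreviated, not the real SCRAM wire format):

```rust
use hmac::{Hmac, Mac};
use sha2::Sha256;

type HmacSha256 = Hmac<Sha256>;

// Abbreviated SCRAM-style proof: an HMAC over an auth message that embeds
// the server-chosen nonce for this particular connection.
fn client_proof(salted_password: &[u8], client_nonce: &str, server_nonce: &str) -> Vec<u8> {
    let auth_message = format!("r={client_nonce},r={client_nonce}{server_nonce}");
    let mut mac = HmacSha256::new_from_slice(salted_password).expect("any key length is valid");
    mac.update(auth_message.as_bytes());
    mac.finalize().into_bytes().to_vec()
}

fn main() {
    let password = b"salted-password";
    // A second broker connection receives a fresh server nonce, so a proof
    // captured from the first connection no longer verifies there.
    let first = client_proof(password, "abc", "nonce-from-connection-1");
    let replayed = client_proof(password, "abc", "nonce-from-connection-2");
    assert_ne!(first, replayed);
}
```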

Expand All @@ -266,18 +268,33 @@ If SCRAM authentication against the first kafka broker fails, shotover will term
```yaml
- KafkaSinkCluster:
# Addresses of the initial kafka brokers to connect to.
first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"]
first_contact_points: ["172.16.1.2:9092", "172.16.1.3:9092"]
# A list of every Shotover node that will be proxying to the same kafka cluster.
# This field should be identical for all Shotover nodes proxying to the same kafka cluster.
shotover_nodes:
# Address of the Shotover node.
# This is usually the same address as the Shotover source that is connected to this sink.
# But it may be different if you want Shotover to report a different address.
- "127.0.0.1:9042"
# Address of the Shotover node.
# This is usually the same address as the Shotover source that is connected to this sink.
# But it may be different if you want Shotover to report a different address.
- address: "127.0.0.1:9092"
# The rack the Shotover node will report as and route messages to.
# For performance reasons, the Shotover node should be physically located in this rack.
rack: "rack0"
# The broker ID the Shotover node will report as.
# Does not affect how shotover will route the requests it receives.
# Make sure to set this to a unique value for each Shotover node.
# This must be done to allow the client to properly tell the shotover instances apart.
broker_id: 0
# If you only have a single Shotover instance then you only need a single node here.
# Otherwise, if you have multiple Shotover instances, add more nodes, e.g.
#- "127.0.0.2:9042"
#- address: "127.0.0.2:9092"
# rack: "rack1"
# broker_id: 1
# Defines which entry in shotover_nodes this Shotover instance will become.
# This determines which rack shotover will route to.
local_shotover_broker_id: 0
# Number of milliseconds to wait for a connection to be created to a destination kafka broker.
# If the timeout is exceeded then a connection to another node is attempted
@@ -320,16 +337,16 @@
This transform will send/receive Kafka messages to a single Kafka node running on the same machine as shotover.
All kafka brokers in the cluster must be configured with a shotover instance in front of them.
All shotover instances must be on the same port X and all kafka instances must use another port Y.
The client will then connect via shotovers port X.
The client will then connect via shotover's port X.

In order to force clients to connect through shotover, the FindCoordinator, Metadata and DescribeCluster messages are rewritten to use the shotover port.

```yaml
- KafkaSinkSingle:
# The port of the upstream Cassandra node/service.
destination_port: 9042
# The port of the upstream Kafka node/service.
destination_port: 9092
# Number of milliseconds to wait for a connection to be created to the destination cassandra instance.
# Number of milliseconds to wait for a connection to be created to the destination kafka instance.
# If the timeout is exceeded then an error is returned to the client.
connect_timeout_ms: 3000
Expand All @@ -351,7 +368,7 @@ In order to force clients to connect through shotover the FindCoordinator, Metad
# #verify_hostname: true
```
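
Since every broker sits behind a shotover instance on the same host, only the advertised port needs rewriting. A minimal sketch of the idea with hypothetical types (not the actual transform code):

```rust
/// Rewrite the advertised (host, port) pairs that kafka returns in
/// FindCoordinator/Metadata/DescribeCluster responses so clients connect to
/// shotover's port X instead of kafka's port Y on the same host.
fn rewrite_advertised_ports(advertised: &mut [(String, u16)], shotover_port: u16) {
    for (_host, port) in advertised.iter_mut() {
        // The host is unchanged: shotover runs on the same machine as the broker.
        *port = shotover_port;
    }
}
```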

This transfrom emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `CassandraSinkSingle` and `chain` as the name of the chain that this transform is in.
This transform emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `CassandraSinkSingle` and `chain` as the name of the chain that this transform is in.

### NullSink

@@ -534,7 +551,7 @@ Unlike other Redis cluster drivers, this transform does support pipelining. It d

Latency and throughput will be different from pipelining with a single Redis node, but not by much.

This transfrom emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `RedisSinkCluster` and `chain` as the name of the chain that this transform is in.
This transform emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `RedisSinkCluster` and `chain` as the name of the chain that this transform is in.

#### Differences to real Redis

@@ -575,7 +592,7 @@ This transform will take a query, serialise it into a RESP2 compatible format an

Note: this will just pass the query to the remote node. No cluster discovery or routing occurs with this transform.

This transfrom emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `RedisSinkSingle` and `chain` as the name of the chain that this transform is in.
This transform emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `RedisSinkSingle` and `chain` as the name of the chain that this transform is in.

### Tee

@@ -626,7 +643,7 @@ Tee also exposes an optional HTTP API to switch which chain to use as the "resul
- NullSink
```
This transfrom emits a metrics [counter](user-guide/observability.md#counter) named `tee_dropped_messages` and the label `chain` as `Tee`.
This transform emits a metrics [counter](user-guide/observability.md#counter) named `tee_dropped_messages` and the label `chain` as `Tee`.

### RequestThrottling

1 change: 1 addition & 0 deletions shotover-proxy/benches/windsock/kafka/bench.rs
@@ -98,6 +98,7 @@ impl KafkaBench {
rack: "rack1".into(),
broker_id: 0,
}],
local_shotover_broker_id: 0,
authorize_scram_over_mtls: None,
tls: None,
}),
@@ -9,5 +9,6 @@ sources:
- address: "127.0.0.1:9192"
rack: "rack0"
broker_id: 0
local_shotover_broker_id: 0
first_contact_points: ["172.16.1.2:9092"]
connect_timeout_ms: 3000
23 changes: 19 additions & 4 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -57,6 +57,7 @@ enum FindCoordinatorError {
pub struct KafkaSinkClusterConfig {
pub first_contact_points: Vec<String>,
pub shotover_nodes: Vec<ShotoverNodeConfig>,
pub local_shotover_broker_id: i32,
pub connect_timeout_ms: u64,
pub read_timeout: Option<u64>,
pub tls: Option<TlsConnectorConfig>,
@@ -94,7 +95,7 @@ const NAME: &str = "KafkaSinkCluster";
impl TransformConfig for KafkaSinkClusterConfig {
async fn get_builder(
&self,
transform_context: TransformContextConfig,
_transform_context: TransformContextConfig,
) -> Result<Box<dyn TransformBuilder>> {
let tls = self.tls.clone().map(TlsConnector::new).transpose()?;

Expand All @@ -105,13 +106,23 @@ impl TransformConfig for KafkaSinkClusterConfig {
.map(ShotoverNodeConfig::build)
.collect();
let mut shotover_nodes = shotover_nodes?;
let rack = shotover_nodes
.iter()
.find(|x| x.broker_id.0 == self.local_shotover_broker_id)
.map(|x| x.rack.clone())
.ok_or_else(|| {
anyhow!(
"local_shotover_broker_id {} was missing in shotover_nodes",
self.local_shotover_broker_id
)
})?;
shotover_nodes.sort_by_key(|x| x.broker_id);

Ok(Box::new(KafkaSinkClusterBuilder::new(
self.first_contact_points.clone(),
&self.authorize_scram_over_mtls,
shotover_nodes,
transform_context.chain_name,
rack,
self.connect_timeout_ms,
self.read_timeout,
tls,
Expand All @@ -131,6 +142,7 @@ pub struct KafkaSinkClusterBuilder {
// contains address and port
first_contact_points: Vec<String>,
shotover_nodes: Vec<ShotoverNode>,
rack: StrBytes,
connect_timeout: Duration,
read_timeout: Option<Duration>,
controller_broker: Arc<AtomicBrokerId>,
Expand All @@ -139,7 +151,6 @@ pub struct KafkaSinkClusterBuilder {
topic_by_id: Arc<DashMap<Uuid, Topic>>,
nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
authorize_scram_over_mtls: Option<AuthorizeScramOverMtlsBuilder>,

tls: Option<TlsConnector>,
}

Expand All @@ -148,7 +159,7 @@ impl KafkaSinkClusterBuilder {
first_contact_points: Vec<String>,
authorize_scram_over_mtls: &Option<AuthorizeScramOverMtlsConfig>,
shotover_nodes: Vec<ShotoverNode>,
_chain_name: String,
rack: StrBytes,
connect_timeout_ms: u64,
timeout: Option<u64>,
tls: Option<TlsConnector>,
Expand All @@ -163,6 +174,7 @@ impl KafkaSinkClusterBuilder {
.map(|x| x.get_builder(connect_timeout, read_timeout))
.transpose()?,
shotover_nodes,
rack,
connect_timeout,
read_timeout,
controller_broker: Arc::new(AtomicBrokerId::new()),
Expand All @@ -180,6 +192,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
Box::new(KafkaSinkCluster {
first_contact_points: self.first_contact_points.clone(),
shotover_nodes: self.shotover_nodes.clone(),
_rack: self.rack.clone(),
nodes: vec![],
nodes_shared: self.nodes_shared.clone(),
controller_broker: self.controller_broker.clone(),
@@ -238,6 +251,8 @@ impl AtomicBrokerId {
pub struct KafkaSinkCluster {
first_contact_points: Vec<String>,
shotover_nodes: Vec<ShotoverNode>,
// TODO: use this for rack aware routing
_rack: StrBytes,
nodes: Vec<KafkaNode>,
nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
controller_broker: Arc<AtomicBrokerId>,
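
The `_rack` field is stored but unused for now, per the TODO above. A sketch of the rack-aware routing it could eventually enable (hypothetical types, not code from this commit):

```rust
#[derive(Clone)]
struct KafkaNode {
    broker_id: i32,
    rack: Option<String>,
}

/// Prefer a replica in the local shotover node's rack, falling back to any
/// replica when the local rack holds none.
fn pick_replica<'a>(replicas: &'a [KafkaNode], local_rack: &str) -> Option<&'a KafkaNode> {
    replicas
        .iter()
        .find(|node| node.rack.as_deref() == Some(local_rack))
        .or_else(|| replicas.first())
}
```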
