diff --git a/docs/src/transforms.md b/docs/src/transforms.md index f22e92bc6..c9a84656a 100644 --- a/docs/src/transforms.md +++ b/docs/src/transforms.md @@ -12,7 +12,7 @@ Every transform chain must have exactly one terminating transform and it must be ### Debug -Debug transforms can be temporarily used to test how your Shotover configuration performs. Dont forget to remove them when you are finished. +Debug transforms can be temporarily used to test how your Shotover configuration performs. Don't forget to remove them when you are finished. ### Implementation Status @@ -75,11 +75,11 @@ While `system.peers`/`system.peers_v2` will be rewritten to list the configured # This is usually the same address as the Shotover source that is connected to this sink. # But it may be different if you want Shotover to report a different address. - address: "127.0.0.1:9042" - # The data_center this Shotover node will report as and route messages to. - # For performance reasons, Shotover should be physically located in this data_center. + # The data_center the Shotover node will report as and route messages to. + # For performance reasons, the Shotover node should be physically located in this data_center. data_center: "dc1" - # The rack this Shotover node will report as and route messages to. - # For performance reasons, Shotover should be physically located in this rack. + # The rack the Shotover node will report as and route messages to. + # For performance reasons, the Shotover node should be physically located in this rack. rack: "rack1" # The host_id that Shotover will report as. # Does not affect message routing. @@ -132,7 +132,7 @@ While `system.peers`/`system.peers_v2` will be rewritten to list the configured #### Error handling If Shotover sends a request to a node and never gets a response, (maybe the node went down), Shotover will return a Cassandra `Server` error to the client. -This is because the message may or may not have succeded, so only the client can attempt to retry as the retry may involve checking if the original query did in fact complete succesfully. +This is because the message may or may not have succeeded, so only the client can attempt to retry as the retry may involve checking if the original query did in fact complete successfully. If no nodes are capable of receiving the query then Shotover will return a Cassandra `Overloaded` error indicating that the client should retry the query at some point. @@ -141,7 +141,7 @@ And all Cassandra errors will be passed directly back to the client. #### Metrics -This transfrom emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `CassandraSinkCluster` and `chain` as the name of the chain that this transform is in. +This transform emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `CassandraSinkCluster` and `chain` as the name of the chain that this transform is in. ### CassandraSinkSingle @@ -176,7 +176,7 @@ No cluster discovery or routing occurs with this transform. # read_timeout: 60 ``` -This transfrom emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `CassandraSinkSingle` and `chain` as the name of the chain that this transform is in. +This transform emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `CassandraSinkSingle` and `chain` as the name of the chain that this transform is in. ### CassandraPeersRewrite @@ -254,6 +254,8 @@ Instead Shotover will pretend to be either a single Kafka node or part of a clus This is achieved by rewriting the FindCoordinator, Metadata and DescribeCluster messages to contain the nodes in the shotover cluster instead of the kafka cluster. +#### SASL SCRAM + By default KafkaSinkCluster does not support SASL SCRAM authentication, if a client attempts to use SCRAM it will appear as if its not enabled in the server. SCRAM can not be supported normally as it is protected against replaying of auth messages, preventing shotover from opening multiple outgoing connections. @@ -266,18 +268,33 @@ If SCRAM authentication against the first kafka broker fails, shotover will term ```yaml - KafkaSinkCluster: # Addresses of the initial kafka brokers to connect to. - first_contact_points: ["172.16.1.2:9042", "172.16.1.3:9042"] + first_contact_points: ["172.16.1.2:9092", "172.16.1.3:9092"] # A list of every Shotover node that will be proxying to the same kafka cluster. # This field should be identical for all Shotover nodes proxying to the same kafka cluster. shotover_nodes: - # Address of the Shotover node. - # This is usually the same address as the Shotover source that is connected to this sink. - # But it may be different if you want Shotover to report a different address. - - "127.0.0.1:9042" + # Address of the Shotover node. + # This is usually the same address as the Shotover source that is connected to this sink. + # But it may be different if you want Shotover to report a different address. + - address: "127.0.0.1:9092" + # The rack the Shotover node will report as and route messages to. + # For performance reasons, the Shotover node should be physically located in this rack. + rack: "rack0" + # The broker ID the Shotover node will report as. + # Does not affect how shotover will route the requests it receives. + # Make sure to set this to a unique value for each Shotover node. + # This must be done to allow the client to properly tell the shotover instances apart. + broker_id: 0 # If you only have a single Shotover instance then you only want a single node. # Otherwise if you have multiple Shotover instances then add more nodes e.g. - #- "127.0.0.2:9042" + #- address: "127.0.0.2:9092" + # rack: "rack1" + # broker_id: 1 + + + # Defines which entry in shotover_nodes this Shotover instance will become. + # This determines which rack shotover will route to. + local_shotover_broker_id: 0 # Number of milliseconds to wait for a connection to be created to a destination kafka broker. # If the timeout is exceeded then connection to another node is attempted @@ -320,16 +337,16 @@ If SCRAM authentication against the first kafka broker fails, shotover will term This transform will send/receive Kafka messages to a single Kafka node running on the same machine as shotover. All kafka brokers in the cluster must be configured with a shotover instance in front of them. All shotover instances must be on the same port X and all kafka instances must use another port Y. -The client will then connect via shotovers port X. +The client will then connect via shotover's port X. In order to force clients to connect through shotover the FindCoordinator, Metadata and DescribeCluster messages are rewritten to use the shotover port. ```yaml - KafkaSinkSingle: - # The port of the upstream Cassandra node/service. - destination_port: 9042 + # The port of the upstream Kafka node/service. + destination_port: 9092 - # Number of milliseconds to wait for a connection to be created to the destination cassandra instance. + # Number of milliseconds to wait for a connection to be created to the destination kafka instance. # If the timeout is exceeded then an error is returned to the client. connect_timeout_ms: 3000 @@ -351,7 +368,7 @@ In order to force clients to connect through shotover the FindCoordinator, Metad # #verify_hostname: true ``` -This transfrom emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `CassandraSinkSingle` and `chain` as the name of the chain that this transform is in. +This transform emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `CassandraSinkSingle` and `chain` as the name of the chain that this transform is in. ### NullSink @@ -534,7 +551,7 @@ Unlike other Redis cluster drivers, this transform does support pipelining. It d Latency and throughput will be different from pipelining with a single Redis node, but not by much. -This transfrom emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `RedisSinkCluster` and `chain` as the name of the chain that this transform is in. +This transform emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `RedisSinkCluster` and `chain` as the name of the chain that this transform is in. #### Differences to real Redis @@ -575,7 +592,7 @@ This transform will take a query, serialise it into a RESP2 compatible format an Note: this will just pass the query to the remote node. No cluster discovery or routing occurs with this transform. -This transfrom emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `RedisSinkSingle` and `chain` as the name of the chain that this transform is in. +This transform emits a metrics [counter](user-guide/observability.md#counter) named `failed_requests` and the labels `transform` defined as `RedisSinkSingle` and `chain` as the name of the chain that this transform is in. ### Tee @@ -626,7 +643,7 @@ Tee also exposes an optional HTTP API to switch which chain to use as the "resul - NullSink ``` -This transfrom emits a metrics [counter](user-guide/observability.md#counter) named `tee_dropped_messages` and the label `chain` as `Tee`. +This transform emits a metrics [counter](user-guide/observability.md#counter) named `tee_dropped_messages` and the label `chain` as `Tee`. ### RequestThrottling diff --git a/shotover-proxy/benches/windsock/kafka/bench.rs b/shotover-proxy/benches/windsock/kafka/bench.rs index b503b39f7..036121d4f 100644 --- a/shotover-proxy/benches/windsock/kafka/bench.rs +++ b/shotover-proxy/benches/windsock/kafka/bench.rs @@ -98,6 +98,7 @@ impl KafkaBench { rack: "rack1".into(), broker_id: 0, }], + local_shotover_broker_id: 0, authorize_scram_over_mtls: None, tls: None, }), diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology-single.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology-single.yaml index f2d8f4368..70545b5a1 100644 --- a/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology-single.yaml +++ b/shotover-proxy/tests/test-configs/kafka/cluster-1-rack/topology-single.yaml @@ -9,5 +9,6 @@ sources: - address: "127.0.0.1:9192" rack: "rack0" broker_id: 0 + local_shotover_broker_id: 0 first_contact_points: ["172.16.1.2:9092"] connect_timeout_ms: 3000 diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 73f09a9eb..33f5c83d8 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -57,6 +57,7 @@ enum FindCoordinatorError { pub struct KafkaSinkClusterConfig { pub first_contact_points: Vec, pub shotover_nodes: Vec, + pub local_shotover_broker_id: i32, pub connect_timeout_ms: u64, pub read_timeout: Option, pub tls: Option, @@ -94,7 +95,7 @@ const NAME: &str = "KafkaSinkCluster"; impl TransformConfig for KafkaSinkClusterConfig { async fn get_builder( &self, - transform_context: TransformContextConfig, + _transform_context: TransformContextConfig, ) -> Result> { let tls = self.tls.clone().map(TlsConnector::new).transpose()?; @@ -105,13 +106,23 @@ impl TransformConfig for KafkaSinkClusterConfig { .map(ShotoverNodeConfig::build) .collect(); let mut shotover_nodes = shotover_nodes?; + let rack = shotover_nodes + .iter() + .find(|x| x.broker_id.0 == self.local_shotover_broker_id) + .map(|x| x.rack.clone()) + .ok_or_else(|| { + anyhow!( + "local_shotover_broker_id {} was missing in shotover_nodes", + self.local_shotover_broker_id + ) + })?; shotover_nodes.sort_by_key(|x| x.broker_id); Ok(Box::new(KafkaSinkClusterBuilder::new( self.first_contact_points.clone(), &self.authorize_scram_over_mtls, shotover_nodes, - transform_context.chain_name, + rack, self.connect_timeout_ms, self.read_timeout, tls, @@ -131,6 +142,7 @@ pub struct KafkaSinkClusterBuilder { // contains address and port first_contact_points: Vec, shotover_nodes: Vec, + rack: StrBytes, connect_timeout: Duration, read_timeout: Option, controller_broker: Arc, @@ -139,7 +151,6 @@ pub struct KafkaSinkClusterBuilder { topic_by_id: Arc>, nodes_shared: Arc>>, authorize_scram_over_mtls: Option, - tls: Option, } @@ -148,7 +159,7 @@ impl KafkaSinkClusterBuilder { first_contact_points: Vec, authorize_scram_over_mtls: &Option, shotover_nodes: Vec, - _chain_name: String, + rack: StrBytes, connect_timeout_ms: u64, timeout: Option, tls: Option, @@ -163,6 +174,7 @@ impl KafkaSinkClusterBuilder { .map(|x| x.get_builder(connect_timeout, read_timeout)) .transpose()?, shotover_nodes, + rack, connect_timeout, read_timeout, controller_broker: Arc::new(AtomicBrokerId::new()), @@ -180,6 +192,7 @@ impl TransformBuilder for KafkaSinkClusterBuilder { Box::new(KafkaSinkCluster { first_contact_points: self.first_contact_points.clone(), shotover_nodes: self.shotover_nodes.clone(), + _rack: self.rack.clone(), nodes: vec![], nodes_shared: self.nodes_shared.clone(), controller_broker: self.controller_broker.clone(), @@ -238,6 +251,8 @@ impl AtomicBrokerId { pub struct KafkaSinkCluster { first_contact_points: Vec, shotover_nodes: Vec, + // TODO: use this for rack aware routing + _rack: StrBytes, nodes: Vec, nodes_shared: Arc>>, controller_broker: Arc,