diff --git a/docs/src/transforms.md b/docs/src/transforms.md
index 7fbf4001d..db4174323 100644
--- a/docs/src/transforms.md
+++ b/docs/src/transforms.md
@@ -229,7 +229,7 @@ This transform will drop any messages it receives and return the supplied respon
 This transform will route kafka messages to a broker within a Kafka cluster:
 
 * produce messages are routed to the partition leader
-* fetch messages are routed to a random partition replica
+* fetch messages are routed to the partition leader
 * heartbeat, syncgroup, offsetfetch, joingroup and leavegroup are all routed to the group coordinator
 * all other messages go to a random node.
 
@@ -238,6 +238,10 @@ Instead Shotover will pretend to be either a single Kafka node or part of a clus
 
 This is achieved by rewriting the FindCoordinator, Metadata and DescribeCluster messages to contain the nodes in the shotover cluster instead of the kafka cluster.
 
+Note that:
+Produce and fetch requests will be split into multiple requests if no single broker can fulfil the request.
+e.g. a produce request contains records for topics that have leaders on different brokers in the real kafka cluster, but the shotover cluster appeared to have them hosted on the same broker.
+
 #### SASL SCRAM
 
 By default KafkaSinkCluster does not support SASL SCRAM authentication, if a client attempts to use SCRAM it will appear as if its not enabled in the server.
diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
index d9b4624c9..700fc78fc 100644
--- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs
+++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -37,6 +37,16 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
                 num_partitions: 1,
                 replication_factor: 1,
             },
+            NewTopic {
+                name: "batch_test_partitions_1",
+                num_partitions: 1,
+                replication_factor: 1,
+            },
+            NewTopic {
+                name: "batch_test_partitions_3",
+                num_partitions: 3,
+                replication_factor: 1,
+            },
         ])
         .await;
 
@@ -66,12 +76,129 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
     admin.delete_topics(&["to_delete"]).await
 }
 
+/// Attempt to make the driver batch produce requests for different topics into the same request
+/// This is important to test since shotover has complex logic for splitting these batch requests into individual requests.
+pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnectionBuilder) {
+    // set linger to 100ms to ensure that the concurrent produce requests are combined into a single batched request.
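+    // The batched request spans topics whose partitions have different leaders in the real kafka cluster,
+    // which exercises shotover's produce request splitting and response combining logic.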
+    let producer = connection_builder.connect_producer("all", 100).await;
+    // create an initial record to force kafka to create the topic if it doesn't yet exist
+    tokio::join!(
+        producer.assert_produce(
+            Record {
+                payload: "initial1",
+                topic_name: "batch_test_partitions_1",
+                key: None,
+            },
+            Some(0),
+        ),
+        producer.assert_produce(
+            Record {
+                payload: "initial2",
+                topic_name: "batch_test_partitions_3",
+                key: Some("foo"),
+            },
+            Some(0),
+        ),
+        producer.assert_produce(
+            Record {
+                payload: "initial3",
+                topic_name: "batch_test_unknown",
+                key: None,
+            },
+            Some(0),
+        )
+    );
+
+    let mut consumer_partitions_1 = connection_builder
+        .connect_consumer("batch_test_partitions_1", "batch_test_partitions_1_group")
+        .await;
+    let mut consumer_partitions_3 = connection_builder
+        .connect_consumer("batch_test_partitions_3", "batch_test_partitions_3_group")
+        .await;
+    let mut consumer_unknown = connection_builder
+        .connect_consumer("batch_test_unknown", "batch_test_unknown_group")
+        .await;
+
+    tokio::join!(
+        consumer_partitions_1.assert_consume(ExpectedResponse {
+            message: "initial1".to_owned(),
+            key: None,
+            topic_name: "batch_test_partitions_1".to_owned(),
+            offset: Some(0),
+        }),
+        consumer_partitions_3.assert_consume(ExpectedResponse {
+            message: "initial2".to_owned(),
+            // ensure we route to the same partition every time, so we can assert on the offset when consuming.
+            key: Some("foo".to_owned()),
+            topic_name: "batch_test_partitions_3".to_owned(),
+            offset: Some(0),
+        }),
+        consumer_unknown.assert_consume(ExpectedResponse {
+            message: "initial3".to_owned(),
+            key: None,
+            topic_name: "batch_test_unknown".to_owned(),
+            offset: Some(0),
+        })
+    );
+
+    // create and consume records
+    for i in 0..5 {
+        tokio::join!(
+            producer.assert_produce(
+                Record {
+                    payload: "Message1",
+                    topic_name: "batch_test_partitions_1",
+                    key: None,
+                },
+                Some(i + 1),
+            ),
+            producer.assert_produce(
+                Record {
+                    payload: "Message2",
+                    topic_name: "batch_test_partitions_3",
+                    key: Some("foo"),
+                },
+                None,
+            ),
+            producer.assert_produce(
+                Record {
+                    payload: "Message3",
+                    topic_name: "batch_test_unknown",
+                    key: None,
+                },
+                Some(i + 1),
+            )
+        );
+
+        tokio::join!(
+            consumer_partitions_1.assert_consume(ExpectedResponse {
+                message: "Message1".to_owned(),
+                key: None,
+                topic_name: "batch_test_partitions_1".to_owned(),
+                offset: Some(i + 1),
+            }),
+            consumer_partitions_3.assert_consume(ExpectedResponse {
+                message: "Message2".to_owned(),
+                key: Some("foo".to_owned()),
+                topic_name: "batch_test_partitions_3".to_owned(),
+                offset: Some(i + 1),
+            }),
+            consumer_unknown.assert_consume(ExpectedResponse {
+                message: "Message3".to_owned(),
+                key: None,
+                topic_name: "batch_test_unknown".to_owned(),
+                offset: Some(i + 1),
+            })
+        );
+    }
+}
+
 pub async fn produce_consume_partitions1(
     connection_builder: &KafkaConnectionBuilder,
     topic_name: &str,
 ) {
     {
-        let producer = connection_builder.connect_producer("all").await;
+        let producer = connection_builder.connect_producer("all", 0).await;
         // create an initial record to force kafka to create the topic if it doesnt yet exist
         producer
             .assert_produce(
@@ -201,7 +328,7 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
     }
 
     {
-        let producer = connection_builder.connect_producer("all").await;
+        let producer = connection_builder.connect_producer("all", 0).await;
         // create an initial record to force kafka to create the topic if it doesnt yet exist
         producer
             .assert_produce(
@@ -309,7 +436,7 @@ pub async fn produce_consume_commit_offsets_partitions1(
     topic_name: &str,
 ) {
     {
-        let producer = connection_builder.connect_producer("1").await;
+        let producer = connection_builder.connect_producer("1", 0).await;
         producer
             .assert_produce(
                 Record {
@@ -434,7 +561,7 @@ async fn produce_consume_partitions3(
     connection_builder: &KafkaConnectionBuilder,
     topic_name: &str,
 ) {
-    let producer = connection_builder.connect_producer("1").await;
+    let producer = connection_builder.connect_producer("1", 0).await;
     let mut consumer = connection_builder
         .connect_consumer(topic_name, "some_group")
         .await;
@@ -483,7 +610,7 @@ async fn produce_consume_partitions3(
 
 async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
     let topic_name = "acks0";
-    let producer = connection_builder.connect_producer("0").await;
+    let producer = connection_builder.connect_producer("0", 0).await;
 
     for _ in 0..10 {
         producer
@@ -520,6 +647,7 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
     produce_consume_partitions1(connection_builder, "unknown_topic").await;
     produce_consume_commit_offsets_partitions1(connection_builder, "partitions1_with_offset").await;
     produce_consume_partitions3(connection_builder, "partitions3").await;
+    produce_consume_multi_topic_batch(connection_builder).await;
 
     // Only run this test case on the java driver,
     // since even without going through shotover the cpp driver fails this test.
diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs
index a1a307541..caf3f3000 100644
--- a/shotover/src/transforms/kafka/sink_cluster/mod.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -11,16 +11,18 @@ use anyhow::{anyhow, Context, Result};
 use async_trait::async_trait;
 use connections::{Connections, Destination};
 use dashmap::DashMap;
+use kafka_protocol::indexmap::IndexMap;
 use kafka_protocol::messages::fetch_request::FetchTopic;
 use kafka_protocol::messages::fetch_response::LeaderIdAndEpoch as FetchResponseLeaderIdAndEpoch;
 use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
 use kafka_protocol::messages::metadata_response::MetadataResponseBroker;
+use kafka_protocol::messages::produce_request::TopicProduceData;
 use kafka_protocol::messages::produce_response::LeaderIdAndEpoch as ProduceResponseLeaderIdAndEpoch;
 use kafka_protocol::messages::{
-    ApiKey, BrokerId, FetchRequest, FindCoordinatorRequest, FindCoordinatorResponse, GroupId,
-    HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, MetadataRequest, MetadataResponse,
-    RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest,
-    SyncGroupRequest, TopicName,
+    ApiKey, BrokerId, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse,
+    GroupId, HeartbeatRequest, JoinGroupRequest, LeaveGroupRequest, MetadataRequest,
+    MetadataResponse, ProduceRequest, ProduceResponse, RequestHeader, SaslAuthenticateRequest,
+    SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest, TopicName,
 };
 use kafka_protocol::protocol::StrBytes;
 use kafka_protocol::ResponseError;
@@ -809,48 +811,127 @@ impl KafkaSinkCluster {
             ..
         })) = message.frame()
         {
-            let mut connection = None;
-            // assume that all topics in this message have the same routing requirements
-            let (topic_name, topic_data) = produce
-                .topic_data
-                .iter()
-                .next()
-                .ok_or_else(|| anyhow!("No topics in produce message"))?;
-            if let Some(topic) = self.topic_by_name.get(topic_name) {
-                // assume that all partitions in this topic have the same routing requirements
-                let partition = &topic.partitions[topic_data
-                    .partition_data
-                    .first()
-                    .ok_or_else(|| anyhow!("No partitions in topic"))?
-                    .index as usize];
-                for node in &mut self.nodes {
-                    if node.broker_id == partition.leader_id {
-                        connection = Some(node.broker_id);
+            let routing = self.split_produce_request_by_destination(produce);
+
+            if routing.is_empty() {
+                // Produce contains no topics, so we can just pick a random destination.
+                // The message is unchanged so we can just send as is.
+                let destination = random_broker_id(&self.nodes, &mut self.rng);
+
+                self.pending_requests.push_back(PendingRequest {
+                    ty: PendingRequestTy::routed(destination, message),
+                    combine_responses: 1,
+                });
+            } else if routing.len() == 1 {
+                // Only 1 destination,
+                // so we can just reconstruct the original message as is,
+                // act like this never happened 😎,
+                // we don't even need to invalidate the message's cache.
+                let (destination, topic_data) = routing.into_iter().next().unwrap();
+                let destination = if destination == -1 {
+                    random_broker_id(&self.nodes, &mut self.rng)
+                } else {
+                    destination
+                };
+
+                produce.topic_data = topic_data;
+                self.pending_requests.push_back(PendingRequest {
+                    ty: PendingRequestTy::routed(destination, message),
+                    combine_responses: 1,
+                });
+            } else {
+                // The message has been split so it may be delivered to multiple destinations.
+                // We must generate a unique message for each destination.
+                let combine_responses = routing.len();
+                message.invalidate_cache();
+                for (i, (destination, topic_data)) in routing.into_iter().enumerate() {
+                    let destination = if destination == -1 {
+                        random_broker_id(&self.nodes, &mut self.rng)
+                    } else {
+                        destination
+                    };
+                    let mut request = if i == 0 {
+                        // First message acts as base and retains message id
+                        message.clone()
+                    } else {
+                        message.clone_with_new_id()
+                    };
+                    if let Some(Frame::Kafka(KafkaFrame::Request {
+                        body: RequestBody::Produce(produce),
+                        ..
+                    })) = request.frame()
+                    {
+                        produce.topic_data = topic_data;
                     }
+                    self.pending_requests.push_back(PendingRequest {
+                        ty: PendingRequestTy::routed(destination, request),
+                        combine_responses,
+                    });
                 }
             }
-            let destination = match connection {
-                Some(connection) => connection,
-                None => {
-                    tracing::debug!(
-                        r#"no known partition leader for {topic_name:?}
-routing message to a random node so that:
-* if auto topic creation is enabled, auto topic creation will occur
-* if auto topic creation is disabled a NOT_LEADER_OR_FOLLOWER is returned to the client"#
-                    );
-                    random_broker_id(&self.nodes, &mut self.rng)
-                }
-            };
-
-            self.pending_requests.push_back(PendingRequest {
-                ty: PendingRequestTy::routed(destination, message),
-                combine_responses: 1,
-            });
         }
         Ok(())
     }
 
+    /// This method removes all topics from the produce request and returns them split up by their destination
+    /// If any topics are unroutable they will have their destination BrokerId set to -1
+    fn split_produce_request_by_destination(
+        &mut self,
+        produce: &mut ProduceRequest,
+    ) -> HashMap<BrokerId, IndexMap<TopicName, TopicProduceData>> {
+        let mut result: HashMap<BrokerId, IndexMap<TopicName, TopicProduceData>> =
+            Default::default();
+
+        for (name, mut topic) in produce.topic_data.drain(..) {
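+            // Route each partition to its leader when the topic is known; unknown topics or
+            // partitions get BrokerId(-1), which the caller replaces with a random node.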
+            let topic_meta = self.topic_by_name.get(&name);
+            if let Some(topic_meta) = topic_meta {
+                for partition in std::mem::take(&mut topic.partition_data) {
+                    let partition_index = partition.index as usize;
+                    let destination = if let Some(partition) =
+                        topic_meta.partitions.get(partition_index)
+                    {
+                        if partition.leader_id == -1 {
+                            tracing::warn!("leader_id is unknown for topic {name:?} at partition index {partition_index}");
+                        }
+                        partition.leader_id
+                    } else {
+                        let partition_len = topic_meta.partitions.len();
+                        tracing::warn!("no known partition replica for {name:?} at partition index {partition_index} out of {partition_len} partitions, routing request to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
+                        BrokerId(-1)
+                    };
+
+                    // Get the topics already routed to this destination
+                    let routed_topics = result.entry(destination).or_default();
+
+                    if let Some(routed_topic) = routed_topics.get_mut(&name) {
+                        // we have already routed this topic to this broker, add another partition
+                        routed_topic.partition_data.push(partition);
+                    } else {
+                        // we have not yet routed this topic to this broker, add the first partition
+                        // Clone the original topic value, to ensure we carry over any `unknown_tagged_fields` values.
+                        // The partition_data is empty at this point due to the previous `std::mem::take`
+                        let mut topic = topic.clone();
+                        topic.partition_data.push(partition);
+                        routed_topics.insert(name.clone(), topic);
+                    }
+                }
+            } else {
+                tracing::debug!(
+                    r#"no known partition leader for {name:?}
+    routing request to a random node so that:
+    * if auto topic creation is enabled, auto topic creation will occur
+    * if auto topic creation is disabled a NOT_LEADER_OR_FOLLOWER is returned to the client"#
+                );
+                let destination = BrokerId(-1);
+                let dest_topics = result.entry(destination).or_default();
+                dest_topics.insert(name, topic);
+            }
+        }
+
+        result
+    }
+
     /// This method removes all topics from the fetch request and returns them split up by their destination
     /// If any topics are unroutable they will have their BrokerId set to -1
     fn split_fetch_request_by_destination(
@@ -1276,7 +1357,7 @@ routing message to a random node so that:
                             unreachable!("Guaranteed by all_combined_received")
                         }
                     });
-                    responses.push(Self::combine_fetch_responses(drain)?);
+                    responses.push(Self::combine_responses(drain)?);
                 }
             } else {
                 // The pending_request is not received, we need to break to maintain response ordering.
@@ -1286,57 +1367,105 @@ routing message to a random node so that:
         Ok(responses)
     }
 
-    fn combine_fetch_responses(mut drain: impl Iterator<Item = Message>) -> Result<Message> {
+    fn combine_responses(mut drain: impl Iterator<Item = Message>) -> Result<Message> {
         // Take this response as base.
         // Then iterate over all remaining combined responses and integrate them into the base.
         let mut base = drain.next().unwrap();
-        if let Some(Frame::Kafka(KafkaFrame::Response {
-            body: ResponseBody::Fetch(base_fetch),
-            ..
-        })) = base.frame()
-        {
-            for mut next in drain {
-                if let Some(Frame::Kafka(KafkaFrame::Response {
-                    body: ResponseBody::Fetch(next_fetch),
-                    ..
-                })) = next.frame()
-                {
-                    for next_response in std::mem::take(&mut next_fetch.responses) {
-                        if let Some(base_response) =
-                            base_fetch.responses.iter_mut().find(|response| {
-                                response.topic == next_response.topic
-                                    && response.topic_id == next_response.topic_id
-                            })
-                        {
-                            for next_partition in &next_response.partitions {
-                                for base_partition in &base_response.partitions {
-                                    if next_partition.partition_index
-                                        == base_partition.partition_index
-                                    {
-                                        tracing::warn!("Duplicate partition indexes in combined fetch response, if this ever occurs we should investigate the repercussions")
-                                    }
+        base.invalidate_cache();
+
+        match base.frame() {
+            Some(Frame::Kafka(KafkaFrame::Response {
+                body: ResponseBody::Fetch(base),
+                ..
+            })) => Self::combine_fetch_responses(base, drain)?,
+            Some(Frame::Kafka(KafkaFrame::Response {
+                body: ResponseBody::Produce(base),
+                ..
+            })) => Self::combine_produce_responses(base, drain)?,
+            _ => {
+                return Err(anyhow!(
+                    "Combining of this message type is currently unsupported"
+                ))
+            }
+        }
+
+        Ok(base)
+    }
+
+    fn combine_fetch_responses(
+        base_fetch: &mut FetchResponse,
+        drain: impl Iterator<Item = Message>,
+    ) -> Result<()> {
+        for mut next in drain {
+            if let Some(Frame::Kafka(KafkaFrame::Response {
+                body: ResponseBody::Fetch(next_fetch),
+                ..
+            })) = next.frame()
+            {
+                for next_response in std::mem::take(&mut next_fetch.responses) {
+                    if let Some(base_response) = base_fetch.responses.iter_mut().find(|response| {
+                        response.topic == next_response.topic
+                            && response.topic_id == next_response.topic_id
+                    }) {
+                        for next_partition in &next_response.partitions {
+                            for base_partition in &base_response.partitions {
+                                if next_partition.partition_index == base_partition.partition_index
+                                {
+                                    tracing::warn!("Duplicate partition indexes in combined fetch response, if this ever occurs we should investigate the repercussions")
                                 }
                             }
-                            // A partition can only be contained in one response so there is no risk of duplicating partitions
-                            base_response.partitions.extend(next_response.partitions)
-                        } else {
-                            base_fetch.responses.push(next_response);
                         }
+                        // A partition can only be contained in one response so there is no risk of duplicating partitions
+                        base_response.partitions.extend(next_response.partitions)
+                    } else {
+                        base_fetch.responses.push(next_response);
                     }
-                } else {
-                    return Err(anyhow!(
-                        "Combining Fetch messages but received another message type"
-                    ));
                 }
+            } else {
+                return Err(anyhow!(
+                    "Combining Fetch responses but received another message type"
+                ));
             }
-        } else {
-            return Err(anyhow!(
-                "Combining of message types other than Fetch is currently unsupported"
-            ));
         }
-        base.invalidate_cache();
-        Ok(base)
+        Ok(())
+    }
+
+    fn combine_produce_responses(
+        base_produce: &mut ProduceResponse,
+        drain: impl Iterator<Item = Message>,
+    ) -> Result<()> {
+        for mut next in drain {
+            if let Some(Frame::Kafka(KafkaFrame::Response {
+                body: ResponseBody::Produce(next_produce),
+                ..
+            })) = next.frame()
+            {
+                for (next_name, next_response) in std::mem::take(&mut next_produce.responses) {
+                    if let Some(base_response) = base_produce.responses.get_mut(&next_name) {
+                        for next_partition in &next_response.partition_responses {
+                            for base_partition in &base_response.partition_responses {
+                                if next_partition.index == base_partition.index {
+                                    tracing::warn!("Duplicate partition indexes in combined produce response, if this ever occurs we should investigate the repercussions")
+                                }
+                            }
+                        }
+                        // A partition can only be contained in one response so there is no risk of duplicating partitions
+                        base_response
+                            .partition_responses
+                            .extend(next_response.partition_responses)
+                    } else {
+                        base_produce.responses.insert(next_name, next_response);
+                    }
+                }
+            } else {
+                return Err(anyhow!(
+                    "Combining Produce responses but received another message type"
+                ));
+            }
+        }
+
+        Ok(())
+    }
 
     async fn process_responses(
diff --git a/test-helpers/src/connection/kafka/cpp.rs b/test-helpers/src/connection/kafka/cpp.rs
index a9451329a..d2008433f 100644
--- a/test-helpers/src/connection/kafka/cpp.rs
+++ b/test-helpers/src/connection/kafka/cpp.rs
@@ -50,12 +50,13 @@ impl KafkaConnectionBuilderCpp {
         self
     }
 
-    pub async fn connect_producer(&self, acks: &str) -> KafkaProducerCpp {
+    pub async fn connect_producer(&self, acks: &str, linger_ms: u32) -> KafkaProducerCpp {
         KafkaProducerCpp {
             producer: self
                 .client
                .clone()
                 .set("message.timeout.ms", "5000")
+                .set("linger.ms", linger_ms.to_string())
                 .set("acks", acks)
                 .create()
                 .unwrap(),
diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs
index 68c4b2634..083a82371 100644
--- a/test-helpers/src/connection/kafka/java.rs
+++ b/test-helpers/src/connection/kafka/java.rs
@@ -79,9 +79,10 @@ impl KafkaConnectionBuilderJava {
         self
     }
 
-    pub async fn connect_producer(&self, acks: &str) -> KafkaProducerJava {
+    pub async fn connect_producer(&self, acks: &str, linger_ms: u32) -> KafkaProducerJava {
         let mut config = self.base_config.clone();
         config.insert("acks".to_owned(), acks.to_string());
+        config.insert("linger.ms".to_owned(), linger_ms.to_string());
         config.insert(
             "key.serializer".to_owned(),
             "org.apache.kafka.common.serialization.StringSerializer".to_owned(),
diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs
index f2c737f35..1363156f7 100644
--- a/test-helpers/src/connection/kafka/mod.rs
+++ b/test-helpers/src/connection/kafka/mod.rs
@@ -67,11 +67,11 @@ impl KafkaConnectionBuilder {
         }
     }
 
-    pub async fn connect_producer(&self, acks: &str) -> KafkaProducer {
+    pub async fn connect_producer(&self, acks: &str, linger_ms: u32) -> KafkaProducer {
         match self {
             #[cfg(feature = "kafka-cpp-driver-tests")]
-            Self::Cpp(cpp) => KafkaProducer::Cpp(cpp.connect_producer(acks).await),
-            Self::Java(java) => KafkaProducer::Java(java.connect_producer(acks).await),
+            Self::Cpp(cpp) => KafkaProducer::Cpp(cpp.connect_producer(acks, linger_ms).await),
+            Self::Java(java) => KafkaProducer::Java(java.connect_producer(acks, linger_ms).await),
         }
     }
 
@@ -148,10 +148,24 @@ impl KafkaConsumer {
             Self::Cpp(cpp) => cpp.consume().await,
             Self::Java(java) => java.consume().await,
         };
-        assert_eq!(expected_response.message, response.message);
-        assert_eq!(expected_response.key, response.key);
-        assert_eq!(expected_response.topic_name, response.topic_name);
-        assert_eq!(expected_response.offset, response.offset);
+
+        let topic = &expected_response.topic_name;
+        assert_eq!(
+            expected_response.topic_name, response.topic_name,
+            "Unexpected topic"
+        );
+        assert_eq!(
+            expected_response.message, response.message,
+            "Unexpected message for topic {topic}"
+        );
+        assert_eq!(
+            expected_response.key, response.key,
+            "Unexpected key for topic {topic}"
+        );
+        assert_eq!(
+            expected_response.offset, response.offset,
+            "Unexpected offset for topic {topic}"
+        );
     }
 
     pub async fn assert_consume_in_any_order(&mut self, expected_responses: Vec<ExpectedResponse>) {