From 1cff7c3547b143bb794a5786770afb0cf6fd8287 Mon Sep 17 00:00:00 2001
From: Lucas Kent
Date: Mon, 18 Nov 2024 09:19:06 +1100
Subject: [PATCH] KafkaSinkCluster: Support new consumer protocol

---
 shotover-proxy/tests/kafka_int_tests/mod.rs   |  5 ++
 .../tests/kafka_int_tests/test_cases.rs       | 65 ++++++++++++++++++-
 .../docker-compose-kafka-3.9.yaml             |  2 +
 .../src/transforms/kafka/sink_cluster/mod.rs  | 25 +++++--
 test-helpers/src/connection/kafka/cpp.rs      |  4 ++
 test-helpers/src/connection/kafka/java.rs     |  8 +++
 test-helpers/src/connection/kafka/mod.rs      | 12 ++++
 7 files changed, 115 insertions(+), 6 deletions(-)

diff --git a/shotover-proxy/tests/kafka_int_tests/mod.rs b/shotover-proxy/tests/kafka_int_tests/mod.rs
index 7675093b2..e49cbdb58 100644
--- a/shotover-proxy/tests/kafka_int_tests/mod.rs
+++ b/shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -622,6 +622,11 @@ async fn cluster_2_racks_multi_shotover_kafka_3_9(#[case] driver: KafkaDriver) {
     let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
 
     test_cases::cluster_test_suite(&connection_builder).await;
+    test_cases::produce_consume_partitions_new_consumer_group_protocol(
+        &connection_builder,
+        "partitions3_new_consumer_group_protocol",
+    )
+    .await;
 
     for shotover in shotovers {
         tokio::time::timeout(
diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
index aad87edcb..e7f79d873 100644
--- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs
+++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -3,7 +3,7 @@ use std::{collections::HashMap, time::Duration};
 use test_helpers::{
     connection::kafka::{
         Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig,
-        ConsumerGroupDescription, ExpectedResponse, IsolationLevel, KafkaAdmin,
+        ConsumerGroupDescription, ConsumerProtocol, ExpectedResponse, IsolationLevel, KafkaAdmin,
         KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, ListOffsetsResultInfo,
         NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata, OffsetSpec, Record,
         RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition,
@@ -47,6 +47,11 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
                 num_partitions: 3,
                 replication_factor: 1,
             },
+            NewTopic {
+                name: "partitions3_new_consumer_group_protocol",
+                num_partitions: 3,
+                replication_factor: 1,
+            },
             NewTopic {
                 name: "acks0",
                 num_partitions: 1,
@@ -1091,6 +1096,64 @@ pub async fn produce_consume_partitions3(
     }
 }
 
+/// The new consumer protocol must be specifically enabled in the broker config.
+/// We only enable it in the kafka 3.9 docker-compose.yaml, so this test case is
+/// called manually from that test rather than included in the standard test suite.
+pub async fn produce_consume_partitions_new_consumer_group_protocol(
+    connection_builder: &KafkaConnectionBuilder,
+    topic_name: &str,
+) {
+    let producer = connection_builder.connect_producer("1", 0).await;
+    let mut consumer = connection_builder
+        .connect_consumer(
+            ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()])
+                .with_group("some_group")
+                .with_protocol(ConsumerProtocol::Consumer),
+        )
+        .await;
+
+    for _ in 0..5 {
+        producer
+            .assert_produce(
+                Record {
+                    payload: "Message1",
+                    topic_name,
+                    key: Some("Key".into()),
+                },
+                // We can't predict the offsets since they depend on which partitions the records end up in
+                None,
+            )
+            .await;
+        producer
+            .assert_produce(
+                Record {
+                    payload: "Message2",
+                    topic_name,
+                    key: None,
+                },
+                None,
+            )
+            .await;
+
+        consumer
+            .assert_consume_in_any_order(vec![
+                ExpectedResponse {
+                    message: "Message1".to_owned(),
+                    key: Some("Key".to_owned()),
+                    topic_name: topic_name.to_owned(),
+                    offset: None,
+                },
+                ExpectedResponse {
+                    message: "Message2".to_owned(),
+                    key: None,
+                    topic_name: topic_name.to_owned(),
+                    offset: None,
+                },
+            ])
+            .await;
+    }
+}
+
 pub async fn produce_consume_multi_topic_consumer(connection_builder: &KafkaConnectionBuilder) {
     let producer = connection_builder.connect_producer("1", 0).await;
     let mut consumer = connection_builder
diff --git a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml
index 430d8e1c0..317e247bd 100644
--- a/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml
+++ b/shotover-proxy/tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml
@@ -36,6 +36,8 @@ services:
       #
       # However for an integration test workload we are constantly spinning up single consumer groups, so the default value makes the tests take twice as long to run.
KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: "0" + + KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: "consumer, classic" volumes: &volumes - type: tmpfs target: /bitnami/kafka diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 6a4c72a71..77e0bc708 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -32,11 +32,11 @@ use kafka_protocol::messages::produce_response::{ }; use kafka_protocol::messages::{ AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey, - BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest, - DeleteRecordsResponse, DescribeClusterResponse, DescribeGroupsRequest, DescribeGroupsResponse, - DescribeProducersRequest, DescribeProducersResponse, DescribeTransactionsRequest, - DescribeTransactionsResponse, EndTxnRequest, FetchRequest, FetchResponse, - FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest, + BrokerId, ConsumerGroupHeartbeatRequest, DeleteGroupsRequest, DeleteGroupsResponse, + DeleteRecordsRequest, DeleteRecordsResponse, DescribeClusterResponse, DescribeGroupsRequest, + DescribeGroupsResponse, DescribeProducersRequest, DescribeProducersResponse, + DescribeTransactionsRequest, DescribeTransactionsResponse, EndTxnRequest, FetchRequest, + FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest, @@ -736,6 +736,10 @@ impl KafkaSinkCluster { Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::Heartbeat(HeartbeatRequest { group_id, .. }) + | RequestBody::ConsumerGroupHeartbeat(ConsumerGroupHeartbeatRequest { + group_id, + .. + }) | RequestBody::SyncGroup(SyncGroupRequest { group_id, .. }) | RequestBody::JoinGroup(JoinGroupRequest { group_id, .. }) | RequestBody::LeaveGroup(LeaveGroupRequest { group_id, .. }) @@ -955,6 +959,13 @@ impl KafkaSinkCluster { let group_id = heartbeat.group_id.clone(); self.route_to_group_coordinator(request, group_id); } + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::ConsumerGroupHeartbeat(heartbeat), + .. + })) => { + let group_id = heartbeat.group_id.clone(); + self.route_to_group_coordinator(request, group_id); + } Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::SyncGroup(sync_group), .. @@ -2928,6 +2939,10 @@ The connection to the client has been closed." body: ResponseBody::Heartbeat(heartbeat), .. })) => self.handle_group_coordinator_routing_error(&request_ty, heartbeat.error_code), + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::ConsumerGroupHeartbeat(heartbeat), + .. + })) => self.handle_group_coordinator_routing_error(&request_ty, heartbeat.error_code), Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::SyncGroup(sync_group), .. 
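
Background on why the sink_cluster change above is so small: under the new consumer protocol (KIP-848, enabled on the broker via the rebalance protocols setting shown in the docker-compose change), the classic JoinGroup/SyncGroup/Heartbeat exchange is collapsed into a single ConsumerGroupHeartbeat request, and like the classic requests it carries the group id that shotover already keys its coordinator routing on. A condensed sketch of the pattern, abbreviated and not the literal shotover source:

    // Both the classic and the KIP-848 heartbeat carry a group id, so both
    // reuse the existing group-coordinator routing path unchanged. The
    // response side mirrors this: ConsumerGroupHeartbeatResponse exposes a
    // top-level error_code just like HeartbeatResponse, so it flows through
    // the same handle_group_coordinator_routing_error path.
    match request.frame() {
        Some(Frame::Kafka(KafkaFrame::Request {
            body:
                RequestBody::Heartbeat(HeartbeatRequest { group_id, .. })
                | RequestBody::ConsumerGroupHeartbeat(ConsumerGroupHeartbeatRequest {
                    group_id,
                    ..
                }),
            ..
        })) => {
            let group_id = group_id.clone();
            self.route_to_group_coordinator(request, group_id);
        }
        // ...all other request kinds keep their existing routing...
        _ => {}
    }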
diff --git a/test-helpers/src/connection/kafka/cpp.rs b/test-helpers/src/connection/kafka/cpp.rs
index 718fd1399..2d891bece 100644
--- a/test-helpers/src/connection/kafka/cpp.rs
+++ b/test-helpers/src/connection/kafka/cpp.rs
@@ -96,6 +96,10 @@ impl KafkaConnectionBuilderCpp {
             .create()
             .unwrap();
 
+        if let ConsumerProtocol::Consumer = config.protocol {
+            panic!("New consumer protocol not supported by the rdkafka driver");
+        }
+
         let topic_names: Vec<&str> = config.topic_names.iter().map(|x| x.as_str()).collect();
         consumer.subscribe(&topic_names).unwrap();
         KafkaConsumerCpp { consumer }
diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs
index 996010b92..d4d7ca9ac 100644
--- a/test-helpers/src/connection/kafka/java.rs
+++ b/test-helpers/src/connection/kafka/java.rs
@@ -172,6 +172,14 @@ impl KafkaConnectionBuilderJava {
             "org.apache.kafka.common.serialization.StringDeserializer".to_owned(),
         );
 
+        config.insert(
+            "group.protocol".to_owned(),
+            match consumer_config.protocol {
+                super::ConsumerProtocol::Classic => "CLASSIC".to_owned(),
+                super::ConsumerProtocol::Consumer => "CONSUMER".to_owned(),
+            },
+        );
+
         let consumer = self.jvm.construct(
             "org.apache.kafka.clients.consumer.KafkaConsumer",
             vec![properties(&self.jvm, &config)],
diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs
index 635609f24..e5ca66880 100644
--- a/test-helpers/src/connection/kafka/mod.rs
+++ b/test-helpers/src/connection/kafka/mod.rs
@@ -712,6 +712,7 @@ pub struct ConsumerConfig {
     fetch_min_bytes: i32,
     fetch_max_wait_ms: i32,
     isolation_level: IsolationLevel,
+    protocol: ConsumerProtocol,
 }
 
 impl ConsumerConfig {
@@ -722,6 +723,7 @@ impl ConsumerConfig {
             fetch_min_bytes: 1,
             fetch_max_wait_ms: 500,
             isolation_level: IsolationLevel::ReadUncommitted,
+            protocol: ConsumerProtocol::Classic,
         }
     }
 
@@ -744,6 +746,11 @@ impl ConsumerConfig {
         self.isolation_level = isolation_level;
         self
     }
+
+    pub fn with_protocol(mut self, protocol: ConsumerProtocol) -> Self {
+        self.protocol = protocol;
+        self
+    }
 }
 
 pub enum IsolationLevel {
@@ -760,6 +767,11 @@ impl IsolationLevel {
     }
 }
 
+pub enum ConsumerProtocol {
+    Classic,
+    Consumer,
+}
+
 #[derive(PartialEq, Debug)]
 pub struct ConsumerGroupDescription {
     pub is_simple_consumer: bool,
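
Taken together, the test-helpers changes make the protocol selectable per consumer while everything else keeps the Classic default. A minimal usage sketch with hypothetical topic and group names; it assumes a KafkaConnectionBuilder backed by the Java driver (the cpp/rdkafka path now panics on ConsumerProtocol::Consumer) and a broker with the "consumer" rebalance protocol enabled, as in the kafka 3.9 docker-compose above:

    // Opt a single test consumer into the new consumer group protocol;
    // every other consumer keeps the default ConsumerProtocol::Classic.
    let mut consumer = connection_builder
        .connect_consumer(
            ConsumerConfig::consume_from_topics(vec!["some_topic".to_owned()])
                .with_group("some_group")
                .with_protocol(ConsumerProtocol::Consumer),
        )
        .await;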