Skip to content

Commit

Permalink
KafkaSinkCluster: Support new consumer protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Nov 20, 2024
1 parent 3f76b54 commit 1cff7c3
Show file tree
Hide file tree
Showing 7 changed files with 115 additions and 6 deletions.
5 changes: 5 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,11 @@ async fn cluster_2_racks_multi_shotover_kafka_3_9(#[case] driver: KafkaDriver) {

let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::cluster_test_suite(&connection_builder).await;
test_cases::produce_consume_partitions_new_consumer_group_protocol(
&connection_builder,
"partitions3_new_consumer_group_protocol",
)
.await;

for shotover in shotovers {
tokio::time::timeout(
Expand Down
65 changes: 64 additions & 1 deletion shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::{collections::HashMap, time::Duration};
use test_helpers::{
connection::kafka::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig,
ConsumerGroupDescription, ExpectedResponse, IsolationLevel, KafkaAdmin,
ConsumerGroupDescription, ConsumerProtocol, ExpectedResponse, IsolationLevel, KafkaAdmin,
KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, ListOffsetsResultInfo,
NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata, OffsetSpec, Record,
RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition,
Expand Down Expand Up @@ -47,6 +47,11 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "partitions3_new_consumer_group_protocol",
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "acks0",
num_partitions: 1,
Expand Down Expand Up @@ -1091,6 +1096,64 @@ pub async fn produce_consume_partitions3(
}
}

/// The new consumer protocol must be specifically enabled in the broker config.
/// We only do this for the kafka 3.9 docker-compose.yaml so this test case is
/// manually called for that test and not included in the standard test suite.
pub async fn produce_consume_partitions_new_consumer_group_protocol(
connection_builder: &KafkaConnectionBuilder,
topic_name: &str,
) {
let producer = connection_builder.connect_producer("1", 0).await;
let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()])
.with_group("some_group")
.with_protocol(ConsumerProtocol::Consumer),
)
.await;

for _ in 0..5 {
producer
.assert_produce(
Record {
payload: "Message1",
topic_name,
key: Some("Key".into()),
},
// We cant predict the offsets since that will depend on which partition the keyless record ends up in
None,
)
.await;
producer
.assert_produce(
Record {
payload: "Message2",
topic_name,
key: None,
},
None,
)
.await;

consumer
.assert_consume_in_any_order(vec![
ExpectedResponse {
message: "Message1".to_owned(),
key: Some("Key".to_owned()),
topic_name: topic_name.to_owned(),
offset: None,
},
ExpectedResponse {
message: "Message2".to_owned(),
key: None,
topic_name: topic_name.to_owned(),
offset: None,
},
])
.await;
}
}

pub async fn produce_consume_multi_topic_consumer(connection_builder: &KafkaConnectionBuilder) {
let producer = connection_builder.connect_producer("1", 0).await;
let mut consumer = connection_builder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ services:
#
# However for an integration test workload we are constantly spinning up single consumer groups, so the default value makes the tests take twice as long to run.
KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: "0"

KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: "consumer, classic"
volumes: &volumes
- type: tmpfs
target: /bitnami/kafka
Expand Down
25 changes: 20 additions & 5 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@ use kafka_protocol::messages::produce_response::{
};
use kafka_protocol::messages::{
AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest,
DeleteRecordsResponse, DescribeClusterResponse, DescribeGroupsRequest, DescribeGroupsResponse,
DescribeProducersRequest, DescribeProducersResponse, DescribeTransactionsRequest,
DescribeTransactionsResponse, EndTxnRequest, FetchRequest, FetchResponse,
FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
BrokerId, ConsumerGroupHeartbeatRequest, DeleteGroupsRequest, DeleteGroupsResponse,
DeleteRecordsRequest, DeleteRecordsResponse, DescribeClusterResponse, DescribeGroupsRequest,
DescribeGroupsResponse, DescribeProducersRequest, DescribeProducersResponse,
DescribeTransactionsRequest, DescribeTransactionsResponse, EndTxnRequest, FetchRequest,
FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse,
ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest,
MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest,
Expand Down Expand Up @@ -736,6 +736,10 @@ impl KafkaSinkCluster {
Some(Frame::Kafka(KafkaFrame::Request {
body:
RequestBody::Heartbeat(HeartbeatRequest { group_id, .. })
| RequestBody::ConsumerGroupHeartbeat(ConsumerGroupHeartbeatRequest {
group_id,
..
})
| RequestBody::SyncGroup(SyncGroupRequest { group_id, .. })
| RequestBody::JoinGroup(JoinGroupRequest { group_id, .. })
| RequestBody::LeaveGroup(LeaveGroupRequest { group_id, .. })
Expand Down Expand Up @@ -955,6 +959,13 @@ impl KafkaSinkCluster {
let group_id = heartbeat.group_id.clone();
self.route_to_group_coordinator(request, group_id);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ConsumerGroupHeartbeat(heartbeat),
..
})) => {
let group_id = heartbeat.group_id.clone();
self.route_to_group_coordinator(request, group_id);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::SyncGroup(sync_group),
..
Expand Down Expand Up @@ -2928,6 +2939,10 @@ The connection to the client has been closed."
body: ResponseBody::Heartbeat(heartbeat),
..
})) => self.handle_group_coordinator_routing_error(&request_ty, heartbeat.error_code),
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ConsumerGroupHeartbeat(heartbeat),
..
})) => self.handle_group_coordinator_routing_error(&request_ty, heartbeat.error_code),
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::SyncGroup(sync_group),
..
Expand Down
4 changes: 4 additions & 0 deletions test-helpers/src/connection/kafka/cpp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ impl KafkaConnectionBuilderCpp {
.create()
.unwrap();

if let ConsumerProtocol::Consumer = config.protocol {
panic!("New consumer protocol not support by rdkafka driver");
}

let topic_names: Vec<&str> = config.topic_names.iter().map(|x| x.as_str()).collect();
consumer.subscribe(&topic_names).unwrap();
KafkaConsumerCpp { consumer }
Expand Down
8 changes: 8 additions & 0 deletions test-helpers/src/connection/kafka/java.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,14 @@ impl KafkaConnectionBuilderJava {
"org.apache.kafka.common.serialization.StringDeserializer".to_owned(),
);

config.insert(
"group.protocol".to_owned(),
match consumer_config.protocol {
super::ConsumerProtocol::Classic => "CLASSIC".to_owned(),
super::ConsumerProtocol::Consumer => "CONSUMER".to_owned(),
},
);

let consumer = self.jvm.construct(
"org.apache.kafka.clients.consumer.KafkaConsumer",
vec![properties(&self.jvm, &config)],
Expand Down
12 changes: 12 additions & 0 deletions test-helpers/src/connection/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,7 @@ pub struct ConsumerConfig {
fetch_min_bytes: i32,
fetch_max_wait_ms: i32,
isolation_level: IsolationLevel,
protocol: ConsumerProtocol,
}

impl ConsumerConfig {
Expand All @@ -722,6 +723,7 @@ impl ConsumerConfig {
fetch_min_bytes: 1,
fetch_max_wait_ms: 500,
isolation_level: IsolationLevel::ReadUncommitted,
protocol: ConsumerProtocol::Classic,
}
}

Expand All @@ -744,6 +746,11 @@ impl ConsumerConfig {
self.isolation_level = isolation_level;
self
}

pub fn with_protocol(mut self, protocol: ConsumerProtocol) -> Self {
self.protocol = protocol;
self
}
}

pub enum IsolationLevel {
Expand All @@ -760,6 +767,11 @@ impl IsolationLevel {
}
}

pub enum ConsumerProtocol {
Classic,
Consumer,
}

#[derive(PartialEq, Debug)]
pub struct ConsumerGroupDescription {
pub is_simple_consumer: bool,
Expand Down

0 comments on commit 1cff7c3

Please sign in to comment.