KafkaSinkCluster: route ListGroups request
rukai committed Oct 29, 2024
1 parent e3bac43 commit 7dcfcf0
Showing 5 changed files with 181 additions and 58 deletions.
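Before the per-file diffs, the overall shape of the change: a client's single ListGroups request is fanned out to every broker in the cluster, and the per-broker replies are merged into one response. The sketch below shows that scatter/gather pattern in isolation; its types are hypothetical stand-ins, not shotover's actual frame types.

```rust
// A minimal sketch of the scatter/gather shape this commit gives ListGroups.
// All types here are illustrative stand-ins, not shotover's real frames.
use std::collections::HashMap;

#[derive(Clone)]
struct ListGroupsRequest; // carries no routable contents, so clones are exact

#[derive(Default)]
struct ListGroupsResponse {
    groups: Vec<String>,
}

// Any broker may coordinate some of the consumer groups, so the request
// is cloned to every broker in the cluster rather than split apart.
fn fan_out(broker_ids: &[i32], request: &ListGroupsRequest) -> HashMap<i32, ListGroupsRequest> {
    broker_ids.iter().map(|id| (*id, request.clone())).collect()
}

// Each group has exactly one coordinator, so concatenating the per-broker
// listings yields the complete set with no duplicates.
fn combine(responses: Vec<ListGroupsResponse>) -> ListGroupsResponse {
    let mut base = ListGroupsResponse::default();
    for mut next in responses {
        base.groups.append(&mut next.groups);
    }
    base
}

fn main() {
    let routed = fan_out(&[0, 1, 2], &ListGroupsRequest);
    assert_eq!(routed.len(), 3);

    let merged = combine(vec![
        ListGroupsResponse { groups: vec!["group_a".to_owned()] },
        ListGroupsResponse { groups: vec!["group_b".to_owned()] },
    ]);
    assert_eq!(merged.groups, ["group_a", "group_b"]);
}
```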
124 changes: 74 additions & 50 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -3,9 +3,10 @@ use std::{collections::HashMap, time::Duration};
use test_helpers::{
connection::kafka::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig,
ExpectedResponse, IsolationLevel, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver,
KafkaProducer, ListOffsetsResultInfo, NewPartition, NewTopic, OffsetAndMetadata,
OffsetSpec, Record, ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition,
ExpectedResponse, IsolationLevel, KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer,
KafkaDriver, KafkaProducer, ListOffsetsResultInfo, NewPartition, NewTopic,
OffsetAndMetadata, OffsetSpec, Record, ResourcePatternType, ResourceSpecifier,
ResourceType, TopicPartition,
},
docker_compose::DockerCompose,
};
@@ -25,11 +25,6 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
num_partitions: 1,
replication_factor: 1,
},
NewTopic {
name: "partitions3",
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "partitions3_case1",
num_partitions: 3,
@@ -1367,74 +1363,102 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
.await;
produce_consume_partitions1(connection_builder, "partitions1").await;

let results = admin
.list_offsets(HashMap::from([
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 0,
},
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 1,
},
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 2,
},
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions1".to_owned(),
partition: 0,
},
OffsetSpec::Latest,
),
]))
.await;
// rdkafka-rs doesn't support these methods
list_offsets(&admin).await;
list_groups(connection_builder, &admin).await;
}

produce_consume_acks0(connection_builder).await;
admin_cleanup(connection_builder).await;
}

let expected = HashMap::from([
async fn list_offsets(admin: &KafkaAdmin) {
let results = admin
.list_offsets(HashMap::from([
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 0,
},
ListOffsetsResultInfo { offset: 0 },
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 1,
},
ListOffsetsResultInfo { offset: 0 },
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 2,
},
ListOffsetsResultInfo { offset: 0 },
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions1".to_owned(),
partition: 0,
},
ListOffsetsResultInfo { offset: 11 },
OffsetSpec::Latest,
),
]);
assert_eq!(results, expected);
}
]))
.await;

produce_consume_acks0(connection_builder).await;
admin_cleanup(connection_builder).await;
let expected = HashMap::from([
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 0,
},
ListOffsetsResultInfo { offset: 0 },
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 1,
},
ListOffsetsResultInfo { offset: 0 },
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 2,
},
ListOffsetsResultInfo { offset: 0 },
),
(
TopicPartition {
topic_name: "partitions1".to_owned(),
partition: 0,
},
ListOffsetsResultInfo { offset: 11 },
),
]);
assert_eq!(results, expected);
}

async fn list_groups(connection_builder: &KafkaConnectionBuilder, admin: &KafkaAdmin) {
let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topics(vec!["partitions1".to_owned()])
.with_group("list_groups_test"),
)
.await;
consumer
.assert_consume(ExpectedResponse {
message: "initial".to_owned(),
key: Some("Key".to_owned()),
topic_name: "partitions1".to_owned(),
offset: Some(0),
})
.await;

let actual_results = admin.list_groups().await;
if !actual_results.contains(&"list_groups_test".to_owned()) {
panic!("Expected to find list_groups_test in {actual_results:?} but was misisng")
}
}

pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) {
56 changes: 50 additions & 6 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -29,11 +29,12 @@ use kafka_protocol::messages::{
AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, EndTxnRequest, FetchRequest,
FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListOffsetsRequest,
ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest,
OffsetFetchResponse, OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest,
ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse,
SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsRequest,
ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse,
OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest,
OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader,
SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest,
TopicName, TransactionalId, TxnOffsetCommitRequest,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::ResponseError;
@@ -48,7 +49,7 @@ use scram_over_mtls::{
use serde::{Deserialize, Serialize};
use shotover_node::{ShotoverNode, ShotoverNodeConfig};
use split::{
AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter,
AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter, ListGroupsSplitAndRouter,
ListOffsetsRequestSplitAndRouter, OffsetFetchSplitAndRouter,
OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter,
};
@@ -986,6 +987,11 @@ impl KafkaSinkCluster {
..
})) => self.route_to_controller(request),

Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListGroups(_),
..
})) => self.split_and_route_request::<ListGroupsSplitAndRouter>(request)?,

// route to random broker
Some(Frame::Kafka(KafkaFrame::Request {
body:
@@ -1400,6 +1406,21 @@ impl KafkaSinkCluster {
result
}

/// This method routes the ListGroups request to every broker in the cluster.
/// ListGroups contains no per-group fields to split by, so each broker receives an exact clone of the original request.
fn split_list_groups_request_by_destination(
&mut self,
_body: &mut ListGroupsRequest,
) -> HashMap<BrokerId, ()> {
let mut result: HashMap<BrokerId, ()> = Default::default();

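// Query every broker: any one of them may be the coordinator for some of the consumer groups.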
for broker in &self.nodes {
result.insert(broker.broker_id, ());
}

result
}

/// This method removes all groups from the OffsetFetch request and returns them split up by their destination.
/// If any groups are unroutable they will have their BrokerId set to -1
fn split_offset_fetch_request_by_destination(
@@ -1973,6 +1994,10 @@ impl KafkaSinkCluster {
body: ResponseBody::OffsetFetch(base),
..
})) => Self::combine_offset_fetch(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ListGroups(base),
..
})) => Self::combine_list_groups(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::AddPartitionsToTxn(base),
version,
@@ -2193,6 +2218,25 @@ impl KafkaSinkCluster {
Ok(())
}

fn combine_list_groups(
base_list_groups: &mut ListGroupsResponse,
drain: impl Iterator<Item = Message>,
) -> Result<()> {
for mut next in drain {
if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ListGroups(next_list_groups),
..
})) = next.frame()
{
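// Move this broker's group listings into the base response without cloning;
// mem::take leaves an empty Vec behind in the drained message.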
base_list_groups
.groups
.extend(std::mem::take(&mut next_list_groups.groups));
}
}

Ok(())
}

fn combine_add_partitions_to_txn(
base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse,
drain: impl Iterator<Item = Message>,
32 changes: 30 additions & 2 deletions shotover/src/transforms/kafka/sink_cluster/split.rs
@@ -10,8 +10,8 @@ use kafka_protocol::messages::{
add_partitions_to_txn_request::AddPartitionsToTxnTransaction,
list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup,
offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData,
AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, GroupId, ListOffsetsRequest,
OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest, TopicName,
AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, GroupId, ListGroupsRequest,
ListOffsetsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest, TopicName,
};
use std::collections::HashMap;

@@ -166,6 +166,34 @@ impl RequestSplitAndRouter for DeleteGroupsSplitAndRouter {
}
}

pub struct ListGroupsSplitAndRouter;

impl RequestSplitAndRouter for ListGroupsSplitAndRouter {
type Request = ListGroupsRequest;
type SubRequests = ();

fn split_by_destination(
transform: &mut KafkaSinkCluster,
request: &mut Self::Request,
) -> HashMap<BrokerId, Self::SubRequests> {
transform.split_list_groups_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListGroups(request),
..
})) => Some(request),
_ => None,
}
}

fn reassemble(_request: &mut Self::Request, _item: Self::SubRequests) {
// No need to reassemble, each ListGroups is an exact clone of the original
}
}
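Design note: with SubRequests = (), split_by_destination reduces to listing the destination brokers; the routing machinery sends each broker a clone of the full request (hence the comment in reassemble), so there is no per-destination state to merge back.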

pub struct OffsetFetchSplitAndRouter;

impl RequestSplitAndRouter for OffsetFetchSplitAndRouter {
19 changes: 19 additions & 0 deletions test-helpers/src/connection/kafka/java.rs
@@ -665,6 +665,25 @@ impl KafkaAdminJava {
results
}

pub async fn list_groups(&self) -> Vec<String> {
let java_results = self
.admin
.call("listConsumerGroups", vec![])
.call_async("all", vec![])
.await;

let mut results = vec![];
for java_group in java_results.call("iterator", vec![]).into_iter() {
results.push(
java_group
.cast("org.apache.kafka.clients.admin.ConsumerGroupListing")
.call("groupId", vec![])
.into_rust(),
)
}
results
}
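This mirrors the Java admin client API: AdminClient.listConsumerGroups().all() resolves to a collection of ConsumerGroupListing objects, and only each listing's groupId is extracted into the returned Vec<String>.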

pub async fn create_acls(&self, acls: Vec<Acl>) {
let resource_type = self
.jvm
8 changes: 8 additions & 0 deletions test-helpers/src/connection/kafka/mod.rs
@@ -453,6 +453,14 @@ impl KafkaAdmin {
}
}

pub async fn list_groups(&self) -> Vec<String> {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
Self::Cpp(_) => panic!("rdkafka-rs driver does not support list_groups"),
Self::Java(java) => java.list_groups().await,
}
}
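As with list_offsets, the Cpp arm panics rather than returning a result, since the rdkafka driver exposes no equivalent admin call; callers such as standard_test_suite gate these helpers to the Java driver.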

pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
