KafkaSinkCluster: route ListGroups request #1790

Merged: 2 commits, Oct 30, 2024
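This PR routes ListGroups requests in KafkaSinkCluster by fanning each request out to every eligible broker and combining the per-broker responses into a single ListGroups response for the client. The sketch below is a minimal standalone illustration of that fan-out-and-combine pattern; the `BrokerId`, `ListGroupsResponse`, `destinations`, and `combine` names are simplified stand-ins, not the shotover or kafka_protocol types used in the diff.

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct BrokerId(i32);

#[derive(Default)]
struct ListGroupsResponse {
    // Simplified: real ListGroups responses carry richer per-group metadata.
    groups: Vec<String>,
}

/// Pick the destinations: every broker that is up (rack filtering elided here).
fn destinations(brokers: &[(BrokerId, bool)]) -> HashMap<BrokerId, ()> {
    brokers
        .iter()
        .filter(|(_, up)| *up)
        .map(|(id, _)| (*id, ()))
        .collect()
}

/// Combine the responses by appending every other broker's groups onto the first response.
fn combine(mut base: ListGroupsResponse, rest: Vec<ListGroupsResponse>) -> ListGroupsResponse {
    for mut next in rest {
        base.groups.extend(std::mem::take(&mut next.groups));
    }
    base
}

fn main() {
    // Two brokers are up, one is down, so the request fans out to two destinations.
    let brokers = [(BrokerId(0), true), (BrokerId(1), true), (BrokerId(2), false)];
    assert_eq!(destinations(&brokers).len(), 2);

    // Each broker reports the groups it coordinates; the merged response lists them all.
    let merged = combine(
        ListGroupsResponse { groups: vec!["group-a".into()] },
        vec![ListGroupsResponse { groups: vec!["group-b".into()] }],
    );
    assert_eq!(merged.groups, vec!["group-a".to_string(), "group-b".to_string()]);
}
```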
6 changes: 3 additions & 3 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -426,7 +426,7 @@ async fn cluster_1_rack_multi_shotover_with_1_shotover_down(#[case] driver: Kafk

// create a new connection and produce and consume messages
let new_connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9193");
test_cases::cluster_test_suite(&new_connection_builder).await;
test_cases::cluster_test_suite_with_lost_shotover_node(&new_connection_builder).await;

let mut expected_events = multi_shotover_events();
// Other shotover nodes should detect the killed node at least once
@@ -485,7 +485,7 @@ async fn cluster_3_racks_multi_shotover_with_2_shotover_down(#[case] driver: Kaf

// create a new connection and produce and consume messages
let new_connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9193");
test_cases::cluster_test_suite(&new_connection_builder).await;
test_cases::cluster_test_suite_with_lost_shotover_node(&new_connection_builder).await;

let mut expected_events = multi_shotover_events();
// The UP shotover node should detect the killed nodes at least once
@@ -537,7 +537,7 @@ async fn cluster_3_racks_multi_shotover_with_1_shotover_missing(#[case] driver:

// Send some produce and consume requests
let connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9192");
test_cases::cluster_test_suite(&connection_builder).await;
test_cases::cluster_test_suite_with_lost_shotover_node(&connection_builder).await;

let mut expected_events = multi_shotover_events();
// Other shotover nodes should detect the missing node at least once
154 changes: 101 additions & 53 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -3,9 +3,10 @@ use std::{collections::HashMap, time::Duration};
use test_helpers::{
connection::kafka::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig,
ExpectedResponse, IsolationLevel, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver,
KafkaProducer, ListOffsetsResultInfo, NewPartition, NewTopic, OffsetAndMetadata,
OffsetSpec, Record, ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition,
ExpectedResponse, IsolationLevel, KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer,
KafkaDriver, KafkaProducer, ListOffsetsResultInfo, NewPartition, NewTopic,
OffsetAndMetadata, OffsetSpec, Record, ResourcePatternType, ResourceSpecifier,
ResourceType, TopicPartition,
},
docker_compose::DockerCompose,
};
@@ -25,11 +26,6 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
num_partitions: 1,
replication_factor: 1,
},
NewTopic {
name: "partitions3",
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "partitions3_case1",
num_partitions: 3,
@@ -1330,7 +1326,7 @@ async fn test_produce_consume_10_times(producer: &mut KafkaProducer, consumer: &
}
}

pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
async fn standard_test_suite_base(connection_builder: &KafkaConnectionBuilder) {
admin_setup(connection_builder).await;
produce_consume_partitions1(connection_builder, "partitions1").await;
produce_consume_partitions1(connection_builder, "unknown_topic").await;
@@ -1367,78 +1363,104 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
.await;
produce_consume_partitions1(connection_builder, "partitions1").await;

let results = admin
.list_offsets(HashMap::from([
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 0,
},
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 1,
},
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 2,
},
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions1".to_owned(),
partition: 0,
},
OffsetSpec::Latest,
),
]))
.await;
list_offsets(&admin).await;
}

produce_consume_acks0(connection_builder).await;
admin_cleanup(connection_builder).await;
}

let expected = HashMap::from([
async fn list_offsets(admin: &KafkaAdmin) {
let results = admin
.list_offsets(HashMap::from([
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 0,
},
ListOffsetsResultInfo { offset: 0 },
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 1,
},
ListOffsetsResultInfo { offset: 0 },
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 2,
},
ListOffsetsResultInfo { offset: 0 },
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions1".to_owned(),
partition: 0,
},
ListOffsetsResultInfo { offset: 11 },
OffsetSpec::Latest,
),
]);
assert_eq!(results, expected);
}
]))
.await;

produce_consume_acks0(connection_builder).await;
admin_cleanup(connection_builder).await;
let expected = HashMap::from([
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 0,
},
ListOffsetsResultInfo { offset: 0 },
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 1,
},
ListOffsetsResultInfo { offset: 0 },
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 2,
},
ListOffsetsResultInfo { offset: 0 },
),
(
TopicPartition {
topic_name: "partitions1".to_owned(),
partition: 0,
},
ListOffsetsResultInfo { offset: 11 },
),
]);
assert_eq!(results, expected);
}

pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) {
standard_test_suite(connection_builder).await;
async fn list_groups(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topics(vec!["partitions1".to_owned()])
.with_group("list_groups_test"),
)
.await;
consumer
.assert_consume(ExpectedResponse {
message: "initial".to_owned(),
key: Some("Key".to_owned()),
topic_name: "partitions1".to_owned(),
offset: Some(0),
})
.await;

let actual_results = admin.list_groups().await;
if !actual_results.contains(&"list_groups_test".to_owned()) {
panic!("Expected to find list_groups_test in {actual_results:?} but was missing")
}
}

async fn cluster_test_suite_base(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
admin
.create_topics_and_wait(&[
@@ -1458,6 +1480,32 @@ pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) {
produce_consume_partitions3(connection_builder, "partitions3_rf3", 1, 500).await;
}

pub async fn tests_requiring_all_shotover_nodes(connection_builder: &KafkaConnectionBuilder) {
// rdkafka-rs doesn't support these methods
#[allow(irrefutable_let_patterns)]
if let KafkaConnectionBuilder::Java(_) = connection_builder {
list_groups(connection_builder).await;
}
}

pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
standard_test_suite_base(connection_builder).await;
tests_requiring_all_shotover_nodes(connection_builder).await;
}

pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) {
standard_test_suite_base(connection_builder).await;
cluster_test_suite_base(connection_builder).await;
tests_requiring_all_shotover_nodes(connection_builder).await;
}

pub async fn cluster_test_suite_with_lost_shotover_node(
connection_builder: &KafkaConnectionBuilder,
) {
standard_test_suite_base(connection_builder).await;
cluster_test_suite_base(connection_builder).await;
}

pub async fn setup_basic_user_acls(connection: &KafkaConnectionBuilder, username: &str) {
let admin = connection.connect_admin().await;
admin
54 changes: 51 additions & 3 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -29,8 +29,8 @@ use kafka_protocol::messages::{
AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, EndTxnRequest, FetchRequest,
FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListOffsetsRequest,
ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest,
InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse,
ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest,
OffsetFetchResponse, OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest,
ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse,
SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
@@ -48,7 +48,7 @@ use scram_over_mtls::{
use serde::{Deserialize, Serialize};
use shotover_node::{ShotoverNode, ShotoverNodeConfig};
use split::{
AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter,
AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter, ListGroupsSplitAndRouter,
ListOffsetsRequestSplitAndRouter, OffsetFetchSplitAndRouter,
OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter,
};
@@ -986,6 +986,11 @@ impl KafkaSinkCluster {
..
})) => self.route_to_controller(request),

Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListGroups(_),
..
})) => self.split_and_route_request::<ListGroupsSplitAndRouter>(request)?,

// route to random broker
Some(Frame::Kafka(KafkaFrame::Request {
body:
@@ -1400,6 +1405,26 @@ impl KafkaSinkCluster {
result
}

fn split_request_by_routing_to_all_brokers(&mut self) -> HashMap<BrokerId, ()> {
let mut result: HashMap<BrokerId, ()> = Default::default();

for broker in self.nodes.iter().filter(|node| {
node.is_up()
&& node
.rack
.as_ref()
.map(|rack| rack == &self.rack)
// If the cluster is not using racks, include all brokers in the list.
// This ensures we get full coverage of the cluster.
// The client driver can filter out the resulting duplicates.
.unwrap_or(true)
}) {
result.insert(broker.broker_id, ());
}

result
}

/// This method removes all groups from the OffsetFetch request and returns them split up by their destination.
/// If any groups are unroutable they will have their BrokerId set to -1
fn split_offset_fetch_request_by_destination(
@@ -1973,6 +1998,10 @@ impl KafkaSinkCluster {
body: ResponseBody::OffsetFetch(base),
..
})) => Self::combine_offset_fetch(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ListGroups(base),
..
})) => Self::combine_list_groups(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::AddPartitionsToTxn(base),
version,
@@ -2193,6 +2222,25 @@ impl KafkaSinkCluster {
Ok(())
}

fn combine_list_groups(
base_list_groups: &mut ListGroupsResponse,
drain: impl Iterator<Item = Message>,
) -> Result<()> {
for mut next in drain {
if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ListGroups(next_list_groups),
..
})) = next.frame()
{
base_list_groups
.groups
.extend(std::mem::take(&mut next_list_groups.groups));
}
}

Ok(())
}

fn combine_add_partitions_to_txn(
base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse,
drain: impl Iterator<Item = Message>,
32 changes: 30 additions & 2 deletions shotover/src/transforms/kafka/sink_cluster/split.rs
@@ -10,8 +10,8 @@ use kafka_protocol::messages::{
add_partitions_to_txn_request::AddPartitionsToTxnTransaction,
list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup,
offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData,
AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, GroupId, ListOffsetsRequest,
OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest, TopicName,
AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, GroupId, ListGroupsRequest,
ListOffsetsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest, TopicName,
};
use std::collections::HashMap;

@@ -166,6 +166,34 @@ impl RequestSplitAndRouter for DeleteGroupsSplitAndRouter {
}
}

pub struct ListGroupsSplitAndRouter;

impl RequestSplitAndRouter for ListGroupsSplitAndRouter {
type Request = ListGroupsRequest;
type SubRequests = ();

fn split_by_destination(
transform: &mut KafkaSinkCluster,
_request: &mut Self::Request,
) -> HashMap<BrokerId, Self::SubRequests> {
transform.split_request_by_routing_to_all_brokers()
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListGroups(request),
..
})) => Some(request),
_ => None,
}
}

fn reassemble(_request: &mut Self::Request, _item: Self::SubRequests) {
// No need to reassemble, each ListGroups is an exact clone of the original
}
}

pub struct OffsetFetchSplitAndRouter;

impl RequestSplitAndRouter for OffsetFetchSplitAndRouter {
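For reference, the broker-selection rule in `split_request_by_routing_to_all_brokers` above keeps brokers that are up and that either have no rack configured or sit in the same rack as the shotover node; unracked brokers are always included so an unracked cluster still gets full coverage. Below is a small standalone sketch of that filter, using a simplified `Broker` struct and `eligible` helper as stand-ins rather than the shotover structs.

```rust
struct Broker {
    id: i32,
    up: bool,
    rack: Option<String>,
}

/// Keep up brokers whose rack matches the local rack; brokers with no rack
/// configured are always kept so an unracked cluster still gets full coverage.
fn eligible(brokers: &[Broker], local_rack: &str) -> Vec<i32> {
    brokers
        .iter()
        .filter(|b| b.up && b.rack.as_deref().map(|r| r == local_rack).unwrap_or(true))
        .map(|b| b.id)
        .collect()
}

fn main() {
    let brokers = vec![
        Broker { id: 0, up: true, rack: Some("rack1".into()) },
        Broker { id: 1, up: true, rack: Some("rack2".into()) },
        Broker { id: 2, up: true, rack: None },
        Broker { id: 3, up: false, rack: Some("rack1".into()) },
    ];
    // Broker 0 matches the rack, broker 2 has no rack, broker 1 is in another
    // rack, and broker 3 is down.
    assert_eq!(eligible(&brokers, "rack1"), vec![0, 2]);
}
```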