Skip to content

Commit

Permalink
KafkaSinkCluster: Fix DeleteRecords routing
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Nov 11, 2024
1 parent 38c0793 commit 433cd1d
Show file tree
Hide file tree
Showing 4 changed files with 216 additions and 18 deletions.
66 changes: 64 additions & 2 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ async fn admin_cleanup(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;

admin.delete_groups(&["some_group", "some_group1"]).await;
delete_records(&admin, connection_builder).await;
delete_records_partitions1(&admin, connection_builder).await;
delete_records_partitions3(&admin, connection_builder).await;
}

async fn delete_offsets(connection_builder: &KafkaConnectionBuilder) {
Expand Down Expand Up @@ -181,7 +182,10 @@ async fn delete_offsets(connection_builder: &KafkaConnectionBuilder) {
}
}

async fn delete_records(admin: &KafkaAdmin, connection_builder: &KafkaConnectionBuilder) {
async fn delete_records_partitions1(
admin: &KafkaAdmin,
connection_builder: &KafkaConnectionBuilder,
) {
// Only supported by java driver
#[allow(irrefutable_let_patterns)]
if let KafkaConnectionBuilder::Java(_) = connection_builder {
Expand Down Expand Up @@ -225,6 +229,64 @@ async fn delete_records(admin: &KafkaAdmin, connection_builder: &KafkaConnection
}
}

async fn delete_records_partitions3(
admin: &KafkaAdmin,
connection_builder: &KafkaConnectionBuilder,
) {
// Only supported by java driver
#[allow(irrefutable_let_patterns)]
if let KafkaConnectionBuilder::Java(_) = connection_builder {
// assert partition contains a record
let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topics(vec!["partitions3_case1".to_owned()])
.with_group("test_delete_records"),
)
.await;

// assert that a record exists, due to cross partition ordering we dont know what the record is, just that it exists.
consumer.consume(Duration::from_secs(30)).await;

// delete all records in the partition
admin
.delete_records(&[
RecordsToDelete {
topic_partition: TopicPartition {
topic_name: "partitions3_case1".to_owned(),
partition: 0,
},
delete_before_offset: -1,
},
RecordsToDelete {
topic_partition: TopicPartition {
topic_name: "partitions3_case1".to_owned(),
partition: 1,
},
delete_before_offset: -1,
},
RecordsToDelete {
topic_partition: TopicPartition {
topic_name: "partitions3_case1".to_owned(),
partition: 2,
},
delete_before_offset: -1,
},
])
.await;

// assert partition no longer contains a record
let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topics(vec!["partitions3_case1".to_owned()])
.with_group("test_delete_records2"),
)
.await;
consumer
.assert_no_consume_within_timeout(Duration::from_secs(2))
.await;
}
}

/// Attempt to make the driver batch produce requests for different topics into the same request
/// This is important to test since shotover has complex logic for splitting these batch requests into individual requests.
pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnectionBuilder) {
Expand Down
129 changes: 118 additions & 11 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use connections::{Connections, Destination};
use dashmap::DashMap;
use kafka_node::{ConnectionFactory, KafkaAddress, KafkaNode, KafkaNodeState};
use kafka_protocol::messages::add_partitions_to_txn_request::AddPartitionsToTxnTransaction;
use kafka_protocol::messages::delete_records_request::DeleteRecordsTopic;
use kafka_protocol::messages::delete_records_response::DeleteRecordsTopicResult;
use kafka_protocol::messages::fetch_request::FetchTopic;
use kafka_protocol::messages::fetch_response::LeaderIdAndEpoch as FetchResponseLeaderIdAndEpoch;
use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic;
Expand All @@ -27,14 +29,14 @@ use kafka_protocol::messages::produce_response::{
};
use kafka_protocol::messages::{
AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, EndTxnRequest, FetchRequest,
FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse,
ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest,
MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest,
OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader,
SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest,
TopicName, TransactionalId, TxnOffsetCommitRequest,
BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest,
DeleteRecordsResponse, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest,
FindCoordinatorResponse, GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest,
LeaveGroupRequest, ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse,
ListTransactionsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest,
OffsetFetchResponse, OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest,
ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse,
SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::ResponseError;
Expand All @@ -49,8 +51,9 @@ use scram_over_mtls::{
use serde::{Deserialize, Serialize};
use shotover_node::{ShotoverNode, ShotoverNodeConfig};
use split::{
AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter, ListGroupsSplitAndRouter,
ListOffsetsRequestSplitAndRouter, ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter,
AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter,
DeleteRecordsRequestSplitAndRouter, ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter,
ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter,
OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter,
};
use std::collections::{HashMap, HashSet, VecDeque};
Expand Down Expand Up @@ -885,6 +888,12 @@ impl KafkaSinkCluster {
})) => self.split_and_route_request::<OffsetForLeaderEpochRequestSplitAndRouter>(
request,
)?,
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DeleteRecords(_),
..
})) => {
self.split_and_route_request::<DeleteRecordsRequestSplitAndRouter>(request)?
}

// route to group coordinator
Some(Frame::Kafka(KafkaFrame::Request {
Expand Down Expand Up @@ -1055,7 +1064,6 @@ The connection to the client has been closed."
| RequestBody::AlterConfigs(_)
| RequestBody::CreatePartitions(_)
| RequestBody::DeleteTopics(_)
| RequestBody::DeleteRecords(_)
| RequestBody::CreateAcls(_)
| RequestBody::ApiVersions(_),
..
Expand Down Expand Up @@ -1587,6 +1595,58 @@ The connection to the client has been closed."
result
}

/// This method removes all topics from the DeleteRecords request and returns them split up by their destination.
/// If any topics are unroutable they will have their BrokerId set to -1
fn split_delete_records_request_by_destination(
&mut self,
body: &mut DeleteRecordsRequest,
) -> HashMap<BrokerId, Vec<DeleteRecordsTopic>> {
let mut result: HashMap<BrokerId, Vec<DeleteRecordsTopic>> = Default::default();

for mut topic in body.topics.drain(..) {
let topic_name = &topic.name;
if let Some(topic_meta) = self.topic_by_name.get(topic_name) {
for partition in std::mem::take(&mut topic.partitions) {
let partition_index = partition.partition_index as usize;
let destination = if let Some(partition) =
topic_meta.partitions.get(partition_index)
{
if partition.leader_id == -1 {
tracing::warn!(
"leader_id is unknown for {topic_name:?} at partition index {partition_index}",
);
}
partition.leader_id
} else {
let partition_len = topic_meta.partitions.len();
tracing::warn!("no known partition for {topic_name:?} at partition index {partition_index} out of {partition_len} partitions, routing request to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
BrokerId(-1)
};
tracing::debug!(
"Routing DeleteRecords request portion of partition {partition_index} in {topic_name:?} to broker {}",
destination.0
);
let dest_topics = result.entry(destination).or_default();
if let Some(dest_topic) = dest_topics.iter_mut().find(|x| x.name == topic.name)
{
dest_topic.partitions.push(partition);
} else {
let mut topic = topic.clone();
topic.partitions.push(partition);
dest_topics.push(topic);
}
}
} else {
tracing::warn!("no known partition replica for {topic_name:?}, routing request to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
let destination = BrokerId(-1);
let dest_topics = result.entry(destination).or_default();
dest_topics.push(topic);
}
}

result
}

/// This method removes all transactions from the AddPartitionsToTxn request and returns them split up by their destination
/// If any topics are unroutable they will have their BrokerId set to -1
fn split_add_partition_to_txn_request_by_destination(
Expand Down Expand Up @@ -2077,6 +2137,10 @@ The connection to the client has been closed."
body: ResponseBody::DeleteGroups(base),
..
})) => Self::combine_delete_groups_responses(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DeleteRecords(base),
..
})) => Self::combine_delete_records(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::OffsetFetch(base),
..
Expand Down Expand Up @@ -2271,6 +2335,49 @@ The connection to the client has been closed."
Ok(())
}

fn combine_delete_records(
base_body: &mut DeleteRecordsResponse,
drain: impl Iterator<Item = Message>,
) -> Result<()> {
let mut base_topics: HashMap<TopicName, DeleteRecordsTopicResult> =
std::mem::take(&mut base_body.topics)
.into_iter()
.map(|response| (response.name.clone(), response))
.collect();
for mut next in drain {
if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DeleteRecords(next_body),
..
})) = next.frame()
{
for next_response in std::mem::take(&mut next_body.topics) {
if let Some(base_response) = base_topics.get_mut(&next_response.name) {
for next_partition in &next_response.partitions {
for base_partition in &base_response.partitions {
if next_partition.partition_index == base_partition.partition_index
{
tracing::warn!("Duplicate partition indexes in combined DeleteRecords response, if this ever occurs we should investigate the repercussions")
}
}
}
// A partition can only be contained in one response so there is no risk of duplicating partitions
base_response.partitions.extend(next_response.partitions)
} else {
base_topics.insert(next_response.name.clone(), next_response);
}
}
} else {
return Err(anyhow!(
"Combining DeleteRecords responses but received another message type"
));
}
}

base_body.topics.extend(base_topics.into_values());

Ok(())
}

fn combine_delete_groups_responses(
base_delete_groups: &mut DeleteGroupsResponse,
drain: impl Iterator<Item = Message>,
Expand Down
37 changes: 33 additions & 4 deletions shotover/src/transforms/kafka/sink_cluster/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,12 @@ use crate::{
};
use kafka_protocol::messages::{
add_partitions_to_txn_request::AddPartitionsToTxnTransaction,
list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup,
delete_records_request::DeleteRecordsTopic, list_offsets_request::ListOffsetsTopic,
offset_fetch_request::OffsetFetchRequestGroup,
offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData,
AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, GroupId, ListGroupsRequest,
ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest,
ProduceRequest, TopicName,
AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, DeleteRecordsRequest, GroupId,
ListGroupsRequest, ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest,
OffsetForLeaderEpochRequest, ProduceRequest, TopicName,
};
use std::collections::HashMap;

Expand Down Expand Up @@ -139,6 +140,34 @@ impl RequestSplitAndRouter for OffsetForLeaderEpochRequestSplitAndRouter {
}
}

pub struct DeleteRecordsRequestSplitAndRouter;

impl RequestSplitAndRouter for DeleteRecordsRequestSplitAndRouter {
type Request = DeleteRecordsRequest;
type SubRequests = Vec<DeleteRecordsTopic>;

fn split_by_destination(
transform: &mut KafkaSinkCluster,
request: &mut Self::Request,
) -> HashMap<BrokerId, Self::SubRequests> {
transform.split_delete_records_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DeleteRecords(request),
..
})) => Some(request),
_ => None,
}
}

fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
request.topics = item;
}
}

pub struct DeleteGroupsSplitAndRouter;

impl RequestSplitAndRouter for DeleteGroupsSplitAndRouter {
Expand Down
2 changes: 1 addition & 1 deletion test-helpers/src/connection/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ impl KafkaConsumer {
}
}

async fn consume(&mut self, timeout: Duration) -> ExpectedResponse {
pub async fn consume(&mut self, timeout: Duration) -> ExpectedResponse {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
Self::Cpp(cpp) => cpp.consume(timeout).await,
Expand Down

0 comments on commit 433cd1d

Please sign in to comment.