Skip to content

Commit

Permalink
KafkaSinkCluster: route DescribeTransactions requests
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Nov 15, 2024
1 parent 819f88c commit 460d927
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 20 deletions.
27 changes: 22 additions & 5 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use test_helpers::{
ExpectedResponse, IsolationLevel, KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer,
KafkaDriver, KafkaProducer, ListOffsetsResultInfo, NewPartition, NewPartitionReassignment,
NewTopic, OffsetAndMetadata, OffsetSpec, Record, RecordsToDelete, ResourcePatternType,
ResourceSpecifier, ResourceType, TopicPartition,
ResourceSpecifier, ResourceType, TopicPartition, TransactionDescription,
},
docker_compose::DockerCompose,
};
Expand Down Expand Up @@ -1687,15 +1687,32 @@ async fn list_groups(connection_builder: &KafkaConnectionBuilder) {
}
}

async fn list_transactions(connection_builder: &KafkaConnectionBuilder) {
async fn list_and_describe_transactions(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
let _transaction_producer = connection_builder
let _transaction_producer1 = connection_builder
.connect_producer_with_transactions("some_transaction_id".to_owned())
.await;
let _transaction_producer2 = connection_builder
.connect_producer_with_transactions("some_transaction_id2".to_owned())
.await;

let actual_results = admin.list_transactions().await;
let expected_results = ["some_transaction_id".to_owned()];
let expected_results = [
"some_transaction_id".to_owned(),
"some_transaction_id2".to_owned(),
];
assert_eq!(actual_results, expected_results);

let result = admin
.describe_transactions(&["some_transaction_id", "some_transaction_id2"])
.await;
assert_eq!(
result,
HashMap::from([
("some_transaction_id".to_owned(), TransactionDescription {}),
("some_transaction_id2".to_owned(), TransactionDescription {}),
])
);
}

async fn create_and_list_partition_reassignments(connection_builder: &KafkaConnectionBuilder) {
Expand Down Expand Up @@ -1771,7 +1788,7 @@ pub async fn tests_requiring_all_shotover_nodes(connection_builder: &KafkaConnec
#[allow(irrefutable_let_patterns)]
if let KafkaConnectionBuilder::Java(_) = connection_builder {
list_groups(connection_builder).await;
list_transactions(connection_builder).await;
list_and_describe_transactions(connection_builder).await;
create_and_list_partition_reassignments(connection_builder).await;
}
}
Expand Down
80 changes: 69 additions & 11 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@ use kafka_protocol::messages::produce_response::{
use kafka_protocol::messages::{
AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest,
DeleteRecordsResponse, DescribeProducersRequest, DescribeProducersResponse, EndTxnRequest,
FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId,
HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest,
ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse,
MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse,
OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse,
RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest,
SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
DeleteRecordsResponse, DescribeProducersRequest, DescribeProducersResponse,
DescribeTransactionsRequest, DescribeTransactionsResponse, EndTxnRequest, FetchRequest,
FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse,
ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest,
MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest,
OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader,
SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest,
TopicName, TransactionalId, TxnOffsetCommitRequest,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::ResponseError;
Expand All @@ -56,9 +57,9 @@ use shotover_node::{ShotoverNode, ShotoverNodeConfig};
use split::{
AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter,
DeleteRecordsRequestSplitAndRouter, DescribeProducersRequestSplitAndRouter,
ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter, ListTransactionsSplitAndRouter,
OffsetFetchSplitAndRouter, OffsetForLeaderEpochRequestSplitAndRouter,
ProduceRequestSplitAndRouter, RequestSplitAndRouter,
DescribeTransactionsSplitAndRouter, ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter,
ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter,
OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter,
};
use std::collections::{HashMap, HashSet, VecDeque};
use std::hash::Hasher;
Expand Down Expand Up @@ -770,6 +771,14 @@ impl KafkaSinkCluster {
})) => {
self.store_transaction(&mut transactions, transactional_id.clone());
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DescribeTransactions(describe_transaction),
..
})) => {
for transactional_id in &describe_transaction.transactional_ids {
self.store_transaction(&mut transactions, transactional_id.clone());
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::AddPartitionsToTxn(add_partitions_to_txn_request),
header,
Expand Down Expand Up @@ -1090,6 +1099,10 @@ The connection to the client has been closed."
body: RequestBody::ListGroups(_),
..
})) => self.split_and_route_request::<ListGroupsSplitAndRouter>(request)?,
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DescribeTransactions(_),
..
})) => self.split_and_route_request::<DescribeTransactionsSplitAndRouter>(request)?,
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListTransactions(_),
..
Expand Down Expand Up @@ -1560,6 +1573,29 @@ The connection to the client has been closed."
result
}

/// This method removes all transactions from the DescribeTransactions request and returns them split up by their destination.
/// If any transactions are unroutable they will have their BrokerId set to -1
fn split_describe_transactions_request_by_destination(
&mut self,
body: &mut DescribeTransactionsRequest,
) -> HashMap<BrokerId, Vec<TransactionalId>> {
let mut result: HashMap<BrokerId, Vec<TransactionalId>> = Default::default();

for transaction in body.transactional_ids.drain(..) {
if let Some(destination) = self.transaction_to_coordinator_broker.get(&transaction) {
let dest_transactions = result.entry(*destination).or_default();
dest_transactions.push(transaction);
} else {
tracing::warn!("no known coordinator for transactions {transaction:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client");
let destination = BrokerId(-1);
let dest_transactions = result.entry(destination).or_default();
dest_transactions.push(transaction);
}
}

result
}

/// Route broadcasted requests to all brokers split across all shotover nodes.
/// That is, each shotover node in a rack will deterministically be assigned a portion of the rack to route the request to.
/// If a shotover node is the only node in its rack it will route to all kafka brokers in the rack.
Expand Down Expand Up @@ -2243,6 +2279,10 @@ The connection to the client has been closed."
body: ResponseBody::ListGroups(base),
..
})) => Self::combine_list_groups(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DescribeTransactions(base),
..
})) => Self::combine_describe_transactions(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ListTransactions(base),
..
Expand Down Expand Up @@ -2572,6 +2612,24 @@ The connection to the client has been closed."
Ok(())
}

fn combine_describe_transactions(
base: &mut DescribeTransactionsResponse,
drain: impl Iterator<Item = Message>,
) -> Result<()> {
for mut next in drain {
if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DescribeTransactions(next),
..
})) = next.frame()
{
base.transaction_states
.extend(std::mem::take(&mut next.transaction_states));
}
}

Ok(())
}

fn combine_list_transactions(
base_list_transactions: &mut ListTransactionsResponse,
drain: impl Iterator<Item = Message>,
Expand Down
34 changes: 31 additions & 3 deletions shotover/src/transforms/kafka/sink_cluster/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use kafka_protocol::messages::{
list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup,
offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData,
AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, DeleteRecordsRequest,
DescribeProducersRequest, GroupId, ListGroupsRequest, ListOffsetsRequest,
ListTransactionsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest,
TopicName,
DescribeProducersRequest, DescribeTransactionsRequest, GroupId, ListGroupsRequest,
ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest,
ProduceRequest, TopicName, TransactionalId,
};
use std::collections::HashMap;

Expand Down Expand Up @@ -281,6 +281,34 @@ impl RequestSplitAndRouter for ListTransactionsSplitAndRouter {
}
}

pub struct DescribeTransactionsSplitAndRouter;

impl RequestSplitAndRouter for DescribeTransactionsSplitAndRouter {
type Request = DescribeTransactionsRequest;
type SubRequests = Vec<TransactionalId>;

fn split_by_destination(
transform: &mut KafkaSinkCluster,
request: &mut Self::Request,
) -> HashMap<BrokerId, Self::SubRequests> {
transform.split_describe_transactions_request_by_destination(request)
}

fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DescribeTransactions(request),
..
})) => request,
_ => unreachable!(),
}
}

fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
request.transactional_ids = item;
}
}

pub struct OffsetFetchSplitAndRouter;

impl RequestSplitAndRouter for OffsetFetchSplitAndRouter {
Expand Down
24 changes: 23 additions & 1 deletion test-helpers/src/connection/kafka/java.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::{
ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata,
OffsetSpec, PartitionReassignment, ProduceResult, ProducerState, Record, RecordsToDelete,
ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription, TopicPartition,
TopicPartitionInfo,
TopicPartitionInfo, TransactionDescription,
};
use crate::connection::java::{map_iterator, Jvm, Value};
use anyhow::Result;
Expand Down Expand Up @@ -768,6 +768,28 @@ impl KafkaAdminJava {
results
}

pub async fn desscribe_transactions(
&self,
transaction_ids: &[&str],
) -> HashMap<String, TransactionDescription> {
let transaction_ids = transaction_ids
.iter()
.map(|x| self.jvm.new_string(x))
.collect();
let transaction_ids = self.jvm.new_list("java.lang.String", transaction_ids);
let java_results = self
.admin
.call("describeTransactions", vec![transaction_ids])
.call_async("all", vec![])
.await;

map_iterator(java_results)
.map(|(transaction_id, _transaction_description)| {
(transaction_id.into_rust(), TransactionDescription {})
})
.collect()
}

pub async fn list_consumer_group_offsets(
&self,
group_id: String,
Expand Down
14 changes: 14 additions & 0 deletions test-helpers/src/connection/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,17 @@ impl KafkaAdmin {
}
}

pub async fn describe_transactions(
&self,
transaction_ids: &[&str],
) -> HashMap<String, TransactionDescription> {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
Self::Cpp(_) => panic!("rdkafka-rs driver does not support desscribe_transactions"),
Self::Java(java) => java.desscribe_transactions(transaction_ids).await,
}
}

pub async fn list_consumer_group_offsets(
&self,
group_id: String,
Expand Down Expand Up @@ -750,6 +761,9 @@ pub struct RecordsToDelete {
pub delete_before_offset: i64,
}

#[derive(PartialEq, Debug)]
pub struct TransactionDescription {}

#[derive(PartialEq, Debug)]
pub struct PartitionReassignment {
pub adding_replica_broker_ids: Vec<i32>,
Expand Down

0 comments on commit 460d927

Please sign in to comment.