Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KafkaSinkCluster: route DescribeTransactions requests #1818

Merged
merged 1 commit into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use test_helpers::{
ExpectedResponse, IsolationLevel, KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer,
KafkaDriver, KafkaProducer, ListOffsetsResultInfo, NewPartition, NewPartitionReassignment,
NewTopic, OffsetAndMetadata, OffsetSpec, Record, RecordsToDelete, ResourcePatternType,
ResourceSpecifier, ResourceType, TopicPartition,
ResourceSpecifier, ResourceType, TopicPartition, TransactionDescription,
},
docker_compose::DockerCompose,
};
Expand Down Expand Up @@ -1687,15 +1687,29 @@ async fn list_groups(connection_builder: &KafkaConnectionBuilder) {
}
}

async fn list_transactions(connection_builder: &KafkaConnectionBuilder) {
async fn list_and_describe_transactions(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
let _transaction_producer = connection_builder
.connect_producer_with_transactions("some_transaction_id".to_owned())
let _transaction_producer1 = connection_builder
.connect_producer_with_transactions("some_transaction_id1".to_owned())
.await;
let _transaction_producer2 = connection_builder
.connect_producer_with_transactions("some_transaction_id2".to_owned())
.await;

let actual_results = admin.list_transactions().await;
let expected_results = ["some_transaction_id".to_owned()];
assert_eq!(actual_results, expected_results);
assert!(actual_results.contains(&"some_transaction_id1".to_owned()));
assert!(actual_results.contains(&"some_transaction_id2".to_owned()));

let result = admin
.describe_transactions(&["some_transaction_id1", "some_transaction_id2"])
.await;
assert_eq!(
result,
HashMap::from([
("some_transaction_id1".to_owned(), TransactionDescription {}),
("some_transaction_id2".to_owned(), TransactionDescription {}),
])
);
}

async fn create_and_list_partition_reassignments(connection_builder: &KafkaConnectionBuilder) {
Expand Down Expand Up @@ -1771,7 +1785,7 @@ pub async fn tests_requiring_all_shotover_nodes(connection_builder: &KafkaConnec
#[allow(irrefutable_let_patterns)]
if let KafkaConnectionBuilder::Java(_) = connection_builder {
list_groups(connection_builder).await;
list_transactions(connection_builder).await;
list_and_describe_transactions(connection_builder).await;
create_and_list_partition_reassignments(connection_builder).await;
}
}
Expand Down
80 changes: 69 additions & 11 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@ use kafka_protocol::messages::produce_response::{
use kafka_protocol::messages::{
AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest,
DeleteRecordsResponse, DescribeProducersRequest, DescribeProducersResponse, EndTxnRequest,
FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId,
HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest,
ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse,
MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse,
OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse,
RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest,
SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
DeleteRecordsResponse, DescribeProducersRequest, DescribeProducersResponse,
DescribeTransactionsRequest, DescribeTransactionsResponse, EndTxnRequest, FetchRequest,
FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse,
ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest,
MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest,
OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader,
SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest,
TopicName, TransactionalId, TxnOffsetCommitRequest,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::ResponseError;
Expand All @@ -56,9 +57,9 @@ use shotover_node::{ShotoverNode, ShotoverNodeConfig};
use split::{
AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter,
DeleteRecordsRequestSplitAndRouter, DescribeProducersRequestSplitAndRouter,
ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter, ListTransactionsSplitAndRouter,
OffsetFetchSplitAndRouter, OffsetForLeaderEpochRequestSplitAndRouter,
ProduceRequestSplitAndRouter, RequestSplitAndRouter,
DescribeTransactionsSplitAndRouter, ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter,
ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter,
OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter,
};
use std::collections::{HashMap, HashSet, VecDeque};
use std::hash::Hasher;
Expand Down Expand Up @@ -770,6 +771,14 @@ impl KafkaSinkCluster {
})) => {
self.store_transaction(&mut transactions, transactional_id.clone());
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DescribeTransactions(describe_transaction),
..
})) => {
for transactional_id in &describe_transaction.transactional_ids {
self.store_transaction(&mut transactions, transactional_id.clone());
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::AddPartitionsToTxn(add_partitions_to_txn_request),
header,
Expand Down Expand Up @@ -1090,6 +1099,10 @@ The connection to the client has been closed."
body: RequestBody::ListGroups(_),
..
})) => self.split_and_route_request::<ListGroupsSplitAndRouter>(request)?,
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DescribeTransactions(_),
..
})) => self.split_and_route_request::<DescribeTransactionsSplitAndRouter>(request)?,
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListTransactions(_),
..
Expand Down Expand Up @@ -1560,6 +1573,29 @@ The connection to the client has been closed."
result
}

/// Removes all transactional ids from the DescribeTransactions request and
/// returns them grouped by the coordinator broker each sub-request should be
/// routed to.
///
/// Any transactional id with no known coordinator is grouped under
/// BrokerId(-1); the caller treats that as "route to a random broker" so the
/// client receives a NOT_COORDINATOR or similar error instead of shotover
/// failing the request itself.
fn split_describe_transactions_request_by_destination(
    &mut self,
    body: &mut DescribeTransactionsRequest,
) -> HashMap<BrokerId, Vec<TransactionalId>> {
    let mut result: HashMap<BrokerId, Vec<TransactionalId>> = Default::default();

    for transaction in body.transactional_ids.drain(..) {
        // Coordinator lookups come from state cached by earlier requests.
        let destination = match self.transaction_to_coordinator_broker.get(&transaction) {
            Some(destination) => *destination,
            None => {
                tracing::warn!("no known coordinator for transaction {transaction:?}, routing request to a random broker so that a NOT_COORDINATOR or similar error is returned to the client");
                BrokerId(-1)
            }
        };
        result.entry(destination).or_default().push(transaction);
    }

    result
}

/// Route broadcasted requests to all brokers split across all shotover nodes.
/// That is, each shotover node in a rack will deterministically be assigned a portion of the rack to route the request to.
/// If a shotover node is the only node in its rack it will route to all kafka brokers in the rack.
Expand Down Expand Up @@ -2243,6 +2279,10 @@ The connection to the client has been closed."
body: ResponseBody::ListGroups(base),
..
})) => Self::combine_list_groups(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DescribeTransactions(base),
..
})) => Self::combine_describe_transactions(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ListTransactions(base),
..
Expand Down Expand Up @@ -2572,6 +2612,24 @@ The connection to the client has been closed."
Ok(())
}

/// Merges the DescribeTransactions responses from every broker into `base`
/// by concatenating their `transaction_states` lists.
fn combine_describe_transactions(
    base: &mut DescribeTransactionsResponse,
    drain: impl Iterator<Item = Message>,
) -> Result<()> {
    for mut message in drain {
        // Frames of any other type are silently skipped.
        if let Some(Frame::Kafka(KafkaFrame::Response {
            body: ResponseBody::DescribeTransactions(partial),
            ..
        })) = message.frame()
        {
            let states = std::mem::take(&mut partial.transaction_states);
            base.transaction_states.extend(states);
        }
    }

    Ok(())
}

fn combine_list_transactions(
base_list_transactions: &mut ListTransactionsResponse,
drain: impl Iterator<Item = Message>,
Expand Down
34 changes: 31 additions & 3 deletions shotover/src/transforms/kafka/sink_cluster/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use kafka_protocol::messages::{
list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup,
offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData,
AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, DeleteRecordsRequest,
DescribeProducersRequest, GroupId, ListGroupsRequest, ListOffsetsRequest,
ListTransactionsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest,
TopicName,
DescribeProducersRequest, DescribeTransactionsRequest, GroupId, ListGroupsRequest,
ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest,
ProduceRequest, TopicName, TransactionalId,
};
use std::collections::HashMap;

Expand Down Expand Up @@ -281,6 +281,34 @@ impl RequestSplitAndRouter for ListTransactionsSplitAndRouter {
}
}

/// Splits a DescribeTransactions request into one sub-request per coordinator
/// broker, each carrying only the transactional ids that broker coordinates.
pub struct DescribeTransactionsSplitAndRouter;

impl RequestSplitAndRouter for DescribeTransactionsSplitAndRouter {
type Request = DescribeTransactionsRequest;
// Each sub-request is just the list of transactional ids destined for one broker.
type SubRequests = Vec<TransactionalId>;

// Delegates to KafkaSinkCluster, which drains the request's transactional ids
// and groups them by coordinator broker.
fn split_by_destination(
transform: &mut KafkaSinkCluster,
request: &mut Self::Request,
) -> HashMap<BrokerId, Self::SubRequests> {
transform.split_describe_transactions_request_by_destination(request)
}

// Extracts the DescribeTransactions body from a message already known to be
// one; any other frame type is a caller bug, hence unreachable!().
fn get_request_frame(request: &mut Message) -> &mut Self::Request {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DescribeTransactions(request),
..
})) => request,
_ => unreachable!(),
}
}

// Rebuilds a sub-request by installing one destination's transactional ids.
fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
request.transactional_ids = item;
}
}

pub struct OffsetFetchSplitAndRouter;

impl RequestSplitAndRouter for OffsetFetchSplitAndRouter {
Expand Down
24 changes: 23 additions & 1 deletion test-helpers/src/connection/kafka/java.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::{
ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata,
OffsetSpec, PartitionReassignment, ProduceResult, ProducerState, Record, RecordsToDelete,
ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription, TopicPartition,
TopicPartitionInfo,
TopicPartitionInfo, TransactionDescription,
};
use crate::connection::java::{map_iterator, Jvm, Value};
use anyhow::Result;
Expand Down Expand Up @@ -768,6 +768,28 @@ impl KafkaAdminJava {
results
}

/// Describes the given transactional ids via the Java AdminClient's
/// `describeTransactions` API, returning results keyed by transactional id.
pub async fn describe_transactions(
    &self,
    transaction_ids: &[&str],
) -> HashMap<String, TransactionDescription> {
    let transaction_ids = transaction_ids
        .iter()
        .map(|x| self.jvm.new_string(x))
        .collect();
    let transaction_ids = self.jvm.new_list("java.lang.String", transaction_ids);
    let java_results = self
        .admin
        .call("describeTransactions", vec![transaction_ids])
        .call_async("all", vec![])
        .await;

    // TransactionDescription currently carries no fields, so only the key of
    // each java-side (id, description) entry is used.
    map_iterator(java_results)
        .map(|(transaction_id, _transaction_description)| {
            (transaction_id.into_rust(), TransactionDescription {})
        })
        .collect()
}

/// Misspelled alias of `describe_transactions`, kept so existing callers keep
/// compiling. Prefer `describe_transactions` in new code.
pub async fn desscribe_transactions(
    &self,
    transaction_ids: &[&str],
) -> HashMap<String, TransactionDescription> {
    self.describe_transactions(transaction_ids).await
}

pub async fn list_consumer_group_offsets(
&self,
group_id: String,
Expand Down
14 changes: 14 additions & 0 deletions test-helpers/src/connection/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,17 @@ impl KafkaAdmin {
}
}

/// Describes the given transactional ids, dispatching to the underlying
/// driver. Only the Java driver supports this operation; the cpp (rdkafka)
/// driver panics.
pub async fn describe_transactions(
    &self,
    transaction_ids: &[&str],
) -> HashMap<String, TransactionDescription> {
    match self {
        #[cfg(feature = "kafka-cpp-driver-tests")]
        // Fixed typo in the panic message ("desscribe" -> "describe").
        Self::Cpp(_) => panic!("rdkafka-rs driver does not support describe_transactions"),
        // NOTE(review): the java helper's method name contains a typo
        // ("desscribe") — the call must match the definition.
        Self::Java(java) => java.desscribe_transactions(transaction_ids).await,
    }
}

pub async fn list_consumer_group_offsets(
&self,
group_id: String,
Expand Down Expand Up @@ -750,6 +761,9 @@ pub struct RecordsToDelete {
pub delete_before_offset: i64,
}

/// Result of a DescribeTransactions request for a single transactional id.
///
/// Currently empty: tests only assert which transactional ids are returned,
/// not any per-transaction details.
#[derive(PartialEq, Eq, Debug, Clone, Default)]
pub struct TransactionDescription {}

#[derive(PartialEq, Debug)]
pub struct PartitionReassignment {
pub adding_replica_broker_ids: Vec<i32>,
Expand Down
Loading