KafkaSinkCluster: Implement routing for AddOffsetsToTxn
rukai committed Oct 15, 2024
1 parent de0fca3 commit b6da02f
Showing 5 changed files with 269 additions and 36 deletions.
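For context: AddOffsetsToTxn is sent by a transactional producer when the client calls sendOffsetsToTransaction; it registers a consumer group with an ongoing transaction, so it carries both a transactional_id and a group_id. Brokers expect AddOffsetsToTxn at the transaction coordinator, while the follow-up TxnOffsetCommit must reach the group coordinator. Below is a minimal sketch of the routing rule this commit implements (hypothetical stand-in types, not shotover's real internals):

// A minimal sketch (hypothetical types) of the routing rule this commit adds:
// AddOffsetsToTxn routes by transactional_id to the transaction coordinator,
// while TxnOffsetCommit routes by group_id to the group coordinator, even
// though both belong to the same transaction.
enum Request {
    AddOffsetsToTxn { transactional_id: String },
    TxnOffsetCommit { group_id: String },
}

fn route_to_transaction_coordinator(transactional_id: &str) {
    println!("-> transaction coordinator for {transactional_id}");
}

fn route_to_group_coordinator(group_id: &str) {
    println!("-> group coordinator for {group_id}");
}

fn route(request: &Request) {
    match request {
        // Transaction state is owned by the transaction coordinator.
        Request::AddOffsetsToTxn { transactional_id } => {
            route_to_transaction_coordinator(transactional_id)
        }
        // Offsets are stored by the group coordinator.
        Request::TxnOffsetCommit { group_id } => {
            route_to_group_coordinator(group_id)
        }
    }
}

fn main() {
    route(&Request::AddOffsetsToTxn {
        transactional_id: "some_transaction_id".into(),
    });
    route(&Request::TxnOffsetCommit {
        group_id: "some_group1".into(),
    });
}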
162 changes: 144 additions & 18 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -3,9 +3,9 @@ use std::{collections::HashMap, time::Duration};
use test_helpers::{
connection::kafka::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig,
ExpectedResponse, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer,
ListOffsetsResultInfo, NewPartition, NewTopic, OffsetSpec, Record, ResourcePatternType,
ResourceSpecifier, ResourceType, TopicPartition,
ExpectedResponse, IsolationLevel, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver,
KafkaProducer, ListOffsetsResultInfo, NewPartition, NewTopic, OffsetAndMetadata,
OffsetSpec, Record, ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition,
},
docker_compose::DockerCompose,
};
@@ -75,12 +75,22 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
replication_factor: 1,
},
NewTopic {
name: "transaction_topic1",
name: "transaction_topic1_in",
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "transaction_topic2",
name: "transaction_topic1_out",
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "transaction_topic2_in",
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "transaction_topic2_out",
num_partitions: 3,
replication_factor: 1,
},
@@ -745,14 +755,14 @@ async fn produce_consume_partitions3(
}
}

async fn produce_consume_transactions(connection_builder: &KafkaConnectionBuilder) {
async fn produce_consume_transactions_without_commits(connection_builder: &KafkaConnectionBuilder) {
let producer = connection_builder.connect_producer("1", 0).await;
for i in 0..5 {
producer
.assert_produce(
Record {
payload: &format!("Message1_{i}"),
topic_name: "transaction_topic1",
topic_name: "transaction_topic1_in",
key: Some("Key".into()),
},
Some(i * 2),
@@ -762,7 +772,7 @@ async fn produce_consume_transactions(connection_builder: &KafkaConnectionBuilder) {
.assert_produce(
Record {
payload: &format!("Message2_{i}"),
topic_name: "transaction_topic1",
topic_name: "transaction_topic1_in",
key: Some("Key".into()),
},
Some(i * 2 + 1),
@@ -775,13 +785,14 @@ async fn produce_consume_transactions(connection_builder: &KafkaConnectionBuilder) {
.await;
let mut consumer_topic1 = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic("transaction_topic1".to_owned())
ConsumerConfig::consume_from_topic("transaction_topic1_in".to_owned())
.with_group("some_group1"),
)
.await;
let mut consumer_topic2 = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic("transaction_topic2".to_owned())
ConsumerConfig::consume_from_topic("transaction_topic1_out".to_owned())
.with_isolation_level(IsolationLevel::ReadCommitted)
.with_group("some_group2"),
)
.await;
@@ -791,15 +802,15 @@ async fn produce_consume_transactions(connection_builder: &KafkaConnectionBuilder) {
.assert_consume(ExpectedResponse {
message: format!("Message1_{i}"),
key: Some("Key".to_owned()),
topic_name: "transaction_topic1".to_owned(),
topic_name: "transaction_topic1_in".to_owned(),
offset: Some(i * 2),
})
.await;
consumer_topic1
.assert_consume(ExpectedResponse {
message: format!("Message2_{i}"),
key: Some("Key".into()),
topic_name: "transaction_topic1".to_owned(),
topic_name: "transaction_topic1_in".to_owned(),
offset: Some(i * 2 + 1),
})
.await;
@@ -809,7 +820,7 @@ async fn produce_consume_transactions(connection_builder: &KafkaConnectionBuilder) {
.assert_produce(
Record {
payload: &format!("Message1_{i}"),
topic_name: "transaction_topic2",
topic_name: "transaction_topic1_out",
key: Some("Key".into()),
},
// The extra offset per loop comes from the transaction commit marker (control record)
@@ -821,28 +832,142 @@ async fn produce_consume_transactions(connection_builder: &KafkaConnectionBuilder) {
.assert_produce(
Record {
payload: &format!("Message2_{i}"),
topic_name: "transaction_topic2",
topic_name: "transaction_topic1_out",
key: Some("Key".into()),
},
Some(i * 3 + 1),
)
.await;

transaction_producer.send_offsets_to_transaction(&consumer_topic1);
transaction_producer.send_offsets_to_transaction(&consumer_topic1, Default::default());
transaction_producer.commit_transaction();

// TODO: how are we reading uncommitted records?
consumer_topic2
.assert_consume_in_any_order(vec![
ExpectedResponse {
message: format!("Message1_{i}"),
key: Some("Key".to_owned()),
topic_name: "transaction_topic1_out".to_owned(),
offset: Some(i * 3),
},
ExpectedResponse {
message: format!("Message2_{i}"),
key: Some("Key".to_owned()),
topic_name: "transaction_topic1_out".to_owned(),
offset: Some(i * 3 + 1),
},
])
.await;
}
}

async fn produce_consume_transactions_with_commits(connection_builder: &KafkaConnectionBuilder) {
let producer = connection_builder.connect_producer("1", 0).await;
for i in 0..5 {
producer
.assert_produce(
Record {
payload: &format!("Message1_{i}"),
topic_name: "transaction_topic2_in",
key: Some("Key".into()),
},
Some(i * 2),
)
.await;
producer
.assert_produce(
Record {
payload: &format!("Message2_{i}"),
topic_name: "transaction_topic2_in",
key: Some("Key".into()),
},
Some(i * 2 + 1),
)
.await;
}

let transaction_producer = connection_builder
.connect_producer_with_transactions("some_transaction_id".to_owned())
.await;
let mut consumer_topic1 = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic("transaction_topic2_in".to_owned())
.with_group("some_group1"),
)
.await;
let mut consumer_topic2 = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topic("transaction_topic2_out".to_owned())
.with_group("some_group2"),
)
.await;

for i in 0..5 {
consumer_topic1
.assert_consume(ExpectedResponse {
message: format!("Message1_{i}"),
key: Some("Key".to_owned()),
topic_name: "transaction_topic2_in".to_owned(),
offset: Some(i * 2),
})
.await;
consumer_topic1
.assert_consume(ExpectedResponse {
message: format!("Message2_{i}"),
key: Some("Key".into()),
topic_name: "transaction_topic2_in".to_owned(),
offset: Some(i * 2 + 1),
})
.await;
transaction_producer.begin_transaction();

transaction_producer
.assert_produce(
Record {
payload: &format!("Message1_{i}"),
topic_name: "transaction_topic2_out",
key: Some("Key".into()),
},
// The extra offset per loop comes from the transaction commit marker:
// committing a transaction writes a control record that consumes one offset.
Some(i * 3),
)
.await;
transaction_producer
.assert_produce(
Record {
payload: &format!("Message2_{i}"),
topic_name: "transaction_topic2_out",
key: Some("Key".into()),
},
Some(i * 3 + 1),
)
.await;

let offsets = HashMap::from([(
// TODO: get partition + offset from produce calls so we can put the correct values here
TopicPartition {
topic_name: "transaction_topic2_out".to_owned(),
partition: 0,
},
OffsetAndMetadata { offset: 1 },
)]);
transaction_producer.send_offsets_to_transaction(&consumer_topic1, offsets);
transaction_producer.commit_transaction();

consumer_topic2
.assert_consume_in_any_order(vec![
ExpectedResponse {
message: format!("Message1_{i}"),
key: Some("Key".to_owned()),
topic_name: "transaction_topic2".to_owned(),
topic_name: "transaction_topic2_out".to_owned(),
offset: Some(i * 3),
},
ExpectedResponse {
message: format!("Message2_{i}"),
key: Some("Key".to_owned()),
topic_name: "transaction_topic2".to_owned(),
topic_name: "transaction_topic2_out".to_owned(),
offset: Some(i * 3 + 1),
},
])
@@ -959,7 +1084,8 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
// set the bytes limit to 1MB so that we will not reach it and will hit the 100ms timeout every time.
produce_consume_partitions3(connection_builder, "partitions3_case4", 1_000_000, 100).await;

produce_consume_transactions(connection_builder).await;
produce_consume_transactions_with_commits(connection_builder).await;
produce_consume_transactions_without_commits(connection_builder).await;

// Only run this test case on the java driver,
// since even without going through shotover the cpp driver fails this test.
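Taken together, the two test functions above exercise Kafka's consume-transform-produce pattern through shotover. For readers who want the same flow outside the repo's test helpers, here is a rough sketch against the rdkafka crate (broker address, topic, group, and transaction ids are placeholders, and error handling is minimal):

use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
use rdkafka::{Message, Offset, TopicPartitionList};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "some_group1")
        .set("isolation.level", "read_committed")
        .set("enable.auto.commit", "false")
        .create()?;
    consumer.subscribe(&["transaction_topic1_in"])?;

    let producer: BaseProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("transactional.id", "some_transaction_id")
        .create()?;
    producer.init_transactions(Duration::from_secs(10))?;

    while let Some(message) = consumer.poll(Duration::from_secs(5)) {
        let message = message?;
        producer.begin_transaction()?;

        // Transform and produce to the output topic.
        producer
            .send(
                BaseRecord::to("transaction_topic1_out")
                    .key(message.key().unwrap_or_default())
                    .payload(message.payload().unwrap_or_default()),
            )
            .map_err(|(err, _record)| err)?;

        // Commit the input offset inside the transaction; on the wire this
        // becomes AddOffsetsToTxn (to the transaction coordinator) followed
        // by TxnOffsetCommit (to the group coordinator).
        let mut offsets = TopicPartitionList::new();
        offsets.add_partition_offset(
            message.topic(),
            message.partition(),
            Offset::Offset(message.offset() + 1),
        )?;
        producer.send_offsets_to_transaction(
            &offsets,
            &consumer.group_metadata().expect("consumer joined the group"),
            Duration::from_secs(10),
        )?;

        // Writes a commit marker (control record) to each touched partition.
        producer.commit_transaction(Duration::from_secs(10))?;
    }
    Ok(())
}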
54 changes: 47 additions & 7 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -23,12 +23,13 @@ use kafka_protocol::messages::metadata_response::MetadataResponseBroker;
use kafka_protocol::messages::produce_request::TopicProduceData;
use kafka_protocol::messages::produce_response::LeaderIdAndEpoch as ProduceResponseLeaderIdAndEpoch;
use kafka_protocol::messages::{
AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey, BrokerId, EndTxnRequest,
FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId,
HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest,
ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse, ProduceRequest,
ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse,
SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId,
AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
BrokerId, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest,
FindCoordinatorResponse, GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest,
LeaveGroupRequest, ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse,
ProduceRequest, ProduceResponse, RequestHeader, SaslAuthenticateRequest,
SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId,
TxnOffsetCommitRequest,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::ResponseError;
@@ -678,7 +679,8 @@ impl KafkaSinkCluster {
RequestBody::Heartbeat(HeartbeatRequest { group_id, .. })
| RequestBody::SyncGroup(SyncGroupRequest { group_id, .. })
| RequestBody::JoinGroup(JoinGroupRequest { group_id, .. })
| RequestBody::LeaveGroup(LeaveGroupRequest { group_id, .. }),
| RequestBody::LeaveGroup(LeaveGroupRequest { group_id, .. })
| RequestBody::TxnOffsetCommit(TxnOffsetCommitRequest { group_id, .. }),
..
})) => {
self.store_group(&mut groups, group_id.clone());
@@ -691,6 +693,9 @@
})
| RequestBody::EndTxn(EndTxnRequest {
transactional_id, ..
})
| RequestBody::AddOffsetsToTxn(AddOffsetsToTxnRequest {
transactional_id, ..
}),
..
})) => {
@@ -893,6 +898,14 @@ impl KafkaSinkCluster {
let group_id = groups.groups_names.first().unwrap().clone();
self.route_to_group_coordinator(message, group_id);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::TxnOffsetCommit(txn_offset_commit),
..
})) => {
let group_id = txn_offset_commit.group_id.clone();
// Despite being a transaction request, this request is routed by group_id:
// it commits offsets on behalf of a consumer group, and offsets are owned
// by the group coordinator.
self.route_to_group_coordinator(message, group_id);
}

// route to transaction coordinator
Some(Frame::Kafka(KafkaFrame::Request {
@@ -916,6 +929,13 @@
body: RequestBody::AddPartitionsToTxn(_),
..
})) => self.route_add_partitions_to_txn(message)?,
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::AddOffsetsToTxn(add_offsets_to_txn),
..
})) => {
let transaction_id = add_offsets_to_txn.transactional_id.clone();
self.route_to_transaction_coordinator(message, transaction_id);
}

Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::FindCoordinator(_),
@@ -2114,6 +2134,26 @@ impl KafkaSinkCluster {
})) => {
self.handle_transaction_coordinator_routing_error(&request_ty, end_txn.error_code)
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::AddOffsetsToTxn(add_offsets_to_txn),
..
})) => self.handle_transaction_coordinator_routing_error(
&request_ty,
add_offsets_to_txn.error_code,
),
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::TxnOffsetCommit(txn_offset_commit),
..
})) => {
for topic in &txn_offset_commit.topics {
for partition in &topic.partitions {
self.handle_group_coordinator_routing_error(
&request_ty,
partition.error_code,
);
}
}
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::InitProducerId(init_producer_id),
..
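The response arms above feed coordinator error codes back into shotover's routing state, so a coordinator that has moved is rediscovered rather than retried against a stale broker. A sketch of the per-partition scan the TxnOffsetCommit arm performs, using kafka_protocol's ResponseError (which the file already imports) and a hypothetical cache-invalidation helper:

use kafka_protocol::ResponseError;

// Hypothetical stand-in: the real handler marks the cached coordinator as
// stale so the next request re-runs FindCoordinator.
fn invalidate_cached_group_coordinator() {
    eprintln!("dropping cached group coordinator");
}

fn check_partition_error(error_code: i16) {
    match ResponseError::try_from_code(error_code) {
        // The partition's offsets are owned by a different broker now.
        Some(ResponseError::NotCoordinator) => invalidate_cached_group_coordinator(),
        // Some other kafka error: not a routing problem, leave it to the client.
        Some(err) => eprintln!("non-routing error: {err:?}"),
        // error_code 0 means success.
        None => {}
    }
}

fn main() {
    check_partition_error(0);
    check_partition_error(16); // NOT_COORDINATOR
}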