From b6da02fa5cdc152be4d78acda4317f4f55b506d7 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 15 Oct 2024 16:18:02 +1100 Subject: [PATCH] KafkaSinkCluster: Implement routing for AddOffsetsToTxn --- .../tests/kafka_int_tests/test_cases.rs | 162 ++++++++++++++++-- .../src/transforms/kafka/sink_cluster/mod.rs | 54 +++++- test-helpers/src/connection/kafka/cpp.rs | 22 ++- test-helpers/src/connection/kafka/java.rs | 29 +++- test-helpers/src/connection/kafka/mod.rs | 38 +++- 5 files changed, 269 insertions(+), 36 deletions(-) diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index 3b20e57d2..20101fcd3 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -3,9 +3,9 @@ use std::{collections::HashMap, time::Duration}; use test_helpers::{ connection::kafka::{ Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig, - ExpectedResponse, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, - ListOffsetsResultInfo, NewPartition, NewTopic, OffsetSpec, Record, ResourcePatternType, - ResourceSpecifier, ResourceType, TopicPartition, + ExpectedResponse, IsolationLevel, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, + KafkaProducer, ListOffsetsResultInfo, NewPartition, NewTopic, OffsetAndMetadata, + OffsetSpec, Record, ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition, }, docker_compose::DockerCompose, }; @@ -75,12 +75,22 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) { replication_factor: 1, }, NewTopic { - name: "transaction_topic1", + name: "transaction_topic1_in", num_partitions: 3, replication_factor: 1, }, NewTopic { - name: "transaction_topic2", + name: "transaction_topic1_out", + num_partitions: 3, + replication_factor: 1, + }, + NewTopic { + name: "transaction_topic2_in", + num_partitions: 3, + replication_factor: 1, + }, + NewTopic { + name: "transaction_topic2_out", num_partitions: 3, replication_factor: 1, }, @@ -745,14 +755,14 @@ async fn produce_consume_partitions3( } } -async fn produce_consume_transactions(connection_builder: &KafkaConnectionBuilder) { +async fn produce_consume_transactions_without_commits(connection_builder: &KafkaConnectionBuilder) { let producer = connection_builder.connect_producer("1", 0).await; for i in 0..5 { producer .assert_produce( Record { payload: &format!("Message1_{i}"), - topic_name: "transaction_topic1", + topic_name: "transaction_topic1_in", key: Some("Key".into()), }, Some(i * 2), @@ -762,7 +772,7 @@ async fn produce_consume_transactions(connection_builder: &KafkaConnectionBuilde .assert_produce( Record { payload: &format!("Message2_{i}"), - topic_name: "transaction_topic1", + topic_name: "transaction_topic1_in", key: Some("Key".into()), }, Some(i * 2 + 1), @@ -775,13 +785,14 @@ async fn produce_consume_transactions(connection_builder: &KafkaConnectionBuilde .await; let mut consumer_topic1 = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic("transaction_topic1".to_owned()) + ConsumerConfig::consume_from_topic("transaction_topic1_in".to_owned()) .with_group("some_group1"), ) .await; let mut consumer_topic2 = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic("transaction_topic2".to_owned()) + ConsumerConfig::consume_from_topic("transaction_topic1_out".to_owned()) + .with_isolation_level(IsolationLevel::ReadCommitted) .with_group("some_group2"), ) .await; @@ -791,7 +802,7 @@ async fn 
produce_consume_transactions(connection_builder: &KafkaConnectionBuilde .assert_consume(ExpectedResponse { message: format!("Message1_{i}"), key: Some("Key".to_owned()), - topic_name: "transaction_topic1".to_owned(), + topic_name: "transaction_topic1_in".to_owned(), offset: Some(i * 2), }) .await; @@ -799,7 +810,7 @@ async fn produce_consume_transactions(connection_builder: &KafkaConnectionBuilde .assert_consume(ExpectedResponse { message: format!("Message2_{i}"), key: Some("Key".into()), - topic_name: "transaction_topic1".to_owned(), + topic_name: "transaction_topic1_in".to_owned(), offset: Some(i * 2 + 1), }) .await; @@ -809,7 +820,7 @@ async fn produce_consume_transactions(connection_builder: &KafkaConnectionBuilde .assert_produce( Record { payload: &format!("Message1_{i}"), - topic_name: "transaction_topic2", + topic_name: "transaction_topic1_out", key: Some("Key".into()), }, // Not sure where the extra offset per loop comes from @@ -821,14 +832,128 @@ async fn produce_consume_transactions(connection_builder: &KafkaConnectionBuilde .assert_produce( Record { payload: &format!("Message2_{i}"), - topic_name: "transaction_topic2", + topic_name: "transaction_topic1_out", key: Some("Key".into()), }, Some(i * 3 + 1), ) .await; - transaction_producer.send_offsets_to_transaction(&consumer_topic1); + transaction_producer.send_offsets_to_transaction(&consumer_topic1, Default::default()); + transaction_producer.commit_transaction(); + + // TODO: how are we reading uncommitted records? + consumer_topic2 + .assert_consume_in_any_order(vec![ + ExpectedResponse { + message: format!("Message1_{i}"), + key: Some("Key".to_owned()), + topic_name: "transaction_topic1_out".to_owned(), + offset: Some(i * 3), + }, + ExpectedResponse { + message: format!("Message2_{i}"), + key: Some("Key".to_owned()), + topic_name: "transaction_topic1_out".to_owned(), + offset: Some(i * 3 + 1), + }, + ]) + .await; + } +} + +async fn produce_consume_transactions_with_commits(connection_builder: &KafkaConnectionBuilder) { + let producer = connection_builder.connect_producer("1", 0).await; + for i in 0..5 { + producer + .assert_produce( + Record { + payload: &format!("Message1_{i}"), + topic_name: "transaction_topic2_in", + key: Some("Key".into()), + }, + Some(i * 2), + ) + .await; + producer + .assert_produce( + Record { + payload: &format!("Message2_{i}"), + topic_name: "transaction_topic2_in", + key: Some("Key".into()), + }, + Some(i * 2 + 1), + ) + .await; + } + + let transaction_producer = connection_builder + .connect_producer_with_transactions("some_transaction_id".to_owned()) + .await; + let mut consumer_topic1 = connection_builder + .connect_consumer( + ConsumerConfig::consume_from_topic("transaction_topic2_in".to_owned()) + .with_group("some_group1"), + ) + .await; + let mut consumer_topic2 = connection_builder + .connect_consumer( + ConsumerConfig::consume_from_topic("transaction_topic2_out".to_owned()) + .with_group("some_group2"), + ) + .await; + + for i in 0..5 { + consumer_topic1 + .assert_consume(ExpectedResponse { + message: format!("Message1_{i}"), + key: Some("Key".to_owned()), + topic_name: "transaction_topic2_in".to_owned(), + offset: Some(i * 2), + }) + .await; + consumer_topic1 + .assert_consume(ExpectedResponse { + message: format!("Message2_{i}"), + key: Some("Key".into()), + topic_name: "transaction_topic2_in".to_owned(), + offset: Some(i * 2 + 1), + }) + .await; + transaction_producer.begin_transaction(); + + transaction_producer + .assert_produce( + Record { + payload: &format!("Message1_{i}"), + 
topic_name: "transaction_topic2_out", + key: Some("Key".into()), + }, + // Not sure where the extra offset per loop comes from + // Possibly the transaction commit counts as a record + Some(i * 3), + ) + .await; + transaction_producer + .assert_produce( + Record { + payload: &format!("Message2_{i}"), + topic_name: "transaction_topic2_out", + key: Some("Key".into()), + }, + Some(i * 3 + 1), + ) + .await; + + let offsets = HashMap::from([( + // TODO: get partition + offset from produce calls so we can put the correct values here + TopicPartition { + topic_name: "transaction_topic2_out".to_owned(), + partition: 0, + }, + OffsetAndMetadata { offset: 1 }, + )]); + transaction_producer.send_offsets_to_transaction(&consumer_topic1, offsets); transaction_producer.commit_transaction(); consumer_topic2 @@ -836,13 +961,13 @@ async fn produce_consume_transactions(connection_builder: &KafkaConnectionBuilde ExpectedResponse { message: format!("Message1_{i}"), key: Some("Key".to_owned()), - topic_name: "transaction_topic2".to_owned(), + topic_name: "transaction_topic2_out".to_owned(), offset: Some(i * 3), }, ExpectedResponse { message: format!("Message2_{i}"), key: Some("Key".to_owned()), - topic_name: "transaction_topic2".to_owned(), + topic_name: "transaction_topic2_out".to_owned(), offset: Some(i * 3 + 1), }, ]) @@ -959,7 +1084,8 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) { // set the bytes limit to 1MB so that we will not reach it and will hit the 100ms timeout every time. produce_consume_partitions3(connection_builder, "partitions3_case4", 1_000_000, 100).await; - produce_consume_transactions(connection_builder).await; + produce_consume_transactions_with_commits(connection_builder).await; + produce_consume_transactions_without_commits(connection_builder).await; // Only run this test case on the java driver, // since even without going through shotover the cpp driver fails this test. 
diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index afe7c5314..2f8202f24 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -23,12 +23,13 @@ use kafka_protocol::messages::metadata_response::MetadataResponseBroker; use kafka_protocol::messages::produce_request::TopicProduceData; use kafka_protocol::messages::produce_response::LeaderIdAndEpoch as ProduceResponseLeaderIdAndEpoch; use kafka_protocol::messages::{ - AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey, BrokerId, EndTxnRequest, - FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, - HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, - ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse, ProduceRequest, - ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, - SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, + AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey, + BrokerId, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, + FindCoordinatorResponse, GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, + LeaveGroupRequest, ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse, + ProduceRequest, ProduceResponse, RequestHeader, SaslAuthenticateRequest, + SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, + TxnOffsetCommitRequest, }; use kafka_protocol::protocol::StrBytes; use kafka_protocol::ResponseError; @@ -678,7 +679,8 @@ impl KafkaSinkCluster { RequestBody::Heartbeat(HeartbeatRequest { group_id, .. }) | RequestBody::SyncGroup(SyncGroupRequest { group_id, .. }) | RequestBody::JoinGroup(JoinGroupRequest { group_id, .. }) - | RequestBody::LeaveGroup(LeaveGroupRequest { group_id, .. }), + | RequestBody::LeaveGroup(LeaveGroupRequest { group_id, .. }) + | RequestBody::TxnOffsetCommit(TxnOffsetCommitRequest { group_id, .. }), .. })) => { self.store_group(&mut groups, group_id.clone()); @@ -691,6 +693,9 @@ impl KafkaSinkCluster { }) | RequestBody::EndTxn(EndTxnRequest { transactional_id, .. + }) + | RequestBody::AddOffsetsToTxn(AddOffsetsToTxnRequest { + transactional_id, .. }), .. })) => { @@ -893,6 +898,14 @@ impl KafkaSinkCluster { let group_id = groups.groups_names.first().unwrap().clone(); self.route_to_group_coordinator(message, group_id); } + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::TxnOffsetCommit(txn_offset_commit), + .. + })) => { + let group_id = txn_offset_commit.group_id.clone(); + // Despite being a transaction request this request is routed by group_id + self.route_to_group_coordinator(message, group_id); + } // route to transaction coordinator Some(Frame::Kafka(KafkaFrame::Request { @@ -916,6 +929,13 @@ impl KafkaSinkCluster { body: RequestBody::AddPartitionsToTxn(_), .. })) => self.route_add_partitions_to_txn(message)?, + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::AddOffsetsToTxn(add_offsets_to_txn), + .. 
+ })) => { + let transaction_id = add_offsets_to_txn.transactional_id.clone(); + self.route_to_transaction_coordinator(message, transaction_id); + } Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::FindCoordinator(_), @@ -2114,6 +2134,26 @@ impl KafkaSinkCluster { })) => { self.handle_transaction_coordinator_routing_error(&request_ty, end_txn.error_code) } + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::AddOffsetsToTxn(add_offsets_to_txn), + .. + })) => self.handle_transaction_coordinator_routing_error( + &request_ty, + add_offsets_to_txn.error_code, + ), + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::TxnOffsetCommit(txn_offset_commit), + .. + })) => { + for topic in &txn_offset_commit.topics { + for partition in &topic.partitions { + self.handle_group_coordinator_routing_error( + &request_ty, + partition.error_code, + ); + } + } + } Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::InitProducerId(init_producer_id), .. diff --git a/test-helpers/src/connection/kafka/cpp.rs b/test-helpers/src/connection/kafka/cpp.rs index 5a4dc3182..b99da617e 100644 --- a/test-helpers/src/connection/kafka/cpp.rs +++ b/test-helpers/src/connection/kafka/cpp.rs @@ -2,7 +2,9 @@ use std::collections::{HashMap, HashSet}; // Allow direct usage of the APIs when the feature is enabled pub use rdkafka; -use super::{ConsumerConfig, ExpectedResponse, NewPartition, Record, TopicPartition}; +use super::{ + ConsumerConfig, ExpectedResponse, NewPartition, OffsetAndMetadata, Record, TopicPartition, +}; use anyhow::Result; use pretty_assertions::assert_eq; use rdkafka::admin::AdminClient; @@ -87,6 +89,7 @@ impl KafkaConnectionBuilderCpp { .set("session.timeout.ms", "6000") .set("auto.offset.reset", "earliest") .set("enable.auto.commit", "false") + .set("isolation.level", config.isolation_level.as_str()) // this has a different name to the java driver 😭 .set("fetch.wait.max.ms", config.fetch_max_wait_ms.to_string()) .set("fetch.min.bytes", config.fetch_min_bytes.to_string()) @@ -163,8 +166,21 @@ impl KafkaProducerCpp { self.producer.begin_transaction().unwrap(); } - pub fn send_offsets_to_transaction(&self, consumer: &KafkaConsumerCpp) { - let topic_partitions = TopicPartitionList::new(); + pub fn send_offsets_to_transaction( + &self, + consumer: &KafkaConsumerCpp, + offsets: HashMap, + ) { + let mut topic_partitions = TopicPartitionList::new(); + for (tp, offset_and_metadata) in offsets { + topic_partitions + .add_partition_offset( + &tp.topic_name, + tp.partition, + rdkafka::Offset::Offset(offset_and_metadata.offset), + ) + .unwrap(); + } let consumer_group = consumer.consumer.group_metadata().unwrap(); self.producer .send_offsets_to_transaction(&topic_partitions, &consumer_group, Duration::from_secs(1)) diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index 1a64423fa..caef5f475 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -1,7 +1,7 @@ use super::{ Acl, AclOperation, AclPermissionType, AlterConfig, ConsumerConfig, ExpectedResponse, - ListOffsetsResultInfo, NewPartition, NewTopic, OffsetSpec, Record, ResourcePatternType, - ResourceSpecifier, ResourceType, TopicDescription, TopicPartition, + ListOffsetsResultInfo, NewPartition, NewTopic, OffsetAndMetadata, OffsetSpec, Record, + ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription, TopicPartition, }; use crate::connection::java::{Jvm, Value}; use anyhow::Result; @@ -146,6 +146,10 @@ impl 
KafkaConnectionBuilderJava { config.insert("session.timeout.ms".to_owned(), "6000".to_owned()); config.insert("auto.offset.reset".to_owned(), "earliest".to_owned()); config.insert("enable.auto.commit".to_owned(), "false".to_owned()); + config.insert( + "isolation.level".to_owned(), + consumer_config.isolation_level.as_str().to_owned(), + ); config.insert( "fetch.max.wait.ms".to_owned(), consumer_config.fetch_max_wait_ms.to_string(), @@ -238,8 +242,25 @@ impl KafkaProducerJava { self.producer.call("commitTransaction", vec![]); } - pub fn send_offsets_to_transaction(&self, consumer: &KafkaConsumerJava) { - let offsets = self.jvm.new_map(vec![]); + pub fn send_offsets_to_transaction( + &self, + consumer: &KafkaConsumerJava, + offsets: HashMap, + ) { + let offsets = self.jvm.new_map( + offsets + .into_iter() + .map(|(tp, offset_and_metadata)| { + ( + create_topic_partition(&self.jvm, &tp), + self.jvm.construct( + "org.apache.kafka.clients.consumer.OffsetAndMetadata", + vec![self.jvm.new_long(offset_and_metadata.offset)], + ), + ) + }) + .collect(), + ); let consumer_group_id = consumer .consumer diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index 99d3e91ca..c3ea3d3f4 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -160,17 +160,23 @@ impl KafkaProducer { } } - pub fn send_offsets_to_transaction(&self, consumer: &KafkaConsumer) { + pub fn send_offsets_to_transaction( + &self, + consumer: &KafkaConsumer, + offsets: HashMap, + ) { match self { #[cfg(feature = "kafka-cpp-driver-tests")] Self::Cpp(cpp) => match consumer { - KafkaConsumer::Cpp(consumer) => cpp.send_offsets_to_transaction(consumer), + KafkaConsumer::Cpp(consumer) => cpp.send_offsets_to_transaction(consumer, offsets), KafkaConsumer::Java(_) => { panic!("Cannot use transactions across java and cpp driver") } }, Self::Java(java) => match consumer { - KafkaConsumer::Java(consumer) => java.send_offsets_to_transaction(consumer), + KafkaConsumer::Java(consumer) => { + java.send_offsets_to_transaction(consumer, offsets) + } #[cfg(feature = "kafka-cpp-driver-tests")] KafkaConsumer::Cpp(_) => { panic!("Cannot use transactions across java and cpp driver") @@ -471,12 +477,12 @@ pub enum OffsetSpec { Latest, } -#[derive(Default)] pub struct ConsumerConfig { topic_name: String, group: String, fetch_min_bytes: i32, fetch_max_wait_ms: i32, + isolation_level: IsolationLevel, } impl ConsumerConfig { @@ -486,6 +492,7 @@ impl ConsumerConfig { group: "default_group".to_owned(), fetch_min_bytes: 1, fetch_max_wait_ms: 500, + isolation_level: IsolationLevel::ReadUncommitted, } } @@ -503,4 +510,27 @@ impl ConsumerConfig { self.fetch_max_wait_ms = fetch_max_wait_ms; self } + + pub fn with_isolation_level(mut self, isolation_level: IsolationLevel) -> Self { + self.isolation_level = isolation_level; + self + } +} + +pub enum IsolationLevel { + ReadCommitted, + ReadUncommitted, +} + +impl IsolationLevel { + fn as_str(&self) -> &'static str { + match self { + IsolationLevel::ReadCommitted => "read_committed", + IsolationLevel::ReadUncommitted => "read_uncommitted", + } + } +} + +pub struct OffsetAndMetadata { + pub offset: i64, }
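A closing note on the routing added in shotover/src/transforms/kafka/sink_cluster/mod.rs: the two new request types go to different coordinators. AddOffsetsToTxn is keyed by transactional_id and is routed to the transaction coordinator, which records that the group's offsets are part of the transaction; TxnOffsetCommit, despite being a transaction request, is keyed by group_id and is routed to the group coordinator, which owns the consumer group's offsets. The sketch below restates that dispatch with stand-in types; Shotover's real code matches on kafka_protocol frames, not on these enums.

    // Illustration only: stand-in types for the coordinator choice above.
    enum Request {
        AddOffsetsToTxn { transactional_id: String },
        TxnOffsetCommit { group_id: String },
    }

    enum Destination {
        TransactionCoordinator { transactional_id: String },
        GroupCoordinator { group_id: String },
    }

    fn route(request: &Request) -> Destination {
        match request {
            // The transaction coordinator tracks the transaction's state, so
            // this request is keyed by transactional_id.
            Request::AddOffsetsToTxn { transactional_id } => {
                Destination::TransactionCoordinator {
                    transactional_id: transactional_id.clone(),
                }
            }
            // The offsets themselves live with the group coordinator, so this
            // request is keyed by group_id even though it is transactional.
            Request::TxnOffsetCommit { group_id } => Destination::GroupCoordinator {
                group_id: group_id.clone(),
            },
        }
    }

If a coordinator has moved, the broker answers these requests with an error code such as NOT_COORDINATOR; the response handling added above forwards those codes to the existing coordinator routing-error handlers, in the same way EndTxn responses were already handled.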