From 1aa6a8546181edd4346b49d7da2d3f78bb97643d Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Tue, 15 Oct 2024 16:18:02 +1100 Subject: [PATCH] KafkaSinkCluster: Implement routing for AddOffsetsToTxn --- .../tests/kafka_int_tests/test_cases.rs | 191 ++++++++++++++++-- .../src/transforms/kafka/sink_cluster/mod.rs | 54 ++++- test-helpers/src/connection/kafka/cpp.rs | 41 +++- test-helpers/src/connection/kafka/java.rs | 73 +++++-- test-helpers/src/connection/kafka/mod.rs | 79 ++++++-- 5 files changed, 378 insertions(+), 60 deletions(-) diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index 3b20e57d2..324c03c69 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -3,9 +3,9 @@ use std::{collections::HashMap, time::Duration}; use test_helpers::{ connection::kafka::{ Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig, - ExpectedResponse, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, - ListOffsetsResultInfo, NewPartition, NewTopic, OffsetSpec, Record, ResourcePatternType, - ResourceSpecifier, ResourceType, TopicPartition, + ExpectedResponse, IsolationLevel, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, + KafkaProducer, ListOffsetsResultInfo, NewPartition, NewTopic, OffsetAndMetadata, + OffsetSpec, Record, ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition, }, docker_compose::DockerCompose, }; @@ -75,12 +75,22 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) { replication_factor: 1, }, NewTopic { - name: "transaction_topic1", + name: "transaction_topic1_in", num_partitions: 3, replication_factor: 1, }, NewTopic { - name: "transaction_topic2", + name: "transaction_topic1_out", + num_partitions: 3, + replication_factor: 1, + }, + NewTopic { + name: "transaction_topic2_in", + num_partitions: 3, + replication_factor: 1, + }, + NewTopic { + name: "transaction_topic2_out", num_partitions: 3, replication_factor: 1, }, @@ -745,14 +755,14 @@ async fn produce_consume_partitions3( } } -async fn produce_consume_transactions(connection_builder: &KafkaConnectionBuilder) { +async fn produce_consume_transactions_with_abort(connection_builder: &KafkaConnectionBuilder) { let producer = connection_builder.connect_producer("1", 0).await; for i in 0..5 { producer .assert_produce( Record { payload: &format!("Message1_{i}"), - topic_name: "transaction_topic1", + topic_name: "transaction_topic1_in", key: Some("Key".into()), }, Some(i * 2), @@ -762,7 +772,7 @@ async fn produce_consume_transactions(connection_builder: &KafkaConnectionBuilde .assert_produce( Record { payload: &format!("Message2_{i}"), - topic_name: "transaction_topic1", + topic_name: "transaction_topic1_in", key: Some("Key".into()), }, Some(i * 2 + 1), @@ -775,13 +785,14 @@ async fn produce_consume_transactions(connection_builder: &KafkaConnectionBuilde .await; let mut consumer_topic1 = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic("transaction_topic1".to_owned()) + ConsumerConfig::consume_from_topic("transaction_topic1_in".to_owned()) .with_group("some_group1"), ) .await; let mut consumer_topic2 = connection_builder .connect_consumer( - ConsumerConfig::consume_from_topic("transaction_topic2".to_owned()) + ConsumerConfig::consume_from_topic("transaction_topic1_out".to_owned()) + .with_isolation_level(IsolationLevel::ReadCommitted) .with_group("some_group2"), ) .await; @@ -791,7 +802,110 @@ async fn 
produce_consume_transactions(connection_builder: &KafkaConnectionBuilde .assert_consume(ExpectedResponse { message: format!("Message1_{i}"), key: Some("Key".to_owned()), - topic_name: "transaction_topic1".to_owned(), + topic_name: "transaction_topic1_in".to_owned(), + offset: Some(i * 2), + }) + .await; + consumer_topic1 + .assert_consume(ExpectedResponse { + message: format!("Message2_{i}"), + key: Some("Key".into()), + topic_name: "transaction_topic1_in".to_owned(), + offset: Some(i * 2 + 1), + }) + .await; + transaction_producer.begin_transaction(); + + transaction_producer + .assert_produce( + Record { + payload: &format!("Message1_{i}"), + topic_name: "transaction_topic1_out", + key: Some("Key".into()), + }, + // Not sure where the extra offset per loop comes from + // Possibly the transaction commit counts as a record + Some(i * 3), + ) + .await; + transaction_producer + .assert_produce( + Record { + payload: &format!("Message2_{i}"), + topic_name: "transaction_topic1_out", + key: Some("Key".into()), + }, + Some(i * 3 + 1), + ) + .await; + + // send empty request, has no effect just want to make sure shotover handles this. + transaction_producer.send_offsets_to_transaction(&consumer_topic1, Default::default()); + + transaction_producer.abort_transaction(); + } + + consumer_topic2 + .assert_no_consume_within_timeout(Duration::from_secs(2)) + .await; +} + +async fn produce_consume_transactions_with_commit(connection_builder: &KafkaConnectionBuilder) { + let producer = connection_builder.connect_producer("1", 0).await; + for i in 0..5 { + producer + .assert_produce( + Record { + payload: &format!("Message1_{i}"), + topic_name: "transaction_topic2_in", + key: Some("Key".into()), + }, + Some(i * 2), + ) + .await; + producer + .assert_produce( + Record { + payload: &format!("Message2_{i}"), + topic_name: "transaction_topic2_in", + key: Some("Key".into()), + }, + Some(i * 2 + 1), + ) + .await; + } + + let transaction_producer = connection_builder + .connect_producer_with_transactions("some_transaction_id".to_owned()) + .await; + let mut consumer_topic1 = connection_builder + .connect_consumer( + ConsumerConfig::consume_from_topic("transaction_topic2_in".to_owned()) + .with_group("some_group1"), + ) + .await; + let mut consumer_topic2_committed = connection_builder + .connect_consumer( + ConsumerConfig::consume_from_topic("transaction_topic2_out".to_owned()) + .with_isolation_level(IsolationLevel::ReadCommitted) + .with_group("some_group2"), + ) + .await; + + let mut consumer_topic2_uncommitted = connection_builder + .connect_consumer( + ConsumerConfig::consume_from_topic("transaction_topic2_out".to_owned()) + .with_isolation_level(IsolationLevel::ReadUncommitted) + .with_group("some_group3"), + ) + .await; + + for i in 0..3 { + consumer_topic1 + .assert_consume(ExpectedResponse { + message: format!("Message1_{i}"), + key: Some("Key".to_owned()), + topic_name: "transaction_topic2_in".to_owned(), offset: Some(i * 2), }) .await; @@ -799,7 +913,7 @@ async fn produce_consume_transactions(connection_builder: &KafkaConnectionBuilde .assert_consume(ExpectedResponse { message: format!("Message2_{i}"), key: Some("Key".into()), - topic_name: "transaction_topic1".to_owned(), + topic_name: "transaction_topic2_in".to_owned(), offset: Some(i * 2 + 1), }) .await; @@ -809,7 +923,7 @@ async fn produce_consume_transactions(connection_builder: &KafkaConnectionBuilde .assert_produce( Record { payload: &format!("Message1_{i}"), - topic_name: "transaction_topic2", + topic_name: "transaction_topic2_out", key: 
Some("Key".into()), }, // Not sure where the extra offset per loop comes from @@ -821,32 +935,70 @@ async fn produce_consume_transactions(connection_builder: &KafkaConnectionBuilde .assert_produce( Record { payload: &format!("Message2_{i}"), - topic_name: "transaction_topic2", + topic_name: "transaction_topic2_out", key: Some("Key".into()), }, Some(i * 3 + 1), ) .await; - transaction_producer.send_offsets_to_transaction(&consumer_topic1); + let offsets = HashMap::from([( + // TODO: get partition + offset from produce calls so we can put the correct values here + TopicPartition { + topic_name: "transaction_topic2_out".to_owned(), + partition: 0, + }, + OffsetAndMetadata { offset: 1 }, + )]); + transaction_producer.send_offsets_to_transaction(&consumer_topic1, offsets); + + // before we commit, records are only received on uncommitted consumer + consumer_topic2_committed + .assert_no_consume_within_timeout(Duration::from_secs(2)) + .await; + consumer_topic2_uncommitted + .assert_consume_in_any_order(vec![ + ExpectedResponse { + message: format!("Message1_{i}"), + key: Some("Key".to_owned()), + topic_name: "transaction_topic2_out".to_owned(), + offset: Some(i * 3), + }, + ExpectedResponse { + message: format!("Message2_{i}"), + key: Some("Key".to_owned()), + topic_name: "transaction_topic2_out".to_owned(), + offset: Some(i * 3 + 1), + }, + ]) + .await; transaction_producer.commit_transaction(); - consumer_topic2 + // after we commit, receive committed records + consumer_topic2_committed .assert_consume_in_any_order(vec![ ExpectedResponse { message: format!("Message1_{i}"), key: Some("Key".to_owned()), - topic_name: "transaction_topic2".to_owned(), + topic_name: "transaction_topic2_out".to_owned(), offset: Some(i * 3), }, ExpectedResponse { message: format!("Message2_{i}"), key: Some("Key".to_owned()), - topic_name: "transaction_topic2".to_owned(), + topic_name: "transaction_topic2_out".to_owned(), offset: Some(i * 3 + 1), }, ]) .await; + + // send_offsets_to_transaction should result in commits to the consumer offset of the input topic + // let mut consumer_topic1 = connection_builder + // .connect_consumer( + // ConsumerConfig::consume_from_topic("transaction_topic2_in".to_owned()) + // .with_group("some_group1"), + // ) + // .await; } } @@ -959,7 +1111,8 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) { // set the bytes limit to 1MB so that we will not reach it and will hit the 100ms timeout every time. produce_consume_partitions3(connection_builder, "partitions3_case4", 1_000_000, 100).await; - produce_consume_transactions(connection_builder).await; + produce_consume_transactions_with_abort(connection_builder).await; + produce_consume_transactions_with_commit(connection_builder).await; // Only run this test case on the java driver, // since even without going through shotover the cpp driver fails this test. 
diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index afe7c5314..2f8202f24 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -23,12 +23,13 @@ use kafka_protocol::messages::metadata_response::MetadataResponseBroker; use kafka_protocol::messages::produce_request::TopicProduceData; use kafka_protocol::messages::produce_response::LeaderIdAndEpoch as ProduceResponseLeaderIdAndEpoch; use kafka_protocol::messages::{ - AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey, BrokerId, EndTxnRequest, - FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, - HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, - ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse, ProduceRequest, - ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, - SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, + AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey, + BrokerId, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, + FindCoordinatorResponse, GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, + LeaveGroupRequest, ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse, + ProduceRequest, ProduceResponse, RequestHeader, SaslAuthenticateRequest, + SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, + TxnOffsetCommitRequest, }; use kafka_protocol::protocol::StrBytes; use kafka_protocol::ResponseError; @@ -678,7 +679,8 @@ impl KafkaSinkCluster { RequestBody::Heartbeat(HeartbeatRequest { group_id, .. }) | RequestBody::SyncGroup(SyncGroupRequest { group_id, .. }) | RequestBody::JoinGroup(JoinGroupRequest { group_id, .. }) - | RequestBody::LeaveGroup(LeaveGroupRequest { group_id, .. }), + | RequestBody::LeaveGroup(LeaveGroupRequest { group_id, .. }) + | RequestBody::TxnOffsetCommit(TxnOffsetCommitRequest { group_id, .. }), .. })) => { self.store_group(&mut groups, group_id.clone()); @@ -691,6 +693,9 @@ impl KafkaSinkCluster { }) | RequestBody::EndTxn(EndTxnRequest { transactional_id, .. + }) + | RequestBody::AddOffsetsToTxn(AddOffsetsToTxnRequest { + transactional_id, .. }), .. })) => { @@ -893,6 +898,14 @@ impl KafkaSinkCluster { let group_id = groups.groups_names.first().unwrap().clone(); self.route_to_group_coordinator(message, group_id); } + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::TxnOffsetCommit(txn_offset_commit), + .. + })) => { + let group_id = txn_offset_commit.group_id.clone(); + // Despite being a transaction request this request is routed by group_id + self.route_to_group_coordinator(message, group_id); + } // route to transaction coordinator Some(Frame::Kafka(KafkaFrame::Request { @@ -916,6 +929,13 @@ impl KafkaSinkCluster { body: RequestBody::AddPartitionsToTxn(_), .. })) => self.route_add_partitions_to_txn(message)?, + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::AddOffsetsToTxn(add_offsets_to_txn), + .. 
+ })) => { + let transaction_id = add_offsets_to_txn.transactional_id.clone(); + self.route_to_transaction_coordinator(message, transaction_id); + } Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::FindCoordinator(_), @@ -2114,6 +2134,26 @@ impl KafkaSinkCluster { })) => { self.handle_transaction_coordinator_routing_error(&request_ty, end_txn.error_code) } + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::AddOffsetsToTxn(add_offsets_to_txn), + .. + })) => self.handle_transaction_coordinator_routing_error( + &request_ty, + add_offsets_to_txn.error_code, + ), + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::TxnOffsetCommit(txn_offset_commit), + .. + })) => { + for topic in &txn_offset_commit.topics { + for partition in &topic.partitions { + self.handle_group_coordinator_routing_error( + &request_ty, + partition.error_code, + ); + } + } + } Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::InitProducerId(init_producer_id), .. diff --git a/test-helpers/src/connection/kafka/cpp.rs b/test-helpers/src/connection/kafka/cpp.rs index 5a4dc3182..8a27d396b 100644 --- a/test-helpers/src/connection/kafka/cpp.rs +++ b/test-helpers/src/connection/kafka/cpp.rs @@ -2,7 +2,9 @@ use std::collections::{HashMap, HashSet}; // Allow direct usage of the APIs when the feature is enabled pub use rdkafka; -use super::{ConsumerConfig, ExpectedResponse, NewPartition, Record, TopicPartition}; +use super::{ + ConsumerConfig, ExpectedResponse, NewPartition, OffsetAndMetadata, Record, TopicPartition, +}; use anyhow::Result; use pretty_assertions::assert_eq; use rdkafka::admin::AdminClient; @@ -87,6 +89,7 @@ impl KafkaConnectionBuilderCpp { .set("session.timeout.ms", "6000") .set("auto.offset.reset", "earliest") .set("enable.auto.commit", "false") + .set("isolation.level", config.isolation_level.as_str()) // this has a different name to the java driver 😭 .set("fetch.wait.max.ms", config.fetch_max_wait_ms.to_string()) .set("fetch.min.bytes", config.fetch_min_bytes.to_string()) @@ -163,8 +166,27 @@ impl KafkaProducerCpp { self.producer.begin_transaction().unwrap(); } - pub fn send_offsets_to_transaction(&self, consumer: &KafkaConsumerCpp) { - let topic_partitions = TopicPartitionList::new(); + pub fn abort_transaction(&self) { + self.producer + .abort_transaction(Duration::from_secs(1)) + .unwrap(); + } + + pub fn send_offsets_to_transaction( + &self, + consumer: &KafkaConsumerCpp, + offsets: HashMap, + ) { + let mut topic_partitions = TopicPartitionList::new(); + for (tp, offset_and_metadata) in offsets { + topic_partitions + .add_partition_offset( + &tp.topic_name, + tp.partition, + rdkafka::Offset::Offset(offset_and_metadata.offset), + ) + .unwrap(); + } let consumer_group = consumer.consumer.group_metadata().unwrap(); self.producer .send_offsets_to_transaction(&topic_partitions, &consumer_group, Duration::from_secs(1)) @@ -183,8 +205,8 @@ pub struct KafkaConsumerCpp { } impl KafkaConsumerCpp { - pub async fn consume(&self) -> ExpectedResponse { - let message = tokio::time::timeout(Duration::from_secs(30), self.consumer.recv()) + pub async fn consume(&self, timeout: Duration) -> ExpectedResponse { + let message = tokio::time::timeout(timeout, self.consumer.recv()) .await .expect("Timeout while receiving from consumer") .unwrap(); @@ -198,6 +220,15 @@ impl KafkaConsumerCpp { } } + pub async fn assert_no_consume_within_timeout(&self, timeout: Duration) { + match tokio::time::timeout(timeout, self.consumer.recv()).await { + Ok(records) => panic!("Expected no records to be 
consumed but consumed {records:?}"), + Err(_) => { + // no values were consumed, continue on. + } + } + } + /// The offset to be committed should be lastProcessedMessageOffset + 1. pub fn commit(&self, offsets: &HashMap) { let offsets_map: HashMap<(String, i32), rdkafka::Offset> = offsets diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index 1a64423fa..4002928ba 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -1,12 +1,15 @@ use super::{ Acl, AclOperation, AclPermissionType, AlterConfig, ConsumerConfig, ExpectedResponse, - ListOffsetsResultInfo, NewPartition, NewTopic, OffsetSpec, Record, ResourcePatternType, - ResourceSpecifier, ResourceType, TopicDescription, TopicPartition, + ListOffsetsResultInfo, NewPartition, NewTopic, OffsetAndMetadata, OffsetSpec, Record, + ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription, TopicPartition, }; use crate::connection::java::{Jvm, Value}; use anyhow::Result; use pretty_assertions::assert_eq; -use std::collections::{HashMap, HashSet, VecDeque}; +use std::{ + collections::{HashMap, HashSet, VecDeque}, + time::Duration, +}; fn properties(jvm: &Jvm, props: &HashMap) -> Value { let properties = jvm.construct("java.util.Properties", vec![]); @@ -146,6 +149,10 @@ impl KafkaConnectionBuilderJava { config.insert("session.timeout.ms".to_owned(), "6000".to_owned()); config.insert("auto.offset.reset".to_owned(), "earliest".to_owned()); config.insert("enable.auto.commit".to_owned(), "false".to_owned()); + config.insert( + "isolation.level".to_owned(), + consumer_config.isolation_level.as_str().to_owned(), + ); config.insert( "fetch.max.wait.ms".to_owned(), consumer_config.fetch_max_wait_ms.to_string(), @@ -238,8 +245,29 @@ impl KafkaProducerJava { self.producer.call("commitTransaction", vec![]); } - pub fn send_offsets_to_transaction(&self, consumer: &KafkaConsumerJava) { - let offsets = self.jvm.new_map(vec![]); + pub fn abort_transaction(&self) { + self.producer.call("abortTransaction", vec![]); + } + + pub fn send_offsets_to_transaction( + &self, + consumer: &KafkaConsumerJava, + offsets: HashMap, + ) { + let offsets = self.jvm.new_map( + offsets + .into_iter() + .map(|(tp, offset_and_metadata)| { + ( + create_topic_partition(&self.jvm, &tp), + self.jvm.construct( + "org.apache.kafka.clients.consumer.OffsetAndMetadata", + vec![self.jvm.new_long(offset_and_metadata.offset)], + ), + ) + }) + .collect(), + ); let consumer_group_id = consumer .consumer @@ -258,32 +286,49 @@ pub struct KafkaConsumerJava { } impl KafkaConsumerJava { - pub async fn consume(&mut self) -> ExpectedResponse { + pub async fn assert_no_consume_within_timeout(&mut self, timeout: Duration) { + if !self.waiting_records.is_empty() { + panic!( + "There are pending records from the previous consume {:?}", + self.waiting_records + ); + } + + self.fetch_from_broker(timeout).await; + + if !self.waiting_records.is_empty() { + panic!( + "Records were received from this consume {:?}", + self.waiting_records + ); + } + } + pub async fn consume(&mut self, timeout: Duration) -> ExpectedResponse { // This method asserts that we have consumed a single record from the broker. // Internally we may have actually received multiple records from the broker. 
// But that is hidden from the test by storing any extra messages for use in the next call to `consume` if self.waiting_records.is_empty() { - self.fetch_from_broker(); + self.fetch_from_broker(timeout).await; } self.pop_one_record() } - fn fetch_from_broker(&mut self) { - let timeout_seconds = 30; + pub async fn fetch_from_broker(&mut self, timeout: Duration) { let instant = std::time::Instant::now(); let mut finished = false; while !finished { - let timeout = self.jvm.call_static( + let java_timeout = self.jvm.call_static( "java.time.Duration", "ofSeconds", - vec![self.jvm.new_long(timeout_seconds as i64)], + vec![self.jvm.new_long(timeout.as_secs() as i64)], ); // the poll method documentation claims that it will await the timeout if there are no records available. // but it will actually return immediately if it receives a transaction control record, - // so we need to add to wrap it in our own timeout logic. - let result = tokio::task::block_in_place(|| self.consumer.call("poll", vec![timeout])); + // so we need to wrap it in our own timeout logic. + let result = + tokio::task::block_in_place(|| self.consumer.call("poll", vec![java_timeout])); for record in result.call("iterator", vec![]) { self.waiting_records @@ -292,7 +337,7 @@ impl KafkaConsumerJava { finished = true; } - if instant.elapsed() > std::time::Duration::from_secs(timeout_seconds) { + if instant.elapsed() > timeout { // finished because timeout elapsed finished = true; } diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index 99d3e91ca..5dbc538f1 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -1,5 +1,8 @@ use pretty_assertions::assert_eq; -use std::collections::{HashMap, HashSet}; +use std::{ + collections::{HashMap, HashSet}, + time::Duration, +}; #[cfg(feature = "kafka-cpp-driver-tests")] pub mod cpp; @@ -160,17 +163,31 @@ impl KafkaProducer { } } - pub fn send_offsets_to_transaction(&self, consumer: &KafkaConsumer) { + pub fn abort_transaction(&self) { + match self { + #[cfg(feature = "kafka-cpp-driver-tests")] + Self::Cpp(cpp) => cpp.abort_transaction(), + Self::Java(java) => java.abort_transaction(), + } + } + + pub fn send_offsets_to_transaction( + &self, + consumer: &KafkaConsumer, + offsets: HashMap, + ) { match self { #[cfg(feature = "kafka-cpp-driver-tests")] Self::Cpp(cpp) => match consumer { - KafkaConsumer::Cpp(consumer) => cpp.send_offsets_to_transaction(consumer), + KafkaConsumer::Cpp(consumer) => cpp.send_offsets_to_transaction(consumer, offsets), KafkaConsumer::Java(_) => { panic!("Cannot use transactions across java and cpp driver") } }, Self::Java(java) => match consumer { - KafkaConsumer::Java(consumer) => java.send_offsets_to_transaction(consumer), + KafkaConsumer::Java(consumer) => { + java.send_offsets_to_transaction(consumer, offsets) + } #[cfg(feature = "kafka-cpp-driver-tests")] KafkaConsumer::Cpp(_) => { panic!("Cannot use transactions across java and cpp driver") @@ -194,11 +211,7 @@ pub enum KafkaConsumer { impl KafkaConsumer { pub async fn assert_consume(&mut self, expected_response: ExpectedResponse) { - let response = match self { - #[cfg(feature = "kafka-cpp-driver-tests")] - Self::Cpp(cpp) => cpp.consume().await, - Self::Java(java) => java.consume().await, - }; + let response = self.consume(Duration::from_secs(30)).await; let topic = &expected_response.topic_name; assert_eq!( @@ -222,14 +235,26 @@ impl KafkaConsumer { } } + async fn consume(&mut self, timeout: Duration) -> 
ExpectedResponse { + match self { + #[cfg(feature = "kafka-cpp-driver-tests")] + Self::Cpp(cpp) => cpp.consume(timeout).await, + Self::Java(java) => java.consume(timeout).await, + } + } + + pub async fn assert_no_consume_within_timeout(&mut self, timeout: Duration) { + match self { + #[cfg(feature = "kafka-cpp-driver-tests")] + Self::Cpp(cpp) => cpp.assert_no_consume_within_timeout(timeout).await, + Self::Java(java) => java.assert_no_consume_within_timeout(timeout).await, + } + } + pub async fn assert_consume_in_any_order(&mut self, expected_responses: Vec) { let mut responses = vec![]; while responses.len() < expected_responses.len() { - match self { - #[cfg(feature = "kafka-cpp-driver-tests")] - Self::Cpp(cpp) => responses.push(cpp.consume().await), - Self::Java(java) => responses.push(java.consume().await), - } + responses.push(self.consume(Duration::from_secs(30)).await); } let full_responses = responses.clone(); let full_expected_responses = expected_responses.clone(); @@ -471,12 +496,12 @@ pub enum OffsetSpec { Latest, } -#[derive(Default)] pub struct ConsumerConfig { topic_name: String, group: String, fetch_min_bytes: i32, fetch_max_wait_ms: i32, + isolation_level: IsolationLevel, } impl ConsumerConfig { @@ -486,6 +511,7 @@ impl ConsumerConfig { group: "default_group".to_owned(), fetch_min_bytes: 1, fetch_max_wait_ms: 500, + isolation_level: IsolationLevel::ReadUncommitted, } } @@ -503,4 +529,27 @@ impl ConsumerConfig { self.fetch_max_wait_ms = fetch_max_wait_ms; self } + + pub fn with_isolation_level(mut self, isolation_level: IsolationLevel) -> Self { + self.isolation_level = isolation_level; + self + } +} + +pub enum IsolationLevel { + ReadCommitted, + ReadUncommitted, +} + +impl IsolationLevel { + fn as_str(&self) -> &'static str { + match self { + IsolationLevel::ReadCommitted => "read_committed", + IsolationLevel::ReadUncommitted => "read_uncommitted", + } + } +} + +pub struct OffsetAndMetadata { + pub offset: i64, }
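
[Reviewer note] The crux of the sink_cluster change is that the two new request types are keyed off different ids. A minimal stand-alone sketch of that rule, using stand-in types rather than the real kafka_protocol frames that KafkaSinkCluster actually matches on:

// Stand-in request shapes; Shotover matches on decoded kafka_protocol frames,
// not on an enum like this.
enum TransactionalRequest {
    // Carries the transactional id, so it is routed to the transaction
    // coordinator, alongside EndTxn and AddPartitionsToTxn.
    AddOffsetsToTxn { transactional_id: String },
    // Carries a consumer group id; despite being part of the transaction flow,
    // it is routed to the group coordinator for that group.
    TxnOffsetCommit { group_id: String },
}

fn route(request: &TransactionalRequest) -> String {
    match request {
        TransactionalRequest::AddOffsetsToTxn { transactional_id } => {
            format!("transaction coordinator for {transactional_id}")
        }
        TransactionalRequest::TxnOffsetCommit { group_id } => {
            format!("group coordinator for {group_id}")
        }
    }
}

fn main() {
    let add_offsets = TransactionalRequest::AddOffsetsToTxn {
        transactional_id: "some_transaction_id".to_owned(),
    };
    let txn_offset_commit = TransactionalRequest::TxnOffsetCommit {
        group_id: "some_group1".to_owned(),
    };
    println!("AddOffsetsToTxn -> {}", route(&add_offsets));
    println!("TxnOffsetCommit -> {}", route(&txn_offset_commit));
}

The response handling mirrors this split: AddOffsetsToTxn errors go through handle_transaction_coordinator_routing_error and TxnOffsetCommit partition errors through handle_group_coordinator_routing_error, presumably so a NOT_COORDINATOR style error invalidates the cached coordinator before the next attempt.
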