From fdc5afacd0e307a451181db61b1da02af8f1e95a Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Mon, 11 Nov 2024 12:48:47 +1100 Subject: [PATCH 1/3] KafkaSinkCluster: route OffsetDelete (#1802) --- .../tests/kafka_int_tests/test_cases.rs | 51 ++++++++++- .../src/transforms/kafka/sink_cluster/mod.rs | 7 ++ test-helpers/src/connection/java.rs | 19 ++++ test-helpers/src/connection/kafka/java.rs | 86 +++++++++++++++++-- test-helpers/src/connection/kafka/mod.rs | 31 +++++++ 5 files changed, 183 insertions(+), 11 deletions(-) diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index c325234f2..7a6b0576f 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -133,12 +133,54 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) { async fn admin_cleanup(connection_builder: &KafkaConnectionBuilder) { let admin = connection_builder.connect_admin().await; - admin - .delete_groups(&["some_group", "some_group1", "consumer_group_with_offsets"]) - .await; + admin.delete_groups(&["some_group", "some_group1"]).await; delete_records(&admin, connection_builder).await; } +async fn delete_offsets(connection_builder: &KafkaConnectionBuilder) { + let admin = connection_builder.connect_admin().await; + + // Only supported by java driver + #[allow(irrefutable_let_patterns)] + if let KafkaConnectionBuilder::Java(_) = connection_builder { + // assert offset exists + let result = admin + .list_consumer_group_offsets("consumer_group_with_offsets".to_owned()) + .await; + let expected_result: HashMap<_, HashMap> = + HashMap::from([( + "consumer_group_with_offsets".to_owned(), + HashMap::from([( + TopicPartition { + topic_name: "partitions1_with_offset".to_owned(), + partition: 0, + }, + OffsetAndMetadata { offset: 2 }, + )]), + )]); + assert_eq!(result, expected_result); + + // delete offset + admin + .delete_consumer_group_offsets( + "consumer_group_with_offsets".to_owned(), + &[TopicPartition { + topic_name: "partitions1_with_offset".to_owned(), + partition: 0, + }], + ) + .await; + + // assert offset is deleted + let result = admin + .list_consumer_group_offsets("consumer_group_with_offsets".to_owned()) + .await; + let expected_result: HashMap<_, HashMap> = + HashMap::from([("consumer_group_with_offsets".to_owned(), HashMap::new())]); + assert_eq!(result, expected_result); + } +} + async fn delete_records(admin: &KafkaAdmin, connection_builder: &KafkaConnectionBuilder) { // Only supported by java driver #[allow(irrefutable_let_patterns)] @@ -894,6 +936,9 @@ pub async fn produce_consume_commit_offsets_partitions1( }) .await; } + + // test the admin API's offset list and delete operations + delete_offsets(connection_builder).await; } pub async fn produce_consume_partitions3( diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 241c69749..21d729b83 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -939,6 +939,13 @@ impl KafkaSinkCluster { })) => { self.split_and_route_request::(request)?; } + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::OffsetDelete(offset_delete), + .. + })) => { + let group_id = offset_delete.group_id.clone(); + self.route_to_group_coordinator(request, group_id); + } Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::TxnOffsetCommit(txn_offset_commit), .. diff --git a/test-helpers/src/connection/java.rs b/test-helpers/src/connection/java.rs index 1c8756236..25ac455c9 100644 --- a/test-helpers/src/connection/java.rs +++ b/test-helpers/src/connection/java.rs @@ -245,6 +245,14 @@ impl Jvm { } } + /// Construct a java set containing the provided elements + pub(crate) fn new_set(&self, element_type: &str, elements: Vec) -> Value { + self.construct( + "java.util.HashSet", + vec![self.new_list(element_type, elements)], + ) + } + /// Construct a java map containing the provided key value pairs pub(crate) fn new_map(&self, key_values: Vec<(Value, Value)>) -> Value { let map = self.construct("java.util.HashMap", vec![]); @@ -409,3 +417,14 @@ impl Iterator for JavaIterator { } } } + +pub fn map_iterator(map: Value) -> impl Iterator { + map.cast("java.util.Map") + .call("entrySet", vec![]) + .call("iterator", vec![]) + .into_iter() + .map(|entry| { + let entry = entry.cast("java.util.Map$Entry"); + (entry.call("getKey", vec![]), entry.call("getValue", vec![])) + }) +} diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index 876254d66..f2f09628c 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -4,7 +4,7 @@ use super::{ Record, RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription, TopicPartition, TopicPartitionInfo, }; -use crate::connection::java::{Jvm, Value}; +use crate::connection::java::{map_iterator, Jvm, Value}; use anyhow::Result; use pretty_assertions::assert_eq; use std::{ @@ -273,7 +273,7 @@ impl KafkaProducerJava { .into_iter() .map(|(tp, offset_and_metadata)| { ( - create_topic_partition(&self.jvm, &tp), + topic_partition_to_java(&self.jvm, &tp), self.jvm.construct( "org.apache.kafka.clients.consumer.OffsetAndMetadata", vec![self.jvm.new_long(offset_and_metadata.offset)], @@ -383,7 +383,7 @@ impl KafkaConsumerJava { .iter() .map(|(tp, offset)| { ( - create_topic_partition(&self.jvm, tp), + topic_partition_to_java(&self.jvm, tp), self.jvm.construct( "org.apache.kafka.clients.consumer.OffsetAndMetadata", vec![self.jvm.new_long(*offset)], @@ -403,7 +403,7 @@ impl KafkaConsumerJava { let mut offsets = HashMap::new(); for tp in partitions { - let topic_partition = create_topic_partition(&self.jvm, &tp); + let topic_partition = topic_partition_to_java(&self.jvm, &tp); let timeout = self.jvm.call_static( "java.time.Duration", @@ -527,7 +527,7 @@ impl KafkaAdminJava { .iter() .map(|x| { ( - create_topic_partition(&self.jvm, &x.topic_partition), + topic_partition_to_java(&self.jvm, &x.topic_partition), self.jvm.call_static( "org.apache.kafka.clients.admin.RecordsToDelete", "beforeOffset", @@ -653,7 +653,7 @@ impl KafkaAdminJava { .iter() .map(|(topic_partition, offset_spec)| { ( - create_topic_partition(&self.jvm, topic_partition), + topic_partition_to_java(&self.jvm, topic_partition), match offset_spec { OffsetSpec::Earliest => { self.jvm.call_static(offset_spec_class, "earliest", vec![]) @@ -678,7 +678,7 @@ impl KafkaAdminJava { let result = java_results .call( "get", - vec![create_topic_partition(&self.jvm, &topic_partition)], + vec![topic_partition_to_java(&self.jvm, &topic_partition)], ) .cast("org.apache.kafka.clients.admin.ListOffsetsResult$ListOffsetsResultInfo"); let offset: i32 = result.call("offset", vec![]).into_rust(); @@ -725,6 +725,60 @@ impl KafkaAdminJava { results } + pub async fn list_consumer_group_offsets( + &self, + group_id: String, + ) -> HashMap> { + let group_id = self.jvm.new_string(&group_id); + + let java_results = self + .admin + .call("listConsumerGroupOffsets", vec![group_id]) + .call_async("all", vec![]) + .await; + + // iterate through the hashmap containing hashmap, turning all the values into rust values + map_iterator(java_results) + .map(|(group_id, value)| { + ( + group_id.into_rust(), + map_iterator(value) + .map(|(topic_partition, offset_and_metadata)| { + ( + topic_partition_to_rust(topic_partition), + offset_and_metadata_to_rust(offset_and_metadata), + ) + }) + .collect(), + ) + }) + .collect() + } + + pub async fn delete_consumer_group_offsets( + &self, + group_id: String, + topic_partitions: &[TopicPartition], + ) { + let group_id = self.jvm.new_string(&group_id); + + let topic_partitions_java = self.jvm.new_set( + "org.apache.kafka.common.TopicPartition", + topic_partitions + .iter() + .map(|topic_partition| topic_partition_to_java(&self.jvm, topic_partition)) + .collect(), + ); + + self.admin + .call( + "deleteConsumerGroupOffsets", + vec![group_id, topic_partitions_java], + ) + .call_async("all", vec![]) + .await; + } + pub async fn create_acls(&self, acls: Vec) { let resource_type = self .jvm @@ -807,9 +861,25 @@ impl KafkaAdminJava { } } -fn create_topic_partition(jvm: &Jvm, tp: &TopicPartition) -> Value { +fn topic_partition_to_java(jvm: &Jvm, tp: &TopicPartition) -> Value { jvm.construct( "org.apache.kafka.common.TopicPartition", vec![jvm.new_string(&tp.topic_name), jvm.new_int(tp.partition)], ) } + +fn topic_partition_to_rust(tp: Value) -> TopicPartition { + let tp = tp.cast("org.apache.kafka.common.TopicPartition"); + TopicPartition { + topic_name: tp.call("topic", vec![]).into_rust(), + partition: tp.call("partition", vec![]).into_rust(), + } +} + +fn offset_and_metadata_to_rust(offset_and_metadata: Value) -> OffsetAndMetadata { + let offset_and_metadata = + offset_and_metadata.cast("org.apache.kafka.clients.consumer.OffsetAndMetadata"); + OffsetAndMetadata { + offset: offset_and_metadata.call("offset", vec![]).into_rust(), + } +} diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index d4420d518..6d899abd1 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -477,6 +477,36 @@ impl KafkaAdmin { } } + pub async fn list_consumer_group_offsets( + &self, + group_id: String, + ) -> HashMap> { + match self { + #[cfg(feature = "kafka-cpp-driver-tests")] + Self::Cpp(_) => { + panic!("rdkafka-rs driver does not support list_consumer_group_offsets") + } + Self::Java(java) => java.list_consumer_group_offsets(group_id).await, + } + } + + pub async fn delete_consumer_group_offsets( + &self, + group_id: String, + topic_partitions: &[TopicPartition], + ) { + match self { + #[cfg(feature = "kafka-cpp-driver-tests")] + Self::Cpp(_) => { + panic!("rdkafka-rs driver does not support delete_consumer_group_offsets") + } + Self::Java(java) => { + java.delete_consumer_group_offsets(group_id, topic_partitions) + .await + } + } + } + pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) { match self { #[cfg(feature = "kafka-cpp-driver-tests")] @@ -662,6 +692,7 @@ impl IsolationLevel { } } +#[derive(PartialEq, Debug)] pub struct OffsetAndMetadata { pub offset: i64, } From 38c079331c0d8e058be074bec52308d89e7b2073 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Mon, 11 Nov 2024 14:28:50 +1100 Subject: [PATCH 2/3] Update deps (#1801) --- Cargo.lock | 672 ++++++++++++++++++++++++++++------------ deny.toml | 1 + ec2-cargo/Cargo.toml | 2 +- shotover/Cargo.toml | 8 +- test-helpers/Cargo.toml | 4 +- 5 files changed, 474 insertions(+), 213 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 172927ba6..559506c3f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -75,9 +75,9 @@ dependencies = [ [[package]] name = "allocator-api2" -version = "0.2.18" +version = "0.2.19" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c6cb57a04249c6480766f7f7cef5467412af1490f8d1e243141daddada3264f" +checksum = "611cc2ae7d2e242c457e4be7f97036b8ad9ca152b499f53faf99b1ed8fc2553f" [[package]] name = "android-tzdata" @@ -102,9 +102,9 @@ checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" [[package]] name = "anstream" -version = "0.6.15" +version = "0.6.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64e15c1ab1f89faffbf04a634d5e1962e9074f2741eef6d97f3c4e322426d526" +checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b" dependencies = [ "anstyle", "anstyle-parse", @@ -117,43 +117,43 @@ dependencies = [ [[package]] name = "anstyle" -version = "1.0.8" +version = "1.0.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1" +checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9" [[package]] name = "anstyle-parse" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb47de1e80c2b463c735db5b217a0ddc39d612e7ac9e2e96a5aed1f57616c1cb" +checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9" dependencies = [ "utf8parse", ] [[package]] name = "anstyle-query" -version = "1.1.1" +version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d36fc52c7f6c869915e99412912f22093507da8d9e942ceaf66fe4b7c14422a" +checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] name = "anstyle-wincon" -version = "3.0.4" +version = "3.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5bf74e1b6e971609db8ca7a9ce79fd5768ab6ae46441c572e46cf596f59e57f8" +checksum = "2109dbce0e72be3ec00bed26e6a7479ca384ad226efdd66db8fa2e3a38c83125" dependencies = [ "anstyle", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] name = "anyhow" -version = "1.0.89" +version = "1.0.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" +checksum = "4c95c10ba0b00a02636238b814946408b1322d5ac4760326e6fb8ec956d85775" [[package]] name = "arc-swap" @@ -163,9 +163,9 @@ checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" [[package]] name = "async-compression" -version = "0.4.16" +version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "103db485efc3e41214fe4fda9f3dbeae2eb9082f48fd236e6095627a9422066e" +checksum = "0cb8f1d480b0ea3783ab015936d2a55c87e219676f0c0b7dec61494043f21857" dependencies = [ "flate2", "futures-core", @@ -182,7 +182,7 @@ checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -217,7 +217,7 @@ checksum = "99e1aca718ea7b89985790c94aad72d77533063fe00bc497bb79a7c2dae6a661" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -228,9 +228,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" [[package]] name = "aws-config" -version = "1.5.8" +version = "1.5.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7198e6f03240fdceba36656d8be440297b6b82270325908c7381f37d826a74f6" +checksum = "9b49afaa341e8dd8577e1a2200468f98956d6eda50bcf4a53246cc00174ba924" dependencies = [ "aws-credential-types", "aws-runtime", @@ -295,9 +295,9 @@ dependencies = [ [[package]] name = "aws-sdk-kms" -version = "1.47.0" +version = "1.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "564a597a3c71a957d60a2e4c62c93d78ee5a0d636531e15b760acad983a5c18e" +checksum = "bfd059dacda4dfd5b57f2bd453fc6555f9acb496cb77508d517da24cf5d73167" dependencies = [ "aws-credential-types", "aws-runtime", @@ -317,9 +317,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.46.0" +version = "1.49.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0dc2faec3205d496c7e57eff685dd944203df7ce16a4116d0281c44021788a7b" +checksum = "09677244a9da92172c8dc60109b4a9658597d4d298b188dd0018b6a66b410ca4" dependencies = [ "aws-credential-types", "aws-runtime", @@ -339,9 +339,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.47.0" +version = "1.50.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c93c241f52bc5e0476e259c953234dab7e2a35ee207ee202e86c0095ec4951dc" +checksum = "81fea2f3a8bb3bd10932ae7ad59cc59f65f270fc9183a7e91f501dc5efbef7ee" dependencies = [ "aws-credential-types", "aws-runtime", @@ -361,9 +361,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.46.0" +version = "1.49.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b259429be94a3459fa1b00c5684faee118d74f9577cc50aebadc36e507c63b5f" +checksum = "53dcf5e7d9bd1517b8b998e170e650047cea8a2b85fe1835abe3210713e541b7" dependencies = [ "aws-credential-types", "aws-runtime", @@ -384,9 +384,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.2.4" +version = "1.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc8db6904450bafe7473c6ca9123f88cc11089e41a025408f992db4e22d3be68" +checksum = "5619742a0d8f253be760bfbb8e8e8368c69e3587e4637af5754e488a611499b1" dependencies = [ "aws-credential-types", "aws-smithy-http", @@ -457,9 +457,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.7.2" +version = "1.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a065c0fe6fdbdf9f11817eb68582b2ab4aff9e9c39e986ae48f7ec576c6322db" +checksum = "be28bd063fa91fd871d131fc8b68d7cd4c5fa0869bea68daca50dcb1cbd76be2" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -484,9 +484,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.7.2" +version = "1.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e086682a53d3aa241192aa110fa8dfce98f2f5ac2ead0de84d41582c7e8fdb96" +checksum = "92165296a47a812b267b4f41032ff8069ab7ff783696d217f0994a0d7ab585cd" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -501,9 +501,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.2.7" +version = "1.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "147100a7bea70fa20ef224a6bad700358305f5dc0f84649c53769761395b355b" +checksum = "4fbd94a32b3a7d55d3806fe27d98d3ad393050439dd05eb53ece36ec5e3d3510" dependencies = [ "base64-simd", "bytes", @@ -692,9 +692,9 @@ dependencies = [ [[package]] name = "bigdecimal" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51d712318a27c7150326677b321a5fa91b55f6d9034ffd67f20319e147d40cee" +checksum = "8f850665a0385e070b64c38d2354e6c104c8479c59868d1e48a0c13ee2c7a1c1" dependencies = [ "autocfg", "libm", @@ -770,7 +770,7 @@ checksum = "bcfcc3cd946cb52f0bbfdbbcfa2f4e24f75ebb6c0e1002f7c25904fada18b9ec" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -781,9 +781,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.7.2" +version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" +checksum = "9ac0150caa2ae65ca5bd83f25c7de183dea78d4d366469f148435e2acfbad0da" dependencies = [ "serde", ] @@ -825,7 +825,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -928,9 +928,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.30" +version = "1.1.37" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b16803a61b81d9eabb7eae2588776c4c1e584b738ede45fdbb4c972cec1e9945" +checksum = "40545c26d092346d8a8dab71ee48e7685a7a9cba76e634790c215b41a4a7b4cf" dependencies = [ "jobserver", "libc", @@ -974,6 +974,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" + [[package]] name = "cfg_aliases" version = "0.2.1" @@ -1088,7 +1094,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -1141,9 +1147,9 @@ dependencies = [ [[package]] name = "colorchoice" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3fd119d74b830634cea2a0f58bbd0d54540518a14397557951e79340abc28c0" +checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" [[package]] name = "colored" @@ -1402,9 +1408,9 @@ dependencies = [ [[package]] name = "csv" -version = "1.3.0" +version = "1.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" +checksum = "acdc4883a9c96732e4733212c01447ebd805833b7275a73ca3ee080fd77afdaf" dependencies = [ "csv-core", "itoa", @@ -1454,7 +1460,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -1494,7 +1500,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -1505,7 +1511,7 @@ checksum = "d336a2a514f6ccccaa3e09b02d41d35330c07ddf03a62165fcec10bb561c7806" dependencies = [ "darling_core", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -1583,7 +1589,7 @@ dependencies = [ "proc-macro2", "quote", "rustc_version", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -1603,7 +1609,7 @@ checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", "unicode-xid", ] @@ -1634,6 +1640,17 @@ dependencies = [ "subtle", ] +[[package]] +name = "displaydoc" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "docker-compose-runner" version = "0.3.1" @@ -1747,9 +1764,9 @@ checksum = "a357d28ed41a50f9c765dbfe56cbc04a64e53e5fc58ba79fbc34c10ef3df831f" [[package]] name = "encoding_rs" -version = "0.8.34" +version = "0.8.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b45de904aa0b010bce2ab45264d0631681847fa7b6f2eaa7dab7619943bc4f59" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" dependencies = [ "cfg-if", ] @@ -1804,9 +1821,9 @@ checksum = "a5d9305ccc6942a704f4335694ecd3de2ea531b114ac2d51f5f843750787a92f" [[package]] name = "fastrand" -version = "2.1.1" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8c02a5121d4ea3eb16a80748c74f5549a5665e4c21333c6098f283870fbdea6" +checksum = "486f806e73c5707928240ddc295403b1b93c96a02038563881c4a2fd84b81ac4" [[package]] name = "fd-lock" @@ -1866,6 +1883,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2" + [[package]] name = "foreign-types" version = "0.3.2" @@ -1892,9 +1915,9 @@ dependencies = [ [[package]] name = "fred" -version = "9.2.1" +version = "9.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1caa4f052f1ee1b987c2089e0524aeb9a0e33cd7be0a184667aca2e4cbf17579" +checksum = "3cdd5378252ea124b712e0ac55147d26ae3af575883b34b8423091a4c719606b" dependencies = [ "arc-swap", "async-trait", @@ -1908,7 +1931,7 @@ dependencies = [ "parking_lot", "rand", "redis-protocol", - "rustls 0.23.15", + "rustls 0.23.16", "rustls-native-certs 0.7.3", "semver", "socket2 0.5.7", @@ -1928,7 +1951,7 @@ checksum = "1458c6e22d36d61507034d5afecc64f105c1d39712b7ac6ec3b352c423f715cc" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -1993,7 +2016,7 @@ checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -2088,13 +2111,14 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "governor" -version = "0.6.3" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" +checksum = "0746aa765db78b521451ef74221663b57ba595bf83f75d0ce23cc09447c8139f" dependencies = [ "cfg-if", - "futures", + "futures-sink", "futures-timer", + "futures-util", "no-std-compat", "nonzero_ext", "parking_lot", @@ -2182,9 +2206,12 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.15.0" +version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +checksum = "3a9bfc1af68b1726ea47d3d5109de126281def866b33970e10fbab11b5dafab3" +dependencies = [ + "foldhash", +] [[package]] name = "heck" @@ -2388,7 +2415,7 @@ dependencies = [ "http 1.1.0", "hyper 1.5.0", "hyper-util", - "rustls 0.23.15", + "rustls 0.23.16", "rustls-pki-types", "tokio", "tokio-rustls 0.26.0", @@ -2414,9 +2441,9 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.9" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41296eb09f183ac68eec06e03cdbea2e759633d4067b2f6552fc2e009bcad08b" +checksum = "df2dcfbe0677734ab2f3ffa7fa7bfd4706bfdc1ef393f2ee30184aed67e631b4" dependencies = [ "bytes", "futures-channel", @@ -2454,6 +2481,124 @@ dependencies = [ "cc", ] +[[package]] +name = "icu_collections" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db2fa452206ebee18c4b5c2274dbf1de17008e874b4dc4f0aea9d01ca79e4526" +dependencies = [ + "displaydoc", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_locid" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13acbb8371917fc971be86fc8057c41a64b521c184808a698c02acc242dbf637" +dependencies = [ + "displaydoc", + "litemap", + "tinystr", + "writeable", + "zerovec", +] + +[[package]] +name = "icu_locid_transform" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01d11ac35de8e40fdeda00d9e1e9d92525f3f9d887cdd7aa81d727596788b54e" +dependencies = [ + "displaydoc", + "icu_locid", + "icu_locid_transform_data", + "icu_provider", + "tinystr", + "zerovec", +] + +[[package]] +name = "icu_locid_transform_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdc8ff3388f852bede6b579ad4e978ab004f139284d7b28715f773507b946f6e" + +[[package]] +name = "icu_normalizer" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19ce3e0da2ec68599d193c93d088142efd7f9c5d6fc9b803774855747dc6a84f" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_normalizer_data", + "icu_properties", + "icu_provider", + "smallvec", + "utf16_iter", + "utf8_iter", + "write16", + "zerovec", +] + +[[package]] +name = "icu_normalizer_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8cafbf7aa791e9b22bec55a167906f9e1215fd475cd22adfcf660e03e989516" + +[[package]] +name = "icu_properties" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93d6020766cfc6302c15dbbc9c8778c37e62c14427cb7f6e601d849e092aeef5" +dependencies = [ + "displaydoc", + "icu_collections", + "icu_locid_transform", + "icu_properties_data", + "icu_provider", + "tinystr", + "zerovec", +] + +[[package]] +name = "icu_properties_data" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "67a8effbc3dd3e4ba1afa8ad918d5684b8868b3b26500753effea8d2eed19569" + +[[package]] +name = "icu_provider" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ed421c8a8ef78d3e2dbc98a973be2f3770cb42b606e3ab18d6237c4dfde68d9" +dependencies = [ + "displaydoc", + "icu_locid", + "icu_provider_macros", + "stable_deref_trait", + "tinystr", + "writeable", + "yoke", + "zerofrom", + "zerovec", +] + +[[package]] +name = "icu_provider_macros" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ec89e9337638ecdc08744df490b221a7399bf8d164eb52a665454e60e075ad6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "ident_case" version = "1.0.1" @@ -2462,12 +2607,23 @@ checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" [[package]] name = "idna" -version = "0.5.0" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "634d9b1461af396cad843f47fdba5597a4f9e6ddd4bfb6ff5d85028c25cb12f6" +checksum = "686f825264d630750a544639377bae737628043f20d38bbc029e8f29ea968a7e" dependencies = [ - "unicode-bidi", - "unicode-normalization", + "idna_adapter", + "smallvec", + "utf8_iter", +] + +[[package]] +name = "idna_adapter" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daca1df1c957320b2cf139ac61e7bd64fed304c5040df000a745aa1de3b4ef71" +dependencies = [ + "icu_normalizer", + "icu_properties", ] [[package]] @@ -2488,7 +2644,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" dependencies = [ "equivalent", - "hashbrown 0.15.0", + "hashbrown 0.15.1", "serde", ] @@ -2572,15 +2728,14 @@ checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" [[package]] name = "j4rs" -version = "0.20.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "689ae4f2bd4eba82601592f3d22b7e7147b1df52d3b525223f5218990501b4eb" +checksum = "0690025c2b04415d9657cbe1e8588d27ce213a30f6411a722a6c25d0a1b89238" dependencies = [ "cesu8", "dunce", "fs_extra", "futures", - "glob", "java-locator", "jni-sys", "lazy_static", @@ -2593,12 +2748,11 @@ dependencies = [ [[package]] name = "java-locator" -version = "0.1.7" +version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2abecabd9961c5e01405a6426687fcf1bd94a269927137e4c3cc1a7419b93fd" +checksum = "6f25f28894af6a5dd349ed5ec46e178654e75f62edb6717ac74007102a57deb5" dependencies = [ "glob", - "lazy_static", ] [[package]] @@ -2664,9 +2818,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.160" +version = "0.2.162" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0b21006cd1874ae9e650973c565615676dc4a274c965bb0a73796dac838ce4f" +checksum = "18d287de67fe55fd7e1581fe933d965a5a9477b38e949cfa9f8574ef01506398" [[package]] name = "libloading" @@ -2680,9 +2834,9 @@ dependencies = [ [[package]] name = "libm" -version = "0.2.8" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" +checksum = "8355be11b20d696c8f18f6cc018c4e372165b1fa8126cef092399c9951984ffa" [[package]] name = "libz-sys" @@ -2702,6 +2856,12 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "litemap" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "643cb0b8d4fcc284004d5fd0d67ccf61dfffadb7f75e1e71bc420f4688a3a704" + [[package]] name = "lock_api" version = "0.4.12" @@ -2756,9 +2916,9 @@ checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" [[package]] name = "metrics" -version = "0.23.0" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "884adb57038347dfbaf2d5065887b6cf4312330dc8e94bc30a1a839bd79d3261" +checksum = "8ae428771d17306715c5091d446327d1cfdedc82185c65ba8423ab404e45bf10" dependencies = [ "ahash", "portable-atomic", @@ -2766,9 +2926,9 @@ dependencies = [ [[package]] name = "metrics-exporter-prometheus" -version = "0.15.3" +version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b4f0c8427b39666bf970460908b213ec09b3b350f20c0c2eabcbba51704a08e6" +checksum = "85b6f8152da6d7892ff1b7a1c0fa3f435e92b5918ad67035c3bb432111d9a29b" dependencies = [ "base64 0.22.1", "indexmap 2.6.0", @@ -2780,15 +2940,14 @@ dependencies = [ [[package]] name = "metrics-util" -version = "0.17.0" +version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4259040465c955f9f2f1a4a8a16dc46726169bca0f88e8fb2dbeced487c3e828" +checksum = "15b482df36c13dd1869d73d14d28cd4855fbd6cfc32294bee109908a9f4a4ed7" dependencies = [ "crossbeam-epoch", "crossbeam-utils", - "hashbrown 0.14.5", + "hashbrown 0.15.1", "metrics", - "num_cpus", "quanta", "sketches-ddsketch", ] @@ -2854,12 +3013,13 @@ dependencies = [ [[package]] name = "nix" -version = "0.27.1" +version = "0.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2eb04e9c688eff1c89d72b407f168cf79bb9e867a9d3323ed6c01519eb9cc053" +checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" dependencies = [ "bitflags", "cfg-if", + "cfg_aliases 0.1.1", "libc", ] @@ -2871,7 +3031,7 @@ checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" dependencies = [ "bitflags", "cfg-if", - "cfg_aliases", + "cfg_aliases 0.2.1", "libc", ] @@ -2981,16 +3141,6 @@ dependencies = [ "libm", ] -[[package]] -name = "num_cpus" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" -dependencies = [ - "hermit-abi 0.3.9", - "libc", -] - [[package]] name = "num_enum" version = "0.5.11" @@ -3082,7 +3232,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -3093,9 +3243,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" [[package]] name = "openssl-src" -version = "300.3.2+3.3.2" +version = "300.4.0+3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a211a18d945ef7e648cc6e0058f4c548ee46aab922ea203e0d30e966ea23647b" +checksum = "a709e02f2b4aca747929cca5ed248880847c650233cf8b8cdc48f40aaf4898a6" dependencies = [ "cc", ] @@ -3115,9 +3265,9 @@ dependencies = [ [[package]] name = "ordered-float" -version = "4.4.0" +version = "4.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83e7ccb95e240b7c9506a3d544f10d935e142cc90b0a1d56954fb44d89ad6b97" +checksum = "c65ee1f9701bf938026630b455d5315f490640234259037edb259798b3bcf85e" dependencies = [ "num-traits", "rand", @@ -3263,9 +3413,9 @@ checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] name = "pin-project-lite" -version = "0.2.14" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" +checksum = "915a1e146535de9163f3987b8944ed8cf49a18bb0056bcebcdcece385cece4ff" [[package]] name = "pin-utils" @@ -3402,7 +3552,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3ae130e2f271fbc2ac3a40fb1d07180839cdbbe443c7a27e1e3c13c5cac0116d" dependencies = [ "diff", - "yansi 1.0.1", + "yansi", ] [[package]] @@ -3435,9 +3585,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.88" +version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c3a7fc5db1e57d5a779a352c8cdb57b29aa4c40cc69c3a68a7fedc815fbf2f9" +checksum = "f139b0662de085916d1fb67d2b4169d1addddda1919e696f3252b740b629986e" dependencies = [ "unicode-ident", ] @@ -3480,7 +3630,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls 0.23.15", + "rustls 0.23.16", "socket2 0.5.7", "thiserror", "tokio", @@ -3497,7 +3647,7 @@ dependencies = [ "rand", "ring", "rustc-hash", - "rustls 0.23.15", + "rustls 0.23.16", "slab", "thiserror", "tinyvec", @@ -3506,10 +3656,11 @@ dependencies = [ [[package]] name = "quinn-udp" -version = "0.5.5" +version = "0.5.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fe68c2e9e1a1234e218683dbdf9f9dfcb094113c5ac2b938dfcb9bab4c4140b" +checksum = "7d5a626c6807713b15cac82a6acaccd6043c9a5408c24baae07611fec3f243da" dependencies = [ + "cfg_aliases 0.2.1", "libc", "once_cell", "socket2 0.5.7", @@ -3698,9 +3849,9 @@ dependencies = [ [[package]] name = "regex" -version = "1.11.0" +version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8" +checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", @@ -3754,9 +3905,9 @@ checksum = "ba39f3699c378cd8970968dcbff9c43159ea4cfbd88d43c00b22f2ef10a435d2" [[package]] name = "reqwest" -version = "0.12.8" +version = "0.12.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f713147fbe92361e52392c73b8c9e48c04c6625bce969ef54dc901e58e042a7b" +checksum = "a77c62af46e79de0a562e1a9849205ffcb7fc1238876e9bd743357570e04046f" dependencies = [ "async-compression", "base64 0.22.1", @@ -3781,7 +3932,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls 0.23.15", + "rustls 0.23.16", "rustls-pemfile 2.2.0", "rustls-pki-types", "serde", @@ -3874,7 +4025,7 @@ dependencies = [ "regex", "relative-path", "rustc_version", - "syn 2.0.79", + "syn 2.0.87", "unicode-ident", ] @@ -3886,7 +4037,7 @@ checksum = "b3a8fb4672e840a587a66fc577a5491375df51ddb88f2a2c2a792598c326fe14" dependencies = [ "quote", "rand", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -4016,9 +4167,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.37" +version = "0.38.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8acb788b847c24f28525660c4d7758620a7210875711f79e7f663cc152726811" +checksum = "99e4ea3e1cdc4b559b8e5650f9c8e5998e3e5c1343b4eaf034565f32318d63c0" dependencies = [ "bitflags", "errno", @@ -4041,9 +4192,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.15" +version = "0.23.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fbb44d7acc4e873d613422379f69f237a1b141928c02f6bc6ccfddddc2d7993" +checksum = "eee87ff5d9b36712a58574e12e9f0ea80f915a5b0ac518d322b24a465617925e" dependencies = [ "log", "once_cell", @@ -4079,6 +4230,19 @@ dependencies = [ "security-framework", ] +[[package]] +name = "rustls-native-certs" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a" +dependencies = [ + "openssl-probe", + "rustls-pemfile 2.2.0", + "rustls-pki-types", + "schannel", + "security-framework", +] + [[package]] name = "rustls-pemfile" version = "1.0.4" @@ -4132,9 +4296,9 @@ checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248" [[package]] name = "rustyline" -version = "13.0.0" +version = "14.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02a2d683a4ac90aeef5b1013933f6d977bd37d51ff3f4dad829d4931a7e6be86" +checksum = "7803e8936da37efd9b6d4478277f4b2b9bb5cdb37a113e8d63222e58da647e63" dependencies = [ "bitflags", "cfg-if", @@ -4144,12 +4308,12 @@ dependencies = [ "libc", "log", "memchr", - "nix 0.27.1", + "nix 0.28.0", "radix_trie", "unicode-segmentation", "unicode-width", "utf8parse", - "winapi", + "windows-sys 0.52.0", ] [[package]] @@ -4285,7 +4449,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -4317,9 +4481,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.12.0" +version = "2.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea4a292869320c0272d7bc55a5a6aafaff59b4f63404a003887b679a2e05b4b6" +checksum = "fa39c7303dc58b5543c94d22c1766b0d31f2ee58306363ea622b10bbc075eaa2" dependencies = [ "core-foundation-sys", "libc", @@ -4336,29 +4500,29 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.210" +version = "1.0.214" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8e3592472072e6e22e0a54d5904d9febf8508f65fb8552499a1abc7d1078c3a" +checksum = "f55c3193aca71c12ad7890f1785d2b73e1b9f63a0bbc353c08ef26fe03fc56b5" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.210" +version = "1.0.214" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" +checksum = "de523f781f095e28fa605cdce0f8307e451cc0fd14e2eb4cd2e98a355b147766" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] name = "serde_json" -version = "1.0.128" +version = "1.0.132" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ff5456707a1de34e7e37f2a6fd3d3f808c318259cbd01ab6377795054b483d8" +checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" dependencies = [ "itoa", "memchr", @@ -4405,7 +4569,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -4466,16 +4630,15 @@ checksum = "ae4c63bdcc11eea49b562941b914d5ac30d42cad982e3f6e846a513ee6a3ce7e" [[package]] name = "shellfish" -version = "0.9.0" +version = "0.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fef42fa3356b28a29aee098d1656681670da7eb15be0e89bba6f05aea540df3d" +checksum = "82cf74efa005270b11ef497491d8b1c38c748eebe62fb2ad76819d71f1cd9c4e" dependencies = [ "async-trait", "cfg-if", "rustyline", "thiserror", - "version_check", - "yansi 0.5.1", + "yansi", ] [[package]] @@ -4532,7 +4695,7 @@ dependencies = [ "pretty_assertions", "rand", "redis-protocol", - "rustls 0.23.15", + "rustls 0.23.16", "rustls-pemfile 2.2.0", "rustls-pki-types", "sasl", @@ -4620,9 +4783,9 @@ dependencies = [ [[package]] name = "sketches-ddsketch" -version = "0.2.2" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85636c14b73d81f541e525f585c0a2109e6744e1565b5c1668e31c70c10ed65c" +checksum = "c1e9a774a6c28142ac54bb25d25562e6bcf957493a184f15ad4eebccb23e410a" [[package]] name = "slab" @@ -4741,6 +4904,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "stable_deref_trait" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" + [[package]] name = "static_assertions" version = "1.1.0" @@ -4772,7 +4941,7 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -4804,9 +4973,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.79" +version = "2.0.87" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590" +checksum = "25aa4ce346d03a6dcd68dd8b4010bcb74e54e62c90c573f394c46eae99aba32d" dependencies = [ "proc-macro2", "quote", @@ -4828,6 +4997,17 @@ dependencies = [ "futures-core", ] +[[package]] +name = "synstructure" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8af7666ab7b6390ab78131fb5b0fce11d6b7a6951602017c35fa82800708971" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "system-configuration" version = "0.6.1" @@ -4851,9 +5031,9 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.13.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f0f2c9fc62d0beef6951ccffd757e241266a2c833136efbe35af6cd2567dca5b" +checksum = "28cce251fcbc87fac86a866eeb0d6c2d536fc16d06f184bb61aeae11aa4cee0c" dependencies = [ "cfg-if", "fastrand", @@ -4883,7 +5063,7 @@ dependencies = [ "rdkafka", "redis", "reqwest", - "rustls 0.23.15", + "rustls 0.23.16", "rustls-pemfile 2.2.0", "rustls-pki-types", "scylla", @@ -4901,22 +5081,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.64" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d50af8abc119fb8bb6dbabcfa89656f46f84aa0ac7688088608076ad2b459a84" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.64" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -4960,6 +5140,16 @@ dependencies = [ "time-core", ] +[[package]] +name = "tinystr" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9117f5d4db391c1cf6927e7bea3db74b9a1c1add8f7eda9ffd5364f40f57b82f" +dependencies = [ + "displaydoc", + "zerovec", +] + [[package]] name = "tinytemplate" version = "1.2.1" @@ -4987,9 +5177,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.40.0" +version = "1.41.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" +checksum = "22cfb5bee7a6a52939ca9224d6ac897bb669134078daa8735560897f69de4d33" dependencies = [ "backtrace", "bytes", @@ -5041,7 +5231,7 @@ checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -5081,7 +5271,7 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.23.15", + "rustls 0.23.16", "rustls-pki-types", "tokio", ] @@ -5099,14 +5289,14 @@ dependencies = [ [[package]] name = "tokio-tungstenite" -version = "0.23.1" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6989540ced10490aaf14e6bad2e3d33728a2813310a0c71d1574304c49631cd" +checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" dependencies = [ "futures-util", "log", - "rustls 0.23.15", - "rustls-native-certs 0.7.3", + "rustls 0.23.16", + "rustls-native-certs 0.8.0", "rustls-pki-types", "tokio", "tokio-rustls 0.26.0", @@ -5212,7 +5402,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] [[package]] @@ -5296,9 +5486,9 @@ checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" [[package]] name = "tungstenite" -version = "0.23.0" +version = "0.24.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e2e2ce1e47ed2994fd43b04c8f618008d4cabdd5ee34027cf14f9d918edd9c8" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" dependencies = [ "byteorder", "bytes", @@ -5307,7 +5497,7 @@ dependencies = [ "httparse", "log", "rand", - "rustls 0.23.15", + "rustls 0.23.16", "rustls-pki-types", "sha1", "thiserror", @@ -5357,30 +5547,15 @@ checksum = "70b20a22c42c8f1cd23ce5e34f165d4d37038f5b663ad20fb6adbdf029172483" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", ] -[[package]] -name = "unicode-bidi" -version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ab17db44d7388991a428b2ee655ce0c212e862eff1768a455c58f9aad6e7893" - [[package]] name = "unicode-ident" version = "1.0.13" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" -[[package]] -name = "unicode-normalization" -version = "0.1.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5033c97c4262335cded6d6fc3e5c18ab755e1a3dc96376350f3d8e9f009ad956" -dependencies = [ - "tinyvec", -] - [[package]] name = "unicode-segmentation" version = "1.12.0" @@ -5423,9 +5598,9 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" -version = "2.5.2" +version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22784dbdf76fdde8af1aeda5622b546b422b6fc585325248a2bf9f5e41e94d6c" +checksum = "8d157f1b96d14500ffdc1f10ba712e780825526c03d9a49b4d0324b0d9113ada" dependencies = [ "form_urlencoded", "idna", @@ -5444,6 +5619,18 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "utf16_iter" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8232dd3cdaed5356e0f716d285e4b40b932ac434100fe9b7e0e8e935b9e6246" + +[[package]] +name = "utf8_iter" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" + [[package]] name = "utf8parse" version = "0.2.2" @@ -5543,7 +5730,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", "wasm-bindgen-shared", ] @@ -5577,7 +5764,7 @@ checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -5879,6 +6066,18 @@ dependencies = [ "memchr", ] +[[package]] +name = "write16" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1890f4022759daae28ed4fe62859b1236caebfc61ede2f63ed4e695f3f6d936" + +[[package]] +name = "writeable" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9df38ee2d2c3c5948ea468a8406ff0db0b29ae1ffde1bcf20ef305bcc95c51" + [[package]] name = "xmlparser" version = "0.13.6" @@ -5891,12 +6090,6 @@ version = "0.8.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a5cbf750400958819fb6178eaa83bee5cd9c29a26a40cc241df8c70fdd46984" -[[package]] -name = "yansi" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09041cd90cf85f7f8b2df60c646f853b7f535ce68f85244eb6731cf89fa498ec" - [[package]] name = "yansi" version = "1.0.1" @@ -5912,6 +6105,30 @@ dependencies = [ "time", ] +[[package]] +name = "yoke" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c5b1314b079b0930c31e3af543d8ee1757b1951ae1e1565ec704403a7240ca5" +dependencies = [ + "serde", + "stable_deref_trait", + "yoke-derive", + "zerofrom", +] + +[[package]] +name = "yoke-derive" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28cc31741b18cb6f1d5ff12f5b7523e3d6eb0852bbbad19d73905511d9849b95" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", + "synstructure", +] + [[package]] name = "zerocopy" version = "0.7.35" @@ -5930,7 +6147,28 @@ checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" dependencies = [ "proc-macro2", "quote", - "syn 2.0.79", + "syn 2.0.87", +] + +[[package]] +name = "zerofrom" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91ec111ce797d0e0784a1116d0ddcdbea84322cd79e5d5ad173daeba4f93ab55" +dependencies = [ + "zerofrom-derive", +] + +[[package]] +name = "zerofrom-derive" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ea7b4a3637ea8669cedf0f1fd5c286a17f3de97b8dd5a70a6c167a1730e63a5" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", + "synstructure", ] [[package]] @@ -5938,3 +6176,25 @@ name = "zeroize" version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" + +[[package]] +name = "zerovec" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa2b893d79df23bfb12d5461018d408ea19dfafe76c2c7ef6d4eba614f8ff079" +dependencies = [ + "yoke", + "zerofrom", + "zerovec-derive", +] + +[[package]] +name = "zerovec-derive" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6eafa6dfb17584ea3e2bd6e76e0cc15ad7af12b09abdd1ca55961bed9b1063c6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] diff --git a/deny.toml b/deny.toml index 51a62c881..767daf822 100644 --- a/deny.toml +++ b/deny.toml @@ -72,6 +72,7 @@ unlicensed = "deny" allow = [ "MIT", "ISC", + "Unicode-3.0", "Apache-2.0", "BSD-3-Clause", "BSD-2-Clause", diff --git a/ec2-cargo/Cargo.toml b/ec2-cargo/Cargo.toml index 6a8af53e3..a4bfa895b 100644 --- a/ec2-cargo/Cargo.toml +++ b/ec2-cargo/Cargo.toml @@ -13,6 +13,6 @@ clap.workspace = true tracing-subscriber.workspace = true aws-throwaway.workspace = true tracing-appender.workspace = true -shellfish = { version = "0.9.0", features = ["async"] } +shellfish = { version = "0.10.0", features = ["async"] } cargo_metadata = "0.18.0" shell-quote.workspace = true diff --git a/shotover/Cargo.toml b/shotover/Cargo.toml index d7d0170c1..d13e7a7b3 100644 --- a/shotover/Cargo.toml +++ b/shotover/Cargo.toml @@ -53,7 +53,7 @@ pretty-hex = "0.4.0" tokio-stream = "0.1.2" derivative = "2.1.1" cached = { version = "0.53", features = ["async"], optional = true } -governor = { version = "0.6", default-features = false, features = ["std", "jitter", "quanta"] } +governor = { version = "0.7", default-features = false, features = ["std", "jitter", "quanta"] } nonzero_ext = "0.3.0" version-compare = { version = "0.2", optional = true } rand = { features = ["small_rng"], workspace = true } @@ -76,7 +76,7 @@ csv = { workspace = true, optional = true } hex = { workspace = true, optional = true } async-trait.workspace = true typetag.workspace = true -tokio-tungstenite = "0.23.0" +tokio-tungstenite = "0.24.0" # Error handling thiserror = "1.0" @@ -98,8 +98,8 @@ httparse = { version = "1.8.0", optional = true } http = { version = "1.0.0", optional = true } #Observability -metrics = "0.23.0" -metrics-exporter-prometheus = { version = "0.15.0", default-features = false } +metrics = "0.24.0" +metrics-exporter-prometheus = { version = "0.16.0", default-features = false } tracing.workspace = true tracing-subscriber.workspace = true tracing-appender.workspace = true diff --git a/test-helpers/Cargo.toml b/test-helpers/Cargo.toml index 706c5c06f..c0cb615a5 100644 --- a/test-helpers/Cargo.toml +++ b/test-helpers/Cargo.toml @@ -33,12 +33,12 @@ anyhow.workspace = true rcgen.workspace = true rdkafka = { version = "0.36", features = ["cmake-build"], optional = true } docker-compose-runner = "0.3.0" -j4rs = "0.20.0" +j4rs = "0.21.0" futures-util = "0.3.28" http = "1.1.0" rustls = { version = "0.23.2", default-features = false } rustls-pki-types = "1.0.1" rustls-pemfile = "2.0.0" -tokio-tungstenite = { version = "0.23", features = ["rustls-tls-native-roots"] } +tokio-tungstenite = { version = "0.24", features = ["rustls-tls-native-roots"] } pretty_assertions.workspace = true serde.workspace = true From 157f199466fbc6f778714bf4568f9939b8dc9afb Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Mon, 11 Nov 2024 17:42:15 +1100 Subject: [PATCH 3/3] KafkaSinkCluster: Fix DeleteRecords routing (#1803) --- .../tests/kafka_int_tests/test_cases.rs | 66 ++++++++- .../src/transforms/kafka/sink_cluster/mod.rs | 129 ++++++++++++++++-- .../transforms/kafka/sink_cluster/split.rs | 37 ++++- test-helpers/src/connection/kafka/mod.rs | 2 +- 4 files changed, 216 insertions(+), 18 deletions(-) diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index 7a6b0576f..c8c32a6bf 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -134,7 +134,8 @@ async fn admin_cleanup(connection_builder: &KafkaConnectionBuilder) { let admin = connection_builder.connect_admin().await; admin.delete_groups(&["some_group", "some_group1"]).await; - delete_records(&admin, connection_builder).await; + delete_records_partitions1(&admin, connection_builder).await; + delete_records_partitions3(&admin, connection_builder).await; } async fn delete_offsets(connection_builder: &KafkaConnectionBuilder) { @@ -181,7 +182,10 @@ async fn delete_offsets(connection_builder: &KafkaConnectionBuilder) { } } -async fn delete_records(admin: &KafkaAdmin, connection_builder: &KafkaConnectionBuilder) { +async fn delete_records_partitions1( + admin: &KafkaAdmin, + connection_builder: &KafkaConnectionBuilder, +) { // Only supported by java driver #[allow(irrefutable_let_patterns)] if let KafkaConnectionBuilder::Java(_) = connection_builder { @@ -225,6 +229,64 @@ async fn delete_records(admin: &KafkaAdmin, connection_builder: &KafkaConnection } } +async fn delete_records_partitions3( + admin: &KafkaAdmin, + connection_builder: &KafkaConnectionBuilder, +) { + // Only supported by java driver + #[allow(irrefutable_let_patterns)] + if let KafkaConnectionBuilder::Java(_) = connection_builder { + // assert partition contains a record + let mut consumer = connection_builder + .connect_consumer( + ConsumerConfig::consume_from_topics(vec!["partitions3_case1".to_owned()]) + .with_group("test_delete_records"), + ) + .await; + + // assert that a record exists, due to cross partition ordering we dont know what the record is, just that it exists. + consumer.consume(Duration::from_secs(30)).await; + + // delete all records in the partition + admin + .delete_records(&[ + RecordsToDelete { + topic_partition: TopicPartition { + topic_name: "partitions3_case1".to_owned(), + partition: 0, + }, + delete_before_offset: -1, + }, + RecordsToDelete { + topic_partition: TopicPartition { + topic_name: "partitions3_case1".to_owned(), + partition: 1, + }, + delete_before_offset: -1, + }, + RecordsToDelete { + topic_partition: TopicPartition { + topic_name: "partitions3_case1".to_owned(), + partition: 2, + }, + delete_before_offset: -1, + }, + ]) + .await; + + // assert partition no longer contains a record + let mut consumer = connection_builder + .connect_consumer( + ConsumerConfig::consume_from_topics(vec!["partitions3_case1".to_owned()]) + .with_group("test_delete_records2"), + ) + .await; + consumer + .assert_no_consume_within_timeout(Duration::from_secs(2)) + .await; + } +} + /// Attempt to make the driver batch produce requests for different topics into the same request /// This is important to test since shotover has complex logic for splitting these batch requests into individual requests. pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnectionBuilder) { diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 21d729b83..fe9c440e5 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -14,6 +14,8 @@ use connections::{Connections, Destination}; use dashmap::DashMap; use kafka_node::{ConnectionFactory, KafkaAddress, KafkaNode, KafkaNodeState}; use kafka_protocol::messages::add_partitions_to_txn_request::AddPartitionsToTxnTransaction; +use kafka_protocol::messages::delete_records_request::DeleteRecordsTopic; +use kafka_protocol::messages::delete_records_response::DeleteRecordsTopicResult; use kafka_protocol::messages::fetch_request::FetchTopic; use kafka_protocol::messages::fetch_response::LeaderIdAndEpoch as FetchResponseLeaderIdAndEpoch; use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic; @@ -27,14 +29,14 @@ use kafka_protocol::messages::produce_response::{ }; use kafka_protocol::messages::{ AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey, - BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, EndTxnRequest, FetchRequest, - FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest, - InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse, - ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest, - MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest, - OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader, - SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest, - TopicName, TransactionalId, TxnOffsetCommitRequest, + BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest, + DeleteRecordsResponse, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, + FindCoordinatorResponse, GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, + LeaveGroupRequest, ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, + ListTransactionsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest, + OffsetFetchResponse, OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, + ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, + SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest, }; use kafka_protocol::protocol::StrBytes; use kafka_protocol::ResponseError; @@ -49,8 +51,9 @@ use scram_over_mtls::{ use serde::{Deserialize, Serialize}; use shotover_node::{ShotoverNode, ShotoverNodeConfig}; use split::{ - AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter, ListGroupsSplitAndRouter, - ListOffsetsRequestSplitAndRouter, ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter, + AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter, + DeleteRecordsRequestSplitAndRouter, ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter, + ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter, OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter, }; use std::collections::{HashMap, HashSet, VecDeque}; @@ -885,6 +888,12 @@ impl KafkaSinkCluster { })) => self.split_and_route_request::( request, )?, + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::DeleteRecords(_), + .. + })) => { + self.split_and_route_request::(request)? + } // route to group coordinator Some(Frame::Kafka(KafkaFrame::Request { @@ -1055,7 +1064,6 @@ The connection to the client has been closed." | RequestBody::AlterConfigs(_) | RequestBody::CreatePartitions(_) | RequestBody::DeleteTopics(_) - | RequestBody::DeleteRecords(_) | RequestBody::CreateAcls(_) | RequestBody::ApiVersions(_), .. @@ -1587,6 +1595,58 @@ The connection to the client has been closed." result } + /// This method removes all topics from the DeleteRecords request and returns them split up by their destination. + /// If any topics are unroutable they will have their BrokerId set to -1 + fn split_delete_records_request_by_destination( + &mut self, + body: &mut DeleteRecordsRequest, + ) -> HashMap> { + let mut result: HashMap> = Default::default(); + + for mut topic in body.topics.drain(..) { + let topic_name = &topic.name; + if let Some(topic_meta) = self.topic_by_name.get(topic_name) { + for partition in std::mem::take(&mut topic.partitions) { + let partition_index = partition.partition_index as usize; + let destination = if let Some(partition) = + topic_meta.partitions.get(partition_index) + { + if partition.leader_id == -1 { + tracing::warn!( + "leader_id is unknown for {topic_name:?} at partition index {partition_index}", + ); + } + partition.leader_id + } else { + let partition_len = topic_meta.partitions.len(); + tracing::warn!("no known partition for {topic_name:?} at partition index {partition_index} out of {partition_len} partitions, routing request to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client"); + BrokerId(-1) + }; + tracing::debug!( + "Routing DeleteRecords request portion of partition {partition_index} in {topic_name:?} to broker {}", + destination.0 + ); + let dest_topics = result.entry(destination).or_default(); + if let Some(dest_topic) = dest_topics.iter_mut().find(|x| x.name == topic.name) + { + dest_topic.partitions.push(partition); + } else { + let mut topic = topic.clone(); + topic.partitions.push(partition); + dest_topics.push(topic); + } + } + } else { + tracing::warn!("no known partition replica for {topic_name:?}, routing request to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client"); + let destination = BrokerId(-1); + let dest_topics = result.entry(destination).or_default(); + dest_topics.push(topic); + } + } + + result + } + /// This method removes all transactions from the AddPartitionsToTxn request and returns them split up by their destination /// If any topics are unroutable they will have their BrokerId set to -1 fn split_add_partition_to_txn_request_by_destination( @@ -2077,6 +2137,10 @@ The connection to the client has been closed." body: ResponseBody::DeleteGroups(base), .. })) => Self::combine_delete_groups_responses(base, drain)?, + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::DeleteRecords(base), + .. + })) => Self::combine_delete_records(base, drain)?, Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::OffsetFetch(base), .. @@ -2271,6 +2335,49 @@ The connection to the client has been closed." Ok(()) } + fn combine_delete_records( + base_body: &mut DeleteRecordsResponse, + drain: impl Iterator, + ) -> Result<()> { + let mut base_topics: HashMap = + std::mem::take(&mut base_body.topics) + .into_iter() + .map(|response| (response.name.clone(), response)) + .collect(); + for mut next in drain { + if let Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::DeleteRecords(next_body), + .. + })) = next.frame() + { + for next_response in std::mem::take(&mut next_body.topics) { + if let Some(base_response) = base_topics.get_mut(&next_response.name) { + for next_partition in &next_response.partitions { + for base_partition in &base_response.partitions { + if next_partition.partition_index == base_partition.partition_index + { + tracing::warn!("Duplicate partition indexes in combined DeleteRecords response, if this ever occurs we should investigate the repercussions") + } + } + } + // A partition can only be contained in one response so there is no risk of duplicating partitions + base_response.partitions.extend(next_response.partitions) + } else { + base_topics.insert(next_response.name.clone(), next_response); + } + } + } else { + return Err(anyhow!( + "Combining DeleteRecords responses but received another message type" + )); + } + } + + base_body.topics.extend(base_topics.into_values()); + + Ok(()) + } + fn combine_delete_groups_responses( base_delete_groups: &mut DeleteGroupsResponse, drain: impl Iterator, diff --git a/shotover/src/transforms/kafka/sink_cluster/split.rs b/shotover/src/transforms/kafka/sink_cluster/split.rs index 7ada3283b..7b34bc0c3 100644 --- a/shotover/src/transforms/kafka/sink_cluster/split.rs +++ b/shotover/src/transforms/kafka/sink_cluster/split.rs @@ -8,11 +8,12 @@ use crate::{ }; use kafka_protocol::messages::{ add_partitions_to_txn_request::AddPartitionsToTxnTransaction, - list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup, + delete_records_request::DeleteRecordsTopic, list_offsets_request::ListOffsetsTopic, + offset_fetch_request::OffsetFetchRequestGroup, offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData, - AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, GroupId, ListGroupsRequest, - ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest, - ProduceRequest, TopicName, + AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, DeleteRecordsRequest, GroupId, + ListGroupsRequest, ListOffsetsRequest, ListTransactionsRequest, OffsetFetchRequest, + OffsetForLeaderEpochRequest, ProduceRequest, TopicName, }; use std::collections::HashMap; @@ -139,6 +140,34 @@ impl RequestSplitAndRouter for OffsetForLeaderEpochRequestSplitAndRouter { } } +pub struct DeleteRecordsRequestSplitAndRouter; + +impl RequestSplitAndRouter for DeleteRecordsRequestSplitAndRouter { + type Request = DeleteRecordsRequest; + type SubRequests = Vec; + + fn split_by_destination( + transform: &mut KafkaSinkCluster, + request: &mut Self::Request, + ) -> HashMap { + transform.split_delete_records_request_by_destination(request) + } + + fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> { + match request.frame() { + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::DeleteRecords(request), + .. + })) => Some(request), + _ => None, + } + } + + fn reassemble(request: &mut Self::Request, item: Self::SubRequests) { + request.topics = item; + } +} + pub struct DeleteGroupsSplitAndRouter; impl RequestSplitAndRouter for DeleteGroupsSplitAndRouter { diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index 6d899abd1..7fe493b75 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -268,7 +268,7 @@ impl KafkaConsumer { } } - async fn consume(&mut self, timeout: Duration) -> ExpectedResponse { + pub async fn consume(&mut self, timeout: Duration) -> ExpectedResponse { match self { #[cfg(feature = "kafka-cpp-driver-tests")] Self::Cpp(cpp) => cpp.consume(timeout).await,