From 3e8a57ea696cecdc721363ddd3dd2e68c8b38fd4 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Mon, 14 Oct 2024 09:17:10 +1100 Subject: [PATCH] retain O(1) map lookup --- Cargo.lock | 158 +++++++++--------- shotover/Cargo.toml | 3 +- .../src/transforms/kafka/sink_cluster/mod.rs | 20 ++- .../transforms/kafka/sink_cluster/split.rs | 38 ++--- 4 files changed, 105 insertions(+), 114 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c72da0d0c..0d8aa0e3c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,9 +163,9 @@ checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" [[package]] name = "async-compression" -version = "0.4.13" +version = "0.4.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e614738943d3f68c628ae3dbce7c3daffb196665f82f8c8ea6b65de73c79429" +checksum = "103db485efc3e41214fe4fda9f3dbeae2eb9082f48fd236e6095627a9422066e" dependencies = [ "flate2", "futures-core", @@ -295,9 +295,9 @@ dependencies = [ [[package]] name = "aws-sdk-kms" -version = "1.46.0" +version = "1.47.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e33590e8d45206fdc4273ded8a1f292bcceaadd513037aa790fc67b237bc30ee" +checksum = "564a597a3c71a957d60a2e4c62c93d78ee5a0d636531e15b760acad983a5c18e" dependencies = [ "aws-credential-types", "aws-runtime", @@ -317,9 +317,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.45.0" +version = "1.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e33ae899566f3d395cbf42858e433930682cc9c1889fa89318896082fef45efb" +checksum = "0dc2faec3205d496c7e57eff685dd944203df7ce16a4116d0281c44021788a7b" dependencies = [ "aws-credential-types", "aws-runtime", @@ -339,9 +339,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.46.0" +version = "1.47.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f39c09e199ebd96b9f860b0fce4b6625f211e064ad7c8693b72ecf7ef03881e0" +checksum = "c93c241f52bc5e0476e259c953234dab7e2a35ee207ee202e86c0095ec4951dc" dependencies = [ "aws-credential-types", "aws-runtime", @@ -361,9 +361,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.45.0" +version = "1.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d95f93a98130389eb6233b9d615249e543f6c24a68ca1f109af9ca5164a8765" +checksum = "b259429be94a3459fa1b00c5684faee118d74f9577cc50aebadc36e507c63b5f" dependencies = [ "aws-credential-types", "aws-runtime", @@ -472,7 +472,7 @@ dependencies = [ "http-body 0.4.6", "http-body 1.0.1", "httparse", - "hyper 0.14.30", + "hyper 0.14.31", "hyper-rustls 0.24.2", "once_cell", "pin-project-lite", @@ -582,7 +582,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "http-body-util", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-util", "itoa", "matchit", @@ -755,9 +755,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "bytemuck" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94bbb0ad554ad961ddc5da507a12a29b14e4ae5bda06b19f575a3e6079d2e2ae" +checksum = "8334215b81e418a0a7bdb8ef0849474f40bb10c8b71f1c4ed315cff49f32494d" dependencies = [ "bytemuck_derive", ] @@ -928,9 +928,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.28" +version = "1.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e80e3b6a3ab07840e1cae9b0666a63970dc28e8ed5ffbcdacbfc760c281bfc1" +checksum = "b16803a61b81d9eabb7eae2588776c4c1e584b738ede45fdbb4c972cec1e9945" dependencies = [ "jobserver", "libc", @@ -1616,15 +1616,6 @@ dependencies = [ "cipher", ] -[[package]] -name = "des" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffdd80ce8ce993de27e9f063a444a4d53ce8e8db4c1f00cc03af5ad5a9867a1e" -dependencies = [ - "cipher", -] - [[package]] name = "diff" version = "0.1.13" @@ -1917,7 +1908,7 @@ dependencies = [ "parking_lot", "rand", "redis-protocol", - "rustls 0.23.14", + "rustls 0.23.15", "rustls-native-certs 0.7.3", "semver", "socket2 0.5.7", @@ -2328,9 +2319,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "0.14.30" +version = "0.14.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a152ddd61dfaec7273fe8419ab357f33aee0d914c5f4efbf0d96fa749eea5ec9" +checksum = "8c08302e8fa335b151b788c775ff56e7a03ae64ff85c548ee820fecb70356e85" dependencies = [ "bytes", "futures-channel", @@ -2343,7 +2334,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.7", "tokio", "tower-service", "tracing", @@ -2352,9 +2343,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.4.1" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" +checksum = "bbbff0a806a4728c99295b254c8838933b5b082d75e3cb70c8dab21fdfbcfa9a" dependencies = [ "bytes", "futures-channel", @@ -2379,7 +2370,7 @@ checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", "http 0.2.12", - "hyper 0.14.30", + "hyper 0.14.31", "log", "rustls 0.21.12", "rustls-native-certs 0.6.3", @@ -2395,9 +2386,9 @@ checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" dependencies = [ "futures-util", "http 1.1.0", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-util", - "rustls 0.23.14", + "rustls 0.23.15", "rustls-pki-types", "tokio", "tokio-rustls 0.26.0", @@ -2413,7 +2404,7 @@ checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" dependencies = [ "bytes", "http-body-util", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-util", "native-tls", "tokio", @@ -2432,7 +2423,7 @@ dependencies = [ "futures-util", "http 1.1.0", "http-body 1.0.1", - "hyper 1.4.1", + "hyper 1.5.0", "pin-project-lite", "socket2 0.5.7", "tokio", @@ -2640,17 +2631,18 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.71" +version = "0.3.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cb94a0ffd3f3ee755c20f7d8752f45cac88605a4dcf808abcff72873296ec7b" +checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9" dependencies = [ "wasm-bindgen", ] [[package]] name = "kafka-protocol" -version = "0.12.0" -source = "git+https://github.com/rukai/kafka-protocol-rs?branch=replace_indexmap_with_vec#6f924cd027b4ff1b7cff8905ff8908d8effb119d" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1edaf2fc3ecebe689bbc4fd97a6921cacd4cd09df8ebeda348a8e23c9fd48d4" dependencies = [ "anyhow", "bytes", @@ -2672,9 +2664,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.159" +version = "0.2.160" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "561d97a539a36e26a9a5fad1ea11a3039a67714694aaa379433e580854bc3dc5" +checksum = "f0b21006cd1874ae9e650973c565615676dc4a274c965bb0a73796dac838ce4f" [[package]] name = "libloading" @@ -3069,9 +3061,9 @@ dependencies = [ [[package]] name = "openssl" -version = "0.10.66" +version = "0.10.68" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9529f4786b70a3e8c61e11179af17ab6188ad8d0ded78c5529441ed39d4bd9c1" +checksum = "6174bc48f102d208783c2c84bf931bb75927a617866870de8a4ea85597f871f5" dependencies = [ "bitflags", "cfg-if", @@ -3110,9 +3102,9 @@ dependencies = [ [[package]] name = "openssl-sys" -version = "0.9.103" +version = "0.9.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f9e8deee91df40a943c71b917e5874b951d32a802526c85721ce3b776c929d6" +checksum = "45abf306cbf99debc8195b66b7346498d7b10c210de50418b5ccd7ceba08c741" dependencies = [ "cc", "libc", @@ -3123,9 +3115,9 @@ dependencies = [ [[package]] name = "ordered-float" -version = "4.3.0" +version = "4.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44d501f1a72f71d3c063a6bbc8f7271fa73aa09fe5d6283b6571e2ed176a2537" +checksum = "83e7ccb95e240b7c9506a3d544f10d935e142cc90b0a1d56954fb44d89ad6b97" dependencies = [ "num-traits", "rand", @@ -3443,9 +3435,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.87" +version = "1.0.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3e4daa0dcf6feba26f985457cdf104d4b4256fc5a09547140f3631bb076b19a" +checksum = "7c3a7fc5db1e57d5a779a352c8cdb57b29aa4c40cc69c3a68a7fedc815fbf2f9" dependencies = [ "unicode-ident", ] @@ -3488,7 +3480,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls 0.23.14", + "rustls 0.23.15", "socket2 0.5.7", "thiserror", "tokio", @@ -3505,7 +3497,7 @@ dependencies = [ "rand", "ring", "rustc-hash", - "rustls 0.23.14", + "rustls 0.23.15", "slab", "thiserror", "tinyvec", @@ -3776,7 +3768,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.1", "http-body-util", - "hyper 1.4.1", + "hyper 1.5.0", "hyper-rustls 0.27.3", "hyper-tls", "hyper-util", @@ -3789,7 +3781,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "quinn", - "rustls 0.23.14", + "rustls 0.23.15", "rustls-pemfile 2.2.0", "rustls-pki-types", "serde", @@ -4049,9 +4041,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.14" +version = "0.23.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "415d9944693cb90382053259f89fbb077ea730ad7273047ec63b19bc9b160ba8" +checksum = "5fbb44d7acc4e873d613422379f69f237a1b141928c02f6bc6ccfddddc2d7993" dependencies = [ "log", "once_cell", @@ -4107,9 +4099,9 @@ dependencies = [ [[package]] name = "rustls-pki-types" -version = "1.9.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e696e35370c65c9c541198af4543ccd580cf17fc25d8e05c5a242b202488c55" +checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" [[package]] name = "rustls-webpki" @@ -4134,9 +4126,9 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" +checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248" [[package]] name = "rustyline" @@ -4540,7 +4532,7 @@ dependencies = [ "pretty_assertions", "rand", "redis-protocol", - "rustls 0.23.14", + "rustls 0.23.15", "rustls-pemfile 2.2.0", "rustls-pki-types", "sasl", @@ -4728,9 +4720,9 @@ dependencies = [ [[package]] name = "ssh-key" -version = "0.6.6" +version = "0.6.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca9b366a80cf18bb6406f4cf4d10aebfb46140a8c0c33f666a144c5c76ecbafc" +checksum = "3b86f5297f0f04d08cabaa0f6bff7cb6aec4d9c3b49d87990d63da9d9156a8c3" dependencies = [ "bcrypt-pbkdf", "ed25519-dalek", @@ -4891,7 +4883,7 @@ dependencies = [ "rdkafka", "redis", "reqwest", - "rustls 0.23.14", + "rustls 0.23.15", "rustls-pemfile 2.2.0", "rustls-pki-types", "scylla", @@ -5089,7 +5081,7 @@ version = "0.26.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" dependencies = [ - "rustls 0.23.14", + "rustls 0.23.15", "rustls-pki-types", "tokio", ] @@ -5113,7 +5105,7 @@ checksum = "c6989540ced10490aaf14e6bad2e3d33728a2813310a0c71d1574304c49631cd" dependencies = [ "futures-util", "log", - "rustls 0.23.14", + "rustls 0.23.15", "rustls-native-certs 0.7.3", "rustls-pki-types", "tokio", @@ -5315,7 +5307,7 @@ dependencies = [ "httparse", "log", "rand", - "rustls 0.23.14", + "rustls 0.23.15", "rustls-pki-types", "sha1", "thiserror", @@ -5460,9 +5452,9 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" +checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" dependencies = [ "getrandom", "serde", @@ -5531,9 +5523,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.94" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef073ced962d62984fb38a36e5fdc1a2b23c9e0e1fa0689bb97afa4202ef6887" +checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e" dependencies = [ "cfg-if", "once_cell", @@ -5542,9 +5534,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.94" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4bfab14ef75323f4eb75fa52ee0a3fb59611977fd3240da19b2cf36ff85030e" +checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358" dependencies = [ "bumpalo", "log", @@ -5557,9 +5549,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.44" +version = "0.4.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65471f79c1022ffa5291d33520cbbb53b7687b01c2f8e83b57d102eed7ed479d" +checksum = "cc7ec4f8827a71586374db3e87abdb5a2bb3a15afed140221307c3ec06b1f63b" dependencies = [ "cfg-if", "js-sys", @@ -5569,9 +5561,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.94" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7bec9830f60924d9ceb3ef99d55c155be8afa76954edffbb5936ff4509474e7" +checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -5579,9 +5571,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.94" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c74f6e152a76a2ad448e223b0fc0b6b5747649c3d769cc6bf45737bf97d0ed6" +checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", @@ -5592,15 +5584,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.94" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a42f6c679374623f295a8623adfe63d9284091245c3504bde47c17a3ce2777d9" +checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" [[package]] name = "web-sys" -version = "0.3.71" +version = "0.3.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44188d185b5bdcae1052d08bcbcf9091a5524038d4572cc4f4f2bb9d5554ddd9" +checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/shotover/Cargo.toml b/shotover/Cargo.toml index 648b82251..b75195abd 100644 --- a/shotover/Cargo.toml +++ b/shotover/Cargo.toml @@ -115,8 +115,7 @@ aws-config = { version = "1.0.0", optional = true } aws-sdk-kms = { version = "1.1.0", optional = true } chacha20poly1305 = { version = "0.10.0", features = ["std"], optional = true } generic-array = { version = "0.14", features = ["serde"], optional = true } -#kafka-protocol = { version = "0.12.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"] } -kafka-protocol = { version = "0.12.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"], git = "https://github.com/rukai/kafka-protocol-rs", branch = "replace_indexmap_with_vec" } +kafka-protocol = { version = "0.13.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"] } rustls = { version = "0.23.0", default-features = false, features = ["tls12"] } tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } rustls-pemfile = "2.0.0" diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index bd1610468..01180a348 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -20,7 +20,9 @@ use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic; use kafka_protocol::messages::metadata_request::MetadataRequestTopic; use kafka_protocol::messages::metadata_response::MetadataResponseBroker; use kafka_protocol::messages::produce_request::TopicProduceData; -use kafka_protocol::messages::produce_response::LeaderIdAndEpoch as ProduceResponseLeaderIdAndEpoch; +use kafka_protocol::messages::produce_response::{ + LeaderIdAndEpoch as ProduceResponseLeaderIdAndEpoch, TopicProduceResponse, +}; use kafka_protocol::messages::{ AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey, BrokerId, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, @@ -1893,6 +1895,11 @@ impl KafkaSinkCluster { base_produce: &mut ProduceResponse, drain: impl Iterator, ) -> Result<()> { + let mut base_responses: HashMap = + std::mem::take(&mut base_produce.responses) + .into_iter() + .map(|response| (response.name.clone(), response)) + .collect(); for mut next in drain { if let Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::Produce(next_produce), @@ -1900,12 +1907,7 @@ impl KafkaSinkCluster { })) = next.frame() { for next_response in std::mem::take(&mut next_produce.responses) { - // TODO: evaluate if this linear lookup is ok. - if let Some(base_response) = base_produce - .responses - .iter_mut() - .find(|x| x.name == next_response.name) - { + if let Some(base_response) = base_responses.get_mut(&next_response.name) { for next_partition in &next_response.partition_responses { for base_partition in &base_response.partition_responses { if next_partition.index == base_partition.index { @@ -1918,7 +1920,7 @@ impl KafkaSinkCluster { .partition_responses .extend(next_response.partition_responses) } else { - base_produce.responses.push(next_response); + base_responses.insert(next_response.name.clone(), next_response); } } } else { @@ -1928,6 +1930,8 @@ impl KafkaSinkCluster { } } + base_produce.responses.extend(base_responses.into_values()); + Ok(()) } diff --git a/shotover/src/transforms/kafka/sink_cluster/split.rs b/shotover/src/transforms/kafka/sink_cluster/split.rs index eb6aa2e8c..9762a64f8 100644 --- a/shotover/src/transforms/kafka/sink_cluster/split.rs +++ b/shotover/src/transforms/kafka/sink_cluster/split.rs @@ -6,38 +6,34 @@ use crate::{ }, message::Message, }; -use kafka_protocol::{ - indexmap::IndexMap, - messages::{ - add_partitions_to_txn_request::AddPartitionsToTxnTransaction, - list_offsets_request::ListOffsetsTopic, produce_request::TopicProduceData, - AddPartitionsToTxnRequest, BrokerId, ListOffsetsRequest, ProduceRequest, TopicName, - TransactionalId, - }, +use kafka_protocol::messages::{ + add_partitions_to_txn_request::AddPartitionsToTxnTransaction, + list_offsets_request::ListOffsetsTopic, produce_request::TopicProduceData, + AddPartitionsToTxnRequest, BrokerId, ListOffsetsRequest, ProduceRequest, TopicName, }; use std::collections::HashMap; pub trait RequestSplitAndRouter { - type SubRequest; + type SubRequests; type Request; fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request>; fn split_by_destination( transform: &mut KafkaSinkCluster, request: &mut Self::Request, - ) -> HashMap; - fn reassemble(request: &mut Self::Request, item: Self::SubRequest); + ) -> HashMap; + fn reassemble(request: &mut Self::Request, item: Self::SubRequests); } pub struct ProduceRequestSplitAndRouter; impl RequestSplitAndRouter for ProduceRequestSplitAndRouter { type Request = ProduceRequest; - type SubRequest = IndexMap; + type SubRequests = HashMap; fn split_by_destination( transform: &mut KafkaSinkCluster, request: &mut Self::Request, - ) -> HashMap { + ) -> HashMap { transform.split_produce_request_by_destination(request) } @@ -51,8 +47,8 @@ impl RequestSplitAndRouter for ProduceRequestSplitAndRouter { } } - fn reassemble(request: &mut Self::Request, item: Self::SubRequest) { - request.topic_data = item; + fn reassemble(request: &mut Self::Request, item: Self::SubRequests) { + request.topic_data = item.into_values().collect(); } } @@ -60,12 +56,12 @@ pub struct AddPartitionsToTxnRequestSplitAndRouter; impl RequestSplitAndRouter for AddPartitionsToTxnRequestSplitAndRouter { type Request = AddPartitionsToTxnRequest; - type SubRequest = IndexMap; + type SubRequests = Vec; fn split_by_destination( transform: &mut KafkaSinkCluster, request: &mut Self::Request, - ) -> HashMap { + ) -> HashMap { transform.split_add_partition_to_txn_request_by_destination(request) } @@ -79,7 +75,7 @@ impl RequestSplitAndRouter for AddPartitionsToTxnRequestSplitAndRouter { } } - fn reassemble(request: &mut Self::Request, item: Self::SubRequest) { + fn reassemble(request: &mut Self::Request, item: Self::SubRequests) { request.transactions = item; } } @@ -88,12 +84,12 @@ pub struct ListOffsetsRequestSplitAndRouter; impl RequestSplitAndRouter for ListOffsetsRequestSplitAndRouter { type Request = ListOffsetsRequest; - type SubRequest = Vec; + type SubRequests = Vec; fn split_by_destination( transform: &mut KafkaSinkCluster, request: &mut Self::Request, - ) -> HashMap { + ) -> HashMap { transform.split_list_offsets_request_by_destination(request) } @@ -107,7 +103,7 @@ impl RequestSplitAndRouter for ListOffsetsRequestSplitAndRouter { } } - fn reassemble(request: &mut Self::Request, item: Self::SubRequest) { + fn reassemble(request: &mut Self::Request, item: Self::SubRequests) { request.topics = item; } }