From 315052176978bed6312d89c58d6587a81c0d6681 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Mon, 14 Oct 2024 09:17:10 +1100 Subject: [PATCH] retain O(1) map lookup --- Cargo.lock | 80 +++++++++---------- shotover/Cargo.toml | 3 +- .../src/transforms/kafka/sink_cluster/mod.rs | 20 +++-- .../transforms/kafka/sink_cluster/split.rs | 38 ++++----- 4 files changed, 66 insertions(+), 75 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1d4772f7d..7e374ed60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,9 +163,9 @@ checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457" [[package]] name = "async-compression" -version = "0.4.13" +version = "0.4.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e614738943d3f68c628ae3dbce7c3daffb196665f82f8c8ea6b65de73c79429" +checksum = "e26a9844c659a2a293d239c7910b752f8487fe122c6c8bd1659bf85a6507c302" dependencies = [ "flate2", "futures-core", @@ -295,9 +295,9 @@ dependencies = [ [[package]] name = "aws-sdk-kms" -version = "1.46.0" +version = "1.47.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e33590e8d45206fdc4273ded8a1f292bcceaadd513037aa790fc67b237bc30ee" +checksum = "564a597a3c71a957d60a2e4c62c93d78ee5a0d636531e15b760acad983a5c18e" dependencies = [ "aws-credential-types", "aws-runtime", @@ -317,9 +317,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.45.0" +version = "1.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e33ae899566f3d395cbf42858e433930682cc9c1889fa89318896082fef45efb" +checksum = "0dc2faec3205d496c7e57eff685dd944203df7ce16a4116d0281c44021788a7b" dependencies = [ "aws-credential-types", "aws-runtime", @@ -339,9 +339,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.46.0" +version = "1.47.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f39c09e199ebd96b9f860b0fce4b6625f211e064ad7c8693b72ecf7ef03881e0" +checksum = "c93c241f52bc5e0476e259c953234dab7e2a35ee207ee202e86c0095ec4951dc" dependencies = [ "aws-credential-types", "aws-runtime", @@ -361,9 +361,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.45.0" +version = "1.46.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d95f93a98130389eb6233b9d615249e543f6c24a68ca1f109af9ca5164a8765" +checksum = "b259429be94a3459fa1b00c5684faee118d74f9577cc50aebadc36e507c63b5f" dependencies = [ "aws-credential-types", "aws-runtime", @@ -755,9 +755,9 @@ checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" [[package]] name = "bytemuck" -version = "1.18.0" +version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94bbb0ad554ad961ddc5da507a12a29b14e4ae5bda06b19f575a3e6079d2e2ae" +checksum = "8334215b81e418a0a7bdb8ef0849474f40bb10c8b71f1c4ed315cff49f32494d" dependencies = [ "bytemuck_derive", ] @@ -928,9 +928,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.1.28" +version = "1.1.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e80e3b6a3ab07840e1cae9b0666a63970dc28e8ed5ffbcdacbfc760c281bfc1" +checksum = "b16803a61b81d9eabb7eae2588776c4c1e584b738ede45fdbb4c972cec1e9945" dependencies = [ "jobserver", "libc", @@ -1616,15 +1616,6 @@ dependencies = [ "cipher", ] -[[package]] -name = "des" -version = "0.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ffdd80ce8ce993de27e9f063a444a4d53ce8e8db4c1f00cc03af5ad5a9867a1e" -dependencies = [ - "cipher", -] - [[package]] name = "diff" version = "0.1.13" @@ -2343,7 +2334,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.7", "tokio", "tower-service", "tracing", @@ -2640,17 +2631,18 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.71" +version = "0.3.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cb94a0ffd3f3ee755c20f7d8752f45cac88605a4dcf808abcff72873296ec7b" +checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9" dependencies = [ "wasm-bindgen", ] [[package]] name = "kafka-protocol" -version = "0.12.0" -source = "git+https://github.com/rukai/kafka-protocol-rs?branch=replace_indexmap_with_vec#6f924cd027b4ff1b7cff8905ff8908d8effb119d" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1edaf2fc3ecebe689bbc4fd97a6921cacd4cd09df8ebeda348a8e23c9fd48d4" dependencies = [ "anyhow", "bytes", @@ -4134,9 +4126,9 @@ dependencies = [ [[package]] name = "rustversion" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" +checksum = "0e819f2bc632f285be6d7cd36e25940d45b2391dd6d9b939e79de557f7014248" [[package]] name = "rustyline" @@ -5531,9 +5523,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.94" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef073ced962d62984fb38a36e5fdc1a2b23c9e0e1fa0689bb97afa4202ef6887" +checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e" dependencies = [ "cfg-if", "once_cell", @@ -5542,9 +5534,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.94" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4bfab14ef75323f4eb75fa52ee0a3fb59611977fd3240da19b2cf36ff85030e" +checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358" dependencies = [ "bumpalo", "log", @@ -5557,9 +5549,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.44" +version = "0.4.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65471f79c1022ffa5291d33520cbbb53b7687b01c2f8e83b57d102eed7ed479d" +checksum = "cc7ec4f8827a71586374db3e87abdb5a2bb3a15afed140221307c3ec06b1f63b" dependencies = [ "cfg-if", "js-sys", @@ -5569,9 +5561,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.94" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7bec9830f60924d9ceb3ef99d55c155be8afa76954edffbb5936ff4509474e7" +checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -5579,9 +5571,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.94" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c74f6e152a76a2ad448e223b0fc0b6b5747649c3d769cc6bf45737bf97d0ed6" +checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" dependencies = [ "proc-macro2", "quote", @@ -5592,15 +5584,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.94" +version = "0.2.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a42f6c679374623f295a8623adfe63d9284091245c3504bde47c17a3ce2777d9" +checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" [[package]] name = "web-sys" -version = "0.3.71" +version = "0.3.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "44188d185b5bdcae1052d08bcbcf9091a5524038d4572cc4f4f2bb9d5554ddd9" +checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/shotover/Cargo.toml b/shotover/Cargo.toml index 648b82251..b75195abd 100644 --- a/shotover/Cargo.toml +++ b/shotover/Cargo.toml @@ -115,8 +115,7 @@ aws-config = { version = "1.0.0", optional = true } aws-sdk-kms = { version = "1.1.0", optional = true } chacha20poly1305 = { version = "0.10.0", features = ["std"], optional = true } generic-array = { version = "0.14", features = ["serde"], optional = true } -#kafka-protocol = { version = "0.12.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"] } -kafka-protocol = { version = "0.12.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"], git = "https://github.com/rukai/kafka-protocol-rs", branch = "replace_indexmap_with_vec" } +kafka-protocol = { version = "0.13.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"] } rustls = { version = "0.23.0", default-features = false, features = ["tls12"] } tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } rustls-pemfile = "2.0.0" diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index ce1a4e28f..3ecfb8b9e 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -20,7 +20,9 @@ use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic; use kafka_protocol::messages::metadata_request::MetadataRequestTopic; use kafka_protocol::messages::metadata_response::MetadataResponseBroker; use kafka_protocol::messages::produce_request::TopicProduceData; -use kafka_protocol::messages::produce_response::LeaderIdAndEpoch as ProduceResponseLeaderIdAndEpoch; +use kafka_protocol::messages::produce_response::{ + LeaderIdAndEpoch as ProduceResponseLeaderIdAndEpoch, TopicProduceResponse, +}; use kafka_protocol::messages::{ AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey, BrokerId, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, @@ -1881,6 +1883,11 @@ impl KafkaSinkCluster { base_produce: &mut ProduceResponse, drain: impl Iterator, ) -> Result<()> { + let mut base_responses: HashMap = + std::mem::take(&mut base_produce.responses) + .into_iter() + .map(|response| (response.name.clone(), response)) + .collect(); for mut next in drain { if let Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::Produce(next_produce), @@ -1888,12 +1895,7 @@ impl KafkaSinkCluster { })) = next.frame() { for next_response in std::mem::take(&mut next_produce.responses) { - // TODO: evaluate if this linear lookup is ok. - if let Some(base_response) = base_produce - .responses - .iter_mut() - .find(|x| x.name == next_response.name) - { + if let Some(base_response) = base_responses.get_mut(&next_response.name) { for next_partition in &next_response.partition_responses { for base_partition in &base_response.partition_responses { if next_partition.index == base_partition.index { @@ -1906,7 +1908,7 @@ impl KafkaSinkCluster { .partition_responses .extend(next_response.partition_responses) } else { - base_produce.responses.push(next_response); + base_responses.insert(next_response.name.clone(), next_response); } } } else { @@ -1916,6 +1918,8 @@ impl KafkaSinkCluster { } } + base_produce.responses.extend(base_responses.into_values()); + Ok(()) } diff --git a/shotover/src/transforms/kafka/sink_cluster/split.rs b/shotover/src/transforms/kafka/sink_cluster/split.rs index eb6aa2e8c..9762a64f8 100644 --- a/shotover/src/transforms/kafka/sink_cluster/split.rs +++ b/shotover/src/transforms/kafka/sink_cluster/split.rs @@ -6,38 +6,34 @@ use crate::{ }, message::Message, }; -use kafka_protocol::{ - indexmap::IndexMap, - messages::{ - add_partitions_to_txn_request::AddPartitionsToTxnTransaction, - list_offsets_request::ListOffsetsTopic, produce_request::TopicProduceData, - AddPartitionsToTxnRequest, BrokerId, ListOffsetsRequest, ProduceRequest, TopicName, - TransactionalId, - }, +use kafka_protocol::messages::{ + add_partitions_to_txn_request::AddPartitionsToTxnTransaction, + list_offsets_request::ListOffsetsTopic, produce_request::TopicProduceData, + AddPartitionsToTxnRequest, BrokerId, ListOffsetsRequest, ProduceRequest, TopicName, }; use std::collections::HashMap; pub trait RequestSplitAndRouter { - type SubRequest; + type SubRequests; type Request; fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request>; fn split_by_destination( transform: &mut KafkaSinkCluster, request: &mut Self::Request, - ) -> HashMap; - fn reassemble(request: &mut Self::Request, item: Self::SubRequest); + ) -> HashMap; + fn reassemble(request: &mut Self::Request, item: Self::SubRequests); } pub struct ProduceRequestSplitAndRouter; impl RequestSplitAndRouter for ProduceRequestSplitAndRouter { type Request = ProduceRequest; - type SubRequest = IndexMap; + type SubRequests = HashMap; fn split_by_destination( transform: &mut KafkaSinkCluster, request: &mut Self::Request, - ) -> HashMap { + ) -> HashMap { transform.split_produce_request_by_destination(request) } @@ -51,8 +47,8 @@ impl RequestSplitAndRouter for ProduceRequestSplitAndRouter { } } - fn reassemble(request: &mut Self::Request, item: Self::SubRequest) { - request.topic_data = item; + fn reassemble(request: &mut Self::Request, item: Self::SubRequests) { + request.topic_data = item.into_values().collect(); } } @@ -60,12 +56,12 @@ pub struct AddPartitionsToTxnRequestSplitAndRouter; impl RequestSplitAndRouter for AddPartitionsToTxnRequestSplitAndRouter { type Request = AddPartitionsToTxnRequest; - type SubRequest = IndexMap; + type SubRequests = Vec; fn split_by_destination( transform: &mut KafkaSinkCluster, request: &mut Self::Request, - ) -> HashMap { + ) -> HashMap { transform.split_add_partition_to_txn_request_by_destination(request) } @@ -79,7 +75,7 @@ impl RequestSplitAndRouter for AddPartitionsToTxnRequestSplitAndRouter { } } - fn reassemble(request: &mut Self::Request, item: Self::SubRequest) { + fn reassemble(request: &mut Self::Request, item: Self::SubRequests) { request.transactions = item; } } @@ -88,12 +84,12 @@ pub struct ListOffsetsRequestSplitAndRouter; impl RequestSplitAndRouter for ListOffsetsRequestSplitAndRouter { type Request = ListOffsetsRequest; - type SubRequest = Vec; + type SubRequests = Vec; fn split_by_destination( transform: &mut KafkaSinkCluster, request: &mut Self::Request, - ) -> HashMap { + ) -> HashMap { transform.split_list_offsets_request_by_destination(request) } @@ -107,7 +103,7 @@ impl RequestSplitAndRouter for ListOffsetsRequestSplitAndRouter { } } - fn reassemble(request: &mut Self::Request, item: Self::SubRequest) { + fn reassemble(request: &mut Self::Request, item: Self::SubRequests) { request.topics = item; } }