diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs
index 88674a60a..6f6f00236 100644
--- a/shotover/src/transforms/kafka/sink_cluster/mod.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -13,6 +13,7 @@ use async_trait::async_trait;
 use connections::{Connections, Destination};
 use dashmap::DashMap;
 use kafka_node::{ConnectionFactory, KafkaAddress, KafkaNode, KafkaNodeState};
+use kafka_protocol::indexmap::IndexMap;
 use kafka_protocol::messages::add_partitions_to_txn_request::AddPartitionsToTxnTransaction;
 use kafka_protocol::messages::fetch_request::FetchTopic;
 use kafka_protocol::messages::fetch_response::LeaderIdAndEpoch as FetchResponseLeaderIdAndEpoch;
@@ -20,7 +21,9 @@ use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic;
 use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
 use kafka_protocol::messages::metadata_response::MetadataResponseBroker;
 use kafka_protocol::messages::produce_request::TopicProduceData;
-use kafka_protocol::messages::produce_response::LeaderIdAndEpoch as ProduceResponseLeaderIdAndEpoch;
+use kafka_protocol::messages::produce_response::{
+    LeaderIdAndEpoch as ProduceResponseLeaderIdAndEpoch, TopicProduceResponse,
+};
 use kafka_protocol::messages::{
     AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey, BrokerId, EndTxnRequest,
     FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId,
@@ -2036,6 +2039,11 @@ impl KafkaSinkCluster {
         base_produce: &mut ProduceResponse,
         drain: impl Iterator<Item = Message>,
     ) -> Result<()> {
+        let mut base_responses: IndexMap<TopicName, TopicProduceResponse> =
+            std::mem::take(&mut base_produce.responses)
+                .into_iter()
+                .map(|response| (response.name.clone(), response))
+                .collect();
         for mut next in drain {
             if let Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::Produce(next_produce),
@@ -2043,12 +2051,7 @@
             })) = next.frame()
             {
                 for next_response in std::mem::take(&mut next_produce.responses) {
-                    // TODO: evaluate if this linear lookup is ok.
-                    if let Some(base_response) = base_produce
-                        .responses
-                        .iter_mut()
-                        .find(|x| x.name == next_response.name)
-                    {
+                    if let Some(base_response) = base_responses.get_mut(&next_response.name) {
                         for next_partition in &next_response.partition_responses {
                             for base_partition in &base_response.partition_responses {
                                 if next_partition.index == base_partition.index {
@@ -2061,7 +2064,7 @@
                             .partition_responses
                             .extend(next_response.partition_responses)
                     } else {
-                        base_produce.responses.push(next_response);
+                        base_responses.insert(next_response.name.clone(), next_response);
                     }
                 }
             } else {
@@ -2071,6 +2074,8 @@
             }
         }
 
+        base_produce.responses.extend(base_responses.into_values());
+
         Ok(())
     }
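
For context, the change replaces the per-topic linear `find` over `base_produce.responses` with an `IndexMap` keyed by topic name, so each lookup during the merge is O(1) while the original response ordering is preserved. Below is a minimal, self-contained sketch of that merge strategy; `TopicResponse` and `PartitionResponse` are simplified stand-ins for kafka_protocol's produce-response types, and the sketch depends on the `indexmap` crate directly rather than the `kafka_protocol::indexmap` re-export used in the diff.

```rust
use indexmap::IndexMap;

#[derive(Debug, Clone)]
struct PartitionResponse {
    index: i32,
}

#[derive(Debug, Clone)]
struct TopicResponse {
    name: String,
    partition_responses: Vec<PartitionResponse>,
}

/// Merge per-broker topic responses into `base`, keyed by topic name.
fn combine(
    base: Vec<TopicResponse>,
    drain: impl Iterator<Item = Vec<TopicResponse>>,
) -> Vec<TopicResponse> {
    // Index the base responses by topic name up front so each later lookup
    // is a hash-map access instead of a linear scan.
    let mut merged: IndexMap<String, TopicResponse> = base
        .into_iter()
        .map(|response| (response.name.clone(), response))
        .collect();

    for next in drain {
        for next_response in next {
            if let Some(existing) = merged.get_mut(&next_response.name) {
                // Topic already seen: append its partition responses.
                existing
                    .partition_responses
                    .extend(next_response.partition_responses);
            } else {
                // New topic: insert it, keeping arrival order.
                merged.insert(next_response.name.clone(), next_response);
            }
        }
    }

    merged.into_values().collect()
}

fn main() {
    let base = vec![TopicResponse {
        name: "topic-a".into(),
        partition_responses: vec![PartitionResponse { index: 0 }],
    }];
    let others = vec![vec![
        TopicResponse {
            name: "topic-a".into(),
            partition_responses: vec![PartitionResponse { index: 1 }],
        },
        TopicResponse {
            name: "topic-b".into(),
            partition_responses: vec![PartitionResponse { index: 0 }],
        },
    ]];

    let combined = combine(base, others.into_iter());
    assert_eq!(combined.len(), 2);
    assert_eq!(combined[0].partition_responses.len(), 2);
    println!("{combined:#?}");
}
```

Using `IndexMap` rather than a plain `HashMap` keeps topics in first-seen order, so draining it back into `base_produce.responses` at the end reproduces the same ordering the previous `Vec`-push approach produced.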