Commit

retain O(1) map lookup
rukai committed Oct 13, 2024
1 parent 988c001 commit aa848d8
Showing 1 changed file with 13 additions and 8 deletions.
21 changes: 13 additions & 8 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -13,14 +13,17 @@ use async_trait::async_trait;
 use connections::{Connections, Destination};
 use dashmap::DashMap;
 use kafka_node::{ConnectionFactory, KafkaAddress, KafkaNode, KafkaNodeState};
+use kafka_protocol::indexmap::IndexMap;
 use kafka_protocol::messages::add_partitions_to_txn_request::AddPartitionsToTxnTransaction;
 use kafka_protocol::messages::fetch_request::FetchTopic;
 use kafka_protocol::messages::fetch_response::LeaderIdAndEpoch as FetchResponseLeaderIdAndEpoch;
 use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic;
 use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
 use kafka_protocol::messages::metadata_response::MetadataResponseBroker;
 use kafka_protocol::messages::produce_request::TopicProduceData;
-use kafka_protocol::messages::produce_response::LeaderIdAndEpoch as ProduceResponseLeaderIdAndEpoch;
+use kafka_protocol::messages::produce_response::{
+    LeaderIdAndEpoch as ProduceResponseLeaderIdAndEpoch, TopicProduceResponse,
+};
 use kafka_protocol::messages::{
     AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey, BrokerId, EndTxnRequest,
     FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId,
@@ -2036,19 +2039,19 @@ impl KafkaSinkCluster {
         base_produce: &mut ProduceResponse,
         drain: impl Iterator<Item = Message>,
     ) -> Result<()> {
+        let mut base_responses: IndexMap<TopicName, TopicProduceResponse> =
+            std::mem::take(&mut base_produce.responses)
+                .into_iter()
+                .map(|response| (response.name.clone(), response))
+                .collect();
         for mut next in drain {
             if let Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::Produce(next_produce),
                 ..
             })) = next.frame()
             {
                 for next_response in std::mem::take(&mut next_produce.responses) {
-                    // TODO: evaluate if this linear lookup is ok.
-                    if let Some(base_response) = base_produce
-                        .responses
-                        .iter_mut()
-                        .find(|x| x.name == next_response.name)
-                    {
+                    if let Some(base_response) = base_responses.get_mut(&next_response.name) {
                         for next_partition in &next_response.partition_responses {
                             for base_partition in &base_response.partition_responses {
                                 if next_partition.index == base_partition.index {
@@ -2061,7 +2064,7 @@
                             .partition_responses
                             .extend(next_response.partition_responses)
                     } else {
-                        base_produce.responses.push(next_response);
+                        base_responses.insert(next_response.name.clone(), next_response);
                     }
                 }
             } else {
@@ -2071,6 +2074,8 @@
             }
         }
 
+        base_produce.responses.extend(base_responses.into_values());
+
         Ok(())
     }
 
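For context on the pattern: the commit replaces a linear scan over `base_produce.responses` (one `.iter_mut().find(...)` per incoming topic) with an `IndexMap` keyed by topic name, giving O(1) lookups while preserving insertion order, then moves the merged entries back into `base_produce.responses` at the end. Below is a minimal, self-contained sketch of that same merge pattern; `TopicResponse` and `merge_responses` are simplified stand-ins rather than the actual shotover/kafka_protocol types, and it assumes the `indexmap` crate as a dependency (in the real code it is re-exported as `kafka_protocol::indexmap`).

    // Sketch only: simplified stand-in types, assumes `indexmap` in Cargo.toml.
    use indexmap::IndexMap;

    #[derive(Debug, Clone)]
    struct TopicResponse {
        name: String,
        partitions: Vec<i32>,
    }

    /// Merge extra batches of topic responses into `base`, combining partitions
    /// for topics that already exist and appending topics that do not.
    fn merge_responses(base: &mut Vec<TopicResponse>, extra: Vec<Vec<TopicResponse>>) {
        // Index the existing responses by topic name: O(1) lookup, insertion order kept.
        let mut by_name: IndexMap<String, TopicResponse> = std::mem::take(base)
            .into_iter()
            .map(|response| (response.name.clone(), response))
            .collect();

        for batch in extra {
            for next in batch {
                if let Some(existing) = by_name.get_mut(&next.name) {
                    // Topic already present: fold the new partitions into it.
                    existing.partitions.extend(next.partitions);
                } else {
                    // New topic: appended after all existing entries.
                    by_name.insert(next.name.clone(), next);
                }
            }
        }

        // Move everything back into the Vec the caller owns.
        base.extend(by_name.into_values());
    }

    fn main() {
        let mut base = vec![TopicResponse { name: "topic-a".into(), partitions: vec![0] }];
        let extra = vec![vec![
            TopicResponse { name: "topic-a".into(), partitions: vec![1] },
            TopicResponse { name: "topic-b".into(), partitions: vec![0] },
        ]];
        merge_responses(&mut base, extra);
        assert_eq!(base[0].partitions, vec![0, 1]);
        assert_eq!(base[1].name, "topic-b");
    }

Using `IndexMap` rather than `HashMap` keeps the merged `Vec` in a deterministic order: existing topics stay in place and new topics are appended at the end, matching what the previous push-based code produced.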
