retain O(1) map lookup
rukai committed Oct 15, 2024
1 parent ce65587 commit 3150521
Showing 4 changed files with 66 additions and 75 deletions.
80 changes: 36 additions & 44 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 1 addition & 2 deletions shotover/Cargo.toml
@@ -115,8 +115,7 @@ aws-config = { version = "1.0.0", optional = true }
 aws-sdk-kms = { version = "1.1.0", optional = true }
 chacha20poly1305 = { version = "0.10.0", features = ["std"], optional = true }
 generic-array = { version = "0.14", features = ["serde"], optional = true }
-#kafka-protocol = { version = "0.12.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"] }
-kafka-protocol = { version = "0.12.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"], git = "https://github.com/rukai/kafka-protocol-rs", branch = "replace_indexmap_with_vec" }
+kafka-protocol = { version = "0.13.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"] }
 rustls = { version = "0.23.0", default-features = false, features = ["tls12"] }
 tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] }
 rustls-pemfile = "2.0.0"
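The pinned git branch (replace_indexmap_with_vec) is dropped in favor of the released 0.13.0, which, judging by the branch name and the changes below, stores the collections in generated message types as plain Vecs instead of IndexMaps. That turns keyed access into a linear scan unless the caller rebuilds a map. A minimal sketch of the trade-off, with a hypothetical Topic standing in for a generated type like TopicProduceData:

use std::collections::HashMap;

// Hypothetical stand-in for a generated entry type such as TopicProduceData.
struct Topic {
    name: String,
    bytes: u64,
}

fn main() {
    let topics = vec![
        Topic { name: "logs".into(), bytes: 10 },
        Topic { name: "metrics".into(), bytes: 20 },
    ];

    // With a Vec, every keyed access is a linear scan: O(n) per lookup.
    let scanned = topics.iter().find(|t| t.name == "metrics");
    assert_eq!(scanned.map(|t| t.bytes), Some(20));

    // One O(n) pass rebuilds a map keyed by name, restoring O(1)
    // average-case lookups at the cost of the map's memory.
    let by_name: HashMap<&str, &Topic> =
        topics.iter().map(|t| (t.name.as_str(), t)).collect();
    assert_eq!(by_name.get("metrics").map(|t| t.bytes), Some(20));
}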
20 changes: 12 additions & 8 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -20,7 +20,9 @@ use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic;
 use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
 use kafka_protocol::messages::metadata_response::MetadataResponseBroker;
 use kafka_protocol::messages::produce_request::TopicProduceData;
-use kafka_protocol::messages::produce_response::LeaderIdAndEpoch as ProduceResponseLeaderIdAndEpoch;
+use kafka_protocol::messages::produce_response::{
+    LeaderIdAndEpoch as ProduceResponseLeaderIdAndEpoch, TopicProduceResponse,
+};
 use kafka_protocol::messages::{
     AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey, BrokerId, EndTxnRequest,
     FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId,
@@ -1881,19 +1883,19 @@ impl KafkaSinkCluster {
         base_produce: &mut ProduceResponse,
         drain: impl Iterator<Item = Message>,
     ) -> Result<()> {
+        let mut base_responses: HashMap<TopicName, TopicProduceResponse> =
+            std::mem::take(&mut base_produce.responses)
+                .into_iter()
+                .map(|response| (response.name.clone(), response))
+                .collect();
         for mut next in drain {
             if let Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::Produce(next_produce),
                 ..
             })) = next.frame()
             {
                 for next_response in std::mem::take(&mut next_produce.responses) {
-                    // TODO: evaluate if this linear lookup is ok.
-                    if let Some(base_response) = base_produce
-                        .responses
-                        .iter_mut()
-                        .find(|x| x.name == next_response.name)
-                    {
+                    if let Some(base_response) = base_responses.get_mut(&next_response.name) {
                         for next_partition in &next_response.partition_responses {
                             for base_partition in &base_response.partition_responses {
                                 if next_partition.index == base_partition.index {
@@ -1906,7 +1908,7 @@ impl KafkaSinkCluster {
                             .partition_responses
                             .extend(next_response.partition_responses)
                     } else {
-                        base_produce.responses.push(next_response);
+                        base_responses.insert(next_response.name.clone(), next_response);
                     }
                 }
             } else {
@@ -1916,6 +1918,8 @@ impl KafkaSinkCluster {
             }
         }
 
+        base_produce.responses.extend(base_responses.into_values());
+
         Ok(())
     }
 
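This is the lookup the commit title refers to: the base responses are drained into a HashMap keyed by topic name once, each incoming topic is merged with an O(1) average-case lookup instead of the previous iter_mut().find() scan per topic, and the map's values are moved back into the Vec at the end. A self-contained sketch of that take/merge/extend pattern, with simplified stand-in types (the real types come from kafka_protocol::messages, and the real code reconciles matching partitions individually rather than just concatenating them):

use std::collections::HashMap;
use std::mem;

// Simplified stand-in for TopicProduceResponse.
struct TopicResponse {
    name: String,
    partitions: Vec<u32>,
}

// Merge `incoming` batches into `base`, keyed by topic name.
// One O(n) pass builds the map; every lookup afterwards is O(1) average,
// instead of an O(n) scan per incoming topic.
fn merge_responses(base: &mut Vec<TopicResponse>, incoming: Vec<Vec<TopicResponse>>) {
    let mut by_name: HashMap<String, TopicResponse> = mem::take(base)
        .into_iter()
        .map(|r| (r.name.clone(), r))
        .collect();

    for batch in incoming {
        for next in batch {
            if let Some(existing) = by_name.get_mut(&next.name) {
                existing.partitions.extend(next.partitions);
            } else {
                by_name.insert(next.name.clone(), next);
            }
        }
    }

    // Move the merged values back into the Vec the message type expects.
    base.extend(by_name.into_values());
}

fn main() {
    let mut base = vec![TopicResponse { name: "a".into(), partitions: vec![0] }];
    let incoming = vec![vec![
        TopicResponse { name: "a".into(), partitions: vec![1] },
        TopicResponse { name: "b".into(), partitions: vec![0] },
    ]];
    merge_responses(&mut base, incoming);
    assert_eq!(base.len(), 2);
}

One visible trade-off: into_values() yields topics in arbitrary order, so the merged Vec no longer preserves the original response order, which presumably does not matter for responses keyed by topic name.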
38 changes: 17 additions & 21 deletions shotover/src/transforms/kafka/sink_cluster/split.rs
@@ -6,38 +6,34 @@ use crate::{
     },
     message::Message,
 };
-use kafka_protocol::{
-    indexmap::IndexMap,
-    messages::{
-        add_partitions_to_txn_request::AddPartitionsToTxnTransaction,
-        list_offsets_request::ListOffsetsTopic, produce_request::TopicProduceData,
-        AddPartitionsToTxnRequest, BrokerId, ListOffsetsRequest, ProduceRequest, TopicName,
-        TransactionalId,
-    },
+use kafka_protocol::messages::{
+    add_partitions_to_txn_request::AddPartitionsToTxnTransaction,
+    list_offsets_request::ListOffsetsTopic, produce_request::TopicProduceData,
+    AddPartitionsToTxnRequest, BrokerId, ListOffsetsRequest, ProduceRequest, TopicName,
 };
 use std::collections::HashMap;
 
 pub trait RequestSplitAndRouter {
-    type SubRequest;
+    type SubRequests;
     type Request;
     fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request>;
     fn split_by_destination(
         transform: &mut KafkaSinkCluster,
         request: &mut Self::Request,
-    ) -> HashMap<BrokerId, Self::SubRequest>;
-    fn reassemble(request: &mut Self::Request, item: Self::SubRequest);
+    ) -> HashMap<BrokerId, Self::SubRequests>;
+    fn reassemble(request: &mut Self::Request, item: Self::SubRequests);
 }
 
 pub struct ProduceRequestSplitAndRouter;
 
 impl RequestSplitAndRouter for ProduceRequestSplitAndRouter {
     type Request = ProduceRequest;
-    type SubRequest = IndexMap<TopicName, TopicProduceData>;
+    type SubRequests = HashMap<TopicName, TopicProduceData>;
 
     fn split_by_destination(
         transform: &mut KafkaSinkCluster,
         request: &mut Self::Request,
-    ) -> HashMap<BrokerId, Self::SubRequest> {
+    ) -> HashMap<BrokerId, Self::SubRequests> {
         transform.split_produce_request_by_destination(request)
     }
@@ -51,21 +47,21 @@ impl RequestSplitAndRouter for ProduceRequestSplitAndRouter {
         }
     }
 
-    fn reassemble(request: &mut Self::Request, item: Self::SubRequest) {
-        request.topic_data = item;
+    fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
+        request.topic_data = item.into_values().collect();
     }
 }
 
 pub struct AddPartitionsToTxnRequestSplitAndRouter;
 
 impl RequestSplitAndRouter for AddPartitionsToTxnRequestSplitAndRouter {
     type Request = AddPartitionsToTxnRequest;
-    type SubRequest = IndexMap<TransactionalId, AddPartitionsToTxnTransaction>;
+    type SubRequests = Vec<AddPartitionsToTxnTransaction>;
 
     fn split_by_destination(
         transform: &mut KafkaSinkCluster,
         request: &mut Self::Request,
-    ) -> HashMap<BrokerId, Self::SubRequest> {
+    ) -> HashMap<BrokerId, Self::SubRequests> {
         transform.split_add_partition_to_txn_request_by_destination(request)
     }
@@ -79,7 +75,7 @@ impl RequestSplitAndRouter for AddPartitionsToTxnRequestSplitAndRouter {
         }
     }
 
-    fn reassemble(request: &mut Self::Request, item: Self::SubRequest) {
+    fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
         request.transactions = item;
     }
 }
@@ -88,12 +84,12 @@ pub struct ListOffsetsRequestSplitAndRouter;
 
 impl RequestSplitAndRouter for ListOffsetsRequestSplitAndRouter {
     type Request = ListOffsetsRequest;
-    type SubRequest = Vec<ListOffsetsTopic>;
+    type SubRequests = Vec<ListOffsetsTopic>;
 
     fn split_by_destination(
         transform: &mut KafkaSinkCluster,
         request: &mut Self::Request,
-    ) -> HashMap<BrokerId, Self::SubRequest> {
+    ) -> HashMap<BrokerId, Self::SubRequests> {
         transform.split_list_offsets_request_by_destination(request)
     }
@@ -107,7 +103,7 @@ impl RequestSplitAndRouter for ListOffsetsRequestSplitAndRouter {
         }
     }
 
-    fn reassemble(request: &mut Self::Request, item: Self::SubRequest) {
+    fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
        request.topics = item;
     }
 }
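With SubRequest renamed to SubRequests, each implementation now picks the per-broker collection that fits its access pattern: ProduceRequestSplitAndRouter keeps a HashMap keyed by topic name (preserving the O(1) lookup IndexMap used to provide) and converts it back into a Vec during reassemble, while the other two use plain Vecs. A minimal sketch of the same shape, with simplified types and a hypothetical routing rule in place of real leader lookup:

use std::collections::HashMap;

type BrokerId = i32;

// Simplified version of the RequestSplitAndRouter shape: each request type
// picks its own per-broker sub-request collection via `SubRequests`.
trait SplitAndRoute {
    type Request;
    type SubRequests;
    fn split(request: &mut Self::Request) -> HashMap<BrokerId, Self::SubRequests>;
    fn reassemble(request: &mut Self::Request, item: Self::SubRequests);
}

// Hypothetical produce-like request whose topics now live in a Vec.
struct Produce {
    topics: Vec<(String, u64)>,
}

struct ProduceSplitter;

impl SplitAndRoute for ProduceSplitter {
    type Request = Produce;
    // A keyed map while splitting, so entries routed to the same broker
    // can be merged with O(1) average-case lookups...
    type SubRequests = HashMap<String, u64>;

    fn split(request: &mut Self::Request) -> HashMap<BrokerId, Self::SubRequests> {
        let mut out: HashMap<BrokerId, Self::SubRequests> = HashMap::new();
        for (name, bytes) in request.topics.drain(..) {
            // Hypothetical routing rule standing in for real leader lookup.
            let broker = (name.len() % 2) as BrokerId;
            *out.entry(broker).or_default().entry(name).or_insert(0) += bytes;
        }
        out
    }

    // ...then converted back into the Vec the message type stores.
    fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
        request.topics = item.into_iter().collect();
    }
}

fn main() {
    let mut request = Produce {
        topics: vec![("logs".into(), 10), ("metrics".into(), 20)],
    };
    let split = ProduceSplitter::split(&mut request);
    assert_eq!(split.len(), 2); // "logs" and "metrics" route to different brokers

    // Each per-broker subset would be written into its own copy of the
    // request before sending; rebuild one of them here.
    for (_broker, sub) in split.into_iter().take(1) {
        ProduceSplitter::reassemble(&mut request, sub);
        assert_eq!(request.topics.len(), 1);
    }
}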
