diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs
index 41379ed7b..61dae4129 100644
--- a/shotover/src/transforms/kafka/sink_cluster/mod.rs
+++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -112,7 +112,8 @@ pub struct KafkaSinkClusterBuilder {
     read_timeout: Option<Duration>,
     controller_broker: Arc<AtomicBrokerId>,
     group_to_coordinator_broker: Arc<DashMap<GroupId, BrokerId>>,
-    topics: Arc<DashMap<TopicName, Topic>>,
+    topic_by_name: Arc<DashMap<TopicName, Topic>>,
+    topic_by_id: Arc<DashMap<Uuid, Topic>>,
     nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
     tls: Option<TlsConnector>,
     sasl_enabled: bool,
@@ -143,7 +144,8 @@ impl KafkaSinkClusterBuilder {
             read_timeout: receive_timeout,
             controller_broker: Arc::new(AtomicBrokerId::new()),
             group_to_coordinator_broker: Arc::new(DashMap::new()),
-            topics: Arc::new(DashMap::new()),
+            topic_by_name: Arc::new(DashMap::new()),
+            topic_by_id: Arc::new(DashMap::new()),
             nodes_shared: Arc::new(RwLock::new(vec![])),
             tls,
             sasl_enabled,
@@ -162,7 +164,8 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
             nodes_shared: self.nodes_shared.clone(),
             controller_broker: self.controller_broker.clone(),
             group_to_coordinator_broker: self.group_to_coordinator_broker.clone(),
-            topics: self.topics.clone(),
+            topic_by_name: self.topic_by_name.clone(),
+            topic_by_id: self.topic_by_id.clone(),
             rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
             sasl_status: SaslStatus::new(self.sasl_enabled),
             connection_factory: ConnectionFactory::new(self.tls.clone(), self.connect_timeout),
@@ -238,7 +241,8 @@ pub struct KafkaSinkCluster {
     nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
     controller_broker: Arc<AtomicBrokerId>,
     group_to_coordinator_broker: Arc<DashMap<GroupId, BrokerId>>,
-    topics: Arc<DashMap<TopicName, Topic>>,
+    topic_by_name: Arc<DashMap<TopicName, Topic>>,
+    topic_by_id: Arc<DashMap<Uuid, Topic>>,
     rng: SmallRng,
     sasl_status: SaslStatus,
     connection_factory: ConnectionFactory,
@@ -304,7 +308,7 @@ impl Transform for KafkaSinkCluster {
 
 impl KafkaSinkCluster {
     fn store_topic(&self, topics: &mut Vec<TopicName>, topic: TopicName) {
-        if self.topics.get(&topic).is_none() && !topics.contains(&topic) {
+        if self.topic_by_name.get(&topic).is_none() && !topics.contains(&topic) {
             topics.push(topic);
         }
     }
@@ -352,6 +356,7 @@ impl KafkaSinkCluster {
                     body: RequestBody::Fetch(fetch),
                     ..
                 })) => {
+                    // TODO: Handle topics that only have an ID
                     for topic in &fetch.topics {
                         self.store_topic(&mut topics, topic.topic.clone());
                     }
@@ -420,7 +425,7 @@ impl KafkaSinkCluster {
                         .iter()
                         .next()
                         .ok_or_else(|| anyhow!("No topics in produce message"))?;
-                    if let Some(topic) = self.topics.get(&topic_name.0) {
+                    if let Some(topic) = self.topic_by_name.get(&topic_name.0) {
                         // assume that all partitions in this topic have the same routing requirements
                         let partition = &topic.partitions[topic_data
                             .partition_data
@@ -471,8 +476,18 @@ impl KafkaSinkCluster {
                         .topics
                         .first()
                         .ok_or_else(|| anyhow!("No topics in fetch message"))?;
+
+                    // This way of constructing topic_meta is kind of crazy, but it works around borrow checker limitations
                     let topic_name = &topic.topic;
-                    let node = if let Some(topic_meta) = self.topics.get(topic_name) {
+                    let topic_by_id = self.topic_by_id.get(&topic.topic_id);
+                    let topic_by_name;
+                    let mut topic_meta = topic_by_id.as_deref();
+                    if topic_meta.is_none() {
+                        topic_by_name = self.topic_by_name.get(&topic.topic);
+                        topic_meta = topic_by_name.as_deref();
+                    }
+
+                    let node = if let Some(topic_meta) = topic_meta {
                         let partition_index = topic
                             .partitions
                             .first()
@@ -796,12 +811,12 @@ impl KafkaSinkCluster {
                 body: ResponseBody::Fetch(fetch),
                 ..
             })) => {
-                let destination = self
-                    .fetch_request_destinations
-                    .remove(&request_id.unwrap())
-                    .unwrap();
-                self.fetch_session_id_to_broker
-                    .insert(fetch.session_id, destination);
+                if let Some(destination) =
+                    self.fetch_request_destinations.remove(&request_id.unwrap())
+                {
+                    self.fetch_session_id_to_broker
+                        .insert(fetch.session_id, destination);
+                }
             }
             Some(Frame::Kafka(KafkaFrame::Response {
                 body: ResponseBody::DescribeCluster(_),
                 ..
@@ -910,7 +925,18 @@ impl KafkaSinkCluster {
                 })
                 .collect();
             partitions.sort_by_key(|x| x.index);
-            self.topics.insert(topic.0.clone(), Topic { partitions });
+            if !topic.0.is_empty() {
+                self.topic_by_name.insert(
+                    topic.0.clone(),
+                    Topic {
+                        partitions: partitions.clone(),
+                    },
+                );
+            }
+            if !topic.1.topic_id.is_nil() {
+                self.topic_by_id
+                    .insert(topic.1.topic_id, Topic { partitions });
+            }
         }
     }
 
@@ -1148,7 +1174,8 @@ fn hash_partition(topic_id: Uuid, partition_index: i32) -> usize {
 struct Topic {
     partitions: Vec<Partition>,
 }
-#[derive(Debug)]
+
+#[derive(Debug, Clone)]
 struct Partition {
     index: i32,
     leader_id: i32,
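For reference, here is a minimal standalone sketch of the guard-lifetime pattern behind the "kind of crazy" `topic_meta` construction in the fetch-routing hunk. This is not shotover code: `Topic` is reduced to a bare partition list and `lookup` is a hypothetical function, with `dashmap` and `uuid` as assumed dependencies. The point is that `as_deref()` borrows from the `Ref` guard returned by `DashMap::get`, so both guards must be stored in variables that outlive `topic_meta`; declaring `name_guard` before the conditional, possibly never initialized, is what satisfies the borrow checker.

```rust
// Standalone sketch (not shotover code) of the id-then-name lookup pattern.
use dashmap::DashMap;
use uuid::Uuid;

#[derive(Debug, Clone)]
struct Topic {
    partitions: Vec<i32>, // simplified stand-in for the real partition metadata
}

fn lookup(by_id: &DashMap<Uuid, Topic>, by_name: &DashMap<String, Topic>, id: Uuid, name: &str) {
    // Fetch v13+ identifies topics by UUID, so try the ID map first and only
    // fall back to the name map when the ID misses.
    let id_guard = by_id.get(&id);
    let name_guard;
    let mut topic_meta = id_guard.as_deref();
    if topic_meta.is_none() {
        name_guard = by_name.get(name);
        topic_meta = name_guard.as_deref();
    }
    match topic_meta {
        Some(topic) => println!("{name}: {} partitions", topic.partitions.len()),
        None => println!("{name}: unknown topic"),
    }
}

fn main() {
    let by_id = DashMap::new();
    let by_name = DashMap::new();
    let id = Uuid::new_v4();
    by_id.insert(id, Topic { partitions: vec![0, 1, 2] });
    by_name.insert("events".to_owned(), Topic { partitions: vec![0] });
    lookup(&by_id, &by_name, id, "events"); // found via the ID map
    lookup(&by_id, &by_name, Uuid::nil(), "events"); // falls back to the name map
}
```

The named-local dance is needed because the two lookups produce guards over different maps: to funnel both into a single `Option<&Topic>`, each guard must live somewhere that outlasts the unified reference.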
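Similarly, a hedged sketch of the dual insert in the metadata-processing hunk, again with illustrative types and a hypothetical `store_topic_metadata` helper: a metadata response can come back with an empty topic name or a nil topic ID depending on the protocol version and how the topic was requested, so each map is populated only when its key is meaningful, and the partition list is cloned so both maps own a copy. That clone is also why `Partition` gains `#[derive(Debug, Clone)]` at the end of the diff: cloning the partition list requires `Vec<Partition>` elements to be cloneable.

```rust
// Sketch (illustrative names, not shotover's) of guarding the dual insert.
use dashmap::DashMap;
use uuid::Uuid;

#[derive(Debug, Clone)]
struct Topic {
    partitions: Vec<i32>,
}

fn store_topic_metadata(
    by_name: &DashMap<String, Topic>,
    by_id: &DashMap<Uuid, Topic>,
    name: &str,
    id: Uuid,
    partitions: Vec<i32>,
) {
    if !name.is_empty() {
        // Clone so the same partition list can still be moved into the ID map below.
        by_name.insert(
            name.to_owned(),
            Topic {
                partitions: partitions.clone(),
            },
        );
    }
    if !id.is_nil() {
        by_id.insert(id, Topic { partitions });
    }
}
```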