Commit 7a1e439: route by topic id
rukai committed Mar 15, 2024
1 parent 9097bdf · commit 7a1e439
Showing 1 changed file with 42 additions and 15 deletions.
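Context for the diff below: newer Kafka protocol versions identify topics in fetch requests by UUID rather than by name (KIP-516), so a metadata cache keyed only by `TopicName` cannot route those requests. This commit splits the single `topics` map into `topic_by_name` and `topic_by_id`. A minimal sketch of the resulting dual-keyed cache, using the real `dashmap` and `uuid` crates but a stand-in `Topic` type (not shotover's real definitions):

    use dashmap::DashMap;
    use uuid::Uuid;

    #[derive(Clone)]
    struct Topic; // stand-in for the cached per-topic partition metadata

    struct MetadataCache {
        topic_by_name: DashMap<String, Topic>,
        topic_by_id: DashMap<Uuid, Topic>,
    }

    impl MetadataCache {
        // Store under both keys when both are known; metadata responses may
        // omit either the name or the id, hence the emptiness/nil checks
        // (this mirrors the metadata-handling hunk later in the diff).
        fn insert(&self, name: &str, id: Uuid, topic: Topic) {
            if !name.is_empty() {
                self.topic_by_name.insert(name.to_string(), topic.clone());
            }
            if !id.is_nil() {
                self.topic_by_id.insert(id, topic);
            }
        }
    }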
shotover/src/transforms/kafka/sink_cluster/mod.rs (42 additions & 15 deletions)
@@ -112,7 +112,8 @@ pub struct KafkaSinkClusterBuilder {
     read_timeout: Option<Duration>,
     controller_broker: Arc<AtomicBrokerId>,
     group_to_coordinator_broker: Arc<DashMap<GroupId, BrokerId>>,
-    topics: Arc<DashMap<TopicName, Topic>>,
+    topic_by_name: Arc<DashMap<TopicName, Topic>>,
+    topic_by_id: Arc<DashMap<Uuid, Topic>>,
     nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
     tls: Option<TlsConnector>,
     sasl_enabled: bool,
@@ -143,7 +144,8 @@ impl KafkaSinkClusterBuilder {
             read_timeout: receive_timeout,
             controller_broker: Arc::new(AtomicBrokerId::new()),
             group_to_coordinator_broker: Arc::new(DashMap::new()),
-            topics: Arc::new(DashMap::new()),
+            topic_by_name: Arc::new(DashMap::new()),
+            topic_by_id: Arc::new(DashMap::new()),
             nodes_shared: Arc::new(RwLock::new(vec![])),
             tls,
             sasl_enabled,
@@ -162,7 +164,8 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
             nodes_shared: self.nodes_shared.clone(),
             controller_broker: self.controller_broker.clone(),
             group_to_coordinator_broker: self.group_to_coordinator_broker.clone(),
-            topics: self.topics.clone(),
+            topic_by_name: self.topic_by_name.clone(),
+            topic_by_id: self.topic_by_id.clone(),
             rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
             sasl_status: SaslStatus::new(self.sasl_enabled),
             connection_factory: ConnectionFactory::new(self.tls.clone(), self.connect_timeout),
@@ -238,7 +241,8 @@ pub struct KafkaSinkCluster {
     nodes_shared: Arc<RwLock<Vec<KafkaNode>>>,
     controller_broker: Arc<AtomicBrokerId>,
     group_to_coordinator_broker: Arc<DashMap<GroupId, BrokerId>>,
-    topics: Arc<DashMap<TopicName, Topic>>,
+    topic_by_name: Arc<DashMap<TopicName, Topic>>,
+    topic_by_id: Arc<DashMap<Uuid, Topic>>,
     rng: SmallRng,
     sasl_status: SaslStatus,
     connection_factory: ConnectionFactory,
@@ -304,7 +308,7 @@ impl Transform for KafkaSinkCluster {
 
 impl KafkaSinkCluster {
     fn store_topic(&self, topics: &mut Vec<TopicName>, topic: TopicName) {
-        if self.topics.get(&topic).is_none() && !topics.contains(&topic) {
+        if self.topic_by_name.get(&topic).is_none() && !topics.contains(&topic) {
             topics.push(topic);
         }
     }
@@ -352,6 +356,7 @@ impl KafkaSinkCluster {
                     body: RequestBody::Fetch(fetch),
                     ..
                 })) => {
+                    // TODO: Handle topics that only have an ID
                     for topic in &fetch.topics {
                         self.store_topic(&mut topics, topic.topic.clone());
                     }
@@ -420,7 +425,7 @@
                     .iter()
                     .next()
                     .ok_or_else(|| anyhow!("No topics in produce message"))?;
-                if let Some(topic) = self.topics.get(&topic_name.0) {
+                if let Some(topic) = self.topic_by_name.get(&topic_name.0) {
                     // assume that all partitions in this topic have the same routing requirements
                     let partition = &topic.partitions[topic_data
                         .partition_data
Expand Down Expand Up @@ -471,8 +476,18 @@ impl KafkaSinkCluster {
.topics
.first()
.ok_or_else(|| anyhow!("No topics in fetch message"))?;

// This way of constructing topic_meta is kind of crazy, but it works around borrow checker limitations
let topic_name = &topic.topic;
let node = if let Some(topic_meta) = self.topics.get(topic_name) {
let topic_by_id = self.topic_by_id.get(&topic.topic_id);
let topic_by_name;
let mut topic_meta = topic_by_id.as_deref();
if topic_meta.is_none() {
topic_by_name = self.topic_by_name.get(&topic.topic);
topic_meta = topic_by_name.as_deref();
}

let node = if let Some(topic_meta) = topic_meta {
let partition_index = topic
.partitions
.first()
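A note on the `topic_meta` construction above: `DashMap::get` returns a `Ref` guard, and the `&Topic` produced by `as_deref()` borrows from that guard, so each guard needs its own `let` binding that outlives every use of `topic_meta`; chaining the two lookups in a single expression would create temporaries that do not live long enough. A standalone sketch of the same pattern (toy types; only `dashmap` and `uuid` are real):

    use dashmap::DashMap;
    use uuid::Uuid;

    struct Topic {
        partitions: Vec<i32>, // simplified stand-in for the real Partition list
    }

    fn lookup_topic(
        topic_by_id: &DashMap<Uuid, Topic>,
        topic_by_name: &DashMap<String, Topic>,
        id: Uuid,
        name: &str,
    ) {
        // Each guard gets its own binding so it outlives `topic_meta`;
        // as_deref() turns Option<Ref<_, Topic>> into Option<&Topic>.
        let by_id_guard = topic_by_id.get(&id);
        let by_name_guard;
        let mut topic_meta = by_id_guard.as_deref();
        if topic_meta.is_none() {
            by_name_guard = topic_by_name.get(name);
            topic_meta = by_name_guard.as_deref();
        }
        if let Some(topic) = topic_meta {
            println!("found topic with {} partitions", topic.partitions.len());
        }
    }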
Expand Down Expand Up @@ -796,12 +811,12 @@ impl KafkaSinkCluster {
body: ResponseBody::Fetch(fetch),
..
})) => {
let destination = self
.fetch_request_destinations
.remove(&request_id.unwrap())
.unwrap();
self.fetch_session_id_to_broker
.insert(fetch.session_id, destination);
if let Some(destination) =
self.fetch_request_destinations.remove(&request_id.unwrap())
{
self.fetch_session_id_to_broker
.insert(fetch.session_id, destination);
}
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DescribeCluster(_),
Expand Down Expand Up @@ -910,7 +925,18 @@ impl KafkaSinkCluster {
})
.collect();
partitions.sort_by_key(|x| x.index);
self.topics.insert(topic.0.clone(), Topic { partitions });
if !topic.0.is_empty() {
self.topic_by_name.insert(
topic.0.clone(),
Topic {
partitions: partitions.clone(),
},
);
}
if !topic.1.topic_id.is_nil() {
self.topic_by_id
.insert(topic.1.topic_id, Topic { partitions });
}
}
}

Expand Down Expand Up @@ -1148,7 +1174,8 @@ fn hash_partition(topic_id: Uuid, partition_index: i32) -> usize {
struct Topic {
partitions: Vec<Partition>,
}
#[derive(Debug)]

#[derive(Debug, Clone)]
struct Partition {
index: i32,
leader_id: i32,
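The `#[derive(Debug, Clone)]` added above is required by the `partitions.clone()` in the earlier metadata-handling hunk: the same partition list is now cached under both the topic name and the topic id, so one of the two `Topic` entries must own its own copy.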
