KafkaSinkCluster - routing error handling
rukai committed Jun 14, 2024
1 parent 42ed20d · commit a0b5e51
Showing 2 changed files with 113 additions and 1 deletion.
shotover-proxy/tests/kafka_int_tests/test_cases.rs (13 additions, 0 deletions)
@@ -241,6 +241,19 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
    produce_consume_partitions1(connection_builder, "partitions1").await;
    produce_consume_partitions1(connection_builder, "unknown_topic").await;
    produce_consume_partitions3(connection_builder).await;

    // delete and recreate topic to force shotover to adjust its existing routing metadata
    let admin = connection_builder.connect_admin().await;
    admin.delete_topics(&["partitions1"]).await;
    admin
        .create_topics(&[NewTopic {
            name: "partitions1",
            num_partitions: 1,
            replication_factor: 1,
        }])
        .await;
    produce_consume_partitions1(connection_builder, "partitions1").await;

    produce_consume_acks0(connection_builder).await;
    connection_builder.admin_cleanup().await;
}
shotover/src/transforms/kafka/sink_cluster/mod.rs (100 additions, 1 deletion)
@@ -216,7 +216,11 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
            first_contact_node: None,
            control_connection: None,
            pending_requests: Default::default(),
            // TODO: this approach with `find_coordinator_requests` and `routed_to_coordinator_for_group`
            // is prone to memory leaks and logic errors.
            // We should replace these fields with extra state within `PendingRequestTy::Received/Sent`.
            find_coordinator_requests: Default::default(),
            routed_to_coordinator_for_group: Default::default(),
            temp_responses_buffer: Default::default(),
            sasl_mechanism: None,
            authorize_scram_over_mtls: self.authorize_scram_over_mtls.as_ref().map(|x| x.build()),
@@ -245,6 +249,10 @@ impl AtomicBrokerId {
            .store(value.0.into(), std::sync::atomic::Ordering::Relaxed)
    }

    fn clear(&self) {
        self.0.store(i64::MAX, std::sync::atomic::Ordering::Relaxed)
    }

    /// Returns `None` when set has never been called.
    /// Otherwise returns `Some` containing the latest set value.
    fn get(&self) -> Option<BrokerId> {
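For readers unfamiliar with the surrounding type: the new `clear` reuses the sentinel convention that the existing `set`/`get` pair already relies on, where `i64::MAX` means "no broker id stored". Below is a minimal self-contained sketch of that pattern; it is a simplified stand-in (plain `i32` instead of shotover's `BrokerId` newtype), not the exact shotover implementation.

use std::sync::atomic::{AtomicI64, Ordering};

/// Simplified stand-in for AtomicBrokerId: i64::MAX is the "unset" sentinel,
/// letting get() distinguish "never set / cleared" from a real broker id
/// without a lock or a separate flag.
struct AtomicBrokerId(AtomicI64);

impl AtomicBrokerId {
    fn new() -> Self {
        AtomicBrokerId(AtomicI64::new(i64::MAX))
    }

    fn set(&self, broker_id: i32) {
        self.0.store(broker_id.into(), Ordering::Relaxed)
    }

    /// Forget the stored broker id, e.g. after a NOT_CONTROLLER error.
    fn clear(&self) {
        self.0.store(i64::MAX, Ordering::Relaxed)
    }

    /// Returns None when set has never been called or clear was called since.
    fn get(&self) -> Option<i32> {
        match self.0.load(Ordering::Relaxed) {
            i64::MAX => None,
            id => Some(id as i32),
        }
    }
}

This is why `clear` can simply store `i64::MAX`: the existing `get` already treats that value as "unset".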
@@ -274,6 +282,7 @@ pub struct KafkaSinkCluster {
    /// Ordering must be maintained to ensure responses match up with their request.
    pending_requests: VecDeque<PendingRequest>,
    find_coordinator_requests: MessageIdMap<FindCoordinator>,
    routed_to_coordinator_for_group: MessageIdMap<GroupId>,
    /// A temporary buffer used when receiving responses, only held onto in order to avoid reallocating.
    temp_responses_buffer: Vec<Message>,
    sasl_mechanism: Option<String>,
@@ -661,6 +670,7 @@ impl KafkaSinkCluster {
                    body: RequestBody::DeleteGroups(groups),
                    ..
                })) => {
                    // TODO: we need to split this up into multiple requests so it can be correctly routed to all possible nodes
                    let group_id = groups.groups_names.first().unwrap().clone();
                    self.route_to_coordinator(message, group_id);
                }
@@ -1186,7 +1196,6 @@ routing message to a random node so that:
    }

    async fn process_responses(&mut self, responses: &mut [Message]) -> Result<()> {
        // TODO: Handle errors like NOT_COORDINATOR by removing element from self.topics and self.coordinator_broker_id
        for response in responses.iter_mut() {
            let request_id = response.request_id().unwrap();
            match response.frame() {
@@ -1234,6 +1243,85 @@ routing message to a random node so that:
                })) => {
                    self.process_sasl_authenticate(authenticate).await?;
                }
                Some(Frame::Kafka(KafkaFrame::Response {
                    body: ResponseBody::Produce(produce),
                    ..
                })) => {
                    for (topic_name, topic) in &produce.responses {
                        for partition in &topic.partition_responses {
                            if let Some(ResponseError::NotLeaderOrFollower) =
                                ResponseError::try_from_code(partition.error_code)
                            {
                                self.topic_by_name.remove(topic_name);
                                break;
                            }
                        }
                    }
                }
                Some(Frame::Kafka(KafkaFrame::Response {
                    body: ResponseBody::Fetch(fetch),
                    ..
                })) => {
                    for response in &fetch.responses {
                        for partition in &response.partitions {
                            if let Some(ResponseError::NotLeaderOrFollower) =
                                ResponseError::try_from_code(partition.error_code)
                            {
                                self.topic_by_name.remove(&response.topic);
                                self.topic_by_id.remove(&response.topic_id);
                                break;
                            }
                        }
                    }
                }
                Some(Frame::Kafka(KafkaFrame::Response {
                    body: ResponseBody::Heartbeat(heartbeat),
                    ..
                })) => self.handle_coordinator_routing_error(request_id, heartbeat.error_code),
                Some(Frame::Kafka(KafkaFrame::Response {
                    body: ResponseBody::SyncGroup(sync_group),
                    ..
                })) => self.handle_coordinator_routing_error(request_id, sync_group.error_code),
                Some(Frame::Kafka(KafkaFrame::Response {
                    body: ResponseBody::OffsetFetch(offset_fetch),
                    ..
                })) => self.handle_coordinator_routing_error(request_id, offset_fetch.error_code),
                Some(Frame::Kafka(KafkaFrame::Response {
                    body: ResponseBody::JoinGroup(join_group),
                    ..
                })) => self.handle_coordinator_routing_error(request_id, join_group.error_code),
                Some(Frame::Kafka(KafkaFrame::Response {
                    body: ResponseBody::LeaveGroup(leave_group),
                    ..
                })) => self.handle_coordinator_routing_error(request_id, leave_group.error_code),
                Some(Frame::Kafka(KafkaFrame::Response {
                    body: ResponseBody::DeleteGroups(delete_groups),
                    ..
                })) => {
                    for (group_id, result) in &delete_groups.results {
                        if let Some(ResponseError::NotCoordinator) =
                            ResponseError::try_from_code(result.error_code)
                        {
                            // Need to run this to avoid memory leaks, since route_to_coordinator is called for DeleteGroup requests
                            self.routed_to_coordinator_for_group.remove(&request_id);

                            // Need to run this to ensure we remove for all groups
                            self.group_to_coordinator_broker.remove(group_id);
                        }
                    }
                }
                Some(Frame::Kafka(KafkaFrame::Response {
                    body: ResponseBody::CreateTopics(create_topics),
                    ..
                })) => {
                    for topic in create_topics.topics.values() {
                        if let Some(ResponseError::NotController) =
                            ResponseError::try_from_code(topic.error_code)
                        {
                            self.controller_broker.clear();
                        }
                    }
                }
                Some(Frame::Kafka(KafkaFrame::Response {
                    body: ResponseBody::Metadata(metadata),
                    ..
@@ -1259,6 +1347,15 @@ routing message to a random node so that:
        Ok(())
    }

    /// This method must be called for every response to a request that was routed via `route_to_coordinator`
    fn handle_coordinator_routing_error(&mut self, request_id: u128, error_code: i16) {
        if let Some(ResponseError::NotCoordinator) = ResponseError::try_from_code(error_code) {
            if let Some(group_id) = self.routed_to_coordinator_for_group.remove(&request_id) {
                self.group_to_coordinator_broker.remove(&group_id);
            }
        }
    }

    async fn process_sasl_authenticate(
        &mut self,
        authenticate: &mut SaslAuthenticateResponse,
@@ -1338,6 +1435,8 @@ routing message to a random node so that:

    fn route_to_coordinator(&mut self, request: Message, group_id: GroupId) {
        let destination = self.group_to_coordinator_broker.get(&group_id);
        self.routed_to_coordinator_for_group
            .insert(request.id(), group_id.clone());
        let destination = match destination {
            Some(destination) => *destination,
            None => {
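Taken together, the last two hunks show the pattern this commit introduces for coordinator errors: `route_to_coordinator` records which group a request was routed for, and `handle_coordinator_routing_error` uses that record to evict the cached coordinator when the broker answers NOT_COORDINATOR. The following is a minimal sketch of that interaction under assumed simplified types (plain `HashMap`s, a `String` group id and an `i32` broker id instead of shotover's `MessageIdMap`, `GroupId` and `BrokerId`), not the transform's actual code.

use std::collections::HashMap;

// Kafka's NOT_COORDINATOR error code (16); the real code goes through
// ResponseError::try_from_code rather than matching the raw number.
const NOT_COORDINATOR: i16 = 16;

struct CoordinatorRouting {
    // request id -> group the request was routed for (filled in at routing time)
    routed_to_coordinator_for_group: HashMap<u128, String>,
    // group -> broker id of its cached coordinator
    group_to_coordinator_broker: HashMap<String, i32>,
}

impl CoordinatorRouting {
    // Mirrors the insert added to route_to_coordinator.
    fn record_routing(&mut self, request_id: u128, group_id: String) {
        self.routed_to_coordinator_for_group
            .insert(request_id, group_id);
    }

    // Mirrors handle_coordinator_routing_error: called for every response to a
    // request that was routed via the coordinator.
    fn handle_coordinator_routing_error(&mut self, request_id: u128, error_code: i16) {
        if error_code == NOT_COORDINATOR {
            if let Some(group_id) = self.routed_to_coordinator_for_group.remove(&request_id) {
                // Evict the stale coordinator so the next request for this group
                // triggers a fresh FindCoordinator lookup.
                self.group_to_coordinator_broker.remove(&group_id);
            }
        }
    }
}

Note that, as the TODO in the first hunk points out, entries for requests that complete without a NOT_COORDINATOR error are never removed from the request-id map; that is the memory-leak risk the commit flags for follow-up work.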
