Commit

Merge branch 'refs/heads/main' into 1649-leak-query-counter-name
justinweng-instaclustr committed Jun 17, 2024
2 parents 9033d57 + b540be8 commit 32fb2ba
Showing 3 changed files with 167 additions and 2 deletions.
2 changes: 1 addition & 1 deletion rust-toolchain.toml
@@ -1,4 +1,4 @@
[toolchain]
channel = "1.78"
channel = "1.79"
components = [ "rustfmt", "clippy" ]
targets = [ "aarch64-unknown-linux-gnu" ]
18 changes: 18 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -241,6 +241,24 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
produce_consume_partitions1(connection_builder, "partitions1").await;
produce_consume_partitions1(connection_builder, "unknown_topic").await;
produce_consume_partitions3(connection_builder).await;

// Only run this test case on the java driver,
// since even without going through shotover the cpp driver fails this test.
#[allow(irrefutable_let_patterns)]
if let KafkaConnectionBuilder::Java(_) = connection_builder {
// delete and recreate the topic to force shotover to adjust its existing routing metadata
let admin = connection_builder.connect_admin().await;
admin.delete_topics(&["partitions1"]).await;
admin
.create_topics(&[NewTopic {
name: "partitions1",
num_partitions: 1,
replication_factor: 1,
}])
.await;
produce_consume_partitions1(connection_builder, "partitions1").await;
}

produce_consume_acks0(connection_builder).await;
connection_builder.admin_cleanup().await;
}
149 changes: 148 additions & 1 deletion shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -216,7 +216,11 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
first_contact_node: None,
control_connection: None,
pending_requests: Default::default(),
// TODO: this approach with `find_coordinator_requests` and `routed_to_coordinator_for_group`
// is prone to memory leaks and logic errors.
// We should replace these fields with extra state within `PendingRequestTy::Received/Sent`.
find_coordinator_requests: Default::default(),
routed_to_coordinator_for_group: Default::default(),
temp_responses_buffer: Default::default(),
sasl_mechanism: None,
authorize_scram_over_mtls: self.authorize_scram_over_mtls.as_ref().map(|x| x.build()),
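The TODO above proposes a refactor rather than implementing it: instead of two side maps that must be cleaned up by hand, the coordinator-routing information would live inside each pending request's state and be dropped together with it. A minimal standalone sketch of that idea, using hypothetical names and field types rather than shotover's actual `PendingRequestTy`, might look like:

// Hypothetical sketch only: tie coordinator-routing state to the request's
// lifecycle so it cannot outlive the pending request and leak.
enum PendingRequestState {
    Sent {
        destination: i32,
        // Some(group) when the request was routed via route_to_coordinator.
        routed_to_coordinator_for_group: Option<String>,
    },
    Received {
        response: Vec<u8>,
        routed_to_coordinator_for_group: Option<String>,
    },
}

impl PendingRequestState {
    /// The group this request was coordinator-routed for, if any.
    fn coordinator_group(&self) -> Option<&str> {
        match self {
            PendingRequestState::Sent { routed_to_coordinator_for_group, .. }
            | PendingRequestState::Received { routed_to_coordinator_for_group, .. } => {
                routed_to_coordinator_for_group.as_deref()
            }
        }
    }
}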
@@ -245,6 +249,10 @@ impl AtomicBrokerId {
.store(value.0.into(), std::sync::atomic::Ordering::Relaxed)
}

fn clear(&self) {
self.0.store(i64::MAX, std::sync::atomic::Ordering::Relaxed)
}

/// Returns `None` when set has never been called.
/// Otherwise returns `Some` containing the latest set value.
fn get(&self) -> Option<BrokerId> {
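Together with `clear` above, this doc comment implies a simple sentinel scheme: `i64::MAX` marks "never set (or cleared)" and any other value is the broker id, presumably restored by the next metadata response. A self-contained sketch of that scheme under this assumption, with illustrative names rather than shotover's actual `AtomicBrokerId` internals:

use std::sync::atomic::{AtomicI64, Ordering};

// Illustrative stand-in for the sentinel scheme: i64::MAX means "unset/cleared",
// any other value is a broker id.
struct CachedBrokerId(AtomicI64);

impl CachedBrokerId {
    fn new() -> Self {
        CachedBrokerId(AtomicI64::new(i64::MAX))
    }

    fn set(&self, broker_id: i64) {
        self.0.store(broker_id, Ordering::Relaxed)
    }

    fn clear(&self) {
        self.0.store(i64::MAX, Ordering::Relaxed)
    }

    /// Returns None until set is called (or after clear), otherwise the latest value.
    fn get(&self) -> Option<i64> {
        match self.0.load(Ordering::Relaxed) {
            i64::MAX => None,
            broker_id => Some(broker_id),
        }
    }
}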
@@ -274,6 +282,7 @@ pub struct KafkaSinkCluster {
/// Ordering must be maintained to ensure responses match up with their request.
pending_requests: VecDeque<PendingRequest>,
find_coordinator_requests: MessageIdMap<FindCoordinator>,
routed_to_coordinator_for_group: MessageIdMap<GroupId>,
/// A temporary buffer used when receiving responses, only held onto in order to avoid reallocating.
temp_responses_buffer: Vec<Message>,
sasl_mechanism: Option<String>,
@@ -661,6 +670,7 @@ impl KafkaSinkCluster {
body: RequestBody::DeleteGroups(groups),
..
})) => {
// TODO: we need to split this up into multiple requests so it can be correctly routed to all possible nodes
let group_id = groups.groups_names.first().unwrap().clone();
self.route_to_coordinator(message, group_id);
}
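The TODO above flags a routing gap: a single DeleteGroups request can name groups owned by different coordinators, yet it is currently routed using only the first group name. A minimal sketch of the splitting it calls for, written with standalone placeholder types rather than shotover's or kafka_protocol's, would partition the group names by coordinator and emit one request per broker:

use std::collections::HashMap;

// Hypothetical illustration of the TODO above: bucket the requested group names by
// their coordinator so each broker receives a DeleteGroups request containing only
// the groups it owns.
fn split_delete_groups_by_coordinator(
    group_names: Vec<String>,
    coordinator_for: impl Fn(&str) -> i32,
) -> HashMap<i32, Vec<String>> {
    let mut per_coordinator: HashMap<i32, Vec<String>> = HashMap::new();
    for group in group_names {
        per_coordinator
            .entry(coordinator_for(&group))
            .or_default()
            .push(group);
    }
    per_coordinator
}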
@@ -1186,7 +1196,6 @@ routing message to a random node so that:
}

async fn process_responses(&mut self, responses: &mut [Message]) -> Result<()> {
// TODO: Handle errors like NOT_COORDINATOR by removing element from self.topics and self.coordinator_broker_id
for response in responses.iter_mut() {
let request_id = response.request_id().unwrap();
match response.frame() {
@@ -1234,6 +1243,128 @@ routing message to a random node so that:
})) => {
self.process_sasl_authenticate(authenticate).await?;
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Produce(produce),
..
})) => {
for (topic_name, response_topic) in &produce.responses {
for response_partition in &response_topic.partition_responses {
if let Some(ResponseError::NotLeaderOrFollower) =
ResponseError::try_from_code(response_partition.error_code)
{
if response_partition.current_leader.leader_id != -1 {
// The broker has informed us who the new leader is, so we can directly update the leader
if let Some(mut stored_topic) =
self.topic_by_name.get_mut(topic_name)
{
if let Some(stored_partition) = stored_topic
.partitions
.get_mut(response_partition.index as usize)
{
if response_partition.current_leader.leader_epoch
> stored_partition.leader_epoch
{
stored_partition.leader_id =
response_partition.current_leader.leader_id;
stored_partition.leader_epoch =
response_partition.current_leader.leader_epoch;
}
tracing::info!(
"Produce response included error NOT_LEADER_OR_FOLLOWER and so updated leader in topic {:?} partition {}",
topic_name,
response_partition.index
);
}
}
} else {
// The broker doesn't know who the new leader is, so clear the entire topic.
self.topic_by_name.remove(topic_name);
tracing::info!(
"Produce response included error NOT_LEADER_OR_FOLLOWER and so cleared topic {:?}",
topic_name,
);
break;
}
}
}
}
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Fetch(fetch),
..
})) => {
for response in &fetch.responses {
for partition in &response.partitions {
if let Some(ResponseError::NotLeaderOrFollower) =
ResponseError::try_from_code(partition.error_code)
{
// The fetch response includes a leader_id that a client could use to route future fetch requests,
// but we can't use it to fix our list of replicas, so our only option is to clear the whole topic.
self.topic_by_name.remove(&response.topic);
self.topic_by_id.remove(&response.topic_id);
tracing::info!(
"Fetch response included error NOT_LEADER_OR_FOLLOWER and so cleared topic {:?} {:?}",
response.topic,
response.topic_id
);
break;
}
}
}
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Heartbeat(heartbeat),
..
})) => self.handle_coordinator_routing_error(request_id, heartbeat.error_code),
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::SyncGroup(sync_group),
..
})) => self.handle_coordinator_routing_error(request_id, sync_group.error_code),
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::OffsetFetch(offset_fetch),
..
})) => self.handle_coordinator_routing_error(request_id, offset_fetch.error_code),
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::JoinGroup(join_group),
..
})) => self.handle_coordinator_routing_error(request_id, join_group.error_code),
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::LeaveGroup(leave_group),
..
})) => self.handle_coordinator_routing_error(request_id, leave_group.error_code),
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DeleteGroups(delete_groups),
..
})) => {
for (group_id, result) in &delete_groups.results {
if let Some(ResponseError::NotCoordinator) =
ResponseError::try_from_code(result.error_code)
{
// Need to run this to avoid memory leaks, since route_to_coordinator is called for DeleteGroups requests
self.routed_to_coordinator_for_group.remove(&request_id);

// Need to run this to ensure the coordinator mapping is removed for every group in the response
self.group_to_coordinator_broker.remove(group_id);
}
}
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::CreateTopics(create_topics),
..
})) => {
for topic in create_topics.topics.values() {
if let Some(ResponseError::NotController) =
ResponseError::try_from_code(topic.error_code)
{
tracing::info!(
"Response to CreateTopics included error NOT_CONTROLLER and so reset controller broker, previously was {:?}",
self.controller_broker.get()
);
self.controller_broker.clear();
break;
}
}
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Metadata(metadata),
..
@@ -1259,6 +1390,20 @@ routing message to a random node so that:
Ok(())
}

/// This method must be called for every response to a request that was routed via `route_to_coordinator`
fn handle_coordinator_routing_error(&mut self, request_id: u128, error_code: i16) {
if let Some(ResponseError::NotCoordinator) = ResponseError::try_from_code(error_code) {
if let Some(group_id) = self.routed_to_coordinator_for_group.remove(&request_id) {
let broker_id = self.group_to_coordinator_broker.remove(&group_id);
tracing::info!(
"Response was error NOT_COORDINATOR and so cleared group id {:?} coordinator mapping to broker {:?}",
group_id,
broker_id,
);
}
}
}

async fn process_sasl_authenticate(
&mut self,
authenticate: &mut SaslAuthenticateResponse,
@@ -1338,6 +1483,8 @@ routing message to a random node so that:

fn route_to_coordinator(&mut self, request: Message, group_id: GroupId) {
let destination = self.group_to_coordinator_broker.get(&group_id);
self.routed_to_coordinator_for_group
.insert(request.id(), group_id.clone());
let destination = match destination {
Some(destination) => *destination,
None => {