Skip to content

Commit

Permalink
KafkaSinkCluster: route ListPartitionReassignments
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Nov 13, 2024
1 parent b057491 commit cef19f2
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 7 deletions.
34 changes: 31 additions & 3 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use test_helpers::{
connection::kafka::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig,
ExpectedResponse, IsolationLevel, KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer,
KafkaDriver, KafkaProducer, ListOffsetsResultInfo, NewPartition, NewTopic,
OffsetAndMetadata, OffsetSpec, Record, RecordsToDelete, ResourcePatternType,
ResourceSpecifier, ResourceType, TopicPartition,
KafkaDriver, KafkaProducer, ListOffsetsResultInfo, NewPartition, NewPartitionReassignment,
NewTopic, OffsetAndMetadata, OffsetSpec, PartitionReassignment, Record, RecordsToDelete,
ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition,
},
docker_compose::DockerCompose,
};
Expand Down Expand Up @@ -1646,6 +1646,33 @@ async fn list_transactions(connection_builder: &KafkaConnectionBuilder) {
assert_eq!(actual_results, expected_results);
}

async fn create_and_list_partition_reassignments(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
admin
.alter_partition_reassignments(HashMap::from([(
TopicPartition {
topic_name: "partitions1".to_owned(),
partition: 0,
},
NewPartitionReassignment {
replica_broker_ids: vec![0],
},
)]))
.await;

let actual_results = admin.list_partition_reassignments().await;
assert_eq!(
actual_results,
HashMap::from([(
TopicPartition {
topic_name: "partitions1".to_owned(),
partition: 0,
},
PartitionReassignment {}
)])
)
}

async fn cluster_test_suite_base(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
admin
Expand All @@ -1672,6 +1699,7 @@ pub async fn tests_requiring_all_shotover_nodes(connection_builder: &KafkaConnec
if let KafkaConnectionBuilder::Java(_) = connection_builder {
list_groups(connection_builder).await;
list_transactions(connection_builder).await;
create_and_list_partition_reassignments(connection_builder).await;
}
}

Expand Down
4 changes: 3 additions & 1 deletion shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,9 @@ The connection to the client has been closed."
| RequestBody::CreatePartitions(_)
| RequestBody::DeleteTopics(_)
| RequestBody::CreateAcls(_)
| RequestBody::ApiVersions(_),
| RequestBody::ApiVersions(_)
| RequestBody::AlterPartitionReassignments(_)
| RequestBody::ListPartitionReassignments(_),
..
})) => self.route_to_random_broker(request),

Expand Down
59 changes: 56 additions & 3 deletions test-helpers/src/connection/kafka/java.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use super::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConsumerConfig, ExpectedResponse,
ListOffsetsResultInfo, NewPartition, NewTopic, OffsetAndMetadata, OffsetSpec, ProduceResult,
Record, RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType,
TopicDescription, TopicPartition, TopicPartitionInfo,
ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata,
OffsetSpec, PartitionReassignment, ProduceResult, Record, RecordsToDelete, ResourcePatternType,
ResourceSpecifier, ResourceType, TopicDescription, TopicPartition, TopicPartitionInfo,
};
use crate::connection::java::{map_iterator, Jvm, Value};
use anyhow::Result;
Expand Down Expand Up @@ -799,6 +799,59 @@ impl KafkaAdminJava {
.map(|_| ())
}

pub async fn alter_partition_reassignments(
&self,
reassignments: HashMap<TopicPartition, NewPartitionReassignment>,
) {
let reassignments_java: Vec<_> = reassignments
.into_iter()
.map(|(topic_partition, reassignment)| {
(
topic_partition_to_java(&self.jvm, &topic_partition),
self.jvm.call_static(
"java.util.Optional",
"of",
vec![self.jvm.construct(
"org.apache.kafka.clients.admin.NewPartitionReassignment",
vec![self.jvm.new_list(
"java.lang.Integer",
reassignment
.replica_broker_ids
.into_iter()
.map(|x| self.jvm.new_int_object(x))
.collect(),
)],
)],
),
)
})
.collect();
let reassignments_java = self.jvm.new_map(reassignments_java);
self.admin
.call("alterPartitionReassignments", vec![reassignments_java])
.call_async("all", vec![])
.await;
}

pub async fn list_partition_reassignments(
&self,
) -> HashMap<TopicPartition, PartitionReassignment> {
let java_results = self
.admin
.call("listPartitionReassignments", vec![])
.call_async("reassignments", vec![])
.await;

map_iterator(java_results)
.map(|(topic_partition, _partition_reassignment)| {
(
topic_partition_to_rust(topic_partition),
PartitionReassignment {},
)
})
.collect()
}

pub async fn create_acls(&self, acls: Vec<Acl>) {
let resource_type = self
.jvm
Expand Down
33 changes: 33 additions & 0 deletions test-helpers/src/connection/kafka/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,31 @@ impl KafkaAdmin {
}
}

pub async fn alter_partition_reassignments(
&self,
reassignments: HashMap<TopicPartition, NewPartitionReassignment>,
) {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
Self::Cpp(_) => {
panic!("rdkafka-rs driver does not support alter_partition_reassignments")
}
Self::Java(java) => java.alter_partition_reassignments(reassignments).await,
}
}

pub async fn list_partition_reassignments(
&self,
) -> HashMap<TopicPartition, PartitionReassignment> {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
Self::Cpp(_) => {
panic!("rdkafka-rs driver does not support list_partition_reassignments")
}
Self::Java(java) => java.list_partition_reassignments().await,
}
}

pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
Expand Down Expand Up @@ -712,3 +737,11 @@ pub struct RecordsToDelete {
/// If -1 is given delete all records regardless of offset.
pub delete_before_offset: i64,
}

#[derive(PartialEq, Debug)]
pub struct PartitionReassignment {}

#[derive(PartialEq, Debug)]
pub struct NewPartitionReassignment {
pub replica_broker_ids: Vec<i32>,
}

0 comments on commit cef19f2

Please sign in to comment.