diff --git a/shotover-proxy/tests/kafka_int_tests/test_cases.rs b/shotover-proxy/tests/kafka_int_tests/test_cases.rs index b677283d9..7e099aab0 100644 --- a/shotover-proxy/tests/kafka_int_tests/test_cases.rs +++ b/shotover-proxy/tests/kafka_int_tests/test_cases.rs @@ -4,8 +4,8 @@ use test_helpers::{ connection::kafka::{ Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig, ExpectedResponse, IsolationLevel, KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer, - KafkaDriver, KafkaProducer, ListOffsetsResultInfo, NewPartition, NewTopic, - OffsetAndMetadata, OffsetSpec, Record, RecordsToDelete, ResourcePatternType, + KafkaDriver, KafkaProducer, ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, + NewTopic, OffsetAndMetadata, OffsetSpec, Record, RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition, }, docker_compose::DockerCompose, @@ -1653,6 +1653,54 @@ async fn list_transactions(connection_builder: &KafkaConnectionBuilder) { assert_eq!(actual_results, expected_results); } +async fn create_and_list_partition_reassignments(connection_builder: &KafkaConnectionBuilder) { + let admin = connection_builder.connect_admin().await; + admin + .alter_partition_reassignments(HashMap::from([( + TopicPartition { + topic_name: "partitions1".to_owned(), + partition: 0, + }, + NewPartitionReassignment { + replica_broker_ids: vec![0], + }, + )])) + .await; + + let actual_results = admin.list_partition_reassignments().await; + + if actual_results.is_empty() { + // If too much time passes between requesting the reassignment and listing the reassignment it, + // the reassignment might have already completed so there is nothing to list. + // In that case return early to skip the assertions. + // + // The assertions should still run sometimes, so its worth keeping around. + // And at the very least we know that the messages are sent/received succesfully. + return; + } + + assert_eq!(actual_results.len(), 1); + let reassignment = actual_results + .get(&TopicPartition { + topic_name: "partitions1".to_owned(), + partition: 0, + }) + .unwrap(); + let expected_adding_replica_broker_ids: &[i32] = if reassignment.replica_broker_ids == [0] { + // The original broker is randomly assigned. + // If it happens to be broker 0, matching the new broker we requested, + // then adding_replica_broker_ids will be empty + &[] + } else { + // otherwise it contains the new broker we requested + &[0] + }; + assert_eq!( + reassignment.adding_replica_broker_ids, + expected_adding_replica_broker_ids + ); +} + async fn cluster_test_suite_base(connection_builder: &KafkaConnectionBuilder) { let admin = connection_builder.connect_admin().await; admin @@ -1679,6 +1727,7 @@ pub async fn tests_requiring_all_shotover_nodes(connection_builder: &KafkaConnec if let KafkaConnectionBuilder::Java(_) = connection_builder { list_groups(connection_builder).await; list_transactions(connection_builder).await; + create_and_list_partition_reassignments(connection_builder).await; } } diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 13e64203d..3a895e0bb 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -1072,7 +1072,9 @@ The connection to the client has been closed." | RequestBody::CreatePartitions(_) | RequestBody::DeleteTopics(_) | RequestBody::CreateAcls(_) - | RequestBody::ApiVersions(_), + | RequestBody::ApiVersions(_) + | RequestBody::AlterPartitionReassignments(_) + | RequestBody::ListPartitionReassignments(_), .. })) => self.route_to_random_broker(request), diff --git a/test-helpers/src/connection/kafka/java.rs b/test-helpers/src/connection/kafka/java.rs index ac1728068..2791f316b 100644 --- a/test-helpers/src/connection/kafka/java.rs +++ b/test-helpers/src/connection/kafka/java.rs @@ -1,8 +1,8 @@ use super::{ Acl, AclOperation, AclPermissionType, AlterConfig, ConsumerConfig, ExpectedResponse, - ListOffsetsResultInfo, NewPartition, NewTopic, OffsetAndMetadata, OffsetSpec, ProduceResult, - Record, RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType, - TopicDescription, TopicPartition, TopicPartitionInfo, + ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata, + OffsetSpec, PartitionReassignment, ProduceResult, Record, RecordsToDelete, ResourcePatternType, + ResourceSpecifier, ResourceType, TopicDescription, TopicPartition, TopicPartitionInfo, }; use crate::connection::java::{map_iterator, Jvm, Value}; use anyhow::Result; @@ -799,6 +799,70 @@ impl KafkaAdminJava { .map(|_| ()) } + pub async fn alter_partition_reassignments( + &self, + reassignments: HashMap, + ) { + let reassignments_java: Vec<_> = reassignments + .into_iter() + .map(|(topic_partition, reassignment)| { + ( + topic_partition_to_java(&self.jvm, &topic_partition), + self.jvm.call_static( + "java.util.Optional", + "of", + vec![self.jvm.construct( + "org.apache.kafka.clients.admin.NewPartitionReassignment", + vec![self.jvm.new_list( + "java.lang.Integer", + reassignment + .replica_broker_ids + .into_iter() + .map(|x| self.jvm.new_int_object(x)) + .collect(), + )], + )], + ), + ) + }) + .collect(); + let reassignments_java = self.jvm.new_map(reassignments_java); + self.admin + .call("alterPartitionReassignments", vec![reassignments_java]) + .call_async("all", vec![]) + .await; + } + + pub async fn list_partition_reassignments( + &self, + ) -> HashMap { + let java_results = self + .admin + .call("listPartitionReassignments", vec![]) + .call_async("reassignments", vec![]) + .await; + + map_iterator(java_results) + .map(|(topic_partition, partition_reassignment)| { + (topic_partition_to_rust(topic_partition), { + let partition_reassignment = partition_reassignment + .cast("org.apache.kafka.clients.admin.PartitionReassignment"); + PartitionReassignment { + adding_replica_broker_ids: partition_reassignment + .call("addingReplicas", vec![]) + .into_rust(), + removing_replica_broker_ids: partition_reassignment + .call("removingReplicas", vec![]) + .into_rust(), + replica_broker_ids: partition_reassignment + .call("replicas", vec![]) + .into_rust(), + } + }) + }) + .collect() + } + pub async fn create_acls(&self, acls: Vec) { let resource_type = self .jvm diff --git a/test-helpers/src/connection/kafka/mod.rs b/test-helpers/src/connection/kafka/mod.rs index 7bd001db3..ac12926c8 100644 --- a/test-helpers/src/connection/kafka/mod.rs +++ b/test-helpers/src/connection/kafka/mod.rs @@ -517,6 +517,31 @@ impl KafkaAdmin { } } + pub async fn alter_partition_reassignments( + &self, + reassignments: HashMap, + ) { + match self { + #[cfg(feature = "kafka-cpp-driver-tests")] + Self::Cpp(_) => { + panic!("rdkafka-rs driver does not support alter_partition_reassignments") + } + Self::Java(java) => java.alter_partition_reassignments(reassignments).await, + } + } + + pub async fn list_partition_reassignments( + &self, + ) -> HashMap { + match self { + #[cfg(feature = "kafka-cpp-driver-tests")] + Self::Cpp(_) => { + panic!("rdkafka-rs driver does not support list_partition_reassignments") + } + Self::Java(java) => java.list_partition_reassignments().await, + } + } + pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) { match self { #[cfg(feature = "kafka-cpp-driver-tests")] @@ -712,3 +737,15 @@ pub struct RecordsToDelete { /// If -1 is given delete all records regardless of offset. pub delete_before_offset: i64, } + +#[derive(PartialEq, Debug)] +pub struct PartitionReassignment { + pub adding_replica_broker_ids: Vec, + pub removing_replica_broker_ids: Vec, + pub replica_broker_ids: Vec, +} + +#[derive(PartialEq, Debug)] +pub struct NewPartitionReassignment { + pub replica_broker_ids: Vec, +}