list_consumer_group_offsets
rukai committed Nov 8, 2024
1 parent a75cf7f commit f41685d
Showing 5 changed files with 177 additions and 11 deletions.
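
In short: the sink_cluster transform now handles Kafka's OffsetDelete request alongside the other admin request types, and the test-helpers KafkaAdmin wrapper gains list_consumer_group_offsets and delete_consumer_group_offsets (Java driver only; the rdkafka-rs backend panics). A condensed sketch of the round trip the new delete_offsets test exercises, assuming `admin` is a connected KafkaAdmin handle; the helper types are defined in the diff below:

// Condensed from the new delete_offsets test in test_cases.rs.
let tp = TopicPartition {
    topic_name: "partitions1_with_offset".to_owned(),
    partition: 0,
};

// List committed offsets: group id -> (topic partition -> offset and metadata).
let offsets = admin
    .list_consumer_group_offsets("consumer_group_with_offsets".to_owned())
    .await;
assert_eq!(
    offsets["consumer_group_with_offsets"][&tp],
    OffsetAndMetadata { offset: 2 }
);

// Delete the committed offset for that partition; listing the group again
// then returns an empty map.
admin
    .delete_consumer_group_offsets("consumer_group_with_offsets".to_owned(), &[tp])
    .await;
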
51 changes: 48 additions & 3 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -133,12 +133,54 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
async fn admin_cleanup(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;

-admin.delete_groups(&["some_group", "some_group1"]).await;
+admin
+.delete_groups(&["some_group", "some_group1", "consumer_group_with_offsets"])
+.await;
delete_records(&admin, connection_builder).await;
}

async fn delete_offsets(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;

// Only supported by java driver
#[allow(irrefutable_let_patterns)]
if let KafkaConnectionBuilder::Java(_) = connection_builder {
// assert offset exists
let result = admin
.list_consumer_group_offsets("consumer_group_with_offsets".to_owned())
.await;
let expected_result: HashMap<_, HashMap<TopicPartition, OffsetAndMetadata>> =
HashMap::from([(
"consumer_group_with_offsets".to_owned(),
HashMap::from([(
TopicPartition {
topic_name: "partitions1_with_offset".to_owned(),
partition: 0,
},
OffsetAndMetadata { offset: 2 },
)]),
)]);
assert_eq!(result, expected_result);

// delete offset
admin
.delete_consumer_group_offsets(
"consumer_group_with_offsets".to_owned(),
&[TopicPartition {
topic_name: "partitions1_with_offset".to_owned(),
partition: 0,
}],
)
.await;

// assert offset is deleted
let result = admin
.list_consumer_group_offsets("consumer_group_with_offsets".to_owned())
.await;
let expected_result: HashMap<_, HashMap<TopicPartition, OffsetAndMetadata>> =
HashMap::from([("consumer_group_with_offsets".to_owned(), HashMap::new())]);
assert_eq!(result, expected_result);
}
}

async fn delete_records(admin: &KafkaAdmin, connection_builder: &KafkaConnectionBuilder) {
// Only supported by java driver
#[allow(irrefutable_let_patterns)]
@@ -894,6 +936,9 @@ pub async fn produce_consume_commit_offsets_partitions1(
})
.await;
}

+// test the admin API's offset list and delete operations
+delete_offsets(connection_builder).await;
}

pub async fn produce_consume_partitions3(
1 change: 1 addition & 0 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -1049,6 +1049,7 @@ The connection to the client has been closed."
| RequestBody::CreatePartitions(_)
| RequestBody::DeleteTopics(_)
| RequestBody::DeleteRecords(_)
+| RequestBody::OffsetDelete(_)
| RequestBody::CreateAcls(_)
| RequestBody::ApiVersions(_),
..
19 changes: 19 additions & 0 deletions test-helpers/src/connection/java.rs
@@ -245,6 +245,14 @@ impl Jvm {
}
}

/// Construct a java set containing the provided elements
pub(crate) fn new_set(&self, element_type: &str, elements: Vec<Value>) -> Value {
self.construct(
"java.util.HashSet",
vec![self.new_list(element_type, elements)],
)
}

/// Construct a java map containing the provided key value pairs
pub(crate) fn new_map(&self, key_values: Vec<(Value, Value)>) -> Value {
let map = self.construct("java.util.HashMap", vec![]);
@@ -409,3 +417,14 @@ impl Iterator for JavaIterator {
}
}
}

/// Iterate over the entries of a java map as (key, value) pairs
pub fn map_iterator(map: Value) -> impl Iterator<Item = (Value, Value)> {
map.cast("java.util.Map")
.call("entrySet", vec![])
.call("iterator", vec![])
.into_iter()
.map(|entry| {
let entry = entry.cast("java.util.Map$Entry");
(entry.call("getKey", vec![]), entry.call("getValue", vec![]))
})
}
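
A minimal usage sketch for map_iterator, assuming `java_map` is a hypothetical Value wrapping a java.util.Map<String, Long> obtained through the same Jvm helpers:

// Collect a java map into a rust HashMap by converting each entry.
let as_rust: std::collections::HashMap<String, i64> = map_iterator(java_map)
    .map(|(key, value)| (key.into_rust(), value.into_rust()))
    .collect();
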
86 changes: 78 additions & 8 deletions test-helpers/src/connection/kafka/java.rs
@@ -4,7 +4,7 @@ use super::{
Record, RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType,
TopicDescription, TopicPartition, TopicPartitionInfo,
};
-use crate::connection::java::{Jvm, Value};
+use crate::connection::java::{map_iterator, Jvm, Value};
use anyhow::Result;
use pretty_assertions::assert_eq;
use std::{
@@ -273,7 +273,7 @@ impl KafkaProducerJava {
.into_iter()
.map(|(tp, offset_and_metadata)| {
(
-create_topic_partition(&self.jvm, &tp),
+topic_partition_to_java(&self.jvm, &tp),
self.jvm.construct(
"org.apache.kafka.clients.consumer.OffsetAndMetadata",
vec![self.jvm.new_long(offset_and_metadata.offset)],
@@ -383,7 +383,7 @@ impl KafkaConsumerJava {
.iter()
.map(|(tp, offset)| {
(
-create_topic_partition(&self.jvm, tp),
+topic_partition_to_java(&self.jvm, tp),
self.jvm.construct(
"org.apache.kafka.clients.consumer.OffsetAndMetadata",
vec![self.jvm.new_long(*offset)],
@@ -403,7 +403,7 @@
let mut offsets = HashMap::new();

for tp in partitions {
-let topic_partition = create_topic_partition(&self.jvm, &tp);
+let topic_partition = topic_partition_to_java(&self.jvm, &tp);

let timeout = self.jvm.call_static(
"java.time.Duration",
@@ -527,7 +527,7 @@ impl KafkaAdminJava {
.iter()
.map(|x| {
(
-create_topic_partition(&self.jvm, &x.topic_partition),
+topic_partition_to_java(&self.jvm, &x.topic_partition),
self.jvm.call_static(
"org.apache.kafka.clients.admin.RecordsToDelete",
"beforeOffset",
@@ -653,7 +653,7 @@ impl KafkaAdminJava {
.iter()
.map(|(topic_partition, offset_spec)| {
(
-create_topic_partition(&self.jvm, topic_partition),
+topic_partition_to_java(&self.jvm, topic_partition),
match offset_spec {
OffsetSpec::Earliest => {
self.jvm.call_static(offset_spec_class, "earliest", vec![])
@@ -678,7 +678,7 @@
let result = java_results
.call(
"get",
-vec![create_topic_partition(&self.jvm, &topic_partition)],
+vec![topic_partition_to_java(&self.jvm, &topic_partition)],
)
.cast("org.apache.kafka.clients.admin.ListOffsetsResult$ListOffsetsResultInfo");
let offset: i32 = result.call("offset", vec![]).into_rust();
@@ -725,6 +725,60 @@ impl KafkaAdminJava {
results
}

pub async fn list_consumer_group_offsets(
&self,
group_id: String,
) -> HashMap<String, HashMap<TopicPartition, OffsetAndMetadata>> {
let group_id = self.jvm.new_string(&group_id);

let java_results = self
.admin
.call("listConsumerGroupOffsets", vec![group_id])
.call_async("all", vec![])
.await;

// iterate through the nested java maps, converting all keys and values into rust values
map_iterator(java_results)
.map(|(group_id, value)| {
(
group_id.into_rust(),
map_iterator(value)
.map(|(topic_partition, offset_and_metadata)| {
(
topic_partition_to_rust(topic_partition),
offset_and_metadata_to_rust(offset_and_metadata),
)
})
.collect(),
)
})
.collect()
}

pub async fn delete_consumer_group_offsets(
&self,
group_id: String,
topic_partitions: &[TopicPartition],
) {
let group_id = self.jvm.new_string(&group_id);

let topic_partitions_java = self.jvm.new_set(
"org.apache.kafka.common.TopicPartition",
topic_partitions
.iter()
.map(|topic_partition| topic_partition_to_java(&self.jvm, topic_partition))
.collect(),
);

self.admin
.call(
"deleteConsumerGroupOffsets",
vec![group_id, topic_partitions_java],
)
.call_async("all", vec![])
.await;
}

pub async fn create_acls(&self, acls: Vec<Acl>) {
let resource_type = self
.jvm
@@ -807,9 +861,25 @@ impl KafkaAdminJava {
}
}

-fn create_topic_partition(jvm: &Jvm, tp: &TopicPartition) -> Value {
+fn topic_partition_to_java(jvm: &Jvm, tp: &TopicPartition) -> Value {
jvm.construct(
"org.apache.kafka.common.TopicPartition",
vec![jvm.new_string(&tp.topic_name), jvm.new_int(tp.partition)],
)
}

fn topic_partition_to_rust(tp: Value) -> TopicPartition {
let tp = tp.cast("org.apache.kafka.common.TopicPartition");
TopicPartition {
topic_name: tp.call("topic", vec![]).into_rust(),
partition: tp.call("partition", vec![]).into_rust(),
}
}

fn offset_and_metadata_to_rust(offset_and_metadata: Value) -> OffsetAndMetadata {
let offset_and_metadata =
offset_and_metadata.cast("org.apache.kafka.clients.consumer.OffsetAndMetadata");
OffsetAndMetadata {
offset: offset_and_metadata.call("offset", vec![]).into_rust(),
}
}
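
The two conversion helpers above are inverses; a small illustrative sketch, assuming `jvm` is a connected Jvm handle (hypothetical, not part of the diff):

let tp = TopicPartition {
    topic_name: "partitions1_with_offset".to_owned(),
    partition: 0,
};
// Converting to the java representation and back yields the original value.
assert_eq!(topic_partition_to_rust(topic_partition_to_java(&jvm, &tp)), tp);
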
31 changes: 31 additions & 0 deletions test-helpers/src/connection/kafka/mod.rs
@@ -477,6 +477,36 @@ impl KafkaAdmin {
}
}

pub async fn list_consumer_group_offsets(
&self,
group_id: String,
) -> HashMap<String, HashMap<TopicPartition, OffsetAndMetadata>> {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
Self::Cpp(_) => {
panic!("rdkafka-rs driver does not support list_consumer_group_offsets")
}
Self::Java(java) => java.list_consumer_group_offsets(group_id).await,
}
}

pub async fn delete_consumer_group_offsets(
&self,
group_id: String,
topic_partitions: &[TopicPartition],
) {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
Self::Cpp(_) => {
panic!("rdkafka-rs driver does not support delete_consumer_group_offsets")
}
Self::Java(java) => {
java.delete_consumer_group_offsets(group_id, topic_partitions)
.await
}
}
}

pub async fn create_partitions(&self, partitions: &[NewPartition<'_>]) {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
@@ -662,6 +692,7 @@ impl IsolationLevel {
}
}

+#[derive(PartialEq, Debug)]
pub struct OffsetAndMetadata {
pub offset: i64,
}
