Skip to content

Commit

Permalink
Implement routing for ListOffsets
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Oct 9, 2024
1 parent 9801ed4 commit be9dfa3
Show file tree
Hide file tree
Showing 4 changed files with 374 additions and 32 deletions.
69 changes: 67 additions & 2 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use test_helpers::{
connection::kafka::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig,
ExpectedResponse, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer,
NewPartition, NewTopic, Record, ResourcePatternType, ResourceSpecifier, ResourceType,
TopicPartition,
ListOffsetsResultInfo, NewPartition, NewTopic, OffsetSpec, Record, ResourcePatternType,
ResourceSpecifier, ResourceType, TopicPartition,
},
docker_compose::DockerCompose,
};
Expand Down Expand Up @@ -859,6 +859,71 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
}])
.await;
produce_consume_partitions1(connection_builder, "partitions1").await;

let results = admin
.list_offsets(HashMap::from([
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 0,
},
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 1,
},
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 2,
},
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions1".to_owned(),
partition: 0,
},
OffsetSpec::Latest,
),
]))
.await;

let expected = HashMap::from([
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 0,
},
ListOffsetsResultInfo { offset: 0 },
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 1,
},
ListOffsetsResultInfo { offset: 0 },
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 2,
},
ListOffsetsResultInfo { offset: 0 },
),
(
TopicPartition {
topic_name: "partitions1".to_owned(),
partition: 0,
},
ListOffsetsResultInfo { offset: 11 },
),
]);
assert_eq!(results, expected);
}

produce_consume_acks0(connection_builder).await;
Expand Down
Loading

0 comments on commit be9dfa3

Please sign in to comment.