Skip to content

Commit

Permalink
kafka: fix handling of multi-partition topics
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Jan 21, 2024
1 parent 4da469a commit 1a9d94b
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@ async fn admin(config: ClientConfig) {
.create_topics(
&[
NewTopic {
name: "foo",
name: "partitions1",
num_partitions: 1,
replication: TopicReplication::Fixed(1),
config: vec![],
},
NewTopic {
name: "paritions3",
num_partitions: 3,
replication: TopicReplication::Fixed(1),
config: vec![],
},
NewTopic {
name: "acks0",
num_partitions: 1,
Expand Down Expand Up @@ -126,8 +132,7 @@ async fn admin_cleanup(config: ClientConfig) {
}
}

async fn produce_consume(client: ClientConfig) {
let topic_name = "foo";
async fn produce_consume(client: ClientConfig, topic_name: &str) {
let producer: FutureProducer = client
.clone()
.set("message.timeout.ms", "5000")
Expand All @@ -140,7 +145,7 @@ async fn produce_consume(client: ClientConfig) {
.await
.unwrap()
.unwrap();
assert_eq!(delivery_status, (0, 0));
assert_eq!(delivery_status.1, 0);

let consumer: StreamConsumer = client
.clone()
Expand All @@ -159,9 +164,8 @@ async fn produce_consume(client: ClientConfig) {
let contents = message.payload_view::<str>().unwrap().unwrap();
assert_eq!("Message", contents);
assert_eq!(b"Key", message.key().unwrap());
assert_eq!("foo", message.topic());
assert_eq!(topic_name, message.topic());
assert_eq!(0, message.offset());
assert_eq!(0, message.partition());
}

async fn produce_consume_acks0(client: ClientConfig) {
Expand Down Expand Up @@ -221,7 +225,8 @@ pub async fn basic(address: &str) {
// internal driver debug logs are emitted to tokio tracing, assuming the appropriate filter is used by the tracing subscriber
.set("debug", "all");
admin(client.clone()).await;
produce_consume(client.clone()).await;
produce_consume(client.clone(), "partitions1").await;
//produce_consume(client.clone(), "partitions3").await;
produce_consume_acks0(client.clone()).await;
admin_cleanup(client.clone()).await;
}

0 comments on commit 1a9d94b

Please sign in to comment.