Skip to content

Commit

Permalink
kafka: add test for multi-partition topics (#1427)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Feb 21, 2024
1 parent fea7468 commit 49c1c6c
Showing 1 changed file with 129 additions and 46 deletions.
175 changes: 129 additions & 46 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@ async fn admin(config: ClientConfig) {
.create_topics(
&[
NewTopic {
name: "foo",
name: "partitions1",
num_partitions: 1,
replication: TopicReplication::Fixed(1),
config: vec![],
},
NewTopic {
name: "paritions3",
num_partitions: 3,
replication: TopicReplication::Fixed(1),
config: vec![],
},
NewTopic {
name: "acks0",
num_partitions: 1,
Expand Down Expand Up @@ -126,21 +132,89 @@ async fn admin_cleanup(config: ClientConfig) {
}
}

async fn produce_consume(client: ClientConfig) {
let topic_name = "foo";
async fn assert_produce(
producer: &FutureProducer,
record: Record<'_>,
expected_offset: Option<i64>,
) {
let send = match record.key {
Some(key) => producer
.send_result(
FutureRecord::to(record.topic_name)
.payload(record.payload)
.key(key),
)
.unwrap(),
None => producer
.send_result(FutureRecord::<(), _>::to(record.topic_name).payload(record.payload))
.unwrap(),
};
let delivery_status = tokio::time::timeout(Duration::from_secs(30), send)
.await
.expect("Timeout while receiving from producer")
.unwrap()
.unwrap();

if let Some(offset) = expected_offset {
assert_eq!(delivery_status.1, offset, "Unexpected offset");
}
}

struct Record<'a> {
payload: &'a str,
topic_name: &'a str,
key: Option<&'a str>,
}

async fn assert_consume(consumer: &StreamConsumer, response: ExpectedResponse<'_>) {
let message = tokio::time::timeout(Duration::from_secs(30), consumer.recv())
.await
.expect("Timeout while receiving from consumer")
.unwrap();
let contents = message.payload_view::<str>().unwrap().unwrap();
assert_eq!(response.message, contents);
assert_eq!(
response.key,
message.key().map(|x| std::str::from_utf8(x).unwrap())
);
assert_eq!(response.topic_name, message.topic());
assert_eq!(response.offset, message.offset());
}

struct ExpectedResponse<'a> {
message: &'a str,
key: Option<&'a str>,
topic_name: &'a str,
offset: i64,
}

async fn produce_consume(client: ClientConfig, topic_name: &str, i: i64) {
let producer: FutureProducer = client
.clone()
.set("message.timeout.ms", "5000")
.create()
.unwrap();

let delivery_status = producer
.send_result(FutureRecord::to(topic_name).payload("Message").key("Key"))
.unwrap()
.await
.unwrap()
.unwrap();
assert_eq!(delivery_status, (0, 0));
assert_produce(
&producer,
Record {
payload: "Message1",
topic_name,
key: Some("Key"),
},
Some(i * 2),
)
.await;
assert_produce(
&producer,
Record {
payload: "Message2",
topic_name,
key: None,
},
Some(i * 2 + 1),
)
.await;

let consumer: StreamConsumer = client
.clone()
Expand All @@ -152,16 +226,26 @@ async fn produce_consume(client: ClientConfig) {
.unwrap();
consumer.subscribe(&[topic_name]).unwrap();

let message = tokio::time::timeout(Duration::from_secs(30), consumer.recv())
.await
.expect("Timeout while receiving from consumer")
.unwrap();
let contents = message.payload_view::<str>().unwrap().unwrap();
assert_eq!("Message", contents);
assert_eq!(b"Key", message.key().unwrap());
assert_eq!("foo", message.topic());
assert_eq!(0, message.offset());
assert_eq!(0, message.partition());
assert_consume(
&consumer,
ExpectedResponse {
message: "Message1",
key: Some("Key"),
topic_name,
offset: 0,
},
)
.await;
assert_consume(
&consumer,
ExpectedResponse {
message: "Message2",
key: None,
topic_name,
offset: 1,
},
)
.await;
}

async fn produce_consume_acks0(client: ClientConfig) {
Expand All @@ -174,20 +258,16 @@ async fn produce_consume_acks0(client: ClientConfig) {
.unwrap();

for _ in 0..10 {
tokio::time::timeout(
Duration::from_secs(30),
producer
.send_result(
FutureRecord::to(topic_name)
.payload("MessageAcks0")
.key("KeyAcks0"),
)
.unwrap(),
assert_produce(
&producer,
Record {
payload: "MessageAcks0",
topic_name,
key: Some("KeyAcks0"),
},
None,
)
.await
.expect("Timeout while receiving from producer")
.unwrap()
.unwrap();
.await;
}

let consumer: StreamConsumer = client
Expand All @@ -200,17 +280,17 @@ async fn produce_consume_acks0(client: ClientConfig) {
.unwrap();
consumer.subscribe(&[topic_name]).unwrap();

for i in 0..10 {
let message = tokio::time::timeout(Duration::from_secs(30), consumer.recv())
.await
.expect("Timeout while receiving from consumer")
.unwrap();
let contents = message.payload_view::<str>().unwrap().unwrap();
assert_eq!("MessageAcks0", contents);
assert_eq!(b"KeyAcks0", message.key().unwrap());
assert_eq!("acks0", message.topic());
assert_eq!(i, message.offset());
assert_eq!(0, message.partition());
for j in 0..10 {
assert_consume(
&consumer,
ExpectedResponse {
message: "MessageAcks0",
key: Some("KeyAcks0"),
topic_name,
offset: j,
},
)
.await;
}
}

Expand All @@ -221,7 +301,10 @@ pub async fn basic(address: &str) {
// internal driver debug logs are emitted to tokio tracing, assuming the appropriate filter is used by the tracing subscriber
.set("debug", "all");
admin(client.clone()).await;
produce_consume(client.clone()).await;
produce_consume_acks0(client.clone()).await;
for i in 0..2 {
produce_consume(client.clone(), "partitions1", i).await;
produce_consume(client.clone(), "partitions3", i).await;
produce_consume_acks0(client.clone()).await;
}
admin_cleanup(client.clone()).await;
}

0 comments on commit 49c1c6c

Please sign in to comment.