Skip to content

Commit

Permalink
Merge branch 'main' into release_0.5.3
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Nov 21, 2024
2 parents afacd6f + 3fd55b8 commit c323712
Show file tree
Hide file tree
Showing 9 changed files with 549 additions and 59 deletions.
16 changes: 16 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,12 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::cluster_test_suite(&connection_builder).await;

#[allow(irrefutable_let_patterns)]
if let KafkaDriver::Java = driver {
// describeLogDirs is only on java driver
test_cases::describe_log_dirs(&connection_builder).await;
}

for shotover in shotovers {
tokio::time::timeout(
Duration::from_secs(10),
Expand Down Expand Up @@ -623,6 +629,16 @@ async fn cluster_2_racks_multi_shotover_kafka_3_9(#[case] driver: KafkaDriver) {
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::cluster_test_suite(&connection_builder).await;

#[allow(irrefutable_let_patterns)]
if let KafkaDriver::Java = driver {
// new consumer group protocol is only on java driver
test_cases::produce_consume_partitions_new_consumer_group_protocol(
&connection_builder,
"partitions3_new_consumer_group_protocol",
)
.await;
}

for shotover in shotovers {
tokio::time::timeout(
Duration::from_secs(10),
Expand Down
173 changes: 169 additions & 4 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use std::{collections::HashMap, time::Duration};
use test_helpers::{
connection::kafka::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig,
ConsumerGroupDescription, ExpectedResponse, IsolationLevel, KafkaAdmin,
KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, ListOffsetsResultInfo,
NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata, OffsetSpec, Record,
RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition,
ConsumerGroupDescription, ConsumerProtocol, DescribeReplicaLogDirInfo, ExpectedResponse,
IsolationLevel, KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver,
KafkaProducer, ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic,
OffsetAndMetadata, OffsetSpec, Record, RecordsToDelete, ResourcePatternType,
ResourceSpecifier, ResourceType, TopicPartition, TopicPartitionReplica,
TransactionDescription,
},
docker_compose::DockerCompose,
Expand Down Expand Up @@ -47,6 +48,11 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "partitions3_new_consumer_group_protocol",
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "acks0",
num_partitions: 1,
Expand Down Expand Up @@ -1091,6 +1097,64 @@ pub async fn produce_consume_partitions3(
}
}

/// The new consumer protocol must be specifically enabled in the broker config.
/// We only do this for the kafka 3.9 docker-compose.yaml so this test case is
/// manually called for that test and not included in the standard test suite.
pub async fn produce_consume_partitions_new_consumer_group_protocol(
    connection_builder: &KafkaConnectionBuilder,
    topic_name: &str,
) {
    let producer = connection_builder.connect_producer("1", 0).await;
    // Explicitly opt in to the new KIP-848 "consumer" group protocol.
    let consumer_config = ConsumerConfig::consume_from_topics(vec![topic_name.to_owned()])
        .with_group("some_group")
        .with_protocol(ConsumerProtocol::Consumer);
    let mut consumer = connection_builder.connect_consumer(consumer_config).await;

    for _round in 0..5 {
        // Produce one keyed and one keyless record per round.
        // We can't predict the offsets since that will depend on which partition
        // the keyless record ends up in, so no expected offset is asserted.
        let keyed = Record {
            payload: "Message1",
            topic_name,
            key: Some("Key".into()),
        };
        producer.assert_produce(keyed, None).await;

        let keyless = Record {
            payload: "Message2",
            topic_name,
            key: None,
        };
        producer.assert_produce(keyless, None).await;

        // The two records may arrive from different partitions, so order is not guaranteed.
        let expected = vec![
            ExpectedResponse {
                message: "Message1".to_owned(),
                key: Some("Key".to_owned()),
                topic_name: topic_name.to_owned(),
                offset: None,
            },
            ExpectedResponse {
                message: "Message2".to_owned(),
                key: None,
                topic_name: topic_name.to_owned(),
                offset: None,
            },
        ];
        consumer.assert_consume_in_any_order(expected).await;
    }
}

pub async fn produce_consume_multi_topic_consumer(connection_builder: &KafkaConnectionBuilder) {
let producer = connection_builder.connect_producer("1", 0).await;
let mut consumer = connection_builder
Expand Down Expand Up @@ -1801,6 +1865,107 @@ async fn create_and_list_partition_reassignments(connection_builder: &KafkaConne
);
}

// Due to `AdminClient.describeLogDirs` querying specified brokers directly, this test is specialized to a 2 shotover node, 6 kafka node cluster.
// So we call it directly from such a test, instead of including it in the standard test suite.
pub async fn describe_log_dirs(connection_builder: &KafkaConnectionBuilder) {
    let admin = connection_builder.connect_admin().await;

    // Create a topic that is replicated to every node in the cluster
    admin
        .create_topics_and_wait(&[
            NewTopic {
                name: "describe_logs_test",
                num_partitions: 1,
                replication_factor: 6,
            },
            NewTopic {
                name: "describe_logs_test2",
                num_partitions: 1,
                replication_factor: 6,
            },
        ])
        .await;
    // acks="all" so the record is fully replicated before we query log dirs.
    let producer = connection_builder.connect_producer("all", 100).await;
    producer
        .assert_produce(
            Record {
                payload: "initial",
                topic_name: "describe_logs_test",
                key: None,
            },
            Some(0),
        )
        .await;

    // describe the topic and assert contains path
    let result = admin
        .describe_replica_log_dirs(&[
            TopicPartitionReplica {
                topic_name: "describe_logs_test".to_owned(),
                partition: 0,
                broker_id: 0,
            },
            TopicPartitionReplica {
                topic_name: "describe_logs_test".to_owned(),
                partition: 0,
                broker_id: 1,
            },
            TopicPartitionReplica {
                topic_name: "describe_logs_test2".to_owned(),
                partition: 0,
                broker_id: 0,
            },
        ])
        .await;

    /// Assert that the path in the DescribeLogsDir response matches the custom format used by shotover.
    /// This format looks like: actual-kafka-broker-id3:/original/kafka/path/here
    fn assert_valid_path(info: &DescribeReplicaLogDirInfo) {
        let id = info
            .path
            .as_ref()
            .unwrap()
            .strip_prefix("actual-kafka-broker-id")
            .unwrap()
            .strip_suffix(":/bitnami/kafka/data")
            .unwrap();
        let id: i32 = id.parse().unwrap();
        // Use an inclusive range check: `id < 6` alone would wrongly accept a
        // negative broker id, contradicting the message below.
        assert!(
            (0..6).contains(&id),
            "There are only 6 brokers so the broker id must be between 0-5 inclusive but was instead {id}"
        );
    }

    assert_eq!(result.len(), 3);
    assert_valid_path(
        result
            .get(&TopicPartitionReplica {
                topic_name: "describe_logs_test".to_owned(),
                partition: 0,
                broker_id: 0,
            })
            .unwrap(),
    );
    assert_valid_path(
        result
            .get(&TopicPartitionReplica {
                topic_name: "describe_logs_test".to_owned(),
                partition: 0,
                broker_id: 1,
            })
            .unwrap(),
    );
    assert_valid_path(
        result
            .get(&TopicPartitionReplica {
                topic_name: "describe_logs_test2".to_owned(),
                partition: 0,
                broker_id: 0,
            })
            .unwrap(),
    );
}

async fn cluster_test_suite_base(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
admin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ services:
#
# However for an integration test workload we are constantly spinning up single consumer groups, so the default value makes the tests take twice as long to run.
KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: "0"

KAFKA_CFG_GROUP_COORDINATOR_REBALANCE_PROTOCOLS: "consumer, classic"
volumes: &volumes
- type: tmpfs
target: /bitnami/kafka
Expand Down
Loading

0 comments on commit c323712

Please sign in to comment.