Skip to content

Commit

Permalink
KafkaSinkCluster: route DescribeLogDirs request (#1823)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Nov 20, 2024
1 parent 3f76b54 commit 229c9ce
Show file tree
Hide file tree
Showing 7 changed files with 342 additions and 51 deletions.
6 changes: 6 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,12 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::cluster_test_suite(&connection_builder).await;

#[allow(irrefutable_let_patterns)]
if let KafkaDriver::Java = driver {
// describeLogDirs is only on java driver
test_cases::describe_log_dirs(&connection_builder).await;
}

for shotover in shotovers {
tokio::time::timeout(
Duration::from_secs(10),
Expand Down
111 changes: 106 additions & 5 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ use std::{collections::HashMap, time::Duration};
use test_helpers::{
connection::kafka::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig,
ConsumerGroupDescription, ExpectedResponse, IsolationLevel, KafkaAdmin,
KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer, ListOffsetsResultInfo,
NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata, OffsetSpec, Record,
RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition,
TransactionDescription,
ConsumerGroupDescription, DescribeReplicaLogDirInfo, ExpectedResponse, IsolationLevel,
KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver, KafkaProducer,
ListOffsetsResultInfo, NewPartition, NewPartitionReassignment, NewTopic, OffsetAndMetadata,
OffsetSpec, Record, RecordsToDelete, ResourcePatternType, ResourceSpecifier, ResourceType,
TopicPartition, TopicPartitionReplica, TransactionDescription,
},
docker_compose::DockerCompose,
};
Expand Down Expand Up @@ -1801,6 +1801,107 @@ async fn create_and_list_partition_reassignments(connection_builder: &KafkaConne
);
}

// Due to `AdminClient.describeLogDirs` querying specified brokers directly, this test is specialized to a 2 shotover node, 6 kafka node cluster.
// So we call it directly from such a test, instead of including it in the standard test suite.
pub async fn describe_log_dirs(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;

// Create a topic that is replicated to every node in the cluster
admin
.create_topics_and_wait(&[
NewTopic {
name: "describe_logs_test",
num_partitions: 1,
replication_factor: 6,
},
NewTopic {
name: "describe_logs_test2",
num_partitions: 1,
replication_factor: 6,
},
])
.await;
let producer = connection_builder.connect_producer("all", 100).await;
producer
.assert_produce(
Record {
payload: "initial",
topic_name: "describe_logs_test",
key: None,
},
Some(0),
)
.await;

// describe the topic and assert contains path
let result = admin
.describe_replica_log_dirs(&[
TopicPartitionReplica {
topic_name: "describe_logs_test".to_owned(),
partition: 0,
broker_id: 0,
},
TopicPartitionReplica {
topic_name: "describe_logs_test".to_owned(),
partition: 0,
broker_id: 1,
},
TopicPartitionReplica {
topic_name: "describe_logs_test2".to_owned(),
partition: 0,
broker_id: 0,
},
])
.await;

/// Assert that the path in the DescribeLogsDir response matches the custom format used by shotover.
/// This format looks like: actual-kafka-broker-id3:/original/kafka/path/here
fn assert_valid_path(info: &DescribeReplicaLogDirInfo) {
let id = info
.path
.as_ref()
.unwrap()
.strip_prefix("actual-kafka-broker-id")
.unwrap()
.strip_suffix(":/bitnami/kafka/data")
.unwrap();
let id: i32 = id.parse().unwrap();
assert!(
id < 6,
"There are only 6 brokers so the broker id must be between 0-5 inclusive but was instead {id}"
);
}

assert_eq!(result.len(), 3);
assert_valid_path(
result
.get(&TopicPartitionReplica {
topic_name: "describe_logs_test".to_owned(),
partition: 0,
broker_id: 0,
})
.unwrap(),
);
assert_valid_path(
result
.get(&TopicPartitionReplica {
topic_name: "describe_logs_test".to_owned(),
partition: 0,
broker_id: 1,
})
.unwrap(),
);
assert_valid_path(
result
.get(&TopicPartitionReplica {
topic_name: "describe_logs_test2".to_owned(),
partition: 0,
broker_id: 0,
})
.unwrap(),
);
}

async fn cluster_test_suite_base(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
admin
Expand Down
Loading

0 comments on commit 229c9ce

Please sign in to comment.