Skip to content

Commit

Permalink
KafkaSinkCluster: Add kafka 3.9 integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Nov 18, 2024
1 parent 8045ecd commit 5ac944d
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 1 deletion.
37 changes: 37 additions & 0 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,43 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
}
}

#[rstest]
#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))]
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_2_racks_multi_shotover_kafka_3_9(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-2-racks/docker-compose-kafka-3.9.yaml");

// One shotover instance per rack
let mut shotovers = vec![];
for i in 1..3 {
shotovers.push(
shotover_process(&format!(
"tests/test-configs/kafka/cluster-2-racks/topology-rack{i}.yaml"
))
.with_config(&format!(
"tests/test-configs/shotover-config/config{i}.yaml"
))
.with_log_name(&format!("shotover{i}"))
.start()
.await,
);
}

let connection_builder = KafkaConnectionBuilder::new(driver, "127.0.0.1:9192");
test_cases::cluster_test_suite(&connection_builder).await;

for shotover in shotovers {
tokio::time::timeout(
Duration::from_secs(10),
shotover.shutdown_and_then_consume_events(&multi_shotover_events()),
)
.await
.expect("Shotover did not shutdown within 10s");
}
}

#[rstest]
//#[cfg_attr(feature = "kafka-cpp-driver-tests", case::cpp(KafkaDriver::Cpp))] // CPP driver does not support scram
#[case::java(KafkaDriver::Java)]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
networks:
cluster_subnet:
name: cluster_subnet
driver: bridge
ipam:
driver: default
config:
- subnet: 172.16.1.0/24
gateway: 172.16.1.1
services:
kafka0:
image: &image 'bitnami/kafka:3.9.0-debian-12-r3'
networks:
cluster_subnet:
ipv4_address: 172.16.1.2
environment: &environment
KAFKA_CFG_LISTENERS: "BROKER://:9092,CONTROLLER://:9093"
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.2:9092"
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,BROKER:PLAINTEXT"
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: "BROKER"
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
KAFKA_CFG_PROCESS_ROLES: "controller,broker"
KAFKA_KRAFT_CLUSTER_ID: "abcdefghijklmnopqrstuv"
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka0:9093,1@kafka1:9093,2@kafka2:9093,3@kafka3:9093,4@kafka4:9093,5@kafka5:9093"
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_BROKER_RACK: "rack1"
ALLOW_PLAINTEXT_LISTENER: "yes"
# Required for high availability
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 3
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 2

# This cfg is set to 3000 by default, which for a typical workload reduces the overhead of creating a
# new consumer group by avoiding constant rebalances as each initial consumer joins.
# See: https://cwiki.apache.org/confluence/display/KAFKA/KIP-134%3A+Delay+initial+consumer+group+rebalance
#
# However for an integration test workload we are constantly spinning up single consumer groups, so the default value makes the tests take twice as long to run.
KAFKA_CFG_GROUP_INITIAL_REBALANCE_DELAY_MS: "0"
volumes: &volumes
- type: tmpfs
target: /bitnami/kafka
kafka1:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.3
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.3:9092"
KAFKA_CFG_NODE_ID: 1
KAFKA_CFG_BROKER_RACK: "rack1"
volumes: *volumes
kafka2:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.4
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.4:9092"
KAFKA_CFG_NODE_ID: 2
KAFKA_CFG_BROKER_RACK: "rack1"
volumes: *volumes
kafka3:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.5
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.5:9092"
KAFKA_CFG_NODE_ID: 3
KAFKA_CFG_BROKER_RACK: "rack2"
volumes: *volumes
kafka4:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.6
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.6:9092"
KAFKA_CFG_NODE_ID: 4
KAFKA_CFG_BROKER_RACK: "rack2"
volumes: *volumes
kafka5:
image: *image
networks:
cluster_subnet:
ipv4_address: 172.16.1.7
environment:
<<: *environment
KAFKA_CFG_ADVERTISED_LISTENERS: "BROKER://172.16.1.7:9092"
KAFKA_CFG_NODE_ID: 5
KAFKA_CFG_BROKER_RACK: "rack2"
volumes: *volumes
37 changes: 37 additions & 0 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3077,6 +3077,43 @@ The connection to the client has been closed."
self.rewrite_describe_cluster_response(describe_cluster)?;
response.invalidate_cache();
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ApiVersions(api_versions),
..
})) => {
let original_size = api_versions.api_keys.len();

// List of keys that shotover doesnt support and so should be removed from supported keys list
let disable_keys = [
// This message type has very little documentation available but it seems that due to the way the pagination
// functionality is designed, there is no straightforward way to implement this message type in shotover.
// I reccomend we avoid implementing this message type for as long as possible.
//
// The problem is that the cursor spans across topics, but each partition in the request needs to be split up and routed to the owner of that partition.
// So shotover is going to need to split the cursor across different brokers which is ill-defined.
//
// A possible solution could be something like:
// 1. split and route request to brokers by location of partition and remove the cursor from the request.
// 2. Store the result in a cache.
// 3. Apply our own cursor logic upon the data in the cache.
// 4. On follow up requests make use the existing cache to respond with the next section of the paginated data.
//
// But this will be quite difficult to implement and makes pagination incredibly inefficient which is the opposite of the intention of pagination.
// So for now, lets not.
ApiKey::DescribeTopicPartitionsKey as i16,
// This message type is part of the new consumer group API, we should implement support for it in the future.
// I've disabled it for now to keep the scope down for kafka 3.9 support.
ApiKey::ConsumerGroupDescribeKey as i16,
];
api_versions
.api_keys
.retain(|x| !disable_keys.contains(&x.api_key));

if original_size != api_versions.api_keys.len() {
// only invalidate the cache if we actually removed anything
response.invalidate_cache();
}
}
_ => {}
}

Expand Down
7 changes: 6 additions & 1 deletion test-helpers/src/docker_compose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub fn new_moto() -> DockerCompose {
docker_compose("tests/transforms/docker-compose-moto.yaml")
}

pub static IMAGE_WAITERS: [Image; 11] = [
pub static IMAGE_WAITERS: [Image; 12] = [
Image {
name: "motoserver/moto",
log_regex_to_wait_for: r"Press CTRL\+C to quit",
Expand Down Expand Up @@ -75,6 +75,11 @@ pub static IMAGE_WAITERS: [Image; 11] = [
log_regex_to_wait_for: r"Kafka Server started",
timeout: Duration::from_secs(120),
},
Image {
name: "bitnami/kafka:3.9.0-debian-12-r3",
log_regex_to_wait_for: r"Kafka Server started",
timeout: Duration::from_secs(120),
},
Image {
name: "opensearchproject/opensearch:2.9.0",
log_regex_to_wait_for: r"Node started",
Expand Down

0 comments on commit 5ac944d

Please sign in to comment.