From 1b71e42b334c56d0182e3e2b4d96c693862ab461 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Wed, 20 Nov 2024 16:07:57 +1100 Subject: [PATCH] ApiVersions rewrite --- Cargo.lock | 3 +- shotover/Cargo.toml | 2 +- .../kafka/sink_cluster/api_versions.rs | 68 +++++++++++++++++++ 3 files changed, 70 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 038791f70..671a889ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2795,8 +2795,7 @@ dependencies = [ [[package]] name = "kafka-protocol" version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1edaf2fc3ecebe689bbc4fd97a6921cacd4cd09df8ebeda348a8e23c9fd48d4" +source = "git+https://github.com/rukai/kafka-protocol-rs?branch=shotover_fork#6990104ee29b915df702700fd85ed498d4312bb8" dependencies = [ "anyhow", "bytes", diff --git a/shotover/Cargo.toml b/shotover/Cargo.toml index 3f6df96f9..ded009253 100644 --- a/shotover/Cargo.toml +++ b/shotover/Cargo.toml @@ -115,7 +115,7 @@ aws-config = { version = "1.0.0", optional = true } aws-sdk-kms = { version = "1.1.0", optional = true } chacha20poly1305 = { version = "0.10.0", features = ["std"], optional = true } generic-array = { version = "0.14", features = ["serde"], optional = true } -kafka-protocol = { version = "0.13.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"] } +kafka-protocol = { version = "0.13.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"], git = "https://github.com/rukai/kafka-protocol-rs", branch = "shotover_fork" } rustls = { version = "0.23.18", default-features = false, features = ["tls12"] } tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } rustls-pemfile = "2.0.0" diff --git a/shotover/src/transforms/kafka/sink_cluster/api_versions.rs b/shotover/src/transforms/kafka/sink_cluster/api_versions.rs index 979cf4dea..5bb28fe70 100644 --- a/shotover/src/transforms/kafka/sink_cluster/api_versions.rs +++ b/shotover/src/transforms/kafka/sink_cluster/api_versions.rs @@ -60,3 +60,71 @@ pub(crate) fn versions_supported_by_key(api_key: i16) -> Option { Err(_) => None, } } + +// This test gives visibility into the api versions that shotover doesnt support yet. +// If this test is failing after a `cargo update`, you can just alter EXPECTED_ERROR_MESSAGE to include the new versions. +// The actual upgrade can be done later. +#[test] +fn check_api_version_backlog() { + use std::fmt::Write; + const EXPECTED_ERROR_MESSAGE: &str = r#" +LeaderAndIsrKey kafka-protocol=0..7 shotover=NotSupported +StopReplicaKey kafka-protocol=0..4 shotover=NotSupported +UpdateMetadataKey kafka-protocol=0..8 shotover=NotSupported +ControlledShutdownKey kafka-protocol=0..3 shotover=NotSupported +WriteTxnMarkersKey kafka-protocol=0..1 shotover=NotSupported +DescribeAclsKey kafka-protocol=0..3 shotover=NotSupported +DeleteAclsKey kafka-protocol=0..3 shotover=NotSupported +AlterReplicaLogDirsKey kafka-protocol=0..2 shotover=NotSupported +CreateDelegationTokenKey kafka-protocol=0..3 shotover=NotSupported +RenewDelegationTokenKey kafka-protocol=0..2 shotover=NotSupported +ExpireDelegationTokenKey kafka-protocol=0..2 shotover=NotSupported +DescribeDelegationTokenKey kafka-protocol=0..3 shotover=NotSupported +IncrementalAlterConfigsKey kafka-protocol=0..1 shotover=NotSupported +DescribeClientQuotasKey kafka-protocol=0..1 shotover=NotSupported +AlterClientQuotasKey kafka-protocol=0..1 shotover=NotSupported +DescribeUserScramCredentialsKey kafka-protocol=0..0 shotover=NotSupported +AlterUserScramCredentialsKey kafka-protocol=0..0 shotover=NotSupported +VoteKey kafka-protocol=0..0 shotover=NotSupported +BeginQuorumEpochKey kafka-protocol=0..0 shotover=NotSupported +EndQuorumEpochKey kafka-protocol=0..0 shotover=NotSupported +DescribeQuorumKey kafka-protocol=0..1 shotover=NotSupported +UpdateFeaturesKey kafka-protocol=0..1 shotover=NotSupported +EnvelopeKey kafka-protocol=0..0 shotover=NotSupported +FetchSnapshotKey kafka-protocol=0..0 shotover=NotSupported +BrokerRegistrationKey kafka-protocol=0..3 shotover=NotSupported +BrokerHeartbeatKey kafka-protocol=0..1 shotover=NotSupported +UnregisterBrokerKey kafka-protocol=0..0 shotover=NotSupported +AllocateProducerIdsKey kafka-protocol=0..0 shotover=NotSupported +ControllerRegistrationKey kafka-protocol=0..0 shotover=NotSupported +GetTelemetrySubscriptionsKey kafka-protocol=0..0 shotover=NotSupported +PushTelemetryKey kafka-protocol=0..0 shotover=NotSupported +AssignReplicasToDirsKey kafka-protocol=0..0 shotover=NotSupported +ListClientMetricsResourcesKey kafka-protocol=0..0 shotover=NotSupported +DescribeTopicPartitionsKey kafka-protocol=0..0 shotover=NotSupported +"#; + + let mut error_message = String::new(); + for api_key in ApiKey::iterate_all() { + let shotover_version = versions_supported_by_key(api_key as i16); + + let kafka_protocol_version = api_key.valid_versions(); + if shotover_version != Some(kafka_protocol_version) { + let shotover_version = match shotover_version { + Some(version) => format!("{version}"), + None => "NotSupported".to_owned(), + }; + writeln!( + error_message, + "{api_key:?} kafka-protocol={kafka_protocol_version} shotover={shotover_version}" + ) + .unwrap(); + } + } + + pretty_assertions::assert_eq!( + error_message.trim(), + EXPECTED_ERROR_MESSAGE.trim(), + "The list of message types not supported by shotover differs from the expected list defined in EXPECTED_ERROR_MESSAGE", + ); +}