diff --git a/Cargo.lock b/Cargo.lock index 3dd821617..3680d1699 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2795,8 +2795,7 @@ dependencies = [ [[package]] name = "kafka-protocol" version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1edaf2fc3ecebe689bbc4fd97a6921cacd4cd09df8ebeda348a8e23c9fd48d4" +source = "git+https://github.com/rukai/kafka-protocol-rs?branch=shotover_fork#6990104ee29b915df702700fd85ed498d4312bb8" dependencies = [ "anyhow", "bytes", diff --git a/shotover/Cargo.toml b/shotover/Cargo.toml index 12498d7d9..031278b19 100644 --- a/shotover/Cargo.toml +++ b/shotover/Cargo.toml @@ -115,7 +115,7 @@ aws-config = { version = "1.0.0", optional = true } aws-sdk-kms = { version = "1.1.0", optional = true } chacha20poly1305 = { version = "0.10.0", features = ["std"], optional = true } generic-array = { version = "0.14", features = ["serde"], optional = true } -kafka-protocol = { version = "0.13.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"] } +kafka-protocol = { version = "0.13.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"], git = "https://github.com/rukai/kafka-protocol-rs", branch = "shotover_fork" } rustls = { version = "0.23.0", default-features = false, features = ["tls12"] } tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] } rustls-pemfile = "2.0.0" diff --git a/shotover/src/transforms/kafka/api_versions.rs b/shotover/src/transforms/kafka/api_versions.rs new file mode 100644 index 000000000..9fd0a18f6 --- /dev/null +++ b/shotover/src/transforms/kafka/api_versions.rs @@ -0,0 +1,125 @@ +use kafka_protocol::{messages::ApiKey, protocol::VersionRange}; + +// This table defines which api versions shotover supports. +// This applies to both KafkaSinkSingle and KafkaSinkCluster +// * When adding a new message type: +// + Make sure you have implemented routing logic for it in KafkaSinkCluster +// + Make sure any fields referring to internal cluster details such as broker ids or addresses are rewritten to refer to shotover nodes +// * When adding a new message type version: +// + Make sure any new fields do not break any of the requirements listed above +pub(crate) fn api_version_support(api_key: i16) -> Option { + match ApiKey::try_from(api_key) { + Ok(ApiKey::ProduceKey) => Some(VersionRange { min: 0, max: 11 }), + Ok(ApiKey::FetchKey) => Some(VersionRange { min: 0, max: 16 }), + Ok(ApiKey::ListOffsetsKey) => Some(VersionRange { min: 0, max: 8 }), + Ok(ApiKey::MetadataKey) => Some(VersionRange { min: 0, max: 12 }), + Ok(ApiKey::OffsetCommitKey) => Some(VersionRange { min: 0, max: 9 }), + Ok(ApiKey::OffsetFetchKey) => Some(VersionRange { min: 0, max: 9 }), + Ok(ApiKey::FindCoordinatorKey) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::JoinGroupKey) => Some(VersionRange { min: 0, max: 9 }), + Ok(ApiKey::HeartbeatKey) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::LeaveGroupKey) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::SyncGroupKey) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::DescribeGroupsKey) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::ListGroupsKey) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::SaslHandshakeKey) => Some(VersionRange { min: 0, max: 1 }), + Ok(ApiKey::ApiVersionsKey) => Some(VersionRange { min: 0, max: 3 }), + Ok(ApiKey::CreateTopicsKey) => Some(VersionRange { min: 0, max: 7 }), + Ok(ApiKey::DeleteTopicsKey) => Some(VersionRange { min: 0, max: 6 }), + Ok(ApiKey::DeleteRecordsKey) => Some(VersionRange { min: 0, max: 2 }), + Ok(ApiKey::InitProducerIdKey) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::OffsetForLeaderEpochKey) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::AddPartitionsToTxnKey) => Some(VersionRange { min: 0, max: 5 }), + Ok(ApiKey::AddOffsetsToTxnKey) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::EndTxnKey) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::TxnOffsetCommitKey) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::CreateAclsKey) => Some(VersionRange { min: 0, max: 3 }), + Ok(ApiKey::DescribeConfigsKey) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::AlterConfigsKey) => Some(VersionRange { min: 0, max: 2 }), + Ok(ApiKey::DescribeLogDirsKey) => Some(VersionRange { min: 0, max: 4 }), + Ok(ApiKey::SaslAuthenticateKey) => Some(VersionRange { min: 0, max: 2 }), + Ok(ApiKey::CreatePartitionsKey) => Some(VersionRange { min: 0, max: 3 }), + Ok(ApiKey::DeleteGroupsKey) => Some(VersionRange { min: 0, max: 2 }), + Ok(ApiKey::ElectLeadersKey) => Some(VersionRange { min: 0, max: 2 }), + Ok(ApiKey::AlterPartitionReassignmentsKey) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::ListPartitionReassignmentsKey) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::OffsetDeleteKey) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::AlterPartitionKey) => Some(VersionRange { min: 0, max: 3 }), + Ok(ApiKey::DescribeClusterKey) => Some(VersionRange { min: 0, max: 1 }), + Ok(ApiKey::DescribeProducersKey) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::DescribeTransactionsKey) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::ListTransactionsKey) => Some(VersionRange { min: 0, max: 1 }), + Ok(ApiKey::ConsumerGroupHeartbeatKey) => Some(VersionRange { min: 0, max: 0 }), + Ok(ApiKey::ConsumerGroupDescribeKey) => Some(VersionRange { min: 0, max: 0 }), + Ok(_) => None, + Err(_) => None, + } +} + +// This test gives visibility into the api versions that shotover doesnt support yet. +// If this test is failing after a `cargo update`, you can just alter EXPECTED_ERROR_MESSAGE to include the new versions. +// The actual upgrade can be done later. +#[test] +fn check_api_version_backlog() { + use std::fmt::Write; + const EXPECTED_ERROR_MESSAGE: &str = r#" +LeaderAndIsrKey kafka-protocol=0..7 shotover=NotSupported +StopReplicaKey kafka-protocol=0..4 shotover=NotSupported +UpdateMetadataKey kafka-protocol=0..8 shotover=NotSupported +ControlledShutdownKey kafka-protocol=0..3 shotover=NotSupported +WriteTxnMarkersKey kafka-protocol=0..1 shotover=NotSupported +DescribeAclsKey kafka-protocol=0..3 shotover=NotSupported +DeleteAclsKey kafka-protocol=0..3 shotover=NotSupported +AlterReplicaLogDirsKey kafka-protocol=0..2 shotover=NotSupported +CreateDelegationTokenKey kafka-protocol=0..3 shotover=NotSupported +RenewDelegationTokenKey kafka-protocol=0..2 shotover=NotSupported +ExpireDelegationTokenKey kafka-protocol=0..2 shotover=NotSupported +DescribeDelegationTokenKey kafka-protocol=0..3 shotover=NotSupported +IncrementalAlterConfigsKey kafka-protocol=0..1 shotover=NotSupported +DescribeClientQuotasKey kafka-protocol=0..1 shotover=NotSupported +AlterClientQuotasKey kafka-protocol=0..1 shotover=NotSupported +DescribeUserScramCredentialsKey kafka-protocol=0..0 shotover=NotSupported +AlterUserScramCredentialsKey kafka-protocol=0..0 shotover=NotSupported +VoteKey kafka-protocol=0..0 shotover=NotSupported +BeginQuorumEpochKey kafka-protocol=0..0 shotover=NotSupported +EndQuorumEpochKey kafka-protocol=0..0 shotover=NotSupported +DescribeQuorumKey kafka-protocol=0..1 shotover=NotSupported +UpdateFeaturesKey kafka-protocol=0..1 shotover=NotSupported +EnvelopeKey kafka-protocol=0..0 shotover=NotSupported +FetchSnapshotKey kafka-protocol=0..0 shotover=NotSupported +BrokerRegistrationKey kafka-protocol=0..3 shotover=NotSupported +BrokerHeartbeatKey kafka-protocol=0..1 shotover=NotSupported +UnregisterBrokerKey kafka-protocol=0..0 shotover=NotSupported +AllocateProducerIdsKey kafka-protocol=0..0 shotover=NotSupported +ControllerRegistrationKey kafka-protocol=0..0 shotover=NotSupported +GetTelemetrySubscriptionsKey kafka-protocol=0..0 shotover=NotSupported +PushTelemetryKey kafka-protocol=0..0 shotover=NotSupported +AssignReplicasToDirsKey kafka-protocol=0..0 shotover=NotSupported +ListClientMetricsResourcesKey kafka-protocol=0..0 shotover=NotSupported +DescribeTopicPartitionsKey kafka-protocol=0..0 shotover=NotSupported +"#; + + let mut error_message = String::new(); + for api_key in ApiKey::iterate_all() { + let shotover_version = api_version_support(api_key as i16); + + let kafka_protocol_version = api_key.valid_versions(); + if shotover_version != Some(kafka_protocol_version) { + let shotover_version = match shotover_version { + Some(version) => format!("{version}"), + None => "NotSupported".to_owned(), + }; + writeln!( + error_message, + "{api_key:?} kafka-protocol={kafka_protocol_version} shotover={shotover_version}" + ) + .unwrap(); + } + } + + pretty_assertions::assert_eq!( + error_message.trim(), + EXPECTED_ERROR_MESSAGE.trim(), + "The list of message types not supported by shotover differs from the expected list defined in EXPECTED_ERROR_MESSAGE", + ); +} diff --git a/shotover/src/transforms/kafka/mod.rs b/shotover/src/transforms/kafka/mod.rs index 75b708060..0851624fc 100644 --- a/shotover/src/transforms/kafka/mod.rs +++ b/shotover/src/transforms/kafka/mod.rs @@ -1,2 +1,3 @@ +pub(crate) mod api_versions; pub mod sink_cluster; pub mod sink_single; diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 743378deb..e3ef1bfc3 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -3252,8 +3252,29 @@ The connection to the client has been closed." .api_keys .retain(|x| !disable_keys.contains(&x.api_key)); - if original_size != api_versions.api_keys.len() { - // only invalidate the cache if we actually removed anything + let mut modified = false; + let mut to_delete = vec![]; + for (i, api_key) in api_versions.api_keys.iter_mut().enumerate() { + match super::api_versions::api_version_support(api_key.api_key) { + Some(version) => { + if api_key.max_version > version.max { + api_key.max_version = version.max; + modified = true; + } + if api_key.min_version < version.max { + api_key.min_version = version.min; + modified = true; + } + } + None => to_delete.push(i), + } + } + for to_delete in to_delete.into_iter().rev() { + api_versions.api_keys.remove(to_delete); + } + + if original_size != api_versions.api_keys.len() || modified { + // only invalidate the cache if we actually changed anything response.invalidate_cache(); } }