Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ApiVersions rewrite unit test #1832

Merged
merged 1 commit into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 77 additions & 70 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion shotover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ aws-config = { version = "1.0.0", optional = true }
aws-sdk-kms = { version = "1.1.0", optional = true }
chacha20poly1305 = { version = "0.10.0", features = ["std"], optional = true }
generic-array = { version = "0.14", features = ["serde"], optional = true }
kafka-protocol = { version = "0.13.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"] }
kafka-protocol = { version = "0.14.0", optional = true, default-features = false, features = ["messages_enums", "broker", "client"] }
rustls = { version = "0.23.18", default-features = false, features = ["tls12"] }
tokio-rustls = { version = "0.26", default-features = false, features = ["ring"] }
rustls-pemfile = "2.0.0"
Expand Down
6 changes: 3 additions & 3 deletions shotover/src/codec/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ impl Decoder for KafkaDecoder {
} else if self.expect_raw_sasl.is_some() {
Meta {
request_header: RequestHeader {
api_key: ApiKey::SaslAuthenticateKey,
api_key: ApiKey::SaslAuthenticate,
version: 0,
},
// This code path is only used for requests, so message_id can be None.
Expand Down Expand Up @@ -210,7 +210,7 @@ impl Decoder for KafkaDecoder {
}
} else {
// set expect_raw_sasl for requests
if meta.request_header.api_key == ApiKey::SaslHandshakeKey
if meta.request_header.api_key == ApiKey::SaslHandshake
&& meta.request_header.version == 0
{
// Only parse the full frame once we manually check its a v0 sasl handshake
Expand Down Expand Up @@ -326,7 +326,7 @@ impl Encoder<Messages> for KafkaEncoder {
if let Some(tx) = self.request_header_tx.as_ref() {
let header = if message_contains_raw_sasl {
RequestHeader {
api_key: ApiKey::SaslAuthenticateKey,
api_key: ApiKey::SaslAuthenticate,
version: 0,
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion shotover/src/frame/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ impl KafkaFrame {
}),
None => Ok(KafkaFrame::Request {
header: RequestHeader::default()
.with_request_api_key(ApiKey::SaslAuthenticateKey as i16),
.with_request_api_key(ApiKey::SaslAuthenticate as i16),
body: RequestBody::SaslAuthenticate(
SaslAuthenticateRequest::default().with_auth_bytes(bytes),
),
Expand Down
154 changes: 111 additions & 43 deletions shotover/src/transforms/kafka/sink_cluster/api_versions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,55 +8,123 @@ use kafka_protocol::{messages::ApiKey, protocol::VersionRange};
// + Make sure any new fields do not break any of the requirements listed above
pub(crate) fn versions_supported_by_key(api_key: i16) -> Option<VersionRange> {
match ApiKey::try_from(api_key) {
Ok(ApiKey::ProduceKey) => Some(VersionRange { min: 0, max: 11 }),
Ok(ApiKey::FetchKey) => Some(VersionRange { min: 0, max: 16 }),
Ok(ApiKey::ListOffsetsKey) => Some(VersionRange { min: 0, max: 8 }),
Ok(ApiKey::MetadataKey) => Some(VersionRange { min: 0, max: 12 }),
Ok(ApiKey::OffsetCommitKey) => Some(VersionRange { min: 0, max: 9 }),
Ok(ApiKey::OffsetFetchKey) => Some(VersionRange { min: 0, max: 9 }),
Ok(ApiKey::FindCoordinatorKey) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::JoinGroupKey) => Some(VersionRange { min: 0, max: 9 }),
Ok(ApiKey::HeartbeatKey) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::LeaveGroupKey) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::SyncGroupKey) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::DescribeGroupsKey) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::ListGroupsKey) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::SaslHandshakeKey) => Some(VersionRange { min: 0, max: 1 }),
Ok(ApiKey::ApiVersionsKey) => Some(VersionRange { min: 0, max: 3 }),
Ok(ApiKey::CreateTopicsKey) => Some(VersionRange { min: 0, max: 7 }),
Ok(ApiKey::DeleteTopicsKey) => Some(VersionRange { min: 0, max: 6 }),
Ok(ApiKey::DeleteRecordsKey) => Some(VersionRange { min: 0, max: 2 }),
Ok(ApiKey::InitProducerIdKey) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::OffsetForLeaderEpochKey) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::AddPartitionsToTxnKey) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::AddOffsetsToTxnKey) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::EndTxnKey) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::TxnOffsetCommitKey) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::CreateAclsKey) => Some(VersionRange { min: 0, max: 3 }),
Ok(ApiKey::DescribeConfigsKey) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::AlterConfigsKey) => Some(VersionRange { min: 0, max: 2 }),
Ok(ApiKey::DescribeLogDirsKey) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::SaslAuthenticateKey) => Some(VersionRange { min: 0, max: 2 }),
Ok(ApiKey::CreatePartitionsKey) => Some(VersionRange { min: 0, max: 3 }),
Ok(ApiKey::DeleteGroupsKey) => Some(VersionRange { min: 0, max: 2 }),
Ok(ApiKey::ElectLeadersKey) => Some(VersionRange { min: 0, max: 2 }),
Ok(ApiKey::AlterPartitionReassignmentsKey) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::ListPartitionReassignmentsKey) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::OffsetDeleteKey) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::AlterPartitionKey) => Some(VersionRange { min: 0, max: 3 }),
Ok(ApiKey::DescribeClusterKey) => Some(VersionRange { min: 0, max: 1 }),
Ok(ApiKey::DescribeProducersKey) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::DescribeTransactionsKey) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::ListTransactionsKey) => Some(VersionRange { min: 0, max: 1 }),
Ok(ApiKey::ConsumerGroupHeartbeatKey) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::ConsumerGroupDescribeKey) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::Produce) => Some(VersionRange { min: 0, max: 11 }),
Ok(ApiKey::Fetch) => Some(VersionRange { min: 0, max: 16 }),
Ok(ApiKey::ListOffsets) => Some(VersionRange { min: 0, max: 8 }),
Ok(ApiKey::Metadata) => Some(VersionRange { min: 0, max: 12 }),
Ok(ApiKey::OffsetCommit) => Some(VersionRange { min: 0, max: 9 }),
Ok(ApiKey::OffsetFetch) => Some(VersionRange { min: 0, max: 9 }),
Ok(ApiKey::FindCoordinator) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::JoinGroup) => Some(VersionRange { min: 0, max: 9 }),
Ok(ApiKey::Heartbeat) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::LeaveGroup) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::SyncGroup) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::DescribeGroups) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::ListGroups) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::SaslHandshake) => Some(VersionRange { min: 0, max: 1 }),
Ok(ApiKey::ApiVersions) => Some(VersionRange { min: 0, max: 3 }),
Ok(ApiKey::CreateTopics) => Some(VersionRange { min: 0, max: 7 }),
Ok(ApiKey::DeleteTopics) => Some(VersionRange { min: 0, max: 6 }),
Ok(ApiKey::DeleteRecords) => Some(VersionRange { min: 0, max: 2 }),
Ok(ApiKey::InitProducerId) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::OffsetForLeaderEpoch) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::AddPartitionsToTxn) => Some(VersionRange { min: 0, max: 5 }),
Ok(ApiKey::AddOffsetsToTxn) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::EndTxn) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::TxnOffsetCommit) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::CreateAcls) => Some(VersionRange { min: 0, max: 3 }),
Ok(ApiKey::DescribeConfigs) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::AlterConfigs) => Some(VersionRange { min: 0, max: 2 }),
Ok(ApiKey::DescribeLogDirs) => Some(VersionRange { min: 0, max: 4 }),
Ok(ApiKey::SaslAuthenticate) => Some(VersionRange { min: 0, max: 2 }),
Ok(ApiKey::CreatePartitions) => Some(VersionRange { min: 0, max: 3 }),
Ok(ApiKey::DeleteGroups) => Some(VersionRange { min: 0, max: 2 }),
Ok(ApiKey::ElectLeaders) => Some(VersionRange { min: 0, max: 2 }),
Ok(ApiKey::AlterPartitionReassignments) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::ListPartitionReassignments) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::OffsetDelete) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::AlterPartition) => Some(VersionRange { min: 0, max: 3 }),
Ok(ApiKey::DescribeCluster) => Some(VersionRange { min: 0, max: 1 }),
Ok(ApiKey::DescribeProducers) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::DescribeTransactions) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::ListTransactions) => Some(VersionRange { min: 0, max: 1 }),
Ok(ApiKey::ConsumerGroupHeartbeat) => Some(VersionRange { min: 0, max: 0 }),
Ok(ApiKey::ConsumerGroupDescribe) => Some(VersionRange { min: 0, max: 0 }),
// This message type has very little documentation available and kafka responds to it with an error code 35 UNSUPPORTED_VERSION
// So its not clear at all how to implement this and its not even possible to test it.
// Instead lets just ask the client to not send it at all.
// We can consider supporting it when kafka itself starts to support it but we will need to be very
// careful to correctly implement the pagination/cursor logic.
Ok(ApiKey::DescribeTopicPartitionsKey) => None,
Ok(ApiKey::DescribeTopicPartitions) => None,
Ok(_) => None,
Err(_) => None,
}
}

// This test gives visibility into the api versions that shotover doesnt support yet.
// If this test is failing after a `cargo update`, you can just alter EXPECTED_ERROR_MESSAGE to include the new versions.
// The actual upgrade can be done later.
#[test]
fn check_api_version_backlog() {
use std::fmt::Write;
const EXPECTED_ERROR_MESSAGE: &str = r#"
LeaderAndIsr kafka-protocol=0..7 shotover=NotSupported
StopReplica kafka-protocol=0..4 shotover=NotSupported
UpdateMetadata kafka-protocol=0..8 shotover=NotSupported
ControlledShutdown kafka-protocol=0..3 shotover=NotSupported
WriteTxnMarkers kafka-protocol=0..1 shotover=NotSupported
DescribeAcls kafka-protocol=0..3 shotover=NotSupported
DeleteAcls kafka-protocol=0..3 shotover=NotSupported
AlterReplicaLogDirs kafka-protocol=0..2 shotover=NotSupported
CreateDelegationToken kafka-protocol=0..3 shotover=NotSupported
RenewDelegationToken kafka-protocol=0..2 shotover=NotSupported
ExpireDelegationToken kafka-protocol=0..2 shotover=NotSupported
DescribeDelegationToken kafka-protocol=0..3 shotover=NotSupported
IncrementalAlterConfigs kafka-protocol=0..1 shotover=NotSupported
DescribeClientQuotas kafka-protocol=0..1 shotover=NotSupported
AlterClientQuotas kafka-protocol=0..1 shotover=NotSupported
DescribeUserScramCredentials kafka-protocol=0..0 shotover=NotSupported
AlterUserScramCredentials kafka-protocol=0..0 shotover=NotSupported
Vote kafka-protocol=0..0 shotover=NotSupported
BeginQuorumEpoch kafka-protocol=0..0 shotover=NotSupported
EndQuorumEpoch kafka-protocol=0..0 shotover=NotSupported
DescribeQuorum kafka-protocol=0..1 shotover=NotSupported
UpdateFeatures kafka-protocol=0..1 shotover=NotSupported
Envelope kafka-protocol=0..0 shotover=NotSupported
FetchSnapshot kafka-protocol=0..0 shotover=NotSupported
BrokerRegistration kafka-protocol=0..3 shotover=NotSupported
BrokerHeartbeat kafka-protocol=0..1 shotover=NotSupported
UnregisterBroker kafka-protocol=0..0 shotover=NotSupported
AllocateProducerIds kafka-protocol=0..0 shotover=NotSupported
ControllerRegistration kafka-protocol=0..0 shotover=NotSupported
GetTelemetrySubscriptions kafka-protocol=0..0 shotover=NotSupported
PushTelemetry kafka-protocol=0..0 shotover=NotSupported
AssignReplicasToDirs kafka-protocol=0..0 shotover=NotSupported
ListClientMetricsResources kafka-protocol=0..0 shotover=NotSupported
DescribeTopicPartitions kafka-protocol=0..0 shotover=NotSupported
"#;

let mut error_message = String::new();
for api_key in ApiKey::iter() {
let shotover_version = versions_supported_by_key(api_key as i16);

let kafka_protocol_version = api_key.valid_versions();
if shotover_version != Some(kafka_protocol_version) {
let shotover_version = match shotover_version {
Some(version) => format!("{version}"),
None => "NotSupported".to_owned(),
};
writeln!(
error_message,
"{api_key:?} kafka-protocol={kafka_protocol_version} shotover={shotover_version}"
)
.unwrap();
}
}

pretty_assertions::assert_eq!(
EXPECTED_ERROR_MESSAGE.trim(),
error_message.trim(),
"The list of message types not supported by shotover differs from the expected list defined in EXPECTED_ERROR_MESSAGE",
);
}
2 changes: 1 addition & 1 deletion shotover/src/transforms/kafka/sink_cluster/kafka_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ impl ConnectionFactory {
fn create_auth_request(bytes: Vec<u8>) -> Message {
Message::from_frame(Frame::Kafka(KafkaFrame::Request {
header: RequestHeader::default()
.with_request_api_key(ApiKey::SaslAuthenticateKey as i16)
.with_request_api_key(ApiKey::SaslAuthenticate as i16)
.with_request_api_version(2),
body: RequestBody::SaslAuthenticate(
SaslAuthenticateRequest::default().with_auth_bytes(bytes.into()),
Expand Down
4 changes: 2 additions & 2 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1951,7 +1951,7 @@ The connection to the client has been closed."
) -> Result<KafkaNode, FindCoordinatorError> {
let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request {
header: RequestHeader::default()
.with_request_api_key(ApiKey::FindCoordinatorKey as i16)
.with_request_api_key(ApiKey::FindCoordinator as i16)
.with_request_api_version(2)
.with_correlation_id(0),
body: RequestBody::FindCoordinator(
Expand Down Expand Up @@ -2035,7 +2035,7 @@ The connection to the client has been closed."
let api_version = if topic_ids.is_empty() { 4 } else { 12 };
let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request {
header: RequestHeader::default()
.with_request_api_key(ApiKey::MetadataKey as i16)
.with_request_api_key(ApiKey::Metadata as i16)
.with_request_api_version(api_version)
.with_correlation_id(0),
body: RequestBody::Metadata(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async fn find_new_brokers(nodes: &mut Vec<Node>, rng: &mut SmallRng) -> Result<(

let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request {
header: RequestHeader::default()
.with_request_api_key(ApiKey::MetadataKey as i16)
.with_request_api_key(ApiKey::Metadata as i16)
.with_request_api_version(4)
.with_correlation_id(0),
body: RequestBody::Metadata(MetadataRequest::default()),
Expand Down Expand Up @@ -127,7 +127,7 @@ async fn create_delegation_token_for_user(
connection.send(vec![Message::from_frame(Frame::Kafka(
KafkaFrame::Request {
header: RequestHeader::default()
.with_request_api_key(ApiKey::CreateDelegationTokenKey as i16)
.with_request_api_key(ApiKey::CreateDelegationToken as i16)
.with_request_api_version(3),
body: RequestBody::CreateDelegationToken(
CreateDelegationTokenRequest::default()
Expand Down Expand Up @@ -241,7 +241,7 @@ async fn is_delegation_token_ready(
connection.send(vec![Message::from_frame(Frame::Kafka(
KafkaFrame::Request {
header: RequestHeader::default()
.with_request_api_key(ApiKey::DescribeDelegationTokenKey as i16)
.with_request_api_key(ApiKey::DescribeDelegationToken as i16)
.with_request_api_version(3),
body: RequestBody::DescribeDelegationToken(
DescribeDelegationTokenRequest::default().with_owners(Some(vec![
Expand Down
Loading