Skip to content

Commit

Permalink
Merge branch 'main' into kafka-cluster-sasl
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros authored Mar 8, 2024
2 parents d8df5a6 + 78af104 commit f8ea37c
Show file tree
Hide file tree
Showing 8 changed files with 513 additions and 206 deletions.
227 changes: 117 additions & 110 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 20 additions & 2 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use test_helpers::connection::kafka::{
ExpectedResponse, KafkaConnectionBuilder, NewPartition, NewTopic, Record,
AlterConfig, ConfigEntry, ExpectedResponse, KafkaConnectionBuilder, NewPartition, NewTopic,
Record, ResourceSpecifier,
};

async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
Expand Down Expand Up @@ -36,6 +37,24 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
new_partition_count: 2,
}])
.await;

admin
// TODO: test ResourceSpecifier::Broker and ResourceSpecifier::Group as well.
// Will need to find a way to get a valid broker id and to create a group.
.describe_configs(&[ResourceSpecifier::Topic("to_delete")])
.await;

admin
.alter_configs(&[AlterConfig {
specifier: ResourceSpecifier::Topic("to_delete"),
entries: &[ConfigEntry {
key: "delete.retention.ms".to_owned(),
value: "86400001".to_owned(),
}],
}])
.await;

admin.delete_topics(&["to_delete"]).await
}

async fn produce_consume(connection_builder: &KafkaConnectionBuilder, topic_name: &str, i: i64) {
Expand Down Expand Up @@ -115,7 +134,6 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {

pub async fn basic(connection_builder: KafkaConnectionBuilder) {
admin_setup(&connection_builder).await;
connection_builder.admin_setup().await;
for i in 0..2 {
produce_consume(&connection_builder, "partitions1", i).await;
produce_consume(&connection_builder, "partitions3", i).await;
Expand Down
4 changes: 2 additions & 2 deletions shotover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ bincode = { workspace = true, optional = true }
num = { version = "0.4.0", features = ["serde"] }
uuid = { workspace = true }
bigdecimal = { version = "0.4.0", features = ["serde"] }
base64 = { version = "0.21.0", optional = true }
base64 = { version = "0.22.0", optional = true }
httparse = { version = "1.8.0", optional = true }
http = { version = "1.0.0", optional = true }

Expand All @@ -111,7 +111,7 @@ aws-config = { version = "1.0.0", optional = true }
aws-sdk-kms = { version = "1.1.0", optional = true }
chacha20poly1305 = { version = "0.10.0", features = ["std"], optional = true }
generic-array = { version = "0.14", features = ["serde"], optional = true }
kafka-protocol = { version = "0.9.0", optional = true }
kafka-protocol = { version = "0.10.0", optional = true }
rustls = { version = "0.22.0" }
tokio-rustls = "0.25"
rustls-pemfile = "2.0.0"
Expand Down
124 changes: 100 additions & 24 deletions shotover/src/frame/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,44 @@ use kafka_protocol::messages::{
AlterPartitionReassignmentsRequest, AlterPartitionReassignmentsResponse, AlterPartitionRequest,
AlterPartitionResponse, AlterReplicaLogDirsRequest, AlterReplicaLogDirsResponse,
AlterUserScramCredentialsRequest, AlterUserScramCredentialsResponse, ApiKey,
ApiVersionsRequest, ApiVersionsResponse, BeginQuorumEpochRequest, BeginQuorumEpochResponse,
ApiVersionsRequest, ApiVersionsResponse, AssignReplicasToDirsRequest,
AssignReplicasToDirsResponse, BeginQuorumEpochRequest, BeginQuorumEpochResponse,
BrokerHeartbeatRequest, BrokerHeartbeatResponse, BrokerRegistrationRequest,
BrokerRegistrationResponse, ControlledShutdownRequest, ControlledShutdownResponse,
CreateAclsRequest, CreateAclsResponse, CreateDelegationTokenRequest,
CreateDelegationTokenResponse, CreatePartitionsRequest, CreatePartitionsResponse,
CreateTopicsRequest, CreateTopicsResponse, DeleteAclsRequest, DeleteAclsResponse,
DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest, DeleteRecordsResponse,
DeleteTopicsRequest, DeleteTopicsResponse, DescribeAclsRequest, DescribeAclsResponse,
DescribeClientQuotasRequest, DescribeClientQuotasResponse, DescribeClusterRequest,
DescribeClusterResponse, DescribeConfigsRequest, DescribeConfigsResponse,
DescribeDelegationTokenRequest, DescribeDelegationTokenResponse, DescribeGroupsRequest,
DescribeGroupsResponse, DescribeLogDirsRequest, DescribeLogDirsResponse,
BrokerRegistrationResponse, ConsumerGroupHeartbeatRequest, ConsumerGroupHeartbeatResponse,
ControlledShutdownRequest, ControlledShutdownResponse, ControllerRegistrationRequest,
ControllerRegistrationResponse, CreateAclsRequest, CreateAclsResponse,
CreateDelegationTokenRequest, CreateDelegationTokenResponse, CreatePartitionsRequest,
CreatePartitionsResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteAclsRequest,
DeleteAclsResponse, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest,
DeleteRecordsResponse, DeleteTopicsRequest, DeleteTopicsResponse, DescribeAclsRequest,
DescribeAclsResponse, DescribeClientQuotasRequest, DescribeClientQuotasResponse,
DescribeClusterRequest, DescribeClusterResponse, DescribeConfigsRequest,
DescribeConfigsResponse, DescribeDelegationTokenRequest, DescribeDelegationTokenResponse,
DescribeGroupsRequest, DescribeGroupsResponse, DescribeLogDirsRequest, DescribeLogDirsResponse,
DescribeProducersRequest, DescribeProducersResponse, DescribeQuorumRequest,
DescribeQuorumResponse, DescribeTransactionsRequest, DescribeTransactionsResponse,
DescribeUserScramCredentialsRequest, DescribeUserScramCredentialsResponse, ElectLeadersRequest,
ElectLeadersResponse, EndQuorumEpochRequest, EndQuorumEpochResponse, EndTxnRequest,
EndTxnResponse, EnvelopeRequest, EnvelopeResponse, ExpireDelegationTokenRequest,
ExpireDelegationTokenResponse, FetchRequest, FetchResponse, FetchSnapshotRequest,
FetchSnapshotResponse, FindCoordinatorRequest, FindCoordinatorResponse, HeartbeatRequest,
FetchSnapshotResponse, FindCoordinatorRequest, FindCoordinatorResponse,
GetTelemetrySubscriptionsRequest, GetTelemetrySubscriptionsResponse, HeartbeatRequest,
HeartbeatResponse, IncrementalAlterConfigsRequest, IncrementalAlterConfigsResponse,
InitProducerIdRequest, InitProducerIdResponse, JoinGroupRequest, JoinGroupResponse,
LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse,
ListGroupsRequest, ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse,
ListPartitionReassignmentsRequest, ListPartitionReassignmentsResponse, ListTransactionsRequest,
ListTransactionsResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest,
OffsetCommitResponse, OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest,
OffsetFetchResponse, OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest,
ProduceResponse, RenewDelegationTokenRequest, RenewDelegationTokenResponse, RequestHeader,
ResponseHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest,
SaslHandshakeResponse, StopReplicaRequest, StopReplicaResponse, SyncGroupRequest,
SyncGroupResponse, TxnOffsetCommitRequest, TxnOffsetCommitResponse, UnregisterBrokerRequest,
UnregisterBrokerResponse, UpdateFeaturesRequest, UpdateFeaturesResponse, UpdateMetadataRequest,
UpdateMetadataResponse, VoteRequest, VoteResponse, WriteTxnMarkersRequest,
WriteTxnMarkersResponse,
ListClientMetricsResourcesRequest, ListClientMetricsResourcesResponse, ListGroupsRequest,
ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse, ListPartitionReassignmentsRequest,
ListPartitionReassignmentsResponse, ListTransactionsRequest, ListTransactionsResponse,
MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse,
OffsetDeleteRequest, OffsetDeleteResponse, OffsetFetchRequest, OffsetFetchResponse,
OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse,
PushTelemetryRequest, PushTelemetryResponse, RenewDelegationTokenRequest,
RenewDelegationTokenResponse, RequestHeader, ResponseHeader, SaslAuthenticateRequest,
SaslAuthenticateResponse, SaslHandshakeRequest, SaslHandshakeResponse, StopReplicaRequest,
StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, TxnOffsetCommitRequest,
TxnOffsetCommitResponse, UnregisterBrokerRequest, UnregisterBrokerResponse,
UpdateFeaturesRequest, UpdateFeaturesResponse, UpdateMetadataRequest, UpdateMetadataResponse,
VoteRequest, VoteResponse, WriteTxnMarkersRequest, WriteTxnMarkersResponse,
};
use kafka_protocol::protocol::{Decodable, Encodable, HeaderVersion};
use std::fmt::{Display, Formatter, Result as FmtResult};
Expand Down Expand Up @@ -177,6 +181,12 @@ pub enum RequestBody {
AlterClientQuotas(AlterClientQuotasRequest),
DescribeQuorum(DescribeQuorumRequest),
AlterUserScramCredentials(AlterUserScramCredentialsRequest),
ConsumerGroupHeartbeat(ConsumerGroupHeartbeatRequest),
ControllerRegistration(ControllerRegistrationRequest),
GetTelemetrySubscriptions(GetTelemetrySubscriptionsRequest),
PushTelemetry(PushTelemetryRequest),
AssignReplicasToDirs(AssignReplicasToDirsRequest),
ListClientMetricsResources(ListClientMetricsResourcesRequest),
}

#[derive(Debug, PartialEq, Clone)]
Expand Down Expand Up @@ -249,6 +259,12 @@ pub enum ResponseBody {
DescribeConfigs(DescribeConfigsResponse),
OffsetDelete(OffsetDeleteResponse),
DeleteGroups(DeleteGroupsResponse),
ConsumerGroupHeartbeat(ConsumerGroupHeartbeatResponse),
ControllerRegistration(ControllerRegistrationResponse),
GetTelemetrySubscriptions(GetTelemetrySubscriptionsResponse),
PushTelemetry(PushTelemetryResponse),
AssignReplicasToDirs(AssignReplicasToDirsResponse),
ListClientMetricsResources(ListClientMetricsResourcesResponse),
}

impl ResponseBody {
Expand Down Expand Up @@ -360,6 +376,22 @@ impl ResponseBody {
ResponseBody::DescribeConfigs(_) => DescribeConfigsResponse::header_version(version),
ResponseBody::OffsetDelete(_) => OffsetDeleteResponse::header_version(version),
ResponseBody::DeleteGroups(_) => DeleteGroupsResponse::header_version(version),
ResponseBody::ConsumerGroupHeartbeat(_) => {
ConsumerGroupHeartbeatResponse::header_version(version)
}
ResponseBody::ControllerRegistration(_) => {
ControllerRegistrationResponse::header_version(version)
}
ResponseBody::GetTelemetrySubscriptions(_) => {
GetTelemetrySubscriptionsResponse::header_version(version)
}
ResponseBody::PushTelemetry(_) => PushTelemetryResponse::header_version(version),
ResponseBody::AssignReplicasToDirs(_) => {
AssignReplicasToDirsResponse::header_version(version)
}
ResponseBody::ListClientMetricsResources(_) => {
ListClientMetricsResourcesResponse::header_version(version)
}
}
}
}
Expand Down Expand Up @@ -523,6 +555,22 @@ impl KafkaFrame {
ApiKey::DescribeClusterKey => {
RequestBody::DescribeCluster(decode(&mut bytes, version)?)
}
ApiKey::ConsumerGroupHeartbeatKey => {
RequestBody::ConsumerGroupHeartbeat(decode(&mut bytes, version)?)
}
ApiKey::ControllerRegistrationKey => {
RequestBody::ControllerRegistration(decode(&mut bytes, version)?)
}
ApiKey::GetTelemetrySubscriptionsKey => {
RequestBody::GetTelemetrySubscriptions(decode(&mut bytes, version)?)
}
ApiKey::PushTelemetryKey => RequestBody::PushTelemetry(decode(&mut bytes, version)?),
ApiKey::AssignReplicasToDirsKey => {
RequestBody::AssignReplicasToDirs(decode(&mut bytes, version)?)
}
ApiKey::ListClientMetricsResourcesKey => {
RequestBody::ListClientMetricsResources(decode(&mut bytes, version)?)
}
};

Ok(KafkaFrame::Request { header, body })
Expand Down Expand Up @@ -672,6 +720,22 @@ impl KafkaFrame {
ApiKey::AllocateProducerIdsKey => {
ResponseBody::AllocateProducerIds(decode(&mut bytes, version)?)
}
ApiKey::ConsumerGroupHeartbeatKey => {
ResponseBody::ConsumerGroupHeartbeat(decode(&mut bytes, version)?)
}
ApiKey::ControllerRegistrationKey => {
ResponseBody::ControllerRegistration(decode(&mut bytes, version)?)
}
ApiKey::GetTelemetrySubscriptionsKey => {
ResponseBody::GetTelemetrySubscriptions(decode(&mut bytes, version)?)
}
ApiKey::PushTelemetryKey => ResponseBody::PushTelemetry(decode(&mut bytes, version)?),
ApiKey::AssignReplicasToDirsKey => {
ResponseBody::AssignReplicasToDirs(decode(&mut bytes, version)?)
}
ApiKey::ListClientMetricsResourcesKey => {
ResponseBody::ListClientMetricsResources(decode(&mut bytes, version)?)
}
};

Ok(KafkaFrame::Response {
Expand Down Expand Up @@ -766,6 +830,12 @@ impl KafkaFrame {
RequestBody::AllocateProducerIds(x) => encode(x, bytes, version)?,
RequestBody::DescribeCluster(x) => encode(x, bytes, version)?,
RequestBody::AlterScramCredentials(x) => encode(x, bytes, version)?,
RequestBody::ConsumerGroupHeartbeat(x) => encode(x, bytes, version)?,
RequestBody::ControllerRegistration(x) => encode(x, bytes, version)?,
RequestBody::GetTelemetrySubscriptions(x) => encode(x, bytes, version)?,
RequestBody::PushTelemetry(x) => encode(x, bytes, version)?,
RequestBody::AssignReplicasToDirs(x) => encode(x, bytes, version)?,
RequestBody::ListClientMetricsResources(x) => encode(x, bytes, version)?,
}
}
KafkaFrame::Response {
Expand Down Expand Up @@ -843,6 +913,12 @@ impl KafkaFrame {
ResponseBody::AllocateProducerIds(x) => encode(x, bytes, version)?,
ResponseBody::DescribeConfigs(x) => encode(x, bytes, version)?,
ResponseBody::DeleteGroups(x) => encode(x, bytes, version)?,
ResponseBody::ConsumerGroupHeartbeat(x) => encode(x, bytes, version)?,
ResponseBody::ControllerRegistration(x) => encode(x, bytes, version)?,
ResponseBody::GetTelemetrySubscriptions(x) => encode(x, bytes, version)?,
ResponseBody::PushTelemetry(x) => encode(x, bytes, version)?,
ResponseBody::AssignReplicasToDirs(x) => encode(x, bytes, version)?,
ResponseBody::ListClientMetricsResources(x) => encode(x, bytes, version)?,
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion shotover/src/transforms/protect/key_management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ mod key_manager_tests {
let result = futures::executor::block_on(config.build()).unwrap_err();
assert_eq!(
result.to_string(),
"Invalid byte 61, offset 43.".to_string()
"Invalid symbol 61, offset 43.".to_string()
);
}
}
Loading

0 comments on commit f8ea37c

Please sign in to comment.