Skip to content

Commit

Permalink
kafka sasl messages frame
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros committed Feb 22, 2024
1 parent 86a2cfd commit 7f39ea1
Showing 1 changed file with 29 additions and 1 deletion.
30 changes: 29 additions & 1 deletion shotover/src/frame/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use kafka_protocol::messages::{
FindCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest,
JoinGroupResponse, LeaderAndIsrRequest, ListOffsetsRequest, ListOffsetsResponse,
MetadataRequest, MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest,
ProduceResponse, RequestHeader, ResponseHeader, SyncGroupRequest, SyncGroupResponse,
ProduceResponse, RequestHeader, ResponseHeader, SaslAuthenticateRequest,
SaslAuthenticateResponse, SaslHandshakeRequest, SaslHandshakeResponse, SyncGroupRequest,
SyncGroupResponse,
};
use kafka_protocol::protocol::{Decodable, Encodable, HeaderVersion, StrBytes};
use std::fmt::{Display, Formatter, Result as FmtResult};
Expand Down Expand Up @@ -86,6 +88,8 @@ pub enum RequestBody {
DeleteTopics(DeleteTopicsRequest),
DeleteGroups(DeleteGroupsRequest),
DescribeConfigs(DescribeConfigsRequest),
SaslAuthenticateRequest(SaslAuthenticateRequest),
SaslHandshakeRequest(SaslHandshakeRequest),
Unknown { api_key: ApiKey, message: Bytes },
}

Expand All @@ -101,6 +105,8 @@ pub enum ResponseBody {
Metadata(MetadataResponse),
DescribeCluster(DescribeClusterResponse),
Heartbeat(HeartbeatResponse),
SaslAuthenticateResponse(SaslAuthenticateResponse),
SaslHandshakeResponse(SaslHandshakeResponse),
Unknown { api_key: ApiKey, message: Bytes },
}

Expand All @@ -117,6 +123,12 @@ impl ResponseBody {
ResponseBody::Metadata(_) => MetadataResponse::header_version(version),
ResponseBody::DescribeCluster(_) => DescribeClusterResponse::header_version(version),
ResponseBody::Heartbeat(_) => HeartbeatResponse::header_version(version),
ResponseBody::SaslAuthenticateResponse(_) => {
SaslAuthenticateResponse::header_version(version)
}
ResponseBody::SaslHandshakeResponse(_) => {
SaslHandshakeResponse::header_version(version)
}
ResponseBody::Unknown { api_key, .. } => api_key.response_header_version(version),
}
}
Expand Down Expand Up @@ -166,6 +178,12 @@ impl KafkaFrame {
ApiKey::DescribeConfigsKey => {
RequestBody::DescribeConfigs(decode(&mut bytes, version)?)
}
ApiKey::SaslAuthenticateKey => {
RequestBody::SaslAuthenticateRequest(decode(&mut bytes, version)?)
}
ApiKey::SaslHandshakeKey => {
RequestBody::SaslHandshakeRequest(decode(&mut bytes, version)?)
}
api_key => RequestBody::Unknown {
api_key,
message: bytes,
Expand Down Expand Up @@ -200,6 +218,12 @@ impl KafkaFrame {
ResponseBody::DescribeCluster(decode(&mut bytes, version)?)
}
ApiKey::HeartbeatKey => ResponseBody::Heartbeat(decode(&mut bytes, version)?),
ApiKey::SaslAuthenticateKey => {
ResponseBody::SaslAuthenticateResponse(decode(&mut bytes, version)?)
}
ApiKey::SaslHandshakeKey => {
ResponseBody::SaslHandshakeResponse(decode(&mut bytes, version)?)
}
api_key => ResponseBody::Unknown {
api_key,
message: bytes,
Expand Down Expand Up @@ -243,6 +267,8 @@ impl KafkaFrame {
RequestBody::DeleteTopics(x) => encode(x, bytes, version)?,
RequestBody::DeleteGroups(x) => encode(x, bytes, version)?,
RequestBody::DescribeConfigs(x) => encode(x, bytes, version)?,
RequestBody::SaslAuthenticateRequest(x) => encode(x, bytes, version)?,
RequestBody::SaslHandshakeRequest(x) => encode(x, bytes, version)?,
RequestBody::Unknown { message, .. } => bytes.extend_from_slice(&message),
}
}
Expand All @@ -263,6 +289,8 @@ impl KafkaFrame {
ResponseBody::Metadata(x) => encode(x, bytes, version)?,
ResponseBody::DescribeCluster(x) => encode(x, bytes, version)?,
ResponseBody::Heartbeat(x) => encode(x, bytes, version)?,
ResponseBody::SaslAuthenticateResponse(x) => encode(x, bytes, version)?,
ResponseBody::SaslHandshakeResponse(x) => encode(x, bytes, version)?,
ResponseBody::Unknown { message, .. } => bytes.extend_from_slice(&message),
}
}
Expand Down

0 comments on commit 7f39ea1

Please sign in to comment.