Skip to content

Commit

Permalink
kafka api: a fake InitProducerId handler
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP committed Dec 28, 2023
1 parent 53f406c commit 27153b4
Show file tree
Hide file tree
Showing 6 changed files with 187 additions and 0 deletions.
2 changes: 2 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Handler.hsc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import qualified Kafka.Protocol.Service as K
#cv_handler ApiVersions, 0, 3
#cv_handler Metadata, 0, 5
#cv_handler Produce, 0, 3
#cv_handler InitProducerId, 0, 0
#cv_handler Fetch, 0, 4
#cv_handler DescribeConfigs, 0, 0

Expand All @@ -85,6 +86,7 @@ handlers sc =
, #mk_handler Metadata, 0, 5
-- Write
, #mk_handler Produce, 0, 3
, #mk_handler InitProducerId, 0, 0
-- Read
, #mk_handler Fetch, 0, 4

Expand Down
16 changes: 16 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module HStream.Kafka.Server.Handler.Produce
( handleProduce
, handleInitProducerId
) where

import qualified Control.Concurrent.Async as Async
Expand Down Expand Up @@ -83,6 +84,21 @@ handleProduce ServerContext{..} _ req = do

pure $ K.ProduceResponse (K.KaArray $ Just responses) 0{- TODO: throttleTimeMs -}

-- TODO
handleInitProducerId
:: ServerContext
-> K.RequestContext
-> K.InitProducerIdRequest
-> IO K.InitProducerIdResponse
handleInitProducerId ServerContext{..} _ req = do
Log.warning "InitProducerId is not implemented"
pure $ K.InitProducerIdResponse
{ throttleTimeMs = 0
, errorCode = K.NONE
, producerId = 0
, producerEpoch = 0
}

-------------------------------------------------------------------------------

appendRecords
Expand Down
40 changes: 40 additions & 0 deletions hstream-kafka/message/InitProducerIdRequest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 22,
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "InitProducerIdRequest",
// Version 1 is the same as version 0.
//
// Version 2 is the first flexible version.
//
// Version 3 adds ProducerId and ProducerEpoch, allowing producers to try to resume after an INVALID_PRODUCER_EPOCH error
//
// Version 4 adds the support for new error code PRODUCER_FENCED.
"validVersions": "0-4",
"flexibleVersions": "2+",
"fields": [
{ "name": "TransactionalId", "type": "string", "versions": "0+", "nullableVersions": "0+", "entityType": "transactionalId",
"about": "The transactional id, or null if the producer is not transactional." },
{ "name": "TransactionTimeoutMs", "type": "int32", "versions": "0+",
"about": "The time in ms to wait before aborting idle transactions sent by this producer. This is only relevant if a TransactionalId has been defined." },
{ "name": "ProducerId", "type": "int64", "versions": "3+", "default": "-1", "entityType": "producerId",
"about": "The producer id. This is used to disambiguate requests if a transactional id is reused following its expiration." },
{ "name": "ProducerEpoch", "type": "int16", "versions": "3+", "default": "-1",
"about": "The producer's current epoch. This will be checked against the producer epoch on the broker, and the request will return an error if they do not match." }
]
}
39 changes: 39 additions & 0 deletions hstream-kafka/message/InitProducerIdResponse.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

{
"apiKey": 22,
"type": "response",
"name": "InitProducerIdResponse",
// Starting in version 1, on quota violation, brokers send out responses before throttling.
//
// Version 2 is the first flexible version.
//
// Version 3 is the same as version 2.
//
// Version 4 adds the support for new error code PRODUCER_FENCED.
"validVersions": "0-4",
"flexibleVersions": "2+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
"about": "The error code, or 0 if there was no error." },
{ "name": "ProducerId", "type": "int64", "versions": "0+", "entityType": "producerId",
"default": -1, "about": "The current producer id." },
{ "name": "ProducerEpoch", "type": "int16", "versions": "0+",
"about": "The current epoch associated with the producer id." }
]
}
33 changes: 33 additions & 0 deletions hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,28 @@ data HeartbeatResponseV1 = HeartbeatResponseV1
} deriving (Show, Eq, Generic)
instance Serializable HeartbeatResponseV1

data InitProducerIdRequestV0 = InitProducerIdRequestV0
{ transactionalId :: !NullableString
-- ^ The transactional id, or null if the producer is not transactional.
, transactionTimeoutMs :: {-# UNPACK #-} !Int32
-- ^ The time in ms to wait before aborting idle transactions sent by this
-- producer. This is only relevant if a TransactionalId has been defined.
} deriving (Show, Eq, Generic)
instance Serializable InitProducerIdRequestV0

data InitProducerIdResponseV0 = InitProducerIdResponseV0
{ throttleTimeMs :: {-# UNPACK #-} !Int32
-- ^ The duration in milliseconds for which the request was throttled due
-- to a quota violation, or zero if the request did not violate any quota.
, errorCode :: {-# UNPACK #-} !ErrorCode
-- ^ The error code, or 0 if there was no error.
, producerId :: {-# UNPACK #-} !Int64
-- ^ The current producer id.
, producerEpoch :: {-# UNPACK #-} !Int16
-- ^ The current epoch associated with the producer id.
} deriving (Show, Eq, Generic)
instance Serializable InitProducerIdResponseV0

data JoinGroupRequestV0 = JoinGroupRequestV0
{ groupId :: !Text
-- ^ The group identifier.
Expand Down Expand Up @@ -1462,6 +1484,7 @@ instance Service HStreamKafkaV0 where
, "apiVersions"
, "createTopics"
, "deleteTopics"
, "initProducerId"
, "describeConfigs"
, "saslAuthenticate"
]
Expand Down Expand Up @@ -1592,6 +1615,13 @@ instance HasMethodImpl HStreamKafkaV0 "deleteTopics" where
type MethodInput HStreamKafkaV0 "deleteTopics" = DeleteTopicsRequestV0
type MethodOutput HStreamKafkaV0 "deleteTopics" = DeleteTopicsResponseV0

instance HasMethodImpl HStreamKafkaV0 "initProducerId" where
type MethodName HStreamKafkaV0 "initProducerId" = "initProducerId"
type MethodKey HStreamKafkaV0 "initProducerId" = 22
type MethodVersion HStreamKafkaV0 "initProducerId" = 0
type MethodInput HStreamKafkaV0 "initProducerId" = InitProducerIdRequestV0
type MethodOutput HStreamKafkaV0 "initProducerId" = InitProducerIdResponseV0

instance HasMethodImpl HStreamKafkaV0 "describeConfigs" where
type MethodName HStreamKafkaV0 "describeConfigs" = "describeConfigs"
type MethodKey HStreamKafkaV0 "describeConfigs" = 32
Expand Down Expand Up @@ -1905,6 +1935,7 @@ instance Show ApiKey where
show (ApiKey (18)) = "ApiVersions(18)"
show (ApiKey (19)) = "CreateTopics(19)"
show (ApiKey (20)) = "DeleteTopics(20)"
show (ApiKey (22)) = "InitProducerId(22)"
show (ApiKey (32)) = "DescribeConfigs(32)"
show (ApiKey (36)) = "SaslAuthenticate(36)"
show (ApiKey n) = "Unknown " <> show n
Expand All @@ -1928,6 +1959,7 @@ supportedApiVersions =
, ApiVersionV0 (ApiKey 18) 0 3
, ApiVersionV0 (ApiKey 19) 0 0
, ApiVersionV0 (ApiKey 20) 0 0
, ApiVersionV0 (ApiKey 22) 0 0
, ApiVersionV0 (ApiKey 32) 0 0
, ApiVersionV0 (ApiKey 36) 0 0
]
Expand Down Expand Up @@ -1981,6 +2013,7 @@ getHeaderVersion (ApiKey (18)) 2 = (1, 0)
getHeaderVersion (ApiKey (18)) 3 = (2, 0)
getHeaderVersion (ApiKey (19)) 0 = (1, 0)
getHeaderVersion (ApiKey (20)) 0 = (1, 0)
getHeaderVersion (ApiKey (22)) 0 = (1, 0)
getHeaderVersion (ApiKey (32)) 0 = (1, 0)
getHeaderVersion (ApiKey (36)) 0 = (1, 0)
getHeaderVersion k v = error $ "Unknown " <> show k <> " v" <> show v
Expand Down
57 changes: 57 additions & 0 deletions hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2022,6 +2022,56 @@ heartbeatResponseFromV1 x = HeartbeatResponse
, throttleTimeMs = x.throttleTimeMs
}

data InitProducerIdRequest = InitProducerIdRequest
{ transactionalId :: !NullableString
-- ^ The transactional id, or null if the producer is not transactional.
, transactionTimeoutMs :: {-# UNPACK #-} !Int32
-- ^ The time in ms to wait before aborting idle transactions sent by this
-- producer. This is only relevant if a TransactionalId has been defined.
} deriving (Show, Eq, Generic)
instance Serializable InitProducerIdRequest

initProducerIdRequestToV0 :: InitProducerIdRequest -> InitProducerIdRequestV0
initProducerIdRequestToV0 x = InitProducerIdRequestV0
{ transactionalId = x.transactionalId
, transactionTimeoutMs = x.transactionTimeoutMs
}

initProducerIdRequestFromV0 :: InitProducerIdRequestV0 -> InitProducerIdRequest
initProducerIdRequestFromV0 x = InitProducerIdRequest
{ transactionalId = x.transactionalId
, transactionTimeoutMs = x.transactionTimeoutMs
}

data InitProducerIdResponse = InitProducerIdResponse
{ throttleTimeMs :: {-# UNPACK #-} !Int32
-- ^ The duration in milliseconds for which the request was throttled due
-- to a quota violation, or zero if the request did not violate any quota.
, errorCode :: {-# UNPACK #-} !ErrorCode
-- ^ The error code, or 0 if there was no error.
, producerId :: {-# UNPACK #-} !Int64
-- ^ The current producer id.
, producerEpoch :: {-# UNPACK #-} !Int16
-- ^ The current epoch associated with the producer id.
} deriving (Show, Eq, Generic)
instance Serializable InitProducerIdResponse

initProducerIdResponseToV0 :: InitProducerIdResponse -> InitProducerIdResponseV0
initProducerIdResponseToV0 x = InitProducerIdResponseV0
{ throttleTimeMs = x.throttleTimeMs
, errorCode = x.errorCode
, producerId = x.producerId
, producerEpoch = x.producerEpoch
}

initProducerIdResponseFromV0 :: InitProducerIdResponseV0 -> InitProducerIdResponse
initProducerIdResponseFromV0 x = InitProducerIdResponse
{ throttleTimeMs = x.throttleTimeMs
, errorCode = x.errorCode
, producerId = x.producerId
, producerEpoch = x.producerEpoch
}

data JoinGroupRequest = JoinGroupRequest
{ groupId :: !Text
-- ^ The group identifier.
Expand Down Expand Up @@ -2935,6 +2985,13 @@ instance Exception HeartbeatResponseEx
catchHeartbeatResponseEx :: IO HeartbeatResponse -> IO HeartbeatResponse
catchHeartbeatResponseEx act = act `catch` \(HeartbeatResponseEx resp) -> pure resp

newtype InitProducerIdResponseEx = InitProducerIdResponseEx InitProducerIdResponse
deriving (Show, Eq)
instance Exception InitProducerIdResponseEx

catchInitProducerIdResponseEx :: IO InitProducerIdResponse -> IO InitProducerIdResponse
catchInitProducerIdResponseEx act = act `catch` \(InitProducerIdResponseEx resp) -> pure resp

newtype JoinGroupResponseEx = JoinGroupResponseEx JoinGroupResponse
deriving (Show, Eq)
instance Exception JoinGroupResponseEx
Expand Down

0 comments on commit 27153b4

Please sign in to comment.