Skip to content

Commit

Permalink
kafka: upgrade Produce version to v7 (#1782)
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP authored Mar 22, 2024
1 parent abaef4c commit 50ae501
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 5 deletions.
4 changes: 2 additions & 2 deletions hstream-kafka/HStream/Kafka/Server/Handler.hsc
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ import qualified Kafka.Protocol.Service as K
#cv_handler ApiVersions, 0, 3
#cv_handler ListOffsets, 0, 2
#cv_handler Metadata, 0, 5
#cv_handler Produce, 0, 5
#cv_handler Produce, 0, 7
#cv_handler InitProducerId, 0, 0
#cv_handler Fetch, 0, 6
#cv_handler DescribeConfigs, 0, 0
Expand Down Expand Up @@ -99,7 +99,7 @@ handlers sc =
, #mk_handler ListOffsets, 0, 2
, #mk_handler Metadata, 0, 5
-- Write
, #mk_handler Produce, 0, 5
, #mk_handler Produce, 0, 7
, #mk_handler InitProducerId, 0, 0
-- Read
, #mk_handler Fetch, 0, 6
Expand Down
53 changes: 51 additions & 2 deletions hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,14 @@ type PartitionProduceDataV5 = PartitionProduceDataV0

type TopicProduceDataV5 = TopicProduceDataV0

type PartitionProduceDataV6 = PartitionProduceDataV0

type TopicProduceDataV6 = TopicProduceDataV0

type PartitionProduceDataV7 = PartitionProduceDataV0

type TopicProduceDataV7 = TopicProduceDataV0

data PartitionProduceResponseV0 = PartitionProduceResponseV0
{ index :: {-# UNPACK #-} !Int32
-- ^ The partition index.
Expand Down Expand Up @@ -978,6 +986,14 @@ data TopicProduceResponseV5 = TopicProduceResponseV5
} deriving (Show, Eq, Generic)
instance Serializable TopicProduceResponseV5

type PartitionProduceResponseV6 = PartitionProduceResponseV5

type TopicProduceResponseV6 = TopicProduceResponseV5

type PartitionProduceResponseV7 = PartitionProduceResponseV5

type TopicProduceResponseV7 = TopicProduceResponseV5

data SyncGroupRequestAssignmentV0 = SyncGroupRequestAssignmentV0
{ memberId :: !Text
-- ^ The ID of the member to assign.
Expand Down Expand Up @@ -1823,6 +1839,10 @@ type ProduceRequestV4 = ProduceRequestV3

type ProduceRequestV5 = ProduceRequestV3

type ProduceRequestV6 = ProduceRequestV3

type ProduceRequestV7 = ProduceRequestV3

newtype ProduceResponseV0 = ProduceResponseV0
{ responses :: (KaArray TopicProduceResponseV0)
} deriving (Show, Eq, Generic)
Expand Down Expand Up @@ -1859,6 +1879,10 @@ data ProduceResponseV5 = ProduceResponseV5
} deriving (Show, Eq, Generic)
instance Serializable ProduceResponseV5

type ProduceResponseV6 = ProduceResponseV5

type ProduceResponseV7 = ProduceResponseV5

newtype SaslAuthenticateRequestV0 = SaslAuthenticateRequestV0
{ authBytes :: ByteString
} deriving (Show, Eq, Generic)
Expand Down Expand Up @@ -2500,16 +2524,39 @@ data HStreamKafkaV6
instance Service HStreamKafkaV6 where
type ServiceName HStreamKafkaV6 = "HStreamKafkaV6"
type ServiceMethods HStreamKafkaV6 =
'[ "fetch"
'[ "produce"
, "fetch"
]

instance HasMethodImpl HStreamKafkaV6 "produce" where
type MethodName HStreamKafkaV6 "produce" = "produce"
type MethodKey HStreamKafkaV6 "produce" = 0
type MethodVersion HStreamKafkaV6 "produce" = 6
type MethodInput HStreamKafkaV6 "produce" = ProduceRequestV6
type MethodOutput HStreamKafkaV6 "produce" = ProduceResponseV6

instance HasMethodImpl HStreamKafkaV6 "fetch" where
type MethodName HStreamKafkaV6 "fetch" = "fetch"
type MethodKey HStreamKafkaV6 "fetch" = 1
type MethodVersion HStreamKafkaV6 "fetch" = 6
type MethodInput HStreamKafkaV6 "fetch" = FetchRequestV6
type MethodOutput HStreamKafkaV6 "fetch" = FetchResponseV6

data HStreamKafkaV7

instance Service HStreamKafkaV7 where
type ServiceName HStreamKafkaV7 = "HStreamKafkaV7"
type ServiceMethods HStreamKafkaV7 =
'[ "produce"
]

instance HasMethodImpl HStreamKafkaV7 "produce" where
type MethodName HStreamKafkaV7 "produce" = "produce"
type MethodKey HStreamKafkaV7 "produce" = 0
type MethodVersion HStreamKafkaV7 "produce" = 7
type MethodInput HStreamKafkaV7 "produce" = ProduceRequestV7
type MethodOutput HStreamKafkaV7 "produce" = ProduceResponseV7

-------------------------------------------------------------------------------

newtype ApiKey = ApiKey Int16
Expand Down Expand Up @@ -2545,7 +2592,7 @@ instance Show ApiKey where

supportedApiVersions :: [ApiVersionV0]
supportedApiVersions =
[ ApiVersionV0 (ApiKey 0) 0 5
[ ApiVersionV0 (ApiKey 0) 0 7
, ApiVersionV0 (ApiKey 1) 0 6
, ApiVersionV0 (ApiKey 2) 0 2
, ApiVersionV0 (ApiKey 3) 0 5
Expand Down Expand Up @@ -2579,6 +2626,8 @@ getHeaderVersion (ApiKey (0)) 2 = (1, 0)
getHeaderVersion (ApiKey (0)) 3 = (1, 0)
getHeaderVersion (ApiKey (0)) 4 = (1, 0)
getHeaderVersion (ApiKey (0)) 5 = (1, 0)
getHeaderVersion (ApiKey (0)) 6 = (1, 0)
getHeaderVersion (ApiKey (0)) 7 = (1, 0)
getHeaderVersion (ApiKey (1)) 0 = (1, 0)
getHeaderVersion (ApiKey (1)) 1 = (1, 0)
getHeaderVersion (ApiKey (1)) 2 = (1, 0)
Expand Down
48 changes: 48 additions & 0 deletions hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs
Original file line number Diff line number Diff line change
Expand Up @@ -1751,6 +1751,10 @@ partitionProduceDataToV4 :: PartitionProduceData -> PartitionProduceDataV4
partitionProduceDataToV4 = partitionProduceDataToV0
partitionProduceDataToV5 :: PartitionProduceData -> PartitionProduceDataV5
partitionProduceDataToV5 = partitionProduceDataToV0
partitionProduceDataToV6 :: PartitionProduceData -> PartitionProduceDataV6
partitionProduceDataToV6 = partitionProduceDataToV0
partitionProduceDataToV7 :: PartitionProduceData -> PartitionProduceDataV7
partitionProduceDataToV7 = partitionProduceDataToV0

partitionProduceDataFromV0 :: PartitionProduceDataV0 -> PartitionProduceData
partitionProduceDataFromV0 x = PartitionProduceData
Expand All @@ -1767,6 +1771,10 @@ partitionProduceDataFromV4 :: PartitionProduceDataV4 -> PartitionProduceData
partitionProduceDataFromV4 = partitionProduceDataFromV0
partitionProduceDataFromV5 :: PartitionProduceDataV5 -> PartitionProduceData
partitionProduceDataFromV5 = partitionProduceDataFromV0
partitionProduceDataFromV6 :: PartitionProduceDataV6 -> PartitionProduceData
partitionProduceDataFromV6 = partitionProduceDataFromV0
partitionProduceDataFromV7 :: PartitionProduceDataV7 -> PartitionProduceData
partitionProduceDataFromV7 = partitionProduceDataFromV0

data PartitionProduceResponse = PartitionProduceResponse
{ index :: {-# UNPACK #-} !Int32
Expand Down Expand Up @@ -1812,6 +1820,10 @@ partitionProduceResponseToV5 x = PartitionProduceResponseV5
, logAppendTimeMs = x.logAppendTimeMs
, logStartOffset = x.logStartOffset
}
partitionProduceResponseToV6 :: PartitionProduceResponse -> PartitionProduceResponseV6
partitionProduceResponseToV6 = partitionProduceResponseToV5
partitionProduceResponseToV7 :: PartitionProduceResponse -> PartitionProduceResponseV7
partitionProduceResponseToV7 = partitionProduceResponseToV5

partitionProduceResponseFromV0 :: PartitionProduceResponseV0 -> PartitionProduceResponse
partitionProduceResponseFromV0 x = PartitionProduceResponse
Expand Down Expand Up @@ -1843,6 +1855,10 @@ partitionProduceResponseFromV5 x = PartitionProduceResponse
, logAppendTimeMs = x.logAppendTimeMs
, logStartOffset = x.logStartOffset
}
partitionProduceResponseFromV6 :: PartitionProduceResponseV6 -> PartitionProduceResponse
partitionProduceResponseFromV6 = partitionProduceResponseFromV5
partitionProduceResponseFromV7 :: PartitionProduceResponseV7 -> PartitionProduceResponse
partitionProduceResponseFromV7 = partitionProduceResponseFromV5

data SupportedFeatureKey = SupportedFeatureKey
{ name :: !CompactString
Expand Down Expand Up @@ -1918,6 +1934,10 @@ topicProduceDataToV4 :: TopicProduceData -> TopicProduceDataV4
topicProduceDataToV4 = topicProduceDataToV0
topicProduceDataToV5 :: TopicProduceData -> TopicProduceDataV5
topicProduceDataToV5 = topicProduceDataToV0
topicProduceDataToV6 :: TopicProduceData -> TopicProduceDataV6
topicProduceDataToV6 = topicProduceDataToV0
topicProduceDataToV7 :: TopicProduceData -> TopicProduceDataV7
topicProduceDataToV7 = topicProduceDataToV0

topicProduceDataFromV0 :: TopicProduceDataV0 -> TopicProduceData
topicProduceDataFromV0 x = TopicProduceData
Expand All @@ -1934,6 +1954,10 @@ topicProduceDataFromV4 :: TopicProduceDataV4 -> TopicProduceData
topicProduceDataFromV4 = topicProduceDataFromV0
topicProduceDataFromV5 :: TopicProduceDataV5 -> TopicProduceData
topicProduceDataFromV5 = topicProduceDataFromV0
topicProduceDataFromV6 :: TopicProduceDataV6 -> TopicProduceData
topicProduceDataFromV6 = topicProduceDataFromV0
topicProduceDataFromV7 :: TopicProduceDataV7 -> TopicProduceData
topicProduceDataFromV7 = topicProduceDataFromV0

data TopicProduceResponse = TopicProduceResponse
{ name :: !Text
Expand Down Expand Up @@ -1964,6 +1988,10 @@ topicProduceResponseToV5 x = TopicProduceResponseV5
{ name = x.name
, partitionResponses = fmap partitionProduceResponseToV5 x.partitionResponses
}
topicProduceResponseToV6 :: TopicProduceResponse -> TopicProduceResponseV6
topicProduceResponseToV6 = topicProduceResponseToV5
topicProduceResponseToV7 :: TopicProduceResponse -> TopicProduceResponseV7
topicProduceResponseToV7 = topicProduceResponseToV5

topicProduceResponseFromV0 :: TopicProduceResponseV0 -> TopicProduceResponse
topicProduceResponseFromV0 x = TopicProduceResponse
Expand All @@ -1986,6 +2014,10 @@ topicProduceResponseFromV5 x = TopicProduceResponse
{ name = x.name
, partitionResponses = fmap partitionProduceResponseFromV5 x.partitionResponses
}
topicProduceResponseFromV6 :: TopicProduceResponseV6 -> TopicProduceResponse
topicProduceResponseFromV6 = topicProduceResponseFromV5
topicProduceResponseFromV7 :: TopicProduceResponseV7 -> TopicProduceResponse
topicProduceResponseFromV7 = topicProduceResponseFromV5

data ApiVersionsRequest = ApiVersionsRequest
{ clientSoftwareName :: !CompactString
Expand Down Expand Up @@ -3574,6 +3606,10 @@ produceRequestToV4 :: ProduceRequest -> ProduceRequestV4
produceRequestToV4 = produceRequestToV3
produceRequestToV5 :: ProduceRequest -> ProduceRequestV5
produceRequestToV5 = produceRequestToV3
produceRequestToV6 :: ProduceRequest -> ProduceRequestV6
produceRequestToV6 = produceRequestToV3
produceRequestToV7 :: ProduceRequest -> ProduceRequestV7
produceRequestToV7 = produceRequestToV3

produceRequestFromV0 :: ProduceRequestV0 -> ProduceRequest
produceRequestFromV0 x = ProduceRequest
Expand All @@ -3597,6 +3633,10 @@ produceRequestFromV4 :: ProduceRequestV4 -> ProduceRequest
produceRequestFromV4 = produceRequestFromV3
produceRequestFromV5 :: ProduceRequestV5 -> ProduceRequest
produceRequestFromV5 = produceRequestFromV3
produceRequestFromV6 :: ProduceRequestV6 -> ProduceRequest
produceRequestFromV6 = produceRequestFromV3
produceRequestFromV7 :: ProduceRequestV7 -> ProduceRequest
produceRequestFromV7 = produceRequestFromV3

data ProduceResponse = ProduceResponse
{ responses :: !(KaArray TopicProduceResponse)
Expand Down Expand Up @@ -3630,6 +3670,10 @@ produceResponseToV5 x = ProduceResponseV5
{ responses = fmap topicProduceResponseToV5 x.responses
, throttleTimeMs = x.throttleTimeMs
}
produceResponseToV6 :: ProduceResponse -> ProduceResponseV6
produceResponseToV6 = produceResponseToV5
produceResponseToV7 :: ProduceResponse -> ProduceResponseV7
produceResponseToV7 = produceResponseToV5

produceResponseFromV0 :: ProduceResponseV0 -> ProduceResponse
produceResponseFromV0 x = ProduceResponse
Expand All @@ -3655,6 +3699,10 @@ produceResponseFromV5 x = ProduceResponse
{ responses = fmap topicProduceResponseFromV5 x.responses
, throttleTimeMs = x.throttleTimeMs
}
produceResponseFromV6 :: ProduceResponseV6 -> ProduceResponse
produceResponseFromV6 = produceResponseFromV5
produceResponseFromV7 :: ProduceResponseV7 -> ProduceResponse
produceResponseFromV7 = produceResponseFromV5

newtype SaslAuthenticateRequest = SaslAuthenticateRequest
{ authBytes :: ByteString
Expand Down
2 changes: 1 addition & 1 deletion script/kafka_gen.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def get_field_default(field_type, default=None):
API_VERSION_PATCHES = {
"ApiVersions": (0, 3),
"Metadata": (0, 5),
"Produce": (0, 5),
"Produce": (0, 7),
"Fetch": (0, 6),
"OffsetFetch": (0, 3),
"OffsetCommit": (0, 3),
Expand Down

0 comments on commit 50ae501

Please sign in to comment.