From 50ae5016331e6e9dc2be79debae353d3817d2f49 Mon Sep 17 00:00:00 2001 From: mu <59917266+4eUeP@users.noreply.github.com> Date: Fri, 22 Mar 2024 11:34:17 +0800 Subject: [PATCH] kafka: upgrade Produce version to v7 (#1782) --- .../HStream/Kafka/Server/Handler.hsc | 4 +- .../protocol/Kafka/Protocol/Message/Struct.hs | 53 ++++++++++++++++++- .../protocol/Kafka/Protocol/Message/Total.hs | 48 +++++++++++++++++ script/kafka_gen.py | 2 +- 4 files changed, 102 insertions(+), 5 deletions(-) diff --git a/hstream-kafka/HStream/Kafka/Server/Handler.hsc b/hstream-kafka/HStream/Kafka/Server/Handler.hsc index dc6355aa3..d9bca1f1a 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler.hsc +++ b/hstream-kafka/HStream/Kafka/Server/Handler.hsc @@ -59,7 +59,7 @@ import qualified Kafka.Protocol.Service as K #cv_handler ApiVersions, 0, 3 #cv_handler ListOffsets, 0, 2 #cv_handler Metadata, 0, 5 -#cv_handler Produce, 0, 5 +#cv_handler Produce, 0, 7 #cv_handler InitProducerId, 0, 0 #cv_handler Fetch, 0, 6 #cv_handler DescribeConfigs, 0, 0 @@ -99,7 +99,7 @@ handlers sc = , #mk_handler ListOffsets, 0, 2 , #mk_handler Metadata, 0, 5 -- Write - , #mk_handler Produce, 0, 5 + , #mk_handler Produce, 0, 7 , #mk_handler InitProducerId, 0, 0 -- Read , #mk_handler Fetch, 0, 6 diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs index 2d62a6802..26c28128d 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs @@ -900,6 +900,14 @@ type PartitionProduceDataV5 = PartitionProduceDataV0 type TopicProduceDataV5 = TopicProduceDataV0 +type PartitionProduceDataV6 = PartitionProduceDataV0 + +type TopicProduceDataV6 = TopicProduceDataV0 + +type PartitionProduceDataV7 = PartitionProduceDataV0 + +type TopicProduceDataV7 = TopicProduceDataV0 + data PartitionProduceResponseV0 = PartitionProduceResponseV0 { index :: {-# UNPACK #-} !Int32 -- ^ The partition index. @@ -978,6 +986,14 @@ data TopicProduceResponseV5 = TopicProduceResponseV5 } deriving (Show, Eq, Generic) instance Serializable TopicProduceResponseV5 +type PartitionProduceResponseV6 = PartitionProduceResponseV5 + +type TopicProduceResponseV6 = TopicProduceResponseV5 + +type PartitionProduceResponseV7 = PartitionProduceResponseV5 + +type TopicProduceResponseV7 = TopicProduceResponseV5 + data SyncGroupRequestAssignmentV0 = SyncGroupRequestAssignmentV0 { memberId :: !Text -- ^ The ID of the member to assign. @@ -1823,6 +1839,10 @@ type ProduceRequestV4 = ProduceRequestV3 type ProduceRequestV5 = ProduceRequestV3 +type ProduceRequestV6 = ProduceRequestV3 + +type ProduceRequestV7 = ProduceRequestV3 + newtype ProduceResponseV0 = ProduceResponseV0 { responses :: (KaArray TopicProduceResponseV0) } deriving (Show, Eq, Generic) @@ -1859,6 +1879,10 @@ data ProduceResponseV5 = ProduceResponseV5 } deriving (Show, Eq, Generic) instance Serializable ProduceResponseV5 +type ProduceResponseV6 = ProduceResponseV5 + +type ProduceResponseV7 = ProduceResponseV5 + newtype SaslAuthenticateRequestV0 = SaslAuthenticateRequestV0 { authBytes :: ByteString } deriving (Show, Eq, Generic) @@ -2500,9 +2524,17 @@ data HStreamKafkaV6 instance Service HStreamKafkaV6 where type ServiceName HStreamKafkaV6 = "HStreamKafkaV6" type ServiceMethods HStreamKafkaV6 = - '[ "fetch" + '[ "produce" + , "fetch" ] +instance HasMethodImpl HStreamKafkaV6 "produce" where + type MethodName HStreamKafkaV6 "produce" = "produce" + type MethodKey HStreamKafkaV6 "produce" = 0 + type MethodVersion HStreamKafkaV6 "produce" = 6 + type MethodInput HStreamKafkaV6 "produce" = ProduceRequestV6 + type MethodOutput HStreamKafkaV6 "produce" = ProduceResponseV6 + instance HasMethodImpl HStreamKafkaV6 "fetch" where type MethodName HStreamKafkaV6 "fetch" = "fetch" type MethodKey HStreamKafkaV6 "fetch" = 1 @@ -2510,6 +2542,21 @@ instance HasMethodImpl HStreamKafkaV6 "fetch" where type MethodInput HStreamKafkaV6 "fetch" = FetchRequestV6 type MethodOutput HStreamKafkaV6 "fetch" = FetchResponseV6 +data HStreamKafkaV7 + +instance Service HStreamKafkaV7 where + type ServiceName HStreamKafkaV7 = "HStreamKafkaV7" + type ServiceMethods HStreamKafkaV7 = + '[ "produce" + ] + +instance HasMethodImpl HStreamKafkaV7 "produce" where + type MethodName HStreamKafkaV7 "produce" = "produce" + type MethodKey HStreamKafkaV7 "produce" = 0 + type MethodVersion HStreamKafkaV7 "produce" = 7 + type MethodInput HStreamKafkaV7 "produce" = ProduceRequestV7 + type MethodOutput HStreamKafkaV7 "produce" = ProduceResponseV7 + ------------------------------------------------------------------------------- newtype ApiKey = ApiKey Int16 @@ -2545,7 +2592,7 @@ instance Show ApiKey where supportedApiVersions :: [ApiVersionV0] supportedApiVersions = - [ ApiVersionV0 (ApiKey 0) 0 5 + [ ApiVersionV0 (ApiKey 0) 0 7 , ApiVersionV0 (ApiKey 1) 0 6 , ApiVersionV0 (ApiKey 2) 0 2 , ApiVersionV0 (ApiKey 3) 0 5 @@ -2579,6 +2626,8 @@ getHeaderVersion (ApiKey (0)) 2 = (1, 0) getHeaderVersion (ApiKey (0)) 3 = (1, 0) getHeaderVersion (ApiKey (0)) 4 = (1, 0) getHeaderVersion (ApiKey (0)) 5 = (1, 0) +getHeaderVersion (ApiKey (0)) 6 = (1, 0) +getHeaderVersion (ApiKey (0)) 7 = (1, 0) getHeaderVersion (ApiKey (1)) 0 = (1, 0) getHeaderVersion (ApiKey (1)) 1 = (1, 0) getHeaderVersion (ApiKey (1)) 2 = (1, 0) diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs index 57cb91419..4a22b1f5f 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs @@ -1751,6 +1751,10 @@ partitionProduceDataToV4 :: PartitionProduceData -> PartitionProduceDataV4 partitionProduceDataToV4 = partitionProduceDataToV0 partitionProduceDataToV5 :: PartitionProduceData -> PartitionProduceDataV5 partitionProduceDataToV5 = partitionProduceDataToV0 +partitionProduceDataToV6 :: PartitionProduceData -> PartitionProduceDataV6 +partitionProduceDataToV6 = partitionProduceDataToV0 +partitionProduceDataToV7 :: PartitionProduceData -> PartitionProduceDataV7 +partitionProduceDataToV7 = partitionProduceDataToV0 partitionProduceDataFromV0 :: PartitionProduceDataV0 -> PartitionProduceData partitionProduceDataFromV0 x = PartitionProduceData @@ -1767,6 +1771,10 @@ partitionProduceDataFromV4 :: PartitionProduceDataV4 -> PartitionProduceData partitionProduceDataFromV4 = partitionProduceDataFromV0 partitionProduceDataFromV5 :: PartitionProduceDataV5 -> PartitionProduceData partitionProduceDataFromV5 = partitionProduceDataFromV0 +partitionProduceDataFromV6 :: PartitionProduceDataV6 -> PartitionProduceData +partitionProduceDataFromV6 = partitionProduceDataFromV0 +partitionProduceDataFromV7 :: PartitionProduceDataV7 -> PartitionProduceData +partitionProduceDataFromV7 = partitionProduceDataFromV0 data PartitionProduceResponse = PartitionProduceResponse { index :: {-# UNPACK #-} !Int32 @@ -1812,6 +1820,10 @@ partitionProduceResponseToV5 x = PartitionProduceResponseV5 , logAppendTimeMs = x.logAppendTimeMs , logStartOffset = x.logStartOffset } +partitionProduceResponseToV6 :: PartitionProduceResponse -> PartitionProduceResponseV6 +partitionProduceResponseToV6 = partitionProduceResponseToV5 +partitionProduceResponseToV7 :: PartitionProduceResponse -> PartitionProduceResponseV7 +partitionProduceResponseToV7 = partitionProduceResponseToV5 partitionProduceResponseFromV0 :: PartitionProduceResponseV0 -> PartitionProduceResponse partitionProduceResponseFromV0 x = PartitionProduceResponse @@ -1843,6 +1855,10 @@ partitionProduceResponseFromV5 x = PartitionProduceResponse , logAppendTimeMs = x.logAppendTimeMs , logStartOffset = x.logStartOffset } +partitionProduceResponseFromV6 :: PartitionProduceResponseV6 -> PartitionProduceResponse +partitionProduceResponseFromV6 = partitionProduceResponseFromV5 +partitionProduceResponseFromV7 :: PartitionProduceResponseV7 -> PartitionProduceResponse +partitionProduceResponseFromV7 = partitionProduceResponseFromV5 data SupportedFeatureKey = SupportedFeatureKey { name :: !CompactString @@ -1918,6 +1934,10 @@ topicProduceDataToV4 :: TopicProduceData -> TopicProduceDataV4 topicProduceDataToV4 = topicProduceDataToV0 topicProduceDataToV5 :: TopicProduceData -> TopicProduceDataV5 topicProduceDataToV5 = topicProduceDataToV0 +topicProduceDataToV6 :: TopicProduceData -> TopicProduceDataV6 +topicProduceDataToV6 = topicProduceDataToV0 +topicProduceDataToV7 :: TopicProduceData -> TopicProduceDataV7 +topicProduceDataToV7 = topicProduceDataToV0 topicProduceDataFromV0 :: TopicProduceDataV0 -> TopicProduceData topicProduceDataFromV0 x = TopicProduceData @@ -1934,6 +1954,10 @@ topicProduceDataFromV4 :: TopicProduceDataV4 -> TopicProduceData topicProduceDataFromV4 = topicProduceDataFromV0 topicProduceDataFromV5 :: TopicProduceDataV5 -> TopicProduceData topicProduceDataFromV5 = topicProduceDataFromV0 +topicProduceDataFromV6 :: TopicProduceDataV6 -> TopicProduceData +topicProduceDataFromV6 = topicProduceDataFromV0 +topicProduceDataFromV7 :: TopicProduceDataV7 -> TopicProduceData +topicProduceDataFromV7 = topicProduceDataFromV0 data TopicProduceResponse = TopicProduceResponse { name :: !Text @@ -1964,6 +1988,10 @@ topicProduceResponseToV5 x = TopicProduceResponseV5 { name = x.name , partitionResponses = fmap partitionProduceResponseToV5 x.partitionResponses } +topicProduceResponseToV6 :: TopicProduceResponse -> TopicProduceResponseV6 +topicProduceResponseToV6 = topicProduceResponseToV5 +topicProduceResponseToV7 :: TopicProduceResponse -> TopicProduceResponseV7 +topicProduceResponseToV7 = topicProduceResponseToV5 topicProduceResponseFromV0 :: TopicProduceResponseV0 -> TopicProduceResponse topicProduceResponseFromV0 x = TopicProduceResponse @@ -1986,6 +2014,10 @@ topicProduceResponseFromV5 x = TopicProduceResponse { name = x.name , partitionResponses = fmap partitionProduceResponseFromV5 x.partitionResponses } +topicProduceResponseFromV6 :: TopicProduceResponseV6 -> TopicProduceResponse +topicProduceResponseFromV6 = topicProduceResponseFromV5 +topicProduceResponseFromV7 :: TopicProduceResponseV7 -> TopicProduceResponse +topicProduceResponseFromV7 = topicProduceResponseFromV5 data ApiVersionsRequest = ApiVersionsRequest { clientSoftwareName :: !CompactString @@ -3574,6 +3606,10 @@ produceRequestToV4 :: ProduceRequest -> ProduceRequestV4 produceRequestToV4 = produceRequestToV3 produceRequestToV5 :: ProduceRequest -> ProduceRequestV5 produceRequestToV5 = produceRequestToV3 +produceRequestToV6 :: ProduceRequest -> ProduceRequestV6 +produceRequestToV6 = produceRequestToV3 +produceRequestToV7 :: ProduceRequest -> ProduceRequestV7 +produceRequestToV7 = produceRequestToV3 produceRequestFromV0 :: ProduceRequestV0 -> ProduceRequest produceRequestFromV0 x = ProduceRequest @@ -3597,6 +3633,10 @@ produceRequestFromV4 :: ProduceRequestV4 -> ProduceRequest produceRequestFromV4 = produceRequestFromV3 produceRequestFromV5 :: ProduceRequestV5 -> ProduceRequest produceRequestFromV5 = produceRequestFromV3 +produceRequestFromV6 :: ProduceRequestV6 -> ProduceRequest +produceRequestFromV6 = produceRequestFromV3 +produceRequestFromV7 :: ProduceRequestV7 -> ProduceRequest +produceRequestFromV7 = produceRequestFromV3 data ProduceResponse = ProduceResponse { responses :: !(KaArray TopicProduceResponse) @@ -3630,6 +3670,10 @@ produceResponseToV5 x = ProduceResponseV5 { responses = fmap topicProduceResponseToV5 x.responses , throttleTimeMs = x.throttleTimeMs } +produceResponseToV6 :: ProduceResponse -> ProduceResponseV6 +produceResponseToV6 = produceResponseToV5 +produceResponseToV7 :: ProduceResponse -> ProduceResponseV7 +produceResponseToV7 = produceResponseToV5 produceResponseFromV0 :: ProduceResponseV0 -> ProduceResponse produceResponseFromV0 x = ProduceResponse @@ -3655,6 +3699,10 @@ produceResponseFromV5 x = ProduceResponse { responses = fmap topicProduceResponseFromV5 x.responses , throttleTimeMs = x.throttleTimeMs } +produceResponseFromV6 :: ProduceResponseV6 -> ProduceResponse +produceResponseFromV6 = produceResponseFromV5 +produceResponseFromV7 :: ProduceResponseV7 -> ProduceResponse +produceResponseFromV7 = produceResponseFromV5 newtype SaslAuthenticateRequest = SaslAuthenticateRequest { authBytes :: ByteString diff --git a/script/kafka_gen.py b/script/kafka_gen.py index 46e1037f7..7924f8145 100755 --- a/script/kafka_gen.py +++ b/script/kafka_gen.py @@ -110,7 +110,7 @@ def get_field_default(field_type, default=None): API_VERSION_PATCHES = { "ApiVersions": (0, 3), "Metadata": (0, 5), - "Produce": (0, 5), + "Produce": (0, 7), "Fetch": (0, 6), "OffsetFetch": (0, 3), "OffsetCommit": (0, 3),