diff --git a/hstream-kafka/HStream/Kafka/Server/Handler.hsc b/hstream-kafka/HStream/Kafka/Server/Handler.hsc index cbab09ddb..314179ecc 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler.hsc +++ b/hstream-kafka/HStream/Kafka/Server/Handler.hsc @@ -61,7 +61,7 @@ import qualified Kafka.Protocol.Service as K #cv_handler Metadata, 0, 5 #cv_handler Produce, 0, 5 #cv_handler InitProducerId, 0, 0 -#cv_handler Fetch, 0, 4 +#cv_handler Fetch, 0, 6 #cv_handler DescribeConfigs, 0, 0 #cv_handler CreateTopics, 0, 2 @@ -93,7 +93,7 @@ handlers sc = , #mk_handler Produce, 0, 5 , #mk_handler InitProducerId, 0, 0 -- Read - , #mk_handler Fetch, 0, 4 + , #mk_handler Fetch, 0, 6 , #mk_handler FindCoordinator, 0, 1 diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs index 8cd89e854..2d1670b31 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs @@ -153,11 +153,17 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do mgv <- HT.lookup readRecords partition.logid case mgv of Nothing -> - pure $ K.PartitionData request.partition K.NONE - hioffset - (Just "") - (-1){- TODO: lastStableOffset -} - (K.NonNullKaArray V.empty){- TODO: abortedTransactions -} + pure $ K.PartitionData + { partitionIndex = request.partition + , errorCode = K.NONE + , highWatermark = hioffset + , recordBytes = (Just "") + , lastStableOffset = (-1) -- TODO + , abortedTransactions = K.NonNullKaArray V.empty -- TODO + -- TODO: for performance reason, we don't implement + -- logStartOffset now + , logStartOffset = (-1) + } Just gv -> do v <- GV.unsafeFreeze gv bs <- encodePartition mutMaxBytes mutIsFirstPartition request v @@ -169,9 +175,17 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do let totalRecords = V.sum $ V.map (\K.RecordFormat{..} -> batchLength) v M.addCounter counter (fromIntegral totalRecords) -- PartitionData - pure $ K.PartitionData request.partition K.NONE hioffset (Just bs) - (-1){- TODO: lastStableOffset -} - (K.NonNullKaArray V.empty){- TODO: abortedTransactions -} + pure $ K.PartitionData + { partitionIndex = request.partition + , errorCode = K.NONE + , highWatermark = hioffset + , recordBytes = (Just bs) + , lastStableOffset = (-1) -- TODO + , abortedTransactions = K.NonNullKaArray V.empty -- TODO + -- TODO: for performance reason, we don't implement + -- logStartOffset now + , logStartOffset = (-1) + } pure $ K.FetchableTopicResponse topic (K.NonNullKaArray respPartitionDatas) pure $ K.FetchResponse (K.NonNullKaArray respTopics) 0{- TODO: throttleTimeMs -} @@ -346,10 +360,16 @@ encodePartition mutMaxBytes mutIsFirstPartition p v = do pure (fstRecordBytes, vs) errorPartitionResponse :: Int32 -> K.ErrorCode -> K.PartitionData -errorPartitionResponse partitionIndex ec = - K.PartitionData partitionIndex ec (-1) (Just "") - (-1){- TODO: lastStableOffset -} - (K.NonNullKaArray V.empty){- TODO: abortedTransactions -} +errorPartitionResponse partitionIndex ec = K.PartitionData + { partitionIndex = partitionIndex + , errorCode = ec + , highWatermark = (-1) + , recordBytes = (Just "") + , lastStableOffset = (-1) -- TODO + , abortedTransactions = K.NonNullKaArray V.empty -- TODO + -- TODO: for performance reason, we don't implement logStartOffset now + , logStartOffset = (-1) + } {-# INLINE errorPartitionResponse #-} foldWhileM :: Monad m => a -> (a -> m (a, Bool)) -> m a diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs index 767054eab..bba6ca8b2 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs @@ -256,6 +256,32 @@ type FetchPartitionV4 = FetchPartitionV0 type FetchTopicV4 = FetchTopicV0 +data FetchPartitionV5 = FetchPartitionV5 + { partition :: {-# UNPACK #-} !Int32 + -- ^ The partition index. + , fetchOffset :: {-# UNPACK #-} !Int64 + -- ^ The message offset. + , logStartOffset :: {-# UNPACK #-} !Int64 + -- ^ The earliest available offset of the follower replica. The field is + -- only used when the request is sent by the follower. + , partitionMaxBytes :: {-# UNPACK #-} !Int32 + -- ^ The maximum bytes to fetch from this partition. See KIP-74 for cases + -- where this limit may not be honored. + } deriving (Show, Eq, Generic) +instance Serializable FetchPartitionV5 + +data FetchTopicV5 = FetchTopicV5 + { topic :: !Text + -- ^ The name of the topic to fetch. + , partitions :: !(KaArray FetchPartitionV5) + -- ^ The partitions to fetch. + } deriving (Show, Eq, Generic) +instance Serializable FetchTopicV5 + +type FetchPartitionV6 = FetchPartitionV5 + +type FetchTopicV6 = FetchTopicV5 + data PartitionDataV0 = PartitionDataV0 { partitionIndex :: {-# UNPACK #-} !Int32 -- ^ The partition index. @@ -322,6 +348,42 @@ data FetchableTopicResponseV4 = FetchableTopicResponseV4 } deriving (Show, Eq, Generic) instance Serializable FetchableTopicResponseV4 +type AbortedTransactionV5 = AbortedTransactionV4 + +data PartitionDataV5 = PartitionDataV5 + { partitionIndex :: {-# UNPACK #-} !Int32 + -- ^ The partition index. + , errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The error code, or 0 if there was no fetch error. + , highWatermark :: {-# UNPACK #-} !Int64 + -- ^ The current high water mark. + , lastStableOffset :: {-# UNPACK #-} !Int64 + -- ^ The last stable offset (or LSO) of the partition. This is the last + -- offset such that the state of all transactional records prior to this + -- offset have been decided (ABORTED or COMMITTED) + , logStartOffset :: {-# UNPACK #-} !Int64 + -- ^ The current log start offset. + , abortedTransactions :: !(KaArray AbortedTransactionV4) + -- ^ The aborted transactions. + , recordBytes :: !NullableBytes + -- ^ The record data. + } deriving (Show, Eq, Generic) +instance Serializable PartitionDataV5 + +data FetchableTopicResponseV5 = FetchableTopicResponseV5 + { topic :: !Text + -- ^ The topic name. + , partitions :: !(KaArray PartitionDataV5) + -- ^ The topic partitions. + } deriving (Show, Eq, Generic) +instance Serializable FetchableTopicResponseV5 + +type AbortedTransactionV6 = AbortedTransactionV4 + +type PartitionDataV6 = PartitionDataV5 + +type FetchableTopicResponseV6 = FetchableTopicResponseV5 + data JoinGroupRequestProtocolV0 = JoinGroupRequestProtocolV0 { name :: !Text -- ^ The protocol name. @@ -1013,6 +1075,33 @@ data FetchRequestV4 = FetchRequestV4 } deriving (Show, Eq, Generic) instance Serializable FetchRequestV4 +data FetchRequestV5 = FetchRequestV5 + { replicaId :: {-# UNPACK #-} !Int32 + -- ^ The broker ID of the follower, of -1 if this request is from a + -- consumer. + , maxWaitMs :: {-# UNPACK #-} !Int32 + -- ^ The maximum time in milliseconds to wait for the response. + , minBytes :: {-# UNPACK #-} !Int32 + -- ^ The minimum bytes to accumulate in the response. + , maxBytes :: {-# UNPACK #-} !Int32 + -- ^ The maximum bytes to fetch. See KIP-74 for cases where this limit may + -- not be honored. + , isolationLevel :: {-# UNPACK #-} !Int8 + -- ^ This setting controls the visibility of transactional records. Using + -- READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With + -- READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED + -- transactional records are visible. To be more concrete, READ_COMMITTED + -- returns all data from offsets smaller than the current LSO (last stable + -- offset), and enables the inclusion of the list of aborted transactions + -- in the result, which allows consumers to discard ABORTED transactional + -- records + , topics :: !(KaArray FetchTopicV5) + -- ^ The topics to fetch. + } deriving (Show, Eq, Generic) +instance Serializable FetchRequestV5 + +type FetchRequestV6 = FetchRequestV5 + newtype FetchResponseV0 = FetchResponseV0 { responses :: (KaArray FetchableTopicResponseV0) } deriving (Show, Eq, Generic) @@ -1040,6 +1129,17 @@ data FetchResponseV4 = FetchResponseV4 } deriving (Show, Eq, Generic) instance Serializable FetchResponseV4 +data FetchResponseV5 = FetchResponseV5 + { throttleTimeMs :: {-# UNPACK #-} !Int32 + -- ^ The duration in milliseconds for which the request was throttled due + -- to a quota violation, or zero if the request did not violate any quota. + , responses :: !(KaArray FetchableTopicResponseV5) + -- ^ The response topics. + } deriving (Show, Eq, Generic) +instance Serializable FetchResponseV5 + +type FetchResponseV6 = FetchResponseV5 + newtype FindCoordinatorRequestV0 = FindCoordinatorRequestV0 { key :: Text } deriving (Show, Eq, Generic) @@ -2126,6 +2226,7 @@ instance Service HStreamKafkaV5 where type ServiceName HStreamKafkaV5 = "HStreamKafkaV5" type ServiceMethods HStreamKafkaV5 = '[ "produce" + , "fetch" , "metadata" ] @@ -2136,6 +2237,13 @@ instance HasMethodImpl HStreamKafkaV5 "produce" where type MethodInput HStreamKafkaV5 "produce" = ProduceRequestV5 type MethodOutput HStreamKafkaV5 "produce" = ProduceResponseV5 +instance HasMethodImpl HStreamKafkaV5 "fetch" where + type MethodName HStreamKafkaV5 "fetch" = "fetch" + type MethodKey HStreamKafkaV5 "fetch" = 1 + type MethodVersion HStreamKafkaV5 "fetch" = 5 + type MethodInput HStreamKafkaV5 "fetch" = FetchRequestV5 + type MethodOutput HStreamKafkaV5 "fetch" = FetchResponseV5 + instance HasMethodImpl HStreamKafkaV5 "metadata" where type MethodName HStreamKafkaV5 "metadata" = "metadata" type MethodKey HStreamKafkaV5 "metadata" = 3 @@ -2143,6 +2251,21 @@ instance HasMethodImpl HStreamKafkaV5 "metadata" where type MethodInput HStreamKafkaV5 "metadata" = MetadataRequestV5 type MethodOutput HStreamKafkaV5 "metadata" = MetadataResponseV5 +data HStreamKafkaV6 + +instance Service HStreamKafkaV6 where + type ServiceName HStreamKafkaV6 = "HStreamKafkaV6" + type ServiceMethods HStreamKafkaV6 = + '[ "fetch" + ] + +instance HasMethodImpl HStreamKafkaV6 "fetch" where + type MethodName HStreamKafkaV6 "fetch" = "fetch" + type MethodKey HStreamKafkaV6 "fetch" = 1 + type MethodVersion HStreamKafkaV6 "fetch" = 6 + type MethodInput HStreamKafkaV6 "fetch" = FetchRequestV6 + type MethodOutput HStreamKafkaV6 "fetch" = FetchResponseV6 + ------------------------------------------------------------------------------- newtype ApiKey = ApiKey Int16 @@ -2175,7 +2298,7 @@ instance Show ApiKey where supportedApiVersions :: [ApiVersionV0] supportedApiVersions = [ ApiVersionV0 (ApiKey 0) 0 5 - , ApiVersionV0 (ApiKey 1) 0 4 + , ApiVersionV0 (ApiKey 1) 0 6 , ApiVersionV0 (ApiKey 2) 0 2 , ApiVersionV0 (ApiKey 3) 0 5 , ApiVersionV0 (ApiKey 8) 0 3 @@ -2209,6 +2332,8 @@ getHeaderVersion (ApiKey (1)) 1 = (1, 0) getHeaderVersion (ApiKey (1)) 2 = (1, 0) getHeaderVersion (ApiKey (1)) 3 = (1, 0) getHeaderVersion (ApiKey (1)) 4 = (1, 0) +getHeaderVersion (ApiKey (1)) 5 = (1, 0) +getHeaderVersion (ApiKey (1)) 6 = (1, 0) getHeaderVersion (ApiKey (2)) 0 = (1, 0) getHeaderVersion (ApiKey (2)) 1 = (1, 0) getHeaderVersion (ApiKey (2)) 2 = (1, 0) diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs index 6b2672dea..dd73d102d 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs @@ -38,12 +38,20 @@ abortedTransactionToV4 x = AbortedTransactionV4 { producerId = x.producerId , firstOffset = x.firstOffset } +abortedTransactionToV5 :: AbortedTransaction -> AbortedTransactionV5 +abortedTransactionToV5 = abortedTransactionToV4 +abortedTransactionToV6 :: AbortedTransaction -> AbortedTransactionV6 +abortedTransactionToV6 = abortedTransactionToV4 abortedTransactionFromV4 :: AbortedTransactionV4 -> AbortedTransaction abortedTransactionFromV4 x = AbortedTransaction { producerId = x.producerId , firstOffset = x.firstOffset } +abortedTransactionFromV5 :: AbortedTransactionV5 -> AbortedTransaction +abortedTransactionFromV5 = abortedTransactionFromV4 +abortedTransactionFromV6 :: AbortedTransactionV6 -> AbortedTransaction +abortedTransactionFromV6 = abortedTransactionFromV4 data ApiVersion = ApiVersion { apiKey :: {-# UNPACK #-} !ApiKey @@ -432,6 +440,9 @@ data FetchPartition = FetchPartition , partitionMaxBytes :: {-# UNPACK #-} !Int32 -- ^ The maximum bytes to fetch from this partition. See KIP-74 for cases -- where this limit may not be honored. + , logStartOffset :: {-# UNPACK #-} !Int64 + -- ^ The earliest available offset of the follower replica. The field is + -- only used when the request is sent by the follower. } deriving (Show, Eq, Generic) instance Serializable FetchPartition @@ -449,12 +460,22 @@ fetchPartitionToV3 :: FetchPartition -> FetchPartitionV3 fetchPartitionToV3 = fetchPartitionToV0 fetchPartitionToV4 :: FetchPartition -> FetchPartitionV4 fetchPartitionToV4 = fetchPartitionToV0 +fetchPartitionToV5 :: FetchPartition -> FetchPartitionV5 +fetchPartitionToV5 x = FetchPartitionV5 + { partition = x.partition + , fetchOffset = x.fetchOffset + , logStartOffset = x.logStartOffset + , partitionMaxBytes = x.partitionMaxBytes + } +fetchPartitionToV6 :: FetchPartition -> FetchPartitionV6 +fetchPartitionToV6 = fetchPartitionToV5 fetchPartitionFromV0 :: FetchPartitionV0 -> FetchPartition fetchPartitionFromV0 x = FetchPartition { partition = x.partition , fetchOffset = x.fetchOffset , partitionMaxBytes = x.partitionMaxBytes + , logStartOffset = (-1) } fetchPartitionFromV1 :: FetchPartitionV1 -> FetchPartition fetchPartitionFromV1 = fetchPartitionFromV0 @@ -464,6 +485,15 @@ fetchPartitionFromV3 :: FetchPartitionV3 -> FetchPartition fetchPartitionFromV3 = fetchPartitionFromV0 fetchPartitionFromV4 :: FetchPartitionV4 -> FetchPartition fetchPartitionFromV4 = fetchPartitionFromV0 +fetchPartitionFromV5 :: FetchPartitionV5 -> FetchPartition +fetchPartitionFromV5 x = FetchPartition + { partition = x.partition + , fetchOffset = x.fetchOffset + , partitionMaxBytes = x.partitionMaxBytes + , logStartOffset = x.logStartOffset + } +fetchPartitionFromV6 :: FetchPartitionV6 -> FetchPartition +fetchPartitionFromV6 = fetchPartitionFromV5 data FetchTopic = FetchTopic { topic :: !Text @@ -486,6 +516,13 @@ fetchTopicToV3 :: FetchTopic -> FetchTopicV3 fetchTopicToV3 = fetchTopicToV0 fetchTopicToV4 :: FetchTopic -> FetchTopicV4 fetchTopicToV4 = fetchTopicToV0 +fetchTopicToV5 :: FetchTopic -> FetchTopicV5 +fetchTopicToV5 x = FetchTopicV5 + { topic = x.topic + , partitions = fmap fetchPartitionToV5 x.partitions + } +fetchTopicToV6 :: FetchTopic -> FetchTopicV6 +fetchTopicToV6 = fetchTopicToV5 fetchTopicFromV0 :: FetchTopicV0 -> FetchTopic fetchTopicFromV0 x = FetchTopic @@ -500,6 +537,13 @@ fetchTopicFromV3 :: FetchTopicV3 -> FetchTopic fetchTopicFromV3 = fetchTopicFromV0 fetchTopicFromV4 :: FetchTopicV4 -> FetchTopic fetchTopicFromV4 = fetchTopicFromV0 +fetchTopicFromV5 :: FetchTopicV5 -> FetchTopic +fetchTopicFromV5 x = FetchTopic + { topic = x.topic + , partitions = fmap fetchPartitionFromV5 x.partitions + } +fetchTopicFromV6 :: FetchTopicV6 -> FetchTopic +fetchTopicFromV6 = fetchTopicFromV5 data FetchableTopicResponse = FetchableTopicResponse { topic :: !Text @@ -525,6 +569,13 @@ fetchableTopicResponseToV4 x = FetchableTopicResponseV4 { topic = x.topic , partitions = fmap partitionDataToV4 x.partitions } +fetchableTopicResponseToV5 :: FetchableTopicResponse -> FetchableTopicResponseV5 +fetchableTopicResponseToV5 x = FetchableTopicResponseV5 + { topic = x.topic + , partitions = fmap partitionDataToV5 x.partitions + } +fetchableTopicResponseToV6 :: FetchableTopicResponse -> FetchableTopicResponseV6 +fetchableTopicResponseToV6 = fetchableTopicResponseToV5 fetchableTopicResponseFromV0 :: FetchableTopicResponseV0 -> FetchableTopicResponse fetchableTopicResponseFromV0 x = FetchableTopicResponse @@ -542,6 +593,13 @@ fetchableTopicResponseFromV4 x = FetchableTopicResponse { topic = x.topic , partitions = fmap partitionDataFromV4 x.partitions } +fetchableTopicResponseFromV5 :: FetchableTopicResponseV5 -> FetchableTopicResponse +fetchableTopicResponseFromV5 x = FetchableTopicResponse + { topic = x.topic + , partitions = fmap partitionDataFromV5 x.partitions + } +fetchableTopicResponseFromV6 :: FetchableTopicResponseV6 -> FetchableTopicResponse +fetchableTopicResponseFromV6 = fetchableTopicResponseFromV5 data FinalizedFeatureKey = FinalizedFeatureKey { name :: !CompactString @@ -1304,6 +1362,8 @@ data PartitionData = PartitionData -- offset have been decided (ABORTED or COMMITTED) , abortedTransactions :: !(KaArray AbortedTransaction) -- ^ The aborted transactions. + , logStartOffset :: {-# UNPACK #-} !Int64 + -- ^ The current log start offset. } deriving (Show, Eq, Generic) instance Serializable PartitionData @@ -1329,6 +1389,18 @@ partitionDataToV4 x = PartitionDataV4 , abortedTransactions = fmap abortedTransactionToV4 x.abortedTransactions , recordBytes = x.recordBytes } +partitionDataToV5 :: PartitionData -> PartitionDataV5 +partitionDataToV5 x = PartitionDataV5 + { partitionIndex = x.partitionIndex + , errorCode = x.errorCode + , highWatermark = x.highWatermark + , lastStableOffset = x.lastStableOffset + , logStartOffset = x.logStartOffset + , abortedTransactions = fmap abortedTransactionToV5 x.abortedTransactions + , recordBytes = x.recordBytes + } +partitionDataToV6 :: PartitionData -> PartitionDataV6 +partitionDataToV6 = partitionDataToV5 partitionDataFromV0 :: PartitionDataV0 -> PartitionData partitionDataFromV0 x = PartitionData @@ -1338,6 +1410,7 @@ partitionDataFromV0 x = PartitionData , recordBytes = x.recordBytes , lastStableOffset = (-1) , abortedTransactions = KaArray (Just V.empty) + , logStartOffset = (-1) } partitionDataFromV1 :: PartitionDataV1 -> PartitionData partitionDataFromV1 = partitionDataFromV0 @@ -1353,7 +1426,20 @@ partitionDataFromV4 x = PartitionData , recordBytes = x.recordBytes , lastStableOffset = x.lastStableOffset , abortedTransactions = fmap abortedTransactionFromV4 x.abortedTransactions + , logStartOffset = (-1) } +partitionDataFromV5 :: PartitionDataV5 -> PartitionData +partitionDataFromV5 x = PartitionData + { partitionIndex = x.partitionIndex + , errorCode = x.errorCode + , highWatermark = x.highWatermark + , recordBytes = x.recordBytes + , lastStableOffset = x.lastStableOffset + , abortedTransactions = fmap abortedTransactionFromV5 x.abortedTransactions + , logStartOffset = x.logStartOffset + } +partitionDataFromV6 :: PartitionDataV6 -> PartitionData +partitionDataFromV6 = partitionDataFromV5 data PartitionProduceData = PartitionProduceData { index :: {-# UNPACK #-} !Int32 @@ -1982,6 +2068,17 @@ fetchRequestToV4 x = FetchRequestV4 , isolationLevel = x.isolationLevel , topics = fmap fetchTopicToV4 x.topics } +fetchRequestToV5 :: FetchRequest -> FetchRequestV5 +fetchRequestToV5 x = FetchRequestV5 + { replicaId = x.replicaId + , maxWaitMs = x.maxWaitMs + , minBytes = x.minBytes + , maxBytes = x.maxBytes + , isolationLevel = x.isolationLevel + , topics = fmap fetchTopicToV5 x.topics + } +fetchRequestToV6 :: FetchRequest -> FetchRequestV6 +fetchRequestToV6 = fetchRequestToV5 fetchRequestFromV0 :: FetchRequestV0 -> FetchRequest fetchRequestFromV0 x = FetchRequest @@ -2014,6 +2111,17 @@ fetchRequestFromV4 x = FetchRequest , maxBytes = x.maxBytes , isolationLevel = x.isolationLevel } +fetchRequestFromV5 :: FetchRequestV5 -> FetchRequest +fetchRequestFromV5 x = FetchRequest + { replicaId = x.replicaId + , maxWaitMs = x.maxWaitMs + , minBytes = x.minBytes + , topics = fmap fetchTopicFromV5 x.topics + , maxBytes = x.maxBytes + , isolationLevel = x.isolationLevel + } +fetchRequestFromV6 :: FetchRequestV6 -> FetchRequest +fetchRequestFromV6 = fetchRequestFromV5 data FetchResponse = FetchResponse { responses :: !(KaArray FetchableTopicResponse) @@ -2042,6 +2150,13 @@ fetchResponseToV4 x = FetchResponseV4 { throttleTimeMs = x.throttleTimeMs , responses = fmap fetchableTopicResponseToV4 x.responses } +fetchResponseToV5 :: FetchResponse -> FetchResponseV5 +fetchResponseToV5 x = FetchResponseV5 + { throttleTimeMs = x.throttleTimeMs + , responses = fmap fetchableTopicResponseToV5 x.responses + } +fetchResponseToV6 :: FetchResponse -> FetchResponseV6 +fetchResponseToV6 = fetchResponseToV5 fetchResponseFromV0 :: FetchResponseV0 -> FetchResponse fetchResponseFromV0 x = FetchResponse @@ -2062,6 +2177,13 @@ fetchResponseFromV4 x = FetchResponse { responses = fmap fetchableTopicResponseFromV4 x.responses , throttleTimeMs = x.throttleTimeMs } +fetchResponseFromV5 :: FetchResponseV5 -> FetchResponse +fetchResponseFromV5 x = FetchResponse + { responses = fmap fetchableTopicResponseFromV5 x.responses + , throttleTimeMs = x.throttleTimeMs + } +fetchResponseFromV6 :: FetchResponseV6 -> FetchResponse +fetchResponseFromV6 = fetchResponseFromV5 data FindCoordinatorRequest = FindCoordinatorRequest { key :: !Text diff --git a/script/kafka_gen.py b/script/kafka_gen.py index bc0e5c38e..dd038b237 100755 --- a/script/kafka_gen.py +++ b/script/kafka_gen.py @@ -111,7 +111,7 @@ def get_field_default(field_type, default=None): "ApiVersions": (0, 3), "Metadata": (0, 5), "Produce": (0, 5), - "Fetch": (0, 4), + "Fetch": (0, 6), "OffsetFetch": (0, 3), "OffsetCommit": (0, 3), "ListOffsets": (0, 2),