Skip to content

Commit

Permalink
kafka: upgrade Fetch to version 6 (#1731)
Browse files Browse the repository at this point in the history
Note that we do not support the following features currently:

- lastStableOffset  (unimplemented)
- abortedTransactions  (unimplemented)
- logStartOffset  (for performance reasons)
  • Loading branch information
4eUeP authored Jan 5, 2024
1 parent ed15f15 commit 76fa030
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 16 deletions.
4 changes: 2 additions & 2 deletions hstream-kafka/HStream/Kafka/Server/Handler.hsc
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ import qualified Kafka.Protocol.Service as K
#cv_handler Metadata, 0, 5
#cv_handler Produce, 0, 5
#cv_handler InitProducerId, 0, 0
#cv_handler Fetch, 0, 4
#cv_handler Fetch, 0, 6
#cv_handler DescribeConfigs, 0, 0

#cv_handler CreateTopics, 0, 2
Expand Down Expand Up @@ -93,7 +93,7 @@ handlers sc =
, #mk_handler Produce, 0, 5
, #mk_handler InitProducerId, 0, 0
-- Read
, #mk_handler Fetch, 0, 4
, #mk_handler Fetch, 0, 6

, #mk_handler FindCoordinator, 0, 1

Expand Down
44 changes: 32 additions & 12 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,17 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
mgv <- HT.lookup readRecords partition.logid
case mgv of
Nothing ->
pure $ K.PartitionData request.partition K.NONE
hioffset
(Just "")
(-1){- TODO: lastStableOffset -}
(K.NonNullKaArray V.empty){- TODO: abortedTransactions -}
pure $ K.PartitionData
{ partitionIndex = request.partition
, errorCode = K.NONE
, highWatermark = hioffset
, recordBytes = (Just "")
, lastStableOffset = (-1) -- TODO
, abortedTransactions = K.NonNullKaArray V.empty -- TODO
-- TODO: for performance reason, we don't implement
-- logStartOffset now
, logStartOffset = (-1)
}
Just gv -> do
v <- GV.unsafeFreeze gv
bs <- encodePartition mutMaxBytes mutIsFirstPartition request v
Expand All @@ -169,9 +175,17 @@ handleFetch ServerContext{..} _ r = K.catchFetchResponseEx $ do
let totalRecords = V.sum $ V.map (\K.RecordFormat{..} -> batchLength) v
M.addCounter counter (fromIntegral totalRecords)
-- PartitionData
pure $ K.PartitionData request.partition K.NONE hioffset (Just bs)
(-1){- TODO: lastStableOffset -}
(K.NonNullKaArray V.empty){- TODO: abortedTransactions -}
pure $ K.PartitionData
{ partitionIndex = request.partition
, errorCode = K.NONE
, highWatermark = hioffset
, recordBytes = (Just bs)
, lastStableOffset = (-1) -- TODO
, abortedTransactions = K.NonNullKaArray V.empty -- TODO
-- TODO: for performance reason, we don't implement
-- logStartOffset now
, logStartOffset = (-1)
}
pure $ K.FetchableTopicResponse topic (K.NonNullKaArray respPartitionDatas)
pure $ K.FetchResponse (K.NonNullKaArray respTopics) 0{- TODO: throttleTimeMs -}

Expand Down Expand Up @@ -346,10 +360,16 @@ encodePartition mutMaxBytes mutIsFirstPartition p v = do
pure (fstRecordBytes, vs)

errorPartitionResponse :: Int32 -> K.ErrorCode -> K.PartitionData
errorPartitionResponse partitionIndex ec =
K.PartitionData partitionIndex ec (-1) (Just "")
(-1){- TODO: lastStableOffset -}
(K.NonNullKaArray V.empty){- TODO: abortedTransactions -}
errorPartitionResponse partitionIndex ec = K.PartitionData
{ partitionIndex = partitionIndex
, errorCode = ec
, highWatermark = (-1)
, recordBytes = (Just "")
, lastStableOffset = (-1) -- TODO
, abortedTransactions = K.NonNullKaArray V.empty -- TODO
-- TODO: for performance reason, we don't implement logStartOffset now
, logStartOffset = (-1)
}
{-# INLINE errorPartitionResponse #-}

foldWhileM :: Monad m => a -> (a -> m (a, Bool)) -> m a
Expand Down
127 changes: 126 additions & 1 deletion hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,32 @@ type FetchPartitionV4 = FetchPartitionV0

type FetchTopicV4 = FetchTopicV0

data FetchPartitionV5 = FetchPartitionV5
{ partition :: {-# UNPACK #-} !Int32
-- ^ The partition index.
, fetchOffset :: {-# UNPACK #-} !Int64
-- ^ The message offset.
, logStartOffset :: {-# UNPACK #-} !Int64
-- ^ The earliest available offset of the follower replica. The field is
-- only used when the request is sent by the follower.
, partitionMaxBytes :: {-# UNPACK #-} !Int32
-- ^ The maximum bytes to fetch from this partition. See KIP-74 for cases
-- where this limit may not be honored.
} deriving (Show, Eq, Generic)
instance Serializable FetchPartitionV5

data FetchTopicV5 = FetchTopicV5
{ topic :: !Text
-- ^ The name of the topic to fetch.
, partitions :: !(KaArray FetchPartitionV5)
-- ^ The partitions to fetch.
} deriving (Show, Eq, Generic)
instance Serializable FetchTopicV5

type FetchPartitionV6 = FetchPartitionV5

type FetchTopicV6 = FetchTopicV5

data PartitionDataV0 = PartitionDataV0
{ partitionIndex :: {-# UNPACK #-} !Int32
-- ^ The partition index.
Expand Down Expand Up @@ -322,6 +348,42 @@ data FetchableTopicResponseV4 = FetchableTopicResponseV4
} deriving (Show, Eq, Generic)
instance Serializable FetchableTopicResponseV4

type AbortedTransactionV5 = AbortedTransactionV4

data PartitionDataV5 = PartitionDataV5
{ partitionIndex :: {-# UNPACK #-} !Int32
-- ^ The partition index.
, errorCode :: {-# UNPACK #-} !ErrorCode
-- ^ The error code, or 0 if there was no fetch error.
, highWatermark :: {-# UNPACK #-} !Int64
-- ^ The current high water mark.
, lastStableOffset :: {-# UNPACK #-} !Int64
-- ^ The last stable offset (or LSO) of the partition. This is the last
-- offset such that the state of all transactional records prior to this
-- offset have been decided (ABORTED or COMMITTED)
, logStartOffset :: {-# UNPACK #-} !Int64
-- ^ The current log start offset.
, abortedTransactions :: !(KaArray AbortedTransactionV4)
-- ^ The aborted transactions.
, recordBytes :: !NullableBytes
-- ^ The record data.
} deriving (Show, Eq, Generic)
instance Serializable PartitionDataV5

data FetchableTopicResponseV5 = FetchableTopicResponseV5
{ topic :: !Text
-- ^ The topic name.
, partitions :: !(KaArray PartitionDataV5)
-- ^ The topic partitions.
} deriving (Show, Eq, Generic)
instance Serializable FetchableTopicResponseV5

type AbortedTransactionV6 = AbortedTransactionV4

type PartitionDataV6 = PartitionDataV5

type FetchableTopicResponseV6 = FetchableTopicResponseV5

data JoinGroupRequestProtocolV0 = JoinGroupRequestProtocolV0
{ name :: !Text
-- ^ The protocol name.
Expand Down Expand Up @@ -1013,6 +1075,33 @@ data FetchRequestV4 = FetchRequestV4
} deriving (Show, Eq, Generic)
instance Serializable FetchRequestV4

data FetchRequestV5 = FetchRequestV5
{ replicaId :: {-# UNPACK #-} !Int32
-- ^ The broker ID of the follower, of -1 if this request is from a
-- consumer.
, maxWaitMs :: {-# UNPACK #-} !Int32
-- ^ The maximum time in milliseconds to wait for the response.
, minBytes :: {-# UNPACK #-} !Int32
-- ^ The minimum bytes to accumulate in the response.
, maxBytes :: {-# UNPACK #-} !Int32
-- ^ The maximum bytes to fetch. See KIP-74 for cases where this limit may
-- not be honored.
, isolationLevel :: {-# UNPACK #-} !Int8
-- ^ This setting controls the visibility of transactional records. Using
-- READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With
-- READ_COMMITTED (isolation_level = 1), non-transactional and COMMITTED
-- transactional records are visible. To be more concrete, READ_COMMITTED
-- returns all data from offsets smaller than the current LSO (last stable
-- offset), and enables the inclusion of the list of aborted transactions
-- in the result, which allows consumers to discard ABORTED transactional
-- records
, topics :: !(KaArray FetchTopicV5)
-- ^ The topics to fetch.
} deriving (Show, Eq, Generic)
instance Serializable FetchRequestV5

type FetchRequestV6 = FetchRequestV5

newtype FetchResponseV0 = FetchResponseV0
{ responses :: (KaArray FetchableTopicResponseV0)
} deriving (Show, Eq, Generic)
Expand Down Expand Up @@ -1040,6 +1129,17 @@ data FetchResponseV4 = FetchResponseV4
} deriving (Show, Eq, Generic)
instance Serializable FetchResponseV4

data FetchResponseV5 = FetchResponseV5
{ throttleTimeMs :: {-# UNPACK #-} !Int32
-- ^ The duration in milliseconds for which the request was throttled due
-- to a quota violation, or zero if the request did not violate any quota.
, responses :: !(KaArray FetchableTopicResponseV5)
-- ^ The response topics.
} deriving (Show, Eq, Generic)
instance Serializable FetchResponseV5

type FetchResponseV6 = FetchResponseV5

newtype FindCoordinatorRequestV0 = FindCoordinatorRequestV0
{ key :: Text
} deriving (Show, Eq, Generic)
Expand Down Expand Up @@ -2126,6 +2226,7 @@ instance Service HStreamKafkaV5 where
type ServiceName HStreamKafkaV5 = "HStreamKafkaV5"
type ServiceMethods HStreamKafkaV5 =
'[ "produce"
, "fetch"
, "metadata"
]

Expand All @@ -2136,13 +2237,35 @@ instance HasMethodImpl HStreamKafkaV5 "produce" where
type MethodInput HStreamKafkaV5 "produce" = ProduceRequestV5
type MethodOutput HStreamKafkaV5 "produce" = ProduceResponseV5

instance HasMethodImpl HStreamKafkaV5 "fetch" where
type MethodName HStreamKafkaV5 "fetch" = "fetch"
type MethodKey HStreamKafkaV5 "fetch" = 1
type MethodVersion HStreamKafkaV5 "fetch" = 5
type MethodInput HStreamKafkaV5 "fetch" = FetchRequestV5
type MethodOutput HStreamKafkaV5 "fetch" = FetchResponseV5

instance HasMethodImpl HStreamKafkaV5 "metadata" where
type MethodName HStreamKafkaV5 "metadata" = "metadata"
type MethodKey HStreamKafkaV5 "metadata" = 3
type MethodVersion HStreamKafkaV5 "metadata" = 5
type MethodInput HStreamKafkaV5 "metadata" = MetadataRequestV5
type MethodOutput HStreamKafkaV5 "metadata" = MetadataResponseV5

data HStreamKafkaV6

instance Service HStreamKafkaV6 where
type ServiceName HStreamKafkaV6 = "HStreamKafkaV6"
type ServiceMethods HStreamKafkaV6 =
'[ "fetch"
]

instance HasMethodImpl HStreamKafkaV6 "fetch" where
type MethodName HStreamKafkaV6 "fetch" = "fetch"
type MethodKey HStreamKafkaV6 "fetch" = 1
type MethodVersion HStreamKafkaV6 "fetch" = 6
type MethodInput HStreamKafkaV6 "fetch" = FetchRequestV6
type MethodOutput HStreamKafkaV6 "fetch" = FetchResponseV6

-------------------------------------------------------------------------------

newtype ApiKey = ApiKey Int16
Expand Down Expand Up @@ -2175,7 +2298,7 @@ instance Show ApiKey where
supportedApiVersions :: [ApiVersionV0]
supportedApiVersions =
[ ApiVersionV0 (ApiKey 0) 0 5
, ApiVersionV0 (ApiKey 1) 0 4
, ApiVersionV0 (ApiKey 1) 0 6
, ApiVersionV0 (ApiKey 2) 0 2
, ApiVersionV0 (ApiKey 3) 0 5
, ApiVersionV0 (ApiKey 8) 0 3
Expand Down Expand Up @@ -2209,6 +2332,8 @@ getHeaderVersion (ApiKey (1)) 1 = (1, 0)
getHeaderVersion (ApiKey (1)) 2 = (1, 0)
getHeaderVersion (ApiKey (1)) 3 = (1, 0)
getHeaderVersion (ApiKey (1)) 4 = (1, 0)
getHeaderVersion (ApiKey (1)) 5 = (1, 0)
getHeaderVersion (ApiKey (1)) 6 = (1, 0)
getHeaderVersion (ApiKey (2)) 0 = (1, 0)
getHeaderVersion (ApiKey (2)) 1 = (1, 0)
getHeaderVersion (ApiKey (2)) 2 = (1, 0)
Expand Down
Loading

0 comments on commit 76fa030

Please sign in to comment.