kafka improvements: do not re-encode batches while producing
4eUeP committed Mar 25, 2024
1 parent 50ae501 commit 28f0e4b
Showing 3 changed files with 104 additions and 7 deletions.
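
The core idea: instead of decoding the produced batch into records, rebasing their offsets, and re-encoding the whole batch, the Produce path now records the byte position of each baseOffset field while decoding and patches those positions in place in the request bytes. A rough before/after sketch distilled from the diff below (all names come from the diff; startOffset is the rebased first offset computed by the handler):

  -- before: decode -> rebase offsets -> re-encode
  (records, batchLength) <- K.decodeBatchRecords' shouldValidateCrc bs
  let records' = K.modifyBatchRecordsOffset (+ startOffset) records
      storedBs = K.encodeBatchRecords records'

  -- after: decode headers only, remember where each baseOffset lives,
  -- patch those Int64 fields in place, and store the original bytes
  (batchLength, offsetOffsets) <- K.decodeBatchRecordsForProduce shouldValidateCrc bs
  K.unsafeAlterBatchRecordsBsForProduce (+ startOffset) offsetOffsets bs
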
16 changes: 9 additions & 7 deletions hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
@@ -148,6 +148,7 @@ handleInitProducerId _ _ _ = do
}

-------------------------------------------------------------------------------

appendRecords
:: Bool
-> S.LDClient
@@ -157,7 +158,7 @@
-> ByteString
-> IO (S.AppendCompletion, Int64)
appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = do
(records, batchLength) <- K.decodeBatchRecords' shouldValidateCrc bs
(batchLength, offsetOffsets) <- K.decodeBatchRecordsForProduce shouldValidateCrc bs
when (batchLength < 1) $ error "Invalid batch length"

-- Offset written into storage is the max key in the batch, but return the min
@@ -181,15 +182,16 @@ appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = d
-- we can always get the correct result with the hi.
K.withOffsetN om logid (fromIntegral batchLength) $ \o -> do
let startOffset = o + 1 - fromIntegral batchLength
records' = K.modifyBatchRecordsOffset (+ startOffset) records
let appendKey = U.intToCBytesWithPadding o
appendKey = U.intToCBytesWithPadding o
appendAttrs = Just [(S.KeyTypeFindKey, appendKey)]
storedBs = K.encodeBatchRecords records'
-- FIXME unlikely overflow: convert batchLength from Int to Int32
storedRecord = K.runPut $ K.RecordFormat 0{- version -}

K.unsafeAlterBatchRecordsBsForProduce (+ startOffset) offsetOffsets bs

-- FIXME unlikely overflow: convert batchLength from Int to Int32
let storedRecord = K.runPut $ K.RecordFormat 0{- version -}
o
(fromIntegral batchLength)
(K.CompactBytes storedBs)
(K.CompactBytes bs)
Log.debug1 $ "Append key " <> Log.buildString' appendKey
<> ", write offset " <> Log.build o
<> ", batch length " <> Log.build batchLength
83 changes: 83 additions & 0 deletions hstream-kafka/protocol/Kafka/Protocol/Encoding.hs
@@ -60,6 +60,8 @@ module Kafka.Protocol.Encoding
, unNonNullKaArray
, kaArrayToCompact
, kaArrayFromCompact
, decodeBatchRecordsForProduce
, unsafeAlterBatchRecordsBsForProduce
-- * Internals
-- ** Parser
, Parser
@@ -382,6 +384,8 @@ decodeBatchRecords shouldValidateCrc batchBs =
-- convenience.
--
-- Extra information: the real total number of records.
--
-- TODO: support compression
decodeBatchRecords' :: Bool -> ByteString -> IO (Vector BatchRecord, Int)
decodeBatchRecords' shouldValidateCrc batchBs = Growing.new >>= decode 0 batchBs
where
@@ -767,6 +771,85 @@ kaArrayFromCompact :: CompactKaArray a -> KaArray a
kaArrayFromCompact = KaArray . unCompactKaArray
{-# INLINE kaArrayFromCompact #-}

-- Currently this is used for the Produce handler.
decodeBatchRecordsForProduce :: Bool -> ByteString -> IO (Int, [Int])
decodeBatchRecordsForProduce shouldValidateCrc = decode 0 0 []
where
decode len _consumed offsetOffsets "" = pure (len, offsetOffsets)
decode !len !consumed !offsetOffsets !bs = do
(RecordBase{..}, bs') <- runGet' @RecordBase bs
case magic of
0 -> do let crc = partitionLeaderEpochOrCrc
messageSize = batchLength
when (messageSize < fromIntegral minRecordSizeV0) $
throwIO $ DecodeError $ "Invalid messageSize"
when shouldValidateCrc $ do
-- The crc field contains the CRC32 (and not CRC-32C) of the
-- subsequent message bytes (i.e. from magic byte to the value).
--
-- NOTE: pass the original input to validLegacyCrc, not the bs'
validLegacyCrc (fromIntegral batchLength) crc bs
let totalSize = fromIntegral $ messageSize + 12
remainder <- snd <$> runParser' (dropBytes totalSize) bs
decode (len + 1) (consumed + totalSize)
(consumed:offsetOffsets) remainder
1 -> do let crc = partitionLeaderEpochOrCrc
messageSize = batchLength
when (messageSize < fromIntegral minRecordSizeV1) $
throwIO $ DecodeError $ "Invalid messageSize"
when shouldValidateCrc $ do
-- The crc field contains the CRC32 (and not CRC-32C) of the
-- subsequent message bytes (i.e. from magic byte to the value).
--
-- NOTE: pass the original input to validLegacyCrc, not the bs'
validLegacyCrc (fromIntegral batchLength) crc bs
let totalSize = fromIntegral $ messageSize + 12
remainder <- snd <$> runParser' (dropBytes totalSize) bs
decode (len + 1) (consumed + totalSize)
(consumed:offsetOffsets) remainder
2 -> do let partitionLeaderEpoch = partitionLeaderEpochOrCrc
-- The CRC covers the data from the attributes to the end of
-- the batch (i.e. all the bytes that follow the CRC).
--
-- The CRC-32C (Castagnoli) polynomial is used for the
-- computation.
(crc, bs'') <- runGet' @Int32 bs'
when shouldValidateCrc $ do
let crcPayload = BS.take (fromIntegral batchLength - 9) bs''
when (fromIntegral (crc32c crcPayload) /= crc) $
throwIO $ DecodeError "Invalid CRC32"
(batchRecordsLen, remainder) <- runParser'
(do batchRecordsLen <- unsafePeekInt32At 36
-- 36: attributes(2) + lastOffsetDelta(4)
-- + baseTimestamp(8) + maxTimestamp(8)
-- + producerId(8) + producerEpoch(2)
-- + baseSequence(4)
dropBytes (fromIntegral batchLength - 9)
pure batchRecordsLen
) bs''
let batchRecordsLen' = if batchRecordsLen >= 0
then fromIntegral batchRecordsLen
else 0
-- Actually, there should be only one batch record here, but
-- we don't require it.
decode (len + batchRecordsLen')
(consumed + fromIntegral batchLength + 12)
(consumed:offsetOffsets)
remainder
_ -> throwIO $ DecodeError $ "Invalid magic " <> show magic
{-# INLINABLE decodeBatchRecordsForProduce #-}
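
The byte arithmetic above (the + 12, the - 9, and the peek at offset 36) follows the RecordBatch v2 header layout; a reference sketch, with field sizes taken from the Kafka protocol rather than spelled out in this diff:

  -- baseOffset           : Int64  bytes 0..7    (the field later patched in place;
  --                                              'consumed' records its position)
  -- batchLength          : Int32  bytes 8..11   (counts everything after this field,
  --                                              hence totalSize = batchLength + 12)
  -- partitionLeaderEpoch : Int32  bytes 12..15
  -- magic                : Int8   byte  16
  -- crc                  : Int32  bytes 17..20  (covers the remaining batchLength - 9 bytes)
  -- attributes(2) + lastOffsetDelta(4) + baseTimestamp(8) + maxTimestamp(8)
  --   + producerId(8) + producerEpoch(2) + baseSequence(4) = 36 bytes, so the
  --   records count is the Int32 found 36 bytes past the crc (unsafePeekInt32At 36).
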

unsafeAlterBatchRecordsBsForProduce
:: (Int64 -> Int64) -- Update baseOffsets
-> [Int] -- All byte offsets of baseOffset
-> ByteString
-> IO ()
unsafeAlterBatchRecordsBsForProduce boof boos bs@(BS fp len) = do
unsafeWithForeignPtr fp $ \p -> do
forM_ boos $ \boo -> do
origin <- fromIntegral <$> peek64BE (p `plusPtr` boo)
poke64BE (p `plusPtr` boo) (fromIntegral $ boof origin)
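
Note that unsafeAlterBatchRecordsBsForProduce mutates the ByteString's buffer directly, which is only safe when the caller holds the sole reference to those bytes, which is presumably why the Produce handler applies it to the request payload it is about to store. If the bytes could be shared, a defensive copy would keep the mutation local; a sketch, not part of this change (BS.copy is Data.ByteString.copy):

  let bs' = BS.copy bs  -- private copy of the underlying buffer
  unsafeAlterBatchRecordsBsForProduce (+ startOffset) offsetOffsets bs'
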

-------------------------------------------------------------------------------
-- Internals

12 changes: 12 additions & 0 deletions hstream-kafka/protocol/Kafka/Protocol/Encoding/Parser.hs
@@ -34,6 +34,7 @@ module Kafka.Protocol.Encoding.Parser
, takeBytes
, dropBytes
, directDropBytes
, unsafePeekInt32At
, fromIO
) where

@@ -46,6 +47,7 @@ import Data.Int
import Data.Text (Text)
import qualified Data.Text.Encoding as Text
import Data.Word
import Foreign.Ptr (plusPtr)
import GHC.Float (castWord64ToDouble)

import Kafka.Protocol.Encoding.Internal
@@ -292,6 +294,16 @@ getRecordString = do
then decodeUtf8 $! takeBytes n
else fail $! "Length of RecordString must be -1 " <> show n

-- | Peek a 32-bit integer at the specified offset without consuming the input.
--
-- Warning: this does not check the length of the input.
unsafePeekInt32At :: Int -> Parser Int32
unsafePeekInt32At offset = Parser $ \bs next -> do
let BSI.BS fp _ = bs
r <- fromIntegral <$!> BSI.unsafeWithForeignPtr fp (peek32BE . (`plusPtr` offset))
next bs r
{-# INLINE unsafePeekInt32At #-}
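
For instance (mirroring its use in decodeBatchRecordsForProduce above; batchLength and bytesAfterCrc stand in for the values in scope there), the record count of a v2 batch can be read at a fixed position while the parser then skips the remainder of the batch:

  (count, remainder) <- runParser'
    (do n <- unsafePeekInt32At 36                 -- records count, 36 bytes past the crc
        dropBytes (fromIntegral batchLength - 9)  -- consume the rest of the batch
        pure n)
    bytesAfterCrc
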

-------------------------------------------------------------------------------

-- Fork from: https://github.com/Yuras/scanner
