Kafka produce: handle DecodeError
4eUeP committed Apr 8, 2024
1 parent 495e9f8 commit 20759ae
Showing 1 changed file with 47 additions and 36 deletions.
83 changes: 47 additions & 36 deletions hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
@@ -4,6 +4,7 @@ module HStream.Kafka.Server.Handler.Produce
) where

import qualified Control.Concurrent.Async as Async
import Control.Exception
import Control.Monad
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
@@ -85,33 +86,6 @@ handleProduce ServerContext{..} _reqCtx req = do
, logStartOffset = -1
}
True -> do
-- FIXME: Block is too deep. Extract to a standalone function.
-- Write appends
(S.AppendCompletion{..}, offset) <-
appendRecords True scLDClient scOffsetManager
(topic.name, partition.index) logid recordBytes

Log.debug1 $ "Append done " <> Log.build appendCompLogID
<> ", lsn: " <> Log.build appendCompLSN
<> ", start offset: " <> Log.build offset

-- TODO: performance improvements
--
-- For each append request after version 5, we need to read the oldest
-- offset of the log. This will cause critical performance problems.
--
--logStartOffset <-
-- if reqCtx.apiVersion >= 5
-- then do m_logStartOffset <- K.getOldestOffset scOffsetManager logid
-- case m_logStartOffset of
-- Just logStartOffset -> pure logStartOffset
-- Nothing -> do
-- Log.fatal $ "Cannot get log start offset for logid "
-- <> Log.build logid
-- pure (-1)
-- else pure (-1)
let logStartOffset = (-1)

-- TODO: PartitionProduceResponse.logAppendTimeMs
--
-- The timestamp returned by broker after appending the messages. If
@@ -120,13 +94,31 @@ handleProduce ServerContext{..} _reqCtx req = do
-- local time when the messages are appended.
--
-- Currently, only support LogAppendTime
pure $ K.PartitionProduceResponse
{ index = partition.index
, errorCode = K.NONE
, baseOffset = offset
, logAppendTimeMs = appendCompTimestamp
, logStartOffset = logStartOffset
}
catches (do
(S.AppendCompletion{..}, offset) <-
appendRecords True scLDClient scOffsetManager
(topic.name, partition.index) logid recordBytes
Log.debug1 $ "Append done " <> Log.build appendCompLogID
<> ", lsn: " <> Log.build appendCompLSN
<> ", start offset: " <> Log.build offset
pure $ K.PartitionProduceResponse
{ index = partition.index
, errorCode = K.NONE
, baseOffset = offset
, logAppendTimeMs = appendCompTimestamp
, logStartOffset = (-1) -- TODO: getLogStartOffset
})
[ Handler (\(K.DecodeError (ec, msg)) -> do
Log.debug1 $ "Append DecodeError " <> Log.buildString' ec
<> ", " <> Log.buildString' msg
pure $ K.PartitionProduceResponse
{ index = partition.index
, errorCode = ec
, baseOffset = (-1)
, logAppendTimeMs = (-1)
, logStartOffset = (-1)
})
]

pure $ K.TopicProduceResponse topic.name (K.KaArray $ Just partitionResponses)
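As a brief aside on the pattern introduced above: catches runs an action and, if it throws, dispatches the exception to a list of Handlers; here a K.DecodeError raised while decoding the batch is converted into a per-partition error response instead of failing the whole produce request. Below is a minimal, self-contained sketch of that idea; DecodeFailure, produceOrError, and the Either result are hypothetical stand-ins, not names from the HStream codebase.

import Control.Exception (Exception, Handler (..), catches, throwIO)

newtype DecodeFailure = DecodeFailure String
  deriving (Show)

instance Exception DecodeFailure

-- Run an action that may throw DecodeFailure and turn that specific
-- exception into a Left value instead of letting it propagate.
produceOrError :: IO (Either String Int)
produceOrError =
  catches
    (Right <$> throwIO (DecodeFailure "bad crc"))  -- stands in for appendRecords
    [ Handler (\(DecodeFailure msg) -> pure (Left msg)) ]

main :: IO ()
main = produceOrError >>= print  -- prints: Left "bad crc"

In the real handler above, the Left branch corresponds to building a PartitionProduceResponse whose errorCode is taken from the DecodeError, so a malformed batch fails only its own partition.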

@@ -158,7 +150,8 @@ appendRecords
-> ByteString
-> IO (S.AppendCompletion, Int64)
appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = do
(batchLength, offsetOffsets) <- K.decodeBatchRecordsForProduce shouldValidateCrc bs
batch <- K.decodeRecordBatch shouldValidateCrc bs
let batchLength = batch.recordsCount
when (batchLength < 1) $ error "Invalid batch length"

-- Offset written into storage is the max key in the batch, but return the min
@@ -185,7 +178,7 @@ appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = do
appendKey = U.intToCBytesWithPadding o
appendAttrs = Just [(S.KeyTypeFindKey, appendKey)]

K.unsafeAlterBatchRecordsBsForProduce (+ startOffset) offsetOffsets bs
K.unsafeUpdateRecordBatchBaseOffset bs (+ startOffset)

-- FIXME unlikely overflow: convert batchLength from Int to Int32
let storedRecord = K.runPut $ K.RecordFormat 0{- version -}
@@ -204,3 +197,21 @@ appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = do
M.withLabel M.topicTotalAppendMessages partLabel $ \counter ->
void $ M.addCounter counter (fromIntegral batchLength)
pure (r, startOffset)

-- TODO: performance improvements
--
-- For each append request after version 5, we need to read the oldest
-- offset of the log. This will cause critical performance problems.
--
--logStartOffset <-
-- if reqCtx.apiVersion >= 5
-- then do m_logStartOffset <- K.getOldestOffset scOffsetManager logid
-- case m_logStartOffset of
-- Just logStartOffset -> pure logStartOffset
-- Nothing -> do
-- Log.fatal $ "Cannot get log start offset for logid "
-- <> Log.build logid
-- pure (-1)
-- else pure (-1)
getLogStartOffset :: IO Int64
getLogStartOffset = pure (-1)
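One more note on the offset convention stated in appendRecords ("Offset written into storage is the max key in the batch, but return the min"): with batchLength records in a batch, the key used for the append and the base offset returned to the client differ by batchLength - 1. A tiny illustrative sketch, using hypothetical names that do not appear in the codebase:

import Data.Int (Int64)

-- Given the base (minimum) offset assigned to a batch and its record count,
-- compute the highest offset in the batch, i.e. the key written to storage.
maxKeyOfBatch :: Int64 -> Int -> Int64
maxKeyOfBatch baseOffset batchLength = baseOffset + fromIntegral batchLength - 1

-- For example, a 3-record batch whose base offset is 100 is stored under
-- key 102, while the baseOffset reported back to the producer stays 100.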
