diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
index ad19d2b57..23ef1045f 100644
--- a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
+++ b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
@@ -4,6 +4,7 @@ module HStream.Kafka.Server.Handler.Produce
   ) where
 
 import qualified Control.Concurrent.Async as Async
+import Control.Exception
 import Control.Monad
 import Data.ByteString (ByteString)
 import qualified Data.ByteString as BS
@@ -85,33 +86,6 @@ handleProduce ServerContext{..} _reqCtx req = do
             , logStartOffset = -1
             }
         True -> do
-          -- FIXME: Block is too deep. Extract to a standalone function.
-          -- Wirte appends
-          (S.AppendCompletion{..}, offset) <-
-            appendRecords True scLDClient scOffsetManager
-                          (topic.name, partition.index) logid recordBytes
-
-          Log.debug1 $ "Append done " <> Log.build appendCompLogID
-                    <> ", lsn: " <> Log.build appendCompLSN
-                    <> ", start offset: " <> Log.build offset
-
-          -- TODO: performance improvements
-          --
-          -- For each append request after version 5, we need to read the oldest
-          -- offset of the log. This will cause critical performance problems.
-          --
-          --logStartOffset <-
-          --  if reqCtx.apiVersion >= 5
-          --     then do m_logStartOffset <- K.getOldestOffset scOffsetManager logid
-          --             case m_logStartOffset of
-          --               Just logStartOffset -> pure logStartOffset
-          --               Nothing -> do
-          --                 Log.fatal $ "Cannot get log start offset for logid "
-          --                          <> Log.build logid
-          --                 pure (-1)
-          --     else pure (-1)
-          let logStartOffset = (-1)
-
           -- TODO: PartitionProduceResponse.logAppendTimeMs
           --
           -- The timestamp returned by broker after appending the messages. If
@@ -120,13 +94,31 @@ handleProduce ServerContext{..} _reqCtx req = do
           -- local time when the messages are appended.
           --
           -- Currently, only support LogAppendTime
-          pure $ K.PartitionProduceResponse
-            { index = partition.index
-            , errorCode = K.NONE
-            , baseOffset = offset
-            , logAppendTimeMs = appendCompTimestamp
-            , logStartOffset = logStartOffset
-            }
+          catches (do
+            (S.AppendCompletion{..}, offset) <-
+              appendRecords True scLDClient scOffsetManager
+                            (topic.name, partition.index) logid recordBytes
+            Log.debug1 $ "Append done " <> Log.build appendCompLogID
+                      <> ", lsn: " <> Log.build appendCompLSN
+                      <> ", start offset: " <> Log.build offset
+            pure $ K.PartitionProduceResponse
+              { index = partition.index
+              , errorCode = K.NONE
+              , baseOffset = offset
+              , logAppendTimeMs = appendCompTimestamp
+              , logStartOffset = (-1) -- TODO: getLogStartOffset
+              })
+            [ Handler (\(K.DecodeError (ec, msg)) -> do
+                Log.debug1 $ "Append DecodeError " <> Log.buildString' ec
+                          <> ", " <> Log.buildString' msg
+                pure $ K.PartitionProduceResponse
+                  { index = partition.index
+                  , errorCode = ec
+                  , baseOffset = (-1)
+                  , logAppendTimeMs = (-1)
+                  , logStartOffset = (-1)
+                  })
+            ]
 
   pure $ K.TopicProduceResponse topic.name (K.KaArray $ Just partitionResponses)
 
@@ -158,7 +150,8 @@ appendRecords
   -> ByteString
   -> IO (S.AppendCompletion, Int64)
 appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = do
-  (batchLength, offsetOffsets) <- K.decodeBatchRecordsForProduce shouldValidateCrc bs
+  batch <- K.decodeRecordBatch shouldValidateCrc bs
+  let batchLength = batch.recordsCount
   when (batchLength < 1) $ error "Invalid batch length"
 
   -- Offset wroten into storage is the max key in the batch, but return the min
@@ -185,7 +178,7 @@ appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = d
       appendKey = U.intToCBytesWithPadding o
       appendAttrs = Just [(S.KeyTypeFindKey, appendKey)]
 
-  K.unsafeAlterBatchRecordsBsForProduce (+ startOffset) offsetOffsets bs
+  K.unsafeUpdateRecordBatchBaseOffset bs (+ startOffset)
 
   -- FIXME unlikely overflow: convert batchLength from Int to Int32
   let storedRecord = K.runPut $ K.RecordFormat 0{- version -}
@@ -204,3 +197,21 @@ appendRecords shouldValidateCrc ldclient om (streamName, partition) logid bs = d
     M.withLabel M.topicTotalAppendMessages partLabel $ \counter ->
       void $ M.addCounter counter (fromIntegral batchLength)
   pure (r, startOffset)
+
+-- TODO: performance improvements
+--
+-- For each append request after version 5, we need to read the oldest
+-- offset of the log. This will cause critical performance problems.
+--
+--logStartOffset <-
+--  if reqCtx.apiVersion >= 5
+--     then do m_logStartOffset <- K.getOldestOffset scOffsetManager logid
+--             case m_logStartOffset of
+--               Just logStartOffset -> pure logStartOffset
+--               Nothing -> do
+--                 Log.fatal $ "Cannot get log start offset for logid "
+--                          <> Log.build logid
+--                 pure (-1)
+--     else pure (-1)
+getLogStartOffset :: IO Int64
+getLogStartOffset = pure (-1)
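
A note on the error-handling shape: `catches` from `Control.Exception` has type `IO a -> [Handler a] -> IO a`, so additional failure modes (e.g. storage errors) can later be mapped to their own Kafka error codes by appending more `Handler`s, without restructuring the happy path. Below is a minimal sketch of how the `getLogStartOffset` stub might eventually be filled in, reconstructed from the commented-out block carried at the bottom of the diff; the signature, the `K.OffsetManager` argument, and the exact type of `K.getOldestOffset` are assumptions for illustration, not the committed implementation:

```haskell
-- Sketch only: mirrors the commented-out logic in the diff. K.getOldestOffset,
-- K.OffsetManager and the logid type (S.C_LogID) are assumed from context.
getLogStartOffset :: Int16 -> K.OffsetManager -> S.C_LogID -> IO Int64
getLogStartOffset apiVersion om logid
  | apiVersion >= 5 = do
      -- Reading the oldest offset on every request is the performance
      -- concern noted in the TODO; caching it in the offset manager
      -- would avoid the per-append lookup.
      m_logStartOffset <- K.getOldestOffset om logid
      case m_logStartOffset of
        Just logStartOffset -> pure logStartOffset
        Nothing -> do
          Log.fatal $ "Cannot get log start offset for logid "
                   <> Log.build logid
          pure (-1)
  | otherwise = pure (-1)  -- clients below v5 ignore logStartOffset
```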