Skip to content

Commit

Permalink
kafka fetch: fix the bug of re-startReading while the reader response…
Browse files Browse the repository at this point in the history
… is empty (#1794)
  • Loading branch information
4eUeP authored Apr 19, 2024
1 parent 093b7f3 commit 405cc6a
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 8 deletions.
1 change: 1 addition & 0 deletions hstream-kafka/HStream/Kafka/Network.hs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ runCppServer opts sc_ mkAuthedHandlers =
newConnectionContext conn_ctx_ptr = do
-- Cpp per-connection context
conn <- peek conn_ctx_ptr
Log.debug1 $ "New client in: " <> Log.buildString' conn.peerHost
-- Haskell per-connection context
sc <- initConnectionContext sc_
newStablePtr (sc, conn) -- Freed by C++ code
Expand Down
46 changes: 38 additions & 8 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ data LsnData
--
-- NOTE: tailLsn is LSN_INVALID if the partition is empty
| ContReading (Vector K.RecordFormat) Int64
-- ^ Continue reading, do not need to start reading
-- ^ (remRecords, highwaterOffset)
--
-- (remRecords, highwaterOffset)
-- Continue reading, do not need to start reading
| ErrPartitionData K.PartitionData
-- ^ Error partition response
deriving (Show)
Expand Down Expand Up @@ -162,7 +162,17 @@ handleFetch sc@ServerContext{..} reqCtx r_ = K.catchFetchResponseEx $ do
Right hioffset -> do
mgv <- HT.lookup readRecords partition.logid
case mgv of
Nothing ->
Nothing -> do
-- Cache the context.
--
-- It's safe to set the remRecords to empty, because "mgv" is
-- Nothing, which means no remaining records in the table.
K.setFetchLogCtx
fetchCtx
partition.logid
K.FetchLogContext{ nextOffset = request.fetchOffset
, remRecords = V.empty
}
pure $ K.PartitionData
{ partitionIndex = request.partition
, errorCode = K.NONE
Expand All @@ -180,6 +190,7 @@ handleFetch sc@ServerContext{..} reqCtx r_ = K.catchFetchResponseEx $ do
-- TODO PERF
else (remv <>) <$> GV.unsafeFreeze gv
(bs, m_offset, tokenIdx) <- encodePartition mutMaxBytes mutIsFirstPartition request v
-- Cache the context
K.setFetchLogCtx
fetchCtx
partition.logid
Expand Down Expand Up @@ -256,27 +267,33 @@ preProcessRequest ServerContext{..} reqCtx r = do
m_logCtx <- K.getFetchLogCtx fetchCtx logid
case m_logCtx of
Nothing -> do -- Cache miss
Log.debug1 $ "ContFetch: cache miss"
writeFastMutInt mutContFetch 0
getPartitionLsn scLDClient scOffsetManager
logid p.partition p.fetchOffset
Just logCtx ->
if (logCtx.nextOffset /= p.fetchOffset)
if (logCtx.nextOffset /= p.fetchOffset) -- Cache hit but not match
then do
Log.debug1 $ "ContFetch: cache hit but not match"
writeFastMutInt mutContFetch 0
getPartitionLsn scLDClient scOffsetManager logid p.partition
p.fetchOffset
else do
m <- K.getLatestOffsetWithLsn scOffsetManager logid
case m of
Just (latestOffset, _tailLsn) -> do
Log.debug1 $ "ContFetch: Continue reading"
let highwaterOffset = latestOffset + 1
pure $ ContReading logCtx.remRecords highwaterOffset
Nothing -> do
Log.debug $ "Continue reading, but logid "
Log.debug1 $ "ContFetch: Continue reading, but logid "
<> Log.build logid <> " is empty"
pure $ ErrPartitionData $
errorPartitionResponse p.partition
K.OFFSET_OUT_OF_RANGE
-- We can quick return here, because the partition is empty
if p.fetchOffset == 0
then pure $ ErrPartitionData $
partitionResponse0 p.partition K.NONE 0
else pure $ ErrPartitionData $
errorPartitionResponse p.partition K.OFFSET_OUT_OF_RANGE
pure $ Partition logid elsn p
pure (t.topic, ps)
contFetch <- readFastMutInt mutContFetch
Expand Down Expand Up @@ -549,6 +566,19 @@ errorPartitionResponse partitionIndex ec = K.PartitionData
}
{-# INLINE errorPartitionResponse #-}

partitionResponse0 :: Int32 -> K.ErrorCode -> Int64 -> K.PartitionData
partitionResponse0 partitionIndex ec hw = K.PartitionData
{ partitionIndex = partitionIndex
, errorCode = ec
, highWatermark = hw
, recordBytes = (Just "")
, lastStableOffset = (-1) -- TODO
, abortedTransactions = K.NonNullKaArray V.empty -- TODO
-- TODO: for performance reason, we don't implement logStartOffset now
, logStartOffset = (-1)
}
{-# INLINE partitionResponse0 #-}

-------------------------------------------------------------------------------

-- NOTE: condition is True -> continue; False -> break
Expand Down

0 comments on commit 405cc6a

Please sign in to comment.