diff --git a/hstream-kafka/HStream/Kafka/Network.hs b/hstream-kafka/HStream/Kafka/Network.hs index 7f6098322..106407b1b 100644 --- a/hstream-kafka/HStream/Kafka/Network.hs +++ b/hstream-kafka/HStream/Kafka/Network.hs @@ -225,6 +225,7 @@ runCppServer opts sc_ mkAuthedHandlers = newConnectionContext conn_ctx_ptr = do -- Cpp per-connection context conn <- peek conn_ctx_ptr + Log.debug1 $ "New client in: " <> Log.buildString' conn.peerHost -- Haskell per-connection context sc <- initConnectionContext sc_ newStablePtr (sc, conn) -- Freed by C++ code diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs index 1e8d64683..d2538be4e 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs @@ -55,9 +55,9 @@ data LsnData -- -- NOTE: tailLsn is LSN_INVALID if the partition is empty | ContReading (Vector K.RecordFormat) Int64 - -- ^ Continue reading, do not need to start reading + -- ^ (remRecords, highwaterOffset) -- - -- (remRecords, highwaterOffset) + -- Continue reading, do not need to start reading | ErrPartitionData K.PartitionData -- ^ Error partition response deriving (Show) @@ -162,7 +162,17 @@ handleFetch sc@ServerContext{..} reqCtx r_ = K.catchFetchResponseEx $ do Right hioffset -> do mgv <- HT.lookup readRecords partition.logid case mgv of - Nothing -> + Nothing -> do + -- Cache the context. + -- + -- It's safe to set the remRecords to empty, because "mgv" is + -- Nothing, which means no remaining records in the table. + K.setFetchLogCtx + fetchCtx + partition.logid + K.FetchLogContext{ nextOffset = request.fetchOffset + , remRecords = V.empty + } pure $ K.PartitionData { partitionIndex = request.partition , errorCode = K.NONE @@ -180,6 +190,7 @@ handleFetch sc@ServerContext{..} reqCtx r_ = K.catchFetchResponseEx $ do -- TODO PERF else (remv <>) <$> GV.unsafeFreeze gv (bs, m_offset, tokenIdx) <- encodePartition mutMaxBytes mutIsFirstPartition request v + -- Cache the context K.setFetchLogCtx fetchCtx partition.logid @@ -256,12 +267,14 @@ preProcessRequest ServerContext{..} reqCtx r = do m_logCtx <- K.getFetchLogCtx fetchCtx logid case m_logCtx of Nothing -> do -- Cache miss + Log.debug1 $ "ContFetch: cache miss" writeFastMutInt mutContFetch 0 getPartitionLsn scLDClient scOffsetManager logid p.partition p.fetchOffset Just logCtx -> - if (logCtx.nextOffset /= p.fetchOffset) + if (logCtx.nextOffset /= p.fetchOffset) -- Cache hit but not match then do + Log.debug1 $ "ContFetch: cache hit but not match" writeFastMutInt mutContFetch 0 getPartitionLsn scLDClient scOffsetManager logid p.partition p.fetchOffset @@ -269,14 +282,18 @@ preProcessRequest ServerContext{..} reqCtx r = do m <- K.getLatestOffsetWithLsn scOffsetManager logid case m of Just (latestOffset, _tailLsn) -> do + Log.debug1 $ "ContFetch: Continue reading" let highwaterOffset = latestOffset + 1 pure $ ContReading logCtx.remRecords highwaterOffset Nothing -> do - Log.debug $ "Continue reading, but logid " + Log.debug1 $ "ContFetch: Continue reading, but logid " <> Log.build logid <> " is empty" - pure $ ErrPartitionData $ - errorPartitionResponse p.partition - K.OFFSET_OUT_OF_RANGE + -- We can quick return here, because the partition is empty + if p.fetchOffset == 0 + then pure $ ErrPartitionData $ + partitionResponse0 p.partition K.NONE 0 + else pure $ ErrPartitionData $ + errorPartitionResponse p.partition K.OFFSET_OUT_OF_RANGE pure $ Partition logid elsn p pure (t.topic, ps) contFetch <- readFastMutInt mutContFetch @@ -549,6 +566,19 @@ errorPartitionResponse partitionIndex ec = K.PartitionData } {-# INLINE errorPartitionResponse #-} +partitionResponse0 :: Int32 -> K.ErrorCode -> Int64 -> K.PartitionData +partitionResponse0 partitionIndex ec hw = K.PartitionData + { partitionIndex = partitionIndex + , errorCode = ec + , highWatermark = hw + , recordBytes = (Just "") + , lastStableOffset = (-1) -- TODO + , abortedTransactions = K.NonNullKaArray V.empty -- TODO + -- TODO: for performance reason, we don't implement logStartOffset now + , logStartOffset = (-1) + } +{-# INLINE partitionResponse0 #-} + ------------------------------------------------------------------------------- -- NOTE: condition is True -> continue; False -> break