diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs index 1e8d64683..3543f0f76 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs @@ -116,6 +116,7 @@ handleFetch sc@ServerContext{..} reqCtx r_ = K.catchFetchResponseEx $ do x -> error $ "LogicError: this should not be " <> show x pure $ K.FetchableTopicResponse topic (K.NonNullKaArray respPartitionDatas) let resp = K.FetchResponse (K.NonNullKaArray respTopics) 0{- TODO: throttleTimeMs -} + Log.warning $ "Fetch request return error: " <> Log.build (show resp) -- Exit early throwIO $ K.FetchResponseEx resp @@ -332,6 +333,7 @@ getPartitionLsn -> Int64 -- ^ kafka start offset -> IO LsnData getPartitionLsn ldclient om logid partition offset = do + Log.debug $ "Request partition LSN for logid: " <> Log.build logid <> ", partition: " <> Log.build partition <> ", offset: " <> Log.build offset m <- K.getLatestOffsetWithLsn om logid case m of Just (latestOffset, tailLsn) -> do @@ -341,14 +343,16 @@ getPartitionLsn ldclient om logid partition offset = do Log.debug1 $ "Try findKey " <> Log.buildString' key <> " in logid " <> Log.build logid (_, startLsn) <- S.findKey ldclient logid key S.FindKeyStrict - Log.debug1 $ "FindKey result " <> Log.build logid <> ": " - <> Log.build startLsn + Log.debug1 $ "FindKey for logid " <> Log.build logid <> " get result " <> Log.build startLsn pure $ LsnData startLsn tailLsn highwaterOffset - | offset == latestOffset -> + | offset == latestOffset -> do + Log.debug1 $ "offset == latestOffset" pure $ LsnData tailLsn tailLsn highwaterOffset - | offset == highwaterOffset -> + | offset == highwaterOffset -> do + Log.debug1 $ "offset == highwaterOffset" pure $ LsnData (tailLsn + 1) tailLsn highwaterOffset - | offset > highwaterOffset -> + | offset > highwaterOffset -> do + Log.debug1 $ "offset > highwaterOffset" pure $ ErrPartitionData $ errorPartitionResponse partition K.OFFSET_OUT_OF_RANGE -- ghc is not smart enough to detact my partten matching is complete | otherwise -> error "This should not be reached (getPartitionLsn)" @@ -389,8 +393,9 @@ readMode1 r storageOpts reader = do if r.minBytes <= 0 || r.maxWaitMs <= 0 -- respond immediately then do - Log.debug1 $ "Set reader to nonblocking" - S.readerSetTimeout reader 0 -- nonblocking + Log.debug1 $ "Set reader wait only when no data and timeout to 10ms" + S.readerSetWaitOnlyWhenNoData reader + S.readerSetTimeout reader 10 insertRecords recordTable -- For non-empty results =<< S.readerReadSome reader storageOpts.fetchMaxLen 10{-retries-}