Skip to content

Commit

Permalink
ensure Fetch returns some data even when minBytes is set to 0
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP committed Mar 22, 2024
1 parent 3c025d2 commit 7cb3934
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ handleFetch sc@ServerContext{..} reqCtx r_ = K.catchFetchResponseEx $ do
throwIO $ K.FetchResponseEx resp

-- Client request to new reading
when (not r.contFetch) $ do
unless r.contFetch $ do
-- Clear the context
K.clearFetchLogCtx fetchCtx
-- Start reading
Expand Down Expand Up @@ -283,7 +283,7 @@ preProcessRequest ServerContext{..} reqCtx r = do
numOfReads <- readFastMutInt mutNumOfReads
-- TODO PERF: We can bybass loop all topics(using a global mutAllError).
-- However, this will make the code more complex.
let doesAllError ts = all (all (isErrPartitionData . (.elsn)) . snd) ts
let doesAllError = all (all (isErrPartitionData . (.elsn)) . snd)
if contFetch == 0
then do
pure $ ReFetchRequest{ topics = topics
Expand Down Expand Up @@ -375,7 +375,7 @@ readMode1 r storageOpts reader = do
V.map (BS.length . K.unCompactBytes . (.recordBytes)) remRecords
-- [TAG_NEV]: Make sure do not insert empty vector to the table,
-- since we will assume the vector is non-empty in `encodePartition`
when (not $ V.null remRecords) $
unless (V.null remRecords) $
insertRemRecords recordTable p.logid remRecords
x -> Log.fatal $
"LogicError: this should not be reached, " <> Log.buildString' x
Expand All @@ -388,12 +388,16 @@ readMode1 r storageOpts reader = do
let defTimeout = fromIntegral storageOpts.fetchReaderTimeout

if r.minBytes <= 0 || r.maxWaitMs <= 0 -- respond immediately
then do S.readerSetTimeout reader 0 -- nonblocking
insertRecords recordTable
=<< S.readerRead reader storageOpts.fetchMaxLen
then do
Log.debug1 $ "Set reader to nonblocking"
S.readerSetTimeout reader 0 -- nonblocking
insertRecords recordTable
-- For non-empty results
=<< S.readerReadSome reader storageOpts.fetchMaxLen 10{-retries-}
else
if r.maxWaitMs > defTimeout
then do
Log.debug1 $ "Set reader timeout to " <> Log.build defTimeout
S.readerSetTimeout reader defTimeout
rs1 <- M.observeDuration M.topicReadStoreLatency $
S.readerRead reader storageOpts.fetchMaxLen
Expand All @@ -407,6 +411,7 @@ readMode1 r storageOpts reader = do
S.readerRead reader storageOpts.fetchMaxLen
insertRecords recordTable rs2
else do
Log.debug1 $ "Set reader timeout to " <> Log.build r.maxWaitMs
S.readerSetTimeout reader r.maxWaitMs
rs <- M.observeDuration M.topicReadStoreLatency $
S.readerRead reader storageOpts.fetchMaxLen
Expand Down

0 comments on commit 7cb3934

Please sign in to comment.