Skip to content

Commit

Permalink
fix(kafka): set 'readerSetWaitOnlyWhenNoData' instead of set reader t…
Browse files Browse the repository at this point in the history
…imeout to 0
  • Loading branch information
YangKian committed Apr 9, 2024
1 parent a3ee52e commit e015d28
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ handleFetch sc@ServerContext{..} reqCtx r_ = K.catchFetchResponseEx $ do
x -> error $ "LogicError: this should not be " <> show x
pure $ K.FetchableTopicResponse topic (K.NonNullKaArray respPartitionDatas)
let resp = K.FetchResponse (K.NonNullKaArray respTopics) 0{- TODO: throttleTimeMs -}
Log.warning $ "Fetch request return error: " <> Log.build (show resp)
-- Exit early
throwIO $ K.FetchResponseEx resp

Expand Down Expand Up @@ -332,6 +333,7 @@ getPartitionLsn
-> Int64 -- ^ kafka start offset
-> IO LsnData
getPartitionLsn ldclient om logid partition offset = do
Log.debug $ "Request partition LSN for logid: " <> Log.build logid <> ", partition: " <> Log.build partition <> ", offset: " <> Log.build offset
m <- K.getLatestOffsetWithLsn om logid
case m of
Just (latestOffset, tailLsn) -> do
Expand All @@ -341,14 +343,16 @@ getPartitionLsn ldclient om logid partition offset = do
Log.debug1 $ "Try findKey " <> Log.buildString' key <> " in logid "
<> Log.build logid
(_, startLsn) <- S.findKey ldclient logid key S.FindKeyStrict
Log.debug1 $ "FindKey result " <> Log.build logid <> ": "
<> Log.build startLsn
Log.debug1 $ "FindKey for logid " <> Log.build logid <> " get result " <> Log.build startLsn
pure $ LsnData startLsn tailLsn highwaterOffset
| offset == latestOffset ->
| offset == latestOffset -> do
Log.debug1 $ "offset == latestOffset"
pure $ LsnData tailLsn tailLsn highwaterOffset
| offset == highwaterOffset ->
| offset == highwaterOffset -> do
Log.debug1 $ "offset == highwaterOffset"
pure $ LsnData (tailLsn + 1) tailLsn highwaterOffset
| offset > highwaterOffset ->
| offset > highwaterOffset -> do
Log.debug1 $ "offset > highwaterOffset"
pure $ ErrPartitionData $ errorPartitionResponse partition K.OFFSET_OUT_OF_RANGE
-- ghc is not smart enough to detact my partten matching is complete
| otherwise -> error "This should not be reached (getPartitionLsn)"
Expand Down Expand Up @@ -389,8 +393,9 @@ readMode1 r storageOpts reader = do

if r.minBytes <= 0 || r.maxWaitMs <= 0 -- respond immediately
then do
Log.debug1 $ "Set reader to nonblocking"
S.readerSetTimeout reader 0 -- nonblocking
Log.debug1 $ "Set reader wait only when no data and timeout to 10ms"
S.readerSetWaitOnlyWhenNoData reader
S.readerSetTimeout reader 10
insertRecords recordTable
-- For non-empty results
=<< S.readerReadSome reader storageOpts.fetchMaxLen 10{-retries-}
Expand Down

0 comments on commit e015d28

Please sign in to comment.