Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka: ensure Fetch returns some data even when minBytes is set to 0 #1781

Merged
merged 2 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ handleFetch sc@ServerContext{..} reqCtx r_ = K.catchFetchResponseEx $ do
throwIO $ K.FetchResponseEx resp

-- Client request to new reading
when (not r.contFetch) $ do
unless r.contFetch $ do
-- Clear the context
K.clearFetchLogCtx fetchCtx
-- Start reading
Expand Down Expand Up @@ -283,7 +283,7 @@ preProcessRequest ServerContext{..} reqCtx r = do
numOfReads <- readFastMutInt mutNumOfReads
-- TODO PERF: We can bybass loop all topics(using a global mutAllError).
-- However, this will make the code more complex.
let doesAllError ts = all (all (isErrPartitionData . (.elsn)) . snd) ts
let doesAllError = all (all (isErrPartitionData . (.elsn)) . snd)
if contFetch == 0
then do
pure $ ReFetchRequest{ topics = topics
Expand Down Expand Up @@ -375,7 +375,7 @@ readMode1 r storageOpts reader = do
V.map (BS.length . K.unCompactBytes . (.recordBytes)) remRecords
-- [TAG_NEV]: Make sure do not insert empty vector to the table,
-- since we will assume the vector is non-empty in `encodePartition`
when (not $ V.null remRecords) $
unless (V.null remRecords) $
insertRemRecords recordTable p.logid remRecords
x -> Log.fatal $
"LogicError: this should not be reached, " <> Log.buildString' x
Expand All @@ -388,12 +388,16 @@ readMode1 r storageOpts reader = do
let defTimeout = fromIntegral storageOpts.fetchReaderTimeout

if r.minBytes <= 0 || r.maxWaitMs <= 0 -- respond immediately
then do S.readerSetTimeout reader 0 -- nonblocking
insertRecords recordTable
=<< S.readerRead reader storageOpts.fetchMaxLen
then do
Log.debug1 $ "Set reader to nonblocking"
S.readerSetTimeout reader 0 -- nonblocking
insertRecords recordTable
-- For non-empty results
=<< S.readerReadSome reader storageOpts.fetchMaxLen 10{-retries-}
else
if r.maxWaitMs > defTimeout
then do
Log.debug1 $ "Set reader timeout to " <> Log.build defTimeout
S.readerSetTimeout reader defTimeout
rs1 <- M.observeDuration M.topicReadStoreLatency $
S.readerRead reader storageOpts.fetchMaxLen
Expand All @@ -407,6 +411,7 @@ readMode1 r storageOpts reader = do
S.readerRead reader storageOpts.fetchMaxLen
insertRecords recordTable rs2
else do
Log.debug1 $ "Set reader timeout to " <> Log.build r.maxWaitMs
S.readerSetTimeout reader r.maxWaitMs
rs <- M.observeDuration M.topicReadStoreLatency $
S.readerRead reader storageOpts.fetchMaxLen
Expand Down
26 changes: 26 additions & 0 deletions hstream-store/HStream/Store/Internal/LogDevice/Reader.hs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,32 @@ ckpReaderRead reader maxlen =
Right rs -> return rs
Left _ -> go rp pp

readerReadSome :: DataRecordFormat a => LDReader -> Int -> Int -> IO [DataRecord a]
readerReadSome reader maxlen maxretries =
withForeignPtr reader $ \reader' ->
allocaBytes (maxlen * dataRecordSize) $ go reader' maxretries
where
go _ 0 _ = return []
go !rp !retries !pp = do
m_records <- tryReaderRead' rp nullPtr pp nullPtr maxlen
case m_records of
Right [] -> go rp (retries - 1) pp
Right rs -> return rs
Left _ -> go rp (retries - 1) pp

ckpReaderReadSome :: DataRecordFormat a => LDSyncCkpReader -> Int -> Int -> IO [DataRecord a]
ckpReaderReadSome reader maxlen maxretries =
withForeignPtr reader $ \reader' ->
allocaBytes (maxlen * dataRecordSize) $ go reader' maxretries
where
go _ 0 _ = return []
go !rp !retries !pp = do
m_records <- tryReaderRead' nullPtr rp pp nullPtr maxlen
case m_records of
Right [] -> go rp (retries - 1) pp
Right rs -> return rs
Left _ -> go rp (retries - 1) pp

-- | Attempts to read a batch of records.
--
-- The call either delivers 0 or more (up to `maxlen`) data records, or
Expand Down
2 changes: 2 additions & 0 deletions hstream-store/HStream/Store/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ module HStream.Store.Stream
, LD.readerStartReading
, LD.DataRecordFormat
, LD.readerRead
, LD.readerReadSome
, LD.readerReadAllowGap
, LD.readerSetTimeout
, LD.readerSetWithoutPayload
Expand All @@ -142,6 +143,7 @@ module HStream.Store.Stream
, LD.startReadingFromCheckpoint
, LD.startReadingFromCheckpointOrStart
, LD.ckpReaderRead
, LD.ckpReaderReadSome
, LD.ckpReaderReadAllowGap
, LD.ckpReaderSetTimeout
, LD.ckpReaderSetWithoutPayload
Expand Down
Loading