Skip to content

Commit

Permalink
Kafka FetchRequest: proper calculation of minBytes and maxBytes
Browse files Browse the repository at this point in the history
  • Loading branch information
4eUeP committed May 8, 2024
1 parent e7632ff commit e8073da
Showing 1 changed file with 46 additions and 38 deletions.
84 changes: 46 additions & 38 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,39 @@ module HStream.Kafka.Server.Handler.Consume

import Control.Exception
import Control.Monad
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Builder as BB
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Builder as BB
import Data.Int
import Data.Maybe
import Data.Text (Text)
import qualified Data.Text as T
import Data.Vector (Vector)
import qualified Data.Vector as V
import qualified Data.Vector.Hashtables as HT
import qualified Data.Vector.Storable as VS
import Data.Text (Text)
import qualified Data.Text as T
import Data.Vector (Vector)
import qualified Data.Vector as V
import qualified Data.Vector.Hashtables as HT
import qualified Data.Vector.Storable as VS
import GHC.Data.FastMutInt
import GHC.Stack (HasCallStack)

import qualified HStream.Base.Growing as GV
import qualified HStream.Kafka.Common.Acl as K
import qualified HStream.Kafka.Common.Authorizer.Class as K
import qualified HStream.Kafka.Common.FetchManager as K
import qualified HStream.Kafka.Common.Metrics as M
import qualified HStream.Kafka.Common.OffsetManager as K
import qualified HStream.Kafka.Common.RecordFormat as K
import qualified HStream.Kafka.Common.Resource as K
import HStream.Kafka.Server.Config (ServerOpts (..),
StorageOptions (..))
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import qualified HStream.Utils as U
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K
import GHC.Stack (HasCallStack)

import qualified HStream.Base.Growing as GV
import qualified HStream.Kafka.Common.Acl as K
import qualified HStream.Kafka.Common.Authorizer.Class as K
import qualified HStream.Kafka.Common.FetchManager as K
import qualified HStream.Kafka.Common.Metrics as M
import qualified HStream.Kafka.Common.OffsetManager as K
import qualified HStream.Kafka.Common.RecordFormat as K
import qualified HStream.Kafka.Common.Resource as K
import HStream.Kafka.Server.Config (ServerOpts (..),
StorageOptions (..))
import qualified HStream.Kafka.Server.Config.KafkaConfig as KC
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import qualified HStream.Utils as U
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K

-------------------------------------------------------------------------------

Expand Down Expand Up @@ -74,15 +75,14 @@ isErrPartitionData _ = False
data Partition = Partition
{ logid :: {-# UNPACK #-} !S.C_LogID
, elsn :: !LsnData
, request :: !K.FetchPartition
, request :: !K.FetchPartition -- ^ Original request
} deriving (Show)

data ReFetchRequest = ReFetchRequest
{ topics :: !(Vector (Text, Vector Partition))
-- Original request
, minBytes :: !Int32
, maxWaitMs :: !Int32
, maxBytes :: !Int32
, maxWaitMs :: !Int32
-- Helpful attrs
, contFetch :: !Bool
, totalReads :: !Int
Expand Down Expand Up @@ -315,11 +315,19 @@ preProcessRequest ServerContext{..} reqCtx r = do
-- TODO PERF: We can bybass loop all topics(using a global mutAllError).
-- However, this will make the code more complex.
let doesAllError = all (all (isErrPartitionData . (.elsn)) . snd)
-- Kafka: fetchMaxBytes = Math.min(
-- Math.min(fetchRequest.maxBytes, config.fetchMaxBytes),
-- maxQuotaWindowBytes)
let fetchMaxBytes = min r.maxBytes (fromIntegral kafkaBrokerConfigs.fetchMaxBytes._value)
Log.debug1 $ "Received fetchMaxBytes " <> Log.build fetchMaxBytes
-- Kafka: fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes)
let fetchMinBytes = min r.minBytes fetchMaxBytes
Log.debug1 $ "Received fetchMinBytes " <> Log.build fetchMinBytes
if contFetch == 0
then do
pure $ ReFetchRequest{ topics = topics
, minBytes = r.minBytes
, maxBytes = r.maxBytes
, minBytes = fetchMinBytes
, maxBytes = fetchMaxBytes
, maxWaitMs = r.maxWaitMs
, contFetch = False
, totalReads = numOfReads
Expand All @@ -329,8 +337,8 @@ preProcessRequest ServerContext{..} reqCtx r = do
if numOfReads == cacheNumOfReads
then
pure $ ReFetchRequest{ topics = topics
, minBytes = r.minBytes
, maxBytes = r.maxBytes
, minBytes = fetchMinBytes
, maxBytes = fetchMaxBytes
, maxWaitMs = r.maxWaitMs
, contFetch = True
, totalReads = numOfReads
Expand All @@ -348,8 +356,8 @@ preProcessRequest ServerContext{..} reqCtx r = do
_ -> pure p
pure (tn, ps')
pure $ ReFetchRequest{ topics = ts
, minBytes = r.minBytes
, maxBytes = r.maxBytes
, minBytes = fetchMinBytes
, maxBytes = fetchMaxBytes
, maxWaitMs = r.maxWaitMs
, contFetch = False
, totalReads = numOfReads
Expand Down

0 comments on commit e8073da

Please sign in to comment.