diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs index 62623257e..d93920ecb 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs @@ -6,38 +6,39 @@ module HStream.Kafka.Server.Handler.Consume import Control.Exception import Control.Monad -import Data.ByteString (ByteString) -import qualified Data.ByteString as BS -import qualified Data.ByteString.Builder as BB +import Data.ByteString (ByteString) +import qualified Data.ByteString as BS +import qualified Data.ByteString.Builder as BB import Data.Int import Data.Maybe -import Data.Text (Text) -import qualified Data.Text as T -import Data.Vector (Vector) -import qualified Data.Vector as V -import qualified Data.Vector.Hashtables as HT -import qualified Data.Vector.Storable as VS +import Data.Text (Text) +import qualified Data.Text as T +import Data.Vector (Vector) +import qualified Data.Vector as V +import qualified Data.Vector.Hashtables as HT +import qualified Data.Vector.Storable as VS import GHC.Data.FastMutInt -import GHC.Stack (HasCallStack) - -import qualified HStream.Base.Growing as GV -import qualified HStream.Kafka.Common.Acl as K -import qualified HStream.Kafka.Common.Authorizer.Class as K -import qualified HStream.Kafka.Common.FetchManager as K -import qualified HStream.Kafka.Common.Metrics as M -import qualified HStream.Kafka.Common.OffsetManager as K -import qualified HStream.Kafka.Common.RecordFormat as K -import qualified HStream.Kafka.Common.Resource as K -import HStream.Kafka.Server.Config (ServerOpts (..), - StorageOptions (..)) -import HStream.Kafka.Server.Types (ServerContext (..)) -import qualified HStream.Logger as Log -import qualified HStream.Store as S -import qualified HStream.Utils as U -import qualified Kafka.Protocol.Encoding as K -import qualified Kafka.Protocol.Error as K -import qualified Kafka.Protocol.Message as K -import qualified Kafka.Protocol.Service as K +import GHC.Stack (HasCallStack) + +import qualified HStream.Base.Growing as GV +import qualified HStream.Kafka.Common.Acl as K +import qualified HStream.Kafka.Common.Authorizer.Class as K +import qualified HStream.Kafka.Common.FetchManager as K +import qualified HStream.Kafka.Common.Metrics as M +import qualified HStream.Kafka.Common.OffsetManager as K +import qualified HStream.Kafka.Common.RecordFormat as K +import qualified HStream.Kafka.Common.Resource as K +import HStream.Kafka.Server.Config (ServerOpts (..), + StorageOptions (..)) +import qualified HStream.Kafka.Server.Config.KafkaConfig as KC +import HStream.Kafka.Server.Types (ServerContext (..)) +import qualified HStream.Logger as Log +import qualified HStream.Store as S +import qualified HStream.Utils as U +import qualified Kafka.Protocol.Encoding as K +import qualified Kafka.Protocol.Error as K +import qualified Kafka.Protocol.Message as K +import qualified Kafka.Protocol.Service as K ------------------------------------------------------------------------------- @@ -74,15 +75,14 @@ isErrPartitionData _ = False data Partition = Partition { logid :: {-# UNPACK #-} !S.C_LogID , elsn :: !LsnData - , request :: !K.FetchPartition + , request :: !K.FetchPartition -- ^ Original request } deriving (Show) data ReFetchRequest = ReFetchRequest { topics :: !(Vector (Text, Vector Partition)) - -- Original request , minBytes :: !Int32 - , maxWaitMs :: !Int32 , maxBytes :: !Int32 + , maxWaitMs :: !Int32 -- Helpful attrs , contFetch :: !Bool , totalReads :: !Int @@ -315,11 +315,19 @@ preProcessRequest ServerContext{..} reqCtx r = do -- TODO PERF: We can bybass loop all topics(using a global mutAllError). -- However, this will make the code more complex. let doesAllError = all (all (isErrPartitionData . (.elsn)) . snd) + -- Kafka: fetchMaxBytes = Math.min( + -- Math.min(fetchRequest.maxBytes, config.fetchMaxBytes), + -- maxQuotaWindowBytes) + let fetchMaxBytes = min r.maxBytes (fromIntegral kafkaBrokerConfigs.fetchMaxBytes._value) + Log.debug1 $ "Received fetchMaxBytes " <> Log.build fetchMaxBytes + -- Kafka: fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes) + let fetchMinBytes = min r.minBytes fetchMaxBytes + Log.debug1 $ "Received fetchMinBytes " <> Log.build fetchMinBytes if contFetch == 0 then do pure $ ReFetchRequest{ topics = topics - , minBytes = r.minBytes - , maxBytes = r.maxBytes + , minBytes = fetchMinBytes + , maxBytes = fetchMaxBytes , maxWaitMs = r.maxWaitMs , contFetch = False , totalReads = numOfReads @@ -329,8 +337,8 @@ preProcessRequest ServerContext{..} reqCtx r = do if numOfReads == cacheNumOfReads then pure $ ReFetchRequest{ topics = topics - , minBytes = r.minBytes - , maxBytes = r.maxBytes + , minBytes = fetchMinBytes + , maxBytes = fetchMaxBytes , maxWaitMs = r.maxWaitMs , contFetch = True , totalReads = numOfReads @@ -348,8 +356,8 @@ preProcessRequest ServerContext{..} reqCtx r = do _ -> pure p pure (tn, ps') pure $ ReFetchRequest{ topics = ts - , minBytes = r.minBytes - , maxBytes = r.maxBytes + , minBytes = fetchMinBytes + , maxBytes = fetchMaxBytes , maxWaitMs = r.maxWaitMs , contFetch = False , totalReads = numOfReads