kafka: do authorization on fetch (1)
Commelina committed Mar 11, 2024
1 parent 678737a commit 6558476
Showing 1 changed file with 84 additions and 71 deletions.
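The change gates fetch on a per-topic ACL check: before planning any reads for a topic, the handler asks the authorizer whether the requesting principal may READ it, and on refusal every partition of that topic is answered with TOPIC_AUTHORIZATION_FAILED instead of being read from storage. The following self-contained sketch mirrors the call shape used in the diff (simpleAuthorize applied to a request context, an authorizer, Res_TOPIC, the topic name, and AclOp_READ); every type and definition here is a simplified stand-in, not the real HStream API.

{-# LANGUAGE OverloadedStrings #-}
-- Self-contained stand-ins; only the call shape mirrors the diff below.
import Data.Text (Text)
import qualified Data.Text as T

newtype AuthorizableReqCtx = AuthorizableReqCtx { principal :: Text }
data ResourceType = Res_TOPIC
data AclOperation = AclOp_READ

-- Toy authorizer: a list of (principal, topic) pairs allowed to READ.
newtype Authorizer = Authorizer [(Text, Text)]

toAuthorizableReqCtx :: Text -> AuthorizableReqCtx
toAuthorizableReqCtx = AuthorizableReqCtx

simpleAuthorize :: AuthorizableReqCtx -> Authorizer -> ResourceType -> Text -> AclOperation -> IO Bool
simpleAuthorize ctx (Authorizer acls) Res_TOPIC topic AclOp_READ =
  pure ((principal ctx, topic) `elem` acls)

main :: IO ()
main = do
  let authorizer = Authorizer [("alice", "orders")]
      check who topic = do
        ok <- simpleAuthorize (toAuthorizableReqCtx who) authorizer Res_TOPIC topic AclOp_READ
        putStrLn $ T.unpack who <> " READ " <> T.unpack topic <>
                   (if ok then ": authorized" else ": TOPIC_AUTHORIZATION_FAILED")
  check "alice" "orders"   -- authorized
  check "bob"   "orders"   -- TOPIC_AUTHORIZATION_FAILED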
155 changes: 84 additions & 71 deletions hstream-kafka/HStream/Kafka/Server/Handler/Consume.hs
@@ -6,35 +6,38 @@ module HStream.Kafka.Server.Handler.Consume

import Control.Exception
import Control.Monad
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Builder as BB
import Data.Int
import Data.Maybe
import Data.Text (Text)
import qualified Data.Text as T
import Data.Vector (Vector)
import qualified Data.Vector as V
import qualified Data.Vector.Hashtables as HT
import qualified Data.Vector.Storable as VS
import GHC.Data.FastMutInt
import GHC.Stack (HasCallStack)

import qualified HStream.Base.Growing as GV
+import HStream.Kafka.Common.Acl
+import HStream.Kafka.Common.Authorizer.Class
import qualified HStream.Kafka.Common.FetchManager as K
import qualified HStream.Kafka.Common.Metrics as M
import qualified HStream.Kafka.Common.OffsetManager as K
import qualified HStream.Kafka.Common.RecordFormat as K
+import HStream.Kafka.Common.Resource
import HStream.Kafka.Server.Config (ServerOpts (..), StorageOptions (..))
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import qualified HStream.Utils as U
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K

-------------------------------------------------------------------------------

@@ -95,14 +98,14 @@ handleFetch
  :: HasCallStack
  => ServerContext
  -> K.RequestContext -> K.FetchRequest -> IO K.FetchResponse
-handleFetch sc@ServerContext{..} _ r_ = K.catchFetchResponseEx $ do
+handleFetch sc@ServerContext{..} reqCtx r_ = K.catchFetchResponseEx $ do
  -- Currently, we use a per-connection reader (fetchReader) to read.
  let fetchReader = fetchCtx.reader

  ---------------------------------------
  -- * Preprocess request
  ---------------------------------------
-  r <- preProcessRequest sc r_
+  r <- preProcessRequest sc reqCtx r_

  -- Fail fast: all error
  when r.allError $ do
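handleFetch previously discarded its K.RequestContext argument (the `_` in the old signature); it is now threaded into preProcessRequest because the request context is what identifies the caller to the ACL check. A small stand-in sketch of that projection follows; the field names are assumptions, and the real K.RequestContext and toAuthorizableReqCtx differ.

{-# LANGUAGE OverloadedStrings #-}
-- Stand-in sketch: why the request context now flows into preprocessing.
import Data.Text (Text)

data RequestContext = RequestContext
  { clientId   :: Text  -- assumed field
  , clientHost :: Text  -- assumed field
  }

newtype AuthorizableReqCtx = AuthorizableReqCtx (Text, Text)
  deriving Show

-- Authorization only needs "who is calling, from where", so the handler
-- projects that out of the request context and hands it to the ACL check.
toAuthorizableReqCtx :: RequestContext -> AuthorizableReqCtx
toAuthorizableReqCtx rc = AuthorizableReqCtx (clientId rc, clientHost rc)

main :: IO ()
main = print (toAuthorizableReqCtx (RequestContext "consumer-1" "10.0.0.8"))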
@@ -207,14 +210,18 @@

-------------------------------------------------------------------------------

-preProcessRequest :: ServerContext -> K.FetchRequest -> IO ReFetchRequest
-preProcessRequest ServerContext{..} r = do
+preProcessRequest :: ServerContext -> K.RequestContext -> K.FetchRequest -> IO ReFetchRequest
+preProcessRequest ServerContext{..} reqCtx r = do
  -- The Kafka broker just throws java.lang.RuntimeException if topics is
  -- null; here we do the same.
  let K.NonNullKaArray topicReqs = r.topics
  mutContFetch <- newFastMutInt 1 -- Bool
  mutNumOfReads <- newFastMutInt 0 -- Total number of reads
  topics <- V.forM topicReqs $ \t{- K.FetchTopic -} -> do
+    -- [ACL] check [READ TOPIC]
+    -- TODO: In Kafka, check [CLUSTER_ACTION CLUSTER] instead if the request
+    --       is from a follower. Of course, we do not consider this now.
+    isTopicAuthzed <- simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC t.topic AclOp_READ
    -- Partition should be non-empty
    let K.NonNullKaArray partitionReqs = t.partitions
    -- FIXME: we can also cache this in FetchContext, however, we need to
@@ -224,47 +231,53 @@
    ps <- V.forM partitionReqs $ \p{- K.FetchPartition -} -> do
      M.withLabel M.totalConsumeRequest (t.topic, T.pack . show $ p.partition) $
        \counter -> void $ M.addCounter counter 1
+      -- FIXME: too deep nesting...
+      if not isTopicAuthzed then do
+        let elsn = ErrPartitionData $
+              errorPartitionResponse p.partition K.TOPIC_AUTHORIZATION_FAILED
+        pure $ Partition 0 elsn p
+      else do
        let m_logid = orderedParts V.!? fromIntegral p.partition
        case m_logid of
          Nothing -> do
            let elsn = ErrPartitionData $
                  errorPartitionResponse p.partition K.UNKNOWN_TOPIC_OR_PARTITION
            -- Actually, the logid should be Nothing rather than 0; however,
            -- we won't use it, so just set it to 0.
            pure $ Partition 0 elsn p
          Just (_, logid) -> do
            void $ atomicFetchAddFastMut mutNumOfReads 1
            contFetch <- readFastMutInt mutContFetch
            elsn <-
              if contFetch == 0
                then getPartitionLsn scLDClient scOffsetManager logid p.partition
                       p.fetchOffset
                else do
                  m_logCtx <- K.getFetchLogCtx fetchCtx logid
                  case m_logCtx of
                    Nothing -> do -- Cache miss
                      writeFastMutInt mutContFetch 0
                      getPartitionLsn scLDClient scOffsetManager
                        logid p.partition p.fetchOffset
                    Just logCtx ->
                      if logCtx.nextOffset /= p.fetchOffset
                        then do
                          writeFastMutInt mutContFetch 0
                          getPartitionLsn scLDClient scOffsetManager logid p.partition
                            p.fetchOffset
                        else do
                          m <- K.getLatestOffsetWithLsn scOffsetManager logid
                          case m of
                            Just (latestOffset, _tailLsn) -> do
                              let highwaterOffset = latestOffset + 1
                              pure $ ContReading logCtx.remRecords highwaterOffset
                            Nothing -> do
                              Log.debug $ "Continue reading, but logid "
                                <> Log.build logid <> " is empty"
                              pure $ ErrPartitionData $
                                errorPartitionResponse p.partition
                                  K.OFFSET_OUT_OF_RANGE
            pure $ Partition logid elsn p
    pure (t.topic, ps)
  contFetch <- readFastMutInt mutContFetch
  numOfReads <- readFastMutInt mutNumOfReads
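The new "-- FIXME: too deep nesting..." comment notes that the authorization branch pushes the partition-planning body one level deeper. One way to flatten such code is to resolve the short-circuit cases up front with guards; the sketch below is illustrative only, with hypothetical names throughout, not a refactor this commit performs.

import Data.Word (Word64)

data ErrCode = TopicAuthorizationFailed | UnknownTopicOrPartition
  deriving Show

-- A partition either fails immediately with an error code or is planned
-- for a real read from the given logid.
data Plan = Err Int ErrCode | Read Word64 Int
  deriving Show

planPartition :: Bool -> (Int -> Maybe Word64) -> Int -> Plan
planPartition authorized lookupLogId partition
  | not authorized = Err partition TopicAuthorizationFailed
  | otherwise = case lookupLogId partition of
      Nothing    -> Err partition UnknownTopicOrPartition
      Just logid -> Read logid partition

main :: IO ()
main = do
  print (planPartition False (const (Just 7)) 0)  -- Err 0 TopicAuthorizationFailed
  print (planPartition True  (const Nothing)  1)  -- Err 1 UnknownTopicOrPartition
  print (planPartition True  (const (Just 7)) 2)  -- Read 7 2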