kafka: do authorization on produce (0)
Commelina committed Feb 27, 2024
1 parent 3fedf95 commit 452196f
Showing 3 changed files with 132 additions and 57 deletions.
138 changes: 81 additions & 57 deletions hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
@@ -3,28 +3,31 @@ module HStream.Kafka.Server.Handler.Produce
  , handleInitProducerId
  ) where

import qualified Control.Concurrent.Async as Async
import Control.Monad
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Int
import Data.Maybe (fromMaybe)
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Vector as V
import Data.Word

import HStream.Kafka.Common.Acl
import qualified HStream.Kafka.Common.Metrics as M
import qualified HStream.Kafka.Common.OffsetManager as K
import qualified HStream.Kafka.Common.RecordFormat as K
import HStream.Kafka.Common.Resource
import HStream.Kafka.Server.Security.AuthorizeHelper
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import qualified HStream.Utils as U
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K

-- acks: (FIXME: Currently we only support -1)
-- 0: The server will not send any response (this is the only case where the
@@ -49,9 +52,14 @@ handleProduce ServerContext{..} _reqCtx req = do
-- TODO: handle request args: acks, timeoutMs
let topicData = fromMaybe V.empty (K.unKaArray req.topicData)

-  responses <- V.forM topicData $ \topic{- TopicProduceData -} -> do
+  -- [ACL] cache authz result of each topic
+  topicsAuthzResult <- V.forM topicData $ \topic ->
+    simpleAuthorize (toAuthorizableReqCtx _reqCtx) authorizer Res_TOPIC topic.name AclOp_WRITE
+
+  responses <- V.iforM topicData $ \i topic{- TopicProduceData -} -> do
    -- A topic is a stream. Here we do not need to check the topic existence,
    -- because the metadata API already does(?)
    -- FIXME: is it correct? See kafka.server.KafkaApis#handleProduceRequest
    partitions <- S.listStreamPartitionsOrderedByName
                    scLDClient (S.transToTopicStreamName topic.name)
    let partitionData = fromMaybe V.empty (K.unKaArray topic.partitionData)
@@ -69,47 +77,50 @@ handleProduce ServerContext{..} _reqCtx req = do
Log.debug1 $ "Try to append to logid " <> Log.build logid
<> "(" <> Log.build partition.index <> ")"

-      -- Write appends
-      (S.AppendCompletion{..}, offset) <-
-        appendRecords True scLDClient scOffsetManager
-          (topic.name, partition.index) logid recordBytes
+      -- [ACL] authorize each (topic, partition). It uses the cache above.
+      -- FIXME: Block is too deep. Extract to a standalone function.
+      partitionResponseWithAuthz (topicsAuthzResult V.! i) partition.index $ do
+        -- Write appends
+        (S.AppendCompletion{..}, offset) <-
+          appendRecords True scLDClient scOffsetManager
+            (topic.name, partition.index) logid recordBytes

Log.debug1 $ "Append done " <> Log.build appendCompLogID
<> ", lsn: " <> Log.build appendCompLSN
<> ", start offset: " <> Log.build offset
Log.debug1 $ "Append done " <> Log.build appendCompLogID
<> ", lsn: " <> Log.build appendCompLSN
<> ", start offset: " <> Log.build offset

        -- TODO: performance improvements
        --
        -- For each append request after version 5, we need to read the oldest
        -- offset of the log. This will cause critical performance problems.
        --
        --logStartOffset <-
        --  if reqCtx.apiVersion >= 5
        --    then do m_logStartOffset <- K.getOldestOffset scOffsetManager logid
        --            case m_logStartOffset of
        --              Just logStartOffset -> pure logStartOffset
        --              Nothing -> do
        --                Log.fatal $ "Cannot get log start offset for logid "
        --                         <> Log.build logid
        --                pure (-1)
        --    else pure (-1)
        let logStartOffset = (-1)

        -- TODO: PartitionProduceResponse.logAppendTimeMs
        --
        -- The timestamp returned by broker after appending the messages. If
        -- CreateTime is used for the topic, the timestamp will be -1. If
        -- LogAppendTime is used for the topic, the timestamp will be the broker
        -- local time when the messages are appended.
        --
        -- Currently, only support LogAppendTime
        pure $ K.PartitionProduceResponse
          { index           = partition.index
          , errorCode       = K.NONE
          , baseOffset      = offset
          , logAppendTimeMs = appendCompTimestamp
          , logStartOffset  = logStartOffset
          }

pure $ K.TopicProduceResponse topic.name (K.KaArray $ Just partitionResponses)

@@ -131,6 +142,19 @@ handleInitProducerId _ _ _ = do
}

-------------------------------------------------------------------------------
partitionResponseWithAuthz :: Bool                           -- is authorized
                           -> Int32                          -- partition index
                           -> IO K.PartitionProduceResponse  -- action if authorized
                           -> IO K.PartitionProduceResponse
partitionResponseWithAuthz isOK idx action
  | not isOK = pure $ K.PartitionProduceResponse
      { index           = idx
      , errorCode       = K.TOPIC_AUTHORIZATION_FAILED
      , baseOffset      = -1
      , logAppendTimeMs = -1
      , logStartOffset  = -1
      }
  | otherwise = action

appendRecords
:: Bool
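The change above follows an authorize-once-per-topic pattern: one ACL check per topic is cached in topicsAuthzResult, then every partition of that topic reuses the cached verdict via V.iforM. A minimal, self-contained sketch of the same pattern follows; checkTopicWrite and appendToPartition are hypothetical stand-ins for simpleAuthorize and the real append path, not names from this commit.

-- Sketch only: authorize once per topic, then reuse the cached result for
-- each partition of that topic. checkTopicWrite stands in for simpleAuthorize;
-- appendToPartition stands in for the real append path returning a base offset.
import qualified Data.Vector as V

data PartitionResult = Ok Int | AuthFailed deriving Show

producePattern
  :: (String -> IO Bool)              -- per-topic WRITE authorization check
  -> (String -> Int -> IO Int)        -- append action, returns base offset
  -> V.Vector (String, V.Vector Int)  -- topics with their partition indices
  -> IO (V.Vector (String, V.Vector PartitionResult))
producePattern checkTopicWrite appendToPartition topics = do
  -- One authorization call per topic, cached in a vector...
  authz <- V.forM topics $ \(name, _) -> checkTopicWrite name
  -- ...then looked up by index while producing to each partition.
  V.iforM topics $ \i (name, parts) -> do
    results <- V.forM parts $ \p ->
      if authz V.! i
        then Ok <$> appendToPartition name p
        else pure AuthFailed
    pure (name, results)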
50 changes: 50 additions & 0 deletions hstream-kafka/HStream/Kafka/Server/Security/AuthorizeHelper.hs
@@ -0,0 +1,50 @@
module HStream.Kafka.Server.Security.AuthorizeHelper where

import Control.Monad
import Data.Maybe
import Data.Text (Text)
import qualified Data.Text as T

import HStream.Kafka.Common.Acl
import HStream.Kafka.Common.Authorizer.Class
import HStream.Kafka.Common.Resource
import HStream.Kafka.Common.Security
import qualified Kafka.Protocol.Service as K

toAuthorizableReqCtx :: K.RequestContext -> AuthorizableRequestContext
toAuthorizableReqCtx reqCtx =
  AuthorizableRequestContext (T.pack reqCtx.clientHost)
                             (Principal "User" (fromMaybe "" (join reqCtx.clientId)))

-- FIXME: configurable
defaultLogIfAllowed :: Bool
defaultLogIfAllowed = False

-- FIXME: configurable
defaultLogIfDenied :: Bool
defaultLogIfDenied = True

-- | Do a simple, single-shot authorization: check one operation on one
--   literal resource and return whether it is allowed.
simpleAuthorize :: Authorizer s
                => AuthorizableRequestContext
                -> s
                -> ResourceType
                -> Text
                -> AclOperation
                -> IO Bool
simpleAuthorize ctx authorizer resType resName op = do
  let resPat = ResourcePattern
        { resPatResourceType = resType
        , resPatResourceName = resName
        , resPatPatternType  = Pat_LITERAL -- FIXME: support extended?
        }
      action = AclAction
        { aclActionResPat       = resPat
        , aclActionOp           = op
        , aclActionLogIfAllowed = defaultLogIfAllowed
        , aclActionLogIfDenied  = defaultLogIfDenied
        }
  authorize ctx authorizer [action] >>= \case
    [Authz_ALLOWED] -> return True
    [Authz_DENIED]  -> return False
    _               -> error "what happened?" -- FIXME: error
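As a usage note, the Produce handler above composes these two helpers directly. A hypothetical call site mirroring Handler/Produce.hs (assuming this module's imports are in scope; checkTopicWrite is an illustrative name, not part of the commit):

-- Build the authorizable context from the wire-level request context, then
-- check WRITE on a literal topic resource. Returns True only on an explicit
-- Authz_ALLOWED from the authorizer.
checkTopicWrite :: Authorizer s => K.RequestContext -> s -> Text -> IO Bool
checkTopicWrite reqCtx authorizer topicName =
  simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer
                  Res_TOPIC topicName AclOp_WRITE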
1 change: 1 addition & 0 deletions hstream-kafka/hstream-kafka.cabal
@@ -173,6 +173,7 @@ library
HStream.Kafka.Server.Handler.Offset
HStream.Kafka.Server.Handler.Produce
HStream.Kafka.Server.Handler.Topic
HStream.Kafka.Server.Security.AuthorizeHelper
HStream.Kafka.Server.Security.SASL

cxx-sources:
