Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka: support ACL in produce (0) handler #1767

Merged
merged 1 commit into from
Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion hstream-kafka/HStream/Kafka/Common/Acl.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
{-# LANGUAGE RecordWildCards #-}

module HStream.Kafka.Common.Acl where
module HStream.Kafka.Common.Acl
( AclOperation(..)
, AclPermissionType(..)
, AccessControlEntryData(..)
, AccessControlEntry(..)
, AccessControlEntryFilter(..)
, AclBinding(..)
, AclBindingFilter(..)
, validateAclBinding
) where

import Data.Char
import Data.Maybe
Expand Down
11 changes: 9 additions & 2 deletions hstream-kafka/HStream/Kafka/Common/Authorizer.hs
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
{-# LANGUAGE UndecidableInstances #-}

module HStream.Kafka.Common.Authorizer where
module HStream.Kafka.Common.Authorizer
( AclAuthorizer
, newAclAuthorizer
, initAclAuthorizer
, syncAclAuthorizer

, aceToAclDescription
, aclBindingsToDescribeAclsResource
) where

import Control.Concurrent
import Control.Exception
Expand Down Expand Up @@ -488,7 +496,6 @@ logAuditMessage AuthorizableRequestContext{..} AclAction{..} isAuthorized = do
True -> Log.info . Log.buildString $ msg
False -> Log.trace . Log.buildString $ msg

----
aceToAclDescription :: AccessControlEntry -> K.AclDescription
aceToAclDescription (AccessControlEntry AccessControlEntryData{..}) =
K.AclDescription
Expand Down
63 changes: 57 additions & 6 deletions hstream-kafka/HStream/Kafka/Common/Authorizer/Class.hs
Original file line number Diff line number Diff line change
@@ -1,14 +1,28 @@
module HStream.Kafka.Common.Authorizer.Class where
module HStream.Kafka.Common.Authorizer.Class
( AclAction(..)
, AuthorizationResult(..)
, AuthorizableRequestContext(..)
, toAuthorizableReqCtx

, Authorizer(..)
, AuthorizerObject(..)

, simpleAuthorize
) where

import Control.Exception
import Control.Monad
import Data.Maybe
import Data.Text (Text)
import qualified Data.Text as T

import HStream.Kafka.Common.Acl
import qualified HStream.Kafka.Common.KafkaException as K
import HStream.Kafka.Common.Resource
import HStream.Kafka.Common.Security
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K

------------------------------------------------------------
-- Helper types
Expand Down Expand Up @@ -40,6 +54,12 @@ data AuthorizableRequestContext = AuthorizableRequestContext
-- , ...
}

-- FIXME: is it suitable to place this function here?
toAuthorizableReqCtx :: K.RequestContext -> AuthorizableRequestContext
toAuthorizableReqCtx reqCtx =
AuthorizableRequestContext (T.pack reqCtx.clientHost)
(Principal "User" (fromMaybe "" (join reqCtx.clientId)))

------------------------------------------------------------
-- Abstract authorizer interface
------------------------------------------------------------
Expand Down Expand Up @@ -79,11 +99,6 @@ class Authorizer s where
data AuthorizerObject where
AuthorizerObject :: Authorizer s => Maybe s -> AuthorizerObject

withAuthorizerObject :: AuthorizerObject
-> (forall s. Authorizer s => Maybe s -> a)
-> a
withAuthorizerObject (AuthorizerObject x) f = f x

-- NOTE: 'AuthorizerObject' can contain 'Nothing'.
-- Methods behave differently in two types on 'Nothing':
-- 1. management methods ('createAcls', 'deleteAcls' and 'getAcls'): throw an exception
Expand All @@ -101,3 +116,39 @@ instance Authorizer AuthorizerObject where
case x of
Nothing -> mapM (const $ pure Authz_ALLOWED)
Just s -> authorize ctx s

------------------------------------------------------------
-- Helper functions for using authorizers
------------------------------------------------------------
-- | The simplest way to authorize a single action.
simpleAuthorize :: Authorizer s
=> AuthorizableRequestContext
-> s
-> ResourceType
-> Text
-> AclOperation
-> IO Bool
simpleAuthorize ctx authorizer resType resName op = do
let resPat = ResourcePattern
{ resPatResourceType = resType
, resPatResourceName = resName
, resPatPatternType = Pat_LITERAL -- FIXME: support extended?
}
action = AclAction
{ aclActionResPat = resPat
, aclActionOp = op
, aclActionLogIfAllowed = defaultLogIfAllowed
, aclActionLogIfDenied = defaultLogIfDenied
}
authorize ctx authorizer [action] >>= \case
[Authz_ALLOWED] -> return True
[Authz_DENIED] -> return False
_ -> error "what happened?" -- FIXME: error
where
-- FIXME: configuable
defaultLogIfAllowed :: Bool
defaultLogIfAllowed = False

-- FIXME: configuable
defaultLogIfDenied :: Bool
defaultLogIfDenied = True
17 changes: 14 additions & 3 deletions hstream-kafka/HStream/Kafka/Common/Resource.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,27 @@
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeFamilies #-}

module HStream.Kafka.Common.Resource where
module HStream.Kafka.Common.Resource
( Matchable(..)
, ResourceType(..)
, Resource(..)
, PatternType(..)
, isPatternTypeSpecific
, ResourcePattern(..)
, ResourcePatternFilter(..)
, wildcardResourceName

, resourcePatternToMetadataKey
, resourcePatternFromMetadataKey
, resourcePatternFromFilter
) where

import Data.Char
import Data.Maybe
import qualified Data.Set as Set
import Data.Text (Text)
import qualified Data.Text as T

--
class Matchable a b | a -> b, b -> a where
match :: a -> b -> Bool
matchAtMostOne :: b -> Bool
Expand Down Expand Up @@ -85,7 +97,6 @@ instance Show Resource where
", name=" <> s_name <> ")"
where s_name = if T.null resResourceName then "<any>" else T.unpack resResourceName

-- [0..4]
-- | A resource pattern type, which is an 'Int8' start from 0.
-- WARNING: Be sure to understand the meaning of 'Pat_MATCH'.
-- A '(TYPE, "name", MATCH)' filter matches the following patterns:
Expand Down
132 changes: 74 additions & 58 deletions hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,31 @@ module HStream.Kafka.Server.Handler.Produce
, handleInitProducerId
) where

import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.Async as Async
import Control.Monad
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Int
import Data.Maybe (fromMaybe)
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Vector as V
import Data.Maybe (fromMaybe)
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Vector as V
import Data.Word

import qualified HStream.Kafka.Common.Metrics as M
import qualified HStream.Kafka.Common.OffsetManager as K
import qualified HStream.Kafka.Common.RecordFormat as K
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import qualified HStream.Utils as U
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K
import HStream.Kafka.Common.Acl
import HStream.Kafka.Common.Authorizer.Class
import qualified HStream.Kafka.Common.Metrics as M
import qualified HStream.Kafka.Common.OffsetManager as K
import qualified HStream.Kafka.Common.RecordFormat as K
import HStream.Kafka.Common.Resource
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import qualified HStream.Utils as U
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K

-- acks: (FIXME: Currently we only support -1)
-- 0: The server will not send any response(this is the only case where the
Expand All @@ -48,8 +51,11 @@ handleProduce
handleProduce ServerContext{..} _reqCtx req = do
-- TODO: handle request args: acks, timeoutMs
let topicData = fromMaybe V.empty (K.unKaArray req.topicData)

responses <- V.forM topicData $ \topic{- TopicProduceData -} -> do
-- [ACL] authorization result for the **topic**
isTopicAuthzed <-
simpleAuthorize (toAuthorizableReqCtx _reqCtx) authorizer Res_TOPIC topic.name AclOp_WRITE

-- A topic is a stream. Here we donot need to check the topic existence,
-- because the metadata api already does(?)
partitions <- S.listStreamPartitionsOrderedByName
Expand All @@ -69,47 +75,58 @@ handleProduce ServerContext{..} _reqCtx req = do
Log.debug1 $ "Try to append to logid " <> Log.build logid
<> "(" <> Log.build partition.index <> ")"

-- Wirte appends
(S.AppendCompletion{..}, offset) <-
appendRecords True scLDClient scOffsetManager
(topic.name, partition.index) logid recordBytes
-- [ACL] Generate response by the authorization result of the **topic**
case isTopicAuthzed of
False -> pure $ K.PartitionProduceResponse
{ index = partition.index
, errorCode = K.TOPIC_AUTHORIZATION_FAILED
, baseOffset = -1
, logAppendTimeMs = -1
, logStartOffset = -1
}
True -> do
-- FIXME: Block is too deep. Extract to a standalone function.
-- Wirte appends
(S.AppendCompletion{..}, offset) <-
appendRecords True scLDClient scOffsetManager
(topic.name, partition.index) logid recordBytes

Log.debug1 $ "Append done " <> Log.build appendCompLogID
<> ", lsn: " <> Log.build appendCompLSN
<> ", start offset: " <> Log.build offset
Log.debug1 $ "Append done " <> Log.build appendCompLogID
<> ", lsn: " <> Log.build appendCompLSN
<> ", start offset: " <> Log.build offset

-- TODO: performance improvements
--
-- For each append request after version 5, we need to read the oldest
-- offset of the log. This will cause critical performance problems.
--
--logStartOffset <-
-- if reqCtx.apiVersion >= 5
-- then do m_logStartOffset <- K.getOldestOffset scOffsetManager logid
-- case m_logStartOffset of
-- Just logStartOffset -> pure logStartOffset
-- Nothing -> do
-- Log.fatal $ "Cannot get log start offset for logid "
-- <> Log.build logid
-- pure (-1)
-- else pure (-1)
let logStartOffset = (-1)
-- TODO: performance improvements
--
-- For each append request after version 5, we need to read the oldest
-- offset of the log. This will cause critical performance problems.
--
--logStartOffset <-
-- if reqCtx.apiVersion >= 5
-- then do m_logStartOffset <- K.getOldestOffset scOffsetManager logid
-- case m_logStartOffset of
-- Just logStartOffset -> pure logStartOffset
-- Nothing -> do
-- Log.fatal $ "Cannot get log start offset for logid "
-- <> Log.build logid
-- pure (-1)
-- else pure (-1)
let logStartOffset = (-1)

-- TODO: PartitionProduceResponse.logAppendTimeMs
--
-- The timestamp returned by broker after appending the messages. If
-- CreateTime is used for the topic, the timestamp will be -1. If
-- LogAppendTime is used for the topic, the timestamp will be the broker
-- local time when the messages are appended.
--
-- Currently, only support LogAppendTime
pure $ K.PartitionProduceResponse
{ index = partition.index
, errorCode = K.NONE
, baseOffset = offset
, logAppendTimeMs = appendCompTimestamp
, logStartOffset = logStartOffset
}
-- TODO: PartitionProduceResponse.logAppendTimeMs
--
-- The timestamp returned by broker after appending the messages. If
-- CreateTime is used for the topic, the timestamp will be -1. If
-- LogAppendTime is used for the topic, the timestamp will be the broker
-- local time when the messages are appended.
--
-- Currently, only support LogAppendTime
pure $ K.PartitionProduceResponse
{ index = partition.index
, errorCode = K.NONE
, baseOffset = offset
, logAppendTimeMs = appendCompTimestamp
, logStartOffset = logStartOffset
}

pure $ K.TopicProduceResponse topic.name (K.KaArray $ Just partitionResponses)

Expand All @@ -131,7 +148,6 @@ handleInitProducerId _ _ _ = do
}

-------------------------------------------------------------------------------

appendRecords
:: Bool
-> S.LDClient
Expand Down
7 changes: 0 additions & 7 deletions hstream-kafka/HStream/Kafka/Server/Handler/Security.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ module HStream.Kafka.Server.Handler.Security
) where

import qualified Control.Exception as E
import Control.Monad
import Data.Function (on)
import qualified Data.List as L
import Data.Maybe
Expand All @@ -21,7 +20,6 @@ import HStream.Kafka.Common.Authorizer
import HStream.Kafka.Common.Authorizer.Class
import qualified HStream.Kafka.Common.KafkaException as K
import HStream.Kafka.Common.Resource
import HStream.Kafka.Common.Security
import HStream.Kafka.Server.Security.SASL (serverSupportedMechanismNames)
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified HStream.Logger as Log
Expand Down Expand Up @@ -161,8 +159,3 @@ handleDeleteAcls ctx reqCtx req = do
(fromMaybe "" x.hostFilter)
(toEnum (fromIntegral x.operation))
(toEnum (fromIntegral x.permissionType))))

toAuthorizableReqCtx :: K.RequestContext -> AuthorizableRequestContext
toAuthorizableReqCtx reqCtx =
AuthorizableRequestContext (T.pack reqCtx.clientHost)
(Principal "User" (fromMaybe "" (join reqCtx.clientId)))
Loading