diff --git a/hstream-kafka/HStream/Kafka/Common/Acl.hs b/hstream-kafka/HStream/Kafka/Common/Acl.hs index 988ecf7fe..218086d2e 100644 --- a/hstream-kafka/HStream/Kafka/Common/Acl.hs +++ b/hstream-kafka/HStream/Kafka/Common/Acl.hs @@ -1,6 +1,15 @@ {-# LANGUAGE RecordWildCards #-} -module HStream.Kafka.Common.Acl where +module HStream.Kafka.Common.Acl + ( AclOperation(..) + , AclPermissionType(..) + , AccessControlEntryData(..) + , AccessControlEntry(..) + , AccessControlEntryFilter(..) + , AclBinding(..) + , AclBindingFilter(..) + , validateAclBinding + ) where import Data.Char import Data.Maybe diff --git a/hstream-kafka/HStream/Kafka/Common/Authorizer.hs b/hstream-kafka/HStream/Kafka/Common/Authorizer.hs index 3145b841a..61becfb77 100644 --- a/hstream-kafka/HStream/Kafka/Common/Authorizer.hs +++ b/hstream-kafka/HStream/Kafka/Common/Authorizer.hs @@ -1,6 +1,14 @@ {-# LANGUAGE UndecidableInstances #-} -module HStream.Kafka.Common.Authorizer where +module HStream.Kafka.Common.Authorizer + ( AclAuthorizer + , newAclAuthorizer + , initAclAuthorizer + , syncAclAuthorizer + + , aceToAclDescription + , aclBindingsToDescribeAclsResource + ) where import Control.Concurrent import Control.Exception @@ -488,7 +496,6 @@ logAuditMessage AuthorizableRequestContext{..} AclAction{..} isAuthorized = do True -> Log.info . Log.buildString $ msg False -> Log.trace . Log.buildString $ msg ----- aceToAclDescription :: AccessControlEntry -> K.AclDescription aceToAclDescription (AccessControlEntry AccessControlEntryData{..}) = K.AclDescription diff --git a/hstream-kafka/HStream/Kafka/Common/Authorizer/Class.hs b/hstream-kafka/HStream/Kafka/Common/Authorizer/Class.hs index 90ff81588..be158f07b 100644 --- a/hstream-kafka/HStream/Kafka/Common/Authorizer/Class.hs +++ b/hstream-kafka/HStream/Kafka/Common/Authorizer/Class.hs @@ -1,7 +1,20 @@ -module HStream.Kafka.Common.Authorizer.Class where +module HStream.Kafka.Common.Authorizer.Class + ( AclAction(..) + , AuthorizationResult(..) + , AuthorizableRequestContext(..) + , toAuthorizableReqCtx + + , Authorizer(..) + , AuthorizerObject(..) + + , simpleAuthorize + ) where import Control.Exception +import Control.Monad +import Data.Maybe import Data.Text (Text) +import qualified Data.Text as T import HStream.Kafka.Common.Acl import qualified HStream.Kafka.Common.KafkaException as K @@ -9,6 +22,7 @@ import HStream.Kafka.Common.Resource import HStream.Kafka.Common.Security import qualified Kafka.Protocol.Error as K import qualified Kafka.Protocol.Message as K +import qualified Kafka.Protocol.Service as K ------------------------------------------------------------ -- Helper types @@ -40,6 +54,12 @@ data AuthorizableRequestContext = AuthorizableRequestContext -- , ... } +-- FIXME: is it suitable to place this function here? +toAuthorizableReqCtx :: K.RequestContext -> AuthorizableRequestContext +toAuthorizableReqCtx reqCtx = + AuthorizableRequestContext (T.pack reqCtx.clientHost) + (Principal "User" (fromMaybe "" (join reqCtx.clientId))) + ------------------------------------------------------------ -- Abstract authorizer interface ------------------------------------------------------------ @@ -79,11 +99,6 @@ class Authorizer s where data AuthorizerObject where AuthorizerObject :: Authorizer s => Maybe s -> AuthorizerObject -withAuthorizerObject :: AuthorizerObject - -> (forall s. Authorizer s => Maybe s -> a) - -> a -withAuthorizerObject (AuthorizerObject x) f = f x - -- NOTE: 'AuthorizerObject' can contain 'Nothing'. -- Methods behave differently in two types on 'Nothing': -- 1. management methods ('createAcls', 'deleteAcls' and 'getAcls'): throw an exception @@ -101,3 +116,39 @@ instance Authorizer AuthorizerObject where case x of Nothing -> mapM (const $ pure Authz_ALLOWED) Just s -> authorize ctx s + +------------------------------------------------------------ +-- Helper functions for using authorizers +------------------------------------------------------------ +-- | The simplest way to authorize a single action. +simpleAuthorize :: Authorizer s + => AuthorizableRequestContext + -> s + -> ResourceType + -> Text + -> AclOperation + -> IO Bool +simpleAuthorize ctx authorizer resType resName op = do + let resPat = ResourcePattern + { resPatResourceType = resType + , resPatResourceName = resName + , resPatPatternType = Pat_LITERAL -- FIXME: support extended? + } + action = AclAction + { aclActionResPat = resPat + , aclActionOp = op + , aclActionLogIfAllowed = defaultLogIfAllowed + , aclActionLogIfDenied = defaultLogIfDenied + } + authorize ctx authorizer [action] >>= \case + [Authz_ALLOWED] -> return True + [Authz_DENIED] -> return False + _ -> error "what happened?" -- FIXME: error + where + -- FIXME: configuable + defaultLogIfAllowed :: Bool + defaultLogIfAllowed = False + + -- FIXME: configuable + defaultLogIfDenied :: Bool + defaultLogIfDenied = True diff --git a/hstream-kafka/HStream/Kafka/Common/Resource.hs b/hstream-kafka/HStream/Kafka/Common/Resource.hs index fc4b8368d..41155d961 100644 --- a/hstream-kafka/HStream/Kafka/Common/Resource.hs +++ b/hstream-kafka/HStream/Kafka/Common/Resource.hs @@ -3,7 +3,20 @@ {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TypeFamilies #-} -module HStream.Kafka.Common.Resource where +module HStream.Kafka.Common.Resource + ( Matchable(..) + , ResourceType(..) + , Resource(..) + , PatternType(..) + , isPatternTypeSpecific + , ResourcePattern(..) + , ResourcePatternFilter(..) + , wildcardResourceName + + , resourcePatternToMetadataKey + , resourcePatternFromMetadataKey + , resourcePatternFromFilter + ) where import Data.Char import Data.Maybe @@ -11,7 +24,6 @@ import qualified Data.Set as Set import Data.Text (Text) import qualified Data.Text as T --- class Matchable a b | a -> b, b -> a where match :: a -> b -> Bool matchAtMostOne :: b -> Bool @@ -85,7 +97,6 @@ instance Show Resource where ", name=" <> s_name <> ")" where s_name = if T.null resResourceName then "" else T.unpack resResourceName --- [0..4] -- | A resource pattern type, which is an 'Int8' start from 0. -- WARNING: Be sure to understand the meaning of 'Pat_MATCH'. -- A '(TYPE, "name", MATCH)' filter matches the following patterns: diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs index e957218e4..940f3dbf4 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs @@ -3,28 +3,31 @@ module HStream.Kafka.Server.Handler.Produce , handleInitProducerId ) where -import qualified Control.Concurrent.Async as Async +import qualified Control.Concurrent.Async as Async import Control.Monad -import Data.ByteString (ByteString) -import qualified Data.ByteString as BS +import Data.ByteString (ByteString) +import qualified Data.ByteString as BS import Data.Int -import Data.Maybe (fromMaybe) -import Data.Text (Text) -import qualified Data.Text as T -import qualified Data.Vector as V +import Data.Maybe (fromMaybe) +import Data.Text (Text) +import qualified Data.Text as T +import qualified Data.Vector as V import Data.Word -import qualified HStream.Kafka.Common.Metrics as M -import qualified HStream.Kafka.Common.OffsetManager as K -import qualified HStream.Kafka.Common.RecordFormat as K -import HStream.Kafka.Server.Types (ServerContext (..)) -import qualified HStream.Logger as Log -import qualified HStream.Store as S -import qualified HStream.Utils as U -import qualified Kafka.Protocol.Encoding as K -import qualified Kafka.Protocol.Error as K -import qualified Kafka.Protocol.Message as K -import qualified Kafka.Protocol.Service as K +import HStream.Kafka.Common.Acl +import HStream.Kafka.Common.Authorizer.Class +import qualified HStream.Kafka.Common.Metrics as M +import qualified HStream.Kafka.Common.OffsetManager as K +import qualified HStream.Kafka.Common.RecordFormat as K +import HStream.Kafka.Common.Resource +import HStream.Kafka.Server.Types (ServerContext (..)) +import qualified HStream.Logger as Log +import qualified HStream.Store as S +import qualified HStream.Utils as U +import qualified Kafka.Protocol.Encoding as K +import qualified Kafka.Protocol.Error as K +import qualified Kafka.Protocol.Message as K +import qualified Kafka.Protocol.Service as K -- acks: (FIXME: Currently we only support -1) -- 0: The server will not send any response(this is the only case where the @@ -48,8 +51,11 @@ handleProduce handleProduce ServerContext{..} _reqCtx req = do -- TODO: handle request args: acks, timeoutMs let topicData = fromMaybe V.empty (K.unKaArray req.topicData) - responses <- V.forM topicData $ \topic{- TopicProduceData -} -> do + -- [ACL] authorization result for the **topic** + isTopicAuthzed <- + simpleAuthorize (toAuthorizableReqCtx _reqCtx) authorizer Res_TOPIC topic.name AclOp_WRITE + -- A topic is a stream. Here we donot need to check the topic existence, -- because the metadata api already does(?) partitions <- S.listStreamPartitionsOrderedByName @@ -69,47 +75,58 @@ handleProduce ServerContext{..} _reqCtx req = do Log.debug1 $ "Try to append to logid " <> Log.build logid <> "(" <> Log.build partition.index <> ")" - -- Wirte appends - (S.AppendCompletion{..}, offset) <- - appendRecords True scLDClient scOffsetManager - (topic.name, partition.index) logid recordBytes + -- [ACL] Generate response by the authorization result of the **topic** + case isTopicAuthzed of + False -> pure $ K.PartitionProduceResponse + { index = partition.index + , errorCode = K.TOPIC_AUTHORIZATION_FAILED + , baseOffset = -1 + , logAppendTimeMs = -1 + , logStartOffset = -1 + } + True -> do + -- FIXME: Block is too deep. Extract to a standalone function. + -- Wirte appends + (S.AppendCompletion{..}, offset) <- + appendRecords True scLDClient scOffsetManager + (topic.name, partition.index) logid recordBytes - Log.debug1 $ "Append done " <> Log.build appendCompLogID - <> ", lsn: " <> Log.build appendCompLSN - <> ", start offset: " <> Log.build offset + Log.debug1 $ "Append done " <> Log.build appendCompLogID + <> ", lsn: " <> Log.build appendCompLSN + <> ", start offset: " <> Log.build offset - -- TODO: performance improvements - -- - -- For each append request after version 5, we need to read the oldest - -- offset of the log. This will cause critical performance problems. - -- - --logStartOffset <- - -- if reqCtx.apiVersion >= 5 - -- then do m_logStartOffset <- K.getOldestOffset scOffsetManager logid - -- case m_logStartOffset of - -- Just logStartOffset -> pure logStartOffset - -- Nothing -> do - -- Log.fatal $ "Cannot get log start offset for logid " - -- <> Log.build logid - -- pure (-1) - -- else pure (-1) - let logStartOffset = (-1) + -- TODO: performance improvements + -- + -- For each append request after version 5, we need to read the oldest + -- offset of the log. This will cause critical performance problems. + -- + --logStartOffset <- + -- if reqCtx.apiVersion >= 5 + -- then do m_logStartOffset <- K.getOldestOffset scOffsetManager logid + -- case m_logStartOffset of + -- Just logStartOffset -> pure logStartOffset + -- Nothing -> do + -- Log.fatal $ "Cannot get log start offset for logid " + -- <> Log.build logid + -- pure (-1) + -- else pure (-1) + let logStartOffset = (-1) - -- TODO: PartitionProduceResponse.logAppendTimeMs - -- - -- The timestamp returned by broker after appending the messages. If - -- CreateTime is used for the topic, the timestamp will be -1. If - -- LogAppendTime is used for the topic, the timestamp will be the broker - -- local time when the messages are appended. - -- - -- Currently, only support LogAppendTime - pure $ K.PartitionProduceResponse - { index = partition.index - , errorCode = K.NONE - , baseOffset = offset - , logAppendTimeMs = appendCompTimestamp - , logStartOffset = logStartOffset - } + -- TODO: PartitionProduceResponse.logAppendTimeMs + -- + -- The timestamp returned by broker after appending the messages. If + -- CreateTime is used for the topic, the timestamp will be -1. If + -- LogAppendTime is used for the topic, the timestamp will be the broker + -- local time when the messages are appended. + -- + -- Currently, only support LogAppendTime + pure $ K.PartitionProduceResponse + { index = partition.index + , errorCode = K.NONE + , baseOffset = offset + , logAppendTimeMs = appendCompTimestamp + , logStartOffset = logStartOffset + } pure $ K.TopicProduceResponse topic.name (K.KaArray $ Just partitionResponses) @@ -131,7 +148,6 @@ handleInitProducerId _ _ _ = do } ------------------------------------------------------------------------------- - appendRecords :: Bool -> S.LDClient diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Security.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Security.hs index fe9050768..07698e996 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Security.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Security.hs @@ -9,7 +9,6 @@ module HStream.Kafka.Server.Handler.Security ) where import qualified Control.Exception as E -import Control.Monad import Data.Function (on) import qualified Data.List as L import Data.Maybe @@ -21,7 +20,6 @@ import HStream.Kafka.Common.Authorizer import HStream.Kafka.Common.Authorizer.Class import qualified HStream.Kafka.Common.KafkaException as K import HStream.Kafka.Common.Resource -import HStream.Kafka.Common.Security import HStream.Kafka.Server.Security.SASL (serverSupportedMechanismNames) import HStream.Kafka.Server.Types (ServerContext (..)) import qualified HStream.Logger as Log @@ -161,8 +159,3 @@ handleDeleteAcls ctx reqCtx req = do (fromMaybe "" x.hostFilter) (toEnum (fromIntegral x.operation)) (toEnum (fromIntegral x.permissionType)))) - -toAuthorizableReqCtx :: K.RequestContext -> AuthorizableRequestContext -toAuthorizableReqCtx reqCtx = - AuthorizableRequestContext (T.pack reqCtx.clientHost) - (Principal "User" (fromMaybe "" (join reqCtx.clientId)))