diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
index e957218e4..0893483fb 100644
--- a/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
+++ b/hstream-kafka/HStream/Kafka/Server/Handler/Produce.hs
@@ -3,28 +3,31 @@ module HStream.Kafka.Server.Handler.Produce
   , handleInitProducerId
   ) where

-import qualified Control.Concurrent.Async           as Async
+import qualified Control.Concurrent.Async              as Async
 import           Control.Monad
-import           Data.ByteString                    (ByteString)
-import qualified Data.ByteString                    as BS
+import           Data.ByteString                       (ByteString)
+import qualified Data.ByteString                       as BS
 import           Data.Int
-import           Data.Maybe                         (fromMaybe)
-import           Data.Text                          (Text)
-import qualified Data.Text                          as T
-import qualified Data.Vector                        as V
+import           Data.Maybe                            (fromMaybe)
+import           Data.Text                             (Text)
+import qualified Data.Text                             as T
+import qualified Data.Vector                           as V
 import           Data.Word
-import qualified HStream.Kafka.Common.Metrics       as M
-import qualified HStream.Kafka.Common.OffsetManager as K
-import qualified HStream.Kafka.Common.RecordFormat  as K
-import           HStream.Kafka.Server.Types         (ServerContext (..))
-import qualified HStream.Logger                     as Log
-import qualified HStream.Store                      as S
-import qualified HStream.Utils                      as U
-import qualified Kafka.Protocol.Encoding            as K
-import qualified Kafka.Protocol.Error               as K
-import qualified Kafka.Protocol.Message             as K
-import qualified Kafka.Protocol.Service             as K
+import           HStream.Kafka.Common.Acl
+import qualified HStream.Kafka.Common.Metrics          as M
+import qualified HStream.Kafka.Common.OffsetManager    as K
+import qualified HStream.Kafka.Common.RecordFormat     as K
+import           HStream.Kafka.Common.Resource
+import           HStream.Kafka.Server.Security.AuthorizeHelper
+import           HStream.Kafka.Server.Types            (ServerContext (..))
+import qualified HStream.Logger                        as Log
+import qualified HStream.Store                         as S
+import qualified HStream.Utils                         as U
+import qualified Kafka.Protocol.Encoding               as K
+import qualified Kafka.Protocol.Error                  as K
+import qualified Kafka.Protocol.Message                as K
+import qualified Kafka.Protocol.Service                as K

 -- acks: (FIXME: Currently we only support -1)
 -- 0: The server will not send any response(this is the only case where the
@@ -49,9 +52,14 @@ handleProduce ServerContext{..} _reqCtx req = do
   -- TODO: handle request args: acks, timeoutMs
   let topicData = fromMaybe V.empty (K.unKaArray req.topicData)

-  responses <- V.forM topicData $ \topic{- TopicProduceData -} -> do
+  -- [ACL] cache authz result of each topic
+  topicsAuthzResult <- V.forM topicData $ \topic ->
+    simpleAuthorize (toAuthorizableReqCtx _reqCtx) authorizer Res_TOPIC topic.name AclOp_WRITE
+
+  responses <- V.iforM topicData $ \i topic{- TopicProduceData -} -> do
     -- A topic is a stream. Here we donot need to check the topic existence,
     -- because the metadata api already does(?)
+    -- FIXME: is it correct? See kafka.server.KafkaApis#handleProduceRequest
     partitions <- S.listStreamPartitionsOrderedByName scLDClient (S.transToTopicStreamName topic.name)
     let partitionData = fromMaybe V.empty (K.unKaArray topic.partitionData)
@@ -69,47 +77,50 @@ handleProduce ServerContext{..} _reqCtx req = do
       Log.debug1 $ "Try to append to logid " <> Log.build logid
                 <> "(" <> Log.build partition.index <> ")"

-      -- Wirte appends
-      (S.AppendCompletion{..}, offset) <-
-        appendRecords True scLDClient scOffsetManager
-                      (topic.name, partition.index) logid recordBytes
+      -- [ACL] authorize each (topic,partition). It uses the cache above.
+      -- FIXME: Block is too deep. Extract to a standalone function.
+      partitionResponseWithAuthz (topicsAuthzResult V.! i) partition.index $ do
+        -- Write appends
+        (S.AppendCompletion{..}, offset) <-
+          appendRecords True scLDClient scOffsetManager
+                        (topic.name, partition.index) logid recordBytes

-      Log.debug1 $ "Append done " <> Log.build appendCompLogID
-                <> ", lsn: " <> Log.build appendCompLSN
-                <> ", start offset: " <> Log.build offset
+        Log.debug1 $ "Append done " <> Log.build appendCompLogID
+                  <> ", lsn: " <> Log.build appendCompLSN
+                  <> ", start offset: " <> Log.build offset

-      -- TODO: performance improvements
-      --
-      -- For each append request after version 5, we need to read the oldest
-      -- offset of the log. This will cause critical performance problems.
-      --
-      --logStartOffset <-
-      --  if reqCtx.apiVersion >= 5
-      --     then do m_logStartOffset <- K.getOldestOffset scOffsetManager logid
-      --             case m_logStartOffset of
-      --               Just logStartOffset -> pure logStartOffset
-      --               Nothing -> do
-      --                 Log.fatal $ "Cannot get log start offset for logid "
-      --                          <> Log.build logid
-      --                 pure (-1)
-      --     else pure (-1)
-      let logStartOffset = (-1)
+        -- TODO: performance improvements
+        --
+        -- For each append request after version 5, we need to read the oldest
+        -- offset of the log. This will cause critical performance problems.
+        --
+        --logStartOffset <-
+        --  if reqCtx.apiVersion >= 5
+        --     then do m_logStartOffset <- K.getOldestOffset scOffsetManager logid
+        --             case m_logStartOffset of
+        --               Just logStartOffset -> pure logStartOffset
+        --               Nothing -> do
+        --                 Log.fatal $ "Cannot get log start offset for logid "
+        --                          <> Log.build logid
+        --                 pure (-1)
+        --     else pure (-1)
+        let logStartOffset = (-1)

-      -- TODO: PartitionProduceResponse.logAppendTimeMs
-      --
-      -- The timestamp returned by broker after appending the messages. If
-      -- CreateTime is used for the topic, the timestamp will be -1. If
-      -- LogAppendTime is used for the topic, the timestamp will be the broker
-      -- local time when the messages are appended.
-      --
-      -- Currently, only support LogAppendTime
-      pure $ K.PartitionProduceResponse
-        { index = partition.index
-        , errorCode = K.NONE
-        , baseOffset = offset
-        , logAppendTimeMs = appendCompTimestamp
-        , logStartOffset = logStartOffset
-        }
+        -- TODO: PartitionProduceResponse.logAppendTimeMs
+        --
+        -- The timestamp returned by broker after appending the messages. If
+        -- CreateTime is used for the topic, the timestamp will be -1. If
+        -- LogAppendTime is used for the topic, the timestamp will be the broker
+        -- local time when the messages are appended.
+        --
+        -- Currently, only support LogAppendTime
+        pure $ K.PartitionProduceResponse
+          { index = partition.index
+          , errorCode = K.NONE
+          , baseOffset = offset
+          , logAppendTimeMs = appendCompTimestamp
+          , logStartOffset = logStartOffset
+          }

   pure $ K.TopicProduceResponse topic.name (K.KaArray $ Just partitionResponses)
@@ -131,6 +142,19 @@ handleInitProducerId _ _ _ = do
     }

 -------------------------------------------------------------------------------
+partitionResponseWithAuthz :: Bool                          -- is authorized
+                           -> Int32                         -- partition index
+                           -> IO K.PartitionProduceResponse -- action if authorized
+                           -> IO K.PartitionProduceResponse
+partitionResponseWithAuthz isOK idx action
+  | not isOK = pure $ K.PartitionProduceResponse
+      { index = idx
+      , errorCode = K.TOPIC_AUTHORIZATION_FAILED
+      , baseOffset = -1
+      , logAppendTimeMs = -1
+      , logStartOffset = -1
+      }
+  | otherwise = action

 appendRecords :: Bool
diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Security.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Security.hs
index fe9050768..bd10df3f9 100644
--- a/hstream-kafka/HStream/Kafka/Server/Handler/Security.hs
+++ b/hstream-kafka/HStream/Kafka/Server/Handler/Security.hs
@@ -8,27 +8,26 @@ module HStream.Kafka.Server.Handler.Security
   , handleDeleteAcls
   ) where

-import qualified Control.Exception                   as E
-import           Control.Monad
-import           Data.Function                       (on)
-import qualified Data.List                           as L
+import qualified Control.Exception                             as E
+import           Data.Function                                 (on)
+import qualified Data.List                                     as L
 import           Data.Maybe
-import qualified Data.Text                           as T
-import qualified Data.Vector                         as V
+import qualified Data.Text                                     as T
+import qualified Data.Vector                                   as V
 import           HStream.Kafka.Common.Acl
 import           HStream.Kafka.Common.Authorizer
 import           HStream.Kafka.Common.Authorizer.Class
-import qualified HStream.Kafka.Common.KafkaException as K
+import qualified HStream.Kafka.Common.KafkaException           as K
 import           HStream.Kafka.Common.Resource
-import           HStream.Kafka.Common.Security
-import           HStream.Kafka.Server.Security.SASL  (serverSupportedMechanismNames)
-import           HStream.Kafka.Server.Types          (ServerContext (..))
-import qualified HStream.Logger                      as Log
-import qualified Kafka.Protocol.Encoding             as K
-import qualified Kafka.Protocol.Error                as K
-import qualified Kafka.Protocol.Message              as K
-import qualified Kafka.Protocol.Service              as K
+import           HStream.Kafka.Server.Security.AuthorizeHelper (toAuthorizableReqCtx)
+import           HStream.Kafka.Server.Security.SASL            (serverSupportedMechanismNames)
+import           HStream.Kafka.Server.Types                    (ServerContext (..))
+import qualified HStream.Logger                                as Log
+import qualified Kafka.Protocol.Encoding                       as K
+import qualified Kafka.Protocol.Error                          as K
+import qualified Kafka.Protocol.Message                        as K
+import qualified Kafka.Protocol.Service                        as K

 -------------------------------------------------------------------------------
 handleSaslHandshake :: ServerContext -> K.RequestContext -> K.SaslHandshakeRequest -> IO K.SaslHandshakeResponse
@@ -161,8 +160,3 @@ handleDeleteAcls ctx reqCtx req = do
                 (fromMaybe "" x.hostFilter)
                 (toEnum (fromIntegral x.operation))
                 (toEnum (fromIntegral x.permissionType))))
-
-toAuthorizableReqCtx :: K.RequestContext -> AuthorizableRequestContext
-toAuthorizableReqCtx reqCtx =
-  AuthorizableRequestContext (T.pack reqCtx.clientHost)
-                             (Principal "User" (fromMaybe "" (join reqCtx.clientId)))
diff --git a/hstream-kafka/HStream/Kafka/Server/Security/AuthorizeHelper.hs b/hstream-kafka/HStream/Kafka/Server/Security/AuthorizeHelper.hs
new file mode 100644
index 000000000..5d7d191a5
--- /dev/null
+++ b/hstream-kafka/HStream/Kafka/Server/Security/AuthorizeHelper.hs
@@ -0,0 +1,50 @@
+module HStream.Kafka.Server.Security.AuthorizeHelper where
+
+import           Control.Monad
+import           Data.Maybe
+import           Data.Text                             (Text)
+import qualified Data.Text                             as T
+
+import           HStream.Kafka.Common.Acl
+import           HStream.Kafka.Common.Authorizer.Class
+import           HStream.Kafka.Common.Resource
+import           HStream.Kafka.Common.Security
+import qualified Kafka.Protocol.Service                as K
+
+toAuthorizableReqCtx :: K.RequestContext -> AuthorizableRequestContext
+toAuthorizableReqCtx reqCtx =
+  AuthorizableRequestContext (T.pack reqCtx.clientHost)
+                             (Principal "User" (fromMaybe "" (join reqCtx.clientId)))
+
+-- FIXME: configurable
+defaultLogIfAllowed :: Bool
+defaultLogIfAllowed = False
+
+-- FIXME: configurable
+defaultLogIfDenied :: Bool
+defaultLogIfDenied = True
+
+-- | Do a simple authorization check: one operation on one literal resource.
+simpleAuthorize :: Authorizer s
+                => AuthorizableRequestContext
+                -> s
+                -> ResourceType
+                -> Text
+                -> AclOperation
+                -> IO Bool
+simpleAuthorize ctx authorizer resType resName op = do
+  let resPat = ResourcePattern
+        { resPatResourceType = resType
+        , resPatResourceName = resName
+        , resPatPatternType  = Pat_LITERAL -- FIXME: support extended?
+        }
+      action = AclAction
+        { aclActionResPat       = resPat
+        , aclActionOp           = op
+        , aclActionLogIfAllowed = defaultLogIfAllowed
+        , aclActionLogIfDenied  = defaultLogIfDenied
+        }
+  authorize ctx authorizer [action] >>= \case
+    [Authz_ALLOWED] -> return True
+    [Authz_DENIED]  -> return False
+    _               -> error "simpleAuthorize: unexpected authorize result" -- FIXME: proper error
diff --git a/hstream-kafka/hstream-kafka.cabal b/hstream-kafka/hstream-kafka.cabal
index aa0f2dcac..8223acbce 100644
--- a/hstream-kafka/hstream-kafka.cabal
+++ b/hstream-kafka/hstream-kafka.cabal
@@ -173,6 +173,7 @@ library
       HStream.Kafka.Server.Handler.Offset
      HStream.Kafka.Server.Handler.Produce
      HStream.Kafka.Server.Handler.Topic
+      HStream.Kafka.Server.Security.AuthorizeHelper
      HStream.Kafka.Server.Security.SASL

   cxx-sources:
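
Reviewer note: the helper module keeps the check generic, so read-side handlers can reuse the same one-check-per-topic pattern. A minimal sketch under assumptions: checkTopicRead and its wiring are hypothetical, authorizer is the ServerContext field already used by handleProduce above, and AclOp_READ is assumed to exist alongside AclOp_WRITE in HStream.Kafka.Common.Acl:

    {-# LANGUAGE RecordWildCards #-}

    import           Data.Text                                     (Text)
    import           HStream.Kafka.Common.Acl
    import           HStream.Kafka.Common.Resource
    import           HStream.Kafka.Server.Security.AuthorizeHelper
    import           HStream.Kafka.Server.Types                    (ServerContext (..))
    import qualified Kafka.Protocol.Service                        as K

    -- Hypothetical: one authorization call per topic, mirroring handleProduce
    -- but for a read path.
    checkTopicRead :: ServerContext -> K.RequestContext -> Text -> IO Bool
    checkTopicRead ServerContext{..} reqCtx topicName =
      simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC topicName AclOp_READ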
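
The guard in partitionResponseWithAuthz short-circuits before the wrapped action runs, so a denied partition never touches the store. A small sketch of that behavior; it would have to live inside Produce.hs since the function is module-private, and demoDenied is illustrative only:

    demoDenied :: IO K.PartitionProduceResponse
    demoDenied =
      partitionResponseWithAuthz False 42 $
        error "append action must not run when authorization fails"
    -- Yields index = 42, errorCode = TOPIC_AUTHORIZATION_FAILED and -1 sentinels;
    -- the 'not isOK' guard matches first, so the error action is never executed.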
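
simpleAuthorize sends a one-element batch and relies on authorize returning exactly one result per action (hence the single-element list patterns). If several checks ever need to share one authorizer round trip, the same building blocks batch naturally; a hypothetical variant, not part of this diff, that could sit next to simpleAuthorize in AuthorizeHelper.hs:

    simpleAuthorizeMany :: Authorizer s
                        => AuthorizableRequestContext
                        -> s
                        -> [(ResourceType, Text, AclOperation)]
                        -> IO [Bool]
    simpleAuthorizeMany ctx authorizer reqs =
        -- One result per action, in order; anything other than ALLOWED counts
        -- as denied.
        map isAllowed <$> authorize ctx authorizer (map mkAction reqs)
      where
        isAllowed Authz_ALLOWED = True
        isAllowed _             = False
        mkAction (resType, resName, op) = AclAction
          { aclActionResPat = ResourcePattern
              { resPatResourceType = resType
              , resPatResourceName = resName
              , resPatPatternType  = Pat_LITERAL
              }
          , aclActionOp           = op
          , aclActionLogIfAllowed = defaultLogIfAllowed
          , aclActionLogIfDenied  = defaultLogIfDenied
          }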