diff --git a/hstream-kafka/HStream/Kafka/Common/Acl.hs b/hstream-kafka/HStream/Kafka/Common/Acl.hs new file mode 100644 index 000000000..3888ac288 --- /dev/null +++ b/hstream-kafka/HStream/Kafka/Common/Acl.hs @@ -0,0 +1,178 @@ +{-# LANGUAGE RecordWildCards #-} + +module HStream.Kafka.Common.Acl where + +import Data.Maybe +import Data.Text (Text) +import qualified Data.Text as T + +import HStream.Kafka.Common.Resource + +-- [0..14] +data AclOperation + = AclOp_UNKNOWN + | AclOp_ANY + | AclOp_ALL + | AclOp_READ + | AclOp_WRITE + | AclOp_CREATE + | AclOp_DELETE + | AclOp_ALTER + | AclOp_DESCRIBE + | AclOp_CLUSTER_ACTION + | AclOp_DESCRIBE_CONFIGS + | AclOp_ALTER_CONFIGS + | AclOp_IDEMPOTENT_WRITE + | AclOp_CREATE_TOKENS + | AclOp_DESCRIBE_TOKENS + deriving (Eq, Enum, Ord) +instance Show AclOperation where + show AclOp_UNKNOWN = "Unknown" + show AclOp_ANY = "Any" + show AclOp_ALL = "All" + show AclOp_READ = "Read" + show AclOp_WRITE = "Write" + show AclOp_CREATE = "Create" + show AclOp_DELETE = "Delete" + show AclOp_ALTER = "Alter" + show AclOp_DESCRIBE = "Describe" + show AclOp_CLUSTER_ACTION = "ClusterAction" + show AclOp_DESCRIBE_CONFIGS = "DescribeConfigs" + show AclOp_ALTER_CONFIGS = "AlterConfigs" + show AclOp_IDEMPOTENT_WRITE = "IdempotentWrite" + show AclOp_CREATE_TOKENS = "CreateTokens" + show AclOp_DESCRIBE_TOKENS = "DescribeTokens" +instance Read AclOperation where + readsPrec _ s = case s of + "Unknown" -> [(AclOp_UNKNOWN, "")] + "Any" -> [(AclOp_ANY, "")] + "All" -> [(AclOp_ALL, "")] + "Read" -> [(AclOp_READ, "")] + "Write" -> [(AclOp_WRITE, "")] + "Create" -> [(AclOp_CREATE, "")] + "Delete" -> [(AclOp_DELETE, "")] + "Alter" -> [(AclOp_ALTER, "")] + "Describe" -> [(AclOp_DESCRIBE, "")] + "ClusterAction" -> [(AclOp_CLUSTER_ACTION, "")] + "DescribeConfigs" -> [(AclOp_DESCRIBE_CONFIGS, "")] + "AlterConfigs" -> [(AclOp_ALTER_CONFIGS, "")] + "IdempotentWrite" -> [(AclOp_IDEMPOTENT_WRITE, "")] + "CreateTokens" -> [(AclOp_CREATE_TOKENS, "")] + "DescribeTokens" -> [(AclOp_DESCRIBE_TOKENS, "")] + _ -> [] + +-- [0..3] +data AclPermissionType + = AclPerm_UNKNOWN + | AclPerm_ANY -- used in filter + | AclPerm_DENY + | AclPerm_ALLOW + deriving (Eq, Enum, Ord) +instance Show AclPermissionType where + show AclPerm_UNKNOWN = "Unknown" + show AclPerm_ANY = "Any" + show AclPerm_DENY = "Deny" + show AclPerm_ALLOW = "Allow" +instance Read AclPermissionType where + readsPrec _ s = case s of + "Unknown" -> [(AclPerm_UNKNOWN, "")] + "Any" -> [(AclPerm_ANY, "")] + "Deny" -> [(AclPerm_DENY, "")] + "Allow" -> [(AclPerm_ALLOW, "")] + _ -> [] + +-- | Data of an access control entry (ACE), which is a 4-tuple of principal, +-- host, operation and permission type. +-- Used in both 'AccessControlEntry' and 'AccessControlEntryFilter', +-- with slightly different field requirements. +data AccessControlEntryData = AccessControlEntryData + { aceDataPrincipal :: Text + , aceDataHost :: Text + , aceDataOperation :: AclOperation + , aceDataPermissionType :: AclPermissionType + } deriving (Eq, Ord) +instance Show AccessControlEntryData where + show AccessControlEntryData{..} = + "(principal=" <> s_principal <> + ", host=" <> s_host <> + ", operation=" <> show aceDataOperation <> + ", permissionType=" <> show aceDataPermissionType <> ")" + where s_principal = if T.null aceDataPrincipal then "" else T.unpack aceDataPrincipal + s_host = if T.null aceDataHost then "" else T.unpack aceDataHost + +-- | An access control entry (ACE). +-- Requirements: principal and host can not be null. +-- operation can not be 'AclOp_ANY'. +-- permission type can not be 'AclPerm_ANY'. +newtype AccessControlEntry = AccessControlEntry + { aceData :: AccessControlEntryData + } deriving (Eq, Ord) +instance Show AccessControlEntry where + show AccessControlEntry{..} = show aceData + +-- | A filter which matches access control entry(ACE)s. +-- Requirements: principal and host can both be null. +newtype AccessControlEntryFilter = AccessControlEntryFilter + { aceFilterData :: AccessControlEntryData + } +instance Show AccessControlEntryFilter where + show AccessControlEntryFilter{..} = show aceFilterData + +instance Matchable AccessControlEntry AccessControlEntryFilter where + -- See org.apache.kafka.common.acl.AccessControlEntryFilter#matches + match AccessControlEntry{..} AccessControlEntryFilter{..} + | not (T.null (aceDataPrincipal aceFilterData)) && + aceDataPrincipal aceFilterData /= aceDataPrincipal aceData = False + | not (T.null (aceDataHost aceFilterData)) && + aceDataHost aceFilterData /= aceDataHost aceData = False + | aceDataOperation aceFilterData /= AclOp_ANY && + aceDataOperation aceFilterData /= aceDataOperation aceData = False + | otherwise = aceDataPermissionType aceFilterData == AclPerm_ANY || + aceDataPermissionType aceFilterData == aceDataPermissionType aceData + matchAtMostOne = isNothing . indefiniteFieldInFilter + indefiniteFieldInFilter AccessControlEntryFilter{ aceFilterData = AccessControlEntryData{..} } + | T.null aceDataPrincipal = Just "Principal is NULL" + | T.null aceDataHost = Just "Host is NULL" + | aceDataOperation == AclOp_ANY = Just "Operation is ANY" + | aceDataOperation == AclOp_UNKNOWN = Just "Operation is UNKNOWN" + | aceDataPermissionType == AclPerm_ANY = Just "Permission type is ANY" + | aceDataPermissionType == AclPerm_UNKNOWN = Just "Permission type is UNKNOWN" + | otherwise = Nothing + +-- | A binding between a resource pattern and an access control entry (ACE). +data AclBinding = AclBinding + { aclBindingResourcePattern :: ResourcePattern + , aclBindingACE :: AccessControlEntry + } deriving (Eq, Ord) +instance Show AclBinding where + show AclBinding{..} = + "(pattern=" <> show aclBindingResourcePattern <> + ", entry=" <> show aclBindingACE <> ")" + +-- | A filter which can match 'AclBinding's. +data AclBindingFilter = AclBindingFilter + { aclBindingFilterResourcePatternFilter :: ResourcePatternFilter + , aclBindingFilterACEFilter :: AccessControlEntryFilter + } +instance Show AclBindingFilter where + show AclBindingFilter{..} = + "(patternFilter=" <> show aclBindingFilterResourcePatternFilter <> + ", entryFilter=" <> show aclBindingFilterACEFilter <> ")" + +instance Matchable AclBinding AclBindingFilter where + -- See org.apache.kafka.common.acl.AclBindingFilter#matches + match AclBinding{..} AclBindingFilter{..} = + match aclBindingResourcePattern aclBindingFilterResourcePatternFilter && + match aclBindingACE aclBindingFilterACEFilter + matchAtMostOne AclBindingFilter{..} = + matchAtMostOne aclBindingFilterResourcePatternFilter && + matchAtMostOne aclBindingFilterACEFilter + indefiniteFieldInFilter AclBindingFilter{..} = + indefiniteFieldInFilter aclBindingFilterResourcePatternFilter <> + indefiniteFieldInFilter aclBindingFilterACEFilter + +-- TODO: validate +-- 1. No UNKNOWN contained +-- 2. resource pattern does not contain '/' +validateAclBinding :: AclBinding -> Either String () +validateAclBinding AclBinding{..} = Right () -- FIXME diff --git a/hstream-kafka/HStream/Kafka/Common/AclEntry.hs b/hstream-kafka/HStream/Kafka/Common/AclEntry.hs new file mode 100644 index 000000000..33e2909d5 --- /dev/null +++ b/hstream-kafka/HStream/Kafka/Common/AclEntry.hs @@ -0,0 +1,86 @@ +module HStream.Kafka.Common.AclEntry where + +import Data.Aeson ((.:), (.=)) +import qualified Data.Aeson as Aeson +import qualified Data.Map.Strict as Map +import qualified Data.Set as Set +import Data.Text (Text) +import qualified Data.Text as T + +import HStream.Kafka.Common.Acl +import HStream.Kafka.Common.Resource +import HStream.Kafka.Common.Security + +data AclEntry = AclEntry + { aclEntryPrincipal :: Principal + , aclEntryHost :: Text + , aclEntryOperation :: AclOperation + , aclEntryPermissionType :: AclPermissionType + } deriving (Eq, Ord) +instance Show AclEntry where + show AclEntry{..} = + s_principal <> + " has " <> show aclEntryPermissionType <> + " permission for operations: " <> show aclEntryOperation <> + " from hosts: " <> s_host + where s_principal = show aclEntryPrincipal + s_host = T.unpack aclEntryHost +instance Aeson.ToJSON AclEntry where + toJSON AclEntry{..} = + Aeson.object [ "host" .= aclEntryHost + , "permissionType" .= show aclEntryPermissionType + , "operation" .= show aclEntryOperation + , "principal" .= show aclEntryPrincipal + ] +instance Aeson.FromJSON AclEntry where + parseJSON (Aeson.Object v) = AclEntry + <$> (principalFromText <$> v .: "principal") + <*> v .: "host" + <*> (read <$> v .: "operation") + <*> (read <$> v .: "permissionType") + parseJSON o = fail $ "Invalid AclEntry: " <> show o + +aceToAclEntry :: AccessControlEntry -> AclEntry +aceToAclEntry AccessControlEntry{ aceData = AccessControlEntryData{..} } = + AclEntry{..} + where aclEntryPrincipal = principalFromText aceDataPrincipal + aclEntryHost = aceDataHost + aclEntryOperation = aceDataOperation + aclEntryPermissionType = aceDataPermissionType + +aclEntryToAce :: AclEntry -> AccessControlEntry +aclEntryToAce AclEntry{..} = + AccessControlEntry{ aceData = AccessControlEntryData{..} } + where aceDataPrincipal = T.pack (show aclEntryPrincipal) + aceDataHost = aclEntryHost + aceDataOperation = aclEntryOperation + aceDataPermissionType = aclEntryPermissionType + +type Acls = Set.Set AclEntry +type Version = Int + +defaultVersion :: Version +defaultVersion = 1 + +data AclResourceNode = AclResourceNode + { aclResNodeVersion :: Version + , aclResNodeAcls :: Acls + } deriving (Show) +instance Aeson.ToJSON AclResourceNode where + toJSON AclResourceNode{..} = + Aeson.object [ "version" .= defaultVersion -- FIXME: version + , "acls" .= aclResNodeAcls + ] +instance Aeson.FromJSON AclResourceNode where + parseJSON (Aeson.Object v) = AclResourceNode + <$> v .: "version" + <*> v .: "acls" + parseJSON o = fail $ "Invalid AclResourceNode: " <> show o + +data AclCache = AclCache + { aclCacheAcls :: Map.Map ResourcePattern Acls + , aclCacheResources :: Map.Map (AccessControlEntry,ResourceType,PatternType) + (Set.Set Text) + } + +------------------------------------ diff --git a/hstream-kafka/HStream/Kafka/Common/AclStore.hs b/hstream-kafka/HStream/Kafka/Common/AclStore.hs new file mode 100644 index 000000000..e3b7098b4 --- /dev/null +++ b/hstream-kafka/HStream/Kafka/Common/AclStore.hs @@ -0,0 +1,169 @@ +module HStream.Kafka.Common.AclStore where + +import Control.Monad +import qualified Data.Aeson as Aeson +import Data.Functor ((<&>)) +import Data.IORef +import qualified Data.Map.Strict as Map +import qualified Data.Set as Set +import Data.Text (Text) +import qualified Data.Text as T + +import HStream.Kafka.Common.AclEntry +import HStream.Kafka.Common.Resource +import qualified HStream.Utils as Utils + +import qualified ZooKeeper as ZK +import qualified ZooKeeper.Types as ZK +import ZooKeeper.Types (ZHandle) + +-- | Storage interface for ACL data, such as zookeeper. +class AclStore a where + -- | Path of a pattern type in the store. + aclStorePath :: a -> PatternType -> Text + + -- | Path of a resource type in the store. + aclStorePath' :: a -> PatternType -> ResourceType -> Text + + -- | Path of a resource pattern in the store. + aclStorePath'' :: a -> ResourcePattern -> Text + + -- | List name of ACL stores of the given path. + listAclStoreNames :: a -> Text -> IO [Text] + + -- | Read an ACL node from the store by the given resource pattern. + getAclNode :: a -> ResourcePattern -> IO AclResourceNode + + -- | Write an ACL node to the store by the given resource pattern. + setAclNode :: a -> ResourcePattern -> AclResourceNode -> IO () + + -- | Delete an ACL node from the store by the given resource pattern. + deleteAclNode :: a -> ResourcePattern -> IO () + +loadAllAcls :: AclStore a => a -> (ResourcePattern -> Acls -> IO ()) -> IO () +loadAllAcls a aclsConsumer = do + forM_ [Pat_LITERAL, Pat_PREFIXED] $ \pat -> do + -- FIXME: 'read' + resTypes <- listAclStoreNames a (aclStorePath a pat) <&> (map (read . T.unpack)) + forM_ resTypes (go pat) + where + go pat resType = do + let resTypePath = aclStorePath' a pat resType + children <- listAclStoreNames a resTypePath + forM_ children $ \resName -> do + let resource = ResourcePattern resType resName pat + aclNode <- getAclNode a resource + aclsConsumer resource (aclResNodeAcls aclNode) + +------------------------------------------------------------ +-- ZooKeeper specific implementation +------------------------------------------------------------ +zkAclStorePath :: PatternType -> Text +zkAclStorePath pat = + case pat of + Pat_LITERAL -> "/kafka-acl" + Pat_PREFIXED -> "kafka-acl-extended" + pat_ -> error $ "Invalid pattern type: " <> show pat_ -- FIXME: error + +zkAclStorePath' :: PatternType -> ResourceType -> Text +zkAclStorePath' pat resType = + zkAclStorePath pat <> "/" <> T.pack (show resType) + +zkAclStorePath'' :: ResourcePattern -> Text +zkAclStorePath'' ResourcePattern{..} = + zkAclStorePath' resPatPatternType resPatResourceType <> "/" <> resPatResourceName + +instance AclStore ZHandle where + aclStorePath _ = zkAclStorePath + aclStorePath' _ = zkAclStorePath' + aclStorePath'' _ = zkAclStorePath'' + listAclStoreNames zkHandle path = do + ZK.zooGetChildren zkHandle (Utils.textToCBytes path) + <&> ((map Utils.cBytesToText) . ZK.unStrVec . ZK.strsCompletionValues) + getAclNode zkHandle resPat = do + let path = zkAclStorePath'' resPat + ZK.zooExists zkHandle (Utils.textToCBytes path) >>= \case + Nothing -> return $ AclResourceNode defaultVersion Set.empty + Just _ -> do + ZK.DataCompletion{..} <- ZK.zooGet zkHandle (Utils.textToCBytes path) + case dataCompletionValue of + Nothing -> return $ AclResourceNode defaultVersion Set.empty -- FIXME: or throw exception? + Just bytes -> do + let bl = Utils.bytesToLazyByteString bytes + case Aeson.eitherDecode bl of + Left err -> error $ "Invalid ACL node format: " <> err -- FIXME: error + Right node -> return node + setAclNode zkHandle resPat node = do + let path = zkAclStorePath'' resPat + ZK.zooExists zkHandle (Utils.textToCBytes path) >>= \case + -- FIXME: zookeeper acl + Nothing -> void $ + ZK.zooCreate zkHandle (Utils.textToCBytes path) (Just (Utils.lazyByteStringToBytes (Aeson.encode node))) ZK.zooOpenAclUnsafe ZK.ZooPersistent + -- FIXME: check version + Just _ -> void $ + ZK.zooSet zkHandle (Utils.textToCBytes path) (Just (Utils.lazyByteStringToBytes (Aeson.encode node))) Nothing + deleteAclNode zkHandle resPat = do + let path = zkAclStorePath'' resPat + -- FIXME: check version + ZK.zooDelete zkHandle (Utils.textToCBytes path) Nothing + +------------------------------------------------------------ +-- Mock store implementation (for test only) +------------------------------------------------------------ +type MockAclStore = IORef (Map.Map Text -- PatternType + (Map.Map Text -- ResourceType + (Map.Map Text AclResourceNode) + ) + ) + +newMockAclStore :: IO MockAclStore +newMockAclStore = newIORef $ Map.fromList [ ("LITERAL" , Map.empty) + , ("PREFIXED", Map.empty) + ] + +instance AclStore MockAclStore where + aclStorePath _ pat = T.pack (show pat) + aclStorePath' ref pat resType = aclStorePath ref pat <> "/" <> + T.pack (show resType) + aclStorePath'' ref ResourcePattern{..} = + aclStorePath' ref resPatPatternType resPatResourceType <> "/" <> + resPatResourceName + listAclStoreNames ref path = do + store <- readIORef ref + case T.splitOn "/" path of + [pat] -> case Map.lookup pat store of + Nothing -> return [] + Just x -> return $ Map.keys x + [pat,resType] -> case Map.lookup pat store of + Nothing -> return [] + Just x -> case Map.lookup resType x of + Nothing -> return [] + Just y -> return $ Map.keys y + _ -> error "MockAclStore: Invalid path" + getAclNode ref ResourcePattern{..} = do + store <- readIORef ref + case Map.lookup (T.pack (show resPatPatternType)) store of + Nothing -> return $ AclResourceNode defaultVersion Set.empty + Just x -> case Map.lookup (T.pack (show resPatResourceType)) x of + Nothing -> return $ AclResourceNode defaultVersion Set.empty + Just y -> case Map.lookup resPatResourceName y of + Nothing -> return $ AclResourceNode defaultVersion Set.empty + Just z -> return z + setAclNode ref ResourcePattern{..} node = do + atomicModifyIORef' ref + (\x -> let x' = case Map.lookup (T.pack (show resPatPatternType)) x of + Nothing -> Map.singleton (T.pack (show resPatResourceType)) (Map.singleton resPatResourceName node) + Just y -> case Map.lookup (T.pack (show resPatResourceType)) y of + Nothing -> Map.insert (T.pack (show resPatResourceType)) (Map.singleton resPatResourceName node) y + Just z -> Map.insert (T.pack (show resPatResourceType)) (Map.insert resPatResourceName node z) y + in (Map.insert (T.pack (show resPatPatternType)) x' x, ())) + deleteAclNode ref ResourcePattern{..} = do + atomicModifyIORef' ref + (\x -> let x' = case Map.lookup (T.pack (show resPatPatternType)) x of + Nothing -> x + Just y -> case Map.lookup (T.pack (show resPatResourceType)) y of + Nothing -> x + Just z -> let z' = Map.delete resPatResourceName z + y' = Map.insert (T.pack (show resPatResourceType)) z' y + in Map.insert (T.pack (show resPatPatternType)) y' x + in (x', ())) diff --git a/hstream-kafka/HStream/Kafka/Common/Authorizer.hs b/hstream-kafka/HStream/Kafka/Common/Authorizer.hs new file mode 100644 index 000000000..cf27a3c2d --- /dev/null +++ b/hstream-kafka/HStream/Kafka/Common/Authorizer.hs @@ -0,0 +1,434 @@ +module HStream.Kafka.Common.Authorizer where + +import Control.Concurrent +import Control.Exception +import Control.Monad +import Data.Either +import qualified Data.Foldable as Foldable +import Data.Functor ((<&>)) +import Data.IORef +import qualified Data.List as L +import qualified Data.Map.Strict as Map +import Data.Maybe +import qualified Data.Set as Set +import Data.Text (Text) +import qualified Data.Text as T +import qualified Data.Vector as V +import GHC.Utils.Misc ((<&&>), (<||>)) + +import HStream.Kafka.Common.Acl +import HStream.Kafka.Common.AclEntry +import HStream.Kafka.Common.AclStore +import HStream.Kafka.Common.Authorizer.Class +import HStream.Kafka.Common.Resource +import HStream.Kafka.Common.Security +import qualified HStream.Logger as Log +import qualified Kafka.Protocol.Encoding as K +import qualified Kafka.Protocol.Error as K +import qualified Kafka.Protocol.Message as K + +------------------------------------------------------------ +-- ACL authorizer with an abstract metadata store +------------------------------------------------------------ +-- | An authorizer, with a in-memory cache, a presistent store and a lock. +-- The lock ensures that only one thread can update the cache and the store +-- at the same time. +data AclAuthorizer a = AclAuthorizer + { authorizerCache :: IORef AclCache + , authorizerAclStore :: a + , authorizerLock :: MVar () + } + +newAclAuthorizer :: IO a -> IO (AclAuthorizer a) +newAclAuthorizer newStore = do + let cache = AclCache Map.empty Map.empty + AclAuthorizer <$> newIORef cache <*> newStore <*> newMVar () + +initAclAuthorizer :: AclStore a => AclAuthorizer a -> IO () +initAclAuthorizer authorizer = + loadAllAcls (authorizerAclStore authorizer) $ \res acls -> + atomicModifyIORef' (authorizerCache authorizer) + (\x -> (updateCache x res acls, ())) + +-- FIXME: Does this function behave the same as Kafka? +-- e.g. List or Set? +-- | Get matching ACLs in cache for the given resource. +-- This is a pure function. +matchingAcls :: AclCache -> ResourceType -> Text -> Acls +matchingAcls AclCache{..} resType resName = + Set.unions [prefixed, wildcard, literal] + where + wildcard :: Acls + wildcard = let resPat = ResourcePattern resType wildcardResourceName Pat_LITERAL + in fromMaybe Set.empty (Map.lookup resPat aclCacheAcls) + literal :: Acls + literal = let resPat = ResourcePattern resType resName Pat_LITERAL + in fromMaybe Set.empty (Map.lookup resPat aclCacheAcls) + prefixed :: Acls + prefixed = Set.unions . Map.elems $ + Map.filterWithKey (\ResourcePattern{..} _ -> + resPatResourceType == resType && + resName `T.isPrefixOf` resPatResourceName + ) aclCacheAcls + +-- | Check if a set of ACLs contains a matching ACL for the given filter. +-- Log it if there is a matching ACL. +-- See kafka.security.authorizer.AclAuthorizer#matchingAclExists. +doesMatchingAclExist :: AclOperation + -> ResourcePattern + -> Principal + -> Text + -> AclPermissionType + -> Acls + -> IO Bool +doesMatchingAclExist op resPat principal host permType acls = do + case Foldable.find (\AclEntry{..} -> + aclEntryPermissionType == permType && + (aclEntryPrincipal == principal || aclEntryPrincipal == wildcardPrincipal) && + (op == aclEntryOperation || aclEntryOperation == AclOp_ALL) && + (aclEntryHost == host || aclEntryHost == wildcardHost) + ) acls of + Nothing -> return False + Just acl -> do + Log.debug $ "operation = " <> Log.buildString' op <> + " on resource = " <> Log.buildString' resPat <> + " from host = " <> Log.buildString' host <> + " is " <> Log.buildString' permType <> + " based on acl = " <> Log.buildString' acl + return True + +-- | Authorize an ACL action based on the request context and the given ACL cache. +authorizeAction :: AuthorizableRequestContext + -> AclAuthorizer a + -> AclAction + -> IO AuthorizationResult +authorizeAction reqCtx authorizer action@AclAction{..} = do + cache <- readIORef (authorizerCache authorizer) + case resPatPatternType aclActionResPat of + Pat_LITERAL -> do + let matchedAcls = matchingAcls cache (resPatResourceType aclActionResPat) + (resPatResourceName aclActionResPat) + -- NOTE: we check: + -- 1. if the session principal is a super user + -- 2. if there is no matching ACL but the system allow empty ACL + -- 3. if there is no DENY ACL and there is at least one ALLOW ACLs + isAuthorized <- (isSuperUser (authReqCtxPrincipal reqCtx)) <||> + (isEmptyAclAndAuthorized matchedAcls <||> + ((not <$> doesDenyAclExist matchedAcls) <&&> + (doesAllowAclExist matchedAcls) + ) + ) + logAuditMessage reqCtx action isAuthorized + if isAuthorized + then return Authz_ALLOWED + else return Authz_DENIED + _ -> error $ "Only literal resource patterns are supported for authorization. Got: " <> show (resPatPatternType aclActionResPat) + where + -- | Check if the given principal is a super user. + isSuperUser :: Principal -> IO Bool + isSuperUser principal = + case Set.member principal superUsers of + True -> do + Log.debug $ "principal = " <> Log.buildString' principal <> + " is a super user, allowing operation without checking acls." + return True + False -> return False + -- | Check if there is no matching ACL but the system allow empty ACL. + isEmptyAclAndAuthorized :: Acls -> IO Bool + isEmptyAclAndAuthorized acls = + if Set.null acls then do + Log.debug $ "No acl found for resource " <> Log.buildString' aclActionResPat <> + ", authorized = " <> Log.buildString' allowIfNoAclFound + return allowIfNoAclFound + else return False + + -- | Check if there is a matching DENY ACL for the given filter. + doesDenyAclExist :: Acls -> IO Bool + doesDenyAclExist acls = + doesMatchingAclExist aclActionOp aclActionResPat (authReqCtxPrincipal reqCtx) (authReqCtxHost reqCtx) AclPerm_DENY acls + + -- | Check if there is a matching ALLOW ACL for the given filter. + -- NOTE: `DESCRIBE` is implied by `READ`, `WRITE`, `DELETE` and `ALTER`. + -- See kafka.security.authorizer.AclAuthorizer#allowAclExists and + -- org.apache.kafka.common.acl.AclOperation. + doesAllowAclExist :: Acls -> IO Bool + doesAllowAclExist acls = do + let canAllowOps = + case aclActionOp of + AclOp_DESCRIBE -> [AclOp_DESCRIBE, AclOp_READ, AclOp_WRITE, AclOp_DELETE, AclOp_ALTER] + AclOp_DESCRIBE_CONFIGS -> [AclOp_DESCRIBE_CONFIGS, AclOp_ALTER_CONFIGS] + _ -> [aclActionOp] + -- short circuit + foldM (\acc op -> + if acc + then return True + else doesMatchingAclExist op aclActionResPat (authReqCtxPrincipal reqCtx) (authReqCtxHost reqCtx) AclPerm_ALLOW acls + ) False canAllowOps + +-- | Authorize a list of ACL actions based on the request context and the given ACL cache. +authorize :: AuthorizableRequestContext + -> AclAuthorizer a + -> [AclAction] + -> IO [AuthorizationResult] +authorize reqCtx authorizer actions = + forM actions (authorizeAction reqCtx authorizer) + +-- | Get ACL bindings (ACL entry with resource) in cache matching the given filter. +getAcls :: AuthorizableRequestContext + -> AclAuthorizer a + -> AclBindingFilter + -> IO [AclBinding] +getAcls _ AclAuthorizer{..} aclFilter = do + cache <- readIORef authorizerCache + return $ Map.foldrWithKey' f [] (aclCacheAcls cache) + where + f resPat acls acc = + let g aclEntry acc' = + let thisAclBinding = AclBinding resPat (aclEntryToAce aclEntry) + in if match thisAclBinding aclFilter + then acc' ++ [thisAclBinding] + else acc' + in Set.foldr' g acc acls + +-- | Create ACLs for the given bindings. +-- It updates both the cache and the store. +createAcls :: AclStore a + => AuthorizableRequestContext + -> AclAuthorizer a + -> [AclBinding] + -> IO K.CreateAclsResponse +createAcls _ authorizer bindings = withMVar (authorizerLock authorizer) $ \_ -> do + let bindingsWithIdx = L.zip [0..] bindings + (lefts_, rights_) <- partitionEithers <$> mapM validateEachBinding bindingsWithIdx + let errorResults = Map.fromList lefts_ + resourcesToUpdate = L.foldl' (\acc (res,x) -> Map.insertWith (++) res [x] acc) Map.empty rights_ + addResults <- + -- FIXME: recheck kafka's behavior, always take the latest result for each binding? + (forM (Map.toList resourcesToUpdate) addAclsForEachRes) <&> (Map.unions . (L.map Map.fromList)) + let results = Map.union errorResults addResults + -- FIXME: throttle time + return $ K.CreateAclsResponse 0 (K.KaArray . Just . V.fromList . Map.elems $ results) + where + validateEachBinding :: (Int, AclBinding) + -> IO (Either (Int, K.AclCreationResult) + (ResourcePattern, (Int, AclBinding)) + ) + validateEachBinding (i, b@AclBinding{..}) = do + case supportExtenedAcl of + False -> do + -- FIXME: ERROR CODE + let result = K.AclCreationResult K.NONE (Just "Adding ACLs on prefixed resource patterns requires version xxx or higher") + return $ Left (i, result) + True -> case validateAclBinding b of + -- FIXME: ERROR CODE + Left s -> return $ Left (i, K.AclCreationResult K.NONE (Just . T.pack $ s)) + Right _ -> return $ Right (aclBindingResourcePattern, (i, b)) + + addAclsForEachRes :: (ResourcePattern, [(Int, AclBinding)]) -> IO [(Int, K.AclCreationResult)] + addAclsForEachRes (res, bs) = do + results_e <- try $ updateResourceAcls authorizer res $ \curAcls -> + let aclsToAdd = Set.fromList ((aceToAclEntry . aclBindingACE . snd) <$> bs) + newAcls = Set.union curAcls aclsToAdd + results = L.map (\(i,_) -> (i, K.AclCreationResult K.NONE mempty)) bs + in (newAcls, results) + case results_e of + -- FIXME: ERROR CODE + Left (_ :: SomeException) -> return $ L.map (\(i,_) -> (i, K.AclCreationResult K.NONE (Just "Failed to update ACLs"))) bs + Right x -> return x + +-- | Delete ACls for the given filters. +-- It updates both the cache and the store. +deleteAcls :: AclStore a + => AuthorizableRequestContext + -> AclAuthorizer a + -> [AclBindingFilter] + -> IO K.DeleteAclsResponse +deleteAcls _ authorizer filters = withMVar (authorizerLock authorizer) $ \_ -> do + AclCache{..} <- readIORef (authorizerCache authorizer) + let filtersWithIdx = L.zip [0..] filters + let possibleResources = Map.keys aclCacheAcls <> + (concatMap ( Set.toList + . resourcePatternFromFilter + . aclBindingFilterResourcePatternFilter + ) (filter matchAtMostOne filters) + ) + resourcesToUpdate = + Map.fromList $ + L.filter (\(_,fs) -> not (L.null fs)) $ + L.map (\res -> + let matchedFilters = L.filter (\(_,x) -> match res (aclBindingFilterResourcePatternFilter x)) filtersWithIdx + in (res, matchedFilters) + ) possibleResources + -- Note: 'const' for always taking the first match (of each ACL) + removedBindingsToFilter <- + (mapM deleteAclsForRes (Map.toList resourcesToUpdate)) <&> (Map.unionsWith const) + let removeResult = groupByValue removedBindingsToFilter + let filterResults = L.map newFilterResult (Map.toList removeResult) + -- FIXME: throttle time + return $ K.DeleteAclsResponse 0 (K.KaArray . Just . V.fromList $ filterResults) + where + -- For each resource, we update its ACLs and return the removed bindings. + -- WARNING & FIXME: Difference from Kafka: + -- Kafka returns exception of each matched ACL (equal to the exception + -- of the whole update operation). However, we just return empty list + -- which ignores the exceptions of each matched ACL. + deleteAclsForRes :: (ResourcePattern, [(Int, AclBindingFilter)]) + -> IO (Map.Map AclBinding Int) + deleteAclsForRes (res,matchedFilters) = do + removedBindings_e <- + try $ updateResourceAcls authorizer res $ \curAcls -> + let bindingsToRemove = + let maps = catMaybes (L.map matchOneAcl (Set.toList curAcls)) + -- Note: 'const' for always taking the first match + in Map.unionsWith const maps + aclsToRemove = Set.filter (isJust . matchOneAcl) curAcls + newAcls = Set.difference curAcls aclsToRemove + in (newAcls, bindingsToRemove) + case removedBindings_e of + Left (_ :: SomeException) -> return Map.empty + Right x -> return x + where + -- For each ACE of certain resource, there can be zero or many filters + -- that match it. This function find the FIRST filter which can match it + -- or return Nothing if no matching. + matchOneAcl :: AclEntry -> Maybe (Map.Map AclBinding Int) + matchOneAcl entry = let ace = aclEntryToAce entry in + case L.find (\(_,f) -> match ace (aclBindingFilterACEFilter f)) matchedFilters of + Nothing -> Nothing + Just (idx,_) -> + let aclBinding = AclBinding res ace + in Just (Map.singleton aclBinding idx) + + newFilterResult :: (Int, [AclBinding]) -> K.DeleteAclsFilterResult + -- FIXME: L.(!!) + -- Note: 'filters' here is the argument of 'deleteAcls' function + newFilterResult (_, bs) = + let matchingAcls_ = + L.map (\AclBinding{..} -> + K.DeleteAclsMatchingAcl K.NONE + mempty + (fromIntegral (fromEnum (resPatResourceType aclBindingResourcePattern))) + (resPatResourceName aclBindingResourcePattern) + (aceDataPrincipal (aceData aclBindingACE)) + (aceDataHost (aceData aclBindingACE)) + (fromIntegral (fromEnum (aceDataOperation (aceData aclBindingACE)))) + (fromIntegral (fromEnum (aceDataPermissionType (aceData aclBindingACE)))) + ) bs + in K.DeleteAclsFilterResult K.NONE mempty (K.KaArray . Just . V.fromList $ matchingAcls_) + +-- FIXME: Make sure only one thread can run this function at the same time. +-- Currently, we make it by ensuring only one thread can invoke +-- this function. +-- | Update the ACLs of certain resource pattern using the given transform +-- function, and return the extra value of the function. +-- It updates both the cache and the store. +-- It will retry for some times. +-- This function may throw exceptions. +updateResourceAcls :: AclStore s => AclAuthorizer s -> ResourcePattern -> (Acls -> (Acls, a)) -> IO a +updateResourceAcls authorizer resPat f = do + cache <- readIORef (authorizerCache authorizer) + curAcls <- case Map.lookup resPat (aclCacheAcls cache) of + -- Note: Load from store if not found in cache because other thread may + -- have updated the cache. + -- (Won't happen now because of one global cache and lock) + Nothing -> + getAclNode (authorizerAclStore authorizer) resPat <&> aclResNodeAcls + Just acls -> return acls + (newAcls, a) <- go curAcls (0 :: Int) + when (curAcls /= newAcls) $ do + -- update cache + atomicModifyIORef' (authorizerCache authorizer) + (\x -> (updateCache x resPat newAcls ,())) + return a + where + go oldAcls retries + | retries <= 5 = do -- FIXME: max retries + let (newAcls, a) = f oldAcls + try $ if Set.null newAcls then do + Log.debug $ "Deleting path for " <> Log.buildString' resPat <> " because it had no ACLs remaining" + -- delete from store + deleteAclNode (authorizerAclStore authorizer) resPat + return (newAcls, a) + else do + -- update store + let newNode = AclResourceNode defaultVersion newAcls -- FIXME: version + setAclNode (authorizerAclStore authorizer) resPat newNode + return (newAcls, a) + >>= \case + -- FIXME: catch all exceptions? + Left (_ :: SomeException) -> do + Log.warning $ "Failed to update ACLs for " <> Log.buildString' resPat <> + ". Reading data and retrying update." + threadDelay (50 * 1000) -- FIXME: retry interval + go oldAcls (retries + 1) + Right acls_ -> return acls_ + | otherwise = do + Log.fatal $ "Failed to update ACLs for " <> Log.buildString' resPat <> + " after retrying a maximum of " <> Log.buildString' retries <> + " times" + error "update ACLs failed" -- FIXME: throw proper exception + +-- | Set ACLs for certain resource pattern to the given ones in cache. +-- This is a pure function. +updateCache :: AclCache -> ResourcePattern -> Acls -> AclCache +updateCache AclCache{..} resPat@ResourcePattern{..} acls = + let curAces = maybe Set.empty (Set.map aclEntryToAce) (Map.lookup resPat aclCacheAcls) + newAces = Set.map aclEntryToAce acls + acesToAdd = Set.difference newAces curAces + acesToRemove = Set.difference curAces newAces + in let cacheResAfterAdd = + Set.foldr' (\ace acc -> + let key = (ace,resPatResourceType,resPatPatternType) + in Map.insertWith Set.union key (Set.singleton resPatResourceName) acc + ) aclCacheResources acesToAdd + cacheResAfterRemove = + Set.foldr' (\ace acc -> + let key = (ace,resPatResourceType,resPatPatternType) + in Map.update (\x -> if x == Set.singleton resPatResourceName then Nothing else Just (Set.delete resPatResourceName x)) key acc + ) cacheResAfterAdd acesToRemove + in let newAcls = if Set.null acls + then Map.delete resPat aclCacheAcls + else Map.insert resPat acls aclCacheAcls + in AclCache newAcls cacheResAfterRemove + + +------------------------------------------------------------ +-- Helper functions +------------------------------------------------------------ +groupByValue :: Ord v => Map.Map k v -> Map.Map v [k] +groupByValue m = Map.foldrWithKey' f Map.empty m + where + f k v acc = Map.insertWith (++) v [k] acc + +supportExtenedAcl :: Bool +supportExtenedAcl = True + +superUsers :: Set.Set Principal +superUsers = Set.empty + +allowIfNoAclFound :: Bool +allowIfNoAclFound = False + +-- | Log an authorization action result. +-- Note: `allowed` actions use level `debug` if configured while +-- `denied` use `info`. +-- See kafka.security.Authorizer.AclAuthorizer$logAuditMessage. +logAuditMessage :: AuthorizableRequestContext -> AclAction -> Bool -> IO () +logAuditMessage AuthorizableRequestContext{..} AclAction{..} isAuthorized = do + let msg = "Principal = " <> show authReqCtxPrincipal <> + " is " <> (if isAuthorized then "Allowed" else "Denied") <> + " Operation = " <> show aclActionOp <> + " from host = " <> show authReqCtxHost <> + " on resource = " <> show aclActionResPat +{- +-- TODO: apiKey and resource ref count + " for request = " <> apiKey <> + " with resouceRefCount = " <> show refCount +-} + case isAuthorized of + True -> case aclActionLogIfAllowed of + True -> Log.debug . Log.buildString $ msg + False -> Log.trace . Log.buildString $ msg + False -> case aclActionLogIfDenied of + True -> Log.info . Log.buildString $ msg + False -> Log.trace . Log.buildString $ msg diff --git a/hstream-kafka/HStream/Kafka/Common/Authorizer/Class.hs b/hstream-kafka/HStream/Kafka/Common/Authorizer/Class.hs new file mode 100644 index 000000000..d9de3b3a3 --- /dev/null +++ b/hstream-kafka/HStream/Kafka/Common/Authorizer/Class.hs @@ -0,0 +1,67 @@ +{-# LANGUAGE FunctionalDependencies #-} + +module HStream.Kafka.Common.Authorizer.Class where + +import Data.Text (Text) + +import HStream.Kafka.Common.Acl +import HStream.Kafka.Common.Resource +import HStream.Kafka.Common.Security +import qualified Kafka.Protocol.Message as K + +data AclAction = AclAction + { aclActionResPat :: ResourcePattern + , aclActionOp :: AclOperation + , aclActionLogIfAllowed :: Bool + , aclActionLogIfDenied :: Bool + -- , more... + } +instance Show AclAction where + show AclAction{..} = + "Action(resourcePattern='" <> show aclActionResPat <> + "', operation='" <> show aclActionOp <> + "', logIfAllowed='" <> show aclActionLogIfAllowed <> + "', logIfDenied='" <> show aclActionLogIfDenied <> + "')" + +data AuthorizationResult + = Authz_ALLOWED + | Authz_DENIED + deriving (Eq, Enum, Show) + +-- TODO +data AuthorizableRequestContext = AuthorizableRequestContext + { authReqCtxHost :: Text + , authReqCtxPrincipal :: Principal + -- , ... + } + +class Authorizer s where + -- | Create new ACL bindings. + createAcls :: AuthorizableRequestContext + -> s + -> [AclBinding] + -> IO K.CreateAclsResponse + + -- | Remove matched ACL bindings. + deleteAcls :: AuthorizableRequestContext + -> s + -> [AclBindingFilter] + -> IO K.DeleteAclsResponse + + -- | Get matched ACL bindings + getAcls :: AuthorizableRequestContext + -> s + -> AclBindingFilter + -> IO [AclBinding] + + -- | Get the current number of ACLs. Return -1 if not implemented. + aclCount :: AuthorizableRequestContext + -> s + -> Int + + -- | Authorize the specified actions. + authorize :: AuthorizableRequestContext + -> s + -> [AclAction] + -> IO [AuthorizationResult] diff --git a/hstream-kafka/HStream/Kafka/Common/Resource.hs b/hstream-kafka/HStream/Kafka/Common/Resource.hs new file mode 100644 index 000000000..d64d5da13 --- /dev/null +++ b/hstream-kafka/HStream/Kafka/Common/Resource.hs @@ -0,0 +1,186 @@ +{-# LANGUAGE FunctionalDependencies #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE TypeFamilies #-} + +module HStream.Kafka.Common.Resource where + +import Data.Text (Text) +import qualified Data.Text as T +-- import Data.Kind (Type) +import qualified Data.Set as Set +-- import Control.Monad +import Data.Maybe + +-- +class Matchable a b | b -> a where + match :: a -> b -> Bool + matchAtMostOne :: b -> Bool + indefiniteFieldInFilter :: b -> Maybe Text + +-- | A type of resource that can be applied to by an ACL, which is an 'Int8' +-- start from 0. +-- See org.apache.kafka.common.resource.ResourceType. +data ResourceType + = Res_UNKNOWN + | Res_ANY + | Res_TOPIC + | Res_GROUP + | Res_CLUSTER + | Res_TRANSACTIONAL_ID + | Res_DELEGATION_TOKEN + | Res_USER + deriving (Eq, Enum, Ord) +instance Show ResourceType where + show Res_UNKNOWN = "UNKNOWN" + show Res_ANY = "ANY" + show Res_TOPIC = "TOPIC" + show Res_GROUP = "GROUP" + show Res_CLUSTER = "CLUSTER" + show Res_TRANSACTIONAL_ID = "TRANSACTIONAL_ID" + show Res_DELEGATION_TOKEN = "DELEGATION_TOKEN" + show Res_USER = "USER" +instance Read ResourceType where + readsPrec _ s = case s of + "UNKNOWN" -> [(Res_UNKNOWN, "")] + "ANY" -> [(Res_ANY, "")] + "TOPIC" -> [(Res_TOPIC, "")] + "GROUP" -> [(Res_GROUP, "")] + "CLUSTER" -> [(Res_CLUSTER, "")] + "TRANSACTIONAL_ID" -> [(Res_TRANSACTIONAL_ID, "")] + "DELEGATION_TOKEN" -> [(Res_DELEGATION_TOKEN, "")] + "USER" -> [(Res_USER, "")] + _ -> [] + +-- | A cluster resource which is a 2-tuple (type, name). +-- See org.apache.kafka.common.resource.Resource. +data Resource = Resource + { resResourceType :: ResourceType + , resResourceName :: Text + } +instance Show Resource where + show Resource{..} = + "(resourceType=" <> show resResourceType <> + ", name=" <> s_name <> ")" + where s_name = if T.null resResourceName then "" else T.unpack resResourceName + +-- [0..4] +-- | A resource pattern type, which is an 'Int8' start from 0. +-- WARNING: Be sure to understand the meaning of 'Pat_MATCH'. +-- A '(TYPE, "name", MATCH)' filter matches the following patterns: +-- 1. All '(TYPE, "name", LITERAL)' +-- 2. All '(TYPE, "*", LITERAL)' +-- 3. All '(TYPE, "name", PREFIXED)' +-- See org.apache.kafka.common.resource.PatternType. +data PatternType + = Pat_UNKNOWN + | Pat_ANY + | Pat_MATCH + | Pat_LITERAL + | Pat_PREFIXED + deriving (Eq, Enum, Ord) +instance Show PatternType where + show Pat_UNKNOWN = "UNKNOWN" + show Pat_ANY = "ANY" + show Pat_MATCH = "MATCH" + show Pat_LITERAL = "LITERAL" + show Pat_PREFIXED = "PREFIXED" + +isPatternTypeSpecific :: PatternType -> Bool +isPatternTypeSpecific Pat_LITERAL = True +isPatternTypeSpecific Pat_PREFIXED = True +isPatternTypeSpecific _ = False + +-- | A pattern used by ACLs to match resources. +-- See org.apache.kafka.common.resource.ResourcePattern. +data ResourcePattern = ResourcePattern + { resPatResourceType :: ResourceType -- | Can not be 'Res_ANY' + , resPatResourceName :: Text -- | Can not be null but can be 'WILDCARD' -- FIXME: which? + , resPatPatternType :: PatternType -- | Can not be 'Pat_ANY' or 'Pat_MATCH' + } deriving (Eq) +instance Show ResourcePattern where + show ResourcePattern{..} = + "ResourcePattern(resourceType=" <> show resPatResourceType <> + ", name=" <> T.unpack resPatResourceName <> + ", patternType=" <> show resPatPatternType <> ")" + +wildcardResourceName :: Text +wildcardResourceName = "*" + +-- | Orders by resource type, then pattern type, and finally REVERSE name +-- See kafka.security.authorizer. +instance Ord ResourcePattern where + compare p1 p2 = compare (resPatResourceType p1) (resPatResourceType p2) + <> compare (resPatPatternType p1) (resPatPatternType p2) + <> compare (resPatResourceName p2) (resPatResourceName p1) + +-- | A filter that can match 'ResourcePattern'. +-- See org.apache.kafka.common.resource.ResourcePatternFilter. +data ResourcePatternFilter = ResourcePatternFilter + { resPatFilterResourceType :: ResourceType + -- | The resource type to match. If 'Res_ANY', ignore the resource type. + -- Otherwise, only match patterns with the same resource type. + , resPatFilterResourceName :: Text + -- | The resource name to match. If null, ignore the resource name. + -- If 'WILDCARD', only match wildcard patterns. -- FIXME: which WILDCARD? + , resPatFilterPatternType :: PatternType + -- | The resource pattern type to match. + -- If 'Pat_ANY', match ignore the pattern type. + -- If 'Pat_MATCH', see 'Pat_MATCH'. + -- Otherwise, match patterns with the same pattern type. + } +instance Show ResourcePatternFilter where + show ResourcePatternFilter{..} = + "ResourcePattern(resourceType=" <> show resPatFilterResourceType <> + ", name=" <> s_name <> + ", patternType=" <> show resPatFilterPatternType <> ")" + where s_name = if T.null resPatFilterResourceName then "" else T.unpack resPatFilterResourceName + +instance Matchable ResourcePattern ResourcePatternFilter where + -- See org.apache.kafka.common.resource.ResourcePatternFilter#matches + match ResourcePattern{..} ResourcePatternFilter{..} + | resPatFilterResourceType /= Res_ANY && resPatResourceType /= resPatFilterResourceType = False + | resPatFilterPatternType /= Pat_ANY && + resPatFilterPatternType /= Pat_MATCH && + resPatFilterPatternType /= resPatPatternType = False + | T.null resPatFilterResourceName = True + | resPatFilterPatternType == Pat_ANY || + resPatFilterPatternType == resPatPatternType = resPatFilterResourceName == resPatResourceName + | otherwise = case resPatPatternType of + Pat_LITERAL -> resPatFilterResourceName == resPatResourceName || resPatResourceName == wildcardResourceName + Pat_PREFIXED -> T.isPrefixOf resPatFilterResourceName resPatResourceName + _ -> error $ "Unsupported PatternType: " <> show resPatPatternType -- FIXME: exception + matchAtMostOne = isNothing . indefiniteFieldInFilter + indefiniteFieldInFilter ResourcePatternFilter{..} + | resPatFilterResourceType == Res_ANY = Just "Resource type is ANY." + | resPatFilterResourceType == Res_UNKNOWN = Just "Resource type is UNKNOWN." + | T.null resPatFilterResourceName = Just "Resource name is NULL." + | resPatFilterPatternType == Pat_MATCH = Just "Resource pattern type is MATCH." + | resPatFilterPatternType == Pat_UNKNOWN = Just "Resource pattern type is UNKNOWN." + | otherwise = Nothing + +resourcePatternFromFilter :: ResourcePatternFilter -> Set.Set ResourcePattern +resourcePatternFromFilter ResourcePatternFilter{..} = + case resPatFilterPatternType of + Pat_LITERAL -> Set.singleton $ ResourcePattern + { resPatResourceType = resPatFilterResourceType + , resPatResourceName = resPatFilterResourceName + , resPatPatternType = resPatFilterPatternType + } + Pat_PREFIXED -> Set.singleton $ ResourcePattern + { resPatResourceType = resPatFilterResourceType + , resPatResourceName = resPatFilterResourceName + , resPatPatternType = resPatFilterPatternType + } + Pat_ANY -> Set.fromList [ ResourcePattern + { resPatResourceType = resPatFilterResourceType + , resPatResourceName = resPatFilterResourceName + , resPatPatternType = Pat_LITERAL + } + , ResourcePattern + { resPatResourceType = resPatFilterResourceType + , resPatResourceName = resPatFilterResourceName + , resPatPatternType = Pat_PREFIXED + } + ] + _ -> error $ "Cannot determine matching resources for patternType " <> show resPatFilterPatternType -- FIXME: exception diff --git a/hstream-kafka/HStream/Kafka/Common/Security.hs b/hstream-kafka/HStream/Kafka/Common/Security.hs new file mode 100644 index 000000000..dca22dab7 --- /dev/null +++ b/hstream-kafka/HStream/Kafka/Common/Security.hs @@ -0,0 +1,34 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE RecordWildCards #-} + +module HStream.Kafka.Common.Security where + +import Data.Text (Text) +import qualified Data.Text as T + +-- | A kafka principal. It is a 2-tuple of non-null type and non-null name. +-- For default authorizer, type is "User". +data Principal = Principal + { principalType :: Text + , principalName :: Text + } deriving (Eq, Ord) +instance Show Principal where + show Principal{..} = + T.unpack principalType <> ":" <> T.unpack principalName + +wildcardPrincipal :: Principal +wildcardPrincipal = Principal "User" "*" + +-- FIXME: error +-- | Build a 'Principal' from 'Text' with format "type:name". +-- Neither type nor name can be empty. +-- May throw exceptions if the format is invalid. +principalFromText :: Text -> Principal +principalFromText t = case T.splitOn ":" t of + [t1, t2] -> if T.null t1 then error "principalFromText: empty type" else + if T.null t2 then error "principalFromText: empty name" else + Principal t1 t2 + _ -> error "principalFromText: invalid principal" + +wildcardHost :: Text +wildcardHost = "*" diff --git a/hstream-kafka/hstream-kafka.cabal b/hstream-kafka/hstream-kafka.cabal index dfb068a02..85b0e8e5b 100644 --- a/hstream-kafka/hstream-kafka.cabal +++ b/hstream-kafka/hstream-kafka.cabal @@ -123,11 +123,18 @@ library exposed-modules: HStream.Kafka.Client.Api HStream.Kafka.Client.Cli + HStream.Kafka.Common.Acl + HStream.Kafka.Common.AclEntry + HStream.Kafka.Common.AclStore + HStream.Kafka.Common.Authorizer + HStream.Kafka.Common.Authorizer.Class HStream.Kafka.Common.KafkaException HStream.Kafka.Common.Metrics HStream.Kafka.Common.OffsetManager HStream.Kafka.Common.Read HStream.Kafka.Common.RecordFormat + HStream.Kafka.Common.Resource + HStream.Kafka.Common.Security HStream.Kafka.Common.Utils HStream.Kafka.Group.Group HStream.Kafka.Group.GroupCoordinator @@ -193,6 +200,7 @@ library , colourista ^>=0.1.0.1 , containers , directory + , extra , foreign , ghc , gsasl-hs @@ -236,3 +244,25 @@ library RecordWildCards UnliftedFFITypes UnliftedNewtypes + +test-suite kafka-common-test + import: shared-properties + type: exitcode-stdio-1.0 + main-is: Spec.hs + hs-source-dirs: tests/common + build-depends: + , base >=4.11 && <5 + , bytestring + , hspec + , hspec-expectations + , hstream-kafka + , text + + default-extensions: + LambdaCase + OverloadedStrings + RecordWildCards + + default-language: GHC2021 + build-tool-depends: hspec-discover:hspec-discover >=2 && <3 + ghc-options: -threaded -rtsopts -with-rtsopts=-N diff --git a/hstream-kafka/message/CreateAclsRequest.json b/hstream-kafka/message/CreateAclsRequest.json new file mode 100644 index 000000000..89f5cf7f4 --- /dev/null +++ b/hstream-kafka/message/CreateAclsRequest.json @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 30, + "type": "request", + "listeners": ["zkBroker", "broker", "controller"], + "name": "CreateAclsRequest", + // Version 1 adds resource pattern type. + // Version 2 enables flexible versions. + // Version 3 adds user resource type. + "validVersions": "0-3", + "flexibleVersions": "2+", + "fields": [ + { "name": "Creations", "type": "[]AclCreation", "versions": "0+", + "about": "The ACLs that we want to create.", "fields": [ + { "name": "ResourceType", "type": "int8", "versions": "0+", + "about": "The type of the resource." }, + { "name": "ResourceName", "type": "string", "versions": "0+", + "about": "The resource name for the ACL." }, + { "name": "ResourcePatternType", "type": "int8", "versions": "1+", "default": "3", + "about": "The pattern type for the ACL." }, + { "name": "Principal", "type": "string", "versions": "0+", + "about": "The principal for the ACL." }, + { "name": "Host", "type": "string", "versions": "0+", + "about": "The host for the ACL." }, + { "name": "Operation", "type": "int8", "versions": "0+", + "about": "The operation type for the ACL (read, write, etc.)." }, + { "name": "PermissionType", "type": "int8", "versions": "0+", + "about": "The permission type for the ACL (allow, deny, etc.)." } + ]} + ] +} diff --git a/hstream-kafka/message/CreateAclsResponse.json b/hstream-kafka/message/CreateAclsResponse.json new file mode 100644 index 000000000..da1632c03 --- /dev/null +++ b/hstream-kafka/message/CreateAclsResponse.json @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 30, + "type": "response", + "name": "CreateAclsResponse", + // Starting in version 1, on quota violation, brokers send out responses before throttling. + // Version 2 enables flexible versions. + // Version 3 adds user resource type. + "validVersions": "0-3", + "flexibleVersions": "2+", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "Results", "type": "[]AclCreationResult", "versions": "0+", + "about": "The results for each ACL creation.", "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The result error, or zero if there was no error." }, + { "name": "ErrorMessage", "type": "string", "nullableVersions": "0+", "versions": "0+", + "about": "The result message, or null if there was no error." } + ]} + ] +} diff --git a/hstream-kafka/message/DeleteAclsRequest.json b/hstream-kafka/message/DeleteAclsRequest.json new file mode 100644 index 000000000..ea7106d4c --- /dev/null +++ b/hstream-kafka/message/DeleteAclsRequest.json @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 31, + "type": "request", + "listeners": ["zkBroker", "broker", "controller"], + "name": "DeleteAclsRequest", + // Version 1 adds the pattern type. + // Version 2 enables flexible versions. + // Version 3 adds the user resource type. + "validVersions": "0-3", + "flexibleVersions": "2+", + "fields": [ + { "name": "Filters", "type": "[]DeleteAclsFilter", "versions": "0+", + "about": "The filters to use when deleting ACLs.", "fields": [ + { "name": "ResourceTypeFilter", "type": "int8", "versions": "0+", + "about": "The resource type." }, + { "name": "ResourceNameFilter", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The resource name." }, + { "name": "PatternTypeFilter", "type": "int8", "versions": "1+", "default": "3", "ignorable": false, + "about": "The pattern type." }, + { "name": "PrincipalFilter", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The principal filter, or null to accept all principals." }, + { "name": "HostFilter", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The host filter, or null to accept all hosts." }, + { "name": "Operation", "type": "int8", "versions": "0+", + "about": "The ACL operation." }, + { "name": "PermissionType", "type": "int8", "versions": "0+", + "about": "The permission type." } + ]} + ] +} diff --git a/hstream-kafka/message/DeleteAclsResponse.json b/hstream-kafka/message/DeleteAclsResponse.json new file mode 100644 index 000000000..e00969df7 --- /dev/null +++ b/hstream-kafka/message/DeleteAclsResponse.json @@ -0,0 +1,58 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 31, + "type": "response", + "name": "DeleteAclsResponse", + // Version 1 adds the resource pattern type. + // Starting in version 1, on quota violation, brokers send out responses before throttling. + // Version 2 enables flexible versions. + // Version 3 adds the user resource type. + "validVersions": "0-3", + "flexibleVersions": "2+", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "FilterResults", "type": "[]DeleteAclsFilterResult", "versions": "0+", + "about": "The results for each filter.", "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code, or 0 if the filter succeeded." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The error message, or null if the filter succeeded." }, + { "name": "MatchingAcls", "type": "[]DeleteAclsMatchingAcl", "versions": "0+", + "about": "The ACLs which matched this filter.", "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The deletion error code, or 0 if the deletion succeeded." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The deletion error message, or null if the deletion succeeded." }, + { "name": "ResourceType", "type": "int8", "versions": "0+", + "about": "The ACL resource type." }, + { "name": "ResourceName", "type": "string", "versions": "0+", + "about": "The ACL resource name." }, + { "name": "PatternType", "type": "int8", "versions": "1+", "default": "3", "ignorable": false, + "about": "The ACL resource pattern type." }, + { "name": "Principal", "type": "string", "versions": "0+", + "about": "The ACL principal." }, + { "name": "Host", "type": "string", "versions": "0+", + "about": "The ACL host." }, + { "name": "Operation", "type": "int8", "versions": "0+", + "about": "The ACL operation." }, + { "name": "PermissionType", "type": "int8", "versions": "0+", + "about": "The ACL permission type." } + ]} + ]} + ] +} diff --git a/hstream-kafka/message/DescribeAclsRequest.json b/hstream-kafka/message/DescribeAclsRequest.json new file mode 100644 index 000000000..4f0e851c7 --- /dev/null +++ b/hstream-kafka/message/DescribeAclsRequest.json @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 29, + "type": "request", + "listeners": ["zkBroker", "broker", "controller"], + "name": "DescribeAclsRequest", + // Version 1 adds resource pattern type. + // Version 2 enables flexible versions. + // Version 3 adds user resource type. + "validVersions": "0-3", + "flexibleVersions": "2+", + "fields": [ + { "name": "ResourceTypeFilter", "type": "int8", "versions": "0+", + "about": "The resource type." }, + { "name": "ResourceNameFilter", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The resource name, or null to match any resource name." }, + { "name": "PatternTypeFilter", "type": "int8", "versions": "1+", "default": "3", "ignorable": false, + "about": "The resource pattern to match." }, + { "name": "PrincipalFilter", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The principal to match, or null to match any principal." }, + { "name": "HostFilter", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The host to match, or null to match any host." }, + { "name": "Operation", "type": "int8", "versions": "0+", + "about": "The operation to match." }, + { "name": "PermissionType", "type": "int8", "versions": "0+", + "about": "The permission type to match." } + ] +} diff --git a/hstream-kafka/message/DescribeAclsResponse.json b/hstream-kafka/message/DescribeAclsResponse.json new file mode 100644 index 000000000..19de10944 --- /dev/null +++ b/hstream-kafka/message/DescribeAclsResponse.json @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 29, + "type": "response", + "name": "DescribeAclsResponse", + // Version 1 adds PatternType. + // Starting in version 1, on quota violation, brokers send out responses before throttling. + // Version 2 enables flexible versions. + // Version 3 adds user resource type. + "validVersions": "0-3", + "flexibleVersions": "2+", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code, or 0 if there was no error." }, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The error message, or null if there was no error." }, + { "name": "Resources", "type": "[]DescribeAclsResource", "versions": "0+", + "about": "Each Resource that is referenced in an ACL.", "fields": [ + { "name": "ResourceType", "type": "int8", "versions": "0+", + "about": "The resource type." }, + { "name": "ResourceName", "type": "string", "versions": "0+", + "about": "The resource name." }, + { "name": "PatternType", "type": "int8", "versions": "1+", "default": "3", "ignorable": false, + "about": "The resource pattern type." }, + { "name": "Acls", "type": "[]AclDescription", "versions": "0+", + "about": "The ACLs.", "fields": [ + { "name": "Principal", "type": "string", "versions": "0+", + "about": "The ACL principal." }, + { "name": "Host", "type": "string", "versions": "0+", + "about": "The ACL host." }, + { "name": "Operation", "type": "int8", "versions": "0+", + "about": "The ACL operation." }, + { "name": "PermissionType", "type": "int8", "versions": "0+", + "about": "The ACL permission type." } + ]} + ]} + ] +} diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs index bba6ca8b2..d4495a9c6 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Struct.hs @@ -69,6 +69,30 @@ data FinalizedFeatureKeyV3 = FinalizedFeatureKeyV3 } deriving (Show, Eq, Generic) instance Serializable FinalizedFeatureKeyV3 +data AclCreationV0 = AclCreationV0 + { resourceType :: {-# UNPACK #-} !Int8 + -- ^ The type of the resource. + , resourceName :: !Text + -- ^ The resource name for the ACL. + , principal :: !Text + -- ^ The principal for the ACL. + , host :: !Text + -- ^ The host for the ACL. + , operation :: {-# UNPACK #-} !Int8 + -- ^ The operation type for the ACL (read, write, etc.). + , permissionType :: {-# UNPACK #-} !Int8 + -- ^ The permission type for the ACL (allow, deny, etc.). + } deriving (Show, Eq, Generic) +instance Serializable AclCreationV0 + +data AclCreationResultV0 = AclCreationResultV0 + { errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The result error, or zero if there was no error. + , errorMessage :: !NullableString + -- ^ The result message, or null if there was no error. + } deriving (Show, Eq, Generic) +instance Serializable AclCreationResultV0 + data CreatableReplicaAssignmentV0 = CreatableReplicaAssignmentV0 { partitionIndex :: {-# UNPACK #-} !Int32 -- ^ The partition index. @@ -136,6 +160,52 @@ instance Serializable CreatableTopicResultV1 type CreatableTopicResultV2 = CreatableTopicResultV1 +data DeleteAclsFilterV0 = DeleteAclsFilterV0 + { resourceTypeFilter :: {-# UNPACK #-} !Int8 + -- ^ The resource type. + , resourceNameFilter :: !NullableString + -- ^ The resource name. + , principalFilter :: !NullableString + -- ^ The principal filter, or null to accept all principals. + , hostFilter :: !NullableString + -- ^ The host filter, or null to accept all hosts. + , operation :: {-# UNPACK #-} !Int8 + -- ^ The ACL operation. + , permissionType :: {-# UNPACK #-} !Int8 + -- ^ The permission type. + } deriving (Show, Eq, Generic) +instance Serializable DeleteAclsFilterV0 + +data DeleteAclsMatchingAclV0 = DeleteAclsMatchingAclV0 + { errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The deletion error code, or 0 if the deletion succeeded. + , errorMessage :: !NullableString + -- ^ The deletion error message, or null if the deletion succeeded. + , resourceType :: {-# UNPACK #-} !Int8 + -- ^ The ACL resource type. + , resourceName :: !Text + -- ^ The ACL resource name. + , principal :: !Text + -- ^ The ACL principal. + , host :: !Text + -- ^ The ACL host. + , operation :: {-# UNPACK #-} !Int8 + -- ^ The ACL operation. + , permissionType :: {-# UNPACK #-} !Int8 + -- ^ The ACL permission type. + } deriving (Show, Eq, Generic) +instance Serializable DeleteAclsMatchingAclV0 + +data DeleteAclsFilterResultV0 = DeleteAclsFilterResultV0 + { errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The error code, or 0 if the filter succeeded. + , errorMessage :: !NullableString + -- ^ The error message, or null if the filter succeeded. + , matchingAcls :: !(KaArray DeleteAclsMatchingAclV0) + -- ^ The ACLs which matched this filter. + } deriving (Show, Eq, Generic) +instance Serializable DeleteAclsFilterResultV0 + data DeletableTopicResultV0 = DeletableTopicResultV0 { name :: !Text -- ^ The topic name @@ -146,6 +216,28 @@ instance Serializable DeletableTopicResultV0 type DeletableTopicResultV1 = DeletableTopicResultV0 +data AclDescriptionV0 = AclDescriptionV0 + { principal :: !Text + -- ^ The ACL principal. + , host :: !Text + -- ^ The ACL host. + , operation :: {-# UNPACK #-} !Int8 + -- ^ The ACL operation. + , permissionType :: {-# UNPACK #-} !Int8 + -- ^ The ACL permission type. + } deriving (Show, Eq, Generic) +instance Serializable AclDescriptionV0 + +data DescribeAclsResourceV0 = DescribeAclsResourceV0 + { resourceType :: {-# UNPACK #-} !Int8 + -- ^ The resource type. + , resourceName :: !Text + -- ^ The resource name. + , acls :: !(KaArray AclDescriptionV0) + -- ^ The ACLs. + } deriving (Show, Eq, Generic) +instance Serializable DescribeAclsResourceV0 + data DescribeConfigsResourceV0 = DescribeConfigsResourceV0 { resourceType :: {-# UNPACK #-} !Int8 -- ^ The resource type. @@ -917,6 +1009,20 @@ data ApiVersionsResponseV3 = ApiVersionsResponseV3 } deriving (Show, Eq, Generic) instance Serializable ApiVersionsResponseV3 +newtype CreateAclsRequestV0 = CreateAclsRequestV0 + { creations :: (KaArray AclCreationV0) + } deriving (Show, Eq, Generic) +instance Serializable CreateAclsRequestV0 + +data CreateAclsResponseV0 = CreateAclsResponseV0 + { throttleTimeMs :: {-# UNPACK #-} !Int32 + -- ^ The duration in milliseconds for which the request was throttled due + -- to a quota violation, or zero if the request did not violate any quota. + , results :: !(KaArray AclCreationResultV0) + -- ^ The results for each ACL creation. + } deriving (Show, Eq, Generic) +instance Serializable CreateAclsResponseV0 + data CreateTopicsRequestV0 = CreateTopicsRequestV0 { topics :: !(KaArray CreatableTopicV0) -- ^ The topics to create. @@ -957,6 +1063,20 @@ data CreateTopicsResponseV2 = CreateTopicsResponseV2 } deriving (Show, Eq, Generic) instance Serializable CreateTopicsResponseV2 +newtype DeleteAclsRequestV0 = DeleteAclsRequestV0 + { filters :: (KaArray DeleteAclsFilterV0) + } deriving (Show, Eq, Generic) +instance Serializable DeleteAclsRequestV0 + +data DeleteAclsResponseV0 = DeleteAclsResponseV0 + { throttleTimeMs :: {-# UNPACK #-} !Int32 + -- ^ The duration in milliseconds for which the request was throttled due + -- to a quota violation, or zero if the request did not violate any quota. + , filterResults :: !(KaArray DeleteAclsFilterResultV0) + -- ^ The results for each filter. + } deriving (Show, Eq, Generic) +instance Serializable DeleteAclsResponseV0 + data DeleteTopicsRequestV0 = DeleteTopicsRequestV0 { topicNames :: !(KaArray Text) -- ^ The names of the topics to delete @@ -982,6 +1102,35 @@ data DeleteTopicsResponseV1 = DeleteTopicsResponseV1 } deriving (Show, Eq, Generic) instance Serializable DeleteTopicsResponseV1 +data DescribeAclsRequestV0 = DescribeAclsRequestV0 + { resourceTypeFilter :: {-# UNPACK #-} !Int8 + -- ^ The resource type. + , resourceNameFilter :: !NullableString + -- ^ The resource name, or null to match any resource name. + , principalFilter :: !NullableString + -- ^ The principal to match, or null to match any principal. + , hostFilter :: !NullableString + -- ^ The host to match, or null to match any host. + , operation :: {-# UNPACK #-} !Int8 + -- ^ The operation to match. + , permissionType :: {-# UNPACK #-} !Int8 + -- ^ The permission type to match. + } deriving (Show, Eq, Generic) +instance Serializable DescribeAclsRequestV0 + +data DescribeAclsResponseV0 = DescribeAclsResponseV0 + { throttleTimeMs :: {-# UNPACK #-} !Int32 + -- ^ The duration in milliseconds for which the request was throttled due + -- to a quota violation, or zero if the request did not violate any quota. + , errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The error code, or 0 if there was no error. + , errorMessage :: !NullableString + -- ^ The error message, or null if there was no error. + , resources :: !(KaArray DescribeAclsResourceV0) + -- ^ Each Resource that is referenced in an ACL. + } deriving (Show, Eq, Generic) +instance Serializable DescribeAclsResponseV0 + newtype DescribeConfigsRequestV0 = DescribeConfigsRequestV0 { resources :: (KaArray DescribeConfigsResourceV0) } deriving (Show, Eq, Generic) @@ -1761,6 +1910,9 @@ instance Service HStreamKafkaV0 where , "createTopics" , "deleteTopics" , "initProducerId" + , "describeAcls" + , "createAcls" + , "deleteAcls" , "describeConfigs" , "saslAuthenticate" ] @@ -1898,6 +2050,27 @@ instance HasMethodImpl HStreamKafkaV0 "initProducerId" where type MethodInput HStreamKafkaV0 "initProducerId" = InitProducerIdRequestV0 type MethodOutput HStreamKafkaV0 "initProducerId" = InitProducerIdResponseV0 +instance HasMethodImpl HStreamKafkaV0 "describeAcls" where + type MethodName HStreamKafkaV0 "describeAcls" = "describeAcls" + type MethodKey HStreamKafkaV0 "describeAcls" = 29 + type MethodVersion HStreamKafkaV0 "describeAcls" = 0 + type MethodInput HStreamKafkaV0 "describeAcls" = DescribeAclsRequestV0 + type MethodOutput HStreamKafkaV0 "describeAcls" = DescribeAclsResponseV0 + +instance HasMethodImpl HStreamKafkaV0 "createAcls" where + type MethodName HStreamKafkaV0 "createAcls" = "createAcls" + type MethodKey HStreamKafkaV0 "createAcls" = 30 + type MethodVersion HStreamKafkaV0 "createAcls" = 0 + type MethodInput HStreamKafkaV0 "createAcls" = CreateAclsRequestV0 + type MethodOutput HStreamKafkaV0 "createAcls" = CreateAclsResponseV0 + +instance HasMethodImpl HStreamKafkaV0 "deleteAcls" where + type MethodName HStreamKafkaV0 "deleteAcls" = "deleteAcls" + type MethodKey HStreamKafkaV0 "deleteAcls" = 31 + type MethodVersion HStreamKafkaV0 "deleteAcls" = 0 + type MethodInput HStreamKafkaV0 "deleteAcls" = DeleteAclsRequestV0 + type MethodOutput HStreamKafkaV0 "deleteAcls" = DeleteAclsResponseV0 + instance HasMethodImpl HStreamKafkaV0 "describeConfigs" where type MethodName HStreamKafkaV0 "describeConfigs" = "describeConfigs" type MethodKey HStreamKafkaV0 "describeConfigs" = 32 @@ -2291,6 +2464,9 @@ instance Show ApiKey where show (ApiKey (19)) = "CreateTopics(19)" show (ApiKey (20)) = "DeleteTopics(20)" show (ApiKey (22)) = "InitProducerId(22)" + show (ApiKey (29)) = "DescribeAcls(29)" + show (ApiKey (30)) = "CreateAcls(30)" + show (ApiKey (31)) = "DeleteAcls(31)" show (ApiKey (32)) = "DescribeConfigs(32)" show (ApiKey (36)) = "SaslAuthenticate(36)" show (ApiKey n) = "Unknown " <> show n @@ -2315,6 +2491,9 @@ supportedApiVersions = , ApiVersionV0 (ApiKey 19) 0 2 , ApiVersionV0 (ApiKey 20) 0 1 , ApiVersionV0 (ApiKey 22) 0 0 + , ApiVersionV0 (ApiKey 29) 0 0 + , ApiVersionV0 (ApiKey 30) 0 0 + , ApiVersionV0 (ApiKey 31) 0 0 , ApiVersionV0 (ApiKey 32) 0 0 , ApiVersionV0 (ApiKey 36) 0 0 ] @@ -2378,6 +2557,9 @@ getHeaderVersion (ApiKey (19)) 2 = (1, 0) getHeaderVersion (ApiKey (20)) 0 = (1, 0) getHeaderVersion (ApiKey (20)) 1 = (1, 0) getHeaderVersion (ApiKey (22)) 0 = (1, 0) +getHeaderVersion (ApiKey (29)) 0 = (1, 0) +getHeaderVersion (ApiKey (30)) 0 = (1, 0) +getHeaderVersion (ApiKey (31)) 0 = (1, 0) getHeaderVersion (ApiKey (32)) 0 = (1, 0) getHeaderVersion (ApiKey (36)) 0 = (1, 0) getHeaderVersion k v = error $ "Unknown " <> show k <> " v" <> show v diff --git a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs index dd73d102d..4accf0edd 100644 --- a/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs +++ b/hstream-kafka/protocol/Kafka/Protocol/Message/Total.hs @@ -53,6 +53,90 @@ abortedTransactionFromV5 = abortedTransactionFromV4 abortedTransactionFromV6 :: AbortedTransactionV6 -> AbortedTransaction abortedTransactionFromV6 = abortedTransactionFromV4 +data AclCreation = AclCreation + { resourceType :: {-# UNPACK #-} !Int8 + -- ^ The type of the resource. + , resourceName :: !Text + -- ^ The resource name for the ACL. + , principal :: !Text + -- ^ The principal for the ACL. + , host :: !Text + -- ^ The host for the ACL. + , operation :: {-# UNPACK #-} !Int8 + -- ^ The operation type for the ACL (read, write, etc.). + , permissionType :: {-# UNPACK #-} !Int8 + -- ^ The permission type for the ACL (allow, deny, etc.). + } deriving (Show, Eq, Generic) +instance Serializable AclCreation + +aclCreationToV0 :: AclCreation -> AclCreationV0 +aclCreationToV0 x = AclCreationV0 + { resourceType = x.resourceType + , resourceName = x.resourceName + , principal = x.principal + , host = x.host + , operation = x.operation + , permissionType = x.permissionType + } + +aclCreationFromV0 :: AclCreationV0 -> AclCreation +aclCreationFromV0 x = AclCreation + { resourceType = x.resourceType + , resourceName = x.resourceName + , principal = x.principal + , host = x.host + , operation = x.operation + , permissionType = x.permissionType + } + +data AclCreationResult = AclCreationResult + { errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The result error, or zero if there was no error. + , errorMessage :: !NullableString + -- ^ The result message, or null if there was no error. + } deriving (Show, Eq, Generic) +instance Serializable AclCreationResult + +aclCreationResultToV0 :: AclCreationResult -> AclCreationResultV0 +aclCreationResultToV0 x = AclCreationResultV0 + { errorCode = x.errorCode + , errorMessage = x.errorMessage + } + +aclCreationResultFromV0 :: AclCreationResultV0 -> AclCreationResult +aclCreationResultFromV0 x = AclCreationResult + { errorCode = x.errorCode + , errorMessage = x.errorMessage + } + +data AclDescription = AclDescription + { principal :: !Text + -- ^ The ACL principal. + , host :: !Text + -- ^ The ACL host. + , operation :: {-# UNPACK #-} !Int8 + -- ^ The ACL operation. + , permissionType :: {-# UNPACK #-} !Int8 + -- ^ The ACL permission type. + } deriving (Show, Eq, Generic) +instance Serializable AclDescription + +aclDescriptionToV0 :: AclDescription -> AclDescriptionV0 +aclDescriptionToV0 x = AclDescriptionV0 + { principal = x.principal + , host = x.host + , operation = x.operation + , permissionType = x.permissionType + } + +aclDescriptionFromV0 :: AclDescriptionV0 -> AclDescription +aclDescriptionFromV0 x = AclDescription + { principal = x.principal + , host = x.host + , operation = x.operation + , permissionType = x.permissionType + } + data ApiVersion = ApiVersion { apiKey :: {-# UNPACK #-} !ApiKey -- ^ The API index. @@ -265,6 +349,134 @@ deletableTopicResultFromV0 x = DeletableTopicResult deletableTopicResultFromV1 :: DeletableTopicResultV1 -> DeletableTopicResult deletableTopicResultFromV1 = deletableTopicResultFromV0 +data DeleteAclsFilter = DeleteAclsFilter + { resourceTypeFilter :: {-# UNPACK #-} !Int8 + -- ^ The resource type. + , resourceNameFilter :: !NullableString + -- ^ The resource name. + , principalFilter :: !NullableString + -- ^ The principal filter, or null to accept all principals. + , hostFilter :: !NullableString + -- ^ The host filter, or null to accept all hosts. + , operation :: {-# UNPACK #-} !Int8 + -- ^ The ACL operation. + , permissionType :: {-# UNPACK #-} !Int8 + -- ^ The permission type. + } deriving (Show, Eq, Generic) +instance Serializable DeleteAclsFilter + +deleteAclsFilterToV0 :: DeleteAclsFilter -> DeleteAclsFilterV0 +deleteAclsFilterToV0 x = DeleteAclsFilterV0 + { resourceTypeFilter = x.resourceTypeFilter + , resourceNameFilter = x.resourceNameFilter + , principalFilter = x.principalFilter + , hostFilter = x.hostFilter + , operation = x.operation + , permissionType = x.permissionType + } + +deleteAclsFilterFromV0 :: DeleteAclsFilterV0 -> DeleteAclsFilter +deleteAclsFilterFromV0 x = DeleteAclsFilter + { resourceTypeFilter = x.resourceTypeFilter + , resourceNameFilter = x.resourceNameFilter + , principalFilter = x.principalFilter + , hostFilter = x.hostFilter + , operation = x.operation + , permissionType = x.permissionType + } + +data DeleteAclsFilterResult = DeleteAclsFilterResult + { errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The error code, or 0 if the filter succeeded. + , errorMessage :: !NullableString + -- ^ The error message, or null if the filter succeeded. + , matchingAcls :: !(KaArray DeleteAclsMatchingAcl) + -- ^ The ACLs which matched this filter. + } deriving (Show, Eq, Generic) +instance Serializable DeleteAclsFilterResult + +deleteAclsFilterResultToV0 :: DeleteAclsFilterResult -> DeleteAclsFilterResultV0 +deleteAclsFilterResultToV0 x = DeleteAclsFilterResultV0 + { errorCode = x.errorCode + , errorMessage = x.errorMessage + , matchingAcls = fmap deleteAclsMatchingAclToV0 x.matchingAcls + } + +deleteAclsFilterResultFromV0 :: DeleteAclsFilterResultV0 -> DeleteAclsFilterResult +deleteAclsFilterResultFromV0 x = DeleteAclsFilterResult + { errorCode = x.errorCode + , errorMessage = x.errorMessage + , matchingAcls = fmap deleteAclsMatchingAclFromV0 x.matchingAcls + } + +data DeleteAclsMatchingAcl = DeleteAclsMatchingAcl + { errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The deletion error code, or 0 if the deletion succeeded. + , errorMessage :: !NullableString + -- ^ The deletion error message, or null if the deletion succeeded. + , resourceType :: {-# UNPACK #-} !Int8 + -- ^ The ACL resource type. + , resourceName :: !Text + -- ^ The ACL resource name. + , principal :: !Text + -- ^ The ACL principal. + , host :: !Text + -- ^ The ACL host. + , operation :: {-# UNPACK #-} !Int8 + -- ^ The ACL operation. + , permissionType :: {-# UNPACK #-} !Int8 + -- ^ The ACL permission type. + } deriving (Show, Eq, Generic) +instance Serializable DeleteAclsMatchingAcl + +deleteAclsMatchingAclToV0 :: DeleteAclsMatchingAcl -> DeleteAclsMatchingAclV0 +deleteAclsMatchingAclToV0 x = DeleteAclsMatchingAclV0 + { errorCode = x.errorCode + , errorMessage = x.errorMessage + , resourceType = x.resourceType + , resourceName = x.resourceName + , principal = x.principal + , host = x.host + , operation = x.operation + , permissionType = x.permissionType + } + +deleteAclsMatchingAclFromV0 :: DeleteAclsMatchingAclV0 -> DeleteAclsMatchingAcl +deleteAclsMatchingAclFromV0 x = DeleteAclsMatchingAcl + { errorCode = x.errorCode + , errorMessage = x.errorMessage + , resourceType = x.resourceType + , resourceName = x.resourceName + , principal = x.principal + , host = x.host + , operation = x.operation + , permissionType = x.permissionType + } + +data DescribeAclsResource = DescribeAclsResource + { resourceType :: {-# UNPACK #-} !Int8 + -- ^ The resource type. + , resourceName :: !Text + -- ^ The resource name. + , acls :: !(KaArray AclDescription) + -- ^ The ACLs. + } deriving (Show, Eq, Generic) +instance Serializable DescribeAclsResource + +describeAclsResourceToV0 :: DescribeAclsResource -> DescribeAclsResourceV0 +describeAclsResourceToV0 x = DescribeAclsResourceV0 + { resourceType = x.resourceType + , resourceName = x.resourceName + , acls = fmap aclDescriptionToV0 x.acls + } + +describeAclsResourceFromV0 :: DescribeAclsResourceV0 -> DescribeAclsResource +describeAclsResourceFromV0 x = DescribeAclsResource + { resourceType = x.resourceType + , resourceName = x.resourceName + , acls = fmap aclDescriptionFromV0 x.acls + } + data DescribeConfigsResource = DescribeConfigsResource { resourceType :: {-# UNPACK #-} !Int8 -- ^ The resource type. @@ -1796,6 +2008,42 @@ apiVersionsResponseFromV3 x = ApiVersionsResponse , taggedFields = x.taggedFields } +newtype CreateAclsRequest = CreateAclsRequest + { creations :: (KaArray AclCreation) + } deriving (Show, Eq, Generic) +instance Serializable CreateAclsRequest + +createAclsRequestToV0 :: CreateAclsRequest -> CreateAclsRequestV0 +createAclsRequestToV0 x = CreateAclsRequestV0 + { creations = fmap aclCreationToV0 x.creations + } + +createAclsRequestFromV0 :: CreateAclsRequestV0 -> CreateAclsRequest +createAclsRequestFromV0 x = CreateAclsRequest + { creations = fmap aclCreationFromV0 x.creations + } + +data CreateAclsResponse = CreateAclsResponse + { throttleTimeMs :: {-# UNPACK #-} !Int32 + -- ^ The duration in milliseconds for which the request was throttled due + -- to a quota violation, or zero if the request did not violate any quota. + , results :: !(KaArray AclCreationResult) + -- ^ The results for each ACL creation. + } deriving (Show, Eq, Generic) +instance Serializable CreateAclsResponse + +createAclsResponseToV0 :: CreateAclsResponse -> CreateAclsResponseV0 +createAclsResponseToV0 x = CreateAclsResponseV0 + { throttleTimeMs = x.throttleTimeMs + , results = fmap aclCreationResultToV0 x.results + } + +createAclsResponseFromV0 :: CreateAclsResponseV0 -> CreateAclsResponse +createAclsResponseFromV0 x = CreateAclsResponse + { throttleTimeMs = x.throttleTimeMs + , results = fmap aclCreationResultFromV0 x.results + } + data CreateTopicsRequest = CreateTopicsRequest { topics :: !(KaArray CreatableTopic) -- ^ The topics to create. @@ -1875,6 +2123,42 @@ createTopicsResponseFromV2 x = CreateTopicsResponse , throttleTimeMs = x.throttleTimeMs } +newtype DeleteAclsRequest = DeleteAclsRequest + { filters :: (KaArray DeleteAclsFilter) + } deriving (Show, Eq, Generic) +instance Serializable DeleteAclsRequest + +deleteAclsRequestToV0 :: DeleteAclsRequest -> DeleteAclsRequestV0 +deleteAclsRequestToV0 x = DeleteAclsRequestV0 + { filters = fmap deleteAclsFilterToV0 x.filters + } + +deleteAclsRequestFromV0 :: DeleteAclsRequestV0 -> DeleteAclsRequest +deleteAclsRequestFromV0 x = DeleteAclsRequest + { filters = fmap deleteAclsFilterFromV0 x.filters + } + +data DeleteAclsResponse = DeleteAclsResponse + { throttleTimeMs :: {-# UNPACK #-} !Int32 + -- ^ The duration in milliseconds for which the request was throttled due + -- to a quota violation, or zero if the request did not violate any quota. + , filterResults :: !(KaArray DeleteAclsFilterResult) + -- ^ The results for each filter. + } deriving (Show, Eq, Generic) +instance Serializable DeleteAclsResponse + +deleteAclsResponseToV0 :: DeleteAclsResponse -> DeleteAclsResponseV0 +deleteAclsResponseToV0 x = DeleteAclsResponseV0 + { throttleTimeMs = x.throttleTimeMs + , filterResults = fmap deleteAclsFilterResultToV0 x.filterResults + } + +deleteAclsResponseFromV0 :: DeleteAclsResponseV0 -> DeleteAclsResponse +deleteAclsResponseFromV0 x = DeleteAclsResponse + { throttleTimeMs = x.throttleTimeMs + , filterResults = fmap deleteAclsFilterResultFromV0 x.filterResults + } + data DeleteTopicsRequest = DeleteTopicsRequest { topicNames :: !(KaArray Text) -- ^ The names of the topics to delete @@ -1930,6 +2214,71 @@ deleteTopicsResponseFromV1 x = DeleteTopicsResponse , throttleTimeMs = x.throttleTimeMs } +data DescribeAclsRequest = DescribeAclsRequest + { resourceTypeFilter :: {-# UNPACK #-} !Int8 + -- ^ The resource type. + , resourceNameFilter :: !NullableString + -- ^ The resource name, or null to match any resource name. + , principalFilter :: !NullableString + -- ^ The principal to match, or null to match any principal. + , hostFilter :: !NullableString + -- ^ The host to match, or null to match any host. + , operation :: {-# UNPACK #-} !Int8 + -- ^ The operation to match. + , permissionType :: {-# UNPACK #-} !Int8 + -- ^ The permission type to match. + } deriving (Show, Eq, Generic) +instance Serializable DescribeAclsRequest + +describeAclsRequestToV0 :: DescribeAclsRequest -> DescribeAclsRequestV0 +describeAclsRequestToV0 x = DescribeAclsRequestV0 + { resourceTypeFilter = x.resourceTypeFilter + , resourceNameFilter = x.resourceNameFilter + , principalFilter = x.principalFilter + , hostFilter = x.hostFilter + , operation = x.operation + , permissionType = x.permissionType + } + +describeAclsRequestFromV0 :: DescribeAclsRequestV0 -> DescribeAclsRequest +describeAclsRequestFromV0 x = DescribeAclsRequest + { resourceTypeFilter = x.resourceTypeFilter + , resourceNameFilter = x.resourceNameFilter + , principalFilter = x.principalFilter + , hostFilter = x.hostFilter + , operation = x.operation + , permissionType = x.permissionType + } + +data DescribeAclsResponse = DescribeAclsResponse + { throttleTimeMs :: {-# UNPACK #-} !Int32 + -- ^ The duration in milliseconds for which the request was throttled due + -- to a quota violation, or zero if the request did not violate any quota. + , errorCode :: {-# UNPACK #-} !ErrorCode + -- ^ The error code, or 0 if there was no error. + , errorMessage :: !NullableString + -- ^ The error message, or null if there was no error. + , resources :: !(KaArray DescribeAclsResource) + -- ^ Each Resource that is referenced in an ACL. + } deriving (Show, Eq, Generic) +instance Serializable DescribeAclsResponse + +describeAclsResponseToV0 :: DescribeAclsResponse -> DescribeAclsResponseV0 +describeAclsResponseToV0 x = DescribeAclsResponseV0 + { throttleTimeMs = x.throttleTimeMs + , errorCode = x.errorCode + , errorMessage = x.errorMessage + , resources = fmap describeAclsResourceToV0 x.resources + } + +describeAclsResponseFromV0 :: DescribeAclsResponseV0 -> DescribeAclsResponse +describeAclsResponseFromV0 x = DescribeAclsResponse + { throttleTimeMs = x.throttleTimeMs + , errorCode = x.errorCode + , errorMessage = x.errorMessage + , resources = fmap describeAclsResourceFromV0 x.resources + } + newtype DescribeConfigsRequest = DescribeConfigsRequest { resources :: (KaArray DescribeConfigsResource) } deriving (Show, Eq, Generic) @@ -3338,6 +3687,13 @@ instance Exception ApiVersionsResponseEx catchApiVersionsResponseEx :: IO ApiVersionsResponse -> IO ApiVersionsResponse catchApiVersionsResponseEx act = act `catch` \(ApiVersionsResponseEx resp) -> pure resp +newtype CreateAclsResponseEx = CreateAclsResponseEx CreateAclsResponse + deriving (Show, Eq) +instance Exception CreateAclsResponseEx + +catchCreateAclsResponseEx :: IO CreateAclsResponse -> IO CreateAclsResponse +catchCreateAclsResponseEx act = act `catch` \(CreateAclsResponseEx resp) -> pure resp + newtype CreateTopicsResponseEx = CreateTopicsResponseEx CreateTopicsResponse deriving (Show, Eq) instance Exception CreateTopicsResponseEx @@ -3345,6 +3701,13 @@ instance Exception CreateTopicsResponseEx catchCreateTopicsResponseEx :: IO CreateTopicsResponse -> IO CreateTopicsResponse catchCreateTopicsResponseEx act = act `catch` \(CreateTopicsResponseEx resp) -> pure resp +newtype DeleteAclsResponseEx = DeleteAclsResponseEx DeleteAclsResponse + deriving (Show, Eq) +instance Exception DeleteAclsResponseEx + +catchDeleteAclsResponseEx :: IO DeleteAclsResponse -> IO DeleteAclsResponse +catchDeleteAclsResponseEx act = act `catch` \(DeleteAclsResponseEx resp) -> pure resp + newtype DeleteTopicsResponseEx = DeleteTopicsResponseEx DeleteTopicsResponse deriving (Show, Eq) instance Exception DeleteTopicsResponseEx @@ -3352,6 +3715,13 @@ instance Exception DeleteTopicsResponseEx catchDeleteTopicsResponseEx :: IO DeleteTopicsResponse -> IO DeleteTopicsResponse catchDeleteTopicsResponseEx act = act `catch` \(DeleteTopicsResponseEx resp) -> pure resp +newtype DescribeAclsResponseEx = DescribeAclsResponseEx DescribeAclsResponse + deriving (Show, Eq) +instance Exception DescribeAclsResponseEx + +catchDescribeAclsResponseEx :: IO DescribeAclsResponse -> IO DescribeAclsResponse +catchDescribeAclsResponseEx act = act `catch` \(DescribeAclsResponseEx resp) -> pure resp + newtype DescribeConfigsResponseEx = DescribeConfigsResponseEx DescribeConfigsResponse deriving (Show, Eq) instance Exception DescribeConfigsResponseEx diff --git a/hstream-kafka/tests/common/HStream/Kafka/Common/AuthorizerSpec.hs b/hstream-kafka/tests/common/HStream/Kafka/Common/AuthorizerSpec.hs new file mode 100644 index 000000000..c03da21f1 --- /dev/null +++ b/hstream-kafka/tests/common/HStream/Kafka/Common/AuthorizerSpec.hs @@ -0,0 +1,60 @@ +module HStream.Kafka.Common.AuthorizerSpec where + +import Data.Text (Text) +import HStream.Kafka.Common.TestUtils +import Test.Hspec +import Test.Hspec.Expectations + +import HStream.Kafka.Common.Acl +import HStream.Kafka.Common.AclEntry +import HStream.Kafka.Common.AclStore +import HStream.Kafka.Common.Authorizer +import HStream.Kafka.Common.Authorizer.Class hiding (Authorizer (..)) +import HStream.Kafka.Common.Resource hiding (match) +import HStream.Kafka.Common.Security + +spec :: Spec +spec = + describe "Basic authorization" $ do + it "with 0 then 1 ACL" $ do + a <- newAclAuthorizer newMockAclStore + initAclAuthorizer a + let user = Principal "User" "alice" + host = "example" + resource = "foo" `typed` Res_GROUP `match` Pat_LITERAL + action = AclOp_READ `on` resource + authorize (user `from` host) a [action] `shouldReturn` [Authz_DENIED] + let binding = user `from` "*" `does` AclOp_READ `is` AclPerm_ALLOW `on` resource + createAcls (user `from` host) a [binding] + authorize (user `from` host) a [action] `shouldReturn` [Authz_ALLOWED] + + it "basic auth" $ do + a <- newAclAuthorizer newMockAclStore + initAclAuthorizer a + let resource = ResourcePattern Res_TOPIC "foo" Pat_LITERAL + let user1 = Principal "User" "alice" + user2 = Principal "User" "bob" + user3 = Principal "User" "cathy" + let host1 = "192.168.1.1" + host2 = "192.168.1.2" + let acl1 = user1 `from` host1 `does` AclOp_READ `is` AclPerm_ALLOW + acl2 = user1 `from` host2 `does` AclOp_READ `is` AclPerm_ALLOW + acl3 = user1 `from` host1 `does` AclOp_READ `is` AclPerm_DENY + acl4 = user1 `from` host1 `does` AclOp_WRITE `is` AclPerm_ALLOW + acl5 = user1 `from` wildcardHost `does` AclOp_DESCRIBE `is` AclPerm_ALLOW + acl6 = user2 `from` wildcardHost `does` AclOp_READ `is` AclPerm_ALLOW + acl7 = user3 `from` wildcardHost `does` AclOp_WRITE `is` AclPerm_ALLOW + let bindings = map (`on` resource) [acl1, acl2, acl3, acl4, acl5, acl6, acl7] + createAcls (user1 `from` host1) a bindings + authorize (user1 `from` host2) a [AclOp_READ `on` resource] `shouldReturn` [Authz_ALLOWED] + authorize (user1 `from` host1) a [AclOp_READ `on` resource] `shouldReturn` [Authz_DENIED] + authorize (user1 `from` host1) a [AclOp_WRITE `on` resource] `shouldReturn` [Authz_ALLOWED] + authorize (user1 `from` host2) a [AclOp_WRITE `on` resource] `shouldReturn` [Authz_DENIED] + authorize (user1 `from` host1) a [AclOp_DESCRIBE `on` resource] `shouldReturn` [Authz_ALLOWED] + authorize (user1 `from` host2) a [AclOp_DESCRIBE `on` resource] `shouldReturn` [Authz_ALLOWED] + authorize (user1 `from` host1) a [AclOp_ALTER `on` resource] `shouldReturn` [Authz_DENIED] + authorize (user1 `from` host2) a [AclOp_ALTER `on` resource] `shouldReturn` [Authz_DENIED] + authorize (user2 `from` host1) a [AclOp_DESCRIBE `on` resource] `shouldReturn` [Authz_ALLOWED] + authorize (user3 `from` host1) a [AclOp_DESCRIBE `on` resource] `shouldReturn` [Authz_ALLOWED] + authorize (user2 `from` host1) a [AclOp_READ `on` resource] `shouldReturn` [Authz_ALLOWED] + authorize (user3 `from` host1) a [AclOp_WRITE `on` resource] `shouldReturn` [Authz_ALLOWED] diff --git a/hstream-kafka/tests/common/HStream/Kafka/Common/ResourceSpec.hs b/hstream-kafka/tests/common/HStream/Kafka/Common/ResourceSpec.hs new file mode 100644 index 000000000..3a881a60e --- /dev/null +++ b/hstream-kafka/tests/common/HStream/Kafka/Common/ResourceSpec.hs @@ -0,0 +1,8 @@ +module HStream.Kafka.Common.ResourceSpec (spec) where + +import Test.Hspec + +spec :: Spec +spec = describe "unimplemented" $ do + it "unimplemented res" $ do + True `shouldBe` True diff --git a/hstream-kafka/tests/common/HStream/Kafka/Common/TestUtils.hs b/hstream-kafka/tests/common/HStream/Kafka/Common/TestUtils.hs new file mode 100644 index 000000000..b79e003eb --- /dev/null +++ b/hstream-kafka/tests/common/HStream/Kafka/Common/TestUtils.hs @@ -0,0 +1,79 @@ +module HStream.Kafka.Common.TestUtils + ( typed + , match + , from + , does + , is + , on + ) where + +import Data.Text (Text) +import qualified Data.Text as T + +import HStream.Kafka.Common.Acl +import HStream.Kafka.Common.AclEntry +import HStream.Kafka.Common.Authorizer.Class +import HStream.Kafka.Common.Resource hiding (match) +import HStream.Kafka.Common.Security + +------------------------------------------------------------ +-- Construct 'ResourcePattern': +-- > resourceName `typed` Res_TOPIC `match` Pat_LITERAL +------------------------------------------------------------ +-- | For constructing a 'ResourcePattern' with 'match'. +infixl 6 `typed` +typed :: Text -> ResourceType -> PatternType -> ResourcePattern +typed = flip ResourcePattern + +-- | For constructing a 'ResourcePattern' with 'typed'. +infixl 6 `match` +match :: (PatternType -> ResourcePattern) + -> PatternType + -> ResourcePattern +match cont pt = cont pt + +------------------------------------------------------------ +-- Construct 'AccessControlEntry': +-- > principal `from` host `does` AclOp_READ `is` AclPerm_ALLOW + +-- Construct 'AuthorizableRequestContext': +-- > principal `from` host +------------------------------------------------------------ +-- | For constructing an ACE or 'AuthorizableRequestContext'. +class From r where + infixl 6 `from` + from :: Principal -> Text -> r +instance From (AclOperation -> AclPermissionType -> AccessControlEntry) where + from p n o t = AccessControlEntry (AccessControlEntryData (T.pack (show p)) n o t) +instance From AuthorizableRequestContext where + from = flip AuthorizableRequestContext + +-- | For constructing an ACE with 'from' and 'is'. +infixl 6 `does` +does :: (AclOperation -> AclPermissionType -> AccessControlEntry) + -> AclOperation + -> AclPermissionType -> AccessControlEntry +does cont op = cont op + +-- | For constructing an ACE with 'from' and 'does'. +infixl 6 `is` +is :: (AclPermissionType -> AccessControlEntry) + -> AclPermissionType + -> AccessControlEntry +is cont pt = cont pt + +------------------------------------------------------------ +-- Construct 'AclAction': +-- > AclOp_READ `on` resource + +-- Construct 'AclBinding': +-- > ace `on` resource +------------------------------------------------------------ +-- | For constructing an 'AclAction' or 'AclBinding'. +class On a r where + infixl 6 `on` + on :: a -> ResourcePattern -> r +instance On AclOperation AclAction where + on op res = AclAction res op True True +instance On AccessControlEntry AclBinding where + on ace res = AclBinding res ace diff --git a/hstream-kafka/tests/common/Spec.hs b/hstream-kafka/tests/common/Spec.hs new file mode 100644 index 000000000..a824f8c30 --- /dev/null +++ b/hstream-kafka/tests/common/Spec.hs @@ -0,0 +1 @@ +{-# OPTIONS_GHC -F -pgmF hspec-discover #-}