Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka: do authorization on group-related handlers ([11..16]) #1772

Merged
merged 1 commit into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 0 additions & 70 deletions hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ import qualified HStream.Kafka.Group.GroupOffsetManager as GOM
import qualified HStream.Logger as Log
import qualified HStream.MetaStore.Types as Meta
import HStream.Store (LDClient)
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K

data GroupCoordinator = GroupCoordinator
{ groups :: C.MVar (Utils.HashTable T.Text Group)
Expand Down Expand Up @@ -58,26 +56,6 @@ instance TM.TaskManager GroupCoordinator where

unloadTaskAsync = unloadGroup

------------------- Join Group -------------------------

joinGroup :: GroupCoordinator -> K.RequestContext -> K.JoinGroupRequest -> IO K.JoinGroupResponse
joinGroup coordinator reqCtx req = do
handle (\((ErrorCodeException code)) -> makeErrorResponse code) $ do
-- get or create group
group <- getOrMaybeCreateGroup coordinator req.groupId req.memberId

-- join group
G.joinGroup group reqCtx req
where
makeErrorResponse code = return $ K.JoinGroupResponse {
errorCode = code
, generationId = -1
, protocolName = ""
, leader = ""
, memberId = req.memberId
, members = K.NonNullKaArray V.empty
, throttleTimeMs = 0
}

getOrMaybeCreateGroup :: GroupCoordinator -> T.Text -> T.Text -> IO Group
getOrMaybeCreateGroup GroupCoordinator{..} groupId memberId = do
Expand Down Expand Up @@ -112,31 +90,6 @@ getGroupM :: GroupCoordinator -> T.Text -> IO (Maybe Group)
getGroupM GroupCoordinator{..} groupId = do
C.withMVar groups $ \gs -> H.lookup gs groupId

syncGroup :: GroupCoordinator -> K.SyncGroupRequest -> IO K.SyncGroupResponse
syncGroup coordinator [email protected]{..} = do
handle (\(ErrorCodeException code) -> makeErrorResponse code) $ do
group <- getGroup coordinator groupId
G.syncGroup group req
where makeErrorResponse code = return $ K.SyncGroupResponse {
errorCode = code
, assignment = ""
, throttleTimeMs = 0
}

leaveGroup :: GroupCoordinator -> K.LeaveGroupRequest -> IO K.LeaveGroupResponse
leaveGroup coordinator req = do
handle (\(ErrorCodeException code) -> makeErrorResponse code) $ do
group <- getGroup coordinator req.groupId
G.leaveGroup group req
where makeErrorResponse code = return $ K.LeaveGroupResponse {errorCode=code, throttleTimeMs=0}

heartbeat :: GroupCoordinator -> K.HeartbeatRequest -> IO K.HeartbeatResponse
heartbeat coordinator req = do
handle (\(ErrorCodeException code) -> makeErrorResponse code) $ do
group <- getGroup coordinator req.groupId
G.heartbeat group req
where makeErrorResponse code = return $ K.HeartbeatResponse {errorCode=code, throttleTimeMs=0}

------------------- Commit Offsets -------------------------
commitOffsets :: GroupCoordinator -> K.OffsetCommitRequest -> IO K.OffsetCommitResponse
commitOffsets coordinator req = do
Expand Down Expand Up @@ -177,29 +130,6 @@ fetchOffsets coordinator req = do
, committedOffset = -1
}

------------------- List Groups -------------------------
listGroups :: GroupCoordinator -> K.ListGroupsRequest -> IO K.ListGroupsResponse
listGroups gc _ = do
gs <- getAllGroups gc
listedGroups <- M.mapM G.overview gs
return $ K.ListGroupsResponse {errorCode=0, groups=Utils.listToKaArray listedGroups, throttleTimeMs=0}

------------------- Describe Groups -------------------------
describeGroups :: GroupCoordinator -> K.DescribeGroupsRequest -> IO K.DescribeGroupsResponse
describeGroups gc req = do
getGroups gc (Utils.kaArrayToList req.groups) >>= \gs -> do
listedGroups <- M.forM gs $ \case
(gid, Nothing) -> return $ K.DescribedGroup {
protocolData=""
, groupState=""
, errorCode=K.GROUP_ID_NOT_FOUND
, members=Utils.listToKaArray []
, groupId=gid
, protocolType=""
}
(_, Just g) -> G.describe g
return $ K.DescribeGroupsResponse {groups=Utils.listToKaArray listedGroups, throttleTimeMs=0}

------------------- Load/Unload Group -------------------------
-- load group from meta store
loadGroupAndOffsets :: GroupCoordinator -> T.Text -> IO ()
Expand Down
158 changes: 143 additions & 15 deletions hstream-kafka/HStream/Kafka/Server/Handler/Group.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,153 @@ module HStream.Kafka.Server.Handler.Group
, handleDescribeGroups
) where

import qualified HStream.Kafka.Group.GroupCoordinator as GC
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K
import qualified Control.Exception as E
import Control.Monad
import qualified Data.Vector as V

handleJoinGroup :: ServerContext -> K.RequestContext -> K.JoinGroupRequest -> IO K.JoinGroupResponse
handleJoinGroup ServerContext{..} = GC.joinGroup scGroupCoordinator
import HStream.Kafka.Common.Acl
import HStream.Kafka.Common.Authorizer.Class
import qualified HStream.Kafka.Common.KafkaException as K
import HStream.Kafka.Common.Resource
import qualified HStream.Kafka.Common.Utils as Utils
import qualified HStream.Kafka.Group.Group as G
import qualified HStream.Kafka.Group.GroupCoordinator as GC
import HStream.Kafka.Server.Types (ServerContext (..))
import qualified Kafka.Protocol.Encoding as K
import qualified Kafka.Protocol.Error as K
import qualified Kafka.Protocol.Message as K
import qualified Kafka.Protocol.Service as K

handleSyncGroup :: ServerContext -> K.RequestContext -> K.SyncGroupRequest -> IO K.SyncGroupResponse
handleSyncGroup ServerContext{..} _ = GC.syncGroup scGroupCoordinator
handleJoinGroup :: ServerContext
-> K.RequestContext
-> K.JoinGroupRequest
-> IO K.JoinGroupResponse
handleJoinGroup ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeException code) -> return (makeErrorResponse code)) $ do
-- [ACL] check [READ GROUP]
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_GROUP req.groupId AclOp_READ >>= \case
False -> return $ makeErrorResponse K.GROUP_AUTHORIZATION_FAILED
True -> do
-- get or create group
group <- GC.getOrMaybeCreateGroup scGroupCoordinator req.groupId req.memberId
-- join group
G.joinGroup group reqCtx req
where
-- FIXME: Hard-coded constants
makeErrorResponse code = K.JoinGroupResponse {
errorCode = code
, generationId = -1
, protocolName = ""
, leader = ""
-- FIXME: memberId for error response should be `""`?
-- see org.apache.kafka.common.requests.getErrorResponse
, memberId = req.memberId
, members = K.NonNullKaArray V.empty
, throttleTimeMs = 0
}

handleHeartbeat :: ServerContext -> K.RequestContext -> K.HeartbeatRequest -> IO K.HeartbeatResponse
handleHeartbeat ServerContext{..} _ = GC.heartbeat scGroupCoordinator
handleSyncGroup :: ServerContext
-> K.RequestContext
-> K.SyncGroupRequest
-> IO K.SyncGroupResponse
handleSyncGroup ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeException code) -> return (makeErrorResponse code)) $ do
-- [ACL] check [READ GROUP]
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_GROUP req.groupId AclOp_READ >>= \case
False -> return $ makeErrorResponse K.GROUP_AUTHORIZATION_FAILED
True -> do
group <- GC.getGroup scGroupCoordinator req.groupId
G.syncGroup group req
where
-- FIXME: Hard-coded constants
makeErrorResponse code = K.SyncGroupResponse {
errorCode = code
, assignment = ""
, throttleTimeMs = 0
}

handleLeaveGroup :: ServerContext -> K.RequestContext -> K.LeaveGroupRequest -> IO K.LeaveGroupResponse
handleLeaveGroup ServerContext{..} _ = GC.leaveGroup scGroupCoordinator
handleHeartbeat :: ServerContext
-> K.RequestContext
-> K.HeartbeatRequest
-> IO K.HeartbeatResponse
handleHeartbeat ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeException code) -> return (makeErrorResponse code)) $ do
-- [ACL] check [READ GROUP]
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_GROUP req.groupId AclOp_READ >>= \case
False -> return $ makeErrorResponse K.GROUP_AUTHORIZATION_FAILED
True -> do
group <- GC.getGroup scGroupCoordinator req.groupId
G.heartbeat group req
where
-- FIXME: Hard-coded constants
makeErrorResponse code = K.HeartbeatResponse {
errorCode = code
, throttleTimeMs = 0
}

handleLeaveGroup :: ServerContext
-> K.RequestContext
-> K.LeaveGroupRequest
-> IO K.LeaveGroupResponse
handleLeaveGroup ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeException code) -> return (makeErrorResponse code)) $ do
-- [ACL] check [READ GROUP]
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_GROUP req.groupId AclOp_READ >>= \case
False -> return $ makeErrorResponse K.GROUP_AUTHORIZATION_FAILED
True -> do
group <- GC.getGroup scGroupCoordinator req.groupId
G.leaveGroup group req
where
-- FIXME: Hard-coded constants
makeErrorResponse code = K.LeaveGroupResponse {
errorCode = code
, throttleTimeMs = 0
}

-- FIXME: This handler does not handle any Kafka ErrorCodeException.
-- Is this proper?
handleListGroups :: ServerContext -> K.RequestContext -> K.ListGroupsRequest -> IO K.ListGroupsResponse
handleListGroups ServerContext{..} _ = GC.listGroups scGroupCoordinator
handleListGroups ServerContext{..} reqCtx _ = do
-- TODO: check [DESCRIBE CLUSTER] first. If authzed, return all groups.
-- Note: Difference from handlers above:
-- Check [DESCRIBE GROUP] for each group, then return authzed ones
-- only. Unauthzed groups will not cause a UNAUTHORIZED error.
gs <- GC.getAllGroups scGroupCoordinator
listedGroups <- mapM G.overview gs
authzedGroups <-
filterM (\listedGroup ->
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_GROUP listedGroup.groupId AclOp_DESCRIBE
) listedGroups
-- FIXME: hard-coded constants
return $ K.ListGroupsResponse
{ errorCode = K.NONE
, groups = Utils.listToKaArray authzedGroups
, throttleTimeMs = 0
}

handleDescribeGroups :: ServerContext -> K.RequestContext -> K.DescribeGroupsRequest -> IO K.DescribeGroupsResponse
handleDescribeGroups ServerContext{..} _ = GC.describeGroups scGroupCoordinator
-- FIXME: This handler does not handle any Kafka ErrorCodeException.
-- Is this proper?
handleDescribeGroups :: ServerContext
-> K.RequestContext
-> K.DescribeGroupsRequest
-> IO K.DescribeGroupsResponse
handleDescribeGroups ServerContext{..} reqCtx req = do
groups_m <- GC.getGroups scGroupCoordinator (Utils.kaArrayToList req.groups)
describedGroups <- forM groups_m $ \(gid, group_m) ->
-- [ACL] for each group id, check [DESCRIBE GROUP]
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_GROUP gid AclOp_DESCRIBE >>= \case
False -> return $ makeErrorGroup gid K.GROUP_AUTHORIZATION_FAILED
True -> case group_m of
Nothing -> return $ makeErrorGroup gid K.GROUP_ID_NOT_FOUND
Just group -> G.describe group
-- FIXME: hard-coded constants
return $ K.DescribeGroupsResponse
{ groups = Utils.listToKaArray describedGroups
, throttleTimeMs = 0
}
where
-- FIXME: hard-coded constants
makeErrorGroup gid code = K.DescribedGroup {
protocolData = ""
, groupState = ""
, errorCode = code
, members = Utils.listToKaArray []
, groupId = gid
, protocolType = ""
}
Loading