diff --git a/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs b/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs index 23e8cdff2..f0e4e1719 100644 --- a/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs +++ b/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs @@ -25,10 +25,8 @@ import qualified HStream.Kafka.Group.GroupOffsetManager as GOM import qualified HStream.Logger as Log import qualified HStream.MetaStore.Types as Meta import HStream.Store (LDClient) -import qualified Kafka.Protocol.Encoding as K import qualified Kafka.Protocol.Error as K import qualified Kafka.Protocol.Message as K -import qualified Kafka.Protocol.Service as K data GroupCoordinator = GroupCoordinator { groups :: C.MVar (Utils.HashTable T.Text Group) @@ -58,26 +56,6 @@ instance TM.TaskManager GroupCoordinator where unloadTaskAsync = unloadGroup -------------------- Join Group ------------------------- - -joinGroup :: GroupCoordinator -> K.RequestContext -> K.JoinGroupRequest -> IO K.JoinGroupResponse -joinGroup coordinator reqCtx req = do - handle (\((ErrorCodeException code)) -> makeErrorResponse code) $ do - -- get or create group - group <- getOrMaybeCreateGroup coordinator req.groupId req.memberId - - -- join group - G.joinGroup group reqCtx req - where - makeErrorResponse code = return $ K.JoinGroupResponse { - errorCode = code - , generationId = -1 - , protocolName = "" - , leader = "" - , memberId = req.memberId - , members = K.NonNullKaArray V.empty - , throttleTimeMs = 0 - } getOrMaybeCreateGroup :: GroupCoordinator -> T.Text -> T.Text -> IO Group getOrMaybeCreateGroup GroupCoordinator{..} groupId memberId = do @@ -112,31 +90,6 @@ getGroupM :: GroupCoordinator -> T.Text -> IO (Maybe Group) getGroupM GroupCoordinator{..} groupId = do C.withMVar groups $ \gs -> H.lookup gs groupId -syncGroup :: GroupCoordinator -> K.SyncGroupRequest -> IO K.SyncGroupResponse -syncGroup coordinator req@K.SyncGroupRequest{..} = do - handle (\(ErrorCodeException code) -> makeErrorResponse code) $ do - group <- getGroup coordinator groupId - G.syncGroup group req - where makeErrorResponse code = return $ K.SyncGroupResponse { - errorCode = code - , assignment = "" - , throttleTimeMs = 0 - } - -leaveGroup :: GroupCoordinator -> K.LeaveGroupRequest -> IO K.LeaveGroupResponse -leaveGroup coordinator req = do - handle (\(ErrorCodeException code) -> makeErrorResponse code) $ do - group <- getGroup coordinator req.groupId - G.leaveGroup group req - where makeErrorResponse code = return $ K.LeaveGroupResponse {errorCode=code, throttleTimeMs=0} - -heartbeat :: GroupCoordinator -> K.HeartbeatRequest -> IO K.HeartbeatResponse -heartbeat coordinator req = do - handle (\(ErrorCodeException code) -> makeErrorResponse code) $ do - group <- getGroup coordinator req.groupId - G.heartbeat group req - where makeErrorResponse code = return $ K.HeartbeatResponse {errorCode=code, throttleTimeMs=0} - ------------------- Commit Offsets ------------------------- commitOffsets :: GroupCoordinator -> K.OffsetCommitRequest -> IO K.OffsetCommitResponse commitOffsets coordinator req = do @@ -177,29 +130,6 @@ fetchOffsets coordinator req = do , committedOffset = -1 } -------------------- List Groups ------------------------- -listGroups :: GroupCoordinator -> K.ListGroupsRequest -> IO K.ListGroupsResponse -listGroups gc _ = do - gs <- getAllGroups gc - listedGroups <- M.mapM G.overview gs - return $ K.ListGroupsResponse {errorCode=0, groups=Utils.listToKaArray listedGroups, throttleTimeMs=0} - -------------------- Describe Groups ------------------------- -describeGroups :: GroupCoordinator -> K.DescribeGroupsRequest -> IO K.DescribeGroupsResponse -describeGroups gc req = do - getGroups gc (Utils.kaArrayToList req.groups) >>= \gs -> do - listedGroups <- M.forM gs $ \case - (gid, Nothing) -> return $ K.DescribedGroup { - protocolData="" - , groupState="" - , errorCode=K.GROUP_ID_NOT_FOUND - , members=Utils.listToKaArray [] - , groupId=gid - , protocolType="" - } - (_, Just g) -> G.describe g - return $ K.DescribeGroupsResponse {groups=Utils.listToKaArray listedGroups, throttleTimeMs=0} - ------------------- Load/Unload Group ------------------------- -- load group from meta store loadGroupAndOffsets :: GroupCoordinator -> T.Text -> IO () diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Group.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Group.hs index 930fc971d..dd4c48a5b 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Group.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Group.hs @@ -10,25 +10,153 @@ module HStream.Kafka.Server.Handler.Group , handleDescribeGroups ) where -import qualified HStream.Kafka.Group.GroupCoordinator as GC -import HStream.Kafka.Server.Types (ServerContext (..)) -import qualified Kafka.Protocol.Message as K -import qualified Kafka.Protocol.Service as K +import qualified Control.Exception as E +import Control.Monad +import qualified Data.Vector as V -handleJoinGroup :: ServerContext -> K.RequestContext -> K.JoinGroupRequest -> IO K.JoinGroupResponse -handleJoinGroup ServerContext{..} = GC.joinGroup scGroupCoordinator +import HStream.Kafka.Common.Acl +import HStream.Kafka.Common.Authorizer.Class +import qualified HStream.Kafka.Common.KafkaException as K +import HStream.Kafka.Common.Resource +import qualified HStream.Kafka.Common.Utils as Utils +import qualified HStream.Kafka.Group.Group as G +import qualified HStream.Kafka.Group.GroupCoordinator as GC +import HStream.Kafka.Server.Types (ServerContext (..)) +import qualified Kafka.Protocol.Encoding as K +import qualified Kafka.Protocol.Error as K +import qualified Kafka.Protocol.Message as K +import qualified Kafka.Protocol.Service as K -handleSyncGroup :: ServerContext -> K.RequestContext -> K.SyncGroupRequest -> IO K.SyncGroupResponse -handleSyncGroup ServerContext{..} _ = GC.syncGroup scGroupCoordinator +handleJoinGroup :: ServerContext + -> K.RequestContext + -> K.JoinGroupRequest + -> IO K.JoinGroupResponse +handleJoinGroup ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeException code) -> return (makeErrorResponse code)) $ do + -- [ACL] check [READ GROUP] + simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_GROUP req.groupId AclOp_READ >>= \case + False -> return $ makeErrorResponse K.GROUP_AUTHORIZATION_FAILED + True -> do + -- get or create group + group <- GC.getOrMaybeCreateGroup scGroupCoordinator req.groupId req.memberId + -- join group + G.joinGroup group reqCtx req + where + -- FIXME: Hard-coded constants + makeErrorResponse code = K.JoinGroupResponse { + errorCode = code + , generationId = -1 + , protocolName = "" + , leader = "" + -- FIXME: memberId for error response should be `""`? + -- see org.apache.kafka.common.requests.getErrorResponse + , memberId = req.memberId + , members = K.NonNullKaArray V.empty + , throttleTimeMs = 0 + } -handleHeartbeat :: ServerContext -> K.RequestContext -> K.HeartbeatRequest -> IO K.HeartbeatResponse -handleHeartbeat ServerContext{..} _ = GC.heartbeat scGroupCoordinator +handleSyncGroup :: ServerContext + -> K.RequestContext + -> K.SyncGroupRequest + -> IO K.SyncGroupResponse +handleSyncGroup ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeException code) -> return (makeErrorResponse code)) $ do + -- [ACL] check [READ GROUP] + simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_GROUP req.groupId AclOp_READ >>= \case + False -> return $ makeErrorResponse K.GROUP_AUTHORIZATION_FAILED + True -> do + group <- GC.getGroup scGroupCoordinator req.groupId + G.syncGroup group req + where + -- FIXME: Hard-coded constants + makeErrorResponse code = K.SyncGroupResponse { + errorCode = code + , assignment = "" + , throttleTimeMs = 0 + } -handleLeaveGroup :: ServerContext -> K.RequestContext -> K.LeaveGroupRequest -> IO K.LeaveGroupResponse -handleLeaveGroup ServerContext{..} _ = GC.leaveGroup scGroupCoordinator +handleHeartbeat :: ServerContext + -> K.RequestContext + -> K.HeartbeatRequest + -> IO K.HeartbeatResponse +handleHeartbeat ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeException code) -> return (makeErrorResponse code)) $ do + -- [ACL] check [READ GROUP] + simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_GROUP req.groupId AclOp_READ >>= \case + False -> return $ makeErrorResponse K.GROUP_AUTHORIZATION_FAILED + True -> do + group <- GC.getGroup scGroupCoordinator req.groupId + G.heartbeat group req + where + -- FIXME: Hard-coded constants + makeErrorResponse code = K.HeartbeatResponse { + errorCode = code + , throttleTimeMs = 0 + } +handleLeaveGroup :: ServerContext + -> K.RequestContext + -> K.LeaveGroupRequest + -> IO K.LeaveGroupResponse +handleLeaveGroup ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeException code) -> return (makeErrorResponse code)) $ do + -- [ACL] check [READ GROUP] + simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_GROUP req.groupId AclOp_READ >>= \case + False -> return $ makeErrorResponse K.GROUP_AUTHORIZATION_FAILED + True -> do + group <- GC.getGroup scGroupCoordinator req.groupId + G.leaveGroup group req + where + -- FIXME: Hard-coded constants + makeErrorResponse code = K.LeaveGroupResponse { + errorCode = code + , throttleTimeMs = 0 + } + +-- FIXME: This handler does not handle any Kafka ErrorCodeException. +-- Is this proper? handleListGroups :: ServerContext -> K.RequestContext -> K.ListGroupsRequest -> IO K.ListGroupsResponse -handleListGroups ServerContext{..} _ = GC.listGroups scGroupCoordinator +handleListGroups ServerContext{..} reqCtx _ = do + -- TODO: check [DESCRIBE CLUSTER] first. If authzed, return all groups. + -- Note: Difference from handlers above: + -- Check [DESCRIBE GROUP] for each group, then return authzed ones + -- only. Unauthzed groups will not cause a UNAUTHORIZED error. + gs <- GC.getAllGroups scGroupCoordinator + listedGroups <- mapM G.overview gs + authzedGroups <- + filterM (\listedGroup -> + simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_GROUP listedGroup.groupId AclOp_DESCRIBE + ) listedGroups + -- FIXME: hard-coded constants + return $ K.ListGroupsResponse + { errorCode = K.NONE + , groups = Utils.listToKaArray authzedGroups + , throttleTimeMs = 0 + } -handleDescribeGroups :: ServerContext -> K.RequestContext -> K.DescribeGroupsRequest -> IO K.DescribeGroupsResponse -handleDescribeGroups ServerContext{..} _ = GC.describeGroups scGroupCoordinator +-- FIXME: This handler does not handle any Kafka ErrorCodeException. +-- Is this proper? +handleDescribeGroups :: ServerContext + -> K.RequestContext + -> K.DescribeGroupsRequest + -> IO K.DescribeGroupsResponse +handleDescribeGroups ServerContext{..} reqCtx req = do + groups_m <- GC.getGroups scGroupCoordinator (Utils.kaArrayToList req.groups) + describedGroups <- forM groups_m $ \(gid, group_m) -> + -- [ACL] for each group id, check [DESCRIBE GROUP] + simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_GROUP gid AclOp_DESCRIBE >>= \case + False -> return $ makeErrorGroup gid K.GROUP_AUTHORIZATION_FAILED + True -> case group_m of + Nothing -> return $ makeErrorGroup gid K.GROUP_ID_NOT_FOUND + Just group -> G.describe group + -- FIXME: hard-coded constants + return $ K.DescribeGroupsResponse + { groups = Utils.listToKaArray describedGroups + , throttleTimeMs = 0 + } + where + -- FIXME: hard-coded constants + makeErrorGroup gid code = K.DescribedGroup { + protocolData = "" + , groupState = "" + , errorCode = code + , members = Utils.listToKaArray [] + , groupId = gid + , protocolType = "" + }