Skip to content

Commit

Permalink
kafka: do authorization on find coordinator (10)
Browse files Browse the repository at this point in the history
  • Loading branch information
Commelina committed Mar 11, 2024
1 parent 6558476 commit 6c1a6dc
Showing 1 changed file with 38 additions and 19 deletions.
57 changes: 38 additions & 19 deletions hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import qualified Data.Text as T
import HStream.Common.Server.Lookup (KafkaResource (..),
lookupKafkaPersist)
import qualified HStream.Gossip as Gossip
import HStream.Kafka.Common.Acl
import HStream.Kafka.Common.Authorizer.Class
import HStream.Kafka.Common.Resource
import qualified HStream.Kafka.Common.Utils as K
import qualified HStream.Kafka.Common.Utils as Utils
import qualified HStream.Kafka.Server.Config.KafkaConfig as KC
Expand Down Expand Up @@ -250,26 +253,42 @@ data CoordinatorType
| TRANSACTION
deriving (Enum, Eq)

-- FIXME: This function does not catch any Kafka ErrorCodeException.
-- Is this proper?
handleFindCoordinator :: ServerContext -> K.RequestContext -> K.FindCoordinatorRequest -> IO K.FindCoordinatorResponse
handleFindCoordinator ServerContext{..} _ req = do
handleFindCoordinator ServerContext{..} reqCtx req = do
case toEnum (fromIntegral req.keyType) of
GROUP -> do
A.ServerNode{..} <- lookupKafkaPersist metaHandle gossipContext loadBalanceHashRing scAdvertisedListenersKey (KafkaResGroup req.key)
Log.info $ "findCoordinator for group:" <> Log.buildString' req.key <> ", result:" <> Log.buildString' serverNodeId
return $ K.FindCoordinatorResponse {
errorMessage=Nothing
, nodeId=fromIntegral serverNodeId
, errorCode=0
, throttleTimeMs=0
, port=fromIntegral serverNodePort
, host=serverNodeHost
}
-- [ACL] check [DESCRIBE GROUP]
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_GROUP req.key AclOp_DESCRIBE >>= \case
True -> do
A.ServerNode{..} <- lookupKafkaPersist metaHandle gossipContext loadBalanceHashRing scAdvertisedListenersKey (KafkaResGroup req.key)
Log.info $ "findCoordinator for group:" <> Log.buildString' req.key <> ", result:" <> Log.buildString' serverNodeId
return $ K.FindCoordinatorResponse {
errorMessage=Nothing
, nodeId=fromIntegral serverNodeId
, errorCode=0
, throttleTimeMs=0
, port=fromIntegral serverNodePort
, host=serverNodeHost
}
-- Note: About kafka's error message, see org.apache.kafka.common.protocol.Errors
False -> return (makeErrorResponse K.GROUP_AUTHORIZATION_FAILED "Group authorization failed.")
_ -> do
return $ K.FindCoordinatorResponse {
errorMessage=Just "KeyType Must be 0(GROUP)"
, nodeId=0
, errorCode=K.COORDINATOR_NOT_AVAILABLE
, throttleTimeMs=0
, port=0
, host=""
}
-- TODO: authz [DESCRIBE TRANSACTION_ID] when this is supported
-- FIXME: Is the error code/message
return $ makeErrorResponse K.COORDINATOR_NOT_AVAILABLE "KeyType Must be 0(GROUP)"
where
-- FIXME: hard-coded constants
-- Note: Kafka returns `nodeId = -1`, `port = -1` and `host=""` on error.
-- See kafka.server.KafkaApis#getCoordinator and
-- org.apache.kafka.common.Node#NO_NODE
makeErrorResponse :: K.ErrorCode -> Text -> K.FindCoordinatorResponse
makeErrorResponse code errMsg = K.FindCoordinatorResponse {
errorMessage = Just errMsg
, nodeId = -1
, errorCode = code
, throttleTimeMs = 0
, port = -1
, host = ""
}

0 comments on commit 6c1a6dc

Please sign in to comment.