Skip to content

Commit

Permalink
kafka: do authorization on describe configs (32)
Browse files Browse the repository at this point in the history
  • Loading branch information
Commelina committed Mar 11, 2024
1 parent 6c1a6dc commit bb29f86
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 9 deletions.
12 changes: 8 additions & 4 deletions hstream-kafka/HStream/Kafka/Server/Config/KafkaConfigManager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ listTopicConfigs KafkaConfigManager{..} topic keys = do
let keys' = fromMaybe (V.fromList $ Map.keys KC.allTopicConfigs) (K.unKaArray keys)
configs' = convertConfigs configs
case V.mapM (getConfig configs') keys' of
Left msg -> return $ getErrorResponse KC.TOPIC topic msg
Left msg -> return $ getErrorResponse KC.TOPIC topic K.INVALID_CONFIG msg
Right configsInResp -> return $ K.DescribeConfigsResult
{ configs=K.NonNullKaArray configsInResp
, errorCode=0
Expand All @@ -64,10 +64,14 @@ listTopicConfigs KafkaConfigManager{..} topic keys = do
getConfig :: Map.Map T.Text (Maybe T.Text) -> T.Text -> Either T.Text K.DescribeConfigsResourceResult
getConfig configs configName = getConfigByInstance <$> KC.getTopicConfig configName configs

getErrorResponse :: KC.KafkaConfigResource -> T.Text -> T.Text -> K.DescribeConfigsResult
getErrorResponse rt rn msg = K.DescribeConfigsResult
getErrorResponse :: KC.KafkaConfigResource
-> T.Text
-> K.ErrorCode
-> T.Text
-> K.DescribeConfigsResult
getErrorResponse rt rn code msg = K.DescribeConfigsResult
{ configs=K.NonNullKaArray V.empty
, errorCode=K.INVALID_CONFIG
, errorCode=code
, resourceName=rn
, errorMessage=Just msg
, resourceType=fromIntegral . fromEnum $ rt
Expand Down
33 changes: 28 additions & 5 deletions hstream-kafka/HStream/Kafka/Server/Handler/Basic.hs
Original file line number Diff line number Diff line change
Expand Up @@ -228,22 +228,45 @@ handleMetadata ctx reqCtx req = do
---------------------------------------------------------------------------
-- 32: DescribeConfigs
---------------------------------------------------------------------------
-- FIXME: This function does not catch any Kafka ErrorCodeException.
-- Is this proper?
-- FIXME: Recheck if returned error codes and messages are proper.
-- See kafka.server.ConfigHelper#handleDescribeConfigsRequest
handleDescribeConfigs
:: ServerContext
-> K.RequestContext
-> K.DescribeConfigsRequest
-> IO K.DescribeConfigsResponse
handleDescribeConfigs serverCtx _ req = do
handleDescribeConfigs serverCtx reqCtx req = do
manager <- KCM.mkKafkaConfigManager serverCtx.scLDClient serverCtx.kafkaBrokerConfigs
results <- V.forM (Utils.kaArrayToVector req.resources) $ \resource -> do
case toEnum (fromIntegral resource.resourceType) of
KC.TOPIC -> KCM.listTopicConfigs manager resource.resourceName resource.configurationKeys
KC.TOPIC -> do
-- [ACL] check [DESCRIBE_CONFIGS TOPIC]
simpleAuthorize (toAuthorizableReqCtx reqCtx) serverCtx.authorizer Res_TOPIC resource.resourceName AclOp_DESCRIBE_CONFIGS >>= \case
False -> return $ KCM.getErrorResponse KC.TOPIC
resource.resourceName
K.TOPIC_AUTHORIZATION_FAILED
"Topic authorization failed."
True ->
KCM.listTopicConfigs manager resource.resourceName resource.configurationKeys
KC.BROKER -> do
-- FIXME: authorize [DESCRIBE_CONFIGS CLUSTER] first
if T.pack (show serverCtx.serverID) == resource.resourceName
then KCM.listBrokerConfigs manager resource.resourceName resource.configurationKeys
else return $ KCM.getErrorResponse KC.BROKER resource.resourceName ("invalid broker id:" <> resource.resourceName)
rt -> return $ KCM.getErrorResponse rt resource.resourceName ("unsupported resource type:" <> T.pack (show rt))
return $ K.DescribeConfigsResponse {results=K.NonNullKaArray results, throttleTimeMs=0}
else return $ KCM.getErrorResponse KC.BROKER
resource.resourceName
K.INVALID_REQUEST
("Unexpected broker id, expected " <> (T.pack (show serverCtx.serverID)) <> " but received " <> resource.resourceName)
rt -> return $ KCM.getErrorResponse rt
resource.resourceName
K.INVALID_REQUEST
("Unexpected resource type " <> T.pack (show rt) <> " for resouce" <> resource.resourceName)

return $ K.DescribeConfigsResponse {
results = K.NonNullKaArray results
, throttleTimeMs = 0
}

---------------------------------------------------------------------------
-- 32: FindCoordinator
Expand Down

0 comments on commit bb29f86

Please sign in to comment.