From 1f6ac0fcbcf1405ebfc3c85ff215590cf1773163 Mon Sep 17 00:00:00 2001 From: Commelina Date: Wed, 8 May 2024 13:20:15 +0300 Subject: [PATCH] kafka: fix response for OffsetFetch (#1812) --- hstream-kafka/HStream/Kafka/Common/Utils.hs | 3 -- hstream-kafka/HStream/Kafka/Group/Group.hs | 3 +- .../HStream/Kafka/Group/GroupOffsetManager.hs | 8 +++-- .../HStream/Kafka/Server/Handler/Offset.hs | 30 +++++++++++-------- 4 files changed, 25 insertions(+), 19 deletions(-) diff --git a/hstream-kafka/HStream/Kafka/Common/Utils.hs b/hstream-kafka/HStream/Kafka/Common/Utils.hs index 3d7194c85..31b74b97d 100644 --- a/hstream-kafka/HStream/Kafka/Common/Utils.hs +++ b/hstream-kafka/HStream/Kafka/Common/Utils.hs @@ -46,9 +46,6 @@ listToKaArray = K.KaArray . Just . V.fromList kaArrayToVector :: K.KaArray a -> V.Vector a kaArrayToVector kaArray = fromMaybe V.empty (K.unKaArray kaArray) -vectorToKaArray :: V.Vector a -> K.KaArray a -vectorToKaArray vec = K.KaArray (Just vec) - mapKaArray :: (a -> b) -> K.KaArray a -> K.KaArray b mapKaArray f arr = K.KaArray (fmap (V.map f) (K.unKaArray arr)) diff --git a/hstream-kafka/HStream/Kafka/Group/Group.hs b/hstream-kafka/HStream/Kafka/Group/Group.hs index a01e451ab..c7fde497e 100644 --- a/hstream-kafka/HStream/Kafka/Group/Group.hs +++ b/hstream-kafka/HStream/Kafka/Group/Group.hs @@ -916,11 +916,12 @@ fetchOffsets Group{..} reqTopic validateReqTopic = validateReqTopic reqTopic >>= , partitions = K.KaArray (Just $ (makeErrorPartition code) <$> partitions') } where + -- FIXME: hardcoded constants makeErrorPartition code idx = K.OffsetFetchResponsePartition { partitionIndex = idx , committedOffset = -1 - , metadata = Nothing + , metadata = Just "" , errorCode = code } diff --git a/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs b/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs index 3f4161dc4..a59b48180 100644 --- a/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs +++ b/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs @@ -189,18 +189,19 @@ fetchOffsets GroupOffsetManager{..} topicName partitions = do , partitions = KaArray {unKaArray = Just res} } where + -- FIXME: hardcoded constants getOffset cache partitionIdx = do let key = mkTopicPartition topicName partitionIdx in case Map.lookup key cache of Just offset -> return $ OffsetFetchResponsePartition { committedOffset = offset - , metadata = Nothing + , metadata = Just "" , partitionIndex= partitionIdx , errorCode = K.NONE } Nothing -> return $ OffsetFetchResponsePartition { committedOffset = -1 - , metadata = Nothing + , metadata = Just "" , partitionIndex= partitionIdx , errorCode = K.NONE -- TODO: check the error code here @@ -214,9 +215,10 @@ fetchAllOffsets GroupOffsetManager{..} = do -- group offsets by TopicName cachedOffset <- Map.foldrWithKey foldF Map.empty <$> readIORef offsetsCache return . KaArray . Just . V.map makeTopic . V.fromList . Map.toList $ cachedOffset + -- FIXME: hardcoded constants where makePartition partition offset = OffsetFetchResponsePartition { committedOffset = offset - , metadata = Nothing + , metadata = Just "" , partitionIndex=partition , errorCode = K.NONE } diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs index df4ae8387..30bce055c 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs @@ -192,7 +192,7 @@ handleOffsetFetch ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeExceptio simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_GROUP req.groupId AclOp_DESCRIBE >>= \case False -> return $ makeErrorResponse K.TOPIC_AUTHORIZATION_FAILED True -> do - group <- GC.getGroup scGroupCoordinator req.groupId + group_m <- GC.getGroupM scGroupCoordinator req.groupId case K.unKaArray req.topics of -- 'Nothing' means fetch offsets of ALL topics. -- WARNING: Offsets of unauthzed topics should not be leaked @@ -201,11 +201,13 @@ handleOffsetFetch ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeExceptio -- FIXME: Better method than passing a "validate" function? Nothing -> do Log.debug $ "fetching all offsets in group:" <> Log.build req.groupId - topicResps <- G.fetchAllOffsets group $ \reqTopic -> do - -- [ACL] check [DESCRIBE TOPIC] for each topic - simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC reqTopic.name AclOp_DESCRIBE >>= \case - False -> return K.TOPIC_AUTHORIZATION_FAILED - True -> return K.NONE + topicResps <- case group_m of + Nothing -> return (K.NonNullKaArray V.empty) + Just group -> G.fetchAllOffsets group $ \reqTopic -> do + -- [ACL] check [DESCRIBE TOPIC] for each topic + simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC reqTopic.name AclOp_DESCRIBE >>= \case + False -> return K.TOPIC_AUTHORIZATION_FAILED + True -> return K.NONE return $ K.OffsetFetchResponse { throttleTimeMs = 0 , errorCode = K.NONE @@ -217,11 +219,15 @@ handleOffsetFetch ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeExceptio -- on each topic. That is why we pass a "validate" -- function to it. -- FIXME: Better method than passing a "validate" function? - G.fetchOffsets group reqTopic $ \reqTopic_ -> do - -- [ACL] check [DESCRIBE TOPIC] for each topic - simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC reqTopic_.name AclOp_DESCRIBE >>= \case - False -> return K.TOPIC_AUTHORIZATION_FAILED - True -> return K.NONE + case group_m of + -- FIXME: what error code should it return? 'K.UNKNOWN_TOPIC_OR_PARTITION' + -- crashes some tests... + Nothing -> return (makeErrorTopicResponse K.NONE reqTopic) + Just group -> G.fetchOffsets group reqTopic $ \reqTopic_ -> do + -- [ACL] check [DESCRIBE TOPIC] for each topic + simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC reqTopic_.name AclOp_DESCRIBE >>= \case + False -> return K.TOPIC_AUTHORIZATION_FAILED + True -> return K.NONE return $ K.OffsetFetchResponse { throttleTimeMs = 0 , errorCode = K.NONE @@ -240,7 +246,7 @@ handleOffsetFetch ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeExceptio K.OffsetFetchResponsePartition { partitionIndex = idx , errorCode = code - , metadata = Nothing + , metadata = Just "" , committedOffset = -1 } }