diff --git a/hstream-kafka/HStream/Kafka/Group/Group.hs b/hstream-kafka/HStream/Kafka/Group/Group.hs index a01e451ab..c7fde497e 100644 --- a/hstream-kafka/HStream/Kafka/Group/Group.hs +++ b/hstream-kafka/HStream/Kafka/Group/Group.hs @@ -916,11 +916,12 @@ fetchOffsets Group{..} reqTopic validateReqTopic = validateReqTopic reqTopic >>= , partitions = K.KaArray (Just $ (makeErrorPartition code) <$> partitions') } where + -- FIXME: hardcoded constants makeErrorPartition code idx = K.OffsetFetchResponsePartition { partitionIndex = idx , committedOffset = -1 - , metadata = Nothing + , metadata = Just "" , errorCode = code } diff --git a/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs b/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs index 3f4161dc4..a59b48180 100644 --- a/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs +++ b/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs @@ -189,18 +189,19 @@ fetchOffsets GroupOffsetManager{..} topicName partitions = do , partitions = KaArray {unKaArray = Just res} } where + -- FIXME: hardcoded constants getOffset cache partitionIdx = do let key = mkTopicPartition topicName partitionIdx in case Map.lookup key cache of Just offset -> return $ OffsetFetchResponsePartition { committedOffset = offset - , metadata = Nothing + , metadata = Just "" , partitionIndex= partitionIdx , errorCode = K.NONE } Nothing -> return $ OffsetFetchResponsePartition { committedOffset = -1 - , metadata = Nothing + , metadata = Just "" , partitionIndex= partitionIdx , errorCode = K.NONE -- TODO: check the error code here @@ -214,9 +215,10 @@ fetchAllOffsets GroupOffsetManager{..} = do -- group offsets by TopicName cachedOffset <- Map.foldrWithKey foldF Map.empty <$> readIORef offsetsCache return . KaArray . Just . V.map makeTopic . V.fromList . Map.toList $ cachedOffset + -- FIXME: hardcoded constants where makePartition partition offset = OffsetFetchResponsePartition { committedOffset = offset - , metadata = Nothing + , metadata = Just "" , partitionIndex=partition , errorCode = K.NONE } diff --git a/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs b/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs index df4ae8387..f68186e0c 100644 --- a/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs +++ b/hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs @@ -21,7 +21,8 @@ import HStream.Kafka.Common.OffsetManager (getLatestOffset, getOffsetByTimestamp, getOldestOffset) import HStream.Kafka.Common.Resource -import HStream.Kafka.Common.Utils (forKaArray, forKaArrayM) +import HStream.Kafka.Common.Utils (forKaArray, forKaArrayM, + vectorToKaArray) import qualified HStream.Kafka.Group.Group as G import qualified HStream.Kafka.Group.GroupCoordinator as GC import HStream.Kafka.Server.Types (ServerContext (..)) @@ -192,7 +193,7 @@ handleOffsetFetch ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeExceptio simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_GROUP req.groupId AclOp_DESCRIBE >>= \case False -> return $ makeErrorResponse K.TOPIC_AUTHORIZATION_FAILED True -> do - group <- GC.getGroup scGroupCoordinator req.groupId + group_m <- GC.getGroupM scGroupCoordinator req.groupId case K.unKaArray req.topics of -- 'Nothing' means fetch offsets of ALL topics. -- WARNING: Offsets of unauthzed topics should not be leaked @@ -201,11 +202,13 @@ handleOffsetFetch ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeExceptio -- FIXME: Better method than passing a "validate" function? Nothing -> do Log.debug $ "fetching all offsets in group:" <> Log.build req.groupId - topicResps <- G.fetchAllOffsets group $ \reqTopic -> do - -- [ACL] check [DESCRIBE TOPIC] for each topic - simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC reqTopic.name AclOp_DESCRIBE >>= \case - False -> return K.TOPIC_AUTHORIZATION_FAILED - True -> return K.NONE + topicResps <- case group_m of + Nothing -> return (vectorToKaArray V.empty) + Just group -> G.fetchAllOffsets group $ \reqTopic -> do + -- [ACL] check [DESCRIBE TOPIC] for each topic + simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC reqTopic.name AclOp_DESCRIBE >>= \case + False -> return K.TOPIC_AUTHORIZATION_FAILED + True -> return K.NONE return $ K.OffsetFetchResponse { throttleTimeMs = 0 , errorCode = K.NONE @@ -217,11 +220,15 @@ handleOffsetFetch ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeExceptio -- on each topic. That is why we pass a "validate" -- function to it. -- FIXME: Better method than passing a "validate" function? - G.fetchOffsets group reqTopic $ \reqTopic_ -> do - -- [ACL] check [DESCRIBE TOPIC] for each topic - simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC reqTopic_.name AclOp_DESCRIBE >>= \case - False -> return K.TOPIC_AUTHORIZATION_FAILED - True -> return K.NONE + case group_m of + -- FIXME: what error code should it return? 'K.UNKNOWN_TOPIC_OR_PARTITION' + -- crashes some tests... + Nothing -> return (makeErrorTopicResponse K.NONE reqTopic) + Just group -> G.fetchOffsets group reqTopic $ \reqTopic_ -> do + -- [ACL] check [DESCRIBE TOPIC] for each topic + simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC reqTopic_.name AclOp_DESCRIBE >>= \case + False -> return K.TOPIC_AUTHORIZATION_FAILED + True -> return K.NONE return $ K.OffsetFetchResponse { throttleTimeMs = 0 , errorCode = K.NONE @@ -240,7 +247,7 @@ handleOffsetFetch ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeExceptio K.OffsetFetchResponsePartition { partitionIndex = idx , errorCode = code - , metadata = Nothing + , metadata = Just "" , committedOffset = -1 } }