Skip to content

Commit

Permalink
kafka: fix response for OffsetFetch
Browse files Browse the repository at this point in the history
  • Loading branch information
Commelina committed May 8, 2024
1 parent e8073da commit 78e1777
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 17 deletions.
3 changes: 2 additions & 1 deletion hstream-kafka/HStream/Kafka/Group/Group.hs
Original file line number Diff line number Diff line change
Expand Up @@ -916,11 +916,12 @@ fetchOffsets Group{..} reqTopic validateReqTopic = validateReqTopic reqTopic >>=
, partitions = K.KaArray (Just $ (makeErrorPartition code) <$> partitions')
}
where
-- FIXME: hardcoded constants
makeErrorPartition code idx =
K.OffsetFetchResponsePartition
{ partitionIndex = idx
, committedOffset = -1
, metadata = Nothing
, metadata = Just ""
, errorCode = code
}

Expand Down
8 changes: 5 additions & 3 deletions hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -189,18 +189,19 @@ fetchOffsets GroupOffsetManager{..} topicName partitions = do
, partitions = KaArray {unKaArray = Just res}
}
where
-- FIXME: hardcoded constants
getOffset cache partitionIdx = do
let key = mkTopicPartition topicName partitionIdx
in case Map.lookup key cache of
Just offset -> return $ OffsetFetchResponsePartition
{ committedOffset = offset
, metadata = Nothing
, metadata = Just ""
, partitionIndex= partitionIdx
, errorCode = K.NONE
}
Nothing -> return $ OffsetFetchResponsePartition
{ committedOffset = -1
, metadata = Nothing
, metadata = Just ""
, partitionIndex= partitionIdx
, errorCode = K.NONE
-- TODO: check the error code here
Expand All @@ -214,9 +215,10 @@ fetchAllOffsets GroupOffsetManager{..} = do
-- group offsets by TopicName
cachedOffset <- Map.foldrWithKey foldF Map.empty <$> readIORef offsetsCache
return . KaArray . Just . V.map makeTopic . V.fromList . Map.toList $ cachedOffset
-- FIXME: hardcoded constants
where makePartition partition offset = OffsetFetchResponsePartition
{ committedOffset = offset
, metadata = Nothing
, metadata = Just ""
, partitionIndex=partition
, errorCode = K.NONE
}
Expand Down
33 changes: 20 additions & 13 deletions hstream-kafka/HStream/Kafka/Server/Handler/Offset.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import HStream.Kafka.Common.OffsetManager (getLatestOffset,
getOffsetByTimestamp,
getOldestOffset)
import HStream.Kafka.Common.Resource
import HStream.Kafka.Common.Utils (forKaArray, forKaArrayM)
import HStream.Kafka.Common.Utils (forKaArray, forKaArrayM,
vectorToKaArray)
import qualified HStream.Kafka.Group.Group as G
import qualified HStream.Kafka.Group.GroupCoordinator as GC
import HStream.Kafka.Server.Types (ServerContext (..))
Expand Down Expand Up @@ -192,7 +193,7 @@ handleOffsetFetch ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeExceptio
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_GROUP req.groupId AclOp_DESCRIBE >>= \case
False -> return $ makeErrorResponse K.TOPIC_AUTHORIZATION_FAILED
True -> do
group <- GC.getGroup scGroupCoordinator req.groupId
group_m <- GC.getGroupM scGroupCoordinator req.groupId
case K.unKaArray req.topics of
-- 'Nothing' means fetch offsets of ALL topics.
-- WARNING: Offsets of unauthzed topics should not be leaked
Expand All @@ -201,11 +202,13 @@ handleOffsetFetch ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeExceptio
-- FIXME: Better method than passing a "validate" function?
Nothing -> do
Log.debug $ "fetching all offsets in group:" <> Log.build req.groupId
topicResps <- G.fetchAllOffsets group $ \reqTopic -> do
-- [ACL] check [DESCRIBE TOPIC] for each topic
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC reqTopic.name AclOp_DESCRIBE >>= \case
False -> return K.TOPIC_AUTHORIZATION_FAILED
True -> return K.NONE
topicResps <- case group_m of
Nothing -> return (vectorToKaArray V.empty)
Just group -> G.fetchAllOffsets group $ \reqTopic -> do
-- [ACL] check [DESCRIBE TOPIC] for each topic
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC reqTopic.name AclOp_DESCRIBE >>= \case
False -> return K.TOPIC_AUTHORIZATION_FAILED
True -> return K.NONE
return $ K.OffsetFetchResponse
{ throttleTimeMs = 0
, errorCode = K.NONE
Expand All @@ -217,11 +220,15 @@ handleOffsetFetch ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeExceptio
-- on each topic. That is why we pass a "validate"
-- function to it.
-- FIXME: Better method than passing a "validate" function?
G.fetchOffsets group reqTopic $ \reqTopic_ -> do
-- [ACL] check [DESCRIBE TOPIC] for each topic
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC reqTopic_.name AclOp_DESCRIBE >>= \case
False -> return K.TOPIC_AUTHORIZATION_FAILED
True -> return K.NONE
case group_m of
-- FIXME: what error code should it return? 'K.UNKNOWN_TOPIC_OR_PARTITION'
-- crashes some tests...
Nothing -> return (makeErrorTopicResponse K.NONE reqTopic)
Just group -> G.fetchOffsets group reqTopic $ \reqTopic_ -> do
-- [ACL] check [DESCRIBE TOPIC] for each topic
simpleAuthorize (toAuthorizableReqCtx reqCtx) authorizer Res_TOPIC reqTopic_.name AclOp_DESCRIBE >>= \case
False -> return K.TOPIC_AUTHORIZATION_FAILED
True -> return K.NONE
return $ K.OffsetFetchResponse
{ throttleTimeMs = 0
, errorCode = K.NONE
Expand All @@ -240,7 +247,7 @@ handleOffsetFetch ServerContext{..} reqCtx req = E.handle (\(K.ErrorCodeExceptio
K.OffsetFetchResponsePartition
{ partitionIndex = idx
, errorCode = code
, metadata = Nothing
, metadata = Just ""
, committedOffset = -1
}
}
Expand Down

0 comments on commit 78e1777

Please sign in to comment.