Skip to content

Commit

Permalink
fix(kafka-consumer-group): update heartbeat after committing offsets (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
s12f authored Feb 2, 2024
1 parent f861289 commit d2d41b6
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions hstream-kafka/HStream/Kafka/Group/Group.hs
Original file line number Diff line number Diff line change
Expand Up @@ -836,14 +836,16 @@ commitOffsets group@Group{..} req = do
CompletingRebalance -> throw (ErrorCodeException K.REBALANCE_IN_PROGRESS)
Dead -> throw (ErrorCodeException K.UNKNOWN_MEMBER_ID)
_ -> do
-- updateLatestHeartbeat
-- TODO: update heartbeat
topics <- Utils.forKaArrayM req.topics $ \K.OffsetCommitRequestTopic{..} -> do
res <- GOM.storeOffsets metadataManager name partitions
return $ K.OffsetCommitResponseTopic {partitions = res, name = name}
Utils.whenIORefEq storedMetadata False $ do
Log.info $ "commited offsets on Empty Group, storing Empty Group:" <> Log.build group.groupId
storeGroup group Map.empty
-- updateLatestHeartbeat
H.lookup members req.memberId >>= \case
Nothing -> pure ()
Just m -> updateLatestHeartbeat m
return K.OffsetCommitResponse {topics=topics, throttleTimeMs=0}

validateOffsetcommit :: Group -> K.OffsetCommitRequest -> IO ()
Expand Down

0 comments on commit d2d41b6

Please sign in to comment.