diff --git a/hstream-kafka/HStream/Kafka/Group/Group.hs b/hstream-kafka/HStream/Kafka/Group/Group.hs index 27a3fe75c..2e1be4efc 100644 --- a/hstream-kafka/HStream/Kafka/Group/Group.hs +++ b/hstream-kafka/HStream/Kafka/Group/Group.hs @@ -836,14 +836,16 @@ commitOffsets group@Group{..} req = do CompletingRebalance -> throw (ErrorCodeException K.REBALANCE_IN_PROGRESS) Dead -> throw (ErrorCodeException K.UNKNOWN_MEMBER_ID) _ -> do - -- updateLatestHeartbeat - -- TODO: update heartbeat topics <- Utils.forKaArrayM req.topics $ \K.OffsetCommitRequestTopic{..} -> do res <- GOM.storeOffsets metadataManager name partitions return $ K.OffsetCommitResponseTopic {partitions = res, name = name} Utils.whenIORefEq storedMetadata False $ do Log.info $ "commited offsets on Empty Group, storing Empty Group:" <> Log.build group.groupId storeGroup group Map.empty + -- updateLatestHeartbeat + H.lookup members req.memberId >>= \case + Nothing -> pure () + Just m -> updateLatestHeartbeat m return K.OffsetCommitResponse {topics=topics, throttleTimeMs=0} validateOffsetcommit :: Group -> K.OffsetCommitRequest -> IO ()