From 255732c3cacf5ac6011b516b548533ca3e0e98b1 Mon Sep 17 00:00:00 2001 From: YangKian <1207783292@qq.com> Date: Tue, 30 Apr 2024 11:09:31 +0800 Subject: [PATCH] refactor: support set group initialRebalaceDelay --- hstream-kafka/HStream/Kafka/Group/Group.hs | 38 ++++++++++--------- .../HStream/Kafka/Group/GroupCoordinator.hs | 30 +++++++++------ .../HStream/Kafka/Group/GroupOffsetManager.hs | 12 ++++-- hstream-kafka/HStream/Kafka/Server/Types.hs | 19 +++++++++- 4 files changed, 65 insertions(+), 34 deletions(-) diff --git a/hstream-kafka/HStream/Kafka/Group/Group.hs b/hstream-kafka/HStream/Kafka/Group/Group.hs index 6a71f0bb1..a01e451ab 100644 --- a/hstream-kafka/HStream/Kafka/Group/Group.hs +++ b/hstream-kafka/HStream/Kafka/Group/Group.hs @@ -105,18 +105,12 @@ data GroupState | Empty deriving (Show, Eq) -data GroupConfig - = GroupConfig - { - } - -data Group - = Group +data Group = Group { lock :: C.MVar () , groupId :: T.Text , groupGenerationId :: IO.IORef Int32 , state :: IO.IORef GroupState - , config :: GroupConfig + , groupConfig :: GroupConfig , leader :: IO.IORef (Maybe T.Text) , members :: HashTable T.Text Member -- , pendingMembers :: HashTable T.Text () @@ -142,8 +136,12 @@ data Group , storedMetadata :: IO.IORef Bool } -newGroup :: T.Text -> GroupOffsetManager -> Meta.MetaHandle -> IO Group -newGroup group metadataManager metaHandle = do +data GroupConfig = GroupConfig + { groupInitialRebalanceDelay :: Int + } deriving (Show) + +newGroup :: T.Text -> GroupOffsetManager -> Meta.MetaHandle -> GroupConfig -> IO Group +newGroup group metadataManager metaHandle config = do lock <- C.newMVar () state <- IO.newIORef Empty groupGenerationId <- IO.newIORef 0 @@ -169,7 +167,7 @@ newGroup group metadataManager metaHandle = do , groupId = group , groupGenerationId = groupGenerationId , state = state - , config = GroupConfig + , groupConfig = config , leader = leader -- all members , members = members @@ -193,8 +191,13 @@ newGroup group metadataManager metaHandle = do , storedMetadata = storedMetadata } -newGroupFromValue :: CM.GroupMetadataValue -> GroupOffsetManager -> Meta.MetaHandle -> IO Group -newGroupFromValue value metadataManager metaHandle = do +newGroupFromValue + :: CM.GroupMetadataValue + -> GroupOffsetManager + -> Meta.MetaHandle + -> GroupConfig + -> IO Group +newGroupFromValue value metadataManager metaHandle config = do lock <- C.newMVar () state <- IO.newIORef (if V.null value.members then Empty else Stable) @@ -221,7 +224,7 @@ newGroupFromValue value metadataManager metaHandle = do , groupId = value.groupId , groupGenerationId = groupGenerationId , state = state - , config = GroupConfig + , groupConfig = config , leader = leader -- all members , members = members @@ -371,10 +374,9 @@ prepareRebalance group@Group{..} reason = do -- isEmptyState <- (Empty ==) <$> IO.readIORef state -- setup delayed rebalance if delayedRebalance is Nothing - -- TODO: configurable initRebalanceDelayMs, 5000 by default IO.readIORef delayedRebalance >>= \case Nothing -> do - delayed <- makeDelayedRebalance group 5000 + delayed <- makeDelayedRebalance group group.groupConfig.groupInitialRebalanceDelay Log.info $ "created delayed rebalance thread:" <> Log.buildString' delayed <> ", group:" <> Log.build groupId IO.atomicWriteIORef delayedRebalance (Just delayed) @@ -382,10 +384,10 @@ prepareRebalance group@Group{..} reason = do _ -> pure () -- TODO: dynamically delay with initTimeoutMs and RebalanceTimeoutMs -makeDelayedRebalance :: Group -> Int32 -> IO C.ThreadId +makeDelayedRebalance :: Group -> Int -> IO C.ThreadId makeDelayedRebalance group rebalanceDelayMs = do C.forkIO $ do - C.threadDelay (1000 * fromIntegral rebalanceDelayMs) + C.threadDelay (1000 * rebalanceDelayMs) rebalance group rebalance :: Group -> IO () diff --git a/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs b/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs index 2d3029b19..400df2a9c 100644 --- a/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs +++ b/hstream-kafka/HStream/Kafka/Group/GroupCoordinator.hs @@ -26,15 +26,22 @@ import HStream.Store (LDClient) import qualified Kafka.Protocol.Error as K data GroupCoordinator = GroupCoordinator - { groups :: C.MVar (Utils.HashTable T.Text Group) - , metaHandle :: Meta.MetaHandle - , serverId :: Word32 - , ldClient :: LDClient - , offsetTopicReplica :: Int + { groups :: C.MVar (Utils.HashTable T.Text Group) + , metaHandle :: Meta.MetaHandle + , serverId :: Word32 + , ldClient :: LDClient + , groupConfig :: G.GroupConfig + , offsetConfig :: GOM.OffsetConfig } -mkGroupCoordinator :: Meta.MetaHandle -> LDClient -> Word32 -> Int -> IO GroupCoordinator -mkGroupCoordinator metaHandle ldClient serverId offsetTopicReplica = do +mkGroupCoordinator + :: Meta.MetaHandle + -> LDClient + -> Word32 + -> GOM.OffsetConfig + -> G.GroupConfig + -> IO GroupCoordinator +mkGroupCoordinator metaHandle ldClient serverId offsetConfig groupConfig = do groups <- H.new >>= C.newMVar return $ GroupCoordinator {..} @@ -59,8 +66,8 @@ getOrMaybeCreateGroup GroupCoordinator{..} groupId memberId = do H.lookup gs groupId >>= \case Nothing -> if T.null memberId then do - metadataManager <- GOM.mkGroupOffsetManager ldClient (fromIntegral serverId) groupId offsetTopicReplica - ng <- G.newGroup groupId metadataManager metaHandle + metadataManager <- GOM.mkGroupOffsetManager ldClient (fromIntegral serverId) groupId offsetConfig + ng <- G.newGroup groupId metadataManager metaHandle groupConfig H.insert gs groupId ng return ng else throw (ErrorCodeException K.UNKNOWN_MEMBER_ID) @@ -90,7 +97,7 @@ getGroupM GroupCoordinator{..} groupId = do -- load group from meta store loadGroupAndOffsets :: GroupCoordinator -> T.Text -> IO () loadGroupAndOffsets gc groupId = do - offsetManager <- GOM.mkGroupOffsetManager gc.ldClient (fromIntegral gc.serverId) groupId gc.offsetTopicReplica + offsetManager <- GOM.mkGroupOffsetManager gc.ldClient (fromIntegral gc.serverId) groupId gc.offsetConfig GOM.loadOffsetsFromStorage offsetManager Meta.getMeta @CM.GroupMetadataValue groupId gc.metaHandle >>= \case Nothing -> do @@ -105,7 +112,8 @@ addGroupByValue gc value offsetManager = do C.withMVar gc.groups $ \gs -> do H.lookup gs value.groupId >>= \case Nothing -> do - ng <- G.newGroupFromValue value offsetManager gc.metaHandle + -- TODO: double check if persistence groupConfig in metastore is needed + ng <- G.newGroupFromValue value offsetManager gc.metaHandle gc.groupConfig H.insert gs value.groupId ng Just _ -> do Log.warning $ "load group failed, group:" <> Log.build value.groupId <> " is loaded" diff --git a/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs b/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs index 90f8fb5ff..3f4161dc4 100644 --- a/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs +++ b/hstream-kafka/HStream/Kafka/Group/GroupOffsetManager.hs @@ -8,6 +8,7 @@ module HStream.Kafka.Group.GroupOffsetManager , fetchAllOffsets , nullOffsets , loadOffsetsFromStorage + , OffsetConfig (..) ) where import Control.Exception (throw) @@ -50,15 +51,20 @@ data GroupOffsetManager = forall os. OffsetStorage os => GroupOffsetManager , offsetStorage :: os , offsetsCache :: IORef (Map.Map TopicPartition Int64) , partitionsMap :: IORef (Map.Map TopicPartition S.C_LogID) + , offsetConfig :: OffsetConfig } +data OffsetConfig = OffsetConfig + { offsetsTopicReplicationFactor :: Int + } deriving (Show) + -- FIXME: if we create a consumer group with groupName haven been used, call -- mkCkpOffsetStorage with groupName may lead us to a un-clean ckp-store -mkGroupOffsetManager :: S.LDClient -> Int32 -> T.Text -> Int -> IO GroupOffsetManager -mkGroupOffsetManager ldClient serverId groupName offsetReplica = do +mkGroupOffsetManager :: S.LDClient -> Int32 -> T.Text -> OffsetConfig -> IO GroupOffsetManager +mkGroupOffsetManager ldClient serverId groupName offsetConfig = do offsetsCache <- newIORef Map.empty partitionsMap <- newIORef Map.empty - offsetStorage <- mkCkpOffsetStorage ldClient groupName offsetReplica + offsetStorage <- mkCkpOffsetStorage ldClient groupName offsetConfig.offsetsTopicReplicationFactor return GroupOffsetManager{..} loadOffsetsFromStorage :: GroupOffsetManager -> IO () diff --git a/hstream-kafka/HStream/Kafka/Server/Types.hs b/hstream-kafka/HStream/Kafka/Server/Types.hs index b61835ed2..32b2dd0fa 100644 --- a/hstream-kafka/HStream/Kafka/Server/Types.hs +++ b/hstream-kafka/HStream/Kafka/Server/Types.hs @@ -18,8 +18,10 @@ import HStream.Kafka.Common.FetchManager (FetchContext, import HStream.Kafka.Common.OffsetManager (OffsetManager, initOffsetReader, newOffsetManager) +import qualified HStream.Kafka.Group.Group as G import HStream.Kafka.Group.GroupCoordinator (GroupCoordinator, mkGroupCoordinator) +import qualified HStream.Kafka.Group.GroupOffsetManager as GOM import HStream.Kafka.Server.Config (ServerOpts (..)) import qualified HStream.Kafka.Server.Config.KafkaConfig as KC import HStream.MetaStore.Types (MetaHandle (..)) @@ -62,8 +64,9 @@ initServerContext opts@ServerOpts{..} gossipContext mh = do statsHolder <- newServerStatsHolder epochHashRing <- initializeHashRing gossipContext - let replica = _kafkaBrokerConfigs.offsetsTopicReplication._value - scGroupCoordinator <- mkGroupCoordinator mh ldclient _serverID replica + let groupConfigs = brokerConfigToGroupConfig _kafkaBrokerConfigs + offsetConfigs = brokerConfigToOffsetConfig _kafkaBrokerConfigs + scGroupCoordinator <- mkGroupCoordinator mh ldclient _serverID offsetConfigs groupConfigs -- must be initialized later offsetManager <- newOffsetManager ldclient -- Trick to avoid use maybe, must be initialized later @@ -113,3 +116,15 @@ initConnectionContext sc = do !fc <- initFetchContext (scLDClient sc) pure sc{scOffsetManager = om, fetchCtx = fc} + +brokerConfigToOffsetConfig :: KC.KafkaBrokerConfigs -> GOM.OffsetConfig +brokerConfigToOffsetConfig KC.KafkaBrokerConfigs{..} = + GOM.OffsetConfig { + offsetsTopicReplicationFactor = offsetsTopicReplication._value + } + +brokerConfigToGroupConfig :: KC.KafkaBrokerConfigs -> G.GroupConfig +brokerConfigToGroupConfig KC.KafkaBrokerConfigs{..} = + G.GroupConfig { + groupInitialRebalanceDelay = groupInitialRebalanceDelay._value + }