Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kafka: do rebalance on all members have rejoined & some member is removed #1804

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions hstream-kafka/HStream/Kafka/Common/Utils.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

module HStream.Kafka.Common.Utils where

import Control.Concurrent
import Control.Exception (throw)
import qualified Control.Monad as M
import qualified Control.Monad.ST as ST
Expand All @@ -18,6 +19,7 @@ import qualified Data.Text.Encoding as T
import qualified Data.Vector as V
import HStream.Kafka.Common.KafkaException (ErrorCodeException (ErrorCodeException))
import qualified Kafka.Protocol.Encoding as K
import qualified System.Timeout as Timeout

type HashTable k v = H.BasicHashTable k v

Expand Down Expand Up @@ -96,3 +98,15 @@ encodeBase64 = Base64.extractBase64 . Base64.encodeBase64

decodeBase64 :: T.Text -> BS.ByteString
decodeBase64 = Base64.decodeBase64Lenient . T.encodeUtf8

-- | Perform the action when the predicate is true or timeout is reached.
onOrTimeout :: IO Bool -> Int -> IO b -> IO b
onOrTimeout p timeoutMs action =
Timeout.timeout (timeoutMs * 1000) loop >>= \case
Nothing -> action
Just a -> return a
where
loop = p >>= \case
True -> action
-- FIXME: Hardcoded constant (check every 1ms)
False -> threadDelay 1000 >> loop
73 changes: 58 additions & 15 deletions hstream-kafka/HStream/Kafka/Group/Group.hs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import Data.Int (Int32)
import qualified Data.IORef as IO
import qualified Data.List as List
import qualified Data.Map as Map
import Data.Maybe (fromMaybe, listToMaybe)
import Data.Maybe (fromMaybe, isJust,
listToMaybe)
import qualified Data.Set as Set
import qualified Data.Text as T
import qualified Data.UUID as UUID
Expand Down Expand Up @@ -350,18 +351,55 @@ doDynamicNewMemberJoinGroup group reqCtx req newMemberId delayedResponse = do

addMemberAndRebalance :: Group -> K.RequestContext -> K.JoinGroupRequest -> T.Text -> C.MVar K.JoinGroupResponse -> IO ()
addMemberAndRebalance group reqCtx req newMemberId delayedResponse = do
isGroupEmpty <- Utils.hashtableNull group.members
member <- newMemberFromReq reqCtx req newMemberId (refineProtocols req.protocols)
addMember group member (Just delayedResponse)
-- TODO: check state
prepareRebalance group $ "add member:" <> member.memberId
-- Note: We consider the group as a new one if empty, and always wait
-- an interval for joining. Otherwise, wait until all previous members
-- have joined or timeout.
-- FIXME: Is this correct?
-- FIXME: How long to wait for a new group? Is 'group.initial.rebalance.delay.ms' correct?
-- FIXME: Hardcoded constant!
memberRebalanceTimeoutMs <- IO.readIORef member.rebalanceTimeoutMs
prepareRebalance group
(if isGroupEmpty then return False else haveAllMembersRejoined)
(if isGroupEmpty then fromIntegral group.groupConfig.groupInitialRebalanceDelay
else memberRebalanceTimeoutMs)
("add member:" <> member.memberId)
where
-- Note: The new-added member is always present. So this is equivalent
-- to check among members before.
haveAllMembersRejoined :: IO Bool
haveAllMembersRejoined = do
H.foldM (\acc (mid,_) -> case acc of
False -> return False
True -> isJust <$> H.lookup group.delayedJoinResponses mid
) True group.members

updateMemberAndRebalance :: Group -> Member -> K.JoinGroupRequest -> C.MVar K.JoinGroupResponse -> IO ()
updateMemberAndRebalance group member req delayedResponse = do
updateMember group member req delayedResponse
prepareRebalance group $ "update member:" <> member.memberId

prepareRebalance :: Group -> T.Text -> IO ()
prepareRebalance group@Group{..} reason = do
-- Note: On this case, the group can not be empty because at least this member
-- is present. So we wait until all previous members have joined or timeout.
-- FIXME: How long to wait? Is 'rebalanceTimeoutMs' of the member correct?
timeout <- IO.readIORef member.rebalanceTimeoutMs
prepareRebalance group
haveAllMembersRejoined
timeout
("update member:" <> member.memberId)
where
-- Note: The new-added member is always present. So this is equivalent
-- to check among members before.
haveAllMembersRejoined :: IO Bool
haveAllMembersRejoined = do
H.foldM (\acc (mid,_) -> case acc of
False -> return False
True -> isJust <$> H.lookup group.delayedJoinResponses mid
) True group.members

prepareRebalance :: Group -> IO Bool -> Int32 -> T.Text -> IO ()
prepareRebalance group@Group{..} p timeoutMs reason = do
Log.info $ "prepare rebalance, group:" <> Log.build groupId
<> "reason:" <> Log.build reason
-- check state CompletingRebalance and cancel delayedSyncResponses
Expand All @@ -376,19 +414,16 @@ prepareRebalance group@Group{..} reason = do
-- setup delayed rebalance if delayedRebalance is Nothing
IO.readIORef delayedRebalance >>= \case
Nothing -> do
delayed <- makeDelayedRebalance group group.groupConfig.groupInitialRebalanceDelay
delayed <- makeDelayedRebalance group p (fromIntegral timeoutMs)
Log.info $ "created delayed rebalance thread:" <> Log.buildString' delayed
<> ", group:" <> Log.build groupId
IO.atomicWriteIORef delayedRebalance (Just delayed)
IO.atomicWriteIORef state PreparingRebalance
_ -> pure ()

-- TODO: dynamically delay with initTimeoutMs and RebalanceTimeoutMs
makeDelayedRebalance :: Group -> Int -> IO C.ThreadId
makeDelayedRebalance group rebalanceDelayMs = do
C.forkIO $ do
C.threadDelay (1000 * rebalanceDelayMs)
rebalance group
makeDelayedRebalance :: Group -> IO Bool -> Int -> IO C.ThreadId
makeDelayedRebalance group p rebalanceDelayMs =
C.forkIO $ Utils.onOrTimeout p rebalanceDelayMs (rebalance group)

rebalance :: Group -> IO ()
rebalance group@Group{..} = do
Expand Down Expand Up @@ -454,6 +489,8 @@ doRelance group@Group{..} leaderMemberId = do
H.delete delayedJoinResponses memberId

rebalanceTimeoutMs <- computeRebalnceTimeoutMs group
-- FIXME: Is it correct to use rebalance timeout here? Or maybe session timeout?
-- The state machine here is really weird...
delayedSyncTid <- makeDelayedSync group generationId rebalanceTimeoutMs
IO.atomicWriteIORef delayedSync (Just delayedSyncTid)
Log.info $ "create delayed sync for group:" <> Log.build groupId
Expand Down Expand Up @@ -496,7 +533,10 @@ makeDelayedSync group@Group{..} generationId timeoutMs = do

-- remove itself (to avoid killing itself in prepareRebalance)
IO.atomicWriteIORef delayedSync Nothing
prepareRebalance group $ "delayed sync timeout"
prepareRebalance group
(pure False) -- FIXME: Is this correct?
5000 -- FIXME: timeout?
"delayed sync timeout"
s -> do
Log.warning $ "unexpected delayed sync with wrong state:" <> Log.buildString' s
<> ", group:" <> Log.build groupId
Expand Down Expand Up @@ -731,7 +771,10 @@ removeMemberAndUpdateGroup group@Group{..} member = do
cancelDelayedJoinResponse group member.memberId

removeMember group member
prepareRebalance group $ "remove member:" <> member.memberId
prepareRebalance group
(pure True) -- FIXME: Is this correct?
5000 -- FIXME: timeout?
("remove member:" <> member.memberId)

cancelDelayedJoinResponse :: Group -> T.Text -> IO ()
cancelDelayedJoinResponse Group{..} memberId = do
Expand Down
Loading