Skip to content

Commit

Permalink
handle lookup when meta cluster unavailable
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian committed Jun 19, 2024
1 parent a28aab2 commit adcc752
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 17 deletions.
14 changes: 9 additions & 5 deletions common/server/HStream/Common/Server/Lookup.hs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ module HStream.Common.Server.Lookup
) where

import Control.Concurrent.STM
import Control.Exception (SomeException (..), try)
import Control.Exception (SomeException (..), throwIO,
try)
import Data.List (find)
import Data.Text (Text)
import qualified Data.Vector as V
Expand All @@ -21,6 +22,7 @@ import HStream.Common.Server.HashRing (LoadBalanceHashRing,
readLoadBalanceHashRing)
import HStream.Common.Server.MetaData (TaskAllocation (..))
import HStream.Common.Types (fromInternalServerNodeWithKey)
import qualified HStream.Exception as HE
import HStream.Gossip (GossipContext, getMemberList)
import qualified HStream.Logger as Log
import qualified HStream.MetaStore.Types as M
Expand Down Expand Up @@ -55,8 +57,9 @@ lookupNodePersist metaHandle gossipContext loadBalanceHashRing
-- TODO: add a retry limit here
Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
<> ", retry..."
lookupNodePersist metaHandle gossipContext loadBalanceHashRing
key metaId advertisedListenersKey
throwIO $ HE.ResourceAllocationException "server is in backup mode, try later"
-- lookupNodePersist metaHandle gossipContext loadBalanceHashRing
-- key metaId advertisedListenersKey
Right () -> return theNode
Just (TaskAllocation epoch nodeId, version) -> do
serverList <- getMemberList gossipContext >>=
Expand All @@ -73,8 +76,9 @@ lookupNodePersist metaHandle gossipContext loadBalanceHashRing
-- TODO: add a retry limit here
Log.warning $ "lookupNodePersist exception: " <> Log.buildString' e
<> ", retry..."
lookupNodePersist metaHandle gossipContext loadBalanceHashRing
key metaId advertisedListenersKey
throwIO $ HE.ResourceAllocationException "server is in backup mode, try later"
-- lookupNodePersist metaHandle gossipContext loadBalanceHashRing
-- key metaId advertisedListenersKey
Right () -> return theNode'

data KafkaResource
Expand Down
2 changes: 1 addition & 1 deletion hstream/src/HStream/Server/CacheStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ import System.Clock
import Text.Printf (printf)

import Database.RocksDB
import qualified HStream.Exception as HE
import qualified HStream.Logger as Log
import qualified HStream.Store as S
import qualified HStream.Exception as HE

-- StoreMode is a logical concept that represents the operations that the current CacheStore can perform.
-- Cache mode: data can only be written to the store
Expand Down
2 changes: 1 addition & 1 deletion hstream/src/HStream/Server/Core/Common.hs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ lookupResource sc@ServerContext{..} ResView rid = do
M.getMeta @P.ViewInfo rid metaHandle >>= \case
Nothing -> throwIO $ HE.ViewNotFound rid
Just P.ViewInfo{..} -> lookupResource sc ResQuery (P.queryId viewQuery)
lookupResource sc@ServerContext{..} rtype rid = do
lookupResource ServerContext{..} rtype rid = do
let metaId = mkAllocationKey rtype rid
lookupNodePersist metaHandle gossipContext loadBalanceHashRing
rid metaId scAdvertisedListenersKey
Expand Down
58 changes: 49 additions & 9 deletions hstream/src/HStream/Server/Handler/Cluster.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,21 @@ module HStream.Server.Handler.Cluster
, handleLookupKey
) where

import qualified HsGrpc.Server as G
import Control.Exception (throwIO, try)
import Data.IORef (readIORef)
import Network.GRPC.HighLevel.Generated

import Control.Exception (throwIO)
import qualified HStream.Exception as HE
import qualified HStream.Logger as Log
import qualified HStream.Server.Core.Cluster as C
import HStream.Server.Core.Common (lookupResource)
import HStream.Server.Exception
import HStream.Server.HStreamApi
import HStream.Server.Types (ServerContext (..))
import qualified HsGrpc.Server as G
import HStream.Common.Server.Lookup (lookupNode)
-- import HStream.Server.Types (ServerContext (..))
import HStream.Server.Types (ServerContext (..),
ServerMode (..))
import HStream.ThirdParty.Protobuf (Empty)
import HStream.Utils (returnResp,
validateResourceIdAndThrow)
Expand All @@ -49,14 +53,29 @@ lookupResourceHandler
:: ServerContext
-> ServerRequest 'Normal LookupResourceRequest ServerNode
-> IO (ServerResponse 'Normal ServerNode)
lookupResourceHandler sc (ServerNormalRequest _meta req@LookupResourceRequest{..}) =
lookupResourceHandler sc@ServerContext{..} (ServerNormalRequest _meta req@LookupResourceRequest{..}) =
defaultExceptionHandle $ do
Log.debug $ "receive lookup resource request: " <> Log.build (show req)
Log.info $ "receive lookup resource request: " <> Log.build (show req)
case lookupResourceRequestResType of
Enumerated (Right rType) -> do
validateResourceIdAndThrow rType lookupResourceRequestResId
returnResp =<< lookupResource sc rType lookupResourceRequestResId
-- returnResp =<< lookupResource sc rType lookupResourceRequestResId
state <- readIORef serverState
case state of
ServerNormal -> do
returnResp =<< lookupResource sc rType lookupResourceRequestResId
ServerBackup -> do
theNode <- case rType of
ResourceTypeResShard -> do
doLookup lookupResourceRequestResId
-- ResourceTypeResStream -> doLookup lookupResourceRequestResId
tp -> do
Log.warning $ "reject lookup " <> Log.build (show tp) <> " request because server is in backup mode"
throwIO $ HE.ResourceAllocationException "server is in backup mode, try later"
returnResp theNode
x -> throwIO $ HE.InvalidResourceType (show x)
where
doLookup rid = lookupNode loadBalanceHashRing rid scAdvertisedListenersKey

lookupShardHandler
:: ServerContext
Expand Down Expand Up @@ -94,13 +113,34 @@ handleDescribeCluster :: ServerContext -> G.UnaryHandler Empty DescribeClusterRe
handleDescribeCluster sc _ _ = catchDefaultEx $ C.describeCluster sc

handleLookupResource :: ServerContext -> G.UnaryHandler LookupResourceRequest ServerNode
handleLookupResource sc _ req@LookupResourceRequest{..} = catchDefaultEx $ do
Log.debug $ "receive lookup resource request: " <> Log.build (show req)
handleLookupResource sc@ServerContext{..} _sc req@LookupResourceRequest{..} = catchDefaultEx $ do
-- Log.debug $ "receive lookup resource request: " <> Log.build (show req)
-- case lookupResourceRequestResType of
-- Enumerated (Right rType) -> do
-- validateResourceIdAndThrow rType lookupResourceRequestResId
-- C.lookupResource sc rType lookupResourceRequestResId
-- x -> throwIO $ HE.InvalidResourceType (show x)
Log.info $ "receive lookup resource request: " <> Log.build (show req)
case lookupResourceRequestResType of
Enumerated (Right rType) -> do
validateResourceIdAndThrow rType lookupResourceRequestResId
C.lookupResource sc rType lookupResourceRequestResId
state <- readIORef serverState
case state of
ServerNormal -> do
res <- try $ lookupResource sc rType lookupResourceRequestResId
case res of
Left (_ :: HE.ResourceAllocationException) -> handleLookupResource sc _sc req
Right res' -> return res'
ServerBackup -> do
case rType of
ResourceTypeResShard -> do
doLookup lookupResourceRequestResId
tp -> do
Log.warning $ "reject lookup " <> Log.build (show tp) <> " request because server is in backup mode"
throwIO $ HE.ResourceAllocationException "server is in backup mode, try later"
x -> throwIO $ HE.InvalidResourceType (show x)
where
doLookup rid = lookupNode loadBalanceHashRing rid scAdvertisedListenersKey

handleLookupShard :: ServerContext -> G.UnaryHandler LookupShardRequest LookupShardResponse
handleLookupShard sc _ req = catchDefaultEx $ C.lookupShard sc req
Expand Down
3 changes: 2 additions & 1 deletion hstream/src/HStream/Server/HealthMonitor.hs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ docheck :: ServerContext -> HealthMonitor -> IO ()
docheck sc@ServerContext{..} hm = do
ldHealthy <- checkLdCluster hm
metaHealthy <- checkMeta hm
serverMode <- getServerMode sc
when (not ldHealthy || not metaHealthy) $
Log.warning $ "Healthy check find unhealthy service: "
<> "ld cluster healthy: " <> Log.build (show ldHealthy)
<> ", meta cluster healthy: " <> Log.build (show metaHealthy)
serverMode <- getServerMode sc
<> ", serverMode: " <> Log.build (show serverMode)
if | serverMode == ServerNormal && not ldHealthy && not metaHealthy -> do
initCacheStore cachedStore
setCacheStoreMode cachedStore CacheMode
Expand Down

0 comments on commit adcc752

Please sign in to comment.