Skip to content

Commit

Permalink
integration
Browse files Browse the repository at this point in the history
  • Loading branch information
YangKian committed Jun 20, 2024
1 parent 23dc65d commit 41c0df9
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 5 deletions.
13 changes: 12 additions & 1 deletion hstream/app/server.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
Expand Down Expand Up @@ -35,7 +36,8 @@ import HStream.Common.Server.HashRing (updateHashRing)
import HStream.Common.Server.MetaData (TaskAllocation (..),
clusterStartTimeId)
import HStream.Common.Types (getHStreamVersion)
import HStream.Common.ZookeeperClient (withZookeeperClient)
import HStream.Common.ZookeeperClient (unsafeGetZHandle,
withZookeeperClient)
import HStream.Exception
import HStream.Gossip (GossipContext (..),
defaultGossipOpts,
Expand All @@ -61,6 +63,8 @@ import HStream.Server.Config (AdvertisedListeners,
getConfig, runServerCli)
import qualified HStream.Server.Core.Cluster as Cluster
import qualified HStream.Server.Experimental as Exp
import HStream.Server.HealthMonitor (mkHealthMonitor,
startMonitor)
import qualified HStream.Server.HsGrpcHandler as HsGrpcHandler
import qualified HStream.Server.HStreamApi as API
import qualified HStream.Server.HStreamInternal as I
Expand Down Expand Up @@ -174,6 +178,13 @@ app config@ServerOpts{..} = do
forM_ as (Async.link2Only (const True) a)
-- wati the default server
waitGossipBoot gossipContext

let ServerContext{scLDClient, metaHandle} = serverContext
healthMonitor <- mkHealthMonitor scLDClient metaHandle 1
aMonitor <- Async.async $ startMonitor serverContext healthMonitor 3
Log.info $ "Start healthy monitor"
Async.link2Only (const True) a aMonitor

Async.wait a

serve
Expand Down
4 changes: 3 additions & 1 deletion hstream/src/HStream/Server/CacheStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ initCacheStore :: CacheStore -> IO ()
initCacheStore CacheStore{..} = do
modifyMVar_ store $ \st -> do
case st of
Just st' -> return $ Just st'
Just st'@Store{..} -> do
Log.info $ "Get cache store " <> Log.build name
return $ Just st'
Nothing -> do
db <- open dbOpts path
Log.info $ "Open rocksdb"
Expand Down
9 changes: 8 additions & 1 deletion hstream/src/HStream/Server/Core/Stream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Either (partitionEithers)
import Data.Functor ((<&>))
import Data.IORef (readIORef)
import qualified Data.List as L
import qualified Data.Map.Strict as M
import Data.Maybe (fromJust, fromMaybe)
Expand All @@ -52,10 +53,12 @@ import HStream.Common.Types
import qualified HStream.Common.ZookeeperSlotAlloc as Slot
import qualified HStream.Exception as HE
import qualified HStream.Logger as Log
import qualified HStream.Server.CacheStore as DB
import qualified HStream.Server.HStreamApi as API
import qualified HStream.Server.MetaData as P
import HStream.Server.Types (ServerContext (..),
ServerInternalOffset (..),
ServerMode (..),
ToOffset (..))
import qualified HStream.Stats as Stats
import qualified HStream.Store as S
Expand Down Expand Up @@ -385,7 +388,11 @@ appendStream ServerContext{..} streamName shardId record = do
recordSize = API.batchedRecordBatchSize record
payloadSize = BS.length payload
when (payloadSize > scMaxRecordSize) $ throwIO $ HE.InvalidRecordSize payloadSize
S.AppendCompletion {..} <- S.appendCompressedBS scLDClient shardId payload cmpStrategy Nothing
-- S.AppendCompletion {..} <- S.appendCompressedBS scLDClient shardId payload cmpStrategy Nothing
state <- readIORef serverState
S.AppendCompletion {..} <- case state of
ServerNormal -> S.appendCompressedBS scLDClient shardId payload cmpStrategy Nothing
ServerBackup -> DB.writeRecord cachedStore streamName shardId payload
Stats.stream_stat_add_append_in_bytes scStatsHolder cStreamName (fromIntegral payloadSize)
Stats.stream_stat_add_append_in_records scStatsHolder cStreamName (fromIntegral recordSize)
Stats.stream_time_series_add_append_in_bytes scStatsHolder cStreamName (fromIntegral payloadSize)
Expand Down
4 changes: 2 additions & 2 deletions hstream/src/HStream/Server/Handler/Cluster.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ import Control.Exception (throwIO, try)
import Data.IORef (readIORef)
import Network.GRPC.HighLevel.Generated

import qualified HsGrpc.Server as G
import HStream.Common.Server.Lookup (lookupNode)
import qualified HStream.Exception as HE
import qualified HStream.Logger as Log
import qualified HStream.Server.Core.Cluster as C
import HStream.Server.Core.Common (lookupResource)
import HStream.Server.Exception
import HStream.Server.HStreamApi
import qualified HsGrpc.Server as G
import HStream.Common.Server.Lookup (lookupNode)
-- import HStream.Server.Types (ServerContext (..))
import HStream.Server.Types (ServerContext (..),
ServerMode (..))
Expand Down
9 changes: 9 additions & 0 deletions hstream/src/HStream/Server/Initialization.hs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import qualified Z.Data.CBytes as CB
#if __GLASGOW_HASKELL__ < 902
import qualified HStream.Admin.Store.API as AA
#endif
import Data.IORef (newIORef)
import HStream.Common.ConsistentHashing (HashRing, constructServerMap,
getAllocatedNodeId)
import HStream.Common.Server.HashRing (initializeHashRing)
Expand All @@ -49,6 +50,7 @@ import qualified HStream.IO.Types as IO
import qualified HStream.IO.Worker as IO
import qualified HStream.Logger as Log
import HStream.MetaStore.Types (MetaHandle (..))
import HStream.Server.CacheStore (mkCacheStore)
import HStream.Server.Config (ServerOpts (..),
TlsConfig (..))
import HStream.Server.Types
Expand Down Expand Up @@ -88,6 +90,11 @@ initializeServer opts@ServerOpts{..} gossipContext hh db_m = do

shardReaderMap <- newMVar HM.empty

serverMode <- newIORef ServerNormal
let dbOption = def { RocksDB.createIfMissing = True }
let path = _cacheStorePath <> show _serverID
cachedStore <- mkCacheStore path dbOption def def

-- recovery tasks

return
Expand All @@ -112,6 +119,8 @@ initializeServer opts@ServerOpts{..} gossipContext hh db_m = do
, shardReaderMap = shardReaderMap
, querySnapshotPath = _querySnapshotPath
, querySnapshotter = db_m
, serverState = serverMode
, cachedStore = cachedStore
}

--------------------------------------------------------------------------------
Expand Down

0 comments on commit 41c0df9

Please sign in to comment.