diff --git a/common/hstream/HStream/MetaStore/ZookeeperUtils.hs b/common/hstream/HStream/MetaStore/ZookeeperUtils.hs index 91734ffb3..bf848aceb 100644 --- a/common/hstream/HStream/MetaStore/ZookeeperUtils.hs +++ b/common/hstream/HStream/MetaStore/ZookeeperUtils.hs @@ -2,10 +2,10 @@ {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE ScopedTypeVariables #-} -module HStream.MetaStore.ZookeeperUtils - where +module HStream.MetaStore.ZookeeperUtils where + import Control.Exception (catch, try) -import Control.Monad (void) +import Control.Monad (void, when) import Data.Aeson (FromJSON, ToJSON) import qualified Data.Aeson as Aeson import qualified Data.ByteString as BS @@ -16,10 +16,11 @@ import Z.Data.CBytes (CBytes) import Z.Data.Vector (Bytes) import qualified Z.Foreign as ZF import ZooKeeper (zooCreate, zooDelete, zooGet, - zooGetChildren, zooSet) + zooGetChildren, zooSet, zooState) import ZooKeeper.Exception import ZooKeeper.Types (DataCompletion (..), StringVector (..), StringsCompletion (..), ZHandle, + pattern ZooConnectedState, pattern ZooPersistent, zooOpenAclUnsafe) import qualified HStream.Logger as Log @@ -81,3 +82,10 @@ createPath :: HasCallStack => ZHandle -> CBytes -> IO () createPath zk path = do Log.debug . Log.buildString $ "create path " <> show path void $ zooCreate zk path Nothing zooOpenAclUnsafe ZooPersistent + +checkRecoverable :: ZHandle -> IO Bool +checkRecoverable zk = do + st <- zooState zk + when (st /= ZooConnectedState) $ do + Log.fatal $ "zk connect is unhealty, current state: " <> Log.build (show st) + return $ st == ZooConnectedState diff --git a/common/server/HStream/Common/Server/Lookup.hs b/common/server/HStream/Common/Server/Lookup.hs index 3adc7c5f8..3f89ec2da 100644 --- a/common/server/HStream/Common/Server/Lookup.hs +++ b/common/server/HStream/Common/Server/Lookup.hs @@ -11,7 +11,8 @@ module HStream.Common.Server.Lookup ) where import Control.Concurrent.STM -import Control.Exception (SomeException (..), try) +import Control.Exception (SomeException (..), throwIO, + try) import Data.List (find) import Data.Text (Text) import qualified Data.Vector as V @@ -21,6 +22,7 @@ import HStream.Common.Server.HashRing (LoadBalanceHashRing, readLoadBalanceHashRing) import HStream.Common.Server.MetaData (TaskAllocation (..)) import HStream.Common.Types (fromInternalServerNodeWithKey) +import qualified HStream.Exception as HE import HStream.Gossip (GossipContext, getMemberList) import qualified HStream.Logger as Log import qualified HStream.MetaStore.Types as M diff --git a/common/stats/HStream/Stats.hs b/common/stats/HStream/Stats.hs index 366cdc4ef..7fd1e90f3 100644 --- a/common/stats/HStream/Stats.hs +++ b/common/stats/HStream/Stats.hs @@ -76,6 +76,20 @@ module HStream.Stats , CounterExports(connector, delivered_in_records) , CounterExports(connector, delivered_in_bytes) + -- * PerCacheStoreStats + , cache_store_stat_erase + -- ** Counters + , cache_store_stat_getall + , CounterExports(cache_store, cs_append_total) + , CounterExports(cache_store, cs_append_failed) + , CounterExports(cache_store, cs_append_in_bytes) + , CounterExports(cache_store, cs_append_in_records) + , CounterExports(cache_store, cs_read_in_bytes) + , CounterExports(cache_store, cs_read_in_records) + , CounterExports(cache_store, cs_delivered_in_records) + , CounterExports(cache_store, cs_delivered_total) + , CounterExports(cache_store, cs_delivered_failed) + -- * PerQueryStats , query_stat_erase -- ** Counters @@ -201,6 +215,8 @@ PER_X_STAT(stream_) PER_X_STAT(subscription_) -- connector_stat_getall, connector_stat_erase PER_X_STAT(connector_) +-- cache_store_stat_getall, cache_store_stat_erase +PER_X_STAT(cache_store_) -- query_stat_getall, query_stat_erase PER_X_STAT(query_) -- view_stat_getall, view_stat_erase @@ -295,6 +311,14 @@ PER_X_STAT_GET(connector_stat_, name) \ PER_X_STAT_GETALL_SEP(connector_stat_, name) #include "../include/per_connector_stats.inc" +-- cache_store +#define STAT_DEFINE(name, _) \ +PER_X_STAT_ADD(cache_store_stat_, name) \ +PER_X_STAT_SET(cache_store_stat_, name) \ +PER_X_STAT_GET(cache_store_stat_, name) \ +PER_X_STAT_GETALL_SEP(cache_store_stat_, name) +#include "../include/per_cache_store_stats.inc" + -- Query #define STAT_DEFINE(name, _) \ PER_X_STAT_ADD(query_stat_, name) \ @@ -428,20 +452,32 @@ data ServerHistogramLabel = SHL_AppendRequestLatency | SHL_AppendLatency | SHL_ReadLatency + | SHL_AppendCacheStoreLatency + | SHL_ReadCacheStoreLatency + | SHL_CheckStoreClusterLatency + | SHL_CheckMetaClusterLatency packServerHistogramLabel :: ServerHistogramLabel -> CBytes -packServerHistogramLabel SHL_AppendRequestLatency = "append_request_latency" -packServerHistogramLabel SHL_AppendLatency = "append_latency" -packServerHistogramLabel SHL_ReadLatency = "read_latency" +packServerHistogramLabel SHL_AppendRequestLatency = "append_request_latency" +packServerHistogramLabel SHL_AppendLatency = "append_latency" +packServerHistogramLabel SHL_ReadLatency = "read_latency" +packServerHistogramLabel SHL_AppendCacheStoreLatency = "append_cache_store_latency" +packServerHistogramLabel SHL_ReadCacheStoreLatency = "read_cache_store_latency" +packServerHistogramLabel SHL_CheckStoreClusterLatency = "check_store_cluster_healthy_latency" +packServerHistogramLabel SHL_CheckMetaClusterLatency = "check_meta_cluster_healthy_latency" instance Read ServerHistogramLabel where readPrec = do l <- Read.lexP return $ case l of - Read.Ident "append_request_latency" -> SHL_AppendRequestLatency - Read.Ident "append_latency" -> SHL_AppendLatency - Read.Ident "read_latency" -> SHL_ReadLatency + Read.Ident "append_request_latency" -> SHL_AppendRequestLatency + Read.Ident "append_latency" -> SHL_AppendLatency + Read.Ident "read_latency" -> SHL_ReadLatency + Read.Ident "append_cache_store_latency" -> SHL_AppendCacheStoreLatency + Read.Ident "read_cache_store_latency" -> SHL_ReadCacheStoreLatency + Read.Ident "check_store_cluster_healthy_latency" -> SHL_CheckStoreClusterLatency + Read.Ident "check_meta_cluster_healthy_latency" -> SHL_CheckMetaClusterLatency x -> errorWithoutStackTrace $ "cannot parse ServerHistogramLabel: " <> show x serverHistogramAdd :: StatsHolder -> ServerHistogramLabel -> Int64 -> IO () diff --git a/common/stats/HStream/Stats/Internal.hs b/common/stats/HStream/Stats/Internal.hs index fb60585a2..cd13436f5 100644 --- a/common/stats/HStream/Stats/Internal.hs +++ b/common/stats/HStream/Stats/Internal.hs @@ -59,6 +59,8 @@ PER_X_STAT(stream_) PER_X_STAT(subscription_) -- connector__stat_getall, connector_stat_erase PER_X_STAT(connector_) +-- cache_store__stat_getall, cache_store_stat_erase +PER_X_STAT(cache_store_) -- query_stat_getall, query_stat_erase PER_X_STAT(query_) -- view_stat_getall, view_stat_erase @@ -105,6 +107,9 @@ foreign import ccall unsafe "hs_stats.h prefix##getall_##name" \ #define STAT_DEFINE(name, _) PER_X_STAT_DEFINE(subscription_stat_, name) #include "../include/per_subscription_stats.inc" +#define STAT_DEFINE(name, _) PER_X_STAT_DEFINE(cache_store_stat_, name) +#include "../include/per_cache_store_stats.inc" + #define TIME_SERIES_DEFINE(name, _, __, ___) \ foreign import ccall unsafe "hs_stats.h stream_time_series_add_##name" \ stream_time_series_add_##name \ diff --git a/common/stats/cbits/hs_stats.cpp b/common/stats/cbits/hs_stats.cpp index c59d365cf..f3dc385e6 100644 --- a/common/stats/cbits/hs_stats.cpp +++ b/common/stats/cbits/hs_stats.cpp @@ -39,6 +39,15 @@ void setPerConnectorStatsMember(const char* stat_name, #include "per_connector_stats.inc" } +void setPerCacheStoreStatsMember( + const char* stat_name, StatsCounter PerCacheStoreStats::*& member_ptr) { +#define STAT_DEFINE(name, _) \ + if (#name == std::string(stat_name)) { \ + member_ptr = &PerCacheStoreStats::name##_counter; \ + } +#include "per_cache_store_stats.inc" +} + void setPerQueryStatsMember(const char* stat_name, StatsCounter PerQueryStats::*& member_ptr) { #define STAT_DEFINE(name, _) \ @@ -164,7 +173,19 @@ int stream_time_series_getall_by_name( #include "per_connector_stats.inc" // connector_stat_getall, connector_stat_erase -PER_X_STAT(connector_, PerConnectorStats, per_connector_stats, setPerConnectorStatsMember) +PER_X_STAT(connector_, PerConnectorStats, per_connector_stats, + setPerConnectorStatsMember) + +// ---------------------------------------------------------------------------- +// PerCacheStoreStats +#define STAT_DEFINE(name, _) \ + PER_X_STAT_DEFINE(cache_store_stat_, per_cache_store_stats, \ + PerCacheStoreStats, name) +#include "per_cache_store_stats.inc" + +// cache_store_stat_getall, cache_store_stat_erase +PER_X_STAT(cache_store_, PerCacheStoreStats, per_cache_store_stats, + setPerCacheStoreStatsMember) // ---------------------------------------------------------------------------- // PerQueryStats @@ -199,7 +220,8 @@ PER_X_STAT(view_, PerViewStats, per_view_stats, setPerViewStatsMember) #include "per_subscription_time_series.inc" // subscription_stat_getall, subscription_stat_erase -PER_X_STAT(subscription_, PerSubscriptionStats, per_subscription_stats, setPerSubscriptionStatsMember) +PER_X_STAT(subscription_, PerSubscriptionStats, per_subscription_stats, + setPerSubscriptionStatsMember) int subscription_time_series_get(StatsHolder* stats_holder, const char* stat_name, const char* subs_name, diff --git a/common/stats/cbits/stats/ServerHistograms.h b/common/stats/cbits/stats/ServerHistograms.h index b90b726a6..8491670f6 100644 --- a/common/stats/cbits/stats/ServerHistograms.h +++ b/common/stats/cbits/stats/ServerHistograms.h @@ -14,6 +14,12 @@ struct ServerHistograms : public HistogramBundle { {"append_request_latency", &append_request_latency}, {"append_latency", &append_latency}, {"read_latency", &read_latency}, + {"append_cache_store_latency", &append_cache_store_latency}, + {"read_cache_store_latency", &read_cache_store_latency}, + {"check_store_cluster_healthy_latency", + &check_store_cluster_healthy_latency}, + {"check_meta_cluster_healthy_latency", + &check_meta_cluster_healthy_latency}, }; } @@ -23,6 +29,14 @@ struct ServerHistograms : public HistogramBundle { LatencyHistogram append_latency; // Latency of logdevice read LatencyHistogram read_latency; + // Latency of cache store writes + LatencyHistogram append_cache_store_latency; + // Latency of cache store read + LatencyHistogram read_cache_store_latency; + // Latency of check store cluster healthy + LatencyHistogram check_store_cluster_healthy_latency; + // Latency of check meta cluster healthy + LatencyHistogram check_meta_cluster_healthy_latency; }; }} // namespace hstream::common diff --git a/common/stats/cbits/stats/Stats.cpp b/common/stats/cbits/stats/Stats.cpp index 8b6f78e21..74c56ce74 100644 --- a/common/stats/cbits/stats/Stats.cpp +++ b/common/stats/cbits/stats/Stats.cpp @@ -8,20 +8,20 @@ namespace hstream { namespace common { static void aggregateStat(StatsAgg agg, StatsCounter& out, int64_t in) { switch (agg) { - case StatsAgg::SUM: - out += in; - break; - case StatsAgg::MAX: - if (in > out) { + case StatsAgg::SUM: + out += in; + break; + case StatsAgg::MAX: + if (in > out) { + out = in; + } + break; + case StatsAgg::SUBTRACT: + out -= in; + break; + case StatsAgg::ASSIGN: out = in; - } - break; - case StatsAgg::SUBTRACT: - out -= in; - break; - case StatsAgg::ASSIGN: - out = in; - break; + break; } } @@ -37,19 +37,19 @@ static void aggregateHistogram(StatsAggOptional agg, H& out, const H& in) { return; } switch (agg.value()) { - case StatsAgg::SUM: - out.merge(in); - break; - case StatsAgg::MAX: - // MAX doesn't make much sense for histograms. Let's just merge them. - out.merge(in); - break; - case StatsAgg::SUBTRACT: - out.subtract(in); - break; - case StatsAgg::ASSIGN: - out = in; - break; + case StatsAgg::SUM: + out.merge(in); + break; + case StatsAgg::MAX: + // MAX doesn't make much sense for histograms. Let's just merge them. + out.merge(in); + break; + case StatsAgg::SUBTRACT: + out.subtract(in); + break; + case StatsAgg::ASSIGN: + out = in; + break; } } @@ -77,7 +77,7 @@ std::string PerStreamStats::toJson() { } void PerConnectorStats::aggregate(PerConnectorStats const& other, - StatsAggOptional agg_override) { + StatsAggOptional agg_override) { #define STAT_DEFINE(name, agg) \ aggregateStat(StatsAgg::agg, agg_override, name##_counter, \ other.name##_counter); @@ -97,6 +97,27 @@ std::string PerConnectorStats::toJson() { return folly::toJson(this->toJsonObj()); } +void PerCacheStoreStats::aggregate(PerCacheStoreStats const& other, + StatsAggOptional agg_override) { +#define STAT_DEFINE(name, agg) \ + aggregateStat(StatsAgg::agg, agg_override, name##_counter, \ + other.name##_counter); +#include "per_cache_store_stats.inc" +} + +folly::dynamic PerCacheStoreStats::toJsonObj() { + folly::dynamic map = folly::dynamic::object; +#define STAT_DEFINE(name, _) \ + /* we know that all names are unique */ \ + map[#name] = name##_counter.load(); +#include "per_cache_store_stats.inc" + return map; +} + +std::string PerCacheStoreStats::toJson() { + return folly::toJson(this->toJsonObj()); +} + void PerQueryStats::aggregate(PerQueryStats const& other, StatsAggOptional agg_override) { #define STAT_DEFINE(name, agg) \ @@ -114,12 +135,10 @@ folly::dynamic PerQueryStats::toJsonObj() { return map; } -std::string PerQueryStats::toJson() { - return folly::toJson(this->toJsonObj()); -} +std::string PerQueryStats::toJson() { return folly::toJson(this->toJsonObj()); } void PerViewStats::aggregate(PerViewStats const& other, - StatsAggOptional agg_override) { + StatsAggOptional agg_override) { #define STAT_DEFINE(name, agg) \ aggregateStat(StatsAgg::agg, agg_override, name##_counter, \ other.name##_counter); @@ -135,9 +154,7 @@ folly::dynamic PerViewStats::toJsonObj() { return map; } -std::string PerViewStats::toJson() { - return folly::toJson(this->toJsonObj()); -} +std::string PerViewStats::toJson() { return folly::toJson(this->toJsonObj()); } void PerSubscriptionStats::aggregate(PerSubscriptionStats const& other, StatsAggOptional agg_override) { @@ -255,6 +272,7 @@ void Stats::aggregateCompoundStats(Stats const& other, // while we aggregate. _PER_STATS(per_stream_stats, PerStreamStats) _PER_STATS(per_connector_stats, PerConnectorStats) + _PER_STATS(per_cache_store_stats, PerCacheStoreStats) _PER_STATS(per_query_stats, PerQueryStats) _PER_STATS(per_view_stats, PerViewStats) _PER_STATS(per_subscription_stats, PerSubscriptionStats) @@ -272,6 +290,7 @@ void Stats::deriveStats() {} void Stats::reset() { per_stream_stats.wlock()->clear(); per_connector_stats.wlock()->clear(); + per_cache_store_stats.wlock()->clear(); per_query_stats.wlock()->clear(); per_view_stats.wlock()->clear(); per_subscription_stats.wlock()->clear(); @@ -298,6 +317,7 @@ folly::dynamic Stats::toJsonObj() { _PER_TO_JSON(per_stream_stats) _PER_TO_JSON(per_connector_stats) + _PER_TO_JSON(per_cache_store_stats) _PER_TO_JSON(per_query_stats) _PER_TO_JSON(per_view_stats) _PER_TO_JSON(per_subscription_stats) diff --git a/common/stats/cbits/stats/Stats.h b/common/stats/cbits/stats/Stats.h index 0abe06015..9b14af9b1 100644 --- a/common/stats/cbits/stats/Stats.h +++ b/common/stats/cbits/stats/Stats.h @@ -100,6 +100,23 @@ struct PerConnectorStats { std::mutex mutex; }; +// ---------------------------------------------------------------------------- +// PerCacheStoreStats +// +struct PerCacheStoreStats { +#define STAT_DEFINE(name, _) StatsCounter name##_counter{}; +#include "per_cache_store_stats.inc" + void aggregate(PerCacheStoreStats const& other, + StatsAggOptional agg_override); + // Show all per_cache_store_stats to a json formatted string. + folly::dynamic toJsonObj(); + std::string toJson(); + + // Mutex almost exclusively locked by one thread since PerCacheStoreStats + // objects are contained in thread-local stats + std::mutex mutex; +}; + // ---------------------------------------------------------------------------- // PerQueryStats // @@ -292,6 +309,11 @@ struct Stats { std::unordered_map>> per_connector_stats; + // Per-cache-store stats + folly::Synchronized< + std::unordered_map>> + per_cache_store_stats; + // Per-query stats folly::Synchronized< std::unordered_map>> diff --git a/common/stats/include/per_cache_store_stats.inc b/common/stats/include/per_cache_store_stats.inc new file mode 100644 index 000000000..d77d5580d --- /dev/null +++ b/common/stats/include/per_cache_store_stats.inc @@ -0,0 +1,27 @@ +#ifndef STAT_DEFINE +#error STAT_DEFINE() macro not defined +#define STAT_DEFINE(...) +#endif + +// Total number of append requests +STAT_DEFINE(cs_append_total, SUM) +// Failed append requests +STAT_DEFINE(cs_append_failed, SUM) + +// Total payload bytes successfully written to the cache store +STAT_DEFINE(cs_append_in_bytes, SUM) +// Total payload records successfully written to the cache store +STAT_DEFINE(cs_append_in_records, SUM) + +// ============== read stats ========================= +// Total payload bytes successfully read from the cache store +STAT_DEFINE(cs_read_in_bytes, SUM) +// Total payload records successfully read from the cache store +STAT_DEFINE(cs_read_in_records, SUM) + +// Total number of delivered records +STAT_DEFINE(cs_delivered_in_records, SUM) +STAT_DEFINE(cs_delivered_total, SUM) +STAT_DEFINE(cs_delivered_failed, SUM) + +#undef STAT_DEFINE diff --git a/conf/hstream.yaml b/conf/hstream.yaml index ad8387959..df1da921d 100644 --- a/conf/hstream.yaml +++ b/conf/hstream.yaml @@ -141,6 +141,12 @@ hserver: las: hstreamdb/sink-las:latest elasticsearch: hstreamdb/sink-elasticsearch:latest + cache-store: + # Enable server cache mode feature + enable-server-cache: false + # The cache store work directory + cache-store-path: /data/cache-store + # Internal grpc settings (typically, there's no need to modify these) #grpc: # channel-args: diff --git a/external/protocol b/external/protocol index d28d55aa0..12abd76f1 160000 --- a/external/protocol +++ b/external/protocol @@ -1 +1 @@ -Subproject commit d28d55aa0f90f7d30ca8236b1b5573b1e67971cf +Subproject commit 12abd76f1a102d441d0b9de5f163992d9068a944 diff --git a/hstream-store/HStream/Store/Internal/LogDevice.hs b/hstream-store/HStream/Store/Internal/LogDevice.hs index bc2f0217d..9ab09f6fe 100644 --- a/hstream-store/HStream/Store/Internal/LogDevice.hs +++ b/hstream-store/HStream/Store/Internal/LogDevice.hs @@ -21,6 +21,7 @@ module HStream.Store.Internal.LogDevice , module HStream.Store.Internal.LogDevice.VersionedConfigStore , module HStream.Store.Internal.LogDevice.Writer , module HStream.Store.Internal.LogDevice.LDQuery + , module HStream.Store.Internal.LogDevice.HealthCheck ) where import Control.Monad @@ -43,6 +44,7 @@ import qualified HStream.Store.Exception as E import HStream.Store.Internal.Foreign import HStream.Store.Internal.LogDevice.Checkpoint import HStream.Store.Internal.LogDevice.Configuration +import HStream.Store.Internal.LogDevice.HealthCheck import HStream.Store.Internal.LogDevice.LDQuery import HStream.Store.Internal.LogDevice.LogAttributes import HStream.Store.Internal.LogDevice.LogConfigTypes diff --git a/hstream-store/HStream/Store/Internal/LogDevice/HealthCheck.hs b/hstream-store/HStream/Store/Internal/LogDevice/HealthCheck.hs new file mode 100644 index 000000000..bb503f80c --- /dev/null +++ b/hstream-store/HStream/Store/Internal/LogDevice/HealthCheck.hs @@ -0,0 +1,43 @@ +module HStream.Store.Internal.LogDevice.HealthCheck + ( LdChecker + , newLdChecker + , isLdClusterHealthy + ) +where + +import Foreign.C.Types +import Foreign.ForeignPtr +import Foreign.Ptr + +import HStream.Store.Internal.Foreign +import HStream.Store.Internal.Types + +type LdChecker = ForeignPtr Checker + +newLdChecker :: LDClient -> IO LdChecker +newLdChecker client = do + checker <- withForeignPtr client $ \client' -> new_ld_checker client' + newForeignPtr delete_ld_checker checker + +-- Check the health status of the cluster. +-- If the total number of unhealthy nodes is greater than the specified limit, +-- it means that the cluster is not available and will return False. +isLdClusterHealthy + :: LdChecker + -> Int + -- ^ The upper limit of unhealthy nodes + -> IO Bool + -- ^ Return True if the cluster is healthy, otherwise False +isLdClusterHealthy checker unhealthyLimit = withForeignPtr checker $ \c -> do + cbool2bool <$> ld_checker_check c (fromIntegral unhealthyLimit) + +data Checker + +foreign import ccall unsafe "new_ld_checker" + new_ld_checker :: Ptr LogDeviceClient -> IO (Ptr Checker) + +foreign import ccall unsafe "&delete_ld_checker" + delete_ld_checker :: FunPtr (Ptr Checker -> IO ()) + +foreign import ccall safe "ld_checker_check" + ld_checker_check :: Ptr Checker -> CInt -> IO CBool diff --git a/hstream-store/HStream/Store/Stream.hs b/hstream-store/HStream/Store/Stream.hs index e92c1275d..74ea7c6f4 100644 --- a/hstream-store/HStream/Store/Stream.hs +++ b/hstream-store/HStream/Store/Stream.hs @@ -164,6 +164,10 @@ module HStream.Store.Stream , getStreamLogPath , createRandomLogGroup + , LD.LdChecker + , LD.newLdChecker + , LD.isLdClusterHealthy + -- * Re-export , def ) where diff --git a/hstream-store/cbits/logdevice/ld_health_checker.cpp b/hstream-store/cbits/logdevice/ld_health_checker.cpp new file mode 100644 index 000000000..3500d7189 --- /dev/null +++ b/hstream-store/cbits/logdevice/ld_health_checker.cpp @@ -0,0 +1,228 @@ +#include +#include +#include +#include + +#include "logdevice/common/GetClusterStateRequest.h" +#include "logdevice/common/Processor.h" +#include "logdevice/common/Semaphore.h" +#include "logdevice/common/configuration/nodes/NodeAttributesConfig.h" +#include "logdevice/common/configuration/nodes/ServiceDiscoveryConfig.h" +#include "logdevice/common/debug.h" +#include "logdevice/include/Client.h" +#include "logdevice/include/Err.h" +#include "logdevice/include/debug.h" +#include "logdevice/include/types.h" +#include "logdevice/lib/ClientImpl.h" + +#include "hs_logdevice.h" + +using namespace facebook::logdevice::configuration::nodes; +using namespace facebook::logdevice; + +class LdChecker { +public: + explicit LdChecker(std::shared_ptr client) + : client_(std::move(client)) {} + + LdChecker(const LdChecker&) = delete; + LdChecker& operator=(const LdChecker&) = delete; + + LdChecker(LdChecker&&) = delete; + LdChecker& operator=(LdChecker&&) = delete; + + ~LdChecker() = default; + + auto check(int unhealthy_node_limit) -> bool { + auto* client_impl = dynamic_cast(client_.get()); + auto config = client_impl->getConfig(); + const auto nodes_configuration = config->getNodesConfiguration(); + auto sdc = nodes_configuration->getServiceDiscovery(); + + auto start = std::chrono::steady_clock::now(); + getClusterState(*client_impl, *nodes_configuration); + auto end = std::chrono::steady_clock::now(); + auto duration = + std::chrono::duration_cast(end - start) + .count(); + ld_debug("GetClusterState took %ld ms", duration); + + // getClusterState(*client_impl, *nodes_configuration); + + auto total_nodes = sdc->numNodes(); + auto unhealthy_nodes_set = std::unordered_set(total_nodes); + + std::lock_guard guard(mutex_); + for (auto& res : stateResults_) { + if (res.status == E::OK) { + for (auto& [node_idx, state] : res.nodes_state) { + if (sdc->hasNode(node_idx) && state != 0) { + unhealthy_nodes_set.insert(node_idx); + } + if (sdc->hasNode(node_idx) && state == 3) { + unhealthy_nodes_set.insert(node_idx); + } + } + } + } + + if (!unhealthy_nodes_set.empty()) { + ld_warning("Cluster has %lu unhealthy nodes:", + unhealthy_nodes_set.size()); + printUnhealthyNodes(*nodes_configuration, unhealthy_nodes_set); + } + + return unhealthy_nodes_set.size() <= unhealthy_node_limit; + } + +private: + struct ClusterStateResult { + node_index_t node_id; + std::string addr; + Status status; + std::vector> nodes_state; + std::vector> nodes_status; + }; + + // need acquire lock before print + auto printUnhealthyNodes(const NodesConfiguration& nodes_configuration, + const std::unordered_set& sets) + -> void { + for (auto& res : stateResults_) { + auto st = "ALIVE"; + if (res.status == E::TIMEDOUT) { + st = "SUSPECT"; + } else if (res.status != E::OK) { + st = "DEAD"; + } + auto deadNodes = nodesStateToString(nodes_configuration, res.nodes_state); + auto unhealthyNodes = + nodesStatusToUnhealthy(nodes_configuration, res.nodes_status); + ld_warning("[Node %d(%s)]: status: %s, dead nodes index: [%s], " + "unhealthy nodes index: [%s]", + res.node_id, res.addr.c_str(), st, deadNodes.c_str(), + unhealthyNodes.c_str()); + } + ld_warning("Check return unhealthy nodes index: [%s]", + folly::join(',', sets).c_str()); + } + + auto addResult(node_index_t node_id, std::string address, Status status, + std::vector> nodes_state, + std::vector> nodes_status) + -> void { + ClusterStateResult res; + res.node_id = node_id; + res.addr = address; + res.status = status; + res.nodes_state = std::move(nodes_state); + res.nodes_status = std::move(nodes_status); + + { + std::lock_guard guard(mutex_); + stateResults_.push_back(std::move(res)); + } + } + + auto clearResults() -> void { + std::lock_guard guard(mutex_); + stateResults_.clear(); + } + + auto getClusterState(const ClientImpl& client_impl, + const NodesConfiguration& nodes_configuration) -> void { + int posted_requests = 0; + auto sdc = nodes_configuration.getServiceDiscovery(); + clearResults(); + + facebook::logdevice::Semaphore sem; + + for (auto it = sdc->begin(); it != sdc->end(); ++it) { + auto node_id = it->first; + auto attr = it->second; + auto addr = attr.default_client_data_address.toString(); + + auto cb = + [&sem, node_id, addr, + this](Status status, + std::vector> nodes_state, + std::vector boycotted_nodes, + std::vector> nodes_status) { + addResult(node_id, addr, status, nodes_state, nodes_status); + sem.post(); + }; + + std::unique_ptr req = std::make_unique( + std::chrono::milliseconds(2000), // 2s timeout + std::chrono::seconds(60), // wave timeout is useless here + std::move(cb), nodes_configuration.getNodeID(node_id)); + + auto rv = client_impl.getProcessor().postImportant(req); + ld_check(rv == 0); + ++posted_requests; + } + + while (posted_requests > 0) { + sem.wait(); + --posted_requests; + } + } + + static auto nodesStateToString( + const NodesConfiguration& nodes_configuration, + const std::vector>& nodes_state) + -> std::string { + auto sdc = *nodes_configuration.getServiceDiscovery(); + std::vector dead_nodes; + if (!nodes_state.empty()) { + for (auto& [node_idx, state] : nodes_state) { + if (sdc.hasNode(node_idx) && state != 0) { + dead_nodes.push_back(node_idx); + } + } + } + + return folly::join(',', dead_nodes); + } + + static auto nodesStatusToUnhealthy( + const NodesConfiguration& nodes_configuration, + const std::vector>& nodes_status) + -> std::string { + auto sdc = *nodes_configuration.getServiceDiscovery(); + std::vector unhealthy_nodes; + if (!nodes_status.empty()) { + for (auto& [node_idx, status] : nodes_status) { + if (sdc.hasNode(node_idx) && status == 3) { + unhealthy_nodes.push_back(node_idx); + } + } + } + return folly::join(',', unhealthy_nodes); + } + + std::shared_ptr client_; + std::vector stateResults_; + std::mutex mutex_; +}; + +extern "C" { + +struct ld_checker { + std::unique_ptr rep; +}; + +ld_checker* new_ld_checker(logdevice_client_t* client) { + auto checker = std::make_unique(client->rep); + ld_checker* res = new ld_checker; + res->rep = std::move(checker); + return res; +} + +void delete_ld_checker(ld_checker* checker) { delete checker; } + +bool ld_checker_check(ld_checker* checker, int unhealthy_node_limit) { + return checker->rep->check(unhealthy_node_limit); +} + +} /* end extern "C" */ diff --git a/hstream-store/hstream-store.cabal b/hstream-store/hstream-store.cabal index b1b50133d..1b3cc38f1 100644 --- a/hstream-store/hstream-store.cabal +++ b/hstream-store/hstream-store.cabal @@ -76,6 +76,7 @@ library HStream.Store.Internal.LogDevice.Reader HStream.Store.Internal.LogDevice.VersionedConfigStore HStream.Store.Internal.LogDevice.Writer + HStream.Store.Internal.LogDevice.HealthCheck HStream.Store.Internal.Types HStream.Store.Stream @@ -137,6 +138,7 @@ library cbits/logdevice/hs_versioned_config_store.cpp cbits/logdevice/hs_writer.cpp cbits/logdevice/ld_configuration.cpp + cbits/logdevice/ld_health_checker.cpp cbits/utils.cpp extra-lib-dirs: /usr/local/lib diff --git a/hstream/app/server.hs b/hstream/app/server.hs index 1dfe62724..777c40b55 100644 --- a/hstream/app/server.hs +++ b/hstream/app/server.hs @@ -2,6 +2,7 @@ {-# LANGUAGE DataKinds #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedRecordDot #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RecordWildCards #-} @@ -35,7 +36,8 @@ import HStream.Common.Server.HashRing (updateHashRing) import HStream.Common.Server.MetaData (TaskAllocation (..), clusterStartTimeId) import HStream.Common.Types (getHStreamVersion) -import HStream.Common.ZookeeperClient (withZookeeperClient) +import HStream.Common.ZookeeperClient (unsafeGetZHandle, + withZookeeperClient) import HStream.Exception import HStream.Gossip (GossipContext (..), defaultGossipOpts, @@ -61,6 +63,8 @@ import HStream.Server.Config (AdvertisedListeners, getConfig, runServerCli) import qualified HStream.Server.Core.Cluster as Cluster import qualified HStream.Server.Experimental as Exp +import HStream.Server.HealthMonitor (mkHealthMonitor, + startMonitor) import qualified HStream.Server.HsGrpcHandler as HsGrpcHandler import qualified HStream.Server.HStreamApi as API import qualified HStream.Server.HStreamInternal as I @@ -174,6 +178,14 @@ app config@ServerOpts{..} = do forM_ as (Async.link2Only (const True) a) -- wati the default server waitGossipBoot gossipContext + + when (_enableServerCache) $ do + let ServerContext{scLDClient, metaHandle, scStatsHolder} = serverContext + healthMonitor <- mkHealthMonitor scLDClient scStatsHolder metaHandle 1 + aMonitor <- Async.async $ startMonitor serverContext healthMonitor 3 + Log.info $ "Start healthy monitor" + Async.link2Only (const True) a aMonitor + Async.wait a serve diff --git a/hstream/hstream.cabal b/hstream/hstream.cabal index 088e46a5b..d3e04d844 100644 --- a/hstream/hstream.cabal +++ b/hstream/hstream.cabal @@ -115,6 +115,8 @@ library HStream.Server.QueryWorker HStream.Server.Types HStream.Server.Validation + HStream.Server.CacheStore + HStream.Server.HealthMonitor other-modules: HStream.Server.Core.Query @@ -203,6 +205,7 @@ library , yaml , Z-Data , zoovisitor + , clock if impl(ghc <9.2.0) build-depends: hstream-admin-store diff --git a/hstream/src/HStream/Server/CacheStore.hs b/hstream/src/HStream/Server/CacheStore.hs new file mode 100644 index 000000000..b2a1eecba --- /dev/null +++ b/hstream/src/HStream/Server/CacheStore.hs @@ -0,0 +1,418 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE NamedFieldPuns #-} + +module HStream.Server.CacheStore +( CacheStore +, mkCacheStore +, initCacheStore +, deleteCacheStore +, writeRecord +, dumpToHStore +, StoreMode(..) +, setCacheStoreMode +) +where + +import Control.Concurrent (MVar, forkFinally, modifyMVar_, + newMVar, readMVar, swapMVar, + threadDelay) +import Control.Concurrent.STM (TVar, atomically, check, newTVarIO, + readTVar, readTVarIO, swapTVar, + writeTVar) +import Control.Exception (Exception (displayException), + catch, throwIO, try) +import Control.Monad (unless, void, when) +import Data.ByteString (ByteString) +import qualified Data.ByteString as BS +import qualified Data.ByteString.Char8 as BSC +import Data.Int (Int64) +import Data.IORef (IORef, atomicModifyIORef', + atomicWriteIORef, newIORef, + readIORef) +import qualified Data.List as L +import Data.Maybe (fromJust, isJust) +import qualified Data.Text as T +import qualified Data.Text.Encoding as T +import Data.Time.Clock.POSIX (getPOSIXTime) +import Data.Word (Word64) +import System.Clock +import Text.Printf (printf) + +import Database.RocksDB +import Database.RocksDB.Exception (RocksDbException) +import qualified HStream.Exception as HE +import qualified HStream.Logger as Log +import qualified HStream.Stats as ST +import qualified HStream.Store as S +import HStream.Utils (msecSince, textToCBytes) + +-- StoreMode is a logical concept that represents the operations that the current CacheStore can perform. +-- Cache mode: data can only be written to the store +-- Dump mode: data can only be exported from the store +data StoreMode = CacheMode | DumpMode deriving(Show, Eq) + +-- CacheStore contains all options to create and run a real rocksdb store. +-- Note that the Store is only created when it is being used. +data CacheStore = CacheStore + { store :: MVar (Maybe Store) + , path :: FilePath + , dbOpts :: DBOptions + , writeOptions :: WriteOptions + , readOptions :: ReadOptions + , enableWrite :: MVar Bool + , dumpState :: TVar DumpState + -- ^ The state of dump process + , needResume :: IORef Bool + -- ^ If dumpstate changes from dumping to suspend, needResume + -- will be set to True. This indicates that the cache store should + -- resume dump process after dumpstate changes back to dumping + , counter :: IORef Word64 + -- ^ Counter is used to generate a monotonically increasing offset + -- to make sure that each encoded-key is unique and ordered. + , statsHolder :: ST.StatsHolder + } + +-- Store is actually a column family in rocksdb, +-- with the timestamp of creating the store as the name of the column family. +data Store = Store + { db :: DB + , name :: String + , columnFamily :: ColumnFamily + , nextKeyToDump :: IORef ByteString + -- ^ next key to dump to hstore + , totalWrite :: IORef Word64 + -- ^ total records count written to the store + , totalRead :: IORef Word64 + -- ^ total records count read from the store + } + +data DumpState = + NoDump -- No dump task running + | Dumping -- Dump is on-going + | Suspend -- Dump is suspend + deriving(Show, Eq) + +mkCacheStore + :: FilePath -> DBOptions -> WriteOptions -> ReadOptions -> ST.StatsHolder -> IO CacheStore +mkCacheStore path dbOpts writeOptions readOptions statsHolder = do + store <- newMVar Nothing + enableWrite <- newMVar False + dumpState <- newTVarIO NoDump + counter <- newIORef 0 + needResume <- newIORef False + return CacheStore {..} + +-- Create a rocksdb store +-- Users need to ensure that they create a store before performing other operations. +initCacheStore :: CacheStore -> IO () +initCacheStore CacheStore{..} = do + modifyMVar_ store $ \st -> do + case st of + Just st'@Store{..} -> do + Log.info $ "Get cache store " <> Log.build name + return $ Just st' + Nothing -> do + Log.info $ "Open rocksdb" + res <- try $ listColumnFamilies dbOpts path + db <- case res of + Left (_ :: RocksDbException) -> do + -- `listColumnFamilies` expects the db to exist. If the call failed, we'll just assume + -- this is a new db, relying on a subsequent `open` failing in case it's a more severe issue. + open dbOpts path + Right cfs -> do + -- open db and remove existed column families + let defaultCfIndex = fromJust $ L.elemIndex "default" cfs + let cfs' = map (\n -> ColumnFamilyDescriptor {name = n, options = dbOpts}) cfs + (db, cfhs) <- openColumnFamilies dbOpts path cfs' + -- skip drop default column family + let shouldDrop = [x | (i, x) <- zip [0..] cfhs, i /= defaultCfIndex] + mapM_ (dropColumnFamily db) shouldDrop + mapM_ destroyColumnFamily cfhs + return db + + name <- show <$> getCurrentTimestamp + columnFamily <- createColumnFamily db dbOpts name + Log.info $ "Create cache store " <> Log.build name + + totalWrite <- newIORef 0 + totalRead <- newIORef 0 + nextKeyToDump <- newIORef BS.empty + atomicWriteIORef counter 0 + ST.cache_store_stat_erase statsHolder "cache_store" + + return $ Just Store{..} + +-- Destroy both rocksdb and columnFamily +-- Users need to ensure that all other operations are stopped before deleting the store. +deleteCacheStore :: CacheStore -> IO () +deleteCacheStore CacheStore{store} = do + modifyMVar_ store $ \st -> do + case st of + Just Store{..} -> do + dropColumnFamily db columnFamily + Log.info $ "Drop column family" + destroyColumnFamily columnFamily + Log.info $ "Destroy column family" + close db + Log.info $ "Delete cache store " <> Log.build name + + tWrites <- readIORef totalWrite + tReads <- readIORef totalRead + Log.info $ "CacheStore totalWrites: " <> Log.build tWrites <> ", totalReads: " <> Log.build tReads + when (tWrites /= tReads) $ do + Log.warning $ "CacheStore totalWrites and totalReads are not equal." + return Nothing + Nothing -> return Nothing + +getStore :: CacheStore -> IO Store +getStore CacheStore{store} = do + st <- readMVar store + case st of + Just st' -> return st' + Nothing -> do + Log.fatal $ "Cache store not initialized, should not get here" + throwIO $ HE.UnexpectedError "Store is not initialized." + +writeRecord :: CacheStore -> T.Text -> Word64 -> ByteString -> IO S.AppendCompletion +writeRecord st@CacheStore{..} streamName shardId payload = do + canWrite <- readMVar enableWrite + unless canWrite $ do + Log.warning $ "Cannot write to cache store becasue the store is not write-enabled." + -- throw an unavailable exception to client and let it retry + throwIO $ HE.ResourceAllocationException "CacheStore is not write-enabled." + + Store{..} <- getStore st + -- Obtain a monotonically increasing offset to ensure that each encoded-key is unique. + offset <- atomicModifyIORef' counter (\x -> (x + 1, x)) + let k = encodeKey streamName shardId offset + + !append_start <- getPOSIXTime + putCF db writeOptions columnFamily k payload `catch` record_failed + ST.serverHistogramAdd statsHolder ST.SHL_AppendCacheStoreLatency =<< msecSince append_start + ST.cache_store_stat_add_cs_append_in_bytes statsHolder cStreamName (fromIntegral $ BS.length payload) + ST.cache_store_stat_add_cs_append_in_records statsHolder cStreamName 1 + ST.cache_store_stat_add_cs_append_total statsHolder cStreamName 1 + + void $ atomicModifyIORef' totalWrite (\x -> (x + 1, x)) + lsn <- getCurrentTimestamp + return S.AppendCompletion + { appendCompLogID = shardId + , appendCompLSN = fromIntegral lsn + , appendCompTimestamp = 0 + } + where + cStreamName = "cache_store" + record_failed (e :: RocksDbException) = do + ST.cache_store_stat_add_cs_append_failed statsHolder cStreamName 1 + throwIO e + +-- dump all cached records to HStore +dumpToHStore :: CacheStore -> S.LDClient -> S.Compression -> IO () +dumpToHStore st@CacheStore{..} ldClient cmpStrategy = do + needSpawn <- atomically $ do + state <- readTVar dumpState + case state of + NoDump -> do + writeTVar dumpState Dumping + return True + _ -> return False + when (needSpawn) . void $ forkFinally dump finalizer + where + dump = do + cacheStore <- getStore st + Log.info $ "Starting dump cached data to HStore" + dumpLoop cacheStore + Log.info $ "Finish dump cached data to HStore" + + dumpLoop cacheStore = do + state <- readTVarIO dumpState + case state of + NoDump -> return () + Suspend -> do + void . atomically $ readTVar dumpState >>= \s -> check (s == Dumping) + dumpLoop cacheStore + Dumping -> do + doDump cacheStore ldClient cmpStrategy readOptions dumpState needResume statsHolder + offset <- readIORef counter + Log.info $ "Finish doDump, current offset: " <> Log.build (show offset) + resume <- readIORef needResume + when resume $ dumpLoop cacheStore + + finalizer (Left e) = do + -- What if dump process error? Currently, if dump process error, all data remained will lose + Log.fatal $ "dump cached data to HStore failed: " <> Log.build (show e) + reset + finalizer (Right _) = do + deleteCacheStore st + reset + + reset = do + _ <- atomically $ swapTVar dumpState NoDump + atomicWriteIORef needResume False + +-- Traverse all data from the beginning using the column family iterator of rocksdb, and write them into +-- hstore one by one. +-- This function exits under one of each following conditions: +-- - all data in rocksdb column family has been writen to hstore, +-- - or the iterator is invalid, +-- - or cacheStore is not in Dumping state, +-- - or an unexpected exception occurs. +doDump + :: Store + -> S.LDClient + -> S.Compression + -> ReadOptions + -> TVar DumpState + -> IORef Bool + -> ST.StatsHolder + -> IO () +doDump Store{..} ldClient cmpStrategy readOptions dumpState resume statsHolder = do + start <- getTime Monotonic + withIteratorCF db readOptions columnFamily $ \iter -> do + atomicWriteIORef resume False + firstKey <- readIORef nextKeyToDump + if BS.null firstKey + then do + Log.info $ "Start dump cache store from beginning" + seekToFirst iter + else do + Log.info $ "Dump resumed from key " <> Log.build firstKey + seek iter firstKey + + -- begin dump loop + whileM (checkContinue iter) $ do + -- decode key + k <- key iter + payload <- value iter + void $ atomicModifyIORef' totalRead (\x -> (x + 1, x)) + ST.cache_store_stat_add_cs_read_in_bytes statsHolder cStreamName (fromIntegral $ BS.length payload + BS.length k) + ST.cache_store_stat_add_cs_read_in_records statsHolder cStreamName 1 + atomicWriteIORef nextKeyToDump k + let (sName, shardId) = decodeKey k + + -- append hstore + -- TODO: How to handle LSN? + success <- appendHStoreWithRetry ldClient sName shardId payload cmpStrategy dumpState statsHolder + if success + then do + ST.cache_store_stat_add_cs_delivered_in_records statsHolder cStreamName 1 + ST.cache_store_stat_add_cs_delivered_total statsHolder cStreamName 1 + else do + ST.cache_store_stat_add_cs_delivered_failed statsHolder cStreamName 1 + + -- call iter next + !move_iter_start <- getPOSIXTime + next iter + ST.serverHistogramAdd statsHolder ST.SHL_ReadCacheStoreLatency =<< msecSince move_iter_start + + -- finish all reading or something happend cause dump loop exit, check the error code + errorM <- getError iter + case errorM of + Just msg -> do + -- FIXME: What if iterator return error when iterating? + Log.fatal $ "Cache store iterator error: " <> Log.build msg + Nothing -> do + end <- getTime Monotonic + let sDuration = toNanoSecs (diffTimeSpec end start) `div` 1000000 + nextKey <- readIORef nextKeyToDump + Log.info $ "Exit cache store iterator, total time " <> Log.build sDuration <> "ms" + <> ", next key to dump: " <> Log.build nextKey + return () + where + cStreamName = "cache_store" + checkContinue iter = do + state <- readTVarIO dumpState + iterValid <- valid iter + return $ state == Dumping && iterValid + +appendHStoreWithRetry + :: S.LDClient + -> T.Text + -> Word64 + -> ByteString + -> S.Compression + -> TVar DumpState + -> ST.StatsHolder + -> IO Bool +appendHStoreWithRetry ldClient streamName shardId payload cmpStrategy dumpState statsHolder = do + isJust <$> loop (3 :: Int) + where + cName = textToCBytes streamName + -- exitNum is used to quickly exit the loop when the dump state is not dumping, avoiding more retries. + exitNum = -99 + + loop cnt + | cnt > 0 = do + !append_start <- getPOSIXTime + res <- try $ S.appendCompressedBS ldClient shardId payload cmpStrategy Nothing + case res of + Right lsn -> do + ST.serverHistogramAdd statsHolder ST.SHL_AppendLatency =<< msecSince append_start + ST.stream_stat_add_append_in_bytes statsHolder cName (fromIntegral $ BS.length payload) + ST.stream_stat_add_append_in_records statsHolder cName 1 + ST.stream_stat_add_append_total statsHolder cName 1 + return $ Just lsn + Left (e :: S.SomeHStoreException) -> do + ST.stream_stat_add_append_failed statsHolder cName 1 + ST.stream_time_series_add_append_failed_requests statsHolder cName 1 + state <- readTVarIO dumpState + case state of + Dumping -> do + let cnt' = cnt - 1 + Log.warning $ "Dump to shardId " <> Log.build shardId <> " failed" + <> ", error: " <> Log.build (displayException e) + <> ", left retries = " <> Log.build (show cnt') + -- sleep 1s + threadDelay $ 1 * 1000 * 1000 + loop cnt' + _ -> loop exitNum + | cnt == exitNum = do + Log.warning $ "Dump to shardId " <> Log.build shardId + <> " failed because cache store is not in dumping state, will retry later." + return Nothing + | otherwise = do + Log.fatal $ "Dump to shardId " <> Log.build shardId + <> " failed after exausting the retry attempts, drop the record." + return Nothing + +----------------------------------------------------------------------------------------------------- +-- helper + +setCacheStoreMode :: CacheStore -> StoreMode -> IO () +setCacheStoreMode CacheStore{..} CacheMode = do + void $ swapMVar enableWrite True + resume <- atomically $ do + state <- readTVar dumpState + case state of + Dumping -> writeTVar dumpState Suspend >> return True + _ -> return False + atomicWriteIORef needResume resume + Log.info $ "Set CacheStore to CacheMode" +setCacheStoreMode CacheStore{..} DumpMode = do + void $ swapMVar enableWrite False + void $ atomically $ do + state <- readTVar dumpState + case state of + Suspend -> writeTVar dumpState Dumping + _ -> pure () + Log.info $ "Set CacheStore to DumpMode" + +whileM :: IO Bool -> IO () -> IO () +whileM cond act = do + cont <- cond + when cont $ do + act + whileM cond act +{-# INLINABLE whileM #-} + +encodeKey :: T.Text -> Word64 -> Word64 -> ByteString +encodeKey streamName shardId offset = + BS.concat [BSC.pack $ printf "%032d" offset, ":", T.encodeUtf8 streamName, ":", BSC.pack $ show shardId] + +decodeKey :: ByteString -> (T.Text, Word64) +decodeKey bs = + let [_ , streamName , shardId] = BSC.split ':' bs + in (T.decodeUtf8 streamName, read $ BSC.unpack shardId) + +getCurrentTimestamp :: IO Int64 +getCurrentTimestamp = fromIntegral . toNanoSecs <$> getTime Monotonic diff --git a/hstream/src/HStream/Server/Config.hs b/hstream/src/HStream/Server/Config.hs index 068b3f9ea..53839fc86 100644 --- a/hstream/src/HStream/Server/Config.hs +++ b/hstream/src/HStream/Server/Config.hs @@ -126,6 +126,9 @@ data ServerOpts = ServerOpts , _querySnapshotPath :: !FilePath , experimentalFeatures :: ![ExperimentalFeature] + , _enableServerCache :: !Bool + , _cacheStorePath :: !FilePath + #ifndef HStreamUseGrpcHaskell , grpcChannelArgs :: ![HsGrpc.ChannelArg] #endif @@ -263,6 +266,12 @@ parseJSONToOptions CliOptions{..} obj = do let experimentalFeatures = cliExperimentalFeatures + cacheStoreCfg <- nodeCfgObj .:? "cache-store" .!= mempty + serverCache <- cacheStoreCfg .:? "enable-server-cache" .!= False + let _enableServerCache = cliEnableServerCache || serverCache + cacheStorePath <- cacheStoreCfg .:? "cache-store-path" .!= "/data/cache_store" + let _cacheStorePath = fromMaybe cacheStorePath cliCacheStorePath + #ifndef HStreamUseGrpcHaskell grpcCfg <- nodeCfgObj .:? "grpc" .!= mempty grpcChanArgsCfg <- grpcCfg .:? "channel-args" .!= mempty diff --git a/hstream/src/HStream/Server/Configuration/Cli.hs b/hstream/src/HStream/Server/Configuration/Cli.hs index 06c4a254f..1dfbb4823 100644 --- a/hstream/src/HStream/Server/Configuration/Cli.hs +++ b/hstream/src/HStream/Server/Configuration/Cli.hs @@ -139,6 +139,10 @@ data CliOptions = CliOptions -- Experimental Features , cliExperimentalFeatures :: ![ExperimentalFeature] + -- Cache Mode config + , cliEnableServerCache :: !Bool + , cliCacheStorePath :: !(Maybe FilePath) + -- Internal options , cliStoreCompression :: !(Maybe Compression) } deriving Show @@ -182,6 +186,9 @@ cliOptionsParser = do cliExperimentalFeatures <- O.many experimentalFeatureParser + cliCacheStorePath <- optional cacheStorePathParser + cliEnableServerCache <- enableServerCacheParser + cliStoreCompression <- optional storeCompressionParser return CliOptions{..} @@ -455,6 +462,17 @@ querySnapshotPathParser = strOption <> metavar "PATH" <> value "/data/query_snapshots" <> help "hstream query snapshot store path" +enableServerCacheParser :: O.Parser Bool +enableServerCacheParser = flag False True + $ long "enable-server-cache" + <> help "enable server cache" + +cacheStorePathParser :: O.Parser FilePath +cacheStorePathParser = strOption + $ long "cache-store-path" + <> metavar "PATH" <> value "/data/cache_store" + <> help "hstream cache store path" + data ExperimentalFeature = ExperimentalStreamV2 deriving (Show, Eq) diff --git a/hstream/src/HStream/Server/Core/Cluster.hs b/hstream/src/HStream/Server/Core/Cluster.hs index cebc953cd..06717883e 100644 --- a/hstream/src/HStream/Server/Core/Cluster.hs +++ b/hstream/src/HStream/Server/Core/Cluster.hs @@ -1,5 +1,3 @@ -{-# LANGUAGE PatternSynonyms #-} - module HStream.Server.Core.Cluster ( describeCluster , lookupResource @@ -13,42 +11,30 @@ module HStream.Server.Core.Cluster , recoverLocalTasks ) where -import Control.Concurrent (MVar, tryReadMVar, withMVar) -import Control.Concurrent.STM (readTVarIO) -import Control.Exception (Handler (..), - SomeException (..), catches) -import Control.Monad (forM_, when) -import qualified Data.List as L -import qualified Data.Map.Strict as Map -import qualified Data.Text as T -import qualified Data.Vector as V +import Control.Concurrent (MVar, withMVar) +import Control.Exception (Exception (displayException), + Handler (..), + SomeException (..), catches) +import Control.Monad (forM_, when) +import qualified Data.Text as T -import HStream.Common.ConsistentHashing (getResNode) -import HStream.Common.Server.Lookup (lookupNode) -import HStream.Common.Server.MetaData (clusterStartTimeId) -import HStream.Common.Types (fromInternalServerNodeWithKey, - getHStreamVersion) -import qualified HStream.Exception as HE -import HStream.Gossip (GossipContext (..), - getFailedNodes, - getMemberList) -import qualified HStream.Gossip as Gossip -import qualified HStream.Gossip.Types as Gossip -import qualified HStream.Logger as Log -import HStream.MetaStore.Types (MetaStore (..)) -import qualified HStream.MetaStore.Types as Meta -import HStream.Server.Core.Common (lookupResource, - parseAllocationKey) +import HStream.Common.Server.Lookup (lookupNode) +import HStream.Common.Server.MetaData (clusterStartTimeId) +import qualified HStream.Exception as HE +import HStream.Gossip (GossipContext (..)) +import qualified HStream.Gossip as Gossip +import qualified HStream.Gossip.Types as Gossip +import qualified HStream.Logger as Log +import HStream.MetaStore.Types (MetaStore (..)) +import HStream.Server.Core.Common (lookupResource) import HStream.Server.HStreamApi -import qualified HStream.Server.HStreamInternal as I -import qualified HStream.Server.MetaData as Meta -import HStream.Server.QueryWorker (QueryWorker (QueryWorker)) -import HStream.Server.Types (ServerContext (..)) -import qualified HStream.Server.Types as Types -import qualified HStream.ThirdParty.Protobuf as Proto -import HStream.Utils (ResourceType (..), - getProtoTimestamp, - pattern EnumPB) +import qualified HStream.Server.HStreamInternal as I +import HStream.Server.QueryWorker (QueryWorker (QueryWorker)) +import HStream.Server.Types (ServerContext (..)) +import qualified HStream.Server.Types as Types +import qualified HStream.ThirdParty.Protobuf as Proto +import HStream.Utils (ResourceType (..), + getProtoTimestamp) describeCluster :: ServerContext -> IO DescribeClusterResponse describeCluster ServerContext{gossipContext = gc@GossipContext{..}, ..} = do @@ -106,7 +92,7 @@ lookupShardReader sc req@LookupShardReaderRequest{lookupShardReaderRequestReader nodeChangeEventHandler :: MVar ServerContext -> Gossip.ServerState -> I.ServerNode -> IO () nodeChangeEventHandler scMVar Gossip.ServerDead I.ServerNode {..} = do - Log.info $ "handle Server Dead event: " <> Log.buildString' serverNodeId + Log.info $ "handle Server Dead event for server: " <> Log.buildString' serverNodeId withMVar scMVar $ \sc@ServerContext{..} -> do recoverDeadNodeTasks sc scIOWorker serverNodeId recoverDeadNodeTasks sc (QueryWorker sc) serverNodeId @@ -133,11 +119,13 @@ recoverTasks :: Types.TaskManager a => ServerContext -> a -> [T.Text] -> IO () recoverTasks sc@ServerContext{..} tm tasks = forM_ tasks $ \task -> do taskNode <- lookupResource sc (Types.resourceType tm) task - when (serverID == serverNodeId taskNode) $ + when (serverID == serverNodeId taskNode) $ do + Log.info $ "recover " <> Log.build (show $ Types.resourceType tm) <> " task: " <> Log.build task catches (Types.recoverTask tm task) [ Handler (\(err :: HE.QueryAlreadyTerminated) -> return ()) , Handler (\(err :: SomeException) -> - Log.warning $ "Failed to recover dead node task" <> Log.buildString' (Types.resourceType tm) - <> " with name" <> Log.build task + Log.fatal $ "Failed to recover dead node task" <> Log.buildString' (Types.resourceType tm) + <> " with name" <> Log.build task + <> ", error: " <> Log.build (displayException err) ) ] diff --git a/hstream/src/HStream/Server/Core/Common.hs b/hstream/src/HStream/Server/Core/Common.hs index 3f37ebd14..997ceecd6 100644 --- a/hstream/src/HStream/Server/Core/Common.hs +++ b/hstream/src/HStream/Server/Core/Common.hs @@ -2,43 +2,33 @@ module HStream.Server.Core.Common where -import Control.Applicative ((<|>)) +import Control.Applicative ((<|>)) import Control.Concurrent -import Control.Concurrent.STM (atomically, readTVarIO, - writeTVar) -import Control.Exception (SomeException (..), throwIO, - try) +import Control.Concurrent.STM (atomically, writeTVar) +import Control.Exception (SomeException (..), throwIO, try) import Control.Monad -import qualified Data.Attoparsec.Text as AP -import qualified Data.ByteString as BS -import Data.Foldable (foldrM) -import qualified Data.HashMap.Strict as HM -import Data.List (find) -import qualified Data.Map.Strict as Map -import Data.Text (Text) -import qualified Data.Text as T -import qualified Data.Vector as V -import Data.Word (Word32, Word64) -import HStream.ThirdParty.Protobuf - -import HStream.Common.ConsistentHashing -import HStream.Common.Server.Lookup (lookupNodePersist) -import qualified HStream.Common.Server.MetaData as P -import HStream.Common.Types (fromInternalServerNodeWithKey) -import qualified HStream.Exception as HE -import HStream.Gossip -import qualified HStream.Logger as Log -import qualified HStream.MetaStore.Types as M +import qualified Data.Attoparsec.Text as AP +import qualified Data.ByteString as BS +import qualified Data.HashMap.Strict as HM +import qualified Data.Map.Strict as Map +import Data.Text (Text) +import qualified Data.Text as T +import qualified Data.Vector as V +import Data.Word (Word32, Word64) + +import HStream.Common.Server.Lookup (lookupNodePersist) +import qualified HStream.Exception as HE +import qualified HStream.Logger as Log +import qualified HStream.MetaStore.Types as M import HStream.Server.HStreamApi -import qualified HStream.Server.MetaData as P +import qualified HStream.Server.MetaData as P import HStream.Server.Types import HStream.SQL -import qualified HStream.Store as HS -import HStream.Utils (ResourceType (..), - decodeByteStringBatch, - msTimestampToProto, - textToCBytes, - updateRecordTimestamp) +import qualified HStream.Store as HS +import HStream.Utils (ResourceType (..), + decodeByteStringBatch, + msTimestampToProto, textToCBytes, + updateRecordTimestamp) insertAckedRecordId :: ShardRecordId -- ^ recordId need to insert @@ -219,7 +209,7 @@ lookupResource sc@ServerContext{..} ResView rid = do M.getMeta @P.ViewInfo rid metaHandle >>= \case Nothing -> throwIO $ HE.ViewNotFound rid Just P.ViewInfo{..} -> lookupResource sc ResQuery (P.queryId viewQuery) -lookupResource sc@ServerContext{..} rtype rid = do +lookupResource ServerContext{..} rtype rid = do let metaId = mkAllocationKey rtype rid lookupNodePersist metaHandle gossipContext loadBalanceHashRing rid metaId scAdvertisedListenersKey diff --git a/hstream/src/HStream/Server/Core/ShardReader.hs b/hstream/src/HStream/Server/Core/ShardReader.hs index fe9be458d..baf444b13 100644 --- a/hstream/src/HStream/Server/Core/ShardReader.hs +++ b/hstream/src/HStream/Server/Core/ShardReader.hs @@ -21,7 +21,7 @@ import ZooKeeper.Exception (ZNONODE (..)) import Control.Concurrent (modifyMVar_, newEmptyMVar, putMVar, readMVar, takeMVar, - withMVar) + threadDelay, withMVar) import Control.Exception (bracket, catch, throwIO, try) import Control.Monad (forM, forM_, join, unless, when) import Data.ByteString (ByteString) @@ -54,7 +54,7 @@ import HStream.Server.Types (BiStreamReader (..), BiStreamReaderSender, ServerContext (..), ServerInternalOffset, - ShardReader (..), + ServerMode (..), ShardReader (..), StreamReader (..), ToOffset (..), getLogLSN, mkShardReader, mkStreamReader) @@ -143,12 +143,19 @@ readShard ServerContext{..} API.ReadShardRequest{..} = do readRecords r@ShardReader{..} = do let cStreamName = textToCBytes targetStream !read_start <- getPOSIXTime - records <- readProcessGap r (fromIntegral readShardRequestMaxRecords) - Stats.serverHistogramAdd scStatsHolder Stats.SHL_ReadLatency =<< msecSince read_start - Stats.stream_stat_add_read_in_bytes scStatsHolder cStreamName (fromIntegral . sum $ map (BS.length . S.recordPayload) records) - Stats.stream_stat_add_read_in_batches scStatsHolder cStreamName (fromIntegral $ length records) - let (records', _) = filterRecords shardReaderStartTs shardReaderEndTs records - receivedRecordsVecs <- forM records' decodeRecordBatch + state <- readIORef serverState + receivedRecordsVecs <- case state of + ServerNormal -> do + records <- readProcessGap r (fromIntegral readShardRequestMaxRecords) + Stats.serverHistogramAdd scStatsHolder Stats.SHL_ReadLatency =<< msecSince read_start + Stats.stream_stat_add_read_in_bytes scStatsHolder cStreamName (fromIntegral . sum $ map (BS.length . S.recordPayload) records) + Stats.stream_stat_add_read_in_batches scStatsHolder cStreamName (fromIntegral $ length records) + let (records', _) = filterRecords shardReaderStartTs shardReaderEndTs records + forM records' decodeRecordBatch + ServerBackup -> do + -- sleep 5ms here to avoid client send too many read requests in a busy loop + threadDelay 5000 + return [] let res = V.fromList $ map (\(_, _, _, record) -> record) receivedRecordsVecs Log.debug $ "reader " <> Log.build readShardRequestReaderId <> " read " <> Log.build (V.length res) <> " batchRecords" diff --git a/hstream/src/HStream/Server/Core/Stream.hs b/hstream/src/HStream/Server/Core/Stream.hs index 95d97c851..6eb94823b 100644 --- a/hstream/src/HStream/Server/Core/Stream.hs +++ b/hstream/src/HStream/Server/Core/Stream.hs @@ -12,7 +12,6 @@ module HStream.Server.Core.Stream , getStream , listStreams , listStreamsWithPrefix - , append , appendStream , listShards , getTailRecordId @@ -36,6 +35,7 @@ import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as BSL import Data.Either (partitionEithers) import Data.Functor ((<&>)) +import Data.IORef (readIORef) import qualified Data.List as L import qualified Data.Map.Strict as M import Data.Maybe (fromJust, fromMaybe) @@ -52,10 +52,12 @@ import HStream.Common.Types import qualified HStream.Common.ZookeeperSlotAlloc as Slot import qualified HStream.Exception as HE import qualified HStream.Logger as Log +import qualified HStream.Server.CacheStore as DB import qualified HStream.Server.HStreamApi as API import qualified HStream.Server.MetaData as P import HStream.Server.Types (ServerContext (..), ServerInternalOffset (..), + ServerMode (..), ToOffset (..)) import qualified HStream.Stats as Stats import qualified HStream.Store as S @@ -348,32 +350,6 @@ getTailRecordIdV2 ServerContext{..} slotConfig API.GetTailRecordIdRequest{..} = } return $ API.GetTailRecordIdResponse { getTailRecordIdResponseTailRecordId = Just recordId} -append :: HasCallStack - => ServerContext - -> T.Text -- streamName - -> Word64 -- shardId - -> API.BatchedRecord -- payload - -> IO API.AppendResponse -append sc@ServerContext{..} streamName shardId payload = do - !recv_time <- getPOSIXTime - Log.debug $ "Receive Append Request: StreamName {" - <> Log.build streamName - <> "(shardId: " - <> Log.build shardId - <> ")}" - - Stats.handle_time_series_add_queries_in scStatsHolder "append" 1 - Stats.stream_stat_add_append_total scStatsHolder cStreamName 1 - Stats.stream_time_series_add_append_in_requests scStatsHolder cStreamName 1 - - !append_start <- getPOSIXTime - resp <- appendStream sc streamName shardId payload - Stats.serverHistogramAdd scStatsHolder Stats.SHL_AppendLatency =<< msecSince append_start - Stats.serverHistogramAdd scStatsHolder Stats.SHL_AppendRequestLatency =<< msecSince recv_time - return resp - where - cStreamName = textToCBytes streamName - appendStream :: HasCallStack => ServerContext -> T.Text @@ -385,18 +361,38 @@ appendStream ServerContext{..} streamName shardId record = do recordSize = API.batchedRecordBatchSize record payloadSize = BS.length payload when (payloadSize > scMaxRecordSize) $ throwIO $ HE.InvalidRecordSize payloadSize - S.AppendCompletion {..} <- S.appendCompressedBS scLDClient shardId payload cmpStrategy Nothing - Stats.stream_stat_add_append_in_bytes scStatsHolder cStreamName (fromIntegral payloadSize) - Stats.stream_stat_add_append_in_records scStatsHolder cStreamName (fromIntegral recordSize) - Stats.stream_time_series_add_append_in_bytes scStatsHolder cStreamName (fromIntegral payloadSize) - Stats.stream_time_series_add_append_in_records scStatsHolder cStreamName (fromIntegral recordSize) + + state <- readIORef serverState + S.AppendCompletion{..} <- case state of + ServerNormal -> do + Stats.handle_time_series_add_queries_in scStatsHolder "append" 1 + Stats.stream_stat_add_append_total scStatsHolder cStreamName 1 + Stats.stream_time_series_add_append_in_requests scStatsHolder cStreamName 1 + + !append_start <- getPOSIXTime + appendRes <- S.appendCompressedBS scLDClient shardId payload cmpStrategy Nothing `catch` record_failed + + Stats.serverHistogramAdd scStatsHolder Stats.SHL_AppendLatency =<< msecSince append_start + Stats.stream_stat_add_append_in_bytes scStatsHolder cStreamName (fromIntegral payloadSize) + Stats.stream_stat_add_append_in_records scStatsHolder cStreamName (fromIntegral recordSize) + Stats.stream_time_series_add_append_in_bytes scStatsHolder cStreamName (fromIntegral payloadSize) + Stats.stream_time_series_add_append_in_records scStatsHolder cStreamName (fromIntegral recordSize) + return appendRes + ServerBackup -> do + DB.writeRecord cacheStore streamName shardId payload + let rids = V.zipWith (API.RecordId shardId) (V.replicate (fromIntegral recordSize) appendCompLSN) (V.fromList [0..]) - return $ API.AppendResponse { - appendResponseStreamName = streamName + return $ API.AppendResponse + { appendResponseStreamName = streamName , appendResponseShardId = shardId - , appendResponseRecordIds = rids } - where - cStreamName = textToCBytes streamName + , appendResponseRecordIds = rids + } + where + cStreamName = textToCBytes streamName + record_failed (e :: S.SomeHStoreException) = do + Stats.stream_stat_add_append_failed scStatsHolder cStreamName 1 + Stats.stream_time_series_add_append_failed_requests scStatsHolder cStreamName 1 + throwIO e listShards :: HasCallStack diff --git a/hstream/src/HStream/Server/Handler/Cluster.hs b/hstream/src/HStream/Server/Handler/Cluster.hs index c1c750a97..547c3a8d8 100644 --- a/hstream/src/HStream/Server/Handler/Cluster.hs +++ b/hstream/src/HStream/Server/Handler/Cluster.hs @@ -20,17 +20,20 @@ module HStream.Server.Handler.Cluster , handleLookupKey ) where -import qualified HsGrpc.Server as G +import Control.Exception (throwIO) +import Data.IORef (readIORef) import Network.GRPC.HighLevel.Generated -import Control.Exception (throwIO) +import qualified HsGrpc.Server as G +import HStream.Common.Server.Lookup (lookupNode) import qualified HStream.Exception as HE import qualified HStream.Logger as Log import qualified HStream.Server.Core.Cluster as C import HStream.Server.Core.Common (lookupResource) import HStream.Server.Exception import HStream.Server.HStreamApi -import HStream.Server.Types (ServerContext (..)) +import HStream.Server.Types (ServerContext (..), + ServerMode (..)) import HStream.ThirdParty.Protobuf (Empty) import HStream.Utils (returnResp, validateResourceIdAndThrow) @@ -49,14 +52,28 @@ lookupResourceHandler :: ServerContext -> ServerRequest 'Normal LookupResourceRequest ServerNode -> IO (ServerResponse 'Normal ServerNode) -lookupResourceHandler sc (ServerNormalRequest _meta req@LookupResourceRequest{..}) = +lookupResourceHandler sc@ServerContext{..} (ServerNormalRequest _meta req@LookupResourceRequest{..}) = defaultExceptionHandle $ do - Log.debug $ "receive lookup resource request: " <> Log.build (show req) + Log.info $ "receive lookup resource request: " <> Log.build (show req) case lookupResourceRequestResType of Enumerated (Right rType) -> do validateResourceIdAndThrow rType lookupResourceRequestResId - returnResp =<< lookupResource sc rType lookupResourceRequestResId + state <- readIORef serverState + case state of + ServerNormal -> do + returnResp =<< lookupResource sc rType lookupResourceRequestResId + ServerBackup -> do + theNode <- case rType of + ResourceTypeResShard -> do + doLookup lookupResourceRequestResId + -- ResourceTypeResStream -> doLookup lookupResourceRequestResId + tp -> do + Log.warning $ "reject lookup " <> Log.build (show tp) <> " request because server is in backup mode" + throwIO $ HE.ResourceAllocationException "server is in backup mode, try later" + returnResp theNode x -> throwIO $ HE.InvalidResourceType (show x) + where + doLookup rid = lookupNode loadBalanceHashRing rid scAdvertisedListenersKey lookupShardHandler :: ServerContext @@ -94,13 +111,25 @@ handleDescribeCluster :: ServerContext -> G.UnaryHandler Empty DescribeClusterRe handleDescribeCluster sc _ _ = catchDefaultEx $ C.describeCluster sc handleLookupResource :: ServerContext -> G.UnaryHandler LookupResourceRequest ServerNode -handleLookupResource sc _ req@LookupResourceRequest{..} = catchDefaultEx $ do - Log.debug $ "receive lookup resource request: " <> Log.build (show req) +handleLookupResource sc@ServerContext{..} _sc req@LookupResourceRequest{..} = catchDefaultEx $ do + Log.info $ "receive lookup resource request: " <> Log.build (show req) case lookupResourceRequestResType of Enumerated (Right rType) -> do validateResourceIdAndThrow rType lookupResourceRequestResId - C.lookupResource sc rType lookupResourceRequestResId + state <- readIORef serverState + case state of + ServerNormal -> do + C.lookupResource sc rType lookupResourceRequestResId + ServerBackup -> do + case rType of + ResourceTypeResShard -> do + doLookup lookupResourceRequestResId + tp -> do + Log.warning $ "reject lookup " <> Log.build (show tp) <> " request because server is in backup mode" + throwIO $ HE.ResourceAllocationException "server is in backup mode, try later" x -> throwIO $ HE.InvalidResourceType (show x) + where + doLookup rid = lookupNode loadBalanceHashRing rid scAdvertisedListenersKey handleLookupShard :: ServerContext -> G.UnaryHandler LookupShardRequest LookupShardResponse handleLookupShard sc _ req = catchDefaultEx $ C.lookupShard sc req diff --git a/hstream/src/HStream/Server/Handler/Stats.hs b/hstream/src/HStream/Server/Handler/Stats.hs index a1d55834b..0441c59ef 100644 --- a/hstream/src/HStream/Server/Handler/Stats.hs +++ b/hstream/src/HStream/Server/Handler/Stats.hs @@ -15,7 +15,8 @@ module HStream.Server.Handler.Stats , handleGetStats ) where -import Control.Exception (throwIO) +import Control.Exception (Exception (displayException), + SomeException, throwIO, try) import Data.Functor ((<&>)) import Data.Int (Int64) import Data.Map.Strict (Map) @@ -139,6 +140,8 @@ getStatsInternal ServerContext{scStatsHolder = holder} s@(StatTypeStatQueryStat getQueryStatsInternal holder stats <&> convert s getStatsInternal ServerContext{scStatsHolder = holder} s@(StatTypeStatViewStat stats) = do getViewStatsInternal holder stats <&> convert s +getStatsInternal ServerContext{scStatsHolder = holder} s@(StatTypeStatCacheStoreStat stats) = do + getCacheStoreStatsInternal holder stats <&> convert s getStreamStatsInternal :: Stats.StatsHolder @@ -196,7 +199,7 @@ getConnectorStatsInternal -> PS.Enumerated API.ConnectorStats -> IO (Either T.Text (Map CBytes Int64)) getConnectorStatsInternal statsHolder ioWorker (PS.Enumerated stats) = do - Log.debug $ "request stream stats: " <> Log.buildString' stats + Log.debug $ "request connector stats: " <> Log.buildString' stats s <- Stats.newAggregateStats statsHolder case stats of Right API.ConnectorStatsDeliveredInRecords -> @@ -204,9 +207,40 @@ getConnectorStatsInternal statsHolder ioWorker (PS.Enumerated stats) = do Right API.ConnectorStatsDeliveredInBytes -> Stats.connector_stat_getall_delivered_in_bytes s <&> Right Right API.ConnectorStatsIsAlive -> do - cs <- IO.listIOTasks ioWorker - return . Right . Map.fromList $ - map (\API.Connector{..} -> if connectorStatus == "RUNNING" then (U.textToCBytes connectorName, 1) else (U.textToCBytes connectorName, 0)) cs + res <- try @SomeException $ IO.listIOTasks ioWorker + case res of + Left e -> return . Left . T.pack $ "can't list io tasks because meta exception: " <> displayException e + Right cs -> do + return . Right . Map.fromList $ + map (\API.Connector{..} -> if connectorStatus == "RUNNING" then (U.textToCBytes connectorName, 1) else (U.textToCBytes connectorName, 0)) cs + Left _ -> return . Left . T.pack $ "invalid stat type " <> show stats + +getCacheStoreStatsInternal + :: Stats.StatsHolder + -> PS.Enumerated API.CacheStoreStats + -> IO (Either T.Text (Map CBytes Int64)) +getCacheStoreStatsInternal statsHolder (PS.Enumerated stats) = do + Log.debug $ "request cache store stats: " <> Log.buildString' stats + s <- Stats.newAggregateStats statsHolder + case stats of + Right API.CacheStoreStatsCSAppendInBytes -> + Stats.cache_store_stat_getall_cs_append_in_bytes s <&> Right + Right API.CacheStoreStatsCSAppendInRecords -> do + Stats.cache_store_stat_getall_cs_append_in_records s <&> Right + Right API.CacheStoreStatsCSAppendTotal -> do + Stats.cache_store_stat_getall_cs_append_total s <&> Right + Right API.CacheStoreStatsCSAppendFailed -> + Stats.cache_store_stat_getall_cs_append_failed s <&> Right + Right API.CacheStoreStatsCSReadInBytes -> + Stats.cache_store_stat_getall_cs_read_in_bytes s <&> Right + Right API.CacheStoreStatsCSReadInRecords -> + Stats.cache_store_stat_getall_cs_read_in_records s <&> Right + Right API.CacheStoreStatsCSDeliveredInRecords -> + Stats.cache_store_stat_getall_cs_delivered_in_records s <&> Right + Right API.CacheStoreStatsCSDeliveredTotal -> do + Stats.cache_store_stat_getall_cs_delivered_total s <&> Right + Right API.CacheStoreStatsCSDeliveredFailed -> do + Stats.cache_store_stat_getall_cs_delivered_failed s <&> Right Left _ -> return . Left . T.pack $ "invalid stat type " <> show stats getQueryStatsInternal diff --git a/hstream/src/HStream/Server/Handler/Stream.hs b/hstream/src/HStream/Server/Handler/Stream.hs index 27ebd7c5a..433ce6012 100644 --- a/hstream/src/HStream/Server/Handler/Stream.hs +++ b/hstream/src/HStream/Server/Handler/Stream.hs @@ -39,15 +39,16 @@ module HStream.Server.Handler.Stream ) where import Control.Exception +import Control.Monad (when) import qualified Data.Map as Map import Data.Maybe (fromJust, isNothing) import qualified Data.Vector as V +import Database.RocksDB.Exception (RocksDbException) import qualified HsGrpc.Server as G import qualified HsGrpc.Server.Types as G import Network.GRPC.HighLevel.Generated import qualified ZooKeeper.Exception as ZK -import Control.Monad (when) import qualified HStream.Common.ZookeeperSlotAlloc as Slot import qualified HStream.Exception as HE import qualified HStream.Logger as Log @@ -56,7 +57,6 @@ import HStream.Server.Exception import HStream.Server.HStreamApi import HStream.Server.Types (ServerContext (..)) import HStream.Server.Validation -import qualified HStream.Stats as Stats import qualified HStream.Store as Store import HStream.ThirdParty.Protobuf as PB import HStream.Utils @@ -221,25 +221,19 @@ appendHandler :: ServerContext -> ServerRequest 'Normal AppendRequest AppendResponse -> IO (ServerResponse 'Normal AppendResponse) -appendHandler sc@ServerContext{..} (ServerNormalRequest _metadata request@AppendRequest{..}) = - appendStreamExceptionHandle inc_failed $ do +appendHandler sc (ServerNormalRequest _metadata request@AppendRequest{..}) = + appendStreamExceptionHandle $ do + Log.debug $ "Receive Append Request: StreamName {" <> Log.build appendRequestStreamName + <> "(shardId: " <> Log.build appendRequestShardId <> ")}" validateAppendRequest request - returnResp =<< C.append sc appendRequestStreamName appendRequestShardId (fromJust appendRequestRecords) - where - inc_failed = do - Stats.stream_stat_add_append_failed scStatsHolder cStreamName 1 - Stats.stream_time_series_add_append_failed_requests scStatsHolder cStreamName 1 - cStreamName = textToCBytes appendRequestStreamName + returnResp =<< C.appendStream sc appendRequestStreamName appendRequestShardId (fromJust appendRequestRecords) handleAppend :: ServerContext -> G.UnaryHandler AppendRequest AppendResponse -handleAppend sc@ServerContext{..} _ req@AppendRequest{..} = appendExHandle inc_failed $ do +handleAppend sc _ req@AppendRequest{..} = appendExHandle $ do + Log.debug $ "Receive Append Request: StreamName {" <> Log.build appendRequestStreamName + <> "(shardId: " <> Log.build appendRequestShardId <> ")}" validateAppendRequest req - C.append sc appendRequestStreamName appendRequestShardId (fromJust appendRequestRecords) - where - inc_failed = do - Stats.stream_stat_add_append_failed scStatsHolder cStreamName 1 - Stats.stream_time_series_add_append_failed_requests scStatsHolder cStreamName 1 - cStreamName = textToCBytes appendRequestStreamName + C.appendStream sc appendRequestStreamName appendRequestShardId (fromJust appendRequestRecords) {-# INLINE handleAppend #-} -------------------------------------------------------------------------------- @@ -286,11 +280,9 @@ handleTrimShard sc _ request@TrimShardRequest{..} = catchDefaultEx $ do -------------------------------------------------------------------------------- -- Exception Handlers -appendStreamExceptionHandle :: IO () -> HE.ExceptionHandle (ServerResponse 'Normal a) -appendStreamExceptionHandle f = HE.mkExceptionHandle' whileEx mkHandlers +appendStreamExceptionHandle :: HE.ExceptionHandle (ServerResponse 'Normal a) +appendStreamExceptionHandle = HE.mkExceptionHandle mkHandlers where - whileEx :: forall e. Exception e => e -> IO () - whileEx err = Log.warning (Log.buildString' err) >> f handlers = [ Handler (\(err :: Store.NOTFOUND) -> return (StatusUnavailable, HE.mkStatusDetails err)) @@ -302,6 +294,8 @@ appendStreamExceptionHandle f = HE.mkExceptionHandle' whileEx mkHandlers return (StatusUnavailable, HE.mkStatusDetails err)) , Handler (\(err :: Store.PEER_UNAVAILABLE) -> do return (StatusUnavailable, HE.mkStatusDetails err)) + , Handler (\(err :: RocksDbException) -> + return (StatusInternal, HE.mkStatusDetails err)) ] ++ defaultHandlers mkHandlers = HE.setRespType mkServerErrResp handlers @@ -310,8 +304,8 @@ appendStreamExceptionHandle f = HE.mkExceptionHandle' whileEx mkHandlers Log.warning (Log.buildString' err); \ G.throwGrpcError $ HE.mkGrpcStatus err G.StatusUnavailable -appendExHandle :: IO () -> (IO a -> IO a) -appendExHandle f = HE.mkExceptionHandle' (const f) handlers +appendExHandle :: IO a -> IO a +appendExHandle = HE.mkExceptionHandle handlers where handlers = [ MkUnavailable(Store.NOTFOUND) @@ -319,6 +313,8 @@ appendExHandle f = HE.mkExceptionHandle' (const f) handlers , MkUnavailable(Store.NOSEQUENCER) , MkUnavailable(Store.TIMEDOUT) , MkUnavailable(Store.PEER_UNAVAILABLE) + , Handler (\(err :: RocksDbException) -> + G.throwGrpcError $ HE.mkGrpcStatus err G.StatusInternal) ] ++ defaultExHandlers listShardsExceptionHandle :: HE.ExceptionHandle (ServerResponse 'Normal a) diff --git a/hstream/src/HStream/Server/HealthMonitor.hs b/hstream/src/HStream/Server/HealthMonitor.hs new file mode 100644 index 000000000..3858df252 --- /dev/null +++ b/hstream/src/HStream/Server/HealthMonitor.hs @@ -0,0 +1,105 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE MultiWayIf #-} + +module HStream.Server.HealthMonitor + ( HealthMonitor + , mkHealthMonitor + , startMonitor + ) +where + +import Control.Concurrent (threadDelay) +import Control.Exception (SomeException, try) +import Control.Monad (forever, when) +import Data.Time +import System.Clock + +import HStream.Common.ZookeeperClient (unsafeGetZHandle) +import qualified HStream.Logger as Log +import HStream.MetaStore.Types (MetaHandle (..)) +import HStream.MetaStore.ZookeeperUtils (checkRecoverable) +import HStream.Server.CacheStore (StoreMode (..), dumpToHStore, + initCacheStore, + setCacheStoreMode) +import HStream.Server.Types (ServerContext (..), + ServerMode (..), + getServerMode, setServerMode) +import qualified HStream.Stats as ST +import qualified HStream.Store as S + +data HealthMonitor = HealthMonitor + { ldChecker :: S.LdChecker + , metaHandle :: MetaHandle + , statsHolder :: ST.StatsHolder + , ldUnhealthyNodesLimit :: Int + } + +mkHealthMonitor :: S.LDClient -> ST.StatsHolder -> MetaHandle -> Int -> IO HealthMonitor +mkHealthMonitor ldClient statsHolder metaHandle ldUnhealthyNodesLimit = do + ldChecker <- S.newLdChecker ldClient + return HealthMonitor{..} + +startMonitor :: ServerContext -> HealthMonitor -> Int -> IO () +startMonitor sc hm delaySecond = forever $ do + threadDelay $ delaySecond * 1000 * 1000 + start <- getCurrentTime + -- Log.debug $ "========== docheck start..." <> " in " <> Log.build (show start) + res <- try @SomeException $ docheck sc hm + end <- getCurrentTime + case res of + Left e -> Log.fatal $ "monitor check error: " <> Log.build (show e) + Right _ -> return () + let diff = nominalDiffTimeToSeconds $ diffUTCTime end start + when (diff > 1) $ + Log.warning $ "Monitor check return slow, total use " <> Log.build (show diff) <> "s" + Log.debug $ "Health monitor finish check in " <> Log.build (show end) + <> ", with start time: " <> Log.build (show start) + <> ", duration: " <> Log.build (show diff) + +docheck :: ServerContext -> HealthMonitor -> IO () +docheck sc@ServerContext{..} hm = do + ldHealthy <- checkLdCluster hm + metaHealthy <- checkMeta hm + serverMode <- getServerMode sc + when (not ldHealthy || not metaHealthy) $ + Log.warning $ "Healthy check find unhealthy service: " + <> "ld cluster healthy: " <> Log.build (show ldHealthy) + <> ", meta cluster healthy: " <> Log.build (show metaHealthy) + <> ", serverMode: " <> Log.build (show serverMode) + if | serverMode == ServerNormal && not ldHealthy && not metaHealthy -> do + initCacheStore cacheStore + setCacheStoreMode cacheStore CacheMode + setServerMode sc ServerBackup + Log.info $ "Change server to back up mode" + | serverMode == ServerBackup && ldHealthy && metaHealthy -> do + setServerMode sc ServerNormal + setCacheStoreMode cacheStore DumpMode + Log.info $ "Change server to normal mode" + dumpToHStore cacheStore scLDClient cmpStrategy + | otherwise -> return () + +checkLdCluster :: HealthMonitor -> IO Bool +checkLdCluster HealthMonitor{..} = do + withLatency statsHolder "LdCluster" $ S.isLdClusterHealthy ldChecker ldUnhealthyNodesLimit + +checkMeta :: HealthMonitor -> IO Bool +checkMeta HealthMonitor{..} | ZKHandle c <- metaHandle = do + withLatency statsHolder "MetaCluster" $ checkRecoverable =<< unsafeGetZHandle c +checkMeta HealthMonitor{..} | _ <- metaHandle = do + return True + +withLatency :: ST.StatsHolder -> String -> IO a -> IO a +withLatency statsHolder checkType action = do + !start <- getTime Monotonic + res <- action + !end <- getTime Monotonic + let msDuration = toNanoSecs (diffTimeSpec end start) `div` 1000000 + when (msDuration > 1000) $ + Log.warning $ "check " <> Log.build checkType <> " return slow, total time " <> Log.build msDuration <> "ms" + case checkType of + "LdCluster" -> do + ST.serverHistogramAdd statsHolder ST.SHL_CheckStoreClusterLatency (fromIntegral msDuration) + "MetaCluster" -> do + ST.serverHistogramAdd statsHolder ST.SHL_CheckMetaClusterLatency (fromIntegral msDuration) + _ -> return () + return res diff --git a/hstream/src/HStream/Server/Initialization.hs b/hstream/src/HStream/Server/Initialization.hs index 37c21ab1e..2b45c86d6 100644 --- a/hstream/src/HStream/Server/Initialization.hs +++ b/hstream/src/HStream/Server/Initialization.hs @@ -40,6 +40,7 @@ import qualified Z.Data.CBytes as CB #if __GLASGOW_HASKELL__ < 902 import qualified HStream.Admin.Store.API as AA #endif +import Data.IORef (newIORef) import HStream.Common.ConsistentHashing (HashRing, constructServerMap, getAllocatedNodeId) import HStream.Common.Server.HashRing (initializeHashRing) @@ -49,6 +50,7 @@ import qualified HStream.IO.Types as IO import qualified HStream.IO.Worker as IO import qualified HStream.Logger as Log import HStream.MetaStore.Types (MetaHandle (..)) +import HStream.Server.CacheStore (mkCacheStore) import HStream.Server.Config (ServerOpts (..), TlsConfig (..)) import HStream.Server.Types @@ -88,11 +90,30 @@ initializeServer opts@ServerOpts{..} gossipContext hh db_m = do shardReaderMap <- newMVar HM.empty + serverMode <- newIORef ServerNormal + + -- ref: https://github.com/facebook/rocksdb/wiki/Setup-Options-and-Basic-Tuning#other-general-options + let tableOptions = def + { RocksDB.blockSize = 16 * 1024 + , RocksDB.pinL0FilterAndIndexBlocksInCache = True + } + dbOption = def + { RocksDB.createIfMissing = True + -- , RocksDB.maxBackgroundJobs = 6 + , RocksDB.blockBasedTableOptions = tableOptions + -- , RocksDB.bytesPerSync = 1048576 + } + let writeOption = def { RocksDB.disableWAL = True } + -- readOption = def { RocksDB.readaheadSize = 64 * 1024 * 1024 } + readOption = def + let path = _cacheStorePath <> show _serverID + cachedStore <- mkCacheStore path dbOption writeOption readOption statsHolder + -- recovery tasks return ServerContext - { metaHandle = hh + { metaHandle = hh , scLDClient = ldclient , serverID = _serverID , scAdvertisedListenersKey = Nothing @@ -112,6 +133,8 @@ initializeServer opts@ServerOpts{..} gossipContext hh db_m = do , shardReaderMap = shardReaderMap , querySnapshotPath = _querySnapshotPath , querySnapshotter = db_m + , serverState = serverMode + , cacheStore = cachedStore } -------------------------------------------------------------------------------- diff --git a/hstream/src/HStream/Server/Types.hs b/hstream/src/HStream/Server/Types.hs index f67f6c3fa..1cdacc7bf 100644 --- a/hstream/src/HStream/Server/Types.hs +++ b/hstream/src/HStream/Server/Types.hs @@ -5,6 +5,7 @@ {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE RankNTypes #-} +{-# LANGUAGE TupleSections #-} module HStream.Server.Types where @@ -33,23 +34,24 @@ import qualified HStream.Admin.Store.API as AA #endif import Control.Exception (throw, throwIO) import Control.Monad (when) -import Data.IORef (IORef) +import Data.IORef (IORef, atomicModifyIORef', + readIORef) import Data.Maybe (fromJust) import HStream.Base.Timer (CompactedWorker) import HStream.Common.Server.HashRing (LoadBalanceHashRing) import HStream.Common.Types (ShardKey) import qualified HStream.Exception as HE -import HStream.Gossip.Types (Epoch, GossipContext) +import HStream.Gossip.Types (GossipContext) import qualified HStream.IO.Types as IO import qualified HStream.IO.Worker as IO import HStream.MetaStore.Types (MetaHandle) +import HStream.Server.CacheStore (CacheStore) import HStream.Server.Config import qualified HStream.Server.HStreamApi as API import qualified HStream.Stats as Stats import qualified HStream.Store as HS import qualified HStream.Store as S import HStream.Utils (ResourceType (ResConnector), - textToCBytes, timestampToMsTimestamp) import Network.GRPC.HighLevel.Generated (GRPCIOError) @@ -86,6 +88,10 @@ type ServerID = Word32 type ServerState = PB.Enumerated API.NodeState type ShardDict = M.Map ShardKey HS.C_LogID +-- When server is in Backup mode, it will return all lookup request to its own serverID, and cache all write +-- request to local storage +data ServerMode = ServerNormal | ServerBackup deriving(Show, Eq) + data ServerContext = ServerContext { scLDClient :: HS.LDClient , serverID :: Word32 @@ -107,8 +113,19 @@ data ServerContext = ServerContext , shardReaderMap :: MVar (HM.HashMap Text (MVar ShardReader)) , querySnapshotPath :: FilePath , querySnapshotter :: Maybe RocksDB.DB + , serverState :: IORef ServerMode + , cacheStore :: CacheStore } +setServerMode :: ServerContext -> ServerMode -> IO () +setServerMode ServerContext{..} state = do + _ <- atomicModifyIORef' serverState $ (state,) + return () + +getServerMode :: ServerContext -> IO ServerMode +getServerMode ServerContext{..} = do + readIORef serverState + data SubscribeContextNewWrapper = SubscribeContextNewWrapper { scnwState :: TVar SubscribeState, scnwContext :: TMVar SubscribeContext diff --git a/hstream/test/HStream/ConfigSpec.hs b/hstream/test/HStream/ConfigSpec.hs index e3bde07fe..3f374d464 100644 --- a/hstream/test/HStream/ConfigSpec.hs +++ b/hstream/test/HStream/ConfigSpec.hs @@ -9,7 +9,6 @@ import Data.Bifunctor (second) import Data.ByteString (ByteString) import qualified Data.HashMap.Strict as HM import qualified Data.Map.Strict as M -import qualified Data.Map.Strict as Map import Data.Maybe (fromMaybe, isJust) import qualified Data.Set as Set import Data.Text (Text) @@ -29,10 +28,7 @@ import qualified Z.Data.CBytes as CB import HStream.Gossip (GossipOpts (..), defaultGossipOpts) import HStream.IO.Types (IOOptions (..)) -import HStream.Server.Config (CliOptions (..), - MetaStoreAddr (..), - ServerOpts (..), - TlsConfig (..), +import HStream.Server.Config (ServerOpts (..), parseHostPorts, parseJSONToOptions) import HStream.Server.Configuration.Cli @@ -54,12 +50,12 @@ spec = describe "HStream.ConfigSpec" $ do addr2 = "hstream://127.0.0.1:6571" listener2 = SAI.Listener{listenerAddress = "127.0.0.1", listenerPort = 6571} - either error Map.toList (parseAdvertisedListeners ("l1:" <> addr1)) - `shouldBe` Map.toList (M.singleton "l1" (Set.singleton listener1)) - either error Map.toList (parseAdvertisedListeners ("l1:" <> addr1 <> ",l2:" <> addr2)) - `shouldBe` Map.toList (M.fromList [ ("l1", Set.singleton listener1) - , ("l2", Set.singleton listener2) - ]) + either error M.toList (parseAdvertisedListeners ("l1:" <> addr1)) + `shouldBe` M.toList (M.singleton "l1" (Set.singleton listener1)) + either error M.toList (parseAdvertisedListeners ("l1:" <> addr1 <> ",l2:" <> addr2)) + `shouldBe` M.toList (M.fromList [ ("l1", Set.singleton listener1) + , ("l2", Set.singleton listener2) + ]) xdescribe "TODO: parseConfig" $ do it "basic config test" $ do @@ -126,6 +122,8 @@ defaultConfig = ServerOpts , _ioOptions = defaultIOOptions , _querySnapshotPath = "/data/query_snapshots" , experimentalFeatures = [] + , _enableServerCache = False + , _cacheStorePath = "" , grpcChannelArgs = [] , serverTokens = [] } @@ -249,6 +247,9 @@ emptyCliOptions = CliOptions , cliQuerySnapshotPath = Nothing , cliExperimentalFeatures = [] + + , cliEnableServerCache = False + , cliCacheStorePath = Nothing } @@ -294,6 +295,8 @@ instance Arbitrary ServerOpts where let experimentalFeatures = [] let grpcChannelArgs = [] let serverTokens = [] + let _enableServerCache = False + let _cacheStorePath = "" pure ServerOpts{..} instance Arbitrary CliOptions where @@ -305,7 +308,7 @@ instance Arbitrary CliOptions where cliServerGossipAddress <- genMaybe addressGen cliServerAdvertisedAddress <- genMaybe addressGen cliServerAdvertisedListeners <- arbitrary - cliListenersSecurityProtocolMap <- M.fromList . zip (Map.keys cliServerAdvertisedListeners) . repeat <$> elements ["plaintext", "tls"] + cliListenersSecurityProtocolMap <- M.fromList . zip (M.keys cliServerAdvertisedListeners) . repeat <$> elements ["plaintext", "tls"] cliServerInternalPort <- genMaybe $ fromIntegral <$> portGen cliServerID <- arbitrary cliServerLogLevel <- genMaybe $ read <$> logLevelGen @@ -328,6 +331,8 @@ instance Arbitrary CliOptions where cliIoConnectorImages <- listOf5' $ T.pack <$> connectorImageCliOptGen let cliQuerySnapshotPath = Just "/data/query_snapshots" let cliExperimentalFeatures = [] + let cliEnableServerCache = False + let cliCacheStorePath = Nothing pure CliOptions{..} instance Arbitrary Listener where @@ -447,8 +452,8 @@ updateServerOptsWithCliOpts CliOptions{..} x@ServerOpts{..} = x { , _serverInternalPort = fromMaybe _serverInternalPort cliServerInternalPort , _serverAddress = fromMaybe _serverAddress cliServerAdvertisedAddress , _serverGossipAddress = fromMaybe _serverGossipAddress cliServerGossipAddress - , _serverAdvertisedListeners = Map.union cliServerAdvertisedListeners _serverAdvertisedListeners - , _listenersSecurityProtocolMap = Map.union cliListenersSecurityProtocolMap _listenersSecurityProtocolMap + , _serverAdvertisedListeners = M.union cliServerAdvertisedListeners _serverAdvertisedListeners + , _listenersSecurityProtocolMap = M.union cliListenersSecurityProtocolMap _listenersSecurityProtocolMap , _serverID = fromMaybe _serverID cliServerID , _metaStore = fromMaybe _metaStore cliMetaStore , _ldConfigPath = cliStoreConfigPath @@ -462,7 +467,7 @@ updateServerOptsWithCliOpts CliOptions{..} x@ServerOpts{..} = x { , _ldLogLevel = fromMaybe _ldLogLevel cliLdLogLevel , _ckpRepFactor = fromMaybe _ckpRepFactor cliCkpRepFactor , _ioOptions = cliIoOptions - , _securityProtocolMap = Map.insert "tls" tlsConfig' _securityProtocolMap} + , _securityProtocolMap = M.insert "tls" tlsConfig' _securityProtocolMap} where port = fromMaybe _serverPort cliServerPort updateSeedsPort = second $ fromMaybe (fromIntegral port)