diff --git a/protocol/chainlib/consumer_websocket_manager.go b/protocol/chainlib/consumer_websocket_manager.go index 1d5339638e..a3bd553424 100644 --- a/protocol/chainlib/consumer_websocket_manager.go +++ b/protocol/chainlib/consumer_websocket_manager.go @@ -160,7 +160,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() { continue } - // check whether its a normal relay / unsubscribe / unsubscribe_all otherwise its a subscription flow. + // check whether it's a normal relay / unsubscribe / unsubscribe_all otherwise its a subscription flow. if !IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_SUBSCRIBE) { if IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_UNSUBSCRIBE) { err := cwm.consumerWsSubscriptionManager.Unsubscribe(webSocketCtx, protocolMessage, dappID, userIp, cwm.WebsocketConnectionUID, metricsData) diff --git a/protocol/chainlib/consumer_ws_subscription_manager.go b/protocol/chainlib/consumer_ws_subscription_manager.go index 10d8972e38..102bd8240a 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager.go +++ b/protocol/chainlib/consumer_ws_subscription_manager.go @@ -56,6 +56,7 @@ type ConsumerWSSubscriptionManager struct { activeSubscriptionProvidersStorage *lavasession.ActiveSubscriptionProvidersStorage currentlyPendingSubscriptions map[string]*pendingSubscriptionsBroadcastManager lock sync.RWMutex + consumerMetricsManager *metrics.ConsumerMetricsManager } func NewConsumerWSSubscriptionManager( @@ -65,6 +66,7 @@ func NewConsumerWSSubscriptionManager( connectionType string, chainParser ChainParser, activeSubscriptionProvidersStorage *lavasession.ActiveSubscriptionProvidersStorage, + consumerMetricsManager *metrics.ConsumerMetricsManager, ) *ConsumerWSSubscriptionManager { return &ConsumerWSSubscriptionManager{ connectedDapps: make(map[string]map[string]*common.SafeChannelSender[*pairingtypes.RelayReply]), @@ -76,6 +78,7 @@ func NewConsumerWSSubscriptionManager( relaySender: relaySender, connectionType: connectionType, activeSubscriptionProvidersStorage: activeSubscriptionProvidersStorage, + consumerMetricsManager: consumerMetricsManager, } } @@ -216,6 +219,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription( // called after send relay failure or parsing failure afterwards onSubscriptionFailure := func() { + go cwsm.consumerMetricsManager.SetFailedWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType) cwsm.failedPendingSubscription(hashedParams) closeWebsocketRepliesChannel() } @@ -255,6 +259,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription( // Validated there are no active subscriptions that we can use. firstSubscriptionReply, returnWebsocketRepliesChan := cwsm.checkForActiveSubscriptionWithLock(webSocketCtx, hashedParams, protocolMessage, dappKey, websocketRepliesSafeChannelSender, closeWebsocketRepliesChannel) if firstSubscriptionReply != nil { + go cwsm.consumerMetricsManager.SetDuplicatedWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType) if returnWebsocketRepliesChan { return firstSubscriptionReply, websocketRepliesChan, nil } @@ -412,7 +417,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription( cwsm.successfulPendingSubscription(hashedParams) // Need to be run once for subscription go cwsm.listenForSubscriptionMessages(webSocketCtx, dappID, consumerIp, replyServer, hashedParams, providerAddr, metricsData, closeSubscriptionChan) - + go cwsm.consumerMetricsManager.SetWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType) return &reply, websocketRepliesChan, nil } @@ -524,12 +529,14 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages( utils.LogAttr("GUID", webSocketCtx), utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)), ) + go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.WsDisconnectionReasonUser) return case <-replyServer.Context().Done(): utils.LavaFormatTrace("reply server context canceled", utils.LogAttr("GUID", webSocketCtx), utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)), ) + go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.WsDisconnectionReasonConsumer) return default: var reply pairingtypes.RelayReply @@ -537,6 +544,7 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages( if err != nil { // The connection was closed by the provider utils.LavaFormatTrace("error reading from subscription stream", utils.LogAttr("original error", err.Error())) + go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.WsDisconnectionReasonProvider) return } err = cwsm.handleIncomingSubscriptionNodeMessage(hashedParams, &reply, providerAddr) @@ -545,6 +553,7 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages( utils.LogAttr("hashedParams", hashedParams), utils.LogAttr("reply", reply), ) + go cwsm.consumerMetricsManager.SetFailedWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType) return } } diff --git a/protocol/chainlib/consumer_ws_subscription_manager_test.go b/protocol/chainlib/consumer_ws_subscription_manager_test.go index c549cb6772..9aebc649a4 100644 --- a/protocol/chainlib/consumer_ws_subscription_manager_test.go +++ b/protocol/chainlib/consumer_ws_subscription_manager_test.go @@ -27,6 +27,9 @@ import ( const ( numberOfParallelSubscriptions = 10 uniqueId = "1234" + projectHashTest = "test_projecthash" + chainIdTest = "test_chainId" + apiTypeTest = "test_apiType" ) func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *testing.T) { @@ -51,7 +54,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes subscriptionFirstReply2: []byte(`{"jsonrpc":"2.0","id":4,"result":{}}`), }, } - + metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest) for _, play := range playbook { t.Run(play.name, func(t *testing.T) { ts := SetupForTests(t, 1, play.specId, "../../") @@ -136,7 +139,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String()) // Create a new ConsumerWSSubscriptionManager - manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage()) + manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil) uniqueIdentifiers := make([]string, numberOfParallelSubscriptions) wg := sync.WaitGroup{} wg.Add(numberOfParallelSubscriptions) @@ -151,7 +154,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes var repliesChan <-chan *pairingtypes.RelayReply var firstReply *pairingtypes.RelayReply - firstReply, repliesChan, err = manager.StartSubscription(ctx, protocolMessage1, dapp, ip, uniqueIdentifiers[index], nil) + firstReply, repliesChan, err = manager.StartSubscription(ctx, protocolMessage1, dapp, ip, uniqueIdentifiers[index], metricsData) go func() { for subMsg := range repliesChan { // utils.LavaFormatInfo("got reply for index", utils.LogAttr("index", index)) @@ -169,7 +172,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes // now we have numberOfParallelSubscriptions subscriptions currently running require.Len(t, manager.connectedDapps, numberOfParallelSubscriptions) // remove one - err = manager.Unsubscribe(ts.Ctx, protocolMessage1, dapp, ip, uniqueIdentifiers[0], nil) + err = manager.Unsubscribe(ts.Ctx, protocolMessage1, dapp, ip, uniqueIdentifiers[0], metricsData) require.NoError(t, err) // now we have numberOfParallelSubscriptions - 1 require.Len(t, manager.connectedDapps, numberOfParallelSubscriptions-1) @@ -177,7 +180,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes require.Len(t, manager.activeSubscriptions, 1) // same flow for unsubscribe all - err = manager.UnsubscribeAll(ts.Ctx, dapp, ip, uniqueIdentifiers[1], nil) + err = manager.UnsubscribeAll(ts.Ctx, dapp, ip, uniqueIdentifiers[1], metricsData) require.NoError(t, err) // now we have numberOfParallelSubscriptions - 2 require.Len(t, manager.connectedDapps, numberOfParallelSubscriptions-2) @@ -209,7 +212,6 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) { subscriptionFirstReply2: []byte(`{"jsonrpc":"2.0","id":4,"result":{}}`), }, } - for _, play := range playbook { t.Run(play.name, func(t *testing.T) { ts := SetupForTests(t, 1, play.specId, "../../") @@ -291,9 +293,9 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) { Times(1) // Should call SendParsedRelay, because it is the first time we subscribe consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String()) - + metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest) // Create a new ConsumerWSSubscriptionManager - manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage()) + manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil) wg := sync.WaitGroup{} wg.Add(10) @@ -305,7 +307,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) { ctx := utils.WithUniqueIdentifier(ts.Ctx, utils.GenerateUniqueIdentifier()) var repliesChan <-chan *pairingtypes.RelayReply var firstReply *pairingtypes.RelayReply - firstReply, repliesChan, err = manager.StartSubscription(ctx, protocolMessage1, dapp+strconv.Itoa(index), ts.Consumer.Addr.String(), uniqueId, nil) + firstReply, repliesChan, err = manager.StartSubscription(ctx, protocolMessage1, dapp+strconv.Itoa(index), ts.Consumer.Addr.String(), uniqueId, metricsData) go func() { for subMsg := range repliesChan { require.Equal(t, string(play.subscriptionFirstReply1), string(subMsg.Data)) @@ -379,6 +381,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { unsubscribeMessage2: []byte(`{"jsonrpc":"2.0","method":"eth_unsubscribe","params":["0x2134567890"],"id":1}`), }, } + metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest) for _, play := range playbook { t.Run(play.name, func(t *testing.T) { @@ -538,12 +541,12 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String()) // Create a new ConsumerWSSubscriptionManager - manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage()) + manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil) // Start a new subscription for the first time, called SendParsedRelay once ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) - firstReply, repliesChan1, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp1, ts.Consumer.Addr.String(), uniqueId, nil) + firstReply, repliesChan1, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp1, ts.Consumer.Addr.String(), uniqueId, metricsData) assert.NoError(t, err) unsubscribeMessageWg.Add(1) assert.Equal(t, string(play.subscriptionFirstReply1), string(firstReply.Data)) @@ -559,7 +562,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { // Start a subscription again, same params, same dappKey, should not call SendParsedRelay ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) - firstReply, repliesChan2, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp1, ts.Consumer.Addr.String(), uniqueId, nil) + firstReply, repliesChan2, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp1, ts.Consumer.Addr.String(), uniqueId, metricsData) assert.NoError(t, err) assert.Equal(t, string(play.subscriptionFirstReply1), string(firstReply.Data)) assert.Nil(t, repliesChan2) // Same subscription, same dappKey, no need for a new channel @@ -568,7 +571,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { // Start a subscription again, same params, different dappKey, should not call SendParsedRelay ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) - firstReply, repliesChan3, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp2, ts.Consumer.Addr.String(), uniqueId, nil) + firstReply, repliesChan3, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp2, ts.Consumer.Addr.String(), uniqueId, metricsData) assert.NoError(t, err) assert.Equal(t, string(play.subscriptionFirstReply1), string(firstReply.Data)) assert.NotNil(t, repliesChan3) // Same subscription, but different dappKey, so will create new channel @@ -652,7 +655,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { // Start a subscription again, different params, same dappKey, should call SendParsedRelay ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) - firstReply, repliesChan4, err := manager.StartSubscription(ctx, subscribeProtocolMessage2, dapp1, ts.Consumer.Addr.String(), uniqueId, nil) + firstReply, repliesChan4, err := manager.StartSubscription(ctx, subscribeProtocolMessage2, dapp1, ts.Consumer.Addr.String(), uniqueId, metricsData) assert.NoError(t, err) unsubscribeMessageWg.Add(1) assert.Equal(t, string(play.subscriptionFirstReply2), string(firstReply.Data)) @@ -671,7 +674,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) unsubProtocolMessage := NewProtocolMessage(unsubscribeChainMessage1, nil, relayResult1.Request.RelayData, dapp2, ts.Consumer.Addr.String()) - err = manager.Unsubscribe(ctx, unsubProtocolMessage, dapp2, ts.Consumer.Addr.String(), uniqueId, nil) + err = manager.Unsubscribe(ctx, unsubProtocolMessage, dapp2, ts.Consumer.Addr.String(), uniqueId, metricsData) require.NoError(t, err) listenForExpectedMessages(ctx, repliesChan1, string(play.subscriptionFirstReply1)) @@ -697,7 +700,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) { Times(2) // Should call SendParsedRelay, because it unsubscribed ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier()) - err = manager.UnsubscribeAll(ctx, dapp1, ts.Consumer.Addr.String(), uniqueId, nil) + err = manager.UnsubscribeAll(ctx, dapp1, ts.Consumer.Addr.String(), uniqueId, metricsData) require.NoError(t, err) expectNoMoreMessages(ctx, repliesChan1) diff --git a/protocol/metrics/metrics_consumer_manager.go b/protocol/metrics/metrics_consumer_manager.go index 2f3337e432..83cd72d025 100644 --- a/protocol/metrics/metrics_consumer_manager.go +++ b/protocol/metrics/metrics_consumer_manager.go @@ -13,6 +13,12 @@ import ( "github.com/prometheus/client_golang/prometheus/promhttp" ) +const ( + WsDisconnectionReasonConsumer = "consumer-disconnect" + WsDisconnectionReasonProvider = "provider-disconnect" + WsDisconnectionReasonUser = "user-disconnect" +) + type LatencyTracker struct { AverageLatency time.Duration // in nano seconds (time.Since result) TotalRequests int @@ -34,6 +40,10 @@ type ConsumerMetricsManager struct { totalNodeErroredRecoveryAttemptsMetric *prometheus.CounterVec totalRelaysSentToProvidersMetric *prometheus.CounterVec totalRelaysSentByNewBatchTickerMetric *prometheus.CounterVec + totalWsSubscriptionRequestsMetric *prometheus.CounterVec + totalFailedWsSubscriptionRequestsMetric *prometheus.CounterVec + totalWsSubscriptionDissconnectMetric *prometheus.CounterVec + totalDuplicatedWsSubscriptionRequestsMetric *prometheus.CounterVec blockMetric *prometheus.GaugeVec latencyMetric *prometheus.GaugeVec qosMetric *prometheus.GaugeVec @@ -88,6 +98,26 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM Help: "The total number of errors encountered by the consumer over time.", }, []string{"spec", "apiInterface"}) + totalWsSubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "lava_consumer_total_ws_subscription_requests", + Help: "The total number of websocket subscription requests over time per chain id per api interface.", + }, []string{"spec", "apiInterface"}) + + totalFailedWsSubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "lava_consumer_total_failed_ws_subscription_requests", + Help: "The total number of failed websocket subscription requests over time per chain id per api interface.", + }, []string{"spec", "apiInterface"}) + + totalDuplicatedWsSubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "lava_consumer_total_duplicated_ws_subscription_requests", + Help: "The total number of duplicated webscket subscription requests over time per chain id per api interface.", + }, []string{"spec", "apiInterface"}) + + totalWsSubscriptionDissconnectMetric := prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "lava_consumer_total_ws_subscription_disconnect", + Help: "The total number of websocket subscription disconnects over time per chain id per api interface per dissconnect reason.", + }, []string{"spec", "apiInterface", "dissconectReason"}) + blockMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{ Name: "lava_latest_block", Help: "The latest block measured", @@ -196,10 +226,18 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM prometheus.MustRegister(totalNodeErroredRecoveryAttemptsMetric) prometheus.MustRegister(relayProcessingLatencyBeforeProvider) prometheus.MustRegister(relayProcessingLatencyAfterProvider) + prometheus.MustRegister(totalWsSubscriptionRequestsMetric) + prometheus.MustRegister(totalFailedWsSubscriptionRequestsMetric) + prometheus.MustRegister(totalDuplicatedWsSubscriptionRequestsMetric) + prometheus.MustRegister(totalWsSubscriptionDissconnectMetric) consumerMetricsManager := &ConsumerMetricsManager{ totalCURequestedMetric: totalCURequestedMetric, totalRelaysRequestedMetric: totalRelaysRequestedMetric, + totalWsSubscriptionRequestsMetric: totalWsSubscriptionRequestsMetric, + totalFailedWsSubscriptionRequestsMetric: totalFailedWsSubscriptionRequestsMetric, + totalDuplicatedWsSubscriptionRequestsMetric: totalDuplicatedWsSubscriptionRequestsMetric, + totalWsSubscriptionDissconnectMetric: totalWsSubscriptionDissconnectMetric, totalErroredMetric: totalErroredMetric, blockMetric: blockMetric, latencyMetric: latencyMetric, @@ -460,3 +498,31 @@ func SetVersionInner(protocolVersionMetric *prometheus.GaugeVec, version string) combined := major*1000000 + minor*1000 + patch protocolVersionMetric.WithLabelValues("version").Set(float64(combined)) } + +func (pme *ConsumerMetricsManager) SetWsSubscriptionRequestMetric(chainId string, apiInterface string) { + if pme == nil { + return + } + pme.totalWsSubscriptionRequestsMetric.WithLabelValues(chainId, apiInterface).Inc() +} + +func (pme *ConsumerMetricsManager) SetFailedWsSubscriptionRequestMetric(chainId string, apiInterface string) { + if pme == nil { + return + } + pme.totalFailedWsSubscriptionRequestsMetric.WithLabelValues(chainId, apiInterface).Inc() +} + +func (pme *ConsumerMetricsManager) SetDuplicatedWsSubscriptionRequestMetric(chainId string, apiInterface string) { + if pme == nil { + return + } + pme.totalDuplicatedWsSubscriptionRequestsMetric.WithLabelValues(chainId, apiInterface).Inc() +} + +func (pme *ConsumerMetricsManager) SetWsSubscriptioDisconnectRequestMetric(chainId string, apiInterface string, disconnectReason string) { + if pme == nil { + return + } + pme.totalWsSubscriptionDissconnectMetric.WithLabelValues(chainId, apiInterface, disconnectReason).Inc() +} diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index c919678735..1223ef34be 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -298,7 +298,7 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt if rpcEndpoint.ApiInterface == spectypes.APIInterfaceJsonRPC { specMethodType = http.MethodPost } - consumerWsSubscriptionManager = chainlib.NewConsumerWSSubscriptionManager(consumerSessionManager, rpcConsumerServer, options.refererData, specMethodType, chainParser, activeSubscriptionProvidersStorage) + consumerWsSubscriptionManager = chainlib.NewConsumerWSSubscriptionManager(consumerSessionManager, rpcConsumerServer, options.refererData, specMethodType, chainParser, activeSubscriptionProvidersStorage, consumerMetricsManager) utils.LavaFormatInfo("RPCConsumer Listening", utils.Attribute{Key: "endpoints", Value: rpcEndpoint.String()}) err = rpcConsumerServer.ServeRPCRequests(ctx, rpcEndpoint, rpcc.consumerStateTracker, chainParser, finalizationConsensus, consumerSessionManager, options.requiredResponses, privKey, lavaChainID, options.cache, rpcConsumerMetrics, consumerAddr, consumerConsistency, relaysMonitor, options.cmdFlags, options.stateShare, options.refererData, consumerReportsManager, consumerWsSubscriptionManager)