Skip to content

Commit

Permalink
feat: PRT: Add subscription metrics (#1695)
Browse files Browse the repository at this point in the history
* add subscription metrics

* fix metric typo

* fix pr

* fix typo

* fix lint

* fix tests

* Add "Connection refused" to allowedErrorsDuringEmergencyMode

* remove consumerSessionManager from consumer ws sub

* add back session t ws sub

* fix pr

* fix lint

* fix lint

* change disconnect reason map

* fix pr

* fix pr

---------

Co-authored-by: leon mandel <[email protected]>
Co-authored-by: Elad Gildnur <[email protected]>
Co-authored-by: Elad Gildnur <[email protected]>
Co-authored-by: Ran Mishael <[email protected]>
  • Loading branch information
5 people authored Sep 25, 2024
1 parent a5206e7 commit bdb5f04
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 19 deletions.
2 changes: 1 addition & 1 deletion protocol/chainlib/consumer_websocket_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func (cwm *ConsumerWebsocketManager) ListenToMessages() {
continue
}

// check whether its a normal relay / unsubscribe / unsubscribe_all otherwise its a subscription flow.
// check whether it's a normal relay / unsubscribe / unsubscribe_all otherwise its a subscription flow.
if !IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_SUBSCRIBE) {
if IsFunctionTagOfType(protocolMessage, spectypes.FUNCTION_TAG_UNSUBSCRIBE) {
err := cwm.consumerWsSubscriptionManager.Unsubscribe(webSocketCtx, protocolMessage, dappID, userIp, cwm.WebsocketConnectionUID, metricsData)
Expand Down
11 changes: 10 additions & 1 deletion protocol/chainlib/consumer_ws_subscription_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type ConsumerWSSubscriptionManager struct {
activeSubscriptionProvidersStorage *lavasession.ActiveSubscriptionProvidersStorage
currentlyPendingSubscriptions map[string]*pendingSubscriptionsBroadcastManager
lock sync.RWMutex
consumerMetricsManager *metrics.ConsumerMetricsManager
}

func NewConsumerWSSubscriptionManager(
Expand All @@ -65,6 +66,7 @@ func NewConsumerWSSubscriptionManager(
connectionType string,
chainParser ChainParser,
activeSubscriptionProvidersStorage *lavasession.ActiveSubscriptionProvidersStorage,
consumerMetricsManager *metrics.ConsumerMetricsManager,
) *ConsumerWSSubscriptionManager {
return &ConsumerWSSubscriptionManager{
connectedDapps: make(map[string]map[string]*common.SafeChannelSender[*pairingtypes.RelayReply]),
Expand All @@ -76,6 +78,7 @@ func NewConsumerWSSubscriptionManager(
relaySender: relaySender,
connectionType: connectionType,
activeSubscriptionProvidersStorage: activeSubscriptionProvidersStorage,
consumerMetricsManager: consumerMetricsManager,
}
}

Expand Down Expand Up @@ -216,6 +219,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription(

// called after send relay failure or parsing failure afterwards
onSubscriptionFailure := func() {
go cwsm.consumerMetricsManager.SetFailedWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
cwsm.failedPendingSubscription(hashedParams)
closeWebsocketRepliesChannel()
}
Expand Down Expand Up @@ -255,6 +259,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription(
// Validated there are no active subscriptions that we can use.
firstSubscriptionReply, returnWebsocketRepliesChan := cwsm.checkForActiveSubscriptionWithLock(webSocketCtx, hashedParams, protocolMessage, dappKey, websocketRepliesSafeChannelSender, closeWebsocketRepliesChannel)
if firstSubscriptionReply != nil {
go cwsm.consumerMetricsManager.SetDuplicatedWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
if returnWebsocketRepliesChan {
return firstSubscriptionReply, websocketRepliesChan, nil
}
Expand Down Expand Up @@ -412,7 +417,7 @@ func (cwsm *ConsumerWSSubscriptionManager) StartSubscription(
cwsm.successfulPendingSubscription(hashedParams)
// Need to be run once for subscription
go cwsm.listenForSubscriptionMessages(webSocketCtx, dappID, consumerIp, replyServer, hashedParams, providerAddr, metricsData, closeSubscriptionChan)

go cwsm.consumerMetricsManager.SetWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
return &reply, websocketRepliesChan, nil
}

Expand Down Expand Up @@ -524,19 +529,22 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages(
utils.LogAttr("GUID", webSocketCtx),
utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)),
)
go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.WsDisconnectionReasonUser)
return
case <-replyServer.Context().Done():
utils.LavaFormatTrace("reply server context canceled",
utils.LogAttr("GUID", webSocketCtx),
utils.LogAttr("hashedParams", utils.ToHexString(hashedParams)),
)
go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.WsDisconnectionReasonConsumer)
return
default:
var reply pairingtypes.RelayReply
err := replyServer.RecvMsg(&reply)
if err != nil {
// The connection was closed by the provider
utils.LavaFormatTrace("error reading from subscription stream", utils.LogAttr("original error", err.Error()))
go cwsm.consumerMetricsManager.SetWsSubscriptioDisconnectRequestMetric(metricsData.ChainID, metricsData.APIType, metrics.WsDisconnectionReasonProvider)
return
}
err = cwsm.handleIncomingSubscriptionNodeMessage(hashedParams, &reply, providerAddr)
Expand All @@ -545,6 +553,7 @@ func (cwsm *ConsumerWSSubscriptionManager) listenForSubscriptionMessages(
utils.LogAttr("hashedParams", hashedParams),
utils.LogAttr("reply", reply),
)
go cwsm.consumerMetricsManager.SetFailedWsSubscriptionRequestMetric(metricsData.ChainID, metricsData.APIType)
return
}
}
Expand Down
35 changes: 19 additions & 16 deletions protocol/chainlib/consumer_ws_subscription_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
const (
numberOfParallelSubscriptions = 10
uniqueId = "1234"
projectHashTest = "test_projecthash"
chainIdTest = "test_chainId"
apiTypeTest = "test_apiType"
)

func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *testing.T) {
Expand All @@ -51,7 +54,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes
subscriptionFirstReply2: []byte(`{"jsonrpc":"2.0","id":4,"result":{}}`),
},
}

metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest)
for _, play := range playbook {
t.Run(play.name, func(t *testing.T) {
ts := SetupForTests(t, 1, play.specId, "../../")
Expand Down Expand Up @@ -136,7 +139,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes
consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String())

// Create a new ConsumerWSSubscriptionManager
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage())
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil)
uniqueIdentifiers := make([]string, numberOfParallelSubscriptions)
wg := sync.WaitGroup{}
wg.Add(numberOfParallelSubscriptions)
Expand All @@ -151,7 +154,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes
var repliesChan <-chan *pairingtypes.RelayReply
var firstReply *pairingtypes.RelayReply

firstReply, repliesChan, err = manager.StartSubscription(ctx, protocolMessage1, dapp, ip, uniqueIdentifiers[index], nil)
firstReply, repliesChan, err = manager.StartSubscription(ctx, protocolMessage1, dapp, ip, uniqueIdentifiers[index], metricsData)
go func() {
for subMsg := range repliesChan {
// utils.LavaFormatInfo("got reply for index", utils.LogAttr("index", index))
Expand All @@ -169,15 +172,15 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptionsOnSameDappIdIp(t *tes
// now we have numberOfParallelSubscriptions subscriptions currently running
require.Len(t, manager.connectedDapps, numberOfParallelSubscriptions)
// remove one
err = manager.Unsubscribe(ts.Ctx, protocolMessage1, dapp, ip, uniqueIdentifiers[0], nil)
err = manager.Unsubscribe(ts.Ctx, protocolMessage1, dapp, ip, uniqueIdentifiers[0], metricsData)
require.NoError(t, err)
// now we have numberOfParallelSubscriptions - 1
require.Len(t, manager.connectedDapps, numberOfParallelSubscriptions-1)
// check we still have an active subscription.
require.Len(t, manager.activeSubscriptions, 1)

// same flow for unsubscribe all
err = manager.UnsubscribeAll(ts.Ctx, dapp, ip, uniqueIdentifiers[1], nil)
err = manager.UnsubscribeAll(ts.Ctx, dapp, ip, uniqueIdentifiers[1], metricsData)
require.NoError(t, err)
// now we have numberOfParallelSubscriptions - 2
require.Len(t, manager.connectedDapps, numberOfParallelSubscriptions-2)
Expand Down Expand Up @@ -209,7 +212,6 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) {
subscriptionFirstReply2: []byte(`{"jsonrpc":"2.0","id":4,"result":{}}`),
},
}

for _, play := range playbook {
t.Run(play.name, func(t *testing.T) {
ts := SetupForTests(t, 1, play.specId, "../../")
Expand Down Expand Up @@ -291,9 +293,9 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) {
Times(1) // Should call SendParsedRelay, because it is the first time we subscribe

consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String())

metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest)
// Create a new ConsumerWSSubscriptionManager
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage())
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil)

wg := sync.WaitGroup{}
wg.Add(10)
Expand All @@ -305,7 +307,7 @@ func TestConsumerWSSubscriptionManagerParallelSubscriptions(t *testing.T) {
ctx := utils.WithUniqueIdentifier(ts.Ctx, utils.GenerateUniqueIdentifier())
var repliesChan <-chan *pairingtypes.RelayReply
var firstReply *pairingtypes.RelayReply
firstReply, repliesChan, err = manager.StartSubscription(ctx, protocolMessage1, dapp+strconv.Itoa(index), ts.Consumer.Addr.String(), uniqueId, nil)
firstReply, repliesChan, err = manager.StartSubscription(ctx, protocolMessage1, dapp+strconv.Itoa(index), ts.Consumer.Addr.String(), uniqueId, metricsData)
go func() {
for subMsg := range repliesChan {
require.Equal(t, string(play.subscriptionFirstReply1), string(subMsg.Data))
Expand Down Expand Up @@ -379,6 +381,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {
unsubscribeMessage2: []byte(`{"jsonrpc":"2.0","method":"eth_unsubscribe","params":["0x2134567890"],"id":1}`),
},
}
metricsData := metrics.NewRelayAnalytics(projectHashTest, chainIdTest, apiTypeTest)

for _, play := range playbook {
t.Run(play.name, func(t *testing.T) {
Expand Down Expand Up @@ -538,12 +541,12 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {
consumerSessionManager := CreateConsumerSessionManager(play.specId, play.apiInterface, ts.Consumer.Addr.String())

// Create a new ConsumerWSSubscriptionManager
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage())
manager := NewConsumerWSSubscriptionManager(consumerSessionManager, relaySender, nil, play.connectionType, chainParser, lavasession.NewActiveSubscriptionProvidersStorage(), nil)

// Start a new subscription for the first time, called SendParsedRelay once
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())

firstReply, repliesChan1, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp1, ts.Consumer.Addr.String(), uniqueId, nil)
firstReply, repliesChan1, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp1, ts.Consumer.Addr.String(), uniqueId, metricsData)
assert.NoError(t, err)
unsubscribeMessageWg.Add(1)
assert.Equal(t, string(play.subscriptionFirstReply1), string(firstReply.Data))
Expand All @@ -559,7 +562,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {

// Start a subscription again, same params, same dappKey, should not call SendParsedRelay
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
firstReply, repliesChan2, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp1, ts.Consumer.Addr.String(), uniqueId, nil)
firstReply, repliesChan2, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp1, ts.Consumer.Addr.String(), uniqueId, metricsData)
assert.NoError(t, err)
assert.Equal(t, string(play.subscriptionFirstReply1), string(firstReply.Data))
assert.Nil(t, repliesChan2) // Same subscription, same dappKey, no need for a new channel
Expand All @@ -568,7 +571,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {

// Start a subscription again, same params, different dappKey, should not call SendParsedRelay
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
firstReply, repliesChan3, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp2, ts.Consumer.Addr.String(), uniqueId, nil)
firstReply, repliesChan3, err := manager.StartSubscription(ctx, subscribeProtocolMessage1, dapp2, ts.Consumer.Addr.String(), uniqueId, metricsData)
assert.NoError(t, err)
assert.Equal(t, string(play.subscriptionFirstReply1), string(firstReply.Data))
assert.NotNil(t, repliesChan3) // Same subscription, but different dappKey, so will create new channel
Expand Down Expand Up @@ -652,7 +655,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {
// Start a subscription again, different params, same dappKey, should call SendParsedRelay
ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())

firstReply, repliesChan4, err := manager.StartSubscription(ctx, subscribeProtocolMessage2, dapp1, ts.Consumer.Addr.String(), uniqueId, nil)
firstReply, repliesChan4, err := manager.StartSubscription(ctx, subscribeProtocolMessage2, dapp1, ts.Consumer.Addr.String(), uniqueId, metricsData)
assert.NoError(t, err)
unsubscribeMessageWg.Add(1)
assert.Equal(t, string(play.subscriptionFirstReply2), string(firstReply.Data))
Expand All @@ -671,7 +674,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {

ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
unsubProtocolMessage := NewProtocolMessage(unsubscribeChainMessage1, nil, relayResult1.Request.RelayData, dapp2, ts.Consumer.Addr.String())
err = manager.Unsubscribe(ctx, unsubProtocolMessage, dapp2, ts.Consumer.Addr.String(), uniqueId, nil)
err = manager.Unsubscribe(ctx, unsubProtocolMessage, dapp2, ts.Consumer.Addr.String(), uniqueId, metricsData)
require.NoError(t, err)

listenForExpectedMessages(ctx, repliesChan1, string(play.subscriptionFirstReply1))
Expand All @@ -697,7 +700,7 @@ func TestConsumerWSSubscriptionManager(t *testing.T) {
Times(2) // Should call SendParsedRelay, because it unsubscribed

ctx = utils.WithUniqueIdentifier(ctx, utils.GenerateUniqueIdentifier())
err = manager.UnsubscribeAll(ctx, dapp1, ts.Consumer.Addr.String(), uniqueId, nil)
err = manager.UnsubscribeAll(ctx, dapp1, ts.Consumer.Addr.String(), uniqueId, metricsData)
require.NoError(t, err)

expectNoMoreMessages(ctx, repliesChan1)
Expand Down
66 changes: 66 additions & 0 deletions protocol/metrics/metrics_consumer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
)

const (
WsDisconnectionReasonConsumer = "consumer-disconnect"
WsDisconnectionReasonProvider = "provider-disconnect"
WsDisconnectionReasonUser = "user-disconnect"
)

type LatencyTracker struct {
AverageLatency time.Duration // in nano seconds (time.Since result)
TotalRequests int
Expand All @@ -34,6 +40,10 @@ type ConsumerMetricsManager struct {
totalNodeErroredRecoveryAttemptsMetric *prometheus.CounterVec
totalRelaysSentToProvidersMetric *prometheus.CounterVec
totalRelaysSentByNewBatchTickerMetric *prometheus.CounterVec
totalWsSubscriptionRequestsMetric *prometheus.CounterVec
totalFailedWsSubscriptionRequestsMetric *prometheus.CounterVec
totalWsSubscriptionDissconnectMetric *prometheus.CounterVec
totalDuplicatedWsSubscriptionRequestsMetric *prometheus.CounterVec
blockMetric *prometheus.GaugeVec
latencyMetric *prometheus.GaugeVec
qosMetric *prometheus.GaugeVec
Expand Down Expand Up @@ -88,6 +98,26 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
Help: "The total number of errors encountered by the consumer over time.",
}, []string{"spec", "apiInterface"})

totalWsSubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "lava_consumer_total_ws_subscription_requests",
Help: "The total number of websocket subscription requests over time per chain id per api interface.",
}, []string{"spec", "apiInterface"})

totalFailedWsSubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "lava_consumer_total_failed_ws_subscription_requests",
Help: "The total number of failed websocket subscription requests over time per chain id per api interface.",
}, []string{"spec", "apiInterface"})

totalDuplicatedWsSubscriptionRequestsMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "lava_consumer_total_duplicated_ws_subscription_requests",
Help: "The total number of duplicated webscket subscription requests over time per chain id per api interface.",
}, []string{"spec", "apiInterface"})

totalWsSubscriptionDissconnectMetric := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "lava_consumer_total_ws_subscription_disconnect",
Help: "The total number of websocket subscription disconnects over time per chain id per api interface per dissconnect reason.",
}, []string{"spec", "apiInterface", "dissconectReason"})

blockMetric := prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "lava_latest_block",
Help: "The latest block measured",
Expand Down Expand Up @@ -196,10 +226,18 @@ func NewConsumerMetricsManager(options ConsumerMetricsManagerOptions) *ConsumerM
prometheus.MustRegister(totalNodeErroredRecoveryAttemptsMetric)
prometheus.MustRegister(relayProcessingLatencyBeforeProvider)
prometheus.MustRegister(relayProcessingLatencyAfterProvider)
prometheus.MustRegister(totalWsSubscriptionRequestsMetric)
prometheus.MustRegister(totalFailedWsSubscriptionRequestsMetric)
prometheus.MustRegister(totalDuplicatedWsSubscriptionRequestsMetric)
prometheus.MustRegister(totalWsSubscriptionDissconnectMetric)

consumerMetricsManager := &ConsumerMetricsManager{
totalCURequestedMetric: totalCURequestedMetric,
totalRelaysRequestedMetric: totalRelaysRequestedMetric,
totalWsSubscriptionRequestsMetric: totalWsSubscriptionRequestsMetric,
totalFailedWsSubscriptionRequestsMetric: totalFailedWsSubscriptionRequestsMetric,
totalDuplicatedWsSubscriptionRequestsMetric: totalDuplicatedWsSubscriptionRequestsMetric,
totalWsSubscriptionDissconnectMetric: totalWsSubscriptionDissconnectMetric,
totalErroredMetric: totalErroredMetric,
blockMetric: blockMetric,
latencyMetric: latencyMetric,
Expand Down Expand Up @@ -460,3 +498,31 @@ func SetVersionInner(protocolVersionMetric *prometheus.GaugeVec, version string)
combined := major*1000000 + minor*1000 + patch
protocolVersionMetric.WithLabelValues("version").Set(float64(combined))
}

func (pme *ConsumerMetricsManager) SetWsSubscriptionRequestMetric(chainId string, apiInterface string) {
if pme == nil {
return
}
pme.totalWsSubscriptionRequestsMetric.WithLabelValues(chainId, apiInterface).Inc()
}

func (pme *ConsumerMetricsManager) SetFailedWsSubscriptionRequestMetric(chainId string, apiInterface string) {
if pme == nil {
return
}
pme.totalFailedWsSubscriptionRequestsMetric.WithLabelValues(chainId, apiInterface).Inc()
}

func (pme *ConsumerMetricsManager) SetDuplicatedWsSubscriptionRequestMetric(chainId string, apiInterface string) {
if pme == nil {
return
}
pme.totalDuplicatedWsSubscriptionRequestsMetric.WithLabelValues(chainId, apiInterface).Inc()
}

func (pme *ConsumerMetricsManager) SetWsSubscriptioDisconnectRequestMetric(chainId string, apiInterface string, disconnectReason string) {
if pme == nil {
return
}
pme.totalWsSubscriptionDissconnectMetric.WithLabelValues(chainId, apiInterface, disconnectReason).Inc()
}
Loading

0 comments on commit bdb5f04

Please sign in to comment.