From 7bfb3147220de57c8ce671f8f3b56524c512948e Mon Sep 17 00:00:00 2001 From: Elad Gildnur Date: Mon, 9 Dec 2024 16:55:03 +0200 Subject: [PATCH] Add the consumer geolocation to the QoS server report --- protocol/metrics/consumer_optimizer_qos_client.go | 6 +++++- protocol/provideroptimizer/provider_optimizer_test.go | 2 +- protocol/rpcconsumer/rpcconsumer.go | 6 ++++-- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/protocol/metrics/consumer_optimizer_qos_client.go b/protocol/metrics/consumer_optimizer_qos_client.go index 72183853d9..625a67433a 100644 --- a/protocol/metrics/consumer_optimizer_qos_client.go +++ b/protocol/metrics/consumer_optimizer_qos_client.go @@ -32,6 +32,7 @@ type ConsumerOptimizerQoSClient struct { currentEpoch atomic.Uint64 lock sync.RWMutex reportsToSend []OptimizerQoSReportToSend + geoLocation uint64 } type OptimizerQoSReport struct { @@ -57,6 +58,7 @@ type OptimizerQoSReportToSend struct { Epoch uint64 `json:"epoch"` ProviderStake int64 `json:"provider_stake"` EntryIndex int `json:"entry_index"` + GeoLocation uint64 `json:"geo_location"` } func (oqosr OptimizerQoSReportToSend) String() string { @@ -71,7 +73,7 @@ type OptimizerInf interface { CalculateQoSScoresForMetrics(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) []*OptimizerQoSReport } -func NewConsumerOptimizerQoSClient(consumerAddress, endpointAddress string, interval ...time.Duration) *ConsumerOptimizerQoSClient { +func NewConsumerOptimizerQoSClient(consumerAddress, endpointAddress string, geoLocation uint64, interval ...time.Duration) *ConsumerOptimizerQoSClient { hostname, err := os.Hostname() if err != nil { utils.LavaFormatWarning("Error while getting hostname for ConsumerOptimizerQoSClient", err) @@ -85,6 +87,7 @@ func NewConsumerOptimizerQoSClient(consumerAddress, endpointAddress string, inte chainIdToProviderToRelaysCount: map[string]map[string]uint64{}, chainIdToProviderToNodeErrorsCount: map[string]map[string]uint64{}, chainIdToProviderToEpochToStake: map[string]map[string]map[uint64]int64{}, + geoLocation: geoLocation, } } @@ -145,6 +148,7 @@ func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *Optimiz Epoch: epoch, NodeErrorRate: coqc.calculateNodeErrorRate(chainId, report.ProviderAddress), ProviderStake: coqc.getProviderChainStake(chainId, report.ProviderAddress, epoch), + GeoLocation: coqc.geoLocation, } coqc.queueSender.appendQueue(optimizerQoSReportToSend) diff --git a/protocol/provideroptimizer/provider_optimizer_test.go b/protocol/provideroptimizer/provider_optimizer_test.go index 6de13de8b6..8462f69276 100644 --- a/protocol/provideroptimizer/provider_optimizer_test.go +++ b/protocol/provideroptimizer/provider_optimizer_test.go @@ -781,7 +781,7 @@ func TestProviderOptimizerWithOptimizerQoSClient(t *testing.T) { chainId := "dontcare" - consumerOptimizerQoSClient := metrics.NewConsumerOptimizerQoSClient("lava@test", mockHttpServer.URL, 1*time.Second) + consumerOptimizerQoSClient := metrics.NewConsumerOptimizerQoSClient("lava@test", mockHttpServer.URL, 1, 1*time.Second) consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(context.Background(), 900*time.Millisecond) providerOptimizer := NewProviderOptimizer(STRATEGY_BALANCED, TEST_AVERAGE_BLOCK_TIME, TEST_BASE_WORLD_LATENCY, 10, consumerOptimizerQoSClient, chainId) diff --git a/protocol/rpcconsumer/rpcconsumer.go b/protocol/rpcconsumer/rpcconsumer.go index 5e84e56511..445c9058dd 100644 --- a/protocol/rpcconsumer/rpcconsumer.go +++ b/protocol/rpcconsumer/rpcconsumer.go @@ -129,6 +129,7 @@ type rpcConsumerStartOptions struct { stateShare bool refererData *chainlib.RefererData staticProvidersList []*lavasession.RPCProviderEndpoint // define static providers as backup to lava providers + geoLocation uint64 } func getConsumerAddressAndKeys(clientCtx client.Context) (sdk.AccAddress, *secp256k1.PrivateKey, error) { @@ -174,8 +175,8 @@ func (rpcc *RPCConsumer) Start(ctx context.Context, options *rpcConsumerStartOpt consumerUsageServeManager := metrics.NewConsumerRelayServerClient(options.analyticsServerAddresses.RelayServerAddress) // start up relay server reporting var consumerOptimizerQoSClient *metrics.ConsumerOptimizerQoSClient if options.analyticsServerAddresses.OptimizerQoSAddress != "" || options.analyticsServerAddresses.OptimizerQoSListen { - consumerOptimizerQoSClient = metrics.NewConsumerOptimizerQoSClient(consumerAddr.String(), options.analyticsServerAddresses.OptimizerQoSAddress, metrics.OptimizerQosServerPushInterval) // start up optimizer qos client - consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(ctx, metrics.OptimizerQosServerSamplingInterval) // start up optimizer qos client + consumerOptimizerQoSClient = metrics.NewConsumerOptimizerQoSClient(consumerAddr.String(), options.analyticsServerAddresses.OptimizerQoSAddress, options.geoLocation, metrics.OptimizerQosServerPushInterval) // start up optimizer qos client + consumerOptimizerQoSClient.StartOptimizersQoSReportsCollecting(ctx, metrics.OptimizerQosServerSamplingInterval) } consumerMetricsManager := metrics.NewConsumerMetricsManager(metrics.ConsumerMetricsManagerOptions{ NetworkAddress: options.analyticsServerAddresses.MetricsListenAddress, @@ -716,6 +717,7 @@ rpcconsumer consumer_examples/full_consumer_example.yml --cache-be "127.0.0.1:77 rpcConsumerSharedState, refererData, staticProviderEndpoints, + geolocation, }) return err },