diff --git a/x/pairing/keeper/epoch_payments.go b/x/pairing/keeper/epoch_payments.go index 050fbba64c..d742ffc723 100644 --- a/x/pairing/keeper/epoch_payments.go +++ b/x/pairing/keeper/epoch_payments.go @@ -4,6 +4,7 @@ import ( "fmt" "strconv" + "github.com/cosmos/cosmos-sdk/store/cachekv" "github.com/cosmos/cosmos-sdk/store/prefix" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/lavanet/lava/utils" @@ -37,6 +38,24 @@ func (k Keeper) GetEpochPayments( return val, true } +func (k EpochPaymentHandler) SetEpochPaymentsCached(ctx sdk.Context, epochPayments types.EpochPayments) { + b := k.cdc.MustMarshal(&epochPayments) + k.EpochPaymentsCache.Set(types.EpochPaymentsKey(epochPayments.Index), b) +} + +func (k EpochPaymentHandler) GetEpochPaymentsCached( + ctx sdk.Context, + index string, +) (val types.EpochPayments, found bool) { + b := k.EpochPaymentsCache.Get(types.EpochPaymentsKey(index)) + if b == nil { + return val, false + } + + k.cdc.MustUnmarshal(b, &val) + return val, true +} + // RemoveEpochPayments removes a epochPayments from the store func (k Keeper) RemoveEpochPayments( ctx sdk.Context, @@ -73,13 +92,33 @@ func (k Keeper) RemoveOldEpochPayment(ctx sdk.Context) { // Function to get the epochPayments object from a specific epoch. Note that it also returns the epochPayments object's key which is the epoch in hex representation (base 16) func (k Keeper) GetEpochPaymentsFromBlock(ctx sdk.Context, epoch uint64) (epochPayment types.EpochPayments, found bool, key string) { - key = strconv.FormatUint(epoch, 16) + key = epochPaymentKey(epoch) epochPayment, found = k.GetEpochPayments(ctx, key) return } +func epochPaymentKey(epoch uint64) string { + return strconv.FormatUint(epoch, 16) +} + +type EpochPaymentHandler struct { + Keeper + EpochPaymentsCache *cachekv.Store + ProviderPaymentStorageCache *cachekv.Store + UniquePaymentStorageClientProviderCache *cachekv.Store +} + +func (k Keeper) NewEpochPaymentHandler(ctx sdk.Context) EpochPaymentHandler { + return EpochPaymentHandler{ + Keeper: k, + EpochPaymentsCache: cachekv.NewStore(prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.EpochPaymentsKeyPrefix))), + ProviderPaymentStorageCache: cachekv.NewStore(prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.ProviderPaymentStorageKeyPrefix))), + UniquePaymentStorageClientProviderCache: cachekv.NewStore(prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.UniquePaymentStorageClientProviderKeyPrefix))), + } +} + // Function to add an epoch payment to the epochPayments object -func (k Keeper) AddEpochPayment(ctx sdk.Context, chainID string, epoch uint64, projectID string, providerAddress sdk.AccAddress, usedCU uint64, uniqueIdentifier string) uint64 { +func (k EpochPaymentHandler) AddEpochPayment(ctx sdk.Context, chainID string, epoch uint64, projectID string, providerAddress sdk.AccAddress, usedCU uint64, uniqueIdentifier string) uint64 { if epoch < k.epochStorageKeeper.GetEarliestEpochStart(ctx) { return 0 } @@ -88,7 +127,8 @@ func (k Keeper) AddEpochPayment(ctx sdk.Context, chainID string, epoch uint64, p userPaymentProviderStorage, usedCUProviderTotal := k.AddProviderPaymentInEpoch(ctx, chainID, epoch, projectID, providerAddress, usedCU, uniqueIdentifier) // get this epoch's epochPayments object - epochPayments, found, key := k.GetEpochPaymentsFromBlock(ctx, epoch) + key := epochPaymentKey(epoch) + epochPayments, found := k.GetEpochPaymentsCached(ctx, key) if !found { // this epoch doesn't have a epochPayments object, create one with the providerPaymentStorage object from before epochPayments = types.EpochPayments{Index: key, ProviderPaymentStorageKeys: []string{userPaymentProviderStorage.GetIndex()}} @@ -110,7 +150,7 @@ func (k Keeper) AddEpochPayment(ctx sdk.Context, chainID string, epoch uint64, p } // update the epochPayments object - k.SetEpochPayments(ctx, epochPayments) + k.SetEpochPaymentsCached(ctx, epochPayments) return usedCUProviderTotal } @@ -164,7 +204,7 @@ func (k Keeper) RemoveAllEpochPaymentsForBlockAppendAdjustments(ctx sdk.Context, // delete the uniquePaymentStorageClientProvider object k.RemoveUniquePaymentStorageClientProvider(ctx, uniquePaymentStorage.Index) - consumer := k.GetConsumerFromUniquePayment(&uniquePaymentStorage) + consumer := k.GetConsumerFromUniquePayment(uniquePaymentStorageKey) provider, err := k.GetProviderFromProviderPaymentStorage(&providerPaymentStorage) if err != nil { diff --git a/x/pairing/keeper/msg_server_relay_payment.go b/x/pairing/keeper/msg_server_relay_payment.go index 2dce3370d0..96b064ec1f 100644 --- a/x/pairing/keeper/msg_server_relay_payment.go +++ b/x/pairing/keeper/msg_server_relay_payment.go @@ -41,6 +41,7 @@ func (k msgServer) RelayPayment(goCtx context.Context, msg *types.MsgRelayPaymen ctx := sdk.UnwrapSDKContext(goCtx) logger := k.Logger(ctx) + paymentHandler := k.NewEpochPaymentHandler(ctx) lavaChainID := ctx.BlockHeader().ChainID creator, err := sdk.AccAddressFromBech32(msg.Creator) if err != nil { @@ -157,7 +158,7 @@ func (k msgServer) RelayPayment(goCtx context.Context, msg *types.MsgRelayPaymen // if they failed (one relay should affect all of them). From here on, every check will // fail the TX *** - totalCUInEpochForUserProvider := k.Keeper.AddEpochPayment(ctx, relay.SpecId, epochStart, project.Index, providerAddr, relay.CuSum, strconv.FormatUint(relay.SessionId, 16)) + totalCUInEpochForUserProvider := paymentHandler.AddEpochPayment(ctx, relay.SpecId, epochStart, project.Index, providerAddr, relay.CuSum, strconv.FormatUint(relay.SessionId, 16)) ctx.GasMeter().RefundGas(ctx.GasMeter().GasConsumed(), "") if badgeFound { k.handleBadgeCu(ctx, badgeData, relay.Provider, relay.CuSum, newBadgeTimerExpiry) @@ -274,7 +275,7 @@ func (k msgServer) RelayPayment(goCtx context.Context, msg *types.MsgRelayPaymen } // update provider payment storage with complainer's CU - err = k.updateProviderPaymentStorageWithComplainerCU(ctx, relay.UnresponsiveProviders, logger, epochStart, relay.SpecId, cuAfterQos, providers, project.Index) + err = paymentHandler.updateProviderPaymentStorageWithComplainerCU(ctx, relay.UnresponsiveProviders, logger, epochStart, relay.SpecId, cuAfterQos, providers, project.Index) if err != nil { var reportedProviders []string for _, p := range relay.UnresponsiveProviders { @@ -317,6 +318,10 @@ func (k msgServer) RelayPayment(goCtx context.Context, msg *types.MsgRelayPaymen } utils.LogLavaEvent(ctx, logger, types.LatestBlocksReportEventName, latestBlockReports, "New LatestBlocks Report for provider") + paymentHandler.EpochPaymentsCache.Write() + paymentHandler.ProviderPaymentStorageCache.Write() + paymentHandler.UniquePaymentStorageClientProviderCache.Write() + // consume constant gas (dependent on the number of relays) ctx.GasMeter().RefundGas(ctx.GasMeter().GasConsumed(), "") ctx.GasMeter().ConsumeGas(uint64(10000+100000*len(msg.Relays)), "") @@ -335,7 +340,7 @@ func (k msgServer) setStakeEntryBlockReport(ctx sdk.Context, providerAddr sdk.Ac } } -func (k msgServer) updateProviderPaymentStorageWithComplainerCU(ctx sdk.Context, unresponsiveProviders []*types.ReportedProvider, logger log.Logger, epoch uint64, chainID string, cuSum uint64, providersToPair []epochstoragetypes.StakeEntry, projectID string) error { +func (k EpochPaymentHandler) updateProviderPaymentStorageWithComplainerCU(ctx sdk.Context, unresponsiveProviders []*types.ReportedProvider, logger log.Logger, epoch uint64, chainID string, cuSum uint64, providersToPair []epochstoragetypes.StakeEntry, projectID string) error { // check that unresponsiveData exists if len(unresponsiveProviders) == 0 { return nil @@ -371,32 +376,28 @@ func (k msgServer) updateProviderPaymentStorageWithComplainerCU(ctx sdk.Context, } // get this epoch's epochPayments object - epochPayments, found, key := k.GetEpochPaymentsFromBlock(ctx, epoch) + epochPayments, found := k.GetEpochPaymentsCached(ctx, epochPaymentKey(epoch)) if !found { // the epochPayments object should exist since we already paid. if not found, print an error and continue - utils.LavaFormatError("did not find epochPayments object", err, utils.Attribute{Key: "epochPaymentsKey", Value: key}) + utils.LavaFormatError("did not find epochPayments object", err, utils.Attribute{Key: "epochPaymentsKey", Value: epoch}) continue } // get the providerPaymentStorage object using the providerStorageKey providerStorageKey := k.GetProviderPaymentStorageKey(ctx, chainID, epoch, sdkUnresponsiveProviderAddress) - providerPaymentStorage, found := k.GetProviderPaymentStorage(ctx, providerStorageKey) - + providerPaymentStorage, found := k.GetProviderPaymentStorageCached(ctx, providerStorageKey) if !found { // providerPaymentStorage not found (this provider has no payments in this epoch and also no complaints) -> we need to add one complaint - emptyProviderPaymentStorageWithComplaint := types.ProviderPaymentStorage{ + providerPaymentStorage := types.ProviderPaymentStorage{ Index: providerStorageKey, UniquePaymentStorageClientProviderKeys: []string{}, Epoch: epoch, ComplainersTotalCu: uint64(0), } - // append the emptyProviderPaymentStorageWithComplaint to the epochPayments object's providerPaymentStorages - epochPayments.ProviderPaymentStorageKeys = append(epochPayments.GetProviderPaymentStorageKeys(), emptyProviderPaymentStorageWithComplaint.GetIndex()) - k.SetEpochPayments(ctx, epochPayments) - - // assign providerPaymentStorage with the new empty providerPaymentStorage - providerPaymentStorage = emptyProviderPaymentStorageWithComplaint + // append the providerPaymentStorage to the epochPayments object's providerPaymentStorages + epochPayments.ProviderPaymentStorageKeys = append(epochPayments.GetProviderPaymentStorageKeys(), providerPaymentStorage.GetIndex()) + k.SetEpochPaymentsCached(ctx, epochPayments) } // add complainer's used CU to providerPaymentStorage @@ -404,7 +405,7 @@ func (k msgServer) updateProviderPaymentStorageWithComplainerCU(ctx sdk.Context, timestamp := time.Unix(unresponsiveProvider.TimestampS, 0) utils.LogLavaEvent(ctx, logger, types.ProviderReportedEventName, map[string]string{"provider": unresponsiveProvider.GetAddress(), "timestamp": timestamp.Format(time.DateTime), "disconnections": strconv.FormatUint(unresponsiveProvider.GetDisconnections(), 10), "errors": strconv.FormatUint(unresponsiveProvider.GetErrors(), 10), "project": projectID, "cu": strconv.FormatUint(complainerCuToAdd, 10), "epoch": strconv.FormatUint(epoch, 10), "total_complaint_this_epoch": strconv.FormatUint(providerPaymentStorage.ComplainersTotalCu, 10)}, "provider got reported by consumer") // set the final provider payment storage state including the complaints - k.SetProviderPaymentStorage(ctx, providerPaymentStorage) + k.SetProviderPaymentStorageCached(ctx, providerPaymentStorage) } return nil diff --git a/x/pairing/keeper/msg_server_relay_payment_test.go b/x/pairing/keeper/msg_server_relay_payment_test.go index ca66d09e06..47c5663a57 100644 --- a/x/pairing/keeper/msg_server_relay_payment_test.go +++ b/x/pairing/keeper/msg_server_relay_payment_test.go @@ -962,3 +962,56 @@ func TestIntOverflow(t *testing.T) { ts.AdvanceEpoch() } } + +func TestPairingCaching(t *testing.T) { + ts := newTester(t) + ts.setupForPayments(3, 3, 0) // 3 provider, 3 client, default providers-to-pair + + ts.AdvanceEpoch() + + relayNum := uint64(0) + totalCU := uint64(0) + // trigger relay payment with cache + for i := 0; i < 3; i++ { + relays := []*types.RelaySession{} + _, provider1Addr := ts.GetAccount(common.PROVIDER, i) + for i := 0; i < 3; i++ { + consumerAcct, _ := ts.GetAccount(common.CONSUMER, i) + totalCU = 0 + for i := 0; i < 50; i++ { + totalCU += uint64(i) + relaySession := ts.newRelaySession(provider1Addr, relayNum, uint64(i), ts.BlockHeight(), 0) + sig, err := sigs.Sign(consumerAcct.SK, *relaySession) + relaySession.Sig = sig + require.NoError(t, err) + relays = append(relays, relaySession) + relayNum++ + } + } + _, err := ts.TxPairingRelayPayment(provider1Addr, relays...) + require.NoError(t, err) + } + + epochPayment, found, _ := ts.Keepers.Pairing.GetEpochPaymentsFromBlock(ts.Ctx, ts.EpochStart()) + require.True(t, found) + require.Len(t, epochPayment.ProviderPaymentStorageKeys, 3) + + UniquePayments := ts.Keepers.Pairing.GetAllUniquePaymentStorageClientProvider(ts.Ctx) + require.Len(t, UniquePayments, 3*3*50) + + storages := ts.Keepers.Pairing.GetAllProviderPaymentStorage(ts.Ctx) + for _, storage := range storages { + require.Len(t, storage.UniquePaymentStorageClientProviderKeys, 3*50) + } + + for i := 0; i < 3; i++ { + consumerAcct, _ := ts.GetAccount(common.CONSUMER, i) + project, err := ts.GetProjectForDeveloper(consumerAcct.Addr.String(), ts.BlockHeight()) + require.NoError(t, err) + require.Equal(t, totalCU*3, project.UsedCu) + + sub, err := ts.QuerySubscriptionCurrent(consumerAcct.Addr.String()) + require.NoError(t, err) + require.Equal(t, totalCU*3, sub.Sub.MonthCuTotal-sub.Sub.MonthCuLeft) + } +} diff --git a/x/pairing/keeper/provider_payment_storage.go b/x/pairing/keeper/provider_payment_storage.go index 1774347efb..f855aa4f26 100644 --- a/x/pairing/keeper/provider_payment_storage.go +++ b/x/pairing/keeper/provider_payment_storage.go @@ -38,6 +38,24 @@ func (k Keeper) GetProviderPaymentStorage( return val, true } +func (k EpochPaymentHandler) SetProviderPaymentStorageCached(ctx sdk.Context, providerPaymentStorage types.ProviderPaymentStorage) { + b := k.cdc.MustMarshal(&providerPaymentStorage) + k.ProviderPaymentStorageCache.Set(types.ProviderPaymentStorageKey(providerPaymentStorage.Index), b) +} + +func (k EpochPaymentHandler) GetProviderPaymentStorageCached( + ctx sdk.Context, + index string, +) (val types.ProviderPaymentStorage, found bool) { + b := k.ProviderPaymentStorageCache.Get(types.ProviderPaymentStorageKey(index)) + if b == nil { + return val, false + } + + k.cdc.MustUnmarshal(b, &val) + return val, true +} + // RemoveProviderPaymentStorage removes a providerPaymentStorage from the store func (k Keeper) RemoveProviderPaymentStorage( ctx sdk.Context, @@ -81,13 +99,13 @@ func (k Keeper) GetProviderPaymentStorageKey(ctx sdk.Context, chainID string, ep } // Function to add a payment (which is represented by a uniquePaymentStorageClientProvider object) to a providerPaymentStorage object -func (k Keeper) AddProviderPaymentInEpoch(ctx sdk.Context, chainID string, epoch uint64, projectID string, providerAddress sdk.AccAddress, usedCU uint64, uniqueIdentifier string) (userPayment *types.ProviderPaymentStorage, usedCUConsumerTotal uint64) { +func (k EpochPaymentHandler) AddProviderPaymentInEpoch(ctx sdk.Context, chainID string, epoch uint64, projectID string, providerAddress sdk.AccAddress, usedCU uint64, uniqueIdentifier string) (userPayment *types.ProviderPaymentStorage, usedCUConsumerTotal uint64) { // create an uniquePaymentStorageClientProvider object and set it in the KVStore uniquePaymentStorageClientProviderEntryAddr := k.AddUniquePaymentStorageClientProvider(ctx, chainID, epoch, projectID, providerAddress, uniqueIdentifier, usedCU) // get the providerPaymentStorage object providerPaymentStorageKey := k.GetProviderPaymentStorageKey(ctx, chainID, epoch, providerAddress) - userPaymentStorageInEpoch, found := k.GetProviderPaymentStorage(ctx, providerPaymentStorageKey) + userPaymentStorageInEpoch, found := k.GetProviderPaymentStorageCached(ctx, providerPaymentStorageKey) if !found { // is new entry -> create a new providerPaymentStorage object userPaymentStorageInEpoch = types.ProviderPaymentStorage{Index: providerPaymentStorageKey, UniquePaymentStorageClientProviderKeys: []string{uniquePaymentStorageClientProviderEntryAddr.GetIndex()}, Epoch: epoch} @@ -101,19 +119,29 @@ func (k Keeper) AddProviderPaymentInEpoch(ctx sdk.Context, chainID string, epoch } // set the providerPaymentStorage object in the KVStore - k.SetProviderPaymentStorage(ctx, userPaymentStorageInEpoch) + k.SetProviderPaymentStorageCached(ctx, userPaymentStorageInEpoch) return &userPaymentStorageInEpoch, usedCUConsumerTotal } // Function to get the total serviced CU by a provider in this epoch for a specific consumer -func (k Keeper) GetTotalUsedCUForConsumerPerEpoch(ctx sdk.Context, projectID string, uniquePaymentStorageKeys []string, providerAddress string) uint64 { +func (k EpochPaymentHandler) GetTotalUsedCUForConsumerPerEpoch(ctx sdk.Context, projectID string, uniquePaymentStorageKeys []string, providerAddress string) uint64 { usedCUProviderTotal := uint64(0) // go over the uniquePaymentStorageKeys for _, uniquePaymentKey := range uniquePaymentStorageKeys { + // if the uniquePaymentStorageClientProvider object is not between the provider and the specific consumer, continue + if k.GetConsumerFromUniquePayment(uniquePaymentKey) != projectID { + continue + } // get the uniquePaymentStorageClientProvider object - uniquePayment, found := k.GetUniquePaymentStorageClientProvider(ctx, uniquePaymentKey) + uniquePayment, found := k.GetUniquePaymentStorageClientProviderCached(ctx, uniquePaymentKey) + if !found { + uniquePayment, found = k.GetUniquePaymentStorageClientProvider(ctx, uniquePaymentKey) + if found { + k.SetUniquePaymentStorageClientProviderCached(ctx, uniquePayment) + } + } if !found { utils.LavaFormatError("could not find uniquePaymentStorageClientProvider object", fmt.Errorf("unique payment object not found"), utils.Attribute{Key: "providerAddress", Value: providerAddress}, @@ -121,11 +149,7 @@ func (k Keeper) GetTotalUsedCUForConsumerPerEpoch(ctx sdk.Context, projectID str ) continue } - - // if the uniquePaymentStorageClientProvider object is between the provider and the specific consumer, add the serviced CU - if k.GetConsumerFromUniquePayment(&uniquePayment) == projectID { - usedCUProviderTotal += uniquePayment.UsedCU - } + usedCUProviderTotal += uniquePayment.UsedCU } return usedCUProviderTotal } diff --git a/x/pairing/keeper/unique_payment_storage_client_provider.go b/x/pairing/keeper/unique_payment_storage_client_provider.go index b89fb49df8..75ddbdb020 100644 --- a/x/pairing/keeper/unique_payment_storage_client_provider.go +++ b/x/pairing/keeper/unique_payment_storage_client_provider.go @@ -33,6 +33,24 @@ func (k Keeper) GetUniquePaymentStorageClientProvider( return val, true } +func (k EpochPaymentHandler) SetUniquePaymentStorageClientProviderCached(ctx sdk.Context, uniquePaymentStorageClientProvider types.UniquePaymentStorageClientProvider) { + b := k.cdc.MustMarshal(&uniquePaymentStorageClientProvider) + k.UniquePaymentStorageClientProviderCache.Set(types.ProviderPaymentStorageKey(uniquePaymentStorageClientProvider.Index), b) +} + +func (k EpochPaymentHandler) GetUniquePaymentStorageClientProviderCached( + ctx sdk.Context, + index string, +) (val types.UniquePaymentStorageClientProvider, found bool) { + b := k.UniquePaymentStorageClientProviderCache.Get(types.UniquePaymentStorageClientProviderKey(index)) + if b == nil { + return val, false + } + + k.cdc.MustUnmarshal(b, &val) + return val, true +} + // RemoveUniquePaymentStorageClientProvider removes a uniquePaymentStorageClientProvider from the store func (k Keeper) RemoveUniquePaymentStorageClientProvider( ctx sdk.Context, @@ -73,8 +91,8 @@ func (k Keeper) IsDoubleSpend(ctx sdk.Context, chainID string, block uint64, pro return found } -func (k Keeper) GetConsumerFromUniquePayment(uniquePaymentStorageClientProvider *types.UniquePaymentStorageClientProvider) string { - key := uniquePaymentStorageClientProvider.Index +func (k Keeper) GetConsumerFromUniquePayment(uniquePaymentStorageClientProvider string) string { + key := uniquePaymentStorageClientProvider providerAdrLengh := charToAsciiNumber(rune(key[0])) provider := key[1 : providerAdrLengh+1] return provider