Skip to content

Commit

Permalink
CNS-929: add cache store to epoch payments
Browse files Browse the repository at this point in the history
  • Loading branch information
oren-lava committed Mar 14, 2024
1 parent c1f9829 commit ca456b6
Show file tree
Hide file tree
Showing 5 changed files with 168 additions and 32 deletions.
50 changes: 45 additions & 5 deletions x/pairing/keeper/epoch_payments.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strconv"

"github.com/cosmos/cosmos-sdk/store/cachekv"
"github.com/cosmos/cosmos-sdk/store/prefix"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/lavanet/lava/utils"
Expand Down Expand Up @@ -37,6 +38,24 @@ func (k Keeper) GetEpochPayments(
return val, true
}

func (k EpochPaymentHandler) SetEpochPaymentsCached(ctx sdk.Context, epochPayments types.EpochPayments) {
b := k.cdc.MustMarshal(&epochPayments)
k.EpochPaymentsCache.Set(types.EpochPaymentsKey(epochPayments.Index), b)
}

func (k EpochPaymentHandler) GetEpochPaymentsCached(
ctx sdk.Context,
index string,
) (val types.EpochPayments, found bool) {
b := k.EpochPaymentsCache.Get(types.EpochPaymentsKey(index))
if b == nil {
return val, false
}

k.cdc.MustUnmarshal(b, &val)
return val, true
}

// RemoveEpochPayments removes a epochPayments from the store
func (k Keeper) RemoveEpochPayments(
ctx sdk.Context,
Expand Down Expand Up @@ -73,13 +92,33 @@ func (k Keeper) RemoveOldEpochPayment(ctx sdk.Context) {

// Function to get the epochPayments object from a specific epoch. Note that it also returns the epochPayments object's key which is the epoch in hex representation (base 16)
func (k Keeper) GetEpochPaymentsFromBlock(ctx sdk.Context, epoch uint64) (epochPayment types.EpochPayments, found bool, key string) {
key = strconv.FormatUint(epoch, 16)
key = epochPaymentKey(epoch)
epochPayment, found = k.GetEpochPayments(ctx, key)
return
}

func epochPaymentKey(epoch uint64) string {
return strconv.FormatUint(epoch, 16)
}

type EpochPaymentHandler struct {
Keeper
EpochPaymentsCache *cachekv.Store
ProviderPaymentStorageCache *cachekv.Store
UniquePaymentStorageClientProviderCache *cachekv.Store
}

func (k Keeper) NewEpochPaymentHandler(ctx sdk.Context) EpochPaymentHandler {
return EpochPaymentHandler{
Keeper: k,
EpochPaymentsCache: cachekv.NewStore(prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.EpochPaymentsKeyPrefix))),
ProviderPaymentStorageCache: cachekv.NewStore(prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.ProviderPaymentStorageKeyPrefix))),
UniquePaymentStorageClientProviderCache: cachekv.NewStore(prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.UniquePaymentStorageClientProviderKeyPrefix))),
}
}

// Function to add an epoch payment to the epochPayments object
func (k Keeper) AddEpochPayment(ctx sdk.Context, chainID string, epoch uint64, projectID string, providerAddress sdk.AccAddress, usedCU uint64, uniqueIdentifier string) uint64 {
func (k EpochPaymentHandler) AddEpochPayment(ctx sdk.Context, chainID string, epoch uint64, projectID string, providerAddress sdk.AccAddress, usedCU uint64, uniqueIdentifier string) uint64 {
if epoch < k.epochStorageKeeper.GetEarliestEpochStart(ctx) {
return 0
}
Expand All @@ -88,7 +127,8 @@ func (k Keeper) AddEpochPayment(ctx sdk.Context, chainID string, epoch uint64, p
userPaymentProviderStorage, usedCUProviderTotal := k.AddProviderPaymentInEpoch(ctx, chainID, epoch, projectID, providerAddress, usedCU, uniqueIdentifier)

// get this epoch's epochPayments object
epochPayments, found, key := k.GetEpochPaymentsFromBlock(ctx, epoch)
key := epochPaymentKey(epoch)
epochPayments, found := k.GetEpochPaymentsCached(ctx, key)
if !found {
// this epoch doesn't have a epochPayments object, create one with the providerPaymentStorage object from before
epochPayments = types.EpochPayments{Index: key, ProviderPaymentStorageKeys: []string{userPaymentProviderStorage.GetIndex()}}
Expand All @@ -110,7 +150,7 @@ func (k Keeper) AddEpochPayment(ctx sdk.Context, chainID string, epoch uint64, p
}

// update the epochPayments object
k.SetEpochPayments(ctx, epochPayments)
k.SetEpochPaymentsCached(ctx, epochPayments)

return usedCUProviderTotal
}
Expand Down Expand Up @@ -164,7 +204,7 @@ func (k Keeper) RemoveAllEpochPaymentsForBlockAppendAdjustments(ctx sdk.Context,

// delete the uniquePaymentStorageClientProvider object
k.RemoveUniquePaymentStorageClientProvider(ctx, uniquePaymentStorage.Index)
consumer := k.GetConsumerFromUniquePayment(&uniquePaymentStorage)
consumer := k.GetConsumerFromUniquePayment(uniquePaymentStorageKey)

provider, err := k.GetProviderFromProviderPaymentStorage(&providerPaymentStorage)
if err != nil {
Expand Down
31 changes: 16 additions & 15 deletions x/pairing/keeper/msg_server_relay_payment.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (k msgServer) RelayPayment(goCtx context.Context, msg *types.MsgRelayPaymen

ctx := sdk.UnwrapSDKContext(goCtx)
logger := k.Logger(ctx)
paymentHandler := k.NewEpochPaymentHandler(ctx)
lavaChainID := ctx.BlockHeader().ChainID
creator, err := sdk.AccAddressFromBech32(msg.Creator)
if err != nil {
Expand Down Expand Up @@ -157,7 +158,7 @@ func (k msgServer) RelayPayment(goCtx context.Context, msg *types.MsgRelayPaymen
// if they failed (one relay should affect all of them). From here on, every check will
// fail the TX ***

totalCUInEpochForUserProvider := k.Keeper.AddEpochPayment(ctx, relay.SpecId, epochStart, project.Index, providerAddr, relay.CuSum, strconv.FormatUint(relay.SessionId, 16))
totalCUInEpochForUserProvider := paymentHandler.AddEpochPayment(ctx, relay.SpecId, epochStart, project.Index, providerAddr, relay.CuSum, strconv.FormatUint(relay.SessionId, 16))
ctx.GasMeter().RefundGas(ctx.GasMeter().GasConsumed(), "")
if badgeFound {
k.handleBadgeCu(ctx, badgeData, relay.Provider, relay.CuSum, newBadgeTimerExpiry)
Expand Down Expand Up @@ -274,7 +275,7 @@ func (k msgServer) RelayPayment(goCtx context.Context, msg *types.MsgRelayPaymen
}

// update provider payment storage with complainer's CU
err = k.updateProviderPaymentStorageWithComplainerCU(ctx, relay.UnresponsiveProviders, logger, epochStart, relay.SpecId, cuAfterQos, providers, project.Index)
err = paymentHandler.updateProviderPaymentStorageWithComplainerCU(ctx, relay.UnresponsiveProviders, logger, epochStart, relay.SpecId, cuAfterQos, providers, project.Index)
if err != nil {
var reportedProviders []string
for _, p := range relay.UnresponsiveProviders {
Expand Down Expand Up @@ -317,6 +318,10 @@ func (k msgServer) RelayPayment(goCtx context.Context, msg *types.MsgRelayPaymen
}
utils.LogLavaEvent(ctx, logger, types.LatestBlocksReportEventName, latestBlockReports, "New LatestBlocks Report for provider")

paymentHandler.EpochPaymentsCache.Write()
paymentHandler.ProviderPaymentStorageCache.Write()
paymentHandler.UniquePaymentStorageClientProviderCache.Write()

// consume constant gas (dependent on the number of relays)
ctx.GasMeter().RefundGas(ctx.GasMeter().GasConsumed(), "")
ctx.GasMeter().ConsumeGas(uint64(10000+100000*len(msg.Relays)), "")
Expand All @@ -335,7 +340,7 @@ func (k msgServer) setStakeEntryBlockReport(ctx sdk.Context, providerAddr sdk.Ac
}
}

func (k msgServer) updateProviderPaymentStorageWithComplainerCU(ctx sdk.Context, unresponsiveProviders []*types.ReportedProvider, logger log.Logger, epoch uint64, chainID string, cuSum uint64, providersToPair []epochstoragetypes.StakeEntry, projectID string) error {
func (k EpochPaymentHandler) updateProviderPaymentStorageWithComplainerCU(ctx sdk.Context, unresponsiveProviders []*types.ReportedProvider, logger log.Logger, epoch uint64, chainID string, cuSum uint64, providersToPair []epochstoragetypes.StakeEntry, projectID string) error {
// check that unresponsiveData exists
if len(unresponsiveProviders) == 0 {
return nil
Expand Down Expand Up @@ -371,40 +376,36 @@ func (k msgServer) updateProviderPaymentStorageWithComplainerCU(ctx sdk.Context,
}

// get this epoch's epochPayments object
epochPayments, found, key := k.GetEpochPaymentsFromBlock(ctx, epoch)
epochPayments, found := k.GetEpochPaymentsCached(ctx, epochPaymentKey(epoch))
if !found {
// the epochPayments object should exist since we already paid. if not found, print an error and continue
utils.LavaFormatError("did not find epochPayments object", err, utils.Attribute{Key: "epochPaymentsKey", Value: key})
utils.LavaFormatError("did not find epochPayments object", err, utils.Attribute{Key: "epochPaymentsKey", Value: epoch})
continue
}

// get the providerPaymentStorage object using the providerStorageKey
providerStorageKey := k.GetProviderPaymentStorageKey(ctx, chainID, epoch, sdkUnresponsiveProviderAddress)
providerPaymentStorage, found := k.GetProviderPaymentStorage(ctx, providerStorageKey)

providerPaymentStorage, found := k.GetProviderPaymentStorageCached(ctx, providerStorageKey)
if !found {
// providerPaymentStorage not found (this provider has no payments in this epoch and also no complaints) -> we need to add one complaint
emptyProviderPaymentStorageWithComplaint := types.ProviderPaymentStorage{
providerPaymentStorage := types.ProviderPaymentStorage{
Index: providerStorageKey,
UniquePaymentStorageClientProviderKeys: []string{},
Epoch: epoch,
ComplainersTotalCu: uint64(0),
}

// append the emptyProviderPaymentStorageWithComplaint to the epochPayments object's providerPaymentStorages
epochPayments.ProviderPaymentStorageKeys = append(epochPayments.GetProviderPaymentStorageKeys(), emptyProviderPaymentStorageWithComplaint.GetIndex())
k.SetEpochPayments(ctx, epochPayments)

// assign providerPaymentStorage with the new empty providerPaymentStorage
providerPaymentStorage = emptyProviderPaymentStorageWithComplaint
// append the providerPaymentStorage to the epochPayments object's providerPaymentStorages
epochPayments.ProviderPaymentStorageKeys = append(epochPayments.GetProviderPaymentStorageKeys(), providerPaymentStorage.GetIndex())
k.SetEpochPaymentsCached(ctx, epochPayments)
}

// add complainer's used CU to providerPaymentStorage
providerPaymentStorage.ComplainersTotalCu += complainerCuToAdd
timestamp := time.Unix(unresponsiveProvider.TimestampS, 0)
utils.LogLavaEvent(ctx, logger, types.ProviderReportedEventName, map[string]string{"provider": unresponsiveProvider.GetAddress(), "timestamp": timestamp.Format(time.DateTime), "disconnections": strconv.FormatUint(unresponsiveProvider.GetDisconnections(), 10), "errors": strconv.FormatUint(unresponsiveProvider.GetErrors(), 10), "project": projectID, "cu": strconv.FormatUint(complainerCuToAdd, 10), "epoch": strconv.FormatUint(epoch, 10), "total_complaint_this_epoch": strconv.FormatUint(providerPaymentStorage.ComplainersTotalCu, 10)}, "provider got reported by consumer")
// set the final provider payment storage state including the complaints
k.SetProviderPaymentStorage(ctx, providerPaymentStorage)
k.SetProviderPaymentStorageCached(ctx, providerPaymentStorage)
}

return nil
Expand Down
53 changes: 53 additions & 0 deletions x/pairing/keeper/msg_server_relay_payment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,3 +962,56 @@ func TestIntOverflow(t *testing.T) {
ts.AdvanceEpoch()
}
}

func TestPairingCaching(t *testing.T) {
ts := newTester(t)
ts.setupForPayments(3, 3, 0) // 3 provider, 3 client, default providers-to-pair

ts.AdvanceEpoch()

relayNum := uint64(0)
totalCU := uint64(0)
// trigger relay payment with cache
for i := 0; i < 3; i++ {
relays := []*types.RelaySession{}
_, provider1Addr := ts.GetAccount(common.PROVIDER, i)
for i := 0; i < 3; i++ {
consumerAcct, _ := ts.GetAccount(common.CONSUMER, i)
totalCU = 0
for i := 0; i < 50; i++ {
totalCU += uint64(i)
relaySession := ts.newRelaySession(provider1Addr, relayNum, uint64(i), ts.BlockHeight(), 0)
sig, err := sigs.Sign(consumerAcct.SK, *relaySession)
relaySession.Sig = sig
require.NoError(t, err)
relays = append(relays, relaySession)
relayNum++
}
}
_, err := ts.TxPairingRelayPayment(provider1Addr, relays...)
require.NoError(t, err)
}

epochPayment, found, _ := ts.Keepers.Pairing.GetEpochPaymentsFromBlock(ts.Ctx, ts.EpochStart())
require.True(t, found)
require.Len(t, epochPayment.ProviderPaymentStorageKeys, 3)

UniquePayments := ts.Keepers.Pairing.GetAllUniquePaymentStorageClientProvider(ts.Ctx)
require.Len(t, UniquePayments, 3*3*50)

storages := ts.Keepers.Pairing.GetAllProviderPaymentStorage(ts.Ctx)
for _, storage := range storages {
require.Len(t, storage.UniquePaymentStorageClientProviderKeys, 3*50)
}

for i := 0; i < 3; i++ {
consumerAcct, _ := ts.GetAccount(common.CONSUMER, i)
project, err := ts.GetProjectForDeveloper(consumerAcct.Addr.String(), ts.BlockHeight())
require.NoError(t, err)
require.Equal(t, totalCU*3, project.UsedCu)

sub, err := ts.QuerySubscriptionCurrent(consumerAcct.Addr.String())
require.NoError(t, err)
require.Equal(t, totalCU*3, sub.Sub.MonthCuTotal-sub.Sub.MonthCuLeft)
}
}
44 changes: 34 additions & 10 deletions x/pairing/keeper/provider_payment_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,24 @@ func (k Keeper) GetProviderPaymentStorage(
return val, true
}

func (k EpochPaymentHandler) SetProviderPaymentStorageCached(ctx sdk.Context, providerPaymentStorage types.ProviderPaymentStorage) {
b := k.cdc.MustMarshal(&providerPaymentStorage)
k.ProviderPaymentStorageCache.Set(types.ProviderPaymentStorageKey(providerPaymentStorage.Index), b)
}

func (k EpochPaymentHandler) GetProviderPaymentStorageCached(
ctx sdk.Context,
index string,
) (val types.ProviderPaymentStorage, found bool) {
b := k.ProviderPaymentStorageCache.Get(types.ProviderPaymentStorageKey(index))
if b == nil {
return val, false
}

k.cdc.MustUnmarshal(b, &val)
return val, true
}

// RemoveProviderPaymentStorage removes a providerPaymentStorage from the store
func (k Keeper) RemoveProviderPaymentStorage(
ctx sdk.Context,
Expand Down Expand Up @@ -81,13 +99,13 @@ func (k Keeper) GetProviderPaymentStorageKey(ctx sdk.Context, chainID string, ep
}

// Function to add a payment (which is represented by a uniquePaymentStorageClientProvider object) to a providerPaymentStorage object
func (k Keeper) AddProviderPaymentInEpoch(ctx sdk.Context, chainID string, epoch uint64, projectID string, providerAddress sdk.AccAddress, usedCU uint64, uniqueIdentifier string) (userPayment *types.ProviderPaymentStorage, usedCUConsumerTotal uint64) {
func (k EpochPaymentHandler) AddProviderPaymentInEpoch(ctx sdk.Context, chainID string, epoch uint64, projectID string, providerAddress sdk.AccAddress, usedCU uint64, uniqueIdentifier string) (userPayment *types.ProviderPaymentStorage, usedCUConsumerTotal uint64) {
// create an uniquePaymentStorageClientProvider object and set it in the KVStore
uniquePaymentStorageClientProviderEntryAddr := k.AddUniquePaymentStorageClientProvider(ctx, chainID, epoch, projectID, providerAddress, uniqueIdentifier, usedCU)

// get the providerPaymentStorage object
providerPaymentStorageKey := k.GetProviderPaymentStorageKey(ctx, chainID, epoch, providerAddress)
userPaymentStorageInEpoch, found := k.GetProviderPaymentStorage(ctx, providerPaymentStorageKey)
userPaymentStorageInEpoch, found := k.GetProviderPaymentStorageCached(ctx, providerPaymentStorageKey)
if !found {
// is new entry -> create a new providerPaymentStorage object
userPaymentStorageInEpoch = types.ProviderPaymentStorage{Index: providerPaymentStorageKey, UniquePaymentStorageClientProviderKeys: []string{uniquePaymentStorageClientProviderEntryAddr.GetIndex()}, Epoch: epoch}
Expand All @@ -101,31 +119,37 @@ func (k Keeper) AddProviderPaymentInEpoch(ctx sdk.Context, chainID string, epoch
}

// set the providerPaymentStorage object in the KVStore
k.SetProviderPaymentStorage(ctx, userPaymentStorageInEpoch)
k.SetProviderPaymentStorageCached(ctx, userPaymentStorageInEpoch)

return &userPaymentStorageInEpoch, usedCUConsumerTotal
}

// Function to get the total serviced CU by a provider in this epoch for a specific consumer
func (k Keeper) GetTotalUsedCUForConsumerPerEpoch(ctx sdk.Context, projectID string, uniquePaymentStorageKeys []string, providerAddress string) uint64 {
func (k EpochPaymentHandler) GetTotalUsedCUForConsumerPerEpoch(ctx sdk.Context, projectID string, uniquePaymentStorageKeys []string, providerAddress string) uint64 {
usedCUProviderTotal := uint64(0)

// go over the uniquePaymentStorageKeys
for _, uniquePaymentKey := range uniquePaymentStorageKeys {
// if the uniquePaymentStorageClientProvider object is not between the provider and the specific consumer, continue
if k.GetConsumerFromUniquePayment(uniquePaymentKey) != projectID {
continue
}
// get the uniquePaymentStorageClientProvider object
uniquePayment, found := k.GetUniquePaymentStorageClientProvider(ctx, uniquePaymentKey)
uniquePayment, found := k.GetUniquePaymentStorageClientProviderCached(ctx, uniquePaymentKey)
if !found {
uniquePayment, found = k.GetUniquePaymentStorageClientProvider(ctx, uniquePaymentKey)
if found {
k.SetUniquePaymentStorageClientProviderCached(ctx, uniquePayment)
}
}
if !found {
utils.LavaFormatError("could not find uniquePaymentStorageClientProvider object", fmt.Errorf("unique payment object not found"),
utils.Attribute{Key: "providerAddress", Value: providerAddress},
utils.Attribute{Key: "projectID", Value: projectID},
)
continue
}

// if the uniquePaymentStorageClientProvider object is between the provider and the specific consumer, add the serviced CU
if k.GetConsumerFromUniquePayment(&uniquePayment) == projectID {
usedCUProviderTotal += uniquePayment.UsedCU
}
usedCUProviderTotal += uniquePayment.UsedCU
}
return usedCUProviderTotal
}
22 changes: 20 additions & 2 deletions x/pairing/keeper/unique_payment_storage_client_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,24 @@ func (k Keeper) GetUniquePaymentStorageClientProvider(
return val, true
}

func (k EpochPaymentHandler) SetUniquePaymentStorageClientProviderCached(ctx sdk.Context, uniquePaymentStorageClientProvider types.UniquePaymentStorageClientProvider) {
b := k.cdc.MustMarshal(&uniquePaymentStorageClientProvider)
k.UniquePaymentStorageClientProviderCache.Set(types.ProviderPaymentStorageKey(uniquePaymentStorageClientProvider.Index), b)
}

func (k EpochPaymentHandler) GetUniquePaymentStorageClientProviderCached(
ctx sdk.Context,
index string,
) (val types.UniquePaymentStorageClientProvider, found bool) {
b := k.UniquePaymentStorageClientProviderCache.Get(types.UniquePaymentStorageClientProviderKey(index))
if b == nil {
return val, false
}

k.cdc.MustUnmarshal(b, &val)
return val, true
}

// RemoveUniquePaymentStorageClientProvider removes a uniquePaymentStorageClientProvider from the store
func (k Keeper) RemoveUniquePaymentStorageClientProvider(
ctx sdk.Context,
Expand Down Expand Up @@ -73,8 +91,8 @@ func (k Keeper) IsDoubleSpend(ctx sdk.Context, chainID string, block uint64, pro
return found
}

func (k Keeper) GetConsumerFromUniquePayment(uniquePaymentStorageClientProvider *types.UniquePaymentStorageClientProvider) string {
key := uniquePaymentStorageClientProvider.Index
func (k Keeper) GetConsumerFromUniquePayment(uniquePaymentStorageClientProvider string) string {
key := uniquePaymentStorageClientProvider
providerAdrLengh := charToAsciiNumber(rune(key[0]))
provider := key[1 : providerAdrLengh+1]
return provider
Expand Down

0 comments on commit ca456b6

Please sign in to comment.