Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into PRT-bug-fix-not-retur…
Browse files Browse the repository at this point in the history
…ning-from-a-routine-if-failed-tx-hash
  • Loading branch information
ranlavanet committed Mar 19, 2024
2 parents 0caf2c7 + ad9b5eb commit 4f88628
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 71 deletions.
82 changes: 53 additions & 29 deletions .github/pr_labeler.yml
Original file line number Diff line number Diff line change
@@ -1,53 +1,77 @@
"C:cmd/lavad":
- cmd/lavad/**/*
- changed-files:
- any-glob-to-any-file: "cmd/lavad/**/*"
"C:cmd/lavap":
- cmd/lavap/**/*
- changed-files:
- any-glob-to-any-file: "cmd/lavap/**/*"
"C:cmd/lavavisor":
- cmd/lavavisor/**/*
- changed-files:
- any-glob-to-any-file: "cmd/lavavisor/**/*"
"C:cmd/common":
- cmd/common/**/*
- changed-files:
- any-glob-to-any-file: "cmd/common/**/*"
"C:protocol":
- protocol/**/*
- changed-files:
- any-glob-to-any-file: "protocol/**/*"
"Team:Consensus":
- x/**/*
- app/**/*
- changed-files:
- any-glob-to-any-file: "x/**/*"
- any-glob-to-any-file: "app/**/*"
"Team:Protocol":
- protocol/**/*
- ecosystem/**/*
- changed-files:
- any-glob-to-any-file: "protocol/**/*"
- any-glob-to-any-file: "ecosystem/**/*"
"C:x/conflict":
- x/conflict/**/*
- changed-files:
- any-glob-to-any-file: "x/conflict/**/*"
"C:x/downtime":
- x/downtime/**/*
- changed-files:
- any-glob-to-any-file: "x/downtime/**/*"
"C:x/dualstaking":
- x/dualstaking/**/*
- changed-files:
- any-glob-to-any-file: "x/dualstaking/**/*"
"C:x/epochstorage":
- x/epochstorage/**/*
- changed-files:
- any-glob-to-any-file: "x/epochstorage/**/*"
"C:x/fixationstore":
- x/fixationstore/**/*
- changed-files:
- any-glob-to-any-file: "x/fixationstore/**/*"
"C:x/pairing":
- x/pairing/**/*
- changed-files:
- any-glob-to-any-file: "x/pairing/**/*"
"C:x/plans":
- x/plans/**/*
- changed-files:
- any-glob-to-any-file: "x/plans/**/*"
"C:x/projects":
- x/projects/**/*
- changed-files:
- any-glob-to-any-file: "x/projects/**/*"
"C:x/protocol":
- x/protocol/**/*
- changed-files:
- any-glob-to-any-file: "x/protocol/**/*"
"C:x/rewards":
- x/rewards/**/*
- changed-files:
- any-glob-to-any-file: "x/rewards/**/*"
"C:x/spec":
- x/spec/**/*
- changed-files:
- any-glob-to-any-file: "x/spec/**/*"
"C:x/subscription":
- x/subscription/**/*
- changed-files:
- any-glob-to-any-file: "x/subscription/**/*"
"C:x/timerstore":
- x/timerstore/**/*
- changed-files:
- any-glob-to-any-file: "x/timerstore/**/*"
"Type: Build":
- Makefile
- Dockerfile
- docker-compose.yml
- scripts/*
- changed-files:
- any-glob-to-any-file: "Makefile"
- any-glob-to-any-file: "Dockerfile"
- any-glob-to-any-file: "docker-compose.yml"
- any-glob-to-any-file: "scripts/*"
"Type: CI":
- .github/**
- changed-files:
- any-glob-to-any-file: ".github/**"
"Type: Docs":
- docs/**/*
- changed-files:
- any-glob-to-any-file: "docs/**/*"
"Type: Proto":
- proto/**/*
- changed-files:
- any-glob-to-any-file: "proto/**/*"
2 changes: 1 addition & 1 deletion .github/workflows/lint_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ permissions:
jobs:
main:
permissions:
pull-requests: read # for amannn/action-semantic-pull-request to analyze PRs
pull-requests: write # for amannn/action-semantic-pull-request to analyze PRs
statuses: write # for amannn/action-semantic-pull-request to mark status of analyzed PR
runs-on: ubuntu-latest
steps:
Expand Down
56 changes: 51 additions & 5 deletions x/pairing/keeper/epoch_payments.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strconv"

"github.com/cosmos/cosmos-sdk/store/cachekv"
"github.com/cosmos/cosmos-sdk/store/prefix"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/lavanet/lava/utils"
Expand Down Expand Up @@ -37,6 +38,24 @@ func (k Keeper) GetEpochPayments(
return val, true
}

func (k EpochPaymentHandler) SetEpochPaymentsCached(ctx sdk.Context, epochPayments types.EpochPayments) {
b := k.cdc.MustMarshal(&epochPayments)
k.EpochPaymentsCache.Set(types.EpochPaymentsKey(epochPayments.Index), b)
}

func (k EpochPaymentHandler) GetEpochPaymentsCached(
ctx sdk.Context,
index string,
) (val types.EpochPayments, found bool) {
b := k.EpochPaymentsCache.Get(types.EpochPaymentsKey(index))
if b == nil {
return val, false
}

k.cdc.MustUnmarshal(b, &val)
return val, true
}

// RemoveEpochPayments removes a epochPayments from the store
func (k Keeper) RemoveEpochPayments(
ctx sdk.Context,
Expand Down Expand Up @@ -73,13 +92,39 @@ func (k Keeper) RemoveOldEpochPayment(ctx sdk.Context) {

// Function to get the epochPayments object from a specific epoch. Note that it also returns the epochPayments object's key which is the epoch in hex representation (base 16)
func (k Keeper) GetEpochPaymentsFromBlock(ctx sdk.Context, epoch uint64) (epochPayment types.EpochPayments, found bool, key string) {
key = strconv.FormatUint(epoch, 16)
key = epochPaymentKey(epoch)
epochPayment, found = k.GetEpochPayments(ctx, key)
return
}

func epochPaymentKey(epoch uint64) string {
return strconv.FormatUint(epoch, 16)
}

type EpochPaymentHandler struct {
Keeper
EpochPaymentsCache *cachekv.Store
ProviderPaymentStorageCache *cachekv.Store
UniquePaymentStorageClientProviderCache *cachekv.Store
}

func (k Keeper) NewEpochPaymentHandler(ctx sdk.Context) EpochPaymentHandler {
return EpochPaymentHandler{
Keeper: k,
EpochPaymentsCache: cachekv.NewStore(prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.EpochPaymentsKeyPrefix))),
ProviderPaymentStorageCache: cachekv.NewStore(prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.ProviderPaymentStorageKeyPrefix))),
UniquePaymentStorageClientProviderCache: cachekv.NewStore(prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.UniquePaymentStorageClientProviderKeyPrefix))),
}
}

func (k EpochPaymentHandler) Flush() {
k.EpochPaymentsCache.Write()
k.ProviderPaymentStorageCache.Write()
k.UniquePaymentStorageClientProviderCache.Write()
}

// Function to add an epoch payment to the epochPayments object
func (k Keeper) AddEpochPayment(ctx sdk.Context, chainID string, epoch uint64, projectID string, providerAddress sdk.AccAddress, usedCU uint64, uniqueIdentifier string) uint64 {
func (k EpochPaymentHandler) AddEpochPayment(ctx sdk.Context, chainID string, epoch uint64, projectID string, providerAddress sdk.AccAddress, usedCU uint64, uniqueIdentifier string) uint64 {
if epoch < k.epochStorageKeeper.GetEarliestEpochStart(ctx) {
return 0
}
Expand All @@ -88,7 +133,8 @@ func (k Keeper) AddEpochPayment(ctx sdk.Context, chainID string, epoch uint64, p
userPaymentProviderStorage, usedCUProviderTotal := k.AddProviderPaymentInEpoch(ctx, chainID, epoch, projectID, providerAddress, usedCU, uniqueIdentifier)

// get this epoch's epochPayments object
epochPayments, found, key := k.GetEpochPaymentsFromBlock(ctx, epoch)
key := epochPaymentKey(epoch)
epochPayments, found := k.GetEpochPaymentsCached(ctx, key)
if !found {
// this epoch doesn't have a epochPayments object, create one with the providerPaymentStorage object from before
epochPayments = types.EpochPayments{Index: key, ProviderPaymentStorageKeys: []string{userPaymentProviderStorage.GetIndex()}}
Expand All @@ -110,7 +156,7 @@ func (k Keeper) AddEpochPayment(ctx sdk.Context, chainID string, epoch uint64, p
}

// update the epochPayments object
k.SetEpochPayments(ctx, epochPayments)
k.SetEpochPaymentsCached(ctx, epochPayments)

return usedCUProviderTotal
}
Expand Down Expand Up @@ -164,7 +210,7 @@ func (k Keeper) RemoveAllEpochPaymentsForBlockAppendAdjustments(ctx sdk.Context,

// delete the uniquePaymentStorageClientProvider object
k.RemoveUniquePaymentStorageClientProvider(ctx, uniquePaymentStorage.Index)
consumer := k.GetConsumerFromUniquePayment(&uniquePaymentStorage)
consumer := k.GetConsumerFromUniquePayment(uniquePaymentStorageKey)

provider, err := k.GetProviderFromProviderPaymentStorage(&providerPaymentStorage)
if err != nil {
Expand Down
34 changes: 14 additions & 20 deletions x/pairing/keeper/msg_server_relay_payment.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (k msgServer) RelayPayment(goCtx context.Context, msg *types.MsgRelayPaymen

ctx := sdk.UnwrapSDKContext(goCtx)
logger := k.Logger(ctx)
paymentHandler := k.NewEpochPaymentHandler(ctx)
lavaChainID := ctx.BlockHeader().ChainID
creator, err := sdk.AccAddressFromBech32(msg.Creator)
if err != nil {
Expand Down Expand Up @@ -143,7 +144,7 @@ func (k msgServer) RelayPayment(goCtx context.Context, msg *types.MsgRelayPaymen
continue
}

if k.IsDoubleSpend(ctx, relay.SpecId, epochStart, project.Index, providerAddr, strconv.FormatUint(relay.SessionId, 16)) {
if paymentHandler.IsDoubleSpend(ctx, relay.SpecId, epochStart, project.Index, providerAddr, strconv.FormatUint(relay.SessionId, 16)) {
utils.LavaFormatWarning("double spending detected", err,
utils.Attribute{Key: "epoch", Value: epochStart},
utils.Attribute{Key: "client", Value: clientAddr.String()},
Expand All @@ -157,8 +158,7 @@ func (k msgServer) RelayPayment(goCtx context.Context, msg *types.MsgRelayPaymen
// if they failed (one relay should affect all of them). From here on, every check will
// fail the TX ***

totalCUInEpochForUserProvider := k.Keeper.AddEpochPayment(ctx, relay.SpecId, epochStart, project.Index, providerAddr, relay.CuSum, strconv.FormatUint(relay.SessionId, 16))
ctx.GasMeter().RefundGas(ctx.GasMeter().GasConsumed(), "")
totalCUInEpochForUserProvider := paymentHandler.AddEpochPayment(ctx, relay.SpecId, epochStart, project.Index, providerAddr, relay.CuSum, strconv.FormatUint(relay.SessionId, 16))
if badgeFound {
k.handleBadgeCu(ctx, badgeData, relay.Provider, relay.CuSum, newBadgeTimerExpiry)
}
Expand Down Expand Up @@ -274,7 +274,7 @@ func (k msgServer) RelayPayment(goCtx context.Context, msg *types.MsgRelayPaymen
}

// update provider payment storage with complainer's CU
err = k.updateProviderPaymentStorageWithComplainerCU(ctx, relay.UnresponsiveProviders, logger, epochStart, relay.SpecId, cuAfterQos, providers, project.Index)
err = paymentHandler.updateProviderPaymentStorageWithComplainerCU(ctx, relay.UnresponsiveProviders, logger, epochStart, relay.SpecId, cuAfterQos, providers, project.Index)
if err != nil {
var reportedProviders []string
for _, p := range relay.UnresponsiveProviders {
Expand Down Expand Up @@ -317,9 +317,7 @@ func (k msgServer) RelayPayment(goCtx context.Context, msg *types.MsgRelayPaymen
}
utils.LogLavaEvent(ctx, logger, types.LatestBlocksReportEventName, latestBlockReports, "New LatestBlocks Report for provider")

// consume constant gas (dependent on the number of relays)
ctx.GasMeter().RefundGas(ctx.GasMeter().GasConsumed(), "")
ctx.GasMeter().ConsumeGas(uint64(10000+100000*len(msg.Relays)), "")
paymentHandler.Flush()

return &types.MsgRelayPaymentResponse{RejectedRelays: rejected_relays}, nil
}
Expand All @@ -335,7 +333,7 @@ func (k msgServer) setStakeEntryBlockReport(ctx sdk.Context, providerAddr sdk.Ac
}
}

func (k msgServer) updateProviderPaymentStorageWithComplainerCU(ctx sdk.Context, unresponsiveProviders []*types.ReportedProvider, logger log.Logger, epoch uint64, chainID string, cuSum uint64, providersToPair []epochstoragetypes.StakeEntry, projectID string) error {
func (k EpochPaymentHandler) updateProviderPaymentStorageWithComplainerCU(ctx sdk.Context, unresponsiveProviders []*types.ReportedProvider, logger log.Logger, epoch uint64, chainID string, cuSum uint64, providersToPair []epochstoragetypes.StakeEntry, projectID string) error {
// check that unresponsiveData exists
if len(unresponsiveProviders) == 0 {
return nil
Expand Down Expand Up @@ -371,40 +369,36 @@ func (k msgServer) updateProviderPaymentStorageWithComplainerCU(ctx sdk.Context,
}

// get this epoch's epochPayments object
epochPayments, found, key := k.GetEpochPaymentsFromBlock(ctx, epoch)
epochPayments, found := k.GetEpochPaymentsCached(ctx, epochPaymentKey(epoch))
if !found {
// the epochPayments object should exist since we already paid. if not found, print an error and continue
utils.LavaFormatError("did not find epochPayments object", err, utils.Attribute{Key: "epochPaymentsKey", Value: key})
utils.LavaFormatError("did not find epochPayments object", err, utils.Attribute{Key: "epochPaymentsKey", Value: epoch})
continue
}

// get the providerPaymentStorage object using the providerStorageKey
providerStorageKey := k.GetProviderPaymentStorageKey(ctx, chainID, epoch, sdkUnresponsiveProviderAddress)
providerPaymentStorage, found := k.GetProviderPaymentStorage(ctx, providerStorageKey)

providerPaymentStorage, found := k.GetProviderPaymentStorageCached(ctx, providerStorageKey)
if !found {
// providerPaymentStorage not found (this provider has no payments in this epoch and also no complaints) -> we need to add one complaint
emptyProviderPaymentStorageWithComplaint := types.ProviderPaymentStorage{
providerPaymentStorage = types.ProviderPaymentStorage{
Index: providerStorageKey,
UniquePaymentStorageClientProviderKeys: []string{},
Epoch: epoch,
ComplainersTotalCu: uint64(0),
}

// append the emptyProviderPaymentStorageWithComplaint to the epochPayments object's providerPaymentStorages
epochPayments.ProviderPaymentStorageKeys = append(epochPayments.GetProviderPaymentStorageKeys(), emptyProviderPaymentStorageWithComplaint.GetIndex())
k.SetEpochPayments(ctx, epochPayments)

// assign providerPaymentStorage with the new empty providerPaymentStorage
providerPaymentStorage = emptyProviderPaymentStorageWithComplaint
// append the providerPaymentStorage to the epochPayments object's providerPaymentStorages
epochPayments.ProviderPaymentStorageKeys = append(epochPayments.GetProviderPaymentStorageKeys(), providerPaymentStorage.GetIndex())
k.SetEpochPaymentsCached(ctx, epochPayments)
}

// add complainer's used CU to providerPaymentStorage
providerPaymentStorage.ComplainersTotalCu += complainerCuToAdd
timestamp := time.Unix(unresponsiveProvider.TimestampS, 0)
utils.LogLavaEvent(ctx, logger, types.ProviderReportedEventName, map[string]string{"provider": unresponsiveProvider.GetAddress(), "timestamp": timestamp.Format(time.DateTime), "disconnections": strconv.FormatUint(unresponsiveProvider.GetDisconnections(), 10), "errors": strconv.FormatUint(unresponsiveProvider.GetErrors(), 10), "project": projectID, "cu": strconv.FormatUint(complainerCuToAdd, 10), "epoch": strconv.FormatUint(epoch, 10), "total_complaint_this_epoch": strconv.FormatUint(providerPaymentStorage.ComplainersTotalCu, 10)}, "provider got reported by consumer")
// set the final provider payment storage state including the complaints
k.SetProviderPaymentStorage(ctx, providerPaymentStorage)
k.SetProviderPaymentStorageCached(ctx, providerPaymentStorage)
}

return nil
Expand Down
53 changes: 53 additions & 0 deletions x/pairing/keeper/msg_server_relay_payment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,3 +962,56 @@ func TestIntOverflow(t *testing.T) {
ts.AdvanceEpoch()
}
}

func TestPairingCaching(t *testing.T) {
ts := newTester(t)
ts.setupForPayments(3, 3, 0) // 3 provider, 3 client, default providers-to-pair

ts.AdvanceEpoch()

relayNum := uint64(0)
totalCU := uint64(0)
// trigger relay payment with cache
for i := 0; i < 3; i++ {
relays := []*types.RelaySession{}
_, provider1Addr := ts.GetAccount(common.PROVIDER, i)
for i := 0; i < 3; i++ {
consumerAcct, _ := ts.GetAccount(common.CONSUMER, i)
totalCU = 0
for i := 0; i < 50; i++ {
totalCU += uint64(i)
relaySession := ts.newRelaySession(provider1Addr, relayNum, uint64(i), ts.BlockHeight(), 0)
sig, err := sigs.Sign(consumerAcct.SK, *relaySession)
relaySession.Sig = sig
require.NoError(t, err)
relays = append(relays, relaySession)
relayNum++
}
}
_, err := ts.TxPairingRelayPayment(provider1Addr, relays...)
require.NoError(t, err)
}

epochPayment, found, _ := ts.Keepers.Pairing.GetEpochPaymentsFromBlock(ts.Ctx, ts.EpochStart())
require.True(t, found)
require.Len(t, epochPayment.ProviderPaymentStorageKeys, 3)

UniquePayments := ts.Keepers.Pairing.GetAllUniquePaymentStorageClientProvider(ts.Ctx)
require.Len(t, UniquePayments, 3*3*50)

storages := ts.Keepers.Pairing.GetAllProviderPaymentStorage(ts.Ctx)
for _, storage := range storages {
require.Len(t, storage.UniquePaymentStorageClientProviderKeys, 3*50)
}

for i := 0; i < 3; i++ {
consumerAcct, _ := ts.GetAccount(common.CONSUMER, i)
project, err := ts.GetProjectForDeveloper(consumerAcct.Addr.String(), ts.BlockHeight())
require.NoError(t, err)
require.Equal(t, totalCU*3, project.UsedCu)

sub, err := ts.QuerySubscriptionCurrent(consumerAcct.Addr.String())
require.NoError(t, err)
require.Equal(t, totalCU*3, sub.Sub.MonthCuTotal-sub.Sub.MonthCuLeft)
}
}
Loading

0 comments on commit 4f88628

Please sign in to comment.