Skip to content

Commit

Permalink
ported the rate limiter fix for reverted cctx
Browse files Browse the repository at this point in the history
  • Loading branch information
ws4charlie committed May 24, 2024
1 parent a98d2cd commit e627adf
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 19 deletions.
46 changes: 36 additions & 10 deletions x/crosschain/keeper/grpc_query_cctx_rate_limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

sdkmath "cosmossdk.io/math"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/zeta-chain/zetacore/pkg/chains"
"github.com/zeta-chain/zetacore/pkg/coin"
"github.com/zeta-chain/zetacore/x/crosschain/types"
fungibletypes "github.com/zeta-chain/zetacore/x/fungible/types"
Expand Down Expand Up @@ -34,7 +35,7 @@ func (k Keeper) ListPendingCctxWithinRateLimit(c context.Context, req *types.Que
totalPending := uint64(0)
totalWithdrawInAzeta := sdkmath.NewInt(0)
cctxs := make([]*types.CrossChainTx, 0)
chains := k.zetaObserverKeeper.GetSupportedForeignChains(ctx)
foreignChains := k.zetaObserverKeeper.GetSupportedForeignChains(ctx)

// check rate limit flags to decide if we should apply rate limit
applyLimit := true
Expand All @@ -48,7 +49,7 @@ func (k Keeper) ListPendingCctxWithinRateLimit(c context.Context, req *types.Que

// fallback to non-rate-limited query if rate limiter is disabled
if !applyLimit {
for _, chain := range chains {
for _, chain := range foreignChains {
resp, err := k.ListPendingCctx(ctx, &types.QueryListPendingCctxRequest{ChainId: chain.ChainId, Limit: limit})
if err == nil {
cctxs = append(cctxs, resp.CrossChainTx...)
Expand Down Expand Up @@ -100,15 +101,21 @@ func (k Keeper) ListPendingCctxWithinRateLimit(c context.Context, req *types.Que
}

// if a cctx falls within the rate limiter window
isCctxInWindow := func(cctx *types.CrossChainTx) bool {
isCCTXInWindow := func(cctx *types.CrossChainTx) bool {
// #nosec G701 checked positive
return cctx.InboundTxParams.InboundTxObservedExternalHeight >= uint64(leftWindowBoundary)
}

// if a cctx is an outgoing cctx that orginates from ZetaChain
// reverted incoming cctx has an external `SenderChainId` and should not be counted
isCCTXOutgoing := func(cctx *types.CrossChainTx) bool {
return chains.IsZetaChain(cctx.InboundTxParams.SenderChainId)
}

// query pending nonces for each foreign chain and get the lowest height of the pending cctxs
lowestPendingCctxHeight := int64(0)
pendingNoncesMap := make(map[int64]observertypes.PendingNonces)
for _, chain := range chains {
for _, chain := range foreignChains {
pendingNonces, found := k.GetObserverKeeper().GetPendingNonces(ctx, tss.TssPubkey, chain.ChainId)
if !found {
return nil, status.Error(codes.Internal, "pending nonces not found")
Expand Down Expand Up @@ -144,7 +151,7 @@ func (k Keeper) ListPendingCctxWithinRateLimit(c context.Context, req *types.Que
}

// query backwards for potential missed pending cctxs for each foreign chain
for _, chain := range chains {
for _, chain := range foreignChains {
// we should at least query 1000 prior to find any pending cctx that we might have missed
// this logic is needed because a confirmation of higher nonce will automatically update the p.NonceLow
// therefore might mask some lower nonce cctx that is still pending.
Expand All @@ -161,16 +168,26 @@ func (k Keeper) ListPendingCctxWithinRateLimit(c context.Context, req *types.Que
if err != nil {
return nil, err
}
inWindow := isCctxInWindow(cctx)
inWindow := isCCTXInWindow(cctx)
isOutgoing := isCCTXOutgoing(cctx)

// we should at least go backwards by 1000 nonces to pick up missed pending cctxs
// we might go even further back if rate limiter is enabled and the endNonce hasn't hit the left window boundary yet
// stop at the left window boundary if the `endNonce` hasn't hit it yet
if nonce < endNonce && !inWindow {
break
}
// skip the cctx if rate limit is exceeded but still accumulate the total withdraw value
if inWindow && rateLimitExceeded(chain.ChainId, cctx, gasCoinRates, erc20CoinRates, foreignCoinMap, &totalWithdrawInAzeta, withdrawLimitInAzeta) {
// sum up the cctxs' value if the cctx is outgoing and within the window
if inWindow && isOutgoing &&
rateLimitExceeded(
chain.ChainId,
cctx,
gasCoinRates,
erc20CoinRates,
foreignCoinMap,
&totalWithdrawInAzeta,
withdrawLimitInAzeta,
) {
limitExceeded = true
continue
}
Expand All @@ -189,7 +206,7 @@ func (k Keeper) ListPendingCctxWithinRateLimit(c context.Context, req *types.Que
missedPending := len(cctxs)

// query forwards for pending cctxs for each foreign chain
for _, chain := range chains {
for _, chain := range foreignChains {
pendingNonces := pendingNoncesMap[chain.ChainId]

// #nosec G701 always in range
Expand All @@ -201,9 +218,18 @@ func (k Keeper) ListPendingCctxWithinRateLimit(c context.Context, req *types.Que
if err != nil {
return nil, err
}
isOutgoing := isCCTXOutgoing(cctx)

// skip the cctx if rate limit is exceeded but still accumulate the total withdraw value
if rateLimitExceeded(chain.ChainId, cctx, gasCoinRates, erc20CoinRates, foreignCoinMap, &totalWithdrawInAzeta, withdrawLimitInAzeta) {
if isOutgoing && rateLimitExceeded(
chain.ChainId,
cctx,
gasCoinRates,
erc20CoinRates,
foreignCoinMap,
&totalWithdrawInAzeta,
withdrawLimitInAzeta,
) {
limitExceeded = true
continue
}
Expand Down
57 changes: 48 additions & 9 deletions x/crosschain/keeper/grpc_query_cctx_rate_limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"cosmossdk.io/math"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/stretchr/testify/require"
"github.com/zeta-chain/zetacore/pkg/chains"
"github.com/zeta-chain/zetacore/pkg/coin"
keepertest "github.com/zeta-chain/zetacore/testutil/keeper"
"github.com/zeta-chain/zetacore/testutil/sample"
Expand All @@ -22,6 +23,9 @@ var (

// local btc chain ID
btcChainID = getValidBtcChainID()

// local zeta chain ID
zetaChainID = chains.ZetaPrivnetChain().ChainId
)

// createTestRateLimiterFlags creates a custom rate limiter flags
Expand Down Expand Up @@ -67,7 +71,8 @@ func createCctxsWithCoinTypeAndHeightRange(
t *testing.T,
lowBlock uint64,
highBlock uint64,
chainID int64,
senderChainID int64,
receiverChainID int64,
coinType coin.CoinType,
asset string,
amount uint64,
Expand All @@ -76,13 +81,13 @@ func createCctxsWithCoinTypeAndHeightRange(
// create 1 pending cctxs per block
for i := lowBlock; i <= highBlock; i++ {
nonce := i - 1
cctx := sample.CrossChainTx(t, fmt.Sprintf("%d-%d", chainID, nonce))
cctx := sample.CrossChainTx(t, fmt.Sprintf("%d-%d", receiverChainID, nonce))
cctx.CctxStatus.Status = status
cctx.InboundTxParams.SenderChainId = chainID
cctx.InboundTxParams.SenderChainId = senderChainID
cctx.InboundTxParams.CoinType = coinType
cctx.InboundTxParams.Asset = asset
cctx.InboundTxParams.InboundTxObservedExternalHeight = i
cctx.GetCurrentOutTxParam().ReceiverChainId = chainID
cctx.GetCurrentOutTxParam().ReceiverChainId = receiverChainID
cctx.GetCurrentOutTxParam().Amount = sdk.NewUint(amount)
cctx.GetCurrentOutTxParam().OutboundTxTssNonce = nonce
cctxs = append(cctxs, cctx)
Expand All @@ -101,7 +106,7 @@ func setCctxsInKeeper(
for _, cctx := range cctxs {
k.SetCrossChainTx(ctx, *cctx)
zk.ObserverKeeper.SetNonceToCctx(ctx, observertypes.NonceToCctx{
ChainId: cctx.InboundTxParams.SenderChainId,
ChainId: cctx.GetCurrentOutTxParam().ReceiverChainId,
// #nosec G701 always in range for tests
Nonce: int64(cctx.GetCurrentOutTxParam().OutboundTxTssNonce),
CctxIndex: cctx.Index,
Expand Down Expand Up @@ -275,13 +280,18 @@ func TestKeeper_ListPendingCctxWithinRateLimit(t *testing.T) {

// create Eth chain 999 mined and 200 pending cctxs for rate limiter test
// the number 999 is to make it less than `MaxLookbackNonce` so the LoopBackwards gets the chance to hit nonce 0
ethMinedCctxs := createCctxsWithCoinTypeAndHeightRange(t, 1, 999, ethChainID, coin.CoinType_Gas, "", uint64(1e15), types.CctxStatus_OutboundMined)
ethPendingCctxs := createCctxsWithCoinTypeAndHeightRange(t, 1000, 1199, ethChainID, coin.CoinType_Gas, "", uint64(1e15), types.CctxStatus_PendingOutbound)
ethMinedCctxs := createCctxsWithCoinTypeAndHeightRange(t, 1, 999, zetaChainID, ethChainID, coin.CoinType_Gas, "", uint64(1e15), types.CctxStatus_OutboundMined)
ethPendingCctxs := createCctxsWithCoinTypeAndHeightRange(t, 1000, 1199, zetaChainID, ethChainID, coin.CoinType_Gas, "", uint64(1e15), types.CctxStatus_PendingOutbound)

// create Eth chain 999 reverted and 200 pending revert cctxs for rate limiter test
// these cctxs should be just ignored by the rate limiter as we can't compare their `ObservedExternalHeight` with window boundary
ethRevertedCctxs := createCctxsWithCoinTypeAndHeightRange(t, 1, 999, ethChainID, ethChainID, coin.CoinType_Gas, "", uint64(1e15), types.CctxStatus_Reverted)
ethPendingRevertCctxs := createCctxsWithCoinTypeAndHeightRange(t, 1000, 1199, ethChainID, ethChainID, coin.CoinType_Gas, "", uint64(1e15), types.CctxStatus_PendingRevert)

// create Btc chain 999 mined and 200 pending cctxs for rate limiter test
// the number 999 is to make it less than `MaxLookbackNonce` so the LoopBackwards gets the chance to hit nonce 0
btcMinedCctxs := createCctxsWithCoinTypeAndHeightRange(t, 1, 999, btcChainID, coin.CoinType_Gas, "", 1000, types.CctxStatus_OutboundMined)
btcPendingCctxs := createCctxsWithCoinTypeAndHeightRange(t, 1000, 1199, btcChainID, coin.CoinType_Gas, "", 1000, types.CctxStatus_PendingOutbound)
btcMinedCctxs := createCctxsWithCoinTypeAndHeightRange(t, 1, 999, zetaChainID, btcChainID, coin.CoinType_Gas, "", 1000, types.CctxStatus_OutboundMined)
btcPendingCctxs := createCctxsWithCoinTypeAndHeightRange(t, 1000, 1199, zetaChainID, btcChainID, coin.CoinType_Gas, "", 1000, types.CctxStatus_PendingOutbound)

// define test cases
tests := []struct {
Expand Down Expand Up @@ -387,6 +397,35 @@ func TestKeeper_ListPendingCctxWithinRateLimit(t *testing.T) {
expectedWithdrawRate: sdk.NewInt(3e18).String(), // 3 ZETA, (2.5 + 0.5) per block
rateLimitExceeded: false,
},
{
name: "can ignore reverted or pending revert cctxs and retrieve all pending cctx without exceeding rate limit",
rateLimitFlags: createTestRateLimiterFlags(500, math.NewUint(10*1e18), zrc20ETH, zrc20BTC, zrc20USDT, "2500", "50000", "0.8"),
ethMinedCctxs: ethRevertedCctxs, // replace mined cctxs with reverted cctxs, should be ignored
ethPendingCctxs: ethPendingRevertCctxs, // replace pending cctxs with pending revert cctxs, should be ignored
ethPendingNonces: observertypes.PendingNonces{
ChainId: ethChainID,
NonceLow: 1099,
NonceHigh: 1199,
Tss: tss.TssPubkey,
},
btcMinedCctxs: btcMinedCctxs,
btcPendingCctxs: btcPendingCctxs,
btcPendingNonces: observertypes.PendingNonces{
ChainId: btcChainID,
NonceLow: 1099,
NonceHigh: 1199,
Tss: tss.TssPubkey,
},
currentHeight: 1199,
queryLimit: keeper.MaxPendingCctxs,
expectedCctxs: append(
append([]*types.CrossChainTx{}, ethPendingRevertCctxs...),
btcPendingCctxs...),
expectedTotalPending: 400,
expectedWithdrawWindow: 500, // the sliding window
expectedWithdrawRate: sdk.NewInt(5e17).String(), // 0.5 ZETA per block, only btc cctxs should be counted
rateLimitExceeded: false,
},
{
name: "can loop backwards all the way to endNonce 0",
rateLimitFlags: createTestRateLimiterFlags(500, math.NewUint(10*1e18), zrc20ETH, zrc20BTC, zrc20USDT, "2500", "50000", "0.8"),
Expand Down

0 comments on commit e627adf

Please sign in to comment.