Skip to content

Commit

Permalink
Merge branch 'unit-test-rate-limit' into e2e/rate-limiter
Browse files Browse the repository at this point in the history
  • Loading branch information
lumtis authored Apr 25, 2024
2 parents 294ad9b + daba269 commit 78ad7a3
Show file tree
Hide file tree
Showing 13 changed files with 432 additions and 277 deletions.
6 changes: 4 additions & 2 deletions docs/openapi/openapi.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54013,9 +54013,11 @@ definitions:
total_pending:
type: string
format: uint64
value_within_window:
current_withdraw_window:
type: string
format: int64
current_withdraw_rate:
type: string
format: uint64
rate_limit_exceeded:
type: boolean
crosschainQueryMessagePassingProtocolFeeResponse:
Expand Down
5 changes: 3 additions & 2 deletions proto/crosschain/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,9 @@ message QueryListPendingCctxWithinRateLimitRequest {
message QueryListPendingCctxWithinRateLimitResponse {
repeated CrossChainTx cross_chain_tx = 1;
uint64 total_pending = 2;
uint64 value_within_window = 3;
bool rate_limit_exceeded = 4;
int64 current_withdraw_window = 3;
string current_withdraw_rate = 4;
bool rate_limit_exceeded = 5;
}

message QueryLastZetaHeightRequest {}
Expand Down
11 changes: 8 additions & 3 deletions typescript/crosschain/query_pb.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -910,12 +910,17 @@ export declare class QueryListPendingCctxWithinRateLimitResponse extends Message
totalPending: bigint;

/**
* @generated from field: uint64 value_within_window = 3;
* @generated from field: int64 current_withdraw_window = 3;
*/
valueWithinWindow: bigint;
currentWithdrawWindow: bigint;

/**
* @generated from field: bool rate_limit_exceeded = 4;
* @generated from field: string current_withdraw_rate = 4;
*/
currentWithdrawRate: string;

/**
* @generated from field: bool rate_limit_exceeded = 5;
*/
rateLimitExceeded: boolean;

Expand Down
5 changes: 4 additions & 1 deletion x/crosschain/keeper/grpc_query_cctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
const (
// MaxPendingCctxs is the maximum number of pending cctxs that can be queried
MaxPendingCctxs = 500

// MaxLookbackNonce is the maximum number of nonces to look back to find missed pending cctxs
MaxLookbackNonce = 1000
)

func (k Keeper) ZetaAccounting(c context.Context, _ *types.QueryZetaAccountingRequest) (*types.QueryZetaAccountingResponse, error) {
Expand Down Expand Up @@ -122,7 +125,7 @@ func (k Keeper) ListPendingCctx(c context.Context, req *types.QueryListPendingCc
// now query the previous nonces up to 1000 prior to find any pending cctx that we might have missed
// need this logic because a confirmation of higher nonce will automatically update the p.NonceLow
// therefore might mask some lower nonce cctx that is still pending.
startNonce := pendingNonces.NonceLow - 1000
startNonce := pendingNonces.NonceLow - MaxLookbackNonce
if startNonce < 0 {
startNonce = 0
}
Expand Down
87 changes: 49 additions & 38 deletions x/crosschain/keeper/grpc_query_cctx_rate_limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (k Keeper) ListPendingCctxWithinRateLimit(c context.Context, req *types.Que
// define a few variables to be used in the query loops
limitExceeded := false
totalPending := uint64(0)
totalCctxValueInZeta := sdk.NewDec(0)
totalWithdrawInZeta := sdk.NewDec(0)
cctxs := make([]*types.CrossChainTx, 0)
chains := k.zetaObserverKeeper.GetSupportedForeignChains(ctx)

Expand All @@ -41,6 +41,9 @@ func (k Keeper) ListPendingCctxWithinRateLimit(c context.Context, req *types.Que
if !found || !rateLimitFlags.Enabled {
applyLimit = false
}
if rateLimitFlags.Rate.IsNil() || rateLimitFlags.Rate.IsZero() {
applyLimit = false
}

// fallback to non-rate-limited query if rate limiter is disabled
if !applyLimit {
Expand Down Expand Up @@ -83,12 +86,14 @@ func (k Keeper) ListPendingCctxWithinRateLimit(c context.Context, req *types.Que
if applyLimit {
gasCoinRates, erc20CoinRates = k.GetRateLimiterRates(ctx)
foreignCoinMap = k.fungibleKeeper.GetAllForeignCoinMap(ctx)
windowLimitInZeta = sdk.NewDecFromBigInt(rateLimitFlags.Rate.BigInt())
blockLimitInZeta = windowLimitInZeta.Quo(sdk.NewDec(rateLimitFlags.Window))

// convert the rate limit from aZETA to ZETA
blockLimitInZeta = sdk.NewDecFromBigInt(rateLimitFlags.Rate.BigInt()).Quo(sdk.NewDec(10).Power(18))
windowLimitInZeta = blockLimitInZeta.Mul(sdk.NewDec(rateLimitFlags.Window))
}

// the criteria to stop adding cctxs to the rpc response
maxCCTXsReached := func() bool {
maxCCTXsReached := func(cctxs []*types.CrossChainTx) bool {
// #nosec G701 len always positive
return uint32(len(cctxs)) >= limit
}
Expand Down Expand Up @@ -124,19 +129,21 @@ func (k Keeper) ListPendingCctxWithinRateLimit(c context.Context, req *types.Que
}
}

// invariant: for period of time > window, the average rate per block cannot exceed `blockLimitInZeta`
pendingCctxsLimitInZeta := windowLimitInZeta
// invariant: for period of time >= `rateLimitFlags.Window`, the zetaclient-side average withdraw rate should be <= `blockLimitInZeta`
// otherwise, this query should return empty result and wait for the average rate to drop below `blockLimitInZeta`
withdrawWindow := rateLimitFlags.Window
withdrawLimitInZeta := windowLimitInZeta
if lowestPendingCctxHeight != 0 {
// `pendingCctxWindow` is the width of [lowestPendingCctxHeight, height] window
// if the window can be wider than the rate limit window, we should adjust the total limit proportionally
// if the window can be wider than `rateLimitFlags.Window`, we should adjust the total withdraw limit proportionally
pendingCctxWindow := height - lowestPendingCctxHeight + 1
if pendingCctxWindow > rateLimitFlags.Window {
pendingCctxsLimitInZeta = blockLimitInZeta.Mul(sdk.NewDec(pendingCctxWindow))
withdrawWindow = pendingCctxWindow
withdrawLimitInZeta = blockLimitInZeta.Mul(sdk.NewDec(pendingCctxWindow))
}
}

// query backwards for potential missed pending cctxs for each foreign chain
LoopBackwards:
for _, chain := range chains {
// we should at least query 1000 prior to find any pending cctx that we might have missed
// this logic is needed because a confirmation of higher nonce will automatically update the p.NonceLow
Expand All @@ -148,17 +155,11 @@ LoopBackwards:
}

startNonce := pendingNonces.NonceLow - 1
endNonce := pendingNonces.NonceLow - 1000
endNonce := pendingNonces.NonceLow - MaxLookbackNonce
if endNonce < 0 {
endNonce = 0
}

// add the pending nonces to the total pending
// Note: the `totalPending` may not be accurate only if the rate limiter triggers early exit
// `totalPending` is now used for metrics only and it's okay to trade off accuracy for performance
// #nosec G701 always in range
totalPending += uint64(pendingNonces.NonceHigh - pendingNonces.NonceLow)

// query cctx by nonce backwards to the left boundary of the rate limit sliding window
for nonce := startNonce; nonce >= 0; nonce-- {
cctx, err := getCctxByChainIDAndNonce(k, ctx, tss.TssPubkey, chain.ChainId, nonce)
Expand All @@ -167,31 +168,32 @@ LoopBackwards:
}
inWindow := isCctxInWindow(cctx)

// We should at least go backwards by 1000 nonces to pick up missed pending cctxs
// We might go even further back if rate limiter is enabled and the endNonce hasn't hit the left window boundary yet
// There are two criteria to stop scanning backwards:
// criteria #1: we'll stop at the left window boundary if the `endNonce` hasn't hit it yet
// we should at least go backwards by 1000 nonces to pick up missed pending cctxs
// we might go even further back if rate limiter is enabled and the endNonce hasn't hit the left window boundary yet
// stop at the left window boundary if the `endNonce` hasn't hit it yet
if nonce < endNonce && !inWindow {
break
}
// criteria #2: we should finish the RPC call if the rate limit is exceeded
if inWindow && rateLimitExceeded(chain.ChainId, cctx, gasCoinRates, erc20CoinRates, foreignCoinMap, &totalCctxValueInZeta, windowLimitInZeta) {
// skip the cctx if rate limit is exceeded but still accumulate the total withdraw value
if inWindow && rateLimitExceeded(chain.ChainId, cctx, gasCoinRates, erc20CoinRates, foreignCoinMap, &totalWithdrawInZeta, withdrawLimitInZeta) {
limitExceeded = true
break LoopBackwards
continue
}

// only take a `limit` number of pending cctxs as result but still count the total pending cctxs
if IsPending(cctx) {
totalPending++
if !maxCCTXsReached() {
if !maxCCTXsReached(cctxs) {
cctxs = append(cctxs, cctx)
}
}
}
}

// remember the number of missed pending cctxs
missedPending := len(cctxs)

// query forwards for pending cctxs for each foreign chain
LoopForwards:
for _, chain := range chains {
// query the pending cctxs in range [NonceLow, NonceHigh)
pendingNonces := pendingNoncesMap[chain.ChainId]
Expand All @@ -200,25 +202,33 @@ LoopForwards:
continue
}

// #nosec G701 always in range

Check failure on line 205 in x/crosschain/keeper/grpc_query_cctx_rate_limit.go

View workflow job for this annotation

GitHub Actions / lint

File is not `goimports`-ed (goimports)
totalPending += uint64(pendingNonces.NonceHigh - pendingNonces.NonceLow)

for nonce := pendingNonces.NonceLow; nonce < pendingNonces.NonceHigh; nonce++ {
cctx, err := getCctxByChainIDAndNonce(k, ctx, tss.TssPubkey, chain.ChainId, nonce)
if err != nil {
return nil, err
}

// only take a `limit` number of pending cctxs as result
if maxCCTXsReached() {
break LoopForwards
}
// criteria #2: we should finish the RPC call if the rate limit is exceeded
if rateLimitExceeded(chain.ChainId, cctx, gasCoinRates, erc20CoinRates, foreignCoinMap, &totalCctxValueInZeta, pendingCctxsLimitInZeta) {
// skip the cctx if rate limit is exceeded but still accumulate the total withdraw value
if rateLimitExceeded(chain.ChainId, cctx, gasCoinRates, erc20CoinRates, foreignCoinMap, &totalWithdrawInZeta, withdrawLimitInZeta) {
limitExceeded = true
break LoopForwards
continue
}
// only take a `limit` number of pending cctxs as result
if maxCCTXsReached(cctxs) {
continue
}
cctxs = append(cctxs, cctx)
}
}

// if the rate limit is exceeded, only return the missed pending cctxs
if limitExceeded {
cctxs = cctxs[:missedPending]
}

// sort the cctxs by chain ID and nonce (lower nonce holds higher priority for scheduling)
sort.Slice(cctxs, func(i, j int) bool {
if cctxs[i].GetCurrentOutTxParam().ReceiverChainId == cctxs[j].GetCurrentOutTxParam().ReceiverChainId {
Expand All @@ -228,10 +238,11 @@ LoopForwards:
})

return &types.QueryListPendingCctxWithinRateLimitResponse{
CrossChainTx: cctxs,
TotalPending: totalPending,
ValueWithinWindow: totalCctxValueInZeta.TruncateInt().Uint64(),
RateLimitExceeded: limitExceeded,
CrossChainTx: cctxs,
TotalPending: totalPending,
CurrentWithdrawWindow: withdrawWindow,
CurrentWithdrawRate: totalWithdrawInZeta.Mul(sdk.NewDec(10).Power(18)).Quo(sdk.NewDec(withdrawWindow)).String(),
RateLimitExceeded: limitExceeded,
}, nil
}

Expand Down Expand Up @@ -306,9 +317,9 @@ func rateLimitExceeded(
erc20CoinRates map[int64]map[string]sdk.Dec,
foreignCoinMap map[int64]map[string]fungibletypes.ForeignCoins,
currentCctxValue *sdk.Dec,
rateLimitValue sdk.Dec,
withdrawLimitInZeta sdk.Dec,
) bool {
amountZeta := ConvertCctxValue(chainID, cctx, gasCoinRates, erc20CoinRates, foreignCoinMap)
*currentCctxValue = currentCctxValue.Add(amountZeta)
return currentCctxValue.GT(rateLimitValue)
return currentCctxValue.GT(withdrawLimitInZeta)
}
Loading

0 comments on commit 78ad7a3

Please sign in to comment.