Skip to content

Commit

Permalink
added total value in rate limiter window for monitoring purpose
Browse files Browse the repository at this point in the history
  • Loading branch information
ws4charlie committed Apr 20, 2024
1 parent 16955b8 commit 249bcaa
Show file tree
Hide file tree
Showing 10 changed files with 193 additions and 136 deletions.
3 changes: 3 additions & 0 deletions docs/openapi/openapi.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54013,6 +54013,9 @@ definitions:
total_pending:
type: string
format: uint64
value_within_window:
type: string
format: uint64
rate_limit_exceeded:
type: boolean
crosschainQueryMessagePassingProtocolFeeResponse:
Expand Down
3 changes: 2 additions & 1 deletion proto/crosschain/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,8 @@ message QueryListPendingCctxWithinRateLimitRequest {
message QueryListPendingCctxWithinRateLimitResponse {
repeated CrossChainTx cross_chain_tx = 1;
uint64 total_pending = 2;
bool rate_limit_exceeded = 3;
uint64 value_within_window = 3;
bool rate_limit_exceeded = 4;
}

message QueryLastZetaHeightRequest {}
Expand Down
7 changes: 6 additions & 1 deletion typescript/crosschain/query_pb.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -910,7 +910,12 @@ export declare class QueryListPendingCctxWithinRateLimitResponse extends Message
totalPending: bigint;

/**
* @generated from field: bool rate_limit_exceeded = 3;
* @generated from field: uint64 value_within_window = 3;
*/
valueWithinWindow: bigint;

/**
* @generated from field: bool rate_limit_exceeded = 4;
*/
rateLimitExceeded: boolean;

Expand Down
1 change: 1 addition & 0 deletions x/crosschain/keeper/grpc_query_cctx_rate_limit.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ LoopForwards:
return &types.QueryListPendingCctxWithinRateLimitResponse{
CrossChainTx: cctxs,
TotalPending: totalPending,
ValueWithinWindow: totalCctxValueInZeta.TruncateInt().Uint64(),
RateLimitExceeded: limitExceeded,
}, nil
}
Expand Down
8 changes: 7 additions & 1 deletion x/crosschain/keeper/grpc_query_cctx_rate_limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,11 +406,13 @@ func TestKeeper_ListPendingCctxWithinRateLimit(t *testing.T) {
require.EqualValues(t, cctxsBTC, res.CrossChainTx[200:400])
require.EqualValues(t, uint64(400), res.TotalPending)
require.False(t, res.RateLimitExceeded)
require.Equal(t, uint64(1500), res.ValueWithinWindow) // 500 * (2.5 + 0.5)
})
t.Run("Set rate to a lower value (< 1200) to early break the LoopBackwards with criteria #2", func(t *testing.T) {
k, ctx, cctxsETH, cctxsBTC, rlFlags := createKeeperForRateLimiterTest(t)

rlFlags.Rate = math.NewUint(1000) // 1000 ZETA
rate := uint64(1000) // 1000 ZETA
rlFlags.Rate = math.NewUint(rate)
k.SetRateLimiterFlags(ctx, rlFlags)

res, err := k.ListPendingCctxWithinRateLimit(ctx, &types.QueryListPendingCctxWithinRateLimitRequest{})
Expand All @@ -420,6 +422,7 @@ func TestKeeper_ListPendingCctxWithinRateLimit(t *testing.T) {
require.EqualValues(t, cctxsBTC[:100], res.CrossChainTx[100:200])
require.EqualValues(t, uint64(400), res.TotalPending)
require.True(t, res.RateLimitExceeded)
require.True(t, res.ValueWithinWindow >= rate)
})
t.Run("Set high rate and big window to early to break inner loop with the criteria #1", func(t *testing.T) {
k, ctx, cctxsETH, cctxsBTC, rlFlags := createKeeperForRateLimiterTest(t)
Expand All @@ -436,6 +439,7 @@ func TestKeeper_ListPendingCctxWithinRateLimit(t *testing.T) {
require.EqualValues(t, cctxsBTC, res.CrossChainTx[200:400])
require.EqualValues(t, uint64(400), res.TotalPending)
require.False(t, res.RateLimitExceeded)
require.EqualValues(t, uint64(3450), res.ValueWithinWindow) // 1150 * (2.5 + 0.5)
})
t.Run("Set lower request limit to early break the LoopForwards loop", func(t *testing.T) {
k, ctx, cctxsETH, cctxsBTC, _ := createKeeperForRateLimiterTest(t)
Expand All @@ -447,6 +451,7 @@ func TestKeeper_ListPendingCctxWithinRateLimit(t *testing.T) {
require.EqualValues(t, cctxsBTC, res.CrossChainTx[100:300])
require.EqualValues(t, uint64(400), res.TotalPending)
require.False(t, res.RateLimitExceeded)
require.EqualValues(t, uint64(1250), res.ValueWithinWindow) // 500 * 0.5 + 400 * 2.5
})
t.Run("Set rate to middle value (1200 < rate < 1500) to early break the LoopForwards loop with criteria #2", func(t *testing.T) {
k, ctx, cctxsETH, cctxsBTC, rlFlags := createKeeperForRateLimiterTest(t)
Expand All @@ -462,5 +467,6 @@ func TestKeeper_ListPendingCctxWithinRateLimit(t *testing.T) {
require.EqualValues(t, cctxsBTC, res.CrossChainTx[120:320])
require.EqualValues(t, uint64(400), res.TotalPending)
require.True(t, res.RateLimitExceeded)
require.True(t, res.ValueWithinWindow >= 1300)
})
}
279 changes: 158 additions & 121 deletions x/crosschain/types/query.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion zetaclient/interfaces/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type ZetaCoreBridger interface {
GetZetaBlockHeight() (int64, error)
GetLastBlockHeightByChain(chain chains.Chain) (*crosschaintypes.LastBlockHeight, error)
ListPendingCctx(chainID int64) ([]*crosschaintypes.CrossChainTx, uint64, error)
ListPendingCctxWithinRatelimit() ([]*crosschaintypes.CrossChainTx, uint64, bool, error)
ListPendingCctxWithinRatelimit() ([]*crosschaintypes.CrossChainTx, uint64, uint64, bool, error)
GetPendingNoncesByChain(chainID int64) (observertypes.PendingNonces, error)
GetCctxByNonce(chainID int64, nonce uint64) (*crosschaintypes.CrossChainTx, error)
GetOutTxTracker(chain chains.Chain, nonce uint64) (*crosschaintypes.OutTxTracker, error)
Expand Down
6 changes: 3 additions & 3 deletions zetaclient/testutils/stub/core_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,11 @@ func (z *MockZetaCoreBridge) ListPendingCctx(_ int64) ([]*cctxtypes.CrossChainTx
return []*cctxtypes.CrossChainTx{}, 0, nil
}

func (z *MockZetaCoreBridge) ListPendingCctxWithinRatelimit() ([]*cctxtypes.CrossChainTx, uint64, bool, error) {
func (z *MockZetaCoreBridge) ListPendingCctxWithinRatelimit() ([]*cctxtypes.CrossChainTx, uint64, uint64, bool, error) {
if z.paused {
return nil, 0, false, errors.New(ErrMsgPaused)
return nil, 0, 0, false, errors.New(ErrMsgPaused)
}
return []*cctxtypes.CrossChainTx{}, 0, false, nil
return []*cctxtypes.CrossChainTx{}, 0, 0, false, nil
}

func (z *MockZetaCoreBridge) GetPendingNoncesByChain(_ int64) (observerTypes.PendingNonces, error) {
Expand Down
6 changes: 3 additions & 3 deletions zetaclient/zetabridge/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (b *ZetaCoreBridge) ListPendingCctx(chainID int64) ([]*types.CrossChainTx,
// ListPendingCctxWithinRatelimit returns a list of pending cctxs that do not exceed the outbound rate limit
// - The max size of the list is crosschainkeeper.MaxPendingCctxs
// - The returned `rateLimitExceeded` flag indicates if the rate limit is exceeded or not
func (b *ZetaCoreBridge) ListPendingCctxWithinRatelimit() ([]*types.CrossChainTx, uint64, bool, error) {
func (b *ZetaCoreBridge) ListPendingCctxWithinRatelimit() ([]*types.CrossChainTx, uint64, uint64, bool, error) {

Check warning on line 142 in zetaclient/zetabridge/query.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetabridge/query.go#L142

Added line #L142 was not covered by tests
client := types.NewQueryClient(b.grpcConn)
maxSizeOption := grpc.MaxCallRecvMsgSize(32 * 1024 * 1024)
resp, err := client.ListPendingCctxWithinRateLimit(
Expand All @@ -148,9 +148,9 @@ func (b *ZetaCoreBridge) ListPendingCctxWithinRatelimit() ([]*types.CrossChainTx
maxSizeOption,
)
if err != nil {
return nil, 0, false, err
return nil, 0, 0, false, err

Check warning on line 151 in zetaclient/zetabridge/query.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetabridge/query.go#L151

Added line #L151 was not covered by tests
}
return resp.CrossChainTx, resp.TotalPending, resp.RateLimitExceeded, nil
return resp.CrossChainTx, resp.TotalPending, resp.ValueWithinWindow, resp.RateLimitExceeded, nil

Check warning on line 153 in zetaclient/zetabridge/query.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetabridge/query.go#L153

Added line #L153 was not covered by tests
}

func (b *ZetaCoreBridge) GetAbortedZetaAmount() (string, error) {
Expand Down
14 changes: 9 additions & 5 deletions zetaclient/zetacore_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,14 @@ func (co *CoreObserver) startCctxScheduler(appContext *appcontext.AppContext) {
metrics.HotKeyBurnRate.Set(float64(co.ts.HotKeyBurnRate.GetBurnRate().Int64()))

// query pending cctxs across all foreign chains with rate limit
cctxMap, err := co.getAllPendingCctxWithRatelimit()
cctxMap, valueWithinWindow, err := co.getAllPendingCctxWithRatelimit()

Check warning on line 137 in zetaclient/zetacore_observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore_observer.go#L137

Added line #L137 was not covered by tests
if err != nil {
co.logger.ZetaChainWatcher.Error().Err(err).Msgf("startCctxScheduler: queryPendingCctxWithRatelimit failed")
}
// print value within rate limiter window every minute
if bn%10 == 0 {
co.logger.ZetaChainWatcher.Debug().Msgf("startCctxScheduler: value within rate limiter window is %d ZETA", valueWithinWindow)

Check warning on line 143 in zetaclient/zetacore_observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore_observer.go#L142-L143

Added lines #L142 - L143 were not covered by tests
}

// schedule keysign for pending cctxs on each chain
coreContext := appContext.ZetaCoreContext()
Expand Down Expand Up @@ -190,10 +194,10 @@ func (co *CoreObserver) startCctxScheduler(appContext *appcontext.AppContext) {
}

// getAllPendingCctxWithRatelimit get pending cctxs across all foreign chains with rate limit
func (co *CoreObserver) getAllPendingCctxWithRatelimit() (map[int64][]*types.CrossChainTx, error) {
cctxList, totalPending, rateLimitExceeded, err := co.bridge.ListPendingCctxWithinRatelimit()
func (co *CoreObserver) getAllPendingCctxWithRatelimit() (map[int64][]*types.CrossChainTx, uint64, error) {
cctxList, totalPending, valueWithinWindow, rateLimitExceeded, err := co.bridge.ListPendingCctxWithinRatelimit()

Check warning on line 198 in zetaclient/zetacore_observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore_observer.go#L197-L198

Added lines #L197 - L198 were not covered by tests
if err != nil {
return nil, err
return nil, 0, err

Check warning on line 200 in zetaclient/zetacore_observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore_observer.go#L200

Added line #L200 was not covered by tests
}
if rateLimitExceeded {
co.logger.ZetaChainWatcher.Warn().Msgf("rate limit exceeded, fetched %d cctxs out of %d", len(cctxList), totalPending)
Expand All @@ -209,7 +213,7 @@ func (co *CoreObserver) getAllPendingCctxWithRatelimit() (map[int64][]*types.Cro
cctxMap[chainID] = append(cctxMap[chainID], cctx)
}

return cctxMap, nil
return cctxMap, valueWithinWindow, nil

Check warning on line 216 in zetaclient/zetacore_observer.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore_observer.go#L216

Added line #L216 was not covered by tests
}

// scheduleCctxEVM schedules evm outtx keysign on each ZetaChain block (the ticker)
Expand Down

0 comments on commit 249bcaa

Please sign in to comment.