Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: outbound cctx scheduling with rate limiter #2045

Merged
merged 15 commits into from
Apr 20, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
* [1954](https://github.com/zeta-chain/node/pull/1954) - add metric for concurrent keysigns
* [1979](https://github.com/zeta-chain/node/pull/1979) - add script to import genesis data into an existing genesis file
* [2006](https://github.com/zeta-chain/node/pull/2006) - add Amoy testnet static chain information
* [2045](https://github.com/zeta-chain/node/pull/2046) - add grpc query with outbound rate limit for zetaclient to use
* [2046](https://github.com/zeta-chain/node/pull/2046) - add state variable in crosschain for rate limiter flags
* [2034](https://github.com/zeta-chain/node/pull/2034) - add support for zEVM message passing

Expand Down
2 changes: 1 addition & 1 deletion cmd/zetae2e/stress.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func EchoNetworkMetrics(runner *runner.E2ERunner) {
case <-ticker.C:
numTicks++
// Get all pending outbound transactions
cctxResp, err := runner.CctxClient.CctxListPending(context.Background(), &crosschaintypes.QueryListCctxPendingRequest{
cctxResp, err := runner.CctxClient.ListPendingCctx(context.Background(), &crosschaintypes.QueryListPendingCctxRequest{
ChainId: chainID.Int64(),
})
if err != nil {
Expand Down
88 changes: 61 additions & 27 deletions docs/openapi/openapi.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26582,32 +26582,6 @@ paths:
type: string
tags:
- Query
/zeta-chain/crosschain/cctxPending:
get:
summary: Queries a list of pending cctxs.
operationId: Query_CctxListPending
responses:
"200":
description: A successful response.
schema:
$ref: '#/definitions/crosschainQueryListCctxPendingResponse'
default:
description: An unexpected error response.
schema:
$ref: '#/definitions/googlerpcStatus'
parameters:
- name: chain_id
in: query
required: false
type: string
format: int64
- name: limit
in: query
required: false
type: integer
format: int64
tags:
- Query
/zeta-chain/crosschain/convertGasToZeta:
get:
operationId: Query_ConvertGasToZeta
Expand Down Expand Up @@ -27173,6 +27147,53 @@ paths:
type: boolean
tags:
- Query
/zeta-chain/crosschain/pendingCctx:
get:
summary: Queries a list of pending cctxs.
operationId: Query_ListPendingCctx
responses:
"200":
description: A successful response.
schema:
$ref: '#/definitions/crosschainQueryListPendingCctxResponse'
default:
description: An unexpected error response.
schema:
$ref: '#/definitions/googlerpcStatus'
parameters:
- name: chain_id
in: query
required: false
type: string
format: int64
- name: limit
in: query
required: false
type: integer
format: int64
tags:
- Query
/zeta-chain/crosschain/pendingCctxWithinRateLimit:
get:
summary: Queries a list of pending cctxs within rate limit.
operationId: Query_ListPendingCctxWithinRateLimit
responses:
"200":
description: A successful response.
schema:
$ref: '#/definitions/crosschainQueryListPendingCctxWithinRateLimitResponse'
default:
description: An unexpected error response.
schema:
$ref: '#/definitions/googlerpcStatus'
parameters:
- name: limit
in: query
required: false
type: integer
format: int64
tags:
- Query
/zeta-chain/crosschain/protocolFee:
get:
operationId: Query_ProtocolFee
Expand Down Expand Up @@ -53970,7 +53991,7 @@ definitions:
Height:
type: string
format: int64
crosschainQueryListCctxPendingResponse:
crosschainQueryListPendingCctxResponse:
type: object
properties:
CrossChainTx:
Expand All @@ -53981,6 +54002,19 @@ definitions:
totalPending:
type: string
format: uint64
crosschainQueryListPendingCctxWithinRateLimitResponse:
type: object
properties:
cross_chain_tx:
type: array
items:
type: object
$ref: '#/definitions/crosschainCrossChainTx'
total_pending:
type: string
format: uint64
rate_limit_exceeded:
type: boolean
crosschainQueryMessagePassingProtocolFeeResponse:
type: object
properties:
Expand Down
23 changes: 19 additions & 4 deletions proto/crosschain/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,13 @@ service Query {
}

// Queries a list of pending cctxs.
rpc CctxListPending(QueryListCctxPendingRequest) returns (QueryListCctxPendingResponse) {
option (google.api.http).get = "/zeta-chain/crosschain/cctxPending";
rpc ListPendingCctx(QueryListPendingCctxRequest) returns (QueryListPendingCctxResponse) {
option (google.api.http).get = "/zeta-chain/crosschain/pendingCctx";
}

// Queries a list of pending cctxs within rate limit.
rpc ListPendingCctxWithinRateLimit(QueryListPendingCctxWithinRateLimitRequest) returns (QueryListPendingCctxWithinRateLimitResponse) {
option (google.api.http).get = "/zeta-chain/crosschain/pendingCctxWithinRateLimit";
}

rpc ZetaAccounting(QueryZetaAccountingRequest) returns (QueryZetaAccountingResponse) {
Expand Down Expand Up @@ -248,16 +253,26 @@ message QueryAllCctxResponse {
cosmos.base.query.v1beta1.PageResponse pagination = 2;
}

message QueryListCctxPendingRequest {
message QueryListPendingCctxRequest {
int64 chain_id = 1;
uint32 limit = 2;
}

message QueryListCctxPendingResponse {
message QueryListPendingCctxResponse {
repeated CrossChainTx CrossChainTx = 1;
uint64 totalPending = 2;
}

message QueryListPendingCctxWithinRateLimitRequest {
uint32 limit = 1;
}

message QueryListPendingCctxWithinRateLimitResponse {
repeated CrossChainTx cross_chain_tx = 1;
uint64 total_pending = 2;
bool rate_limit_exceeded = 3;
}

message QueryLastZetaHeightRequest {}

message QueryLastZetaHeightResponse {
Expand Down
20 changes: 20 additions & 0 deletions testutil/keeper/mocks/crosschain/fungible.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions testutil/keeper/mocks/crosschain/observer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

90 changes: 74 additions & 16 deletions typescript/crosschain/query_pb.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -814,9 +814,9 @@ export declare class QueryAllCctxResponse extends Message<QueryAllCctxResponse>
}

/**
* @generated from message zetachain.zetacore.crosschain.QueryListCctxPendingRequest
* @generated from message zetachain.zetacore.crosschain.QueryListPendingCctxRequest
*/
export declare class QueryListCctxPendingRequest extends Message<QueryListCctxPendingRequest> {
export declare class QueryListPendingCctxRequest extends Message<QueryListPendingCctxRequest> {
/**
* @generated from field: int64 chain_id = 1;
*/
Expand All @@ -827,25 +827,25 @@ export declare class QueryListCctxPendingRequest extends Message<QueryListCctxPe
*/
limit: number;

constructor(data?: PartialMessage<QueryListCctxPendingRequest>);
constructor(data?: PartialMessage<QueryListPendingCctxRequest>);

static readonly runtime: typeof proto3;
static readonly typeName = "zetachain.zetacore.crosschain.QueryListCctxPendingRequest";
static readonly typeName = "zetachain.zetacore.crosschain.QueryListPendingCctxRequest";
static readonly fields: FieldList;

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): QueryListCctxPendingRequest;
static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): QueryListPendingCctxRequest;

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): QueryListCctxPendingRequest;
static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): QueryListPendingCctxRequest;

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): QueryListCctxPendingRequest;
static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): QueryListPendingCctxRequest;

static equals(a: QueryListCctxPendingRequest | PlainMessage<QueryListCctxPendingRequest> | undefined, b: QueryListCctxPendingRequest | PlainMessage<QueryListCctxPendingRequest> | undefined): boolean;
static equals(a: QueryListPendingCctxRequest | PlainMessage<QueryListPendingCctxRequest> | undefined, b: QueryListPendingCctxRequest | PlainMessage<QueryListPendingCctxRequest> | undefined): boolean;
}

/**
* @generated from message zetachain.zetacore.crosschain.QueryListCctxPendingResponse
* @generated from message zetachain.zetacore.crosschain.QueryListPendingCctxResponse
*/
export declare class QueryListCctxPendingResponse extends Message<QueryListCctxPendingResponse> {
export declare class QueryListPendingCctxResponse extends Message<QueryListPendingCctxResponse> {
/**
* @generated from field: repeated zetachain.zetacore.crosschain.CrossChainTx CrossChainTx = 1;
*/
Expand All @@ -856,19 +856,77 @@ export declare class QueryListCctxPendingResponse extends Message<QueryListCctxP
*/
totalPending: bigint;

constructor(data?: PartialMessage<QueryListCctxPendingResponse>);
constructor(data?: PartialMessage<QueryListPendingCctxResponse>);

static readonly runtime: typeof proto3;
static readonly typeName = "zetachain.zetacore.crosschain.QueryListCctxPendingResponse";
static readonly typeName = "zetachain.zetacore.crosschain.QueryListPendingCctxResponse";
static readonly fields: FieldList;

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): QueryListCctxPendingResponse;
static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): QueryListPendingCctxResponse;

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): QueryListCctxPendingResponse;
static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): QueryListPendingCctxResponse;

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): QueryListCctxPendingResponse;
static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): QueryListPendingCctxResponse;

static equals(a: QueryListCctxPendingResponse | PlainMessage<QueryListCctxPendingResponse> | undefined, b: QueryListCctxPendingResponse | PlainMessage<QueryListCctxPendingResponse> | undefined): boolean;
static equals(a: QueryListPendingCctxResponse | PlainMessage<QueryListPendingCctxResponse> | undefined, b: QueryListPendingCctxResponse | PlainMessage<QueryListPendingCctxResponse> | undefined): boolean;
}

/**
* @generated from message zetachain.zetacore.crosschain.QueryListPendingCctxWithinRateLimitRequest
*/
export declare class QueryListPendingCctxWithinRateLimitRequest extends Message<QueryListPendingCctxWithinRateLimitRequest> {
/**
* @generated from field: uint32 limit = 1;
*/
limit: number;

constructor(data?: PartialMessage<QueryListPendingCctxWithinRateLimitRequest>);

static readonly runtime: typeof proto3;
static readonly typeName = "zetachain.zetacore.crosschain.QueryListPendingCctxWithinRateLimitRequest";
static readonly fields: FieldList;

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): QueryListPendingCctxWithinRateLimitRequest;

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): QueryListPendingCctxWithinRateLimitRequest;

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): QueryListPendingCctxWithinRateLimitRequest;

static equals(a: QueryListPendingCctxWithinRateLimitRequest | PlainMessage<QueryListPendingCctxWithinRateLimitRequest> | undefined, b: QueryListPendingCctxWithinRateLimitRequest | PlainMessage<QueryListPendingCctxWithinRateLimitRequest> | undefined): boolean;
}

/**
* @generated from message zetachain.zetacore.crosschain.QueryListPendingCctxWithinRateLimitResponse
*/
export declare class QueryListPendingCctxWithinRateLimitResponse extends Message<QueryListPendingCctxWithinRateLimitResponse> {
/**
* @generated from field: repeated zetachain.zetacore.crosschain.CrossChainTx cross_chain_tx = 1;
*/
crossChainTx: CrossChainTx[];

/**
* @generated from field: uint64 total_pending = 2;
*/
totalPending: bigint;

/**
* @generated from field: bool rate_limit_exceeded = 3;
*/
rateLimitExceeded: boolean;

constructor(data?: PartialMessage<QueryListPendingCctxWithinRateLimitResponse>);

static readonly runtime: typeof proto3;
static readonly typeName = "zetachain.zetacore.crosschain.QueryListPendingCctxWithinRateLimitResponse";
static readonly fields: FieldList;

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): QueryListPendingCctxWithinRateLimitResponse;

static fromJson(jsonValue: JsonValue, options?: Partial<JsonReadOptions>): QueryListPendingCctxWithinRateLimitResponse;

static fromJsonString(jsonString: string, options?: Partial<JsonReadOptions>): QueryListPendingCctxWithinRateLimitResponse;

static equals(a: QueryListPendingCctxWithinRateLimitResponse | PlainMessage<QueryListPendingCctxWithinRateLimitResponse> | undefined, b: QueryListPendingCctxWithinRateLimitResponse | PlainMessage<QueryListPendingCctxWithinRateLimitResponse> | undefined): boolean;
}

/**
Expand Down
4 changes: 2 additions & 2 deletions x/crosschain/client/cli/cli_cctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,13 @@ func CmdPendingCctx() *cobra.Command {
return err
}

params := &types.QueryListCctxPendingRequest{
params := &types.QueryListPendingCctxRequest{
ChainId: chainID,
// #nosec G701 bit size verified
Limit: uint32(limit),
}

res, err := queryClient.CctxListPending(context.Background(), params)
res, err := queryClient.ListPendingCctx(context.Background(), params)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion x/crosschain/keeper/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ IterateChains:
for _, chain := range chains {
// support only external evm chains
if zetachains.IsEVMChain(chain.ChainId) && !zetachains.IsZetaChain(chain.ChainId) {
res, err := k.CctxListPending(sdk.UnwrapSDKContext(ctx), &types.QueryListCctxPendingRequest{
res, err := k.ListPendingCctx(sdk.UnwrapSDKContext(ctx), &types.QueryListPendingCctxRequest{
ChainId: chain.ChainId,
Limit: gasPriceIncreaseFlags.MaxPendingCctxs,
})
Expand Down
2 changes: 1 addition & 1 deletion x/crosschain/keeper/cctx_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (k Keeper) GetRevertGasLimit(ctx sdk.Context, cctx types.CrossChainTx) (uin
return 0, nil
}

func IsPending(cctx types.CrossChainTx) bool {
func IsPending(cctx *types.CrossChainTx) bool {
ws4charlie marked this conversation as resolved.
Show resolved Hide resolved
// pending inbound is not considered a "pending" state because it has not reached consensus yet
return cctx.CctxStatus.Status == types.CctxStatus_PendingOutbound || cctx.CctxStatus.Status == types.CctxStatus_PendingRevert
}
Expand Down
Loading
Loading