From 395829176ed4bef519f35d1f7db812c30df9c0e9 Mon Sep 17 00:00:00 2001 From: pk910 Date: Thu, 21 Nov 2024 17:47:23 +0100 Subject: [PATCH] fix execution timeouts & bump concurrent request limit --- .../clients/execution/rpc/executionapi.go | 52 ++++++++++++++----- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/pkg/coordinator/clients/execution/rpc/executionapi.go b/pkg/coordinator/clients/execution/rpc/executionapi.go index 24f56c8..060203b 100644 --- a/pkg/coordinator/clients/execution/rpc/executionapi.go +++ b/pkg/coordinator/clients/execution/rpc/executionapi.go @@ -2,6 +2,7 @@ package rpc import ( "context" + "fmt" "math/big" "time" @@ -30,7 +31,7 @@ func NewExecutionClient(name, url string, headers map[string]string) (*Execution name: name, endpoint: url, headers: headers, - concurrencyLimit: 20, + concurrencyLimit: 50, requestTimeout: 30 * time.Second, } @@ -59,11 +60,14 @@ func (ec *ExecutionClient) Initialize(ctx context.Context) error { return nil } -func (ec *ExecutionClient) enforceConcurrencyLimit() func() { - ec.concurrencyChan <- struct{}{} - - return func() { - <-ec.concurrencyChan +func (ec *ExecutionClient) enforceConcurrencyLimit(ctx context.Context) func() { + select { + case <-ctx.Done(): + return func() {} + case ec.concurrencyChan <- struct{}{}: + return func() { + <-ec.concurrencyChan + } } } @@ -136,7 +140,11 @@ func (ec *ExecutionClient) GetBlockByHash(ctx context.Context, hash common.Hash) } func (ec *ExecutionClient) GetNonceAt(ctx context.Context, wallet common.Address, blockNumber *big.Int) (uint64, error) { - closeFn := ec.enforceConcurrencyLimit() + closeFn := ec.enforceConcurrencyLimit(ctx) + if closeFn == nil { + return 0, fmt.Errorf("client busy") + } + defer closeFn() reqCtx, reqCtxCancel := context.WithTimeout(ctx, ec.requestTimeout) @@ -146,7 +154,11 @@ func (ec *ExecutionClient) GetNonceAt(ctx context.Context, wallet common.Address } func (ec *ExecutionClient) GetBalanceAt(ctx context.Context, wallet common.Address, blockNumber *big.Int) (*big.Int, error) { - closeFn := ec.enforceConcurrencyLimit() + closeFn := ec.enforceConcurrencyLimit(ctx) + if closeFn == nil { + return nil, fmt.Errorf("client busy") + } + defer closeFn() reqCtx, reqCtxCancel := context.WithTimeout(ctx, ec.requestTimeout) @@ -156,7 +168,11 @@ func (ec *ExecutionClient) GetBalanceAt(ctx context.Context, wallet common.Addre } func (ec *ExecutionClient) GetTransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { - closeFn := ec.enforceConcurrencyLimit() + closeFn := ec.enforceConcurrencyLimit(ctx) + if closeFn == nil { + return nil, fmt.Errorf("client busy") + } + defer closeFn() reqCtx, reqCtxCancel := context.WithTimeout(ctx, ec.requestTimeout) @@ -166,7 +182,11 @@ func (ec *ExecutionClient) GetTransactionReceipt(ctx context.Context, txHash com } func (ec *ExecutionClient) GetBlockReceipts(ctx context.Context, blockHash common.Hash) ([]*types.Receipt, error) { - closeFn := ec.enforceConcurrencyLimit() + closeFn := ec.enforceConcurrencyLimit(ctx) + if closeFn == nil { + return nil, fmt.Errorf("client busy") + } + defer closeFn() reqCtx, reqCtxCancel := context.WithTimeout(ctx, ec.requestTimeout) @@ -178,7 +198,11 @@ func (ec *ExecutionClient) GetBlockReceipts(ctx context.Context, blockHash commo } func (ec *ExecutionClient) SendTransaction(ctx context.Context, tx *types.Transaction) error { - closeFn := ec.enforceConcurrencyLimit() + closeFn := ec.enforceConcurrencyLimit(ctx) + if closeFn == nil { + return fmt.Errorf("client busy") + } + defer closeFn() reqCtx, reqCtxCancel := context.WithTimeout(ctx, ec.requestTimeout) @@ -189,7 +213,11 @@ func (ec *ExecutionClient) SendTransaction(ctx context.Context, tx *types.Transa //nolint:gocritic // ignore func (ec *ExecutionClient) GetEthCall(ctx context.Context, msg ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { - closeFn := ec.enforceConcurrencyLimit() + closeFn := ec.enforceConcurrencyLimit(ctx) + if closeFn == nil { + return nil, fmt.Errorf("client busy") + } + defer closeFn() return ec.ethClient.CallContract(ctx, msg, blockNumber)