Skip to content

Commit

Permalink
fix execution timeouts & bump concurrent request limit
Browse files Browse the repository at this point in the history
  • Loading branch information
pk910 committed Nov 21, 2024
1 parent 8745bab commit 3958291
Showing 1 changed file with 40 additions and 12 deletions.
52 changes: 40 additions & 12 deletions pkg/coordinator/clients/execution/rpc/executionapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rpc

import (
"context"
"fmt"
"math/big"
"time"

Expand Down Expand Up @@ -30,7 +31,7 @@ func NewExecutionClient(name, url string, headers map[string]string) (*Execution
name: name,
endpoint: url,
headers: headers,
concurrencyLimit: 20,
concurrencyLimit: 50,
requestTimeout: 30 * time.Second,
}

Expand Down Expand Up @@ -59,11 +60,14 @@ func (ec *ExecutionClient) Initialize(ctx context.Context) error {
return nil
}

func (ec *ExecutionClient) enforceConcurrencyLimit() func() {
ec.concurrencyChan <- struct{}{}

return func() {
<-ec.concurrencyChan
func (ec *ExecutionClient) enforceConcurrencyLimit(ctx context.Context) func() {
select {
case <-ctx.Done():
return func() {}
case ec.concurrencyChan <- struct{}{}:
return func() {
<-ec.concurrencyChan
}
}
}

Expand Down Expand Up @@ -136,7 +140,11 @@ func (ec *ExecutionClient) GetBlockByHash(ctx context.Context, hash common.Hash)
}

func (ec *ExecutionClient) GetNonceAt(ctx context.Context, wallet common.Address, blockNumber *big.Int) (uint64, error) {
closeFn := ec.enforceConcurrencyLimit()
closeFn := ec.enforceConcurrencyLimit(ctx)
if closeFn == nil {
return 0, fmt.Errorf("client busy")
}

defer closeFn()

reqCtx, reqCtxCancel := context.WithTimeout(ctx, ec.requestTimeout)
Expand All @@ -146,7 +154,11 @@ func (ec *ExecutionClient) GetNonceAt(ctx context.Context, wallet common.Address
}

func (ec *ExecutionClient) GetBalanceAt(ctx context.Context, wallet common.Address, blockNumber *big.Int) (*big.Int, error) {
closeFn := ec.enforceConcurrencyLimit()
closeFn := ec.enforceConcurrencyLimit(ctx)
if closeFn == nil {
return nil, fmt.Errorf("client busy")
}

defer closeFn()

reqCtx, reqCtxCancel := context.WithTimeout(ctx, ec.requestTimeout)
Expand All @@ -156,7 +168,11 @@ func (ec *ExecutionClient) GetBalanceAt(ctx context.Context, wallet common.Addre
}

func (ec *ExecutionClient) GetTransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
closeFn := ec.enforceConcurrencyLimit()
closeFn := ec.enforceConcurrencyLimit(ctx)
if closeFn == nil {
return nil, fmt.Errorf("client busy")
}

defer closeFn()

reqCtx, reqCtxCancel := context.WithTimeout(ctx, ec.requestTimeout)
Expand All @@ -166,7 +182,11 @@ func (ec *ExecutionClient) GetTransactionReceipt(ctx context.Context, txHash com
}

func (ec *ExecutionClient) GetBlockReceipts(ctx context.Context, blockHash common.Hash) ([]*types.Receipt, error) {
closeFn := ec.enforceConcurrencyLimit()
closeFn := ec.enforceConcurrencyLimit(ctx)
if closeFn == nil {
return nil, fmt.Errorf("client busy")
}

defer closeFn()

reqCtx, reqCtxCancel := context.WithTimeout(ctx, ec.requestTimeout)
Expand All @@ -178,7 +198,11 @@ func (ec *ExecutionClient) GetBlockReceipts(ctx context.Context, blockHash commo
}

func (ec *ExecutionClient) SendTransaction(ctx context.Context, tx *types.Transaction) error {
closeFn := ec.enforceConcurrencyLimit()
closeFn := ec.enforceConcurrencyLimit(ctx)
if closeFn == nil {
return fmt.Errorf("client busy")
}

defer closeFn()

reqCtx, reqCtxCancel := context.WithTimeout(ctx, ec.requestTimeout)
Expand All @@ -189,7 +213,11 @@ func (ec *ExecutionClient) SendTransaction(ctx context.Context, tx *types.Transa

//nolint:gocritic // ignore
func (ec *ExecutionClient) GetEthCall(ctx context.Context, msg ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) {
closeFn := ec.enforceConcurrencyLimit()
closeFn := ec.enforceConcurrencyLimit(ctx)
if closeFn == nil {
return nil, fmt.Errorf("client busy")
}

defer closeFn()

return ec.ethClient.CallContract(ctx, msg, blockNumber)
Expand Down

0 comments on commit 3958291

Please sign in to comment.