Skip to content

Commit

Permalink
Merge pull request #15 from ethpandaops/pk910/pr-14
Browse files Browse the repository at this point in the history
fix panic when no execution client is available
  • Loading branch information
pk910 authored Mar 9, 2024
2 parents 7daa345 + 4042aa9 commit a24199a
Show file tree
Hide file tree
Showing 13 changed files with 121 additions and 61 deletions.
16 changes: 8 additions & 8 deletions pkg/coordinator/clients/consensus/clienttype.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
type ClientType int8

var (
UnspecifiedClient ClientType
UnknownClient ClientType = -1
LighthouseClient ClientType = 1
LodestarClient ClientType = 2
NimbusClient ClientType = 3
PrysmClient ClientType = 4
TekuClient ClientType = 5
GrandineClient ClientType = 6
AnyClient ClientType
UnknownClient ClientType = -1
LighthouseClient ClientType = 1
LodestarClient ClientType = 2
NimbusClient ClientType = 3
PrysmClient ClientType = 4
TekuClient ClientType = 5
GrandineClient ClientType = 6
)
var clientTypePatterns = map[ClientType]*regexp.Regexp{
LighthouseClient: regexp.MustCompile("(?i)^Lighthouse/.*"),
Expand Down
20 changes: 18 additions & 2 deletions pkg/coordinator/clients/consensus/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"time"

v1 "github.com/attestantio/go-eth2-client/api/v1"
"github.com/attestantio/go-eth2-client/spec/phase0"
Expand Down Expand Up @@ -70,7 +71,7 @@ func (pool *Pool) GetBlockCache() *BlockCache {

func (pool *Pool) GetValidatorSet() map[phase0.ValidatorIndex]*v1.Validator {
return pool.blockCache.getCachedValidatorSet(func() map[phase0.ValidatorIndex]*v1.Validator {
client := pool.GetReadyEndpoint(UnspecifiedClient)
client := pool.GetReadyEndpoint(AnyClient)
if client == nil {
pool.logger.Errorf("could not load validator set: no ready client")
return nil
Expand Down Expand Up @@ -120,6 +121,21 @@ func (pool *Pool) GetReadyEndpoint(clientType ClientType) *Client {
return selectedClient
}

func (pool *Pool) AwaitReadyEndpoint(ctx context.Context, clientType ClientType) *Client {
for {
client := pool.GetReadyEndpoint(clientType)
if client != nil {
return client
}

select {
case <-ctx.Done():
return nil
case <-time.After(1 * time.Second):
}
}
}

func (pool *Pool) IsClientReady(client *Client) bool {
if client == nil {
return false
Expand Down Expand Up @@ -148,7 +164,7 @@ func (pool *Pool) runClientScheduler(readyClients []*Client, clientType ClientTy
var firstReadyClient *Client

for _, client := range readyClients {
if clientType != UnspecifiedClient && clientType != client.clientType {
if clientType != AnyClient && clientType != client.clientType {
continue
}

Expand Down
16 changes: 8 additions & 8 deletions pkg/coordinator/clients/execution/clienttype.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
type ClientType int8

var (
UnspecifiedClient ClientType
UnknownClient ClientType = -1
BesuClient ClientType = 1
ErigonClient ClientType = 2
EthjsClient ClientType = 3
GethClient ClientType = 4
NethermindClient ClientType = 5
RethClient ClientType = 6
AnyClient ClientType
UnknownClient ClientType = -1
BesuClient ClientType = 1
ErigonClient ClientType = 2
EthjsClient ClientType = 3
GethClient ClientType = 4
NethermindClient ClientType = 5
RethClient ClientType = 6
)
var clientTypePatterns = map[ClientType]*regexp.Regexp{
BesuClient: regexp.MustCompile("(?i)^Besu/.*"),
Expand Down
18 changes: 17 additions & 1 deletion pkg/coordinator/clients/execution/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"time"

"github.com/sirupsen/logrus"
)
Expand Down Expand Up @@ -91,6 +92,21 @@ func (pool *Pool) GetReadyEndpoint(clientType ClientType) *Client {
return selectedClient
}

func (pool *Pool) AwaitReadyEndpoint(ctx context.Context, clientType ClientType) *Client {
for {
client := pool.GetReadyEndpoint(clientType)
if client != nil {
return client
}

select {
case <-ctx.Done():
return nil
case <-time.After(1 * time.Second):
}
}
}

func (pool *Pool) GetReadyEndpoints() []*Client {
canonicalFork := pool.GetCanonicalFork(-1)
if canonicalFork == nil {
Expand Down Expand Up @@ -133,7 +149,7 @@ func (pool *Pool) runClientScheduler(readyClients []*Client, clientType ClientTy
var firstReadyClient *Client

for _, client := range readyClients {
if clientType != UnspecifiedClient && clientType != client.clientType {
if clientType != AnyClient && clientType != client.clientType {
continue
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/coordinator/tasks/check_consensus_proposer_duty/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,11 @@ func (t *Task) Execute(ctx context.Context) error {
}

func (t *Task) loadEpochDuties(ctx context.Context, epoch uint64) {
client := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient)
client := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().AwaitReadyEndpoint(ctx, consensus.AnyClient)
if client == nil {
return
}

proposerDuties, err := client.GetRPCClient().GetProposerDuties(ctx, epoch)

if err != nil {
Expand Down
27 changes: 15 additions & 12 deletions pkg/coordinator/tasks/generate_blob_transactions/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,22 +335,25 @@ func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, c
}

err = nil
if len(clients) == 0 {
err = fmt.Errorf("no ready clients available")
} else {
for i := 0; i < len(clients); i++ {
client := clients[(transactionIdx+uint64(i))%uint64(len(clients))]

for i := 0; i < len(clients); i++ {
client := clients[(transactionIdx+uint64(i))%uint64(len(clients))]
t.logger.WithFields(logrus.Fields{
"client": client.GetName(),
}).Infof("sending tx %v: %v", transactionIdx, tx.Hash().Hex())

t.logger.WithFields(logrus.Fields{
"client": client.GetName(),
}).Infof("sending tx %v: %v", transactionIdx, tx.Hash().Hex())
err = client.GetRPCClient().SendTransaction(ctx, tx)
if err == nil {
break
}

err = client.GetRPCClient().SendTransaction(ctx, tx)
if err == nil {
break
t.logger.WithFields(logrus.Fields{
"client": client.GetName(),
}).Warnf("RPC error when sending tx %v: %v", transactionIdx, err)
}

t.logger.WithFields(logrus.Fields{
"client": client.GetName(),
}).Warnf("RPC error when sending tx %v: %v", transactionIdx, err)
}

if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion pkg/coordinator/tasks/generate_bls_changes/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,10 @@ func (t *Task) generateBlsChange(ctx context.Context, accountIdx uint64) error {
var client *consensus.Client

if t.config.ClientPattern == "" && t.config.ExcludeClientPattern == "" {
client = clientPool.GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient)
client = clientPool.GetConsensusPool().AwaitReadyEndpoint(ctx, consensus.AnyClient)
if client == nil {
return ctx.Err()
}
} else {
clients := clientPool.GetClientsByNamePatterns(t.config.ClientPattern, t.config.ExcludeClientPattern)
if len(clients) == 0 {
Expand Down
5 changes: 4 additions & 1 deletion pkg/coordinator/tasks/generate_deposits/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,10 @@ func (t *Task) generateDeposit(ctx context.Context, accountIdx uint64, onConfirm
var client *execution.Client

if t.config.ClientPattern == "" && t.config.ExcludeClientPattern == "" {
client = clientPool.GetExecutionPool().GetReadyEndpoint(execution.UnspecifiedClient)
client = clientPool.GetExecutionPool().AwaitReadyEndpoint(ctx, execution.AnyClient)
if client == nil {
return nil, nil, ctx.Err()
}
} else {
clients := clientPool.GetClientsByNamePatterns(t.config.ClientPattern, t.config.ExcludeClientPattern)
if len(clients) == 0 {
Expand Down
27 changes: 15 additions & 12 deletions pkg/coordinator/tasks/generate_eoa_transactions/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,22 +371,25 @@ func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, c
}

err = nil
if len(clients) == 0 {
err = fmt.Errorf("no ready clients available")
} else {
for i := 0; i < len(clients); i++ {
client := clients[(transactionIdx+uint64(i))%uint64(len(clients))]

for i := 0; i < len(clients); i++ {
client := clients[(transactionIdx+uint64(i))%uint64(len(clients))]
t.logger.WithFields(logrus.Fields{
"client": client.GetName(),
}).Infof("sending tx %v: %v", transactionIdx, tx.Hash().Hex())

t.logger.WithFields(logrus.Fields{
"client": client.GetName(),
}).Infof("sending tx %v: %v", transactionIdx, tx.Hash().Hex())
err = client.GetRPCClient().SendTransaction(ctx, tx)
if err == nil {
break
}

err = client.GetRPCClient().SendTransaction(ctx, tx)
if err == nil {
break
t.logger.WithFields(logrus.Fields{
"client": client.GetName(),
}).Warnf("RPC error when sending tx %v: %v", transactionIdx, err)
}

t.logger.WithFields(logrus.Fields{
"client": client.GetName(),
}).Warnf("RPC error when sending tx %v: %v", transactionIdx, err)
}

if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions pkg/coordinator/tasks/generate_exits/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,10 @@ func (t *Task) Execute(ctx context.Context) error {
}

func (t *Task) loadChainState(ctx context.Context) (*phase0.Fork, error) {
client := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient)
client := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().AwaitReadyEndpoint(ctx, consensus.AnyClient)
if client == nil {
return nil, ctx.Err()
}

fork, err := client.GetRPCClient().GetForkState(ctx, "head")
if err != nil {
Expand Down Expand Up @@ -212,7 +215,7 @@ func (t *Task) generateVoluntaryExit(ctx context.Context, accountIdx uint64, for

clientPool := t.ctx.Scheduler.GetServices().ClientPool()
if t.config.ClientPattern == "" && t.config.ExcludeClientPattern == "" {
client = clientPool.GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient)
client = clientPool.GetConsensusPool().GetReadyEndpoint(consensus.AnyClient)
} else {
clients := clientPool.GetClientsByNamePatterns(t.config.ClientPattern, t.config.ExcludeClientPattern)
if len(clients) == 0 {
Expand Down
7 changes: 5 additions & 2 deletions pkg/coordinator/tasks/generate_slashings/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,10 @@ func (t *Task) Execute(ctx context.Context) error {
}

func (t *Task) loadChainState(ctx context.Context) (*phase0.Fork, error) {
client := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient)
client := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().AwaitReadyEndpoint(ctx, consensus.AnyClient)
if client == nil {
return nil, ctx.Err()
}

forkState, err := client.GetRPCClient().GetForkState(ctx, "head")
if err != nil {
Expand Down Expand Up @@ -228,7 +231,7 @@ func (t *Task) generateSlashing(ctx context.Context, accountIdx uint64, forkStat
var client *consensus.Client

if t.config.ClientPattern == "" && t.config.ExcludeClientPattern == "" {
client = clientPool.GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient)
client = clientPool.GetConsensusPool().GetReadyEndpoint(consensus.AnyClient)
} else {
clients := clientPool.GetClientsByNamePatterns(t.config.ClientPattern, t.config.ExcludeClientPattern)
if len(clients) == 0 {
Expand Down
21 changes: 12 additions & 9 deletions pkg/coordinator/tasks/generate_transaction/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,17 +158,20 @@ func (t *Task) Execute(ctx context.Context) error {
}

err = nil
if len(clients) == 0 {
err = fmt.Errorf("no ready clients available")
} else {
for i := 0; i < len(clients); i++ {
client := clients[i%len(clients)]

for i := 0; i < len(clients); i++ {
client := clients[i%len(clients)]

t.logger.WithFields(logrus.Fields{
"client": client.GetName(),
}).Infof("sending tx: %v", tx.Hash().Hex())
t.logger.WithFields(logrus.Fields{
"client": client.GetName(),
}).Infof("sending tx: %v", tx.Hash().Hex())

err = client.GetRPCClient().SendTransaction(ctx, tx)
if err == nil {
break
err = client.GetRPCClient().SendTransaction(ctx, tx)
if err == nil {
break
}
}
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/coordinator/wallet/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (wallet *Wallet) loadState() {

go func() {
for {
client := wallet.manager.clientPool.GetReadyEndpoint(execution.UnspecifiedClient)
client := wallet.manager.clientPool.GetReadyEndpoint(execution.AnyClient)
if client == nil {
time.Sleep(500 * time.Millisecond)
continue
Expand Down Expand Up @@ -287,7 +287,10 @@ func (wallet *Wallet) AwaitTransaction(ctx context.Context, tx *types.Transactio
}
}

client := wallet.manager.clientPool.GetCanonicalFork(0).ReadyClients[0]
client := wallet.manager.clientPool.AwaitReadyEndpoint(ctx, execution.AnyClient)
if client == nil {
return nil, ctx.Err()
}

return client.GetRPCClient().GetTransactionReceipt(ctx, txHash)
}
Expand Down

0 comments on commit a24199a

Please sign in to comment.