diff --git a/pkg/coordinator/clients/consensus/clienttype.go b/pkg/coordinator/clients/consensus/clienttype.go index 305ec1a..9c3b8b7 100644 --- a/pkg/coordinator/clients/consensus/clienttype.go +++ b/pkg/coordinator/clients/consensus/clienttype.go @@ -8,14 +8,14 @@ import ( type ClientType int8 var ( - UnspecifiedClient ClientType - UnknownClient ClientType = -1 - LighthouseClient ClientType = 1 - LodestarClient ClientType = 2 - NimbusClient ClientType = 3 - PrysmClient ClientType = 4 - TekuClient ClientType = 5 - GrandineClient ClientType = 6 + AnyClient ClientType + UnknownClient ClientType = -1 + LighthouseClient ClientType = 1 + LodestarClient ClientType = 2 + NimbusClient ClientType = 3 + PrysmClient ClientType = 4 + TekuClient ClientType = 5 + GrandineClient ClientType = 6 ) var clientTypePatterns = map[ClientType]*regexp.Regexp{ LighthouseClient: regexp.MustCompile("(?i)^Lighthouse/.*"), diff --git a/pkg/coordinator/clients/consensus/pool.go b/pkg/coordinator/clients/consensus/pool.go index ea98a99..57b58f1 100644 --- a/pkg/coordinator/clients/consensus/pool.go +++ b/pkg/coordinator/clients/consensus/pool.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" v1 "github.com/attestantio/go-eth2-client/api/v1" "github.com/attestantio/go-eth2-client/spec/phase0" @@ -70,7 +71,7 @@ func (pool *Pool) GetBlockCache() *BlockCache { func (pool *Pool) GetValidatorSet() map[phase0.ValidatorIndex]*v1.Validator { return pool.blockCache.getCachedValidatorSet(func() map[phase0.ValidatorIndex]*v1.Validator { - client := pool.GetReadyEndpoint(UnspecifiedClient) + client := pool.GetReadyEndpoint(AnyClient) if client == nil { pool.logger.Errorf("could not load validator set: no ready client") return nil @@ -120,6 +121,21 @@ func (pool *Pool) GetReadyEndpoint(clientType ClientType) *Client { return selectedClient } +func (pool *Pool) AwaitReadyEndpoint(ctx context.Context, clientType ClientType) *Client { + for { + client := pool.GetReadyEndpoint(clientType) + if client != nil { + return client + } + + select { + case <-ctx.Done(): + return nil + case <-time.After(1 * time.Second): + } + } +} + func (pool *Pool) IsClientReady(client *Client) bool { if client == nil { return false @@ -148,7 +164,7 @@ func (pool *Pool) runClientScheduler(readyClients []*Client, clientType ClientTy var firstReadyClient *Client for _, client := range readyClients { - if clientType != UnspecifiedClient && clientType != client.clientType { + if clientType != AnyClient && clientType != client.clientType { continue } diff --git a/pkg/coordinator/clients/execution/clienttype.go b/pkg/coordinator/clients/execution/clienttype.go index e7ab457..ad2195c 100644 --- a/pkg/coordinator/clients/execution/clienttype.go +++ b/pkg/coordinator/clients/execution/clienttype.go @@ -8,14 +8,14 @@ import ( type ClientType int8 var ( - UnspecifiedClient ClientType - UnknownClient ClientType = -1 - BesuClient ClientType = 1 - ErigonClient ClientType = 2 - EthjsClient ClientType = 3 - GethClient ClientType = 4 - NethermindClient ClientType = 5 - RethClient ClientType = 6 + AnyClient ClientType + UnknownClient ClientType = -1 + BesuClient ClientType = 1 + ErigonClient ClientType = 2 + EthjsClient ClientType = 3 + GethClient ClientType = 4 + NethermindClient ClientType = 5 + RethClient ClientType = 6 ) var clientTypePatterns = map[ClientType]*regexp.Regexp{ BesuClient: regexp.MustCompile("(?i)^Besu/.*"), diff --git a/pkg/coordinator/clients/execution/pool.go b/pkg/coordinator/clients/execution/pool.go index aa192fc..e6429e7 100644 --- a/pkg/coordinator/clients/execution/pool.go +++ b/pkg/coordinator/clients/execution/pool.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/sirupsen/logrus" ) @@ -91,6 +92,21 @@ func (pool *Pool) GetReadyEndpoint(clientType ClientType) *Client { return selectedClient } +func (pool *Pool) AwaitReadyEndpoint(ctx context.Context, clientType ClientType) *Client { + for { + client := pool.GetReadyEndpoint(clientType) + if client != nil { + return client + } + + select { + case <-ctx.Done(): + return nil + case <-time.After(1 * time.Second): + } + } +} + func (pool *Pool) GetReadyEndpoints() []*Client { canonicalFork := pool.GetCanonicalFork(-1) if canonicalFork == nil { @@ -133,7 +149,7 @@ func (pool *Pool) runClientScheduler(readyClients []*Client, clientType ClientTy var firstReadyClient *Client for _, client := range readyClients { - if clientType != UnspecifiedClient && clientType != client.clientType { + if clientType != AnyClient && clientType != client.clientType { continue } diff --git a/pkg/coordinator/tasks/check_consensus_proposer_duty/task.go b/pkg/coordinator/tasks/check_consensus_proposer_duty/task.go index d94e269..8d3b2f0 100644 --- a/pkg/coordinator/tasks/check_consensus_proposer_duty/task.go +++ b/pkg/coordinator/tasks/check_consensus_proposer_duty/task.go @@ -128,7 +128,11 @@ func (t *Task) Execute(ctx context.Context) error { } func (t *Task) loadEpochDuties(ctx context.Context, epoch uint64) { - client := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient) + client := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().AwaitReadyEndpoint(ctx, consensus.AnyClient) + if client == nil { + return + } + proposerDuties, err := client.GetRPCClient().GetProposerDuties(ctx, epoch) if err != nil { diff --git a/pkg/coordinator/tasks/generate_blob_transactions/task.go b/pkg/coordinator/tasks/generate_blob_transactions/task.go index 2592c83..4c5d6eb 100644 --- a/pkg/coordinator/tasks/generate_blob_transactions/task.go +++ b/pkg/coordinator/tasks/generate_blob_transactions/task.go @@ -335,22 +335,25 @@ func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, c } err = nil + if len(clients) == 0 { + err = fmt.Errorf("no ready clients available") + } else { + for i := 0; i < len(clients); i++ { + client := clients[(transactionIdx+uint64(i))%uint64(len(clients))] - for i := 0; i < len(clients); i++ { - client := clients[(transactionIdx+uint64(i))%uint64(len(clients))] + t.logger.WithFields(logrus.Fields{ + "client": client.GetName(), + }).Infof("sending tx %v: %v", transactionIdx, tx.Hash().Hex()) - t.logger.WithFields(logrus.Fields{ - "client": client.GetName(), - }).Infof("sending tx %v: %v", transactionIdx, tx.Hash().Hex()) + err = client.GetRPCClient().SendTransaction(ctx, tx) + if err == nil { + break + } - err = client.GetRPCClient().SendTransaction(ctx, tx) - if err == nil { - break + t.logger.WithFields(logrus.Fields{ + "client": client.GetName(), + }).Warnf("RPC error when sending tx %v: %v", transactionIdx, err) } - - t.logger.WithFields(logrus.Fields{ - "client": client.GetName(), - }).Warnf("RPC error when sending tx %v: %v", transactionIdx, err) } if err != nil { diff --git a/pkg/coordinator/tasks/generate_bls_changes/task.go b/pkg/coordinator/tasks/generate_bls_changes/task.go index 34b2edb..41760c7 100644 --- a/pkg/coordinator/tasks/generate_bls_changes/task.go +++ b/pkg/coordinator/tasks/generate_bls_changes/task.go @@ -238,7 +238,10 @@ func (t *Task) generateBlsChange(ctx context.Context, accountIdx uint64) error { var client *consensus.Client if t.config.ClientPattern == "" && t.config.ExcludeClientPattern == "" { - client = clientPool.GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient) + client = clientPool.GetConsensusPool().AwaitReadyEndpoint(ctx, consensus.AnyClient) + if client == nil { + return ctx.Err() + } } else { clients := clientPool.GetClientsByNamePatterns(t.config.ClientPattern, t.config.ExcludeClientPattern) if len(clients) == 0 { diff --git a/pkg/coordinator/tasks/generate_deposits/task.go b/pkg/coordinator/tasks/generate_deposits/task.go index de8caf4..d90594f 100644 --- a/pkg/coordinator/tasks/generate_deposits/task.go +++ b/pkg/coordinator/tasks/generate_deposits/task.go @@ -345,7 +345,10 @@ func (t *Task) generateDeposit(ctx context.Context, accountIdx uint64, onConfirm var client *execution.Client if t.config.ClientPattern == "" && t.config.ExcludeClientPattern == "" { - client = clientPool.GetExecutionPool().GetReadyEndpoint(execution.UnspecifiedClient) + client = clientPool.GetExecutionPool().AwaitReadyEndpoint(ctx, execution.AnyClient) + if client == nil { + return nil, nil, ctx.Err() + } } else { clients := clientPool.GetClientsByNamePatterns(t.config.ClientPattern, t.config.ExcludeClientPattern) if len(clients) == 0 { diff --git a/pkg/coordinator/tasks/generate_eoa_transactions/task.go b/pkg/coordinator/tasks/generate_eoa_transactions/task.go index ac81381..3b71119 100644 --- a/pkg/coordinator/tasks/generate_eoa_transactions/task.go +++ b/pkg/coordinator/tasks/generate_eoa_transactions/task.go @@ -371,22 +371,25 @@ func (t *Task) generateTransaction(ctx context.Context, transactionIdx uint64, c } err = nil + if len(clients) == 0 { + err = fmt.Errorf("no ready clients available") + } else { + for i := 0; i < len(clients); i++ { + client := clients[(transactionIdx+uint64(i))%uint64(len(clients))] - for i := 0; i < len(clients); i++ { - client := clients[(transactionIdx+uint64(i))%uint64(len(clients))] + t.logger.WithFields(logrus.Fields{ + "client": client.GetName(), + }).Infof("sending tx %v: %v", transactionIdx, tx.Hash().Hex()) - t.logger.WithFields(logrus.Fields{ - "client": client.GetName(), - }).Infof("sending tx %v: %v", transactionIdx, tx.Hash().Hex()) + err = client.GetRPCClient().SendTransaction(ctx, tx) + if err == nil { + break + } - err = client.GetRPCClient().SendTransaction(ctx, tx) - if err == nil { - break + t.logger.WithFields(logrus.Fields{ + "client": client.GetName(), + }).Warnf("RPC error when sending tx %v: %v", transactionIdx, err) } - - t.logger.WithFields(logrus.Fields{ - "client": client.GetName(), - }).Warnf("RPC error when sending tx %v: %v", transactionIdx, err) } if err != nil { diff --git a/pkg/coordinator/tasks/generate_exits/task.go b/pkg/coordinator/tasks/generate_exits/task.go index 1fc916c..156acd9 100644 --- a/pkg/coordinator/tasks/generate_exits/task.go +++ b/pkg/coordinator/tasks/generate_exits/task.go @@ -169,7 +169,10 @@ func (t *Task) Execute(ctx context.Context) error { } func (t *Task) loadChainState(ctx context.Context) (*phase0.Fork, error) { - client := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient) + client := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().AwaitReadyEndpoint(ctx, consensus.AnyClient) + if client == nil { + return nil, ctx.Err() + } fork, err := client.GetRPCClient().GetForkState(ctx, "head") if err != nil { @@ -212,7 +215,7 @@ func (t *Task) generateVoluntaryExit(ctx context.Context, accountIdx uint64, for clientPool := t.ctx.Scheduler.GetServices().ClientPool() if t.config.ClientPattern == "" && t.config.ExcludeClientPattern == "" { - client = clientPool.GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient) + client = clientPool.GetConsensusPool().GetReadyEndpoint(consensus.AnyClient) } else { clients := clientPool.GetClientsByNamePatterns(t.config.ClientPattern, t.config.ExcludeClientPattern) if len(clients) == 0 { diff --git a/pkg/coordinator/tasks/generate_slashings/task.go b/pkg/coordinator/tasks/generate_slashings/task.go index e7e9dc4..b0d1bcf 100644 --- a/pkg/coordinator/tasks/generate_slashings/task.go +++ b/pkg/coordinator/tasks/generate_slashings/task.go @@ -167,7 +167,10 @@ func (t *Task) Execute(ctx context.Context) error { } func (t *Task) loadChainState(ctx context.Context) (*phase0.Fork, error) { - client := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient) + client := t.ctx.Scheduler.GetServices().ClientPool().GetConsensusPool().AwaitReadyEndpoint(ctx, consensus.AnyClient) + if client == nil { + return nil, ctx.Err() + } forkState, err := client.GetRPCClient().GetForkState(ctx, "head") if err != nil { @@ -228,7 +231,7 @@ func (t *Task) generateSlashing(ctx context.Context, accountIdx uint64, forkStat var client *consensus.Client if t.config.ClientPattern == "" && t.config.ExcludeClientPattern == "" { - client = clientPool.GetConsensusPool().GetReadyEndpoint(consensus.UnspecifiedClient) + client = clientPool.GetConsensusPool().GetReadyEndpoint(consensus.AnyClient) } else { clients := clientPool.GetClientsByNamePatterns(t.config.ClientPattern, t.config.ExcludeClientPattern) if len(clients) == 0 { diff --git a/pkg/coordinator/tasks/generate_transaction/task.go b/pkg/coordinator/tasks/generate_transaction/task.go index b110831..e83f318 100644 --- a/pkg/coordinator/tasks/generate_transaction/task.go +++ b/pkg/coordinator/tasks/generate_transaction/task.go @@ -158,17 +158,20 @@ func (t *Task) Execute(ctx context.Context) error { } err = nil + if len(clients) == 0 { + err = fmt.Errorf("no ready clients available") + } else { + for i := 0; i < len(clients); i++ { + client := clients[i%len(clients)] - for i := 0; i < len(clients); i++ { - client := clients[i%len(clients)] - - t.logger.WithFields(logrus.Fields{ - "client": client.GetName(), - }).Infof("sending tx: %v", tx.Hash().Hex()) + t.logger.WithFields(logrus.Fields{ + "client": client.GetName(), + }).Infof("sending tx: %v", tx.Hash().Hex()) - err = client.GetRPCClient().SendTransaction(ctx, tx) - if err == nil { - break + err = client.GetRPCClient().SendTransaction(ctx, tx) + if err == nil { + break + } } } diff --git a/pkg/coordinator/wallet/wallet.go b/pkg/coordinator/wallet/wallet.go index 7ba6b84..bce4c78 100644 --- a/pkg/coordinator/wallet/wallet.go +++ b/pkg/coordinator/wallet/wallet.go @@ -74,7 +74,7 @@ func (wallet *Wallet) loadState() { go func() { for { - client := wallet.manager.clientPool.GetReadyEndpoint(execution.UnspecifiedClient) + client := wallet.manager.clientPool.GetReadyEndpoint(execution.AnyClient) if client == nil { time.Sleep(500 * time.Millisecond) continue @@ -287,7 +287,10 @@ func (wallet *Wallet) AwaitTransaction(ctx context.Context, tx *types.Transactio } } - client := wallet.manager.clientPool.GetCanonicalFork(0).ReadyClients[0] + client := wallet.manager.clientPool.AwaitReadyEndpoint(ctx, execution.AnyClient) + if client == nil { + return nil, ctx.Err() + } return client.GetRPCClient().GetTransactionReceipt(ctx, txHash) }