From 9dc5dfc30d957d96b9a1e506e4ce75801b6c95c3 Mon Sep 17 00:00:00 2001 From: Andrew Gouin Date: Tue, 14 Nov 2023 08:47:02 -0700 Subject: [PATCH] next cosigner retry --- signer/cosigner_health.go | 16 +++++--- signer/cosigner_health_test.go | 3 +- signer/threshold_validator.go | 75 ++++++++++++++++++++++------------ 3 files changed, 60 insertions(+), 34 deletions(-) diff --git a/signer/cosigner_health.go b/signer/cosigner_health.go index cf916880..923fb1e3 100644 --- a/signer/cosigner_health.go +++ b/signer/cosigner_health.go @@ -48,8 +48,14 @@ func (ch *CosignerHealth) Start(ctx context.Context) { } } +func (ch *CosignerHealth) MarkUnhealthy(cosigner Cosigner) { + ch.mu.Lock() + defer ch.mu.Unlock() + ch.rtt[cosigner.GetID()] = -1 +} + func (ch *CosignerHealth) updateRTT(ctx context.Context, cosigner Cosigner) { - var rtt int64 + rtt := int64(-1) defer func() { ch.mu.Lock() defer ch.mu.Unlock() @@ -62,14 +68,12 @@ func (ch *CosignerHealth) updateRTT(ctx context.Context, cosigner Cosigner) { } client := proto.NewCosignerClient(conn) _, err = client.Ping(ctx, &proto.PingRequest{}) - if err != nil { - rtt = -1 - } else { + if err == nil { rtt = time.Since(start).Nanoseconds() } } -func (ch *CosignerHealth) GetFastest(n int) []Cosigner { +func (ch *CosignerHealth) GetFastest() []Cosigner { ch.mu.RLock() defer ch.mu.RUnlock() @@ -88,5 +92,5 @@ func (ch *CosignerHealth) GetFastest(n int) []Cosigner { return rtt1 < rtt2 }) - return fastest[:n] + return fastest } diff --git a/signer/cosigner_health_test.go b/signer/cosigner_health_test.go index d1c1990a..68fd35b3 100644 --- a/signer/cosigner_health_test.go +++ b/signer/cosigner_health_test.go @@ -21,9 +21,8 @@ func TestCosignerHealth(t *testing.T) { 5: 300, } - fastest := ch.GetFastest(2) + fastest := ch.GetFastest() - require.Len(t, fastest, 2) require.Equal(t, 4, fastest[0].GetID()) require.Equal(t, 2, fastest[1].GetID()) } diff --git a/signer/threshold_validator.go b/signer/threshold_validator.go index e486b5c6..6b7c75a1 100644 --- a/signer/threshold_validator.go +++ b/signer/threshold_validator.go @@ -513,7 +513,8 @@ func (pv *ThresholdValidator) Sign(ctx context.Context, chainID string, block Bl peerStartTime := time.Now() - cosignersForThisBlock := pv.cosignerHealth.GetFastest(pv.threshold - 1) + cosignersOrderedByFastest := pv.cosignerHealth.GetFastest() + cosignersForThisBlock := cosignersOrderedByFastest[:pv.threshold-1] cosignersForThisBlock = append(cosignersForThisBlock, pv.myCosigner) nonceCtx, nonceCancel := context.WithTimeout(ctx, pv.grpcTimeout) defer nonceCancel() @@ -529,6 +530,19 @@ func (pv *ThresholdValidator) Sign(ctx context.Context, chainID string, block Bl return nil, stamp, fmt.Errorf("failed to get nonces: %w", err) } + nextFastestCosignerIndex := pv.threshold + var nextFastestCosignerIndexMu sync.Mutex + getNextFastestCosigner := func() Cosigner { + nextFastestCosignerIndexMu.Lock() + defer nextFastestCosignerIndexMu.Unlock() + if nextFastestCosignerIndex >= numPeers { + return nil + } + cosigner := cosignersOrderedByFastest[nextFastestCosignerIndex] + nextFastestCosignerIndex++ + return cosigner + } + timedSignBlockThresholdLag.Observe(time.Since(timeStartSignBlock).Seconds()) for _, peer := range pv.peerCosigners { @@ -549,32 +563,38 @@ func (pv *ThresholdValidator) Sign(ctx context.Context, chainID string, block Bl for _, cosigner := range cosignersForThisBlock { cosigner := cosigner eg.Go(func() error { - ctx, cancel := context.WithTimeout(ctx, pv.grpcTimeout) - defer cancel() - - peerStartTime := time.Now() - - // set peerNonces and sign in single rpc call. - sigRes, err := cosigner.SetNoncesAndSign(ctx, CosignerSetNoncesAndSignRequest{ - ChainID: chainID, - Nonces: nonces.For(cosigner.GetID()), - HRST: hrst, - SignBytes: signBytes, - }) - if err != nil { - pv.logger.Error( - "Cosigner failed to set nonces and sign", - "id", cosigner.GetID(), - "err", err.Error(), - ) - return err - } + for cosigner != nil { + ctx, cancel := context.WithTimeout(ctx, pv.grpcTimeout) + defer cancel() + + peerStartTime := time.Now() + + // set peerNonces and sign in single rpc call. + sigRes, err := cosigner.SetNoncesAndSign(ctx, CosignerSetNoncesAndSignRequest{ + ChainID: chainID, + Nonces: nonces.For(cosigner.GetID()), + HRST: hrst, + SignBytes: signBytes, + }) + if err != nil { + pv.logger.Error( + "Cosigner failed to set nonces and sign", + "id", cosigner.GetID(), + "err", err.Error(), + ) + pv.cosignerHealth.MarkUnhealthy(cosigner) + cosigner = getNextFastestCosigner() + continue + } + + if cosigner != pv.myCosigner { + timedCosignerSignLag.WithLabelValues(cosigner.GetAddress()).Observe(time.Since(peerStartTime).Seconds()) + } + shareSignatures[cosigner.GetID()-1] = sigRes.Signature - if cosigner != pv.myCosigner { - timedCosignerSignLag.WithLabelValues(cosigner.GetAddress()).Observe(time.Since(peerStartTime).Seconds()) + return nil } - shareSignatures[cosigner.GetID()-1] = sigRes.Signature - return nil + return fmt.Errorf("no cosigners available to sign") }) } @@ -599,11 +619,14 @@ func (pv *ThresholdValidator) Sign(ctx context.Context, chainID string, block Bl continue } + sig := make([]byte, len(shareSig)) + copy(sig, shareSig) + // we are ok to use the share signatures - complete boolean // prevents future concurrent access shareSigs = append(shareSigs, PartialSignature{ ID: idx + 1, - Signature: shareSig, + Signature: sig, }) }