Skip to content

Commit

Permalink
next cosigner retry
Browse files Browse the repository at this point in the history
  • Loading branch information
agouin committed Nov 14, 2023
1 parent 85eab9c commit 9dc5dfc
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 34 deletions.
16 changes: 10 additions & 6 deletions signer/cosigner_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,14 @@ func (ch *CosignerHealth) Start(ctx context.Context) {
}
}

// MarkUnhealthy overwrites the recorded round-trip time for the given
// cosigner with -1 while holding the write lock on ch.mu.
//
// -1 is the same value updateRTT starts from and leaves in place when its
// ping does not succeed.
// NOTE(review): presumably GetFastest ranks a -1 entry behind cosigners with
// a measured RTT; its comparator is not visible here, so confirm.
func (ch *CosignerHealth) MarkUnhealthy(cosigner Cosigner) {
	// Sentinel RTT stored for a cosigner that should be treated as unhealthy.
	const unhealthyRTT = -1

	ch.mu.Lock()
	defer ch.mu.Unlock()

	id := cosigner.GetID()
	ch.rtt[id] = unhealthyRTT
}

func (ch *CosignerHealth) updateRTT(ctx context.Context, cosigner Cosigner) {
var rtt int64
rtt := int64(-1)
defer func() {
ch.mu.Lock()
defer ch.mu.Unlock()
Expand All @@ -62,14 +68,12 @@ func (ch *CosignerHealth) updateRTT(ctx context.Context, cosigner Cosigner) {
}
client := proto.NewCosignerClient(conn)
_, err = client.Ping(ctx, &proto.PingRequest{})
if err != nil {
rtt = -1
} else {
if err == nil {
rtt = time.Since(start).Nanoseconds()
}
}

func (ch *CosignerHealth) GetFastest(n int) []Cosigner {
func (ch *CosignerHealth) GetFastest() []Cosigner {
ch.mu.RLock()
defer ch.mu.RUnlock()

Expand All @@ -88,5 +92,5 @@ func (ch *CosignerHealth) GetFastest(n int) []Cosigner {
return rtt1 < rtt2
})

return fastest[:n]
return fastest
}
3 changes: 1 addition & 2 deletions signer/cosigner_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ func TestCosignerHealth(t *testing.T) {
5: 300,
}

fastest := ch.GetFastest(2)
fastest := ch.GetFastest()

require.Len(t, fastest, 2)
require.Equal(t, 4, fastest[0].GetID())
require.Equal(t, 2, fastest[1].GetID())
}
75 changes: 49 additions & 26 deletions signer/threshold_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,8 @@ func (pv *ThresholdValidator) Sign(ctx context.Context, chainID string, block Bl

peerStartTime := time.Now()

cosignersForThisBlock := pv.cosignerHealth.GetFastest(pv.threshold - 1)
cosignersOrderedByFastest := pv.cosignerHealth.GetFastest()
cosignersForThisBlock := cosignersOrderedByFastest[:pv.threshold-1]
cosignersForThisBlock = append(cosignersForThisBlock, pv.myCosigner)
nonceCtx, nonceCancel := context.WithTimeout(ctx, pv.grpcTimeout)
defer nonceCancel()
Expand All @@ -529,6 +530,19 @@ func (pv *ThresholdValidator) Sign(ctx context.Context, chainID string, block Bl
return nil, stamp, fmt.Errorf("failed to get nonces: %w", err)
}

nextFastestCosignerIndex := pv.threshold
var nextFastestCosignerIndexMu sync.Mutex
getNextFastestCosigner := func() Cosigner {
nextFastestCosignerIndexMu.Lock()
defer nextFastestCosignerIndexMu.Unlock()
if nextFastestCosignerIndex >= numPeers {
return nil
}
cosigner := cosignersOrderedByFastest[nextFastestCosignerIndex]
nextFastestCosignerIndex++
return cosigner
}

timedSignBlockThresholdLag.Observe(time.Since(timeStartSignBlock).Seconds())

for _, peer := range pv.peerCosigners {
Expand All @@ -549,32 +563,38 @@ func (pv *ThresholdValidator) Sign(ctx context.Context, chainID string, block Bl
for _, cosigner := range cosignersForThisBlock {
cosigner := cosigner
eg.Go(func() error {
ctx, cancel := context.WithTimeout(ctx, pv.grpcTimeout)
defer cancel()

peerStartTime := time.Now()

// set peerNonces and sign in single rpc call.
sigRes, err := cosigner.SetNoncesAndSign(ctx, CosignerSetNoncesAndSignRequest{
ChainID: chainID,
Nonces: nonces.For(cosigner.GetID()),
HRST: hrst,
SignBytes: signBytes,
})
if err != nil {
pv.logger.Error(
"Cosigner failed to set nonces and sign",
"id", cosigner.GetID(),
"err", err.Error(),
)
return err
}
for cosigner != nil {
ctx, cancel := context.WithTimeout(ctx, pv.grpcTimeout)
defer cancel()

peerStartTime := time.Now()

// set peerNonces and sign in single rpc call.
sigRes, err := cosigner.SetNoncesAndSign(ctx, CosignerSetNoncesAndSignRequest{
ChainID: chainID,
Nonces: nonces.For(cosigner.GetID()),
HRST: hrst,
SignBytes: signBytes,
})
if err != nil {
pv.logger.Error(
"Cosigner failed to set nonces and sign",
"id", cosigner.GetID(),
"err", err.Error(),
)
pv.cosignerHealth.MarkUnhealthy(cosigner)
cosigner = getNextFastestCosigner()
continue
}

if cosigner != pv.myCosigner {
timedCosignerSignLag.WithLabelValues(cosigner.GetAddress()).Observe(time.Since(peerStartTime).Seconds())
}
shareSignatures[cosigner.GetID()-1] = sigRes.Signature

if cosigner != pv.myCosigner {
timedCosignerSignLag.WithLabelValues(cosigner.GetAddress()).Observe(time.Since(peerStartTime).Seconds())
return nil
}
shareSignatures[cosigner.GetID()-1] = sigRes.Signature
return nil
return fmt.Errorf("no cosigners available to sign")
})
}

Expand All @@ -599,11 +619,14 @@ func (pv *ThresholdValidator) Sign(ctx context.Context, chainID string, block Bl
continue
}

sig := make([]byte, len(shareSig))
copy(sig, shareSig)

// we are ok to use the share signatures - complete boolean
// prevents future concurrent access
shareSigs = append(shareSigs, PartialSignature{
ID: idx + 1,
Signature: shareSig,
Signature: sig,
})
}

Expand Down

0 comments on commit 9dc5dfc

Please sign in to comment.