Skip to content

Commit

Permalink
Fix stuck leader when cosigner goes down (#232)
Browse files Browse the repository at this point in the history
* Fix stuck leader when cosigner goes down

* Use ticker with reset instead of timer

* Fix local cosigner getnonce

* lint
  • Loading branch information
agouin authored Dec 11, 2023
1 parent 4fe383e commit 26c340c
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 150 deletions.
53 changes: 18 additions & 35 deletions signer/cosigner_nonce_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

cometlog "github.com/cometbft/cometbft/libs/log"
Expand All @@ -22,7 +23,7 @@ type CosignerNonceCache struct {

leader Leader

lastReconcileNonces lastCount
lastReconcileNonces atomic.Uint64
lastReconcileTime time.Time

getNoncesInterval time.Duration
Expand Down Expand Up @@ -86,29 +87,6 @@ func (m *movingAverage) average() float64 {
return weightedSum / duration
}

// lastCount is an integer counter whose accesses are serialized by an
// embedded read/write mutex. The zero value is ready to use and starts at 0.
type lastCount struct {
	count int
	mu    sync.RWMutex
}

// Set overwrites the counter with n while holding the write lock.
func (c *lastCount) Set(n int) {
	c.mu.Lock()
	c.count = n
	c.mu.Unlock()
}

// Inc adds one to the counter while holding the write lock.
func (c *lastCount) Inc() {
	c.mu.Lock()
	c.count += 1
	c.mu.Unlock()
}

// Get returns the current counter value; only the read lock is taken, so
// multiple readers do not block each other.
func (c *lastCount) Get() int {
	c.mu.RLock()
	current := c.count
	c.mu.RUnlock()
	return current
}

// NonceCachePruner abstracts nonce pruning behind a single method so the
// pruning strategy can be supplied by the caller.
// NOTE(review): PruneNonces presumably returns the number of nonces that were
// removed — confirm against the implementations, which are not shown here.
type NonceCachePruner interface {
PruneNonces() int
}
Expand Down Expand Up @@ -174,8 +152,9 @@ func NewCosignerNonceCache(
nonceExpiration: nonceExpiration,
threshold: threshold,
pruner: pruner,
empty: make(chan struct{}, 1),
movingAverage: newMovingAverage(4 * getNoncesInterval), // weighted average over 4 intervals
// buffer up to 1000 empty events so that we don't ever block
empty: make(chan struct{}, 1000),
movingAverage: newMovingAverage(4 * getNoncesInterval), // weighted average over 4 intervals
}
// the only time pruner is expected to be non-nil is during tests, otherwise we use the cache logic.
if pruner == nil {
Expand Down Expand Up @@ -211,9 +190,9 @@ func (cnc *CosignerNonceCache) reconcile(ctx context.Context) {
remainingNonces := cnc.cache.Size()
timeSinceLastReconcile := time.Since(cnc.lastReconcileTime)

lastReconcileNonces := cnc.lastReconcileNonces.Get()
lastReconcileNonces := cnc.lastReconcileNonces.Load()
// calculate nonces per minute
noncesPerMin := float64(lastReconcileNonces-remainingNonces-pruned) / timeSinceLastReconcile.Minutes()
noncesPerMin := float64(int(lastReconcileNonces)-remainingNonces-pruned) / timeSinceLastReconcile.Minutes()
if noncesPerMin < 0 {
noncesPerMin = 0
}
Expand All @@ -230,7 +209,7 @@ func (cnc *CosignerNonceCache) reconcile(ctx context.Context) {
additional := t - remainingNonces

defer func() {
cnc.lastReconcileNonces.Set(remainingNonces + additional)
cnc.lastReconcileNonces.Store(uint64(remainingNonces + additional))
cnc.lastReconcileTime = time.Now()
}()

Expand Down Expand Up @@ -325,19 +304,23 @@ func (cnc *CosignerNonceCache) LoadN(ctx context.Context, n int) {
}

func (cnc *CosignerNonceCache) Start(ctx context.Context) {
cnc.lastReconcileNonces.Set(cnc.cache.Size())
cnc.lastReconcileNonces.Store(uint64(cnc.cache.Size()))
cnc.lastReconcileTime = time.Now()

ticker := time.NewTicker(cnc.getNoncesInterval)
ticker := time.NewTimer(cnc.getNoncesInterval)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
cnc.reconcile(ctx)
case <-cnc.empty:
cnc.reconcile(ctx)
// clear out channel
for len(cnc.empty) > 0 {
<-cnc.empty
}
}
cnc.reconcile(ctx)
ticker.Reset(cnc.getNoncesInterval)
}
}

Expand Down Expand Up @@ -365,7 +348,7 @@ CheckNoncesLoop:
// remove this set of nonces from the cache
cnc.cache.Delete(i)

if len(cnc.cache.cache) == 0 {
if len(cnc.cache.cache) == 0 && len(cnc.empty) == 0 {
cnc.logger.Debug("Nonce cache is empty, triggering reload")
cnc.empty <- struct{}{}
}
Expand All @@ -378,7 +361,7 @@ CheckNoncesLoop:
}

// increment so it's taken into account in the nonce burn rate in the next reconciliation
cnc.lastReconcileNonces.Inc()
cnc.lastReconcileNonces.Add(1)

// no nonces found
cosignerInts := make([]int, len(fastestPeers))
Expand Down
25 changes: 14 additions & 11 deletions signer/local_cosigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,11 @@ func (cosigner *LocalCosigner) GetNonces(
u := u

outerEg.Go(func() error {
meta, err := cosigner.generateNoncesIfNecessary(u)
if err != nil {
return err
}

var eg errgroup.Group

nonces := make([]CosignerNonce, total-1)
Expand All @@ -353,7 +358,7 @@ func (cosigner *LocalCosigner) GetNonces(
i := i

eg.Go(func() error {
secretPart, err := cosigner.getNonce(u, peerID)
secretPart, err := cosigner.getNonce(meta, peerID)

if i >= id {
nonces[i-1] = secretPart
Expand Down Expand Up @@ -387,10 +392,10 @@ func (cosigner *LocalCosigner) GetNonces(

func (cosigner *LocalCosigner) generateNoncesIfNecessary(uuid uuid.UUID) (*NoncesWithExpiration, error) {
// protects the meta map
cosigner.noncesMu.Lock()
defer cosigner.noncesMu.Unlock()

if nonces, ok := cosigner.nonces[uuid]; ok {
cosigner.noncesMu.RLock()
nonces, ok := cosigner.nonces[uuid]
cosigner.noncesMu.RUnlock()
if ok {
return nonces, nil
}

Expand All @@ -404,25 +409,23 @@ func (cosigner *LocalCosigner) generateNoncesIfNecessary(uuid uuid.UUID) (*Nonce
Expiration: time.Now().Add(nonceExpiration),
}

cosigner.noncesMu.Lock()
cosigner.nonces[uuid] = &res
cosigner.noncesMu.Unlock()

return &res, nil
}

// Get the ephemeral secret part for an ephemeral share
// The ephemeral secret part is encrypted for the receiver
func (cosigner *LocalCosigner) getNonce(
uuid uuid.UUID,
meta *NoncesWithExpiration,
peerID int,
) (CosignerNonce, error) {
zero := CosignerNonce{}

id := cosigner.GetID()

meta, err := cosigner.generateNoncesIfNecessary(uuid)
if err != nil {
return zero, err
}

ourCosignerMeta := meta.Nonces[id-1]
nonce, err := cosigner.security.EncryptAndSign(peerID, ourCosignerMeta.PubKey, ourCosignerMeta.Shares[peerID-1])
if err != nil {
Expand Down
104 changes: 0 additions & 104 deletions signer/threshold_signer_bls.go

This file was deleted.

0 comments on commit 26c340c

Please sign in to comment.