forked from libp2p/go-libp2p-pubsub
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Adds exponential backoff to re-spawing new streams for supposedly dea…
…d peers (libp2p#483) * updates gitignore * implements draft solution * consolidates update and get * extends test * adds cleaner logic * removes a redundant else case * refactors cleanup in a goroutine * adds a jitter to backoff * stretches the sleep for cleanup * reduces jitter time * fixes a test * adds maximum backoff attempts * returns error for closing channel * refactors peer status exceed backoff threshold * converts if-else to switch * nit * consolidates update and maximum backoff check * bug fix * nit * refactors cleanup with a ticker object
- Loading branch information
1 parent
0ea9140
commit 06b5ba4
Showing
5 changed files
with
251 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
cover.out | ||
prof.out | ||
go-floodsub.test | ||
|
||
.idea/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
package pubsub | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"math/rand" | ||
"sync" | ||
"time" | ||
|
||
"github.com/libp2p/go-libp2p-core/peer" | ||
) | ||
|
||
const ( | ||
MinBackoffDelay = 100 * time.Millisecond | ||
MaxBackoffDelay = 10 * time.Second | ||
TimeToLive = 10 * time.Minute | ||
BackoffCleanupInterval = 1 * time.Minute | ||
BackoffMultiplier = 2 | ||
MaxBackoffJitterCoff = 100 | ||
MaxBackoffAttempts = 4 | ||
) | ||
|
||
type backoffHistory struct { | ||
duration time.Duration | ||
lastTried time.Time | ||
attempts int | ||
} | ||
|
||
type backoff struct { | ||
mu sync.Mutex | ||
info map[peer.ID]*backoffHistory | ||
ct int // size threshold that kicks off the cleaner | ||
ci time.Duration // cleanup intervals | ||
maxAttempts int // maximum backoff attempts prior to ejection | ||
} | ||
|
||
func newBackoff(ctx context.Context, sizeThreshold int, cleanupInterval time.Duration, maxAttempts int) *backoff { | ||
b := &backoff{ | ||
mu: sync.Mutex{}, | ||
ct: sizeThreshold, | ||
ci: cleanupInterval, | ||
maxAttempts: maxAttempts, | ||
info: make(map[peer.ID]*backoffHistory), | ||
} | ||
|
||
rand.Seed(time.Now().UnixNano()) // used for jitter | ||
go b.cleanupLoop(ctx) | ||
|
||
return b | ||
} | ||
|
||
func (b *backoff) updateAndGet(id peer.ID) (time.Duration, error) { | ||
b.mu.Lock() | ||
defer b.mu.Unlock() | ||
|
||
h, ok := b.info[id] | ||
switch { | ||
case !ok || time.Since(h.lastTried) > TimeToLive: | ||
// first request goes immediately. | ||
h = &backoffHistory{ | ||
duration: time.Duration(0), | ||
attempts: 0, | ||
} | ||
case h.attempts >= b.maxAttempts: | ||
return 0, fmt.Errorf("peer %s has reached its maximum backoff attempts", id) | ||
|
||
case h.duration < MinBackoffDelay: | ||
h.duration = MinBackoffDelay | ||
|
||
case h.duration < MaxBackoffDelay: | ||
jitter := rand.Intn(MaxBackoffJitterCoff) | ||
h.duration = (BackoffMultiplier * h.duration) + time.Duration(jitter)*time.Millisecond | ||
if h.duration > MaxBackoffDelay || h.duration < 0 { | ||
h.duration = MaxBackoffDelay | ||
} | ||
} | ||
|
||
h.attempts += 1 | ||
h.lastTried = time.Now() | ||
b.info[id] = h | ||
return h.duration, nil | ||
} | ||
|
||
func (b *backoff) cleanup() { | ||
b.mu.Lock() | ||
defer b.mu.Unlock() | ||
|
||
for id, h := range b.info { | ||
if time.Since(h.lastTried) > TimeToLive { | ||
delete(b.info, id) | ||
} | ||
} | ||
} | ||
|
||
func (b *backoff) cleanupLoop(ctx context.Context) { | ||
ticker := time.NewTicker(b.ci) | ||
defer ticker.Stop() | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return // pubsub shutting down | ||
case <-ticker.C: | ||
b.cleanup() | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
package pubsub | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"math" | ||
"testing" | ||
"time" | ||
|
||
"github.com/libp2p/go-libp2p-core/peer" | ||
) | ||
|
||
func TestBackoff_Update(t *testing.T) { | ||
id1 := peer.ID("peer-1") | ||
id2 := peer.ID("peer-2") | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
size := 10 | ||
cleanupInterval := 5 * time.Second | ||
maxBackoffAttempts := 10 | ||
|
||
b := newBackoff(ctx, size, cleanupInterval, maxBackoffAttempts) | ||
|
||
if len(b.info) > 0 { | ||
t.Fatal("non-empty info map for backoff") | ||
} | ||
|
||
if d, err := b.updateAndGet(id1); d != time.Duration(0) || err != nil { | ||
t.Fatalf("invalid initialization: %v, \t, %s", d, err) | ||
} | ||
if d, err := b.updateAndGet(id2); d != time.Duration(0) || err != nil { | ||
t.Fatalf("invalid initialization: %v, \t, %s", d, err) | ||
} | ||
|
||
for i := 0; i < maxBackoffAttempts-1; i++ { | ||
got, err := b.updateAndGet(id1) | ||
if err != nil { | ||
t.Fatalf("unexpected error post update: %s", err) | ||
} | ||
|
||
expected := time.Duration(math.Pow(BackoffMultiplier, float64(i)) * | ||
float64(MinBackoffDelay+MaxBackoffJitterCoff*time.Millisecond)) | ||
if expected > MaxBackoffDelay { | ||
expected = MaxBackoffDelay | ||
} | ||
|
||
if expected < got { // considering jitter, expected backoff must always be greater than or equal to actual. | ||
t.Fatalf("invalid backoff result, expected: %v, got: %v", expected, got) | ||
} | ||
} | ||
|
||
// trying once more beyond the threshold, hence expecting exceeding threshold | ||
if _, err := b.updateAndGet(id1); err == nil { | ||
t.Fatalf("expected an error for going beyond threshold but got nil") | ||
} | ||
|
||
got, err := b.updateAndGet(id2) | ||
if err != nil { | ||
t.Fatalf("unexpected error post update: %s", err) | ||
} | ||
if got != MinBackoffDelay { | ||
t.Fatalf("invalid backoff result, expected: %v, got: %v", MinBackoffDelay, got) | ||
} | ||
|
||
// sets last tried of id2 to long ago that it resets back upon next try. | ||
// update attempts on id2 are below threshold, hence peer should never go beyond backoff attempt threshold. | ||
b.info[id2].lastTried = time.Now().Add(-TimeToLive) | ||
got, err = b.updateAndGet(id2) | ||
if err != nil { | ||
t.Fatalf("unexpected error post update: %s", err) | ||
} | ||
if got != time.Duration(0) { | ||
t.Fatalf("invalid ttl expiration, expected: %v, got: %v", time.Duration(0), got) | ||
} | ||
|
||
if len(b.info) != 2 { | ||
t.Fatalf("pre-invalidation attempt, info map size mismatch, expected: %d, got: %d", 2, len(b.info)) | ||
} | ||
|
||
} | ||
|
||
func TestBackoff_Clean(t *testing.T) { | ||
ctx, cancel := context.WithCancel(context.Background()) | ||
defer cancel() | ||
|
||
size := 10 | ||
cleanupInterval := 2 * time.Second | ||
maxBackoffAttempts := 100 // setting attempts to a high number hence testing cleanup logic. | ||
b := newBackoff(ctx, size, cleanupInterval, maxBackoffAttempts) | ||
|
||
for i := 0; i < size; i++ { | ||
id := peer.ID(fmt.Sprintf("peer-%d", i)) | ||
_, err := b.updateAndGet(id) | ||
if err != nil { | ||
t.Fatalf("unexpected error post update: %s", err) | ||
} | ||
b.info[id].lastTried = time.Now().Add(-TimeToLive) // enforces expiry | ||
} | ||
|
||
if len(b.info) != size { | ||
t.Fatalf("info map size mismatch, expected: %d, got: %d", size, len(b.info)) | ||
} | ||
|
||
// waits for a cleanup loop to kick-in | ||
time.Sleep(2 * cleanupInterval) | ||
|
||
// next update should trigger cleanup | ||
got, err := b.updateAndGet(peer.ID("some-new-peer")) | ||
if err != nil { | ||
t.Fatalf("unexpected error post update: %s", err) | ||
} | ||
if got != time.Duration(0) { | ||
t.Fatalf("invalid backoff result, expected: %v, got: %v", time.Duration(0), got) | ||
} | ||
|
||
// except "some-new-peer" every other records must be cleaned up | ||
if len(b.info) != 1 { | ||
t.Fatalf("info map size mismatch, expected: %d, got: %d", 1, len(b.info)) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters