Skip to content

Commit

Permalink
Merge branch 'libp2p-master'
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Trottier committed Jan 16, 2020
2 parents 00f5eed + eb4cba0 commit b7f9333
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 104 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
language: go

go:
- '1.12'
- 1.13.x

env:
global:
Expand Down
101 changes: 76 additions & 25 deletions connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ type BasicConnMgr struct {
plk sync.RWMutex
protected map[peer.ID]map[string]struct{}

// channel-based semaphore that enforces only a single trim is in progress
trimRunningCh chan struct{}
lastTrim time.Time
trimTrigger chan chan<- struct{}

lastTrimMu sync.RWMutex
lastTrim time.Time

silencePeriod time.Duration

logger *zap.Logger
Expand Down Expand Up @@ -95,7 +97,7 @@ func NewConnManager(ctx context.Context, wg *sync.WaitGroup, logger *zap.Logger,
highWater: hi,
lowWater: low,
gracePeriod: grace,
trimRunningCh: make(chan struct{}, 1),
trimTrigger: make(chan chan<- struct{}),
protected: make(map[peer.ID]map[string]struct{}, 16),
silencePeriod: SilencePeriod,
ctx: ctx,
Expand Down Expand Up @@ -167,49 +169,94 @@ type peerInfo struct {
// pruning those peers with the lowest scores first, as long as they are not within their
// grace period.
//
// TODO: error return value so we can cleanly signal we are aborting because:
// (a) there's another trim in progress, or (b) the silence period is in effect.
// This function blocks until a trim is completed. If a trim is underway, a new
// one won't be started, and instead it'll wait until that one is completed before
// returning.
func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) {
// TODO: error return value so we can cleanly signal we are aborting because:
// (a) there's another trim in progress, or (b) the silence period is in effect.

// Trigger a trim.
ch := make(chan struct{})
select {
case cm.trimRunningCh <- struct{}{}:
default:
return
}
defer func() { <-cm.trimRunningCh }()
if time.Since(cm.lastTrim) < cm.silencePeriod {
// skip this attempt to trim as the last one just took place.
return
case cm.trimTrigger <- ch:
case <-cm.ctx.Done():
case <-ctx.Done():
// TODO: return an error?
}

for _, c := range cm.getConnsToClose(ctx) {
cm.logger.Debug("closing connection", zap.String("peer.id", c.RemotePeer().String()))
c.Close()
cm.logger.Debug("connection closed", zap.String("peer.id", c.RemotePeer().String()))
// Wait for the trim.
select {
case <-ch:
case <-cm.ctx.Done():
case <-ctx.Done():
// TODO: return an error?
}

cm.lastTrim = time.Now()
}

func (cm *BasicConnMgr) background(wg *sync.WaitGroup) {
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
defer wg.Done()
for {
var waiting chan<- struct{}
select {
case <-ticker.C:
if atomic.LoadInt32(&cm.connCount) > int32(cm.highWater) {
cm.TrimOpenConns(cm.ctx)
if atomic.LoadInt32(&cm.connCount) < int32(cm.highWater) {
// Below high water, skip.
continue
}

case waiting = <-cm.trimTrigger:
case <-cm.ctx.Done():
return
}
cm.trim()

// Notify anyone waiting on this trim.
if waiting != nil {
close(waiting)
}

for {
select {
case waiting = <-cm.trimTrigger:
if waiting != nil {
close(waiting)
}
continue
default:
}
break
}
}
}

func (cm *BasicConnMgr) trim() {
cm.lastTrimMu.RLock()
// read the last trim time under the lock
lastTrim := cm.lastTrim
cm.lastTrimMu.RUnlock()

// skip this attempt to trim if the last one just took place.
if time.Since(lastTrim) < cm.silencePeriod {
return
}

// do the actual trim.
for _, c := range cm.getConnsToClose() {
cm.logger.Info("closing connection", zap.String("remote.peer", c.RemotePeer().String()))
c.Close()
}

// finally, update the last trim time.
cm.lastTrimMu.Lock()
cm.lastTrim = time.Now()
cm.lastTrimMu.Unlock()
}

// getConnsToClose runs the heuristics described in TrimOpenConns and returns the
// connections to close.
func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []network.Conn {
func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
if cm.lowWater == 0 || cm.highWater == 0 {
// disabled
return nil
Expand Down Expand Up @@ -388,10 +435,14 @@ type CMInfo struct {

// GetInfo returns the configuration and status data for this connection manager.
func (cm *BasicConnMgr) GetInfo() CMInfo {
cm.lastTrimMu.RLock()
lastTrim := cm.lastTrim
cm.lastTrimMu.RUnlock()

return CMInfo{
HighWater: cm.highWater,
LowWater: cm.lowWater,
LastTrim: cm.lastTrim,
LastTrim: lastTrim,
GracePeriod: cm.gracePeriod,
ConnCount: int(atomic.LoadInt32(&cm.connCount)),
}
Expand Down
84 changes: 80 additions & 4 deletions connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,82 @@ func randConn(t testing.TB, discNotify func(network.Network, network.Conn)) netw
return &tconn{peer: pid, disconnectNotify: discNotify}
}

// Make sure multiple trim calls block.
func TestTrimBlocks(t *testing.T) {
cm := NewConnManager(context.Background(), &sync.WaitGroup{}, zaptest.NewLogger(t), 200, 300, 0)

cm.lastTrimMu.RLock()

doneCh := make(chan struct{}, 2)
go func() {
cm.TrimOpenConns(context.Background())
doneCh <- struct{}{}
}()
go func() {
cm.TrimOpenConns(context.Background())
doneCh <- struct{}{}
}()
time.Sleep(time.Millisecond)
select {
case <-doneCh:
cm.lastTrimMu.RUnlock()
t.Fatal("expected trim to block")
default:
cm.lastTrimMu.RUnlock()
}
<-doneCh
<-doneCh
}

// Make sure we return from trim when the context is canceled.
func TestTrimCancels(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cm := NewConnManager(context.Background(), &sync.WaitGroup{}, zaptest.NewLogger(t), 200, 300, 0)

cm.lastTrimMu.RLock()
defer cm.lastTrimMu.RUnlock()

doneCh := make(chan struct{})
go func() {
defer close(doneCh)
cm.TrimOpenConns(ctx)
}()
time.Sleep(time.Millisecond)
cancel()
<-doneCh
}

// Make sure trim returns when closed.
func TestTrimClosed(t *testing.T) {
cm := NewConnManager(context.Background(), &sync.WaitGroup{}, zaptest.NewLogger(t), 200, 300, 0)
cm.Close()
cm.TrimOpenConns(context.Background())
}

// Make sure joining an existing trim works.
func TestTrimJoin(t *testing.T) {
cm := NewConnManager(context.Background(), &sync.WaitGroup{}, zaptest.NewLogger(t), 200, 300, 0)
cm.lastTrimMu.RLock()
var wg sync.WaitGroup
wg.Add(3)
go func() {
defer wg.Done()
cm.TrimOpenConns(context.Background())
}()
time.Sleep(time.Millisecond)
go func() {
defer wg.Done()
cm.TrimOpenConns(context.Background())
}()
go func() {
defer wg.Done()
cm.TrimOpenConns(context.Background())
}()
time.Sleep(time.Millisecond)
cm.lastTrimMu.RUnlock()
wg.Wait()
}

func TestConnTrimming(t *testing.T) {
wg := &sync.WaitGroup{}
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -94,19 +170,19 @@ func TestConnsToClose(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cm := NewConnManager(ctx, wg, zaptest.NewLogger(t), 0, 10, 0)
conns := cm.getConnsToClose(context.Background())
conns := cm.getConnsToClose()
if conns != nil {
t.Fatal("expected no connections")
}

cm = NewConnManager(ctx, wg, zaptest.NewLogger(t), 10, 0, 0)
conns = cm.getConnsToClose(context.Background())
conns = cm.getConnsToClose()
if conns != nil {
t.Fatal("expected no connections")
}

cm = NewConnManager(ctx, wg, zaptest.NewLogger(t), 1, 1, 0)
conns = cm.getConnsToClose(context.Background())
conns = cm.getConnsToClose()
if conns != nil {
t.Fatal("expected no connections")
}
Expand All @@ -117,7 +193,7 @@ func TestConnsToClose(t *testing.T) {
conn := randConn(t, nil)
not.Connected(nil, conn)
}
conns = cm.getConnsToClose(context.Background())
conns = cm.getConnsToClose()
if len(conns) != 0 {
t.Fatal("expected no connections")
}
Expand Down
11 changes: 4 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
module github.com/RTradeLtd/go-libp2p-connmgr

go 1.12
go 1.13

require (
github.com/ipfs/go-detect-race v0.0.1
github.com/libp2p/go-libp2p-core v0.2.5
github.com/multiformats/go-multiaddr v0.1.2
github.com/pkg/errors v0.8.1 // indirect
github.com/stretchr/testify v1.3.0 // indirect
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/multierr v1.1.0 // indirect
github.com/libp2p/go-libp2p-core v0.3.0
github.com/multiformats/go-multiaddr v0.2.0
go.uber.org/zap v1.13.0
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 // indirect
)
Loading

0 comments on commit b7f9333

Please sign in to comment.