diff --git a/connmgr.go b/connmgr.go index 804730d..b385533 100644 --- a/connmgr.go +++ b/connmgr.go @@ -36,9 +36,11 @@ type BasicConnMgr struct { plk sync.RWMutex protected map[peer.ID]map[string]struct{} - // channel-based semaphore that enforces only a single trim is in progress - trimRunningCh chan struct{} - lastTrim time.Time + trimTrigger chan chan<- struct{} + + lastTrimMu sync.RWMutex + lastTrim time.Time + silencePeriod time.Duration ctx context.Context @@ -96,7 +98,7 @@ func NewConnManager(low, hi int, grace time.Duration) *BasicConnMgr { highWater: hi, lowWater: low, gracePeriod: grace, - trimRunningCh: make(chan struct{}, 1), + trimTrigger: make(chan chan<- struct{}), protected: make(map[peer.ID]map[string]struct{}, 16), silencePeriod: SilencePeriod, ctx: ctx, @@ -164,28 +166,29 @@ type peerInfo struct { // pruning those peers with the lowest scores first, as long as they are not within their // grace period. // -// TODO: error return value so we can cleanly signal we are aborting because: -// (a) there's another trim in progress, or (b) the silence period is in effect. +// This function blocks until a trim is completed. If a trim is underway, a new +// one won't be started, and instead it'll wait until that one is completed before +// returning. func (cm *BasicConnMgr) TrimOpenConns(ctx context.Context) { + // TODO: error return value so we can cleanly signal we are aborting because: + // (a) there's another trim in progress, or (b) the silence period is in effect. + + // Trigger a trim. + ch := make(chan struct{}) select { - case cm.trimRunningCh <- struct{}{}: - default: - return - } - defer func() { <-cm.trimRunningCh }() - if time.Since(cm.lastTrim) < cm.silencePeriod { - // skip this attempt to trim as the last one just took place. - return + case cm.trimTrigger <- ch: + case <-cm.ctx.Done(): + case <-ctx.Done(): + // TODO: return an error? } - defer log.EventBegin(ctx, "connCleanup").Done() - for _, c := range cm.getConnsToClose(ctx) { - log.Info("closing conn: ", c.RemotePeer()) - log.Event(ctx, "closeConn", c.RemotePeer()) - c.Close() + // Wait for the trim. + select { + case <-ch: + case <-cm.ctx.Done(): + case <-ctx.Done(): + // TODO: return an error? } - - cm.lastTrim = time.Now() } func (cm *BasicConnMgr) background() { @@ -193,21 +196,66 @@ func (cm *BasicConnMgr) background() { defer ticker.Stop() for { + var waiting chan<- struct{} select { case <-ticker.C: - if atomic.LoadInt32(&cm.connCount) > int32(cm.highWater) { - cm.TrimOpenConns(cm.ctx) + if atomic.LoadInt32(&cm.connCount) < int32(cm.highWater) { + // Below high water, skip. + continue } - + case waiting = <-cm.trimTrigger: case <-cm.ctx.Done(): return } + cm.trim() + + // Notify anyone waiting on this trim. + if waiting != nil { + close(waiting) + } + + for { + select { + case waiting = <-cm.trimTrigger: + if waiting != nil { + close(waiting) + } + continue + default: + } + break + } + } +} + +func (cm *BasicConnMgr) trim() { + cm.lastTrimMu.RLock() + // read the last trim time under the lock + lastTrim := cm.lastTrim + cm.lastTrimMu.RUnlock() + + // skip this attempt to trim if the last one just took place. + if time.Since(lastTrim) < cm.silencePeriod { + return + } + + // do the actual trim. + defer log.EventBegin(cm.ctx, "connCleanup").Done() + for _, c := range cm.getConnsToClose() { + log.Info("closing conn: ", c.RemotePeer()) + log.Event(cm.ctx, "closeConn", c.RemotePeer()) + c.Close() } + + // finally, update the last trim time. + cm.lastTrimMu.Lock() + cm.lastTrim = time.Now() + cm.lastTrimMu.Unlock() } // getConnsToClose runs the heuristics described in TrimOpenConns and returns the // connections to close. -func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []network.Conn { +func (cm *BasicConnMgr) getConnsToClose() []network.Conn { if cm.lowWater == 0 || cm.highWater == 0 { // disabled return nil @@ -386,10 +434,14 @@ type CMInfo struct { // GetInfo returns the configuration and status data for this connection manager. func (cm *BasicConnMgr) GetInfo() CMInfo { + cm.lastTrimMu.RLock() + lastTrim := cm.lastTrim + cm.lastTrimMu.RUnlock() + return CMInfo{ HighWater: cm.highWater, LowWater: cm.lowWater, - LastTrim: cm.lastTrim, + LastTrim: lastTrim, GracePeriod: cm.gracePeriod, ConnCount: int(atomic.LoadInt32(&cm.connCount)), } diff --git a/connmgr_test.go b/connmgr_test.go index ad78786..8e1b4b1 100644 --- a/connmgr_test.go +++ b/connmgr_test.go @@ -2,6 +2,7 @@ package connmgr import ( "context" + "sync" "testing" "time" @@ -47,6 +48,82 @@ func randConn(t testing.TB, discNotify func(network.Network, network.Conn)) netw return &tconn{peer: pid, disconnectNotify: discNotify} } +// Make sure multiple trim calls block. +func TestTrimBlocks(t *testing.T) { + cm := NewConnManager(200, 300, 0) + + cm.lastTrimMu.RLock() + + doneCh := make(chan struct{}, 2) + go func() { + cm.TrimOpenConns(context.Background()) + doneCh <- struct{}{} + }() + go func() { + cm.TrimOpenConns(context.Background()) + doneCh <- struct{}{} + }() + time.Sleep(time.Millisecond) + select { + case <-doneCh: + cm.lastTrimMu.RUnlock() + t.Fatal("expected trim to block") + default: + cm.lastTrimMu.RUnlock() + } + <-doneCh + <-doneCh +} + +// Make sure we return from trim when the context is canceled. +func TestTrimCancels(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + cm := NewConnManager(200, 300, 0) + + cm.lastTrimMu.RLock() + defer cm.lastTrimMu.RUnlock() + + doneCh := make(chan struct{}) + go func() { + defer close(doneCh) + cm.TrimOpenConns(ctx) + }() + time.Sleep(time.Millisecond) + cancel() + <-doneCh +} + +// Make sure trim returns when closed. +func TestTrimClosed(t *testing.T) { + cm := NewConnManager(200, 300, 0) + cm.Close() + cm.TrimOpenConns(context.Background()) +} + +// Make sure joining an existing trim works. +func TestTrimJoin(t *testing.T) { + cm := NewConnManager(200, 300, 0) + cm.lastTrimMu.RLock() + var wg sync.WaitGroup + wg.Add(3) + go func() { + defer wg.Done() + cm.TrimOpenConns(context.Background()) + }() + time.Sleep(time.Millisecond) + go func() { + defer wg.Done() + cm.TrimOpenConns(context.Background()) + }() + go func() { + defer wg.Done() + cm.TrimOpenConns(context.Background()) + }() + time.Sleep(time.Millisecond) + cm.lastTrimMu.RUnlock() + wg.Wait() +} + func TestConnTrimming(t *testing.T) { cm := NewConnManager(200, 300, 0) not := cm.Notifee() @@ -86,19 +163,19 @@ func TestConnTrimming(t *testing.T) { func TestConnsToClose(t *testing.T) { cm := NewConnManager(0, 10, 0) - conns := cm.getConnsToClose(context.Background()) + conns := cm.getConnsToClose() if conns != nil { t.Fatal("expected no connections") } cm = NewConnManager(10, 0, 0) - conns = cm.getConnsToClose(context.Background()) + conns = cm.getConnsToClose() if conns != nil { t.Fatal("expected no connections") } cm = NewConnManager(1, 1, 0) - conns = cm.getConnsToClose(context.Background()) + conns = cm.getConnsToClose() if conns != nil { t.Fatal("expected no connections") } @@ -109,7 +186,7 @@ func TestConnsToClose(t *testing.T) { conn := randConn(t, nil) not.Connected(nil, conn) } - conns = cm.getConnsToClose(context.Background()) + conns = cm.getConnsToClose() if len(conns) != 0 { t.Fatal("expected no connections") } diff --git a/go.mod b/go.mod index 108169f..4b7073d 100644 --- a/go.mod +++ b/go.mod @@ -6,3 +6,5 @@ require ( github.com/libp2p/go-libp2p-core v0.2.5 github.com/multiformats/go-multiaddr v0.2.0 ) + +go 1.12 diff --git a/go.sum b/go.sum index 7b31bb2..577e452 100644 --- a/go.sum +++ b/go.sum @@ -1,12 +1,9 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= -github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32 h1:qkOC5Gd33k54tobS36cXdAzJbeHaduLtnLQQwNoIi78= -github.com/btcsuite/btcd v0.0.0-20190213025234-306aecffea32/go.mod h1:DrZx5ec/dmnfpw9KyYoQyYo7d0KEvTkk/5M/vbZjAr8= github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3 h1:A/EVblehb75cUgXA5njHPn0kLAsykn6mJGz7rnmW5W0= github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3/go.mod h1:3J08xEfcugPacsc34/LKRU2yO7YmuT8yt28J8k2+rrI= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= -github.com/btcsuite/btcutil v0.0.0-20190207003914-4c204d697803/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= github.com/btcsuite/go-socks v0.0.0-20170105172521-4720035b7bfd/go.mod h1:HHNXQzUsZCxOoE+CPiyCTO6x34Zs86zZUiwtpXoGdtg= github.com/btcsuite/goleveldb v0.0.0-20160330041536-7834afc9e8cd/go.mod h1:F+uVaaLLH7j4eDXPRvw78tMflu7Ie2bzYOH4Y8rRKBY= @@ -18,6 +15,7 @@ github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3Ee github.com/davecgh/go-spew v0.0.0-20171005155431-ecdeabc65495/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= @@ -35,15 +33,12 @@ github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfm github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc= github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= -github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-detect-race v0.0.1 h1:qX/xay2W3E4Q1U7d9lNs1sU9nvguX0a7319XbyQ6cOk= github.com/ipfs/go-detect-race v0.0.1/go.mod h1:8BNT7shDZPo99Q74BpGMK+4D8Mn4j46UU0LZ723meps= github.com/ipfs/go-log v0.0.1 h1:9XTUN/rW64BCG1YhPK9Hoy3q8nr4gOmHHBpgFdfw6Lc= github.com/ipfs/go-log v0.0.1/go.mod h1:kL1d2/hzSpI0thNYjiKfjanbVNU+IIGA/WnNESY9leM= github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= -github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8 h1:bspPhN+oKYFk5fcGNuQzp6IGzYQSenLEgH3s6jkXrWw= -github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY= github.com/jbenet/goprocess v0.1.3 h1:YKyIEECS/XvcfHtBzxtjBBbWK+MbvA6dG8ASiqwvr10= github.com/jbenet/goprocess v0.1.3/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= @@ -58,10 +53,7 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxv github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/libp2p/go-flow-metrics v0.0.1/go.mod h1:Iv1GH0sG8DtYN3SVJ2eG221wMiNpZxBdp967ls1g+k8= github.com/libp2p/go-flow-metrics v0.0.2/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= -github.com/libp2p/go-libp2p-core v0.0.1 h1:HSTZtFIq/W5Ue43Zw+uWZyy2Vl5WtF0zDjKN8/DT/1I= -github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco= github.com/libp2p/go-libp2p-core v0.2.5 h1:iP1PIiIrlRrGbE1fYq2918yBc5NlCH3pFuIPSWU9hds= github.com/libp2p/go-libp2p-core v0.2.5/go.mod h1:6+5zJmKhsf7yHn1RbmYDu08qDUpIUxGdqHuEZckmZOA= github.com/libp2p/go-openssl v0.0.3 h1:wjlG7HvQkt4Fq4cfH33Ivpwp0omaElYEi9z26qaIkIk= @@ -84,13 +76,9 @@ github.com/minio/sha256-simd v0.1.1 h1:5QHSlgo3nt5yKOJrC7W8w7X+NFl8cMPZm96iu8kKU github.com/minio/sha256-simd v0.1.1/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= github.com/mr-tron/base58 v1.1.0 h1:Y51FGVJ91WBqCEabAi5OPUz38eAx8DakuAm5svLcsfQ= github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= -github.com/mr-tron/base58 v1.1.1 h1:OJIdWOWYe2l5PQNgimGtuwHY8nDskvJ5vvs//YnzRLs= -github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= github.com/mr-tron/base58 v1.1.2 h1:ZEw4I2EgPKDJ2iEw0cNmLB3ROrEmkOtXIkaG7wZg+78= github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA= -github.com/multiformats/go-multiaddr v0.0.2 h1:RBysRCv5rv3FWlhKWKoXv8tnsCUpEpIZpCmqAGZos2s= -github.com/multiformats/go-multiaddr v0.0.2/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44= github.com/multiformats/go-multiaddr v0.1.1 h1:rVAztJYMhCQ7vEFr8FvxW3mS+HF2eY/oPbOMeS0ZDnE= github.com/multiformats/go-multiaddr v0.1.1/go.mod h1:aMKBKNEYmzmDmxfX88/vz+J5IU55txyt0p4aiWVohjo= github.com/multiformats/go-multiaddr v0.1.2 h1:HWYHNSyyllbQopmVIF5K7JKJugiah+L9/kuZKHbmNdQ= @@ -114,8 +102,6 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/smola/gocompat v0.2.0 h1:6b1oIMlUXIpz//VKEDzPVBK8KG7beVwmHIUEBIs/Pns= github.com/smola/gocompat v0.2.0/go.mod h1:1B0MlxbmoZNo3h8guHp8HztB3BSYR5itql9qtVc0ypY= -github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a h1:/eS3yfGjQKG+9kayBkj0ip1BGhq6zJ3eaVksphxAaek= -github.com/spacemonkeygo/openssl v0.0.0-20181017203307-c2dcc5cca94a/go.mod h1:7AyxJNCJ7SBZ1MfVQCWD6Uqo2oubI2Eq2y2eqf+A5r0= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 h1:RC6RW7j+1+HkWaX/Yh71Ee5ZHaHYt7ZP4sQgUrm6cDU= github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572/go.mod h1:w0SWMsp6j9O/dk4/ZpIhL+3CkG8ofA2vuv7k+ltqUMc= github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= @@ -133,8 +119,6 @@ go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b h1:+/WWzjwW6gidDJnMKWLKLX1gxn7irUTF1fLpQovfQ5M= -golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190611184440-5c40567a22f8/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq69pTHfNouLtWZG7j9rPN8= @@ -151,6 +135,7 @@ golang.org/x/net v0.0.0-20190227160552-c95aed5357e7 h1:C2F/nMkR/9sfUTpvR3QrjBuTd golang.org/x/net v0.0.0-20190227160552-c95aed5357e7/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859 h1:R/3boaszxrf1GEUWTVDzSKVwLmSJpwZ1yqXm8j0v2QI= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=