diff --git a/benchmarks_test.go b/benchmarks_test.go index ea676771..c989792a 100644 --- a/benchmarks_test.go +++ b/benchmarks_test.go @@ -18,8 +18,8 @@ import ( protocol "github.com/libp2p/go-libp2p-core/protocol" "github.com/ipfs/go-bitswap" - testinstance "github.com/ipfs/go-bitswap/client/testinstance" bsnet "github.com/ipfs/go-bitswap/network" + testinstance "github.com/ipfs/go-bitswap/testinstance" tn "github.com/ipfs/go-bitswap/testnet" cid "github.com/ipfs/go-cid" delay "github.com/ipfs/go-ipfs-delay" diff --git a/polyfill.go b/bitswap.go similarity index 90% rename from polyfill.go rename to bitswap.go index 95dcd5dc..f6fdb4cb 100644 --- a/polyfill.go +++ b/bitswap.go @@ -5,8 +5,8 @@ import ( "fmt" "github.com/ipfs/go-bitswap/client" + "github.com/ipfs/go-bitswap/internal/defaults" "github.com/ipfs/go-bitswap/message" - "github.com/ipfs/go-bitswap/metrics" "github.com/ipfs/go-bitswap/network" "github.com/ipfs/go-bitswap/server" "github.com/ipfs/go-bitswap/tracer" @@ -24,7 +24,7 @@ import ( var log = logging.Logger("bitswap") // old interface we are targeting -type old interface { +type bitswap interface { Close() error GetBlock(ctx context.Context, k cid.Cid) (blocks.Block, error) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) @@ -44,7 +44,8 @@ type old interface { } var _ exchange.SessionExchange = (*Bitswap)(nil) -var _ old = (*Bitswap)(nil) +var _ bitswap = (*Bitswap)(nil) +var HasBlockBufferSize = defaults.HasBlockBufferSize type Bitswap struct { *client.Client @@ -81,9 +82,12 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc serverOptions = append(serverOptions, server.WithTracer(tracer)) } - stats := metrics.New(ctx) - bs.Server = server.New(ctx, net, bstore, stats, serverOptions...) - bs.Client = client.New(ctx, net, bstore, stats, append(clientOptions, client.WithBlockReceivedNotifier(bs.Server))...) + if HasBlockBufferSize != defaults.HasBlockBufferSize { + serverOptions = append(serverOptions, server.HasBlockBufferSize(HasBlockBufferSize)) + } + + bs.Server = server.New(ctx, net, bstore, serverOptions...) + bs.Client = client.New(ctx, net, bstore, append(clientOptions, client.WithBlockReceivedNotifier(bs.Server))...) net.Start(bs) // use the polyfill receiver to log received errors and trace messages only once return bs diff --git a/bitswap_test.go b/bitswap_test.go index 33603726..055a9030 100644 --- a/bitswap_test.go +++ b/bitswap_test.go @@ -10,9 +10,9 @@ import ( "time" "github.com/ipfs/go-bitswap" - testinstance "github.com/ipfs/go-bitswap/client/testinstance" bsmsg "github.com/ipfs/go-bitswap/message" "github.com/ipfs/go-bitswap/server" + testinstance "github.com/ipfs/go-bitswap/testinstance" tn "github.com/ipfs/go-bitswap/testnet" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" diff --git a/client/bitswap_with_sessions_test.go b/client/bitswap_with_sessions_test.go index 8ba2d6e9..5e4d2454 100644 --- a/client/bitswap_with_sessions_test.go +++ b/client/bitswap_with_sessions_test.go @@ -8,7 +8,7 @@ import ( "github.com/ipfs/go-bitswap" "github.com/ipfs/go-bitswap/client/internal/session" - testinstance "github.com/ipfs/go-bitswap/client/testinstance" + testinstance "github.com/ipfs/go-bitswap/testinstance" tn "github.com/ipfs/go-bitswap/testnet" blocks "github.com/ipfs/go-block-format" cid "github.com/ipfs/go-cid" diff --git a/client/client.go b/client/client.go index 1380e0d9..3a208749 100644 --- a/client/client.go +++ b/client/client.go @@ -82,16 +82,14 @@ func WithBlockReceivedNotifier(brn BlockReceivedNotifier) Option { } type BlockReceivedNotifier interface { - // ReceivedBlocks notify the decision engine that a peer is well behaving - // and gave us usefull data, potentially increasing it's score and making us + // ReceivedBlocks notifies the decision engine that a peer is well-behaving + // and gave us useful data, potentially increasing its score and making us // send them more data in exchange. ReceivedBlocks(peer.ID, []blocks.Block) } -// New initializes a BitSwap instance that communicates over the provided -// BitSwapNetwork. This function registers the returned instance as the network -// delegate. Runs until context is cancelled or bitswap.Close is called. -func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, m *bmetrics.Metrics, options ...Option) *Client { +// New initializes a Bitswap client that runs until client.Close is called. +func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Client { // important to use provided parent context (since it may include important // loggable data). It's probably not a good idea to allow bitswap to be // coupled to the concerns of the ipfs daemon in this way. @@ -155,8 +153,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore sim: sim, notif: notif, counters: new(counters), - dupMetric: m.DupHist(), - allMetric: m.AllHist(), + dupMetric: bmetrics.DupHist(), + allMetric: bmetrics.AllHist(), provSearchDelay: defaults.ProvSearchDelay, rebroadcastDelay: delay.Fixed(time.Minute), simulateDontHavesOnTimeout: true, diff --git a/decision/forward.go b/decision/forward.go new file mode 100644 index 00000000..d19cda94 --- /dev/null +++ b/decision/forward.go @@ -0,0 +1,12 @@ +package decision + +import "github.com/ipfs/go-bitswap/server" + +type ( + // DEPRECATED use server.Receipt instead + Receipt = server.Receipt + // DEPRECATED use server.ScoreLedger instead + ScoreLedger = server.ScoreLedger + // DEPRECATED use server.ScorePeerFunc instead + ScorePeerFunc = server.ScorePeerFunc +) diff --git a/forward.go b/forward.go new file mode 100644 index 00000000..2beb7590 --- /dev/null +++ b/forward.go @@ -0,0 +1,17 @@ +package bitswap + +import ( + "github.com/ipfs/go-bitswap/server" + "github.com/ipfs/go-bitswap/tracer" +) + +type ( + // DEPRECATED + PeerBlockRequestFilter = server.PeerBlockRequestFilter + // DEPRECATED + TaskComparator = server.TaskComparator + // DEPRECATED + TaskInfo = server.TaskInfo + // DEPRECATED + Tracer = tracer.Tracer +) diff --git a/internal/defaults/defaults.go b/internal/defaults/defaults.go index 54a9eaa6..6f7c2e74 100644 --- a/internal/defaults/defaults.go +++ b/internal/defaults/defaults.go @@ -19,4 +19,9 @@ const ( BitswapMaxOutstandingBytesPerPeer = 1 << 20 // the number of bytes we attempt to make each outgoing bitswap message BitswapEngineTargetMessageSize = 16 * 1024 + // HasBlockBufferSize is the buffer size of the channel for new blocks + // that need to be provided. They should get pulled over by the + // provideCollector even before they are actually provided. + // TODO: Does this need to be this large givent that? + HasBlockBufferSize = 256 ) diff --git a/metrics/gen.go b/metrics/gen.go deleted file mode 100644 index 000a8cde..00000000 --- a/metrics/gen.go +++ /dev/null @@ -1,132 +0,0 @@ -package metrics - -import ( - "context" - "sync" - - "github.com/ipfs/go-metrics-interface" -) - -var ( - // the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size - metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22} - - timeMetricsBuckets = []float64{1, 10, 30, 60, 90, 120, 600} -) - -// Metrics is a type which lazy initialize metrics objects. -// It MUST not be copied. -type Metrics struct { - ctx context.Context - lock sync.Mutex - - dupHist metrics.Histogram - allHist metrics.Histogram - sentHist metrics.Histogram - sendTimeHist metrics.Histogram - - pendingEngineGauge metrics.Gauge - activeEngineGauge metrics.Gauge - pendingBlocksGauge metrics.Gauge - activeBlocksGauge metrics.Gauge -} - -func New(ctx context.Context) *Metrics { - return &Metrics{ctx: metrics.CtxSubScope(ctx, "bitswap")} -} - -// DupHist return recv_dup_blocks_bytes. -// Threadsafe -func (m *Metrics) DupHist() metrics.Histogram { - m.lock.Lock() - defer m.lock.Unlock() - if m.dupHist != nil { - return m.dupHist - } - m.dupHist = metrics.NewCtx(m.ctx, "recv_dup_blocks_bytes", "Summary of duplicate data blocks recived").Histogram(metricsBuckets) - return m.dupHist -} - -// AllHist returns recv_all_blocks_bytes. -// Threadsafe -func (m *Metrics) AllHist() metrics.Histogram { - m.lock.Lock() - defer m.lock.Unlock() - if m.allHist != nil { - return m.allHist - } - m.allHist = metrics.NewCtx(m.ctx, "recv_all_blocks_bytes", "Summary of all data blocks recived").Histogram(metricsBuckets) - return m.allHist -} - -// SentHist returns sent_all_blocks_bytes. -// Threadsafe -func (m *Metrics) SentHist() metrics.Histogram { - m.lock.Lock() - defer m.lock.Unlock() - if m.sentHist != nil { - return m.sentHist - } - m.sentHist = metrics.NewCtx(m.ctx, "sent_all_blocks_bytes", "Histogram of blocks sent by this bitswap").Histogram(metricsBuckets) - return m.sentHist -} - -// SendTimeHist returns send_times. -// Threadsafe -func (m *Metrics) SendTimeHist() metrics.Histogram { - m.lock.Lock() - defer m.lock.Unlock() - if m.sendTimeHist != nil { - return m.sendTimeHist - } - m.sendTimeHist = metrics.NewCtx(m.ctx, "send_times", "Histogram of how long it takes to send messages in this bitswap").Histogram(timeMetricsBuckets) - return m.sendTimeHist -} - -// PendingEngineGauge returns pending_tasks. -// Threadsafe -func (m *Metrics) PendingEngineGauge() metrics.Gauge { - m.lock.Lock() - defer m.lock.Unlock() - if m.pendingEngineGauge != nil { - return m.pendingEngineGauge - } - m.pendingEngineGauge = metrics.NewCtx(m.ctx, "pending_tasks", "Total number of pending tasks").Gauge() - return m.pendingEngineGauge -} - -// ActiveEngineGauge returns active_tasks. -// Threadsafe -func (m *Metrics) ActiveEngineGauge() metrics.Gauge { - m.lock.Lock() - defer m.lock.Unlock() - if m.activeEngineGauge != nil { - return m.activeEngineGauge - } - m.activeEngineGauge = metrics.NewCtx(m.ctx, "active_tasks", "Total number of active tasks").Gauge() - return m.activeEngineGauge -} - -// PendingBlocksGauge returns pending_block_tasks. -// Threadsafe -func (m *Metrics) PendingBlocksGauge() metrics.Gauge { - m.lock.Lock() - defer m.lock.Unlock() - if m.pendingBlocksGauge != nil { - return m.pendingBlocksGauge - } - m.pendingBlocksGauge = metrics.NewCtx(m.ctx, "pending_block_tasks", "Total number of pending blockstore tasks").Gauge() - return m.pendingBlocksGauge -} - -// ActiveBlocksGauge returns active_block_tasks. -// Threadsafe -func (m *Metrics) ActiveBlocksGauge() metrics.Gauge { - m.lock.Lock() - defer m.lock.Unlock() - if m.activeBlocksGauge != nil { - return m.activeBlocksGauge - } - m.activeBlocksGauge = metrics.NewCtx(m.ctx, "active_block_tasks", "Total number of active blockstore tasks").Gauge() - return m.activeBlocksGauge -} diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 00000000..8d679a51 --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,44 @@ +package metrics + +import ( + "github.com/ipfs/go-metrics-interface" +) + +var ( + // the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size + metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22} + + timeMetricsBuckets = []float64{1, 10, 30, 60, 90, 120, 600} +) + +func DupHist() metrics.Histogram { + return metrics.New("recv_dup_blocks_bytes", "Summary of duplicate data blocks recived").Histogram(metricsBuckets) +} + +func AllHist() metrics.Histogram { + return metrics.New("recv_all_blocks_bytes", "Summary of all data blocks recived").Histogram(metricsBuckets) +} + +func SentHist() metrics.Histogram { + return metrics.New("sent_all_blocks_bytes", "Histogram of blocks sent by this bitswap").Histogram(metricsBuckets) +} + +func SendTimeHist() metrics.Histogram { + return metrics.New("send_times", "Histogram of how long it takes to send messages in this bitswap").Histogram(timeMetricsBuckets) +} + +func PendingEngineGauge() metrics.Gauge { + return metrics.New("pending_tasks", "Total number of pending tasks").Gauge() +} + +func ActiveEngineGauge() metrics.Gauge { + return metrics.New("active_tasks", "Total number of active tasks").Gauge() +} + +func PendingBlocksGauge() metrics.Gauge { + return metrics.New("pending_block_tasks", "Total number of pending blockstore tasks").Gauge() +} + +func ActiveBlocksGauge() metrics.Gauge { + return metrics.New("active_block_tasks", "Total number of active blockstore tasks").Gauge() +} diff --git a/server/forward.go b/server/forward.go index 67f5b2a5..79c39d5d 100644 --- a/server/forward.go +++ b/server/forward.go @@ -8,6 +8,7 @@ type ( Receipt = decision.Receipt PeerBlockRequestFilter = decision.PeerBlockRequestFilter TaskComparator = decision.TaskComparator + TaskInfo = decision.TaskInfo ScoreLedger = decision.ScoreLedger ScorePeerFunc = decision.ScorePeerFunc ) diff --git a/server/internal/decision/engine.go b/server/internal/decision/engine.go index d1ccdeb0..04bcb143 100644 --- a/server/internal/decision/engine.go +++ b/server/internal/decision/engine.go @@ -308,7 +308,6 @@ func NewEngine( bs bstore.Blockstore, peerTagger PeerTagger, self peer.ID, - metrics *bmetrics.Metrics, opts ...Option, ) *Engine { return newEngine( @@ -316,7 +315,6 @@ func NewEngine( peerTagger, self, maxBlockSizeReplaceHasWithBlock, - metrics, opts..., ) } @@ -326,10 +324,8 @@ func newEngine( peerTagger PeerTagger, self peer.ID, maxReplaceSize int, - metrics *bmetrics.Metrics, opts ...Option, ) *Engine { - e := &Engine{ ledgerMap: make(map[peer.ID]*ledger), scoreLedger: NewDefaultScoreLedger(), @@ -344,8 +340,8 @@ func newEngine( sendDontHaves: true, self: self, peerLedger: newPeerLedger(), - pendingGauge: metrics.PendingEngineGauge(), - activeGauge: metrics.ActiveEngineGauge(), + pendingGauge: bmetrics.PendingEngineGauge(), + activeGauge: bmetrics.ActiveEngineGauge(), targetMessageSize: defaultTargetMessageSize, tagQueued: fmt.Sprintf(tagFormat, "queued", uuid.New().String()), tagUseful: fmt.Sprintf(tagFormat, "useful", uuid.New().String()), @@ -355,7 +351,7 @@ func newEngine( opt(e) } - e.bsm = newBlockstoreManager(bs, e.bstoreWorkerCount, metrics.PendingBlocksGauge(), metrics.ActiveBlocksGauge()) + e.bsm = newBlockstoreManager(bs, e.bstoreWorkerCount, bmetrics.PendingBlocksGauge(), bmetrics.ActiveBlocksGauge()) // default peer task queue options peerTaskQueueOpts := []peertaskqueue.Option{ diff --git a/server/internal/decision/engine_test.go b/server/internal/decision/engine_test.go index 853cc3bf..3ae8f150 100644 --- a/server/internal/decision/engine_test.go +++ b/server/internal/decision/engine_test.go @@ -14,7 +14,6 @@ import ( "github.com/ipfs/go-bitswap/internal/testutil" message "github.com/ipfs/go-bitswap/message" pb "github.com/ipfs/go-bitswap/message/pb" - "github.com/ipfs/go-bitswap/metrics" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" @@ -197,7 +196,6 @@ func newEngineForTesting( peerTagger, self, maxReplaceSize, - metrics.New(ctx), opts..., ) } diff --git a/server/server.go b/server/server.go index 8cbe4682..b39c34f1 100644 --- a/server/server.go +++ b/server/server.go @@ -26,14 +26,7 @@ import ( "go.uber.org/zap" ) -var ( - // HasBlockBufferSize is the buffer size of the channel for new blocks - // that need to be provided. They should get pulled over by the - // provideCollector even before they are actually provided. - // TODO: Does this need to be this large givent that? - HasBlockBufferSize = 256 - provideKeysBufferSize = 2048 -) +var provideKeysBufferSize = 2048 var log = logging.Logger("bitswap-server") var sflog = log.Desugar() @@ -74,11 +67,13 @@ type Server struct { // Extra options to pass to the decision manager engineOptions []decision.Option + // the size of channel buffer to use + hasBlockBufferSize int // whether or not to make provide announcements provideEnabled bool } -func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, m *bmetrics.Metrics, options ...Option) *Server { +func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Server { ctx, cancel := context.WithCancel(ctx) px := process.WithTeardown(func() error { @@ -90,15 +85,16 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl }() s := &Server{ - sentHistogram: m.SentHist(), - sendTimeHistogram: m.SendTimeHist(), - taskWorkerCount: defaults.BitswapTaskWorkerCount, - network: network, - process: px, - provideEnabled: true, - newBlocks: make(chan cid.Cid, HasBlockBufferSize), - provideKeys: make(chan cid.Cid, provideKeysBufferSize), + sentHistogram: bmetrics.SentHist(), + sendTimeHistogram: bmetrics.SendTimeHist(), + taskWorkerCount: defaults.BitswapTaskWorkerCount, + network: network, + process: px, + provideEnabled: true, + hasBlockBufferSize: defaults.HasBlockBufferSize, + provideKeys: make(chan cid.Cid, provideKeysBufferSize), } + s.newBlocks = make(chan cid.Cid, s.hasBlockBufferSize) for _, o := range options { o(s) @@ -109,7 +105,6 @@ func New(ctx context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Bl bstore, network.ConnectionManager(), network.Self(), - m, s.engineOptions..., ) s.engineOptions = nil @@ -215,6 +210,16 @@ func MaxOutstandingBytesPerPeer(count int) Option { } } +// HasBlockBufferSize configure how big the new blocks buffer should be. +func HasBlockBufferSize(count int) Option { + if count < 0 { + panic("cannot have negative buffer size") + } + return func(bs *Server) { + bs.hasBlockBufferSize = count + } +} + // WantlistForPeer returns the currently understood list of blocks requested by a // given peer. func (bs *Server) WantlistForPeer(p peer.ID) []cid.Cid { diff --git a/client/testinstance/testinstance.go b/testinstance/testinstance.go similarity index 100% rename from client/testinstance/testinstance.go rename to testinstance/testinstance.go diff --git a/wantlist/forward.go b/wantlist/forward.go new file mode 100644 index 00000000..c7eba707 --- /dev/null +++ b/wantlist/forward.go @@ -0,0 +1,23 @@ +package wantlist + +import ( + "github.com/ipfs/go-bitswap/client/wantlist" + "github.com/ipfs/go-cid" +) + +type ( + // DEPRECATED use wantlist.Entry instead + Entry = wantlist.Entry + // DEPRECATED use wantlist.Wantlist instead + Wantlist = wantlist.Wantlist +) + +// DEPRECATED use wantlist.New instead +func New() *Wantlist { + return wantlist.New() +} + +// DEPRECATED use wantlist.NewRefEntry instead +func NewRefEntry(c cid.Cid, p int32) Entry { + return wantlist.NewRefEntry(c, p) +}