Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
refactor: remove metrics object and other review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorropo committed Aug 11, 2022
1 parent 1ac4824 commit 81393bc
Show file tree
Hide file tree
Showing 16 changed files with 147 additions and 176 deletions.
2 changes: 1 addition & 1 deletion benchmarks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ import (
protocol "github.com/libp2p/go-libp2p-core/protocol"

"github.com/ipfs/go-bitswap"
testinstance "github.com/ipfs/go-bitswap/client/testinstance"
bsnet "github.com/ipfs/go-bitswap/network"
testinstance "github.com/ipfs/go-bitswap/testinstance"
tn "github.com/ipfs/go-bitswap/testnet"
cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
Expand Down
16 changes: 10 additions & 6 deletions polyfill.go → bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"fmt"

"github.com/ipfs/go-bitswap/client"
"github.com/ipfs/go-bitswap/internal/defaults"
"github.com/ipfs/go-bitswap/message"
"github.com/ipfs/go-bitswap/metrics"
"github.com/ipfs/go-bitswap/network"
"github.com/ipfs/go-bitswap/server"
"github.com/ipfs/go-bitswap/tracer"
Expand All @@ -24,7 +24,7 @@ import (
var log = logging.Logger("bitswap")

// old interface we are targeting
type old interface {
type bitswap interface {
Close() error
GetBlock(ctx context.Context, k cid.Cid) (blocks.Block, error)
GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error)
Expand All @@ -44,7 +44,8 @@ type old interface {
}

var _ exchange.SessionExchange = (*Bitswap)(nil)
var _ old = (*Bitswap)(nil)
var _ bitswap = (*Bitswap)(nil)
var HasBlockBufferSize = defaults.HasBlockBufferSize

type Bitswap struct {
*client.Client
Expand Down Expand Up @@ -81,9 +82,12 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc
serverOptions = append(serverOptions, server.WithTracer(tracer))
}

stats := metrics.New(ctx)
bs.Server = server.New(ctx, net, bstore, stats, serverOptions...)
bs.Client = client.New(ctx, net, bstore, stats, append(clientOptions, client.WithBlockReceivedNotifier(bs.Server))...)
if HasBlockBufferSize != defaults.HasBlockBufferSize {
serverOptions = append(serverOptions, server.HasBlockBufferSize(HasBlockBufferSize))
}

bs.Server = server.New(ctx, net, bstore, serverOptions...)
bs.Client = client.New(ctx, net, bstore, append(clientOptions, client.WithBlockReceivedNotifier(bs.Server))...)
net.Start(bs) // use the polyfill receiver to log received errors and trace messages only once

return bs
Expand Down
2 changes: 1 addition & 1 deletion bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"time"

"github.com/ipfs/go-bitswap"
testinstance "github.com/ipfs/go-bitswap/client/testinstance"
bsmsg "github.com/ipfs/go-bitswap/message"
"github.com/ipfs/go-bitswap/server"
testinstance "github.com/ipfs/go-bitswap/testinstance"
tn "github.com/ipfs/go-bitswap/testnet"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
Expand Down
2 changes: 1 addition & 1 deletion client/bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

"github.com/ipfs/go-bitswap"
"github.com/ipfs/go-bitswap/client/internal/session"
testinstance "github.com/ipfs/go-bitswap/client/testinstance"
testinstance "github.com/ipfs/go-bitswap/testinstance"
tn "github.com/ipfs/go-bitswap/testnet"
blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
Expand Down
14 changes: 6 additions & 8 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,14 @@ func WithBlockReceivedNotifier(brn BlockReceivedNotifier) Option {
}

type BlockReceivedNotifier interface {
// ReceivedBlocks notify the decision engine that a peer is well behaving
// and gave us usefull data, potentially increasing it's score and making us
// ReceivedBlocks notifies the decision engine that a peer is well-behaving
// and gave us useful data, potentially increasing its score and making us
// send them more data in exchange.
ReceivedBlocks(peer.ID, []blocks.Block)
}

// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate. Runs until context is cancelled or bitswap.Close is called.
func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, m *bmetrics.Metrics, options ...Option) *Client {
// New initializes a Bitswap client that runs until client.Close is called.
func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Client {
// important to use provided parent context (since it may include important
// loggable data). It's probably not a good idea to allow bitswap to be
// coupled to the concerns of the ipfs daemon in this way.
Expand Down Expand Up @@ -155,8 +153,8 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
sim: sim,
notif: notif,
counters: new(counters),
dupMetric: m.DupHist(),
allMetric: m.AllHist(),
dupMetric: bmetrics.DupHist(),
allMetric: bmetrics.AllHist(),
provSearchDelay: defaults.ProvSearchDelay,
rebroadcastDelay: delay.Fixed(time.Minute),
simulateDontHavesOnTimeout: true,
Expand Down
12 changes: 12 additions & 0 deletions decision/forward.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package decision

import "github.com/ipfs/go-bitswap/server"

type (
// DEPRECATED use server.Receipt instead
Receipt = server.Receipt
// DEPRECATED use server.ScoreLedger instead
ScoreLedger = server.ScoreLedger
// DEPRECATED use server.ScorePeerFunc instead
ScorePeerFunc = server.ScorePeerFunc
)
17 changes: 17 additions & 0 deletions forward.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package bitswap

import (
"github.com/ipfs/go-bitswap/server"
"github.com/ipfs/go-bitswap/tracer"
)

type (
// DEPRECATED
PeerBlockRequestFilter = server.PeerBlockRequestFilter
// DEPRECATED
TaskComparator = server.TaskComparator
// DEPRECATED
TaskInfo = server.TaskInfo
// DEPRECATED
Tracer = tracer.Tracer
)
5 changes: 5 additions & 0 deletions internal/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,9 @@ const (
BitswapMaxOutstandingBytesPerPeer = 1 << 20
// the number of bytes we attempt to make each outgoing bitswap message
BitswapEngineTargetMessageSize = 16 * 1024
// HasBlockBufferSize is the buffer size of the channel for new blocks
// that need to be provided. They should get pulled over by the
// provideCollector even before they are actually provided.
// TODO: Does this need to be this large givent that?
HasBlockBufferSize = 256
)
132 changes: 0 additions & 132 deletions metrics/gen.go

This file was deleted.

44 changes: 44 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package metrics

import (
"github.com/ipfs/go-metrics-interface"
)

var (
// the 1<<18+15 is to observe old file chunks that are 1<<18 + 14 in size
metricsBuckets = []float64{1 << 6, 1 << 10, 1 << 14, 1 << 18, 1<<18 + 15, 1 << 22}

timeMetricsBuckets = []float64{1, 10, 30, 60, 90, 120, 600}
)

func DupHist() metrics.Histogram {
return metrics.New("recv_dup_blocks_bytes", "Summary of duplicate data blocks recived").Histogram(metricsBuckets)
}

func AllHist() metrics.Histogram {
return metrics.New("recv_all_blocks_bytes", "Summary of all data blocks recived").Histogram(metricsBuckets)
}

func SentHist() metrics.Histogram {
return metrics.New("sent_all_blocks_bytes", "Histogram of blocks sent by this bitswap").Histogram(metricsBuckets)
}

func SendTimeHist() metrics.Histogram {
return metrics.New("send_times", "Histogram of how long it takes to send messages in this bitswap").Histogram(timeMetricsBuckets)
}

func PendingEngineGauge() metrics.Gauge {
return metrics.New("pending_tasks", "Total number of pending tasks").Gauge()
}

func ActiveEngineGauge() metrics.Gauge {
return metrics.New("active_tasks", "Total number of active tasks").Gauge()
}

func PendingBlocksGauge() metrics.Gauge {
return metrics.New("pending_block_tasks", "Total number of pending blockstore tasks").Gauge()
}

func ActiveBlocksGauge() metrics.Gauge {
return metrics.New("active_block_tasks", "Total number of active blockstore tasks").Gauge()
}
1 change: 1 addition & 0 deletions server/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type (
Receipt = decision.Receipt
PeerBlockRequestFilter = decision.PeerBlockRequestFilter
TaskComparator = decision.TaskComparator
TaskInfo = decision.TaskInfo
ScoreLedger = decision.ScoreLedger
ScorePeerFunc = decision.ScorePeerFunc
)
10 changes: 3 additions & 7 deletions server/internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,15 +308,13 @@ func NewEngine(
bs bstore.Blockstore,
peerTagger PeerTagger,
self peer.ID,
metrics *bmetrics.Metrics,
opts ...Option,
) *Engine {
return newEngine(
bs,
peerTagger,
self,
maxBlockSizeReplaceHasWithBlock,
metrics,
opts...,
)
}
Expand All @@ -326,10 +324,8 @@ func newEngine(
peerTagger PeerTagger,
self peer.ID,
maxReplaceSize int,
metrics *bmetrics.Metrics,
opts ...Option,
) *Engine {

e := &Engine{
ledgerMap: make(map[peer.ID]*ledger),
scoreLedger: NewDefaultScoreLedger(),
Expand All @@ -344,8 +340,8 @@ func newEngine(
sendDontHaves: true,
self: self,
peerLedger: newPeerLedger(),
pendingGauge: metrics.PendingEngineGauge(),
activeGauge: metrics.ActiveEngineGauge(),
pendingGauge: bmetrics.PendingEngineGauge(),
activeGauge: bmetrics.ActiveEngineGauge(),
targetMessageSize: defaultTargetMessageSize,
tagQueued: fmt.Sprintf(tagFormat, "queued", uuid.New().String()),
tagUseful: fmt.Sprintf(tagFormat, "useful", uuid.New().String()),
Expand All @@ -355,7 +351,7 @@ func newEngine(
opt(e)
}

e.bsm = newBlockstoreManager(bs, e.bstoreWorkerCount, metrics.PendingBlocksGauge(), metrics.ActiveBlocksGauge())
e.bsm = newBlockstoreManager(bs, e.bstoreWorkerCount, bmetrics.PendingBlocksGauge(), bmetrics.ActiveBlocksGauge())

// default peer task queue options
peerTaskQueueOpts := []peertaskqueue.Option{
Expand Down
2 changes: 0 additions & 2 deletions server/internal/decision/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/ipfs/go-bitswap/internal/testutil"
message "github.com/ipfs/go-bitswap/message"
pb "github.com/ipfs/go-bitswap/message/pb"
"github.com/ipfs/go-bitswap/metrics"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -197,7 +196,6 @@ func newEngineForTesting(
peerTagger,
self,
maxReplaceSize,
metrics.New(ctx),
opts...,
)
}
Expand Down
Loading

0 comments on commit 81393bc

Please sign in to comment.