This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Merge pull request #307 from ipfs/perf/message-queue
Perf/message queue
Stebalien authored Mar 19, 2020
2 parents 03e6d1f + b4763e2 commit 89d39a6
Showing 5 changed files with 224 additions and 91 deletions.
68 changes: 39 additions & 29 deletions internal/messagequeue/messagequeue.go
@@ -75,6 +75,9 @@ type MessageQueue struct {
rebroadcastIntervalLk sync.RWMutex
rebroadcastInterval time.Duration
rebroadcastTimer *time.Timer
// For performance reasons we just clear out the fields of the message
// instead of creating a new one every time.
msg bsmsg.BitSwapMessage
}

// recallWantlist keeps a list of pending wants, and a list of all wants that
@@ -176,6 +179,9 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
rebroadcastInterval: defaultRebroadcastInterval,
sendErrorBackoff: sendErrorBackoff,
priority: maxPriority,
// For performance reasons we just clear out the fields of the message
// after using it, instead of creating a new one every time.
msg: bsmsg.New(false),
}

return mq
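
The two hunks above are the heart of the change: the MessageQueue now keeps a single bsmsg.BitSwapMessage, created once with bsmsg.New(false), and clears its fields after each send instead of allocating a fresh message every time. Below is a minimal, self-contained sketch of that reuse-and-reset pattern; the types are hypothetical stand-ins, not the real bsmsg API.

package main

import "fmt"

// message stands in for bsmsg.BitSwapMessage; the real type carries wantlist
// entries, blocks, and block presences.
type message struct {
	entries []string
}

// reset clears the fields so the same allocation can be reused on the next
// send, mirroring mq.msg.Reset(false) in the diff.
func (m *message) reset() { m.entries = m.entries[:0] }

// queue stands in for MessageQueue; it owns one message that is reused
// across sends instead of being allocated per send.
type queue struct {
	msg message
}

func (q *queue) send(wants []string) {
	// Clear out the message's fields once it has been processed.
	defer q.msg.reset()

	q.msg.entries = append(q.msg.entries, wants...)
	fmt.Println("sending", q.msg.entries)
}

func main() {
	q := &queue{}
	q.send([]string{"cid-1", "cid-2"})
	q.send([]string{"cid-3"}) // the second send reuses the same backing message
}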
@@ -399,20 +405,25 @@ func (mq *MessageQueue) sendMessage() {
mq.dhTimeoutMgr.Start()

// Convert want lists to a Bitswap Message
message, onSent := mq.extractOutgoingMessage(mq.sender.SupportsHave())
if message == nil || message.Empty() {
message := mq.extractOutgoingMessage(mq.sender.SupportsHave())

// After processing the message, clear out its fields to save memory
defer mq.msg.Reset(false)

if message.Empty() {
return
}

mq.logOutgoingMessage(message)
wantlist := message.Wantlist()
mq.logOutgoingMessage(wantlist)

// Try to send this message repeatedly
for i := 0; i < maxRetries; i++ {
if mq.attemptSendAndRecovery(message) {
// We were able to send successfully.
onSent()
mq.onMessageSent(wantlist)

mq.simulateDontHaveWithTimeout(message)
mq.simulateDontHaveWithTimeout(wantlist)

// If the message was too big and only a subset of wants could be
// sent, schedule sending the rest of the wants in the next
@@ -430,12 +441,12 @@ func (mq *MessageQueue) sendMessage() {
// This is necessary when making requests to peers running an older version of
// Bitswap that doesn't support the DONT_HAVE response, and is also useful to
// mitigate getting blocked by a peer that takes a long time to respond.
func (mq *MessageQueue) simulateDontHaveWithTimeout(msg bsmsg.BitSwapMessage) {
mq.wllock.Lock()

func (mq *MessageQueue) simulateDontHaveWithTimeout(wantlist []bsmsg.Entry) {
// Get the CID of each want-block that expects a DONT_HAVE response
wantlist := msg.Wantlist()
wants := make([]cid.Cid, 0, len(wantlist))

mq.wllock.Lock()

for _, entry := range wantlist {
if entry.WantType == pb.Message_Wantlist_Block && entry.SendDontHave {
// Unlikely, but just in case check that the block hasn't been
@@ -453,15 +464,14 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(msg bsmsg.BitSwapMessage) {
mq.dhTimeoutMgr.AddPending(wants)
}
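
For context on the DONT_HAVE simulation described in the comment above, here is a small, self-contained sketch of the idea using hypothetical stand-in types (the real code works with cid.Cid, bsmsg.Entry, and mq.dhTimeoutMgr): collect the CIDs of want-blocks that asked for a DONT_HAVE response and hand them to a timeout manager, which emits a simulated DONT_HAVE if the peer does not answer in time.

package main

import (
	"fmt"
	"time"
)

// entry stands in for bsmsg.Entry: a want together with its type and the
// SendDontHave flag.
type entry struct {
	cid          string
	wantBlock    bool
	sendDontHave bool
}

// timeoutMgr stands in for the DONT_HAVE timeout manager (mq.dhTimeoutMgr).
// The real manager also cancels pending entries when a response arrives.
type timeoutMgr struct {
	timeout    time.Duration
	onDontHave func(cids []string)
}

// addPending fires a simulated DONT_HAVE for the given CIDs after the timeout.
func (t *timeoutMgr) addPending(cids []string) {
	time.AfterFunc(t.timeout, func() { t.onDontHave(cids) })
}

// simulateDontHave collects the CIDs of want-blocks that expect a DONT_HAVE
// response and registers them with the timeout manager.
func simulateDontHave(wantlist []entry, mgr *timeoutMgr) {
	wants := make([]string, 0, len(wantlist))
	for _, e := range wantlist {
		if e.wantBlock && e.sendDontHave {
			wants = append(wants, e.cid)
		}
	}
	mgr.addPending(wants)
}

func main() {
	mgr := &timeoutMgr{
		timeout:    50 * time.Millisecond,
		onDontHave: func(cids []string) { fmt.Println("simulated DONT_HAVE for", cids) },
	}
	simulateDontHave([]entry{{cid: "cid-1", wantBlock: true, sendDontHave: true}}, mgr)
	time.Sleep(100 * time.Millisecond) // give the toy timer a chance to fire
}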

func (mq *MessageQueue) logOutgoingMessage(msg bsmsg.BitSwapMessage) {
func (mq *MessageQueue) logOutgoingMessage(wantlist []bsmsg.Entry) {
// Save some CPU cycles and allocations if log level is higher than debug
if ce := sflog.Check(zap.DebugLevel, "Bitswap -> send wants"); ce == nil {
return
}

self := mq.network.Self()
entries := msg.Wantlist()
for _, e := range entries {
for _, e := range wantlist {
if e.Cancel {
if e.WantType == pb.Message_Wantlist_Have {
log.Debugw("Bitswap -> cancel-have", "local", self, "to", mq.p, "cid", e.Cid)
@@ -478,21 +488,21 @@ }
}
}

// Whether there is work to be processed
func (mq *MessageQueue) hasPendingWork() bool {
return mq.pendingWorkCount() > 0
}

// The amount of work that is waiting to be processed
func (mq *MessageQueue) pendingWorkCount() int {
mq.wllock.Lock()
defer mq.wllock.Unlock()

return mq.bcstWants.pending.Len() + mq.peerWants.pending.Len() + mq.cancels.Len()
}

func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) {
// Create a new message
msg := bsmsg.New(false)

// Convert the lists of wants into a Bitswap message
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) bsmsg.BitSwapMessage {
mq.wllock.Lock()
defer mq.wllock.Unlock()

@@ -515,7 +525,7 @@ }
}

e := bcstEntries[i]
msgSize += msg.AddEntry(e.Cid, e.Priority, wantType, false)
msgSize += mq.msg.AddEntry(e.Cid, e.Priority, wantType, false)
}

// Add each regular want-have / want-block to the message
@@ -526,7 +536,7 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
if !supportsHave && e.WantType == pb.Message_Wantlist_Have {
mq.peerWants.RemoveType(e.Cid, pb.Message_Wantlist_Have)
} else {
msgSize += msg.AddEntry(e.Cid, e.Priority, e.WantType, true)
msgSize += mq.msg.AddEntry(e.Cid, e.Priority, e.WantType, true)
}
}

@@ -535,26 +545,26 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
for i := 0; i < len(cancels) && msgSize < mq.maxMessageSize; i++ {
c := cancels[i]

msgSize += msg.Cancel(c)
msgSize += mq.msg.Cancel(c)

// Clear the cancel - we make a best effort to let peers know about
// cancels but won't save them to resend if there's a failure.
mq.cancels.Remove(c)
}

// Called when the message has been successfully sent.
return mq.msg
}

// Called when the message has been successfully sent.
func (mq *MessageQueue) onMessageSent(wantlist []bsmsg.Entry) {
// Remove the sent keys from the broadcast and regular wantlists.
onSent := func() {
mq.wllock.Lock()
defer mq.wllock.Unlock()
mq.wllock.Lock()
defer mq.wllock.Unlock()

for _, e := range msg.Wantlist() {
mq.bcstWants.pending.Remove(e.Cid)
mq.peerWants.pending.RemoveType(e.Cid, e.WantType)
}
for _, e := range wantlist {
mq.bcstWants.pending.Remove(e.Cid)
mq.peerWants.pending.RemoveType(e.Cid, e.WantType)
}

return msg, onSent
}
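
Because the shared message is reset after every send, the helpers that run around the send (logOutgoingMessage, onMessageSent, simulateDontHaveWithTimeout) now take the extracted []bsmsg.Entry slice rather than the message itself, so their bookkeeping still works once the message's fields are cleared. A hypothetical, self-contained sketch of that shape, again with stand-in types rather than the real API:

package main

import "fmt"

// message again stands in for the queue's reusable BitSwapMessage.
type message struct{ entries []string }

func (m *message) reset() { m.entries = m.entries[:0] }

// wantlist returns a copy of the entries so callers keep a valid snapshot
// even after the message is reset.
func (m *message) wantlist() []string { return append([]string(nil), m.entries...) }

type queue struct{ msg message }

func (q *queue) send(wants []string) {
	defer q.msg.reset() // the shared message is cleared once the send is done

	q.msg.entries = append(q.msg.entries, wants...)

	// Snapshot the entries before the deferred reset runs, mirroring
	// wantlist := message.Wantlist() in the diff; the bookkeeping below works
	// on the snapshot, not on the soon-to-be-cleared message.
	wantlist := q.msg.wantlist()

	// ... attempt the send of q.msg here ...

	q.onMessageSent(wantlist)
}

// onMessageSent removes the sent entries from the pending lists in the real
// code; here it just reports them.
func (q *queue) onMessageSent(wantlist []string) {
	fmt.Println("marking as sent:", wantlist)
}

func main() {
	q := &queue{}
	q.send([]string{"cid-1"})
}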

func (mq *MessageQueue) initializeSender() error {