Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
Merge pull request #349 from ipfs/refactor/simplify-mq-onSent
Browse files Browse the repository at this point in the history
refactor: simplify messageQueue onSent
  • Loading branch information
Stebalien authored Apr 13, 2020
2 parents 906b2fb + b6a8a73 commit 6099047
Showing 1 changed file with 15 additions and 26 deletions.
41 changes: 15 additions & 26 deletions internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ func (r *recallWantlist) RemoveType(c cid.Cid, wtype pb.Message_Wantlist_WantTyp
r.pending.RemoveType(c, wtype)
}

// Sent moves the want from the pending to the sent list
func (r *recallWantlist) Sent(e bsmsg.Entry) {
// MarkSent moves the want from the pending to the sent list
func (r *recallWantlist) MarkSent(e wantlist.Entry) {
r.pending.RemoveType(e.Cid, e.WantType)
r.sent.Add(e.Cid, e.Priority, e.WantType)
}
Expand Down Expand Up @@ -439,7 +439,7 @@ func (mq *MessageQueue) sendMessage() {
for i := 0; i < maxRetries; i++ {
if mq.attemptSendAndRecovery(message) {
// We were able to send successfully.
onSent(wantlist)
onSent()

mq.simulateDontHaveWithTimeout(wantlist)

Expand Down Expand Up @@ -540,7 +540,7 @@ func (mq *MessageQueue) pendingWorkCount() int {
}

// Convert the lists of wants into a Bitswap message
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func([]bsmsg.Entry)) {
func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwapMessage, func()) {
mq.wllock.Lock()
defer mq.wllock.Unlock()

Expand All @@ -566,6 +566,7 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
}

// Add each regular want-have / want-block to the message
peerSent := make([]wantlist.Entry, 0, len(peerEntries))
for i := 0; i < len(peerEntries) && msgSize < mq.maxMessageSize; i++ {
e := peerEntries[i]
// If the remote peer doesn't support HAVE / DONT_HAVE messages,
Expand All @@ -574,11 +575,13 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
mq.peerWants.RemoveType(e.Cid, pb.Message_Wantlist_Have)
} else {
msgSize += mq.msg.AddEntry(e.Cid, e.Priority, e.WantType, true)
peerSent = append(peerSent, e)
}
}

// Add each broadcast want-have to the message
for i := 0; i < len(bcstEntries) && msgSize < mq.maxMessageSize; i++ {
bcstSentCount := 0
for ; bcstSentCount < len(bcstEntries) && msgSize < mq.maxMessageSize; bcstSentCount++ {
// Broadcast wants are sent as want-have
wantType := pb.Message_Wantlist_Have

Expand All @@ -588,41 +591,27 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
wantType = pb.Message_Wantlist_Block
}

e := bcstEntries[i]
e := bcstEntries[bcstSentCount]
msgSize += mq.msg.AddEntry(e.Cid, e.Priority, wantType, false)
}

// Called when the message has been successfully sent.
onMessageSent := func(wantlist []bsmsg.Entry) {
bcst := keysToSet(bcstEntries)
prws := keysToSet(peerEntries)

onMessageSent := func() {
mq.wllock.Lock()
defer mq.wllock.Unlock()

// Move the keys from pending to sent
for _, e := range wantlist {
if _, ok := bcst[e.Cid]; ok {
mq.bcstWants.Sent(e)
}
if _, ok := prws[e.Cid]; ok {
mq.peerWants.Sent(e)
}
for i := 0; i < bcstSentCount; i++ {
mq.bcstWants.MarkSent(bcstEntries[i])
}
for _, e := range peerSent {
mq.peerWants.MarkSent(e)
}
}

return mq.msg, onMessageSent
}

// Convert wantlist entries into a set of cids
func keysToSet(wl []wantlist.Entry) map[cid.Cid]struct{} {
set := make(map[cid.Cid]struct{}, len(wl))
for _, e := range wl {
set[e.Cid] = struct{}{}
}
return set
}

func (mq *MessageQueue) initializeSender() error {
if mq.sender != nil {
return nil
Expand Down

0 comments on commit 6099047

Please sign in to comment.