diff --git a/fuzz_helpers_test.go b/fuzz_helpers_test.go new file mode 100644 index 00000000..0831deb6 --- /dev/null +++ b/fuzz_helpers_test.go @@ -0,0 +1,149 @@ +package pubsub + +import ( + "encoding/binary" + + pb "github.com/libp2p/go-libp2p-pubsub/pb" +) + +func generateU16(data *[]byte) uint16 { + if len(*data) < 2 { + return 0 + } + + out := binary.LittleEndian.Uint16((*data)[:2]) + *data = (*data)[2:] + return out +} + +func generateBool(data *[]byte) bool { + if len(*data) < 1 { + return false + } + + out := (*data)[0]&1 == 1 + *data = (*data)[1:] + return out +} + +func generateMessage(data []byte, limit int) *pb.Message { + msgSize := int(generateU16(&data)) % limit + return &pb.Message{Data: make([]byte, msgSize)} +} + +func generateSub(data []byte, limit int) *pb.RPC_SubOpts { + topicIDSize := int(generateU16(&data)) % limit + subscribe := generateBool(&data) + + str := string(make([]byte, topicIDSize)) + return &pb.RPC_SubOpts{Subscribe: &subscribe, Topicid: &str} +} + +func generateControl(data []byte, limit int) *pb.ControlMessage { + numIWANTMsgs := int(generateU16(&data)) % (limit / 2) + numIHAVEMsgs := int(generateU16(&data)) % (limit / 2) + + ctl := &pb.ControlMessage{} + + ctl.Iwant = make([]*pb.ControlIWant, 0, numIWANTMsgs) + for i := 0; i < numIWANTMsgs; i++ { + msgSize := int(generateU16(&data)) % limit + msgCount := int(generateU16(&data)) % limit + ctl.Iwant = append(ctl.Iwant, &pb.ControlIWant{}) + ctl.Iwant[i].MessageIDs = make([]string, 0, msgCount) + for j := 0; j < msgCount; j++ { + ctl.Iwant[i].MessageIDs = append(ctl.Iwant[i].MessageIDs, string(make([]byte, msgSize))) + } + } + if ctl.Size() > limit { + return &pb.ControlMessage{} + } + + ctl.Ihave = make([]*pb.ControlIHave, 0, numIHAVEMsgs) + for i := 0; i < numIHAVEMsgs; i++ { + msgSize := int(generateU16(&data)) % limit + msgCount := int(generateU16(&data)) % limit + topicSize := int(generateU16(&data)) % limit + topic := string(make([]byte, topicSize)) + ctl.Ihave = append(ctl.Ihave, &pb.ControlIHave{TopicID: &topic}) + + ctl.Ihave[i].MessageIDs = make([]string, 0, msgCount) + for j := 0; j < msgCount; j++ { + ctl.Ihave[i].MessageIDs = append(ctl.Ihave[i].MessageIDs, string(make([]byte, msgSize))) + } + } + if ctl.Size() > limit { + return &pb.ControlMessage{} + } + + numGraft := int(generateU16(&data)) % limit + ctl.Graft = make([]*pb.ControlGraft, 0, numGraft) + for i := 0; i < numGraft; i++ { + topicSize := int(generateU16(&data)) % limit + topic := string(make([]byte, topicSize)) + ctl.Graft = append(ctl.Graft, &pb.ControlGraft{TopicID: &topic}) + } + if ctl.Size() > limit { + return &pb.ControlMessage{} + } + + numPrune := int(generateU16(&data)) % limit + ctl.Prune = make([]*pb.ControlPrune, 0, numPrune) + for i := 0; i < numPrune; i++ { + topicSize := int(generateU16(&data)) % limit + topic := string(make([]byte, topicSize)) + ctl.Prune = append(ctl.Prune, &pb.ControlPrune{TopicID: &topic}) + } + if ctl.Size() > limit { + return &pb.ControlMessage{} + } + + return ctl +} + +func generateRPC(data []byte, limit int) *RPC { + rpc := &RPC{RPC: pb.RPC{}} + sizeTester := RPC{RPC: pb.RPC{}} + + msgCount := int(generateU16(&data)) % (limit / 2) + rpc.Publish = make([]*pb.Message, 0, msgCount) + for i := 0; i < msgCount; i++ { + msg := generateMessage(data, limit) + + sizeTester.Publish = []*pb.Message{msg} + size := sizeTester.Size() + sizeTester.Publish = nil + if size > limit { + continue + } + + rpc.Publish = append(rpc.Publish, msg) + } + + subCount := int(generateU16(&data)) % (limit / 2) + rpc.Subscriptions = make([]*pb.RPC_SubOpts, 0, subCount) + for i := 0; i < subCount; i++ { + sub := generateSub(data, limit) + + sizeTester.Subscriptions = []*pb.RPC_SubOpts{sub} + size := sizeTester.Size() + sizeTester.Subscriptions = nil + if size > limit { + continue + } + + rpc.Subscriptions = append(rpc.Subscriptions, sub) + } + + ctl := generateControl(data, limit) + + sizeTester.Control = ctl + size := sizeTester.Size() + sizeTester.Control = nil + if size <= limit { + rpc.Control = ctl + + } + + return rpc +} diff --git a/gossipsub.go b/gossipsub.go index 9b302322..f2880e82 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -1170,14 +1170,14 @@ func (gs *GossipSubRouter) sendRPC(p peer.ID, out *RPC) { return } - // If we're too big, fragment into multiple RPCs and send each sequentially - outRPCs, err := fragmentRPC(out, gs.p.maxMessageSize) - if err != nil { - gs.doDropRPC(out, p, fmt.Sprintf("unable to fragment RPC: %s", err)) - return - } - + // Potentially split the RPC into multiple RPCs that are below the max message size + outRPCs := appendOrMergeRPC(nil, gs.p.maxMessageSize, *out) for _, rpc := range outRPCs { + if rpc.Size() > gs.p.maxMessageSize { + // This should only happen if a single message/control is above the maxMessageSize. + gs.doDropRPC(out, p, fmt.Sprintf("Dropping oversized RPC. Size: %d, limit: %d. (Over by %d bytes)", rpc.Size(), gs.p.maxMessageSize, rpc.Size()-gs.p.maxMessageSize)) + continue + } gs.doSendRPC(rpc, p, mch) } } @@ -1201,119 +1201,134 @@ func (gs *GossipSubRouter) doSendRPC(rpc *RPC, p peer.ID, mch chan *RPC) { } } -func fragmentRPC(rpc *RPC, limit int) ([]*RPC, error) { - if rpc.Size() < limit { - return []*RPC{rpc}, nil +// appendOrMergeRPC appends the given RPCs to the slice, merging them if possible. +// If any elem is too large to fit in a single RPC, it will be split into multiple RPCs. +// If an RPC is too large and can't be split further (e.g. Message data is +// bigger than the RPC limit), then it will be returned as an oversized RPC. +// The caller should filter out oversized RPCs. +func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC { + if len(elems) == 0 { + return slice } - c := (rpc.Size() / limit) + 1 - rpcs := make([]*RPC, 1, c) - rpcs[0] = &RPC{RPC: pb.RPC{}, from: rpc.from} + if len(slice) == 0 && len(elems) == 1 && elems[0].Size() < limit { + // Fast path: no merging needed and only one element + return append(slice, &elems[0]) + } - // outRPC returns the current RPC message if it will fit sizeToAdd more bytes - // otherwise, it will create a new RPC message and add it to the list. - // if withCtl is true, the returned message will have a non-nil empty Control message. - outRPC := func(sizeToAdd int, withCtl bool) *RPC { - current := rpcs[len(rpcs)-1] - // check if we can fit the new data, plus an extra byte for the protobuf field tag - if current.Size()+sizeToAdd+1 < limit { - if withCtl && current.Control == nil { - current.Control = &pb.ControlMessage{} + out := slice + if len(out) == 0 { + out = append(out, &RPC{RPC: pb.RPC{}}) + out[0].from = elems[0].from + } + + for _, elem := range elems { + lastRPC := out[len(out)-1] + + // Merge/Append publish messages + // TODO: Never merge messages. The current behavior is the same as the + // old behavior. In the future let's not merge messages. Since, + // it may increase message latency. + for _, msg := range elem.GetPublish() { + if lastRPC.Publish = append(lastRPC.Publish, msg); lastRPC.Size() > limit { + lastRPC.Publish = lastRPC.Publish[:len(lastRPC.Publish)-1] + lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from} + lastRPC.Publish = append(lastRPC.Publish, msg) + out = append(out, lastRPC) } - return current } - var ctl *pb.ControlMessage - if withCtl { - ctl = &pb.ControlMessage{} - } - next := &RPC{RPC: pb.RPC{Control: ctl}, from: rpc.from} - rpcs = append(rpcs, next) - return next - } - for _, msg := range rpc.GetPublish() { - s := msg.Size() - // if an individual message is too large, we can't fragment it and have to fail entirely - if s > limit { - return nil, fmt.Errorf("message with len=%d exceeds limit %d", s, limit) + // Merge/Append Subscriptions + for _, sub := range elem.GetSubscriptions() { + if lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub); lastRPC.Size() > limit { + lastRPC.Subscriptions = lastRPC.Subscriptions[:len(lastRPC.Subscriptions)-1] + lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from} + lastRPC.Subscriptions = append(lastRPC.Subscriptions, sub) + out = append(out, lastRPC) + } } - out := outRPC(s, false) - out.Publish = append(out.Publish, msg) - } - for _, sub := range rpc.GetSubscriptions() { - out := outRPC(sub.Size(), false) - out.Subscriptions = append(out.Subscriptions, sub) - } + // Merge/Append Control messages + if ctl := elem.GetControl(); ctl != nil { + if lastRPC.Control == nil { + lastRPC.Control = &pb.ControlMessage{} + if lastRPC.Size() > limit { + lastRPC.Control = nil + lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from} + out = append(out, lastRPC) + } + } - ctl := rpc.GetControl() - if ctl == nil { - // if there were no control messages, we're done - return rpcs, nil - } - // if all the control messages fit into one RPC, we just add it to the end and return - ctlOut := &RPC{RPC: pb.RPC{Control: ctl}, from: rpc.from} - if ctlOut.Size() < limit { - rpcs = append(rpcs, ctlOut) - return rpcs, nil - } + for _, graft := range ctl.GetGraft() { + if lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft); lastRPC.Size() > limit { + lastRPC.Control.Graft = lastRPC.Control.Graft[:len(lastRPC.Control.Graft)-1] + lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from} + lastRPC.Control.Graft = append(lastRPC.Control.Graft, graft) + out = append(out, lastRPC) + } + } - // we need to split up the control messages into multiple RPCs - for _, graft := range ctl.Graft { - out := outRPC(graft.Size(), true) - out.Control.Graft = append(out.Control.Graft, graft) - } - for _, prune := range ctl.Prune { - out := outRPC(prune.Size(), true) - out.Control.Prune = append(out.Control.Prune, prune) - } + for _, prune := range ctl.GetPrune() { + if lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune); lastRPC.Size() > limit { + lastRPC.Control.Prune = lastRPC.Control.Prune[:len(lastRPC.Control.Prune)-1] + lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{}}, from: elem.from} + lastRPC.Control.Prune = append(lastRPC.Control.Prune, prune) + out = append(out, lastRPC) + } + } - // An individual IWANT or IHAVE message could be larger than the limit if we have - // a lot of message IDs. fragmentMessageIds will split them into buckets that - // fit within the limit, with some overhead for the control messages themselves - for _, iwant := range ctl.Iwant { - const protobufOverhead = 6 - idBuckets := fragmentMessageIds(iwant.MessageIDs, limit-protobufOverhead) - for _, ids := range idBuckets { - iwant := &pb.ControlIWant{MessageIDs: ids} - out := outRPC(iwant.Size(), true) - out.Control.Iwant = append(out.Control.Iwant, iwant) - } - } - for _, ihave := range ctl.Ihave { - const protobufOverhead = 6 - idBuckets := fragmentMessageIds(ihave.MessageIDs, limit-protobufOverhead) - for _, ids := range idBuckets { - ihave := &pb.ControlIHave{MessageIDs: ids} - out := outRPC(ihave.Size(), true) - out.Control.Ihave = append(out.Control.Ihave, ihave) - } - } - return rpcs, nil -} + for _, iwant := range ctl.GetIwant() { + if len(lastRPC.Control.Iwant) == 0 { + // Initialize with a single IWANT. + // For IWANTs we don't need more than a single one, + // since there are no topic IDs here. + newIWant := &pb.ControlIWant{} + if lastRPC.Control.Iwant = append(lastRPC.Control.Iwant, newIWant); lastRPC.Size() > limit { + lastRPC.Control.Iwant = lastRPC.Control.Iwant[:len(lastRPC.Control.Iwant)-1] + lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{ + Iwant: []*pb.ControlIWant{newIWant}, + }}, from: elem.from} + out = append(out, lastRPC) + } + } + for _, msgID := range iwant.GetMessageIDs() { + if lastRPC.Control.Iwant[0].MessageIDs = append(lastRPC.Control.Iwant[0].MessageIDs, msgID); lastRPC.Size() > limit { + lastRPC.Control.Iwant[0].MessageIDs = lastRPC.Control.Iwant[0].MessageIDs[:len(lastRPC.Control.Iwant[0].MessageIDs)-1] + lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{ + Iwant: []*pb.ControlIWant{{MessageIDs: []string{msgID}}}, + }}, from: elem.from} + out = append(out, lastRPC) + } + } + } -func fragmentMessageIds(msgIds []string, limit int) [][]string { - // account for two bytes of protobuf overhead per array element - const protobufOverhead = 2 - - out := [][]string{{}} - var currentBucket int - var bucketLen int - for i := 0; i < len(msgIds); i++ { - size := len(msgIds[i]) + protobufOverhead - if size > limit { - // pathological case where a single message ID exceeds the limit. - log.Warnf("message ID length %d exceeds limit %d, removing from outgoing gossip", size, limit) - continue - } - bucketLen += size - if bucketLen > limit { - out = append(out, []string{}) - currentBucket++ - bucketLen = size + for _, ihave := range ctl.GetIhave() { + if len(lastRPC.Control.Ihave) == 0 || + lastRPC.Control.Ihave[len(lastRPC.Control.Ihave)-1].TopicID != ihave.TopicID { + // Start a new IHAVE if we are referencing a new topic ID + newIhave := &pb.ControlIHave{TopicID: ihave.TopicID} + if lastRPC.Control.Ihave = append(lastRPC.Control.Ihave, newIhave); lastRPC.Size() > limit { + lastRPC.Control.Ihave = lastRPC.Control.Ihave[:len(lastRPC.Control.Ihave)-1] + lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{ + Ihave: []*pb.ControlIHave{newIhave}, + }}, from: elem.from} + out = append(out, lastRPC) + } + } + for _, msgID := range ihave.GetMessageIDs() { + lastIHave := lastRPC.Control.Ihave[len(lastRPC.Control.Ihave)-1] + if lastIHave.MessageIDs = append(lastIHave.MessageIDs, msgID); lastRPC.Size() > limit { + lastIHave.MessageIDs = lastIHave.MessageIDs[:len(lastIHave.MessageIDs)-1] + lastRPC = &RPC{RPC: pb.RPC{Control: &pb.ControlMessage{ + Ihave: []*pb.ControlIHave{{TopicID: ihave.TopicID, MessageIDs: []string{msgID}}}, + }}, from: elem.from} + out = append(out, lastRPC) + } + } + } } - out[currentBucket] = append(out[currentBucket], msgIds[i]) } + return out } diff --git a/gossipsub_test.go b/gossipsub_test.go index 69194663..5ce3a041 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -2335,7 +2335,24 @@ func (iwe *iwantEverything) handleStream(s network.Stream) { } } +func validRPCSizes(slice []*RPC, limit int) bool { + for _, rpc := range slice { + if rpc.Size() > limit { + return false + } + } + return true +} + func TestFragmentRPCFunction(t *testing.T) { + fragmentRPC := func(rpc *RPC, limit int) ([]*RPC, error) { + rpcs := appendOrMergeRPC(nil, limit, *rpc) + if allValid := validRPCSizes(rpcs, limit); !allValid { + return rpcs, fmt.Errorf("RPC size exceeds limit") + } + return rpcs, nil + } + p := peer.ID("some-peer") topic := "test" rpc := &RPC{from: p} @@ -2485,7 +2502,24 @@ func TestFragmentRPCFunction(t *testing.T) { {MessageIDs: []string{"hello", string(giantIdBytes)}}, }, } - results, err = fragmentRPC(rpc, limit) + results, _ = fragmentRPC(rpc, limit) + + // The old behavior would silently drop the giant ID. + // Now we return a the giant ID in a RPC by itself so that it can be + // dropped before actually sending the RPC. This lets us log the anamoly. + // To keep this test useful, we implement the old behavior here. + filtered := make([]*RPC, 0, len(results)) + for _, r := range results { + if r.Size() < limit { + filtered = append(filtered, r) + } + } + results = filtered + err = nil + if !validRPCSizes(results, limit) { + err = fmt.Errorf("RPC size exceeds limit") + } + if err != nil { t.Fatal(err) } @@ -2500,3 +2534,20 @@ func TestFragmentRPCFunction(t *testing.T) { results[0].Control.Iwant[0].MessageIDs[0]) } } + +func FuzzAppendOrMergeRPC(f *testing.F) { + minMaxMsgSize := 100 + maxMaxMsgSize := 2048 + f.Fuzz(func(t *testing.T, data []byte) { + maxSize := int(generateU16(&data)) % maxMaxMsgSize + if maxSize < minMaxMsgSize { + maxSize = minMaxMsgSize + } + rpc := generateRPC(data, maxSize) + rpcs := appendOrMergeRPC(nil, maxSize, *rpc) + + if !validRPCSizes(rpcs, maxSize) { + t.Fatalf("invalid RPC size") + } + }) +}