diff --git a/gossipsub.go b/gossipsub.go index dcc5d193..c22481af 100644 --- a/gossipsub.go +++ b/gossipsub.go @@ -1361,6 +1361,14 @@ func (gs *GossipSubRouter) doSendRPC(rpc *RPC, p peer.ID, q *rpcQueue, urgent bo gs.tracer.SendRPC(rpc, p) } +// maxProtobufOverhead denotes the protobuf encoding overhead for a message. +// it is based on the RPC.Size function excerpt: +// l = e.Size() +// n += 1 + l + sovRpc(uint64(l)) +// where sovRpc is a number of bytes needed to encode some uint64 value. +// Assuming that the message size is 10^10, the number of bytes needed to encode it is 5. +const maxProtobufOverhead = 1 + 5 + // appendOrMergeRPC appends the given RPCs to the slice, merging them if possible. // If any elem is too large to fit in a single RPC, it will be split into multiple RPCs. // If an RPC is too large and can't be split further (e.g. Message data is @@ -1384,16 +1392,21 @@ func appendOrMergeRPC(slice []*RPC, limit int, elems ...RPC) []*RPC { for _, elem := range elems { lastRPC := out[len(out)-1] + lastSize := lastRPC.Size() // Merge/Append publish messages // TODO: Never merge messages. The current behavior is the same as the // old behavior. In the future let's not merge messages. Since, // it may increase message latency. for _, msg := range elem.GetPublish() { - if lastRPC.Publish = append(lastRPC.Publish, msg); lastRPC.Size() > limit { + lastRPC.Publish = append(lastRPC.Publish, msg) + // do not use lastRPC.Size() here to avoid lastRPC.Publish iteration calling Size on each element. + lastSize += msg.Size() + maxProtobufOverhead + if lastSize > limit { lastRPC.Publish = lastRPC.Publish[:len(lastRPC.Publish)-1] lastRPC = &RPC{RPC: pb.RPC{}, from: elem.from} lastRPC.Publish = append(lastRPC.Publish, msg) + lastSize = lastRPC.Size() // single element calcualtion out = append(out, lastRPC) } } diff --git a/gossipsub_test.go b/gossipsub_test.go index d515654f..eef6b9b4 100644 --- a/gossipsub_test.go +++ b/gossipsub_test.go @@ -2416,6 +2416,7 @@ func TestFragmentRPCFunction(t *testing.T) { ensureBelowLimit(results) msgsPerRPC := limit / msgSize expectedRPCs := nMessages / msgsPerRPC + expectedRPCs += 1 // add one more message to account for message size approximation when fragmenting if len(results) != expectedRPCs { t.Fatalf("expected %d RPC messages in output, got %d", expectedRPCs, len(results)) }