diff --git a/src/org/jgroups/BaseMessage.java b/src/org/jgroups/BaseMessage.java index 0c6cf68655..7c089ab8c6 100644 --- a/src/org/jgroups/BaseMessage.java +++ b/src/org/jgroups/BaseMessage.java @@ -224,6 +224,8 @@ public String toString() { transient_flags > 0? ", transient_flags=" + Util.transientFlagsToString(transient_flags) : ""); } + protected abstract int payloadSize(); + public int serializedSize() { return size(); } @@ -238,6 +240,19 @@ public int size() { retval+=Global.SHORT_SIZE; // number of headers retval+=Headers.marshalledSize(this.headers); + retval+=payloadSize(); + return retval; + } + + public int sizeNoAddrs(Address src) { + int retval=Global.BYTE_SIZE // leading byte + + Global.SHORT_SIZE; // flags + if(sender != null && !sender.equals(src)) + retval+=Util.size(sender); + + retval+=Global.SHORT_SIZE; // number of headers + retval+=Headers.marshalledSize(this.headers); + retval+=payloadSize(); return retval; } @@ -274,7 +289,7 @@ public void writeTo(DataOutput out) throws IOException { public void writeToNoAddrs(Address src, DataOutput out) throws IOException { byte leading=0; - boolean write_src_addr=src == null || sender != null && !sender.equals(src); + boolean write_src_addr=sender != null && !sender.equals(src); if(write_src_addr) leading=Util.setFlag(leading, SRC_SET); @@ -340,6 +355,4 @@ protected Message copyPayload(Message copy) { protected static Header[] createHeaders(int size) { return size > 0? new Header[size] : new Header[Util.DEFAULT_HEADERS]; } - - } diff --git a/src/org/jgroups/BatchMessage.java b/src/org/jgroups/BatchMessage.java index 5a0977c0a1..49fd2c8ab5 100644 --- a/src/org/jgroups/BatchMessage.java +++ b/src/org/jgroups/BatchMessage.java @@ -118,12 +118,12 @@ public String toString() { return String.format("%s, %d message(s)", super.toString(), getNumberOfMessages()); } - public int size() { - int retval=super.size() + Global.INT_SIZE; + @Override protected int payloadSize() { + int retval=Global.INT_SIZE; // count retval+=Util.size(orig_src); if(msgs != null) { for(int i=0; i < index; i++) - retval+=msgs[i].size() + Global.SHORT_SIZE; // type + retval+=msgs[i].sizeNoAddrs(getSrc()) + Global.SHORT_SIZE; // type } return retval; } @@ -140,7 +140,7 @@ public void writePayload(DataOutput out) throws IOException { for(int i=0; i < index; i++) { Message msg=msgs[i]; out.writeShort(msg.getType()); - msg.writeToNoAddrs(this.src(), out); + msg.writeToNoAddrs(getSrc(), out); } } } diff --git a/src/org/jgroups/BytesMessage.java b/src/org/jgroups/BytesMessage.java index eb64b384a5..584ff1e4d0 100644 --- a/src/org/jgroups/BytesMessage.java +++ b/src/org/jgroups/BytesMessage.java @@ -233,12 +233,6 @@ public T getObject(ClassLoader loader) { } } - - public int size() { - return super.size() +sizeOfPayload(); - } - - /** * Copies the byte array. If offset and length are used (to refer to another array), the copy will contain only * the subset that offset and length point to, copying the subset into the new copy.

@@ -252,7 +246,7 @@ public int size() { return copy; } - protected int sizeOfPayload() { + @Override protected int payloadSize() { int retval=Global.INT_SIZE; // length if(array != null) retval+=length; // number of bytes in the array diff --git a/src/org/jgroups/CompositeMessage.java b/src/org/jgroups/CompositeMessage.java index 3a960f9fd1..9299f8bea3 100644 --- a/src/org/jgroups/CompositeMessage.java +++ b/src/org/jgroups/CompositeMessage.java @@ -3,7 +3,6 @@ import org.jgroups.util.ByteArray; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -118,11 +117,11 @@ public String toString() { return String.format("%s, %d message(s)", super.toString(), getNumberOfMessages()); } - public int size() { - int retval=super.size() + Global.INT_SIZE; // length + @Override protected int payloadSize() { + int retval=Global.INT_SIZE; // count if(msgs != null) { for(int i=0; i < index; i++) - retval+=msgs[i].size() + Global.SHORT_SIZE; // type + retval+=msgs[i].sizeNoAddrs(getSrc()) + Global.SHORT_SIZE; // type } return retval; } @@ -142,8 +141,7 @@ public void writePayload(DataOutput out) throws IOException { for(int i=0; i < index; i++) { Message msg=msgs[i]; out.writeShort(msg.getType()); - // msg.writeToNoAddrs(src(), out); - msg.writeTo(out); + msg.writeToNoAddrs(getSrc(), out); } } } @@ -156,7 +154,7 @@ public void readPayload(DataInput in) throws IOException, ClassNotFoundException short type=in.readShort(); Message msg=MessageFactory.create(type).setDest(getDest()); if(msg.getSrc() == null) - msg.setSrc(src()); + msg.setSrc(getSrc()); msg.readFrom(in); msgs[i]=msg; } diff --git a/src/org/jgroups/EmptyMessage.java b/src/org/jgroups/EmptyMessage.java index 80b1457e35..9e09e499df 100644 --- a/src/org/jgroups/EmptyMessage.java +++ b/src/org/jgroups/EmptyMessage.java @@ -40,4 +40,6 @@ public void writePayload(DataOutput out) throws IOException { public void readPayload(DataInput in) throws IOException, ClassNotFoundException { // no payload to read } + + protected int payloadSize() { return 0; } } diff --git a/src/org/jgroups/FragmentedMessage.java b/src/org/jgroups/FragmentedMessage.java index 63be9f9441..9266b10227 100644 --- a/src/org/jgroups/FragmentedMessage.java +++ b/src/org/jgroups/FragmentedMessage.java @@ -35,7 +35,7 @@ public FragmentedMessage(Message original_msg, int off, int len) { public boolean hasArray() {return false;} public boolean hasPayload() {return true;} public Supplier create() {return FragmentedMessage::new;} - protected int sizeOfPayload() {return Global.INT_SIZE + length;} + protected int payloadSize() {return Global.INT_SIZE + length;} @Override protected Message copyPayload(Message copy) { diff --git a/src/org/jgroups/LongMessage.java b/src/org/jgroups/LongMessage.java index 53706c704d..f9cb845dad 100644 --- a/src/org/jgroups/LongMessage.java +++ b/src/org/jgroups/LongMessage.java @@ -60,8 +60,8 @@ public void readPayload(DataInput in) throws IOException, ClassNotFoundException value=Bits.readLongCompressed(in); } - public int size() { - return super.size() + Bits.size(value); + @Override protected int payloadSize() { + return Bits.size(value); } public String toString() { diff --git a/src/org/jgroups/Message.java b/src/org/jgroups/Message.java index 30270d0a46..2070d0e62e 100644 --- a/src/org/jgroups/Message.java +++ b/src/org/jgroups/Message.java @@ -197,13 +197,15 @@ default Message setFlag(short flag, boolean transient_flags) { */ Message setPayload(Object pl); - /** * Returns the exact size of the marshalled message * @return The number of bytes for the marshalled message */ int size(); + /** Returns the exact size of the marshalled message without destination (and possibly source) address */ + int sizeNoAddrs(Address src); + /** Writes the message to an output stream excluding the destination (and possibly source) address */ void writeToNoAddrs(Address src, DataOutput out) throws IOException; diff --git a/src/org/jgroups/NioMessage.java b/src/org/jgroups/NioMessage.java index 32a0b48257..01576eb86a 100644 --- a/src/org/jgroups/NioMessage.java +++ b/src/org/jgroups/NioMessage.java @@ -179,8 +179,6 @@ public NioMessage copy(boolean copy_payload, boolean copy_headers) { /* ----------------------------------- Interface Streamable ------------------------------- */ - public int size() {return super.size() +sizeOfPayload();} - public String toString() { return String.format("%s %s", super.toString(), use_direct_memory_for_allocations? "(direct)" : ""); } @@ -191,7 +189,7 @@ public String toString() { return copy; } - protected int sizeOfPayload() { + @Override protected int payloadSize() { return Global.INT_SIZE + getLength() + Global.BYTE_SIZE; // for use_direct_memory_for_allocations } diff --git a/src/org/jgroups/ObjectMessage.java b/src/org/jgroups/ObjectMessage.java index ea0d65e865..70645919c8 100644 --- a/src/org/jgroups/ObjectMessage.java +++ b/src/org/jgroups/ObjectMessage.java @@ -67,7 +67,7 @@ public ObjectMessage(Address dest, SizeStreamable obj) { public boolean hasPayload() {return obj != null;} public boolean hasArray() {return false;} public int getOffset() {return 0;} - public int getLength() {return obj != null? objSize() : 0;} + public int getLength() {return obj != null? payloadSize() : 0;} public byte[] getArray() {throw new UnsupportedOperationException();} public ObjectMessage setArray(byte[] b, int off, int len) {throw new UnsupportedOperationException();} public ObjectMessage setArray(ByteArray buf) {throw new UnsupportedOperationException();} @@ -132,10 +132,6 @@ public T getObject() { return isWrapped() || obj instanceof ObjectWrapperPrimitive? ((ObjectWrapperPrimitive)obj).getObject() : (T)obj; } - public int size() { - return super.size() + objSize(); - } - public void writePayload(DataOutput out) throws IOException { Util.writeGenericStreamable(obj, out); } @@ -157,7 +153,7 @@ public String toString() { return super.toString() + String.format(", obj: %s", obj); } - protected int objSize() { + @Override protected int payloadSize() { return Util.size(obj); } } diff --git a/src/org/jgroups/protocols/RingBufferBundler.java b/src/org/jgroups/protocols/RingBufferBundler.java index b0b48bf63c..40f753c25f 100644 --- a/src/org/jgroups/protocols/RingBufferBundler.java +++ b/src/org/jgroups/protocols/RingBufferBundler.java @@ -204,7 +204,7 @@ protected int marshalMessagesToSameDestination(Address dest, Message[] buf, int if(msg != null && Objects.equals(dest, msg.getDest())) { if(list != null) list.add(msg); - int size=msg.size() + Global.SHORT_SIZE; + int size=msg.sizeNoAddrs(msg.getSrc()) + Global.SHORT_SIZE; if(bytes + size > max_bundle_size) break; bytes+=size; diff --git a/src/org/jgroups/protocols/RingBufferBundlerLockless.java b/src/org/jgroups/protocols/RingBufferBundlerLockless.java index 0540ba8fdb..6bea7b1afb 100644 --- a/src/org/jgroups/protocols/RingBufferBundlerLockless.java +++ b/src/org/jgroups/protocols/RingBufferBundlerLockless.java @@ -222,7 +222,7 @@ protected int marshalMessagesToSameDestination(Address dest, Message[] buf, while(available_msgs > 0) { Message msg=buf[start_index]; if(msg != null && Objects.equals(dest, msg.getDest())) { - int msg_size=msg.size() + Global.SHORT_SIZE;; + int msg_size=msg.sizeNoAddrs(msg.getSrc()) + Global.SHORT_SIZE; if(bytes + msg_size > max_bundle_size) break; bytes+=msg_size; diff --git a/src/org/jgroups/protocols/RingBufferBundlerLockless2.java b/src/org/jgroups/protocols/RingBufferBundlerLockless2.java index 02c6e1eeb3..4e1f4bc7c8 100644 --- a/src/org/jgroups/protocols/RingBufferBundlerLockless2.java +++ b/src/org/jgroups/protocols/RingBufferBundlerLockless2.java @@ -207,7 +207,7 @@ protected int marshalMessagesToSameDestination(Address dest, Message[] buf, fina for(int i=start_index; i != end_index; i=increment(i)) { Message msg=buf[i]; if(msg != null && msg != NULL_MSG && Objects.equals(dest, msg.getDest())) { - int msg_size=msg.size() + Global.SHORT_SIZE; + int msg_size=msg.sizeNoAddrs(msg.getSrc()) + Global.SHORT_SIZE; if(bytes + msg_size > max_bundle_size) break; bytes+=msg_size; diff --git a/tests/junit-functional/org/jgroups/tests/MessageFactoryTest.java b/tests/junit-functional/org/jgroups/tests/MessageFactoryTest.java index 2154d9b532..0c798b6200 100644 --- a/tests/junit-functional/org/jgroups/tests/MessageFactoryTest.java +++ b/tests/junit-functional/org/jgroups/tests/MessageFactoryTest.java @@ -87,5 +87,9 @@ public void writePayload(DataOutput out) throws IOException { public void readPayload(DataInput in) throws IOException, ClassNotFoundException { } + + protected int payloadSize() { + return 0; + } } }