From 569e574ebcc605e5a7c3154026db5fdf82a607b8 Mon Sep 17 00:00:00 2001 From: cfredri4 Date: Fri, 15 Nov 2024 09:01:05 +0100 Subject: [PATCH] Composite message additions This change borrows the optimization from BatchMessage to not serialize dest and src for each message This also fixes incorrect size being reported for BatchMessage and in various places where Message.writeToNoAddrs is used. --- src/org/jgroups/BaseMessage.java | 19 ++++++++++++++++--- src/org/jgroups/BatchMessage.java | 8 ++++---- src/org/jgroups/BytesMessage.java | 8 +------- src/org/jgroups/CompositeMessage.java | 12 +++++------- src/org/jgroups/EmptyMessage.java | 2 ++ src/org/jgroups/FragmentedMessage.java | 2 +- src/org/jgroups/LongMessage.java | 4 ++-- src/org/jgroups/Message.java | 4 +++- src/org/jgroups/NioMessage.java | 4 +--- src/org/jgroups/ObjectMessage.java | 8 ++------ .../jgroups/protocols/RingBufferBundler.java | 2 +- .../protocols/RingBufferBundlerLockless.java | 2 +- .../protocols/RingBufferBundlerLockless2.java | 2 +- .../org/jgroups/tests/MessageFactoryTest.java | 4 ++++ 14 files changed, 44 insertions(+), 37 deletions(-) diff --git a/src/org/jgroups/BaseMessage.java b/src/org/jgroups/BaseMessage.java index 0c6cf68655b..7c089ab8c6f 100644 --- a/src/org/jgroups/BaseMessage.java +++ b/src/org/jgroups/BaseMessage.java @@ -224,6 +224,8 @@ public String toString() { transient_flags > 0? ", transient_flags=" + Util.transientFlagsToString(transient_flags) : ""); } + protected abstract int payloadSize(); + public int serializedSize() { return size(); } @@ -238,6 +240,19 @@ public int size() { retval+=Global.SHORT_SIZE; // number of headers retval+=Headers.marshalledSize(this.headers); + retval+=payloadSize(); + return retval; + } + + public int sizeNoAddrs(Address src) { + int retval=Global.BYTE_SIZE // leading byte + + Global.SHORT_SIZE; // flags + if(sender != null && !sender.equals(src)) + retval+=Util.size(sender); + + retval+=Global.SHORT_SIZE; // number of headers + retval+=Headers.marshalledSize(this.headers); + retval+=payloadSize(); return retval; } @@ -274,7 +289,7 @@ public void writeTo(DataOutput out) throws IOException { public void writeToNoAddrs(Address src, DataOutput out) throws IOException { byte leading=0; - boolean write_src_addr=src == null || sender != null && !sender.equals(src); + boolean write_src_addr=sender != null && !sender.equals(src); if(write_src_addr) leading=Util.setFlag(leading, SRC_SET); @@ -340,6 +355,4 @@ protected Message copyPayload(Message copy) { protected static Header[] createHeaders(int size) { return size > 0? new Header[size] : new Header[Util.DEFAULT_HEADERS]; } - - } diff --git a/src/org/jgroups/BatchMessage.java b/src/org/jgroups/BatchMessage.java index 5a0977c0a14..49fd2c8ab5d 100644 --- a/src/org/jgroups/BatchMessage.java +++ b/src/org/jgroups/BatchMessage.java @@ -118,12 +118,12 @@ public String toString() { return String.format("%s, %d message(s)", super.toString(), getNumberOfMessages()); } - public int size() { - int retval=super.size() + Global.INT_SIZE; + @Override protected int payloadSize() { + int retval=Global.INT_SIZE; // count retval+=Util.size(orig_src); if(msgs != null) { for(int i=0; i < index; i++) - retval+=msgs[i].size() + Global.SHORT_SIZE; // type + retval+=msgs[i].sizeNoAddrs(getSrc()) + Global.SHORT_SIZE; // type } return retval; } @@ -140,7 +140,7 @@ public void writePayload(DataOutput out) throws IOException { for(int i=0; i < index; i++) { Message msg=msgs[i]; out.writeShort(msg.getType()); - msg.writeToNoAddrs(this.src(), out); + msg.writeToNoAddrs(getSrc(), out); } } } diff --git a/src/org/jgroups/BytesMessage.java b/src/org/jgroups/BytesMessage.java index eb64b384a58..584ff1e4d02 100644 --- a/src/org/jgroups/BytesMessage.java +++ b/src/org/jgroups/BytesMessage.java @@ -233,12 +233,6 @@ public T getObject(ClassLoader loader) { } } - - public int size() { - return super.size() +sizeOfPayload(); - } - - /** * Copies the byte array. If offset and length are used (to refer to another array), the copy will contain only * the subset that offset and length point to, copying the subset into the new copy.

@@ -252,7 +246,7 @@ public int size() { return copy; } - protected int sizeOfPayload() { + @Override protected int payloadSize() { int retval=Global.INT_SIZE; // length if(array != null) retval+=length; // number of bytes in the array diff --git a/src/org/jgroups/CompositeMessage.java b/src/org/jgroups/CompositeMessage.java index 3a960f9fd19..9299f8bea3f 100644 --- a/src/org/jgroups/CompositeMessage.java +++ b/src/org/jgroups/CompositeMessage.java @@ -3,7 +3,6 @@ import org.jgroups.util.ByteArray; - import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -118,11 +117,11 @@ public String toString() { return String.format("%s, %d message(s)", super.toString(), getNumberOfMessages()); } - public int size() { - int retval=super.size() + Global.INT_SIZE; // length + @Override protected int payloadSize() { + int retval=Global.INT_SIZE; // count if(msgs != null) { for(int i=0; i < index; i++) - retval+=msgs[i].size() + Global.SHORT_SIZE; // type + retval+=msgs[i].sizeNoAddrs(getSrc()) + Global.SHORT_SIZE; // type } return retval; } @@ -142,8 +141,7 @@ public void writePayload(DataOutput out) throws IOException { for(int i=0; i < index; i++) { Message msg=msgs[i]; out.writeShort(msg.getType()); - // msg.writeToNoAddrs(src(), out); - msg.writeTo(out); + msg.writeToNoAddrs(getSrc(), out); } } } @@ -156,7 +154,7 @@ public void readPayload(DataInput in) throws IOException, ClassNotFoundException short type=in.readShort(); Message msg=MessageFactory.create(type).setDest(getDest()); if(msg.getSrc() == null) - msg.setSrc(src()); + msg.setSrc(getSrc()); msg.readFrom(in); msgs[i]=msg; } diff --git a/src/org/jgroups/EmptyMessage.java b/src/org/jgroups/EmptyMessage.java index 80b1457e356..9e09e499df5 100644 --- a/src/org/jgroups/EmptyMessage.java +++ b/src/org/jgroups/EmptyMessage.java @@ -40,4 +40,6 @@ public void writePayload(DataOutput out) throws IOException { public void readPayload(DataInput in) throws IOException, ClassNotFoundException { // no payload to read } + + protected int payloadSize() { return 0; } } diff --git a/src/org/jgroups/FragmentedMessage.java b/src/org/jgroups/FragmentedMessage.java index 63be9f9441a..9266b102276 100644 --- a/src/org/jgroups/FragmentedMessage.java +++ b/src/org/jgroups/FragmentedMessage.java @@ -35,7 +35,7 @@ public FragmentedMessage(Message original_msg, int off, int len) { public boolean hasArray() {return false;} public boolean hasPayload() {return true;} public Supplier create() {return FragmentedMessage::new;} - protected int sizeOfPayload() {return Global.INT_SIZE + length;} + protected int payloadSize() {return Global.INT_SIZE + length;} @Override protected Message copyPayload(Message copy) { diff --git a/src/org/jgroups/LongMessage.java b/src/org/jgroups/LongMessage.java index 53706c704d5..f9cb845dade 100644 --- a/src/org/jgroups/LongMessage.java +++ b/src/org/jgroups/LongMessage.java @@ -60,8 +60,8 @@ public void readPayload(DataInput in) throws IOException, ClassNotFoundException value=Bits.readLongCompressed(in); } - public int size() { - return super.size() + Bits.size(value); + @Override protected int payloadSize() { + return Bits.size(value); } public String toString() { diff --git a/src/org/jgroups/Message.java b/src/org/jgroups/Message.java index 30270d0a465..2070d0e62ec 100644 --- a/src/org/jgroups/Message.java +++ b/src/org/jgroups/Message.java @@ -197,13 +197,15 @@ default Message setFlag(short flag, boolean transient_flags) { */ Message setPayload(Object pl); - /** * Returns the exact size of the marshalled message * @return The number of bytes for the marshalled message */ int size(); + /** Returns the exact size of the marshalled message without destination (and possibly source) address */ + int sizeNoAddrs(Address src); + /** Writes the message to an output stream excluding the destination (and possibly source) address */ void writeToNoAddrs(Address src, DataOutput out) throws IOException; diff --git a/src/org/jgroups/NioMessage.java b/src/org/jgroups/NioMessage.java index 32a0b482572..01576eb86a2 100644 --- a/src/org/jgroups/NioMessage.java +++ b/src/org/jgroups/NioMessage.java @@ -179,8 +179,6 @@ public NioMessage copy(boolean copy_payload, boolean copy_headers) { /* ----------------------------------- Interface Streamable ------------------------------- */ - public int size() {return super.size() +sizeOfPayload();} - public String toString() { return String.format("%s %s", super.toString(), use_direct_memory_for_allocations? "(direct)" : ""); } @@ -191,7 +189,7 @@ public String toString() { return copy; } - protected int sizeOfPayload() { + @Override protected int payloadSize() { return Global.INT_SIZE + getLength() + Global.BYTE_SIZE; // for use_direct_memory_for_allocations } diff --git a/src/org/jgroups/ObjectMessage.java b/src/org/jgroups/ObjectMessage.java index ea0d65e8659..70645919c88 100644 --- a/src/org/jgroups/ObjectMessage.java +++ b/src/org/jgroups/ObjectMessage.java @@ -67,7 +67,7 @@ public ObjectMessage(Address dest, SizeStreamable obj) { public boolean hasPayload() {return obj != null;} public boolean hasArray() {return false;} public int getOffset() {return 0;} - public int getLength() {return obj != null? objSize() : 0;} + public int getLength() {return obj != null? payloadSize() : 0;} public byte[] getArray() {throw new UnsupportedOperationException();} public ObjectMessage setArray(byte[] b, int off, int len) {throw new UnsupportedOperationException();} public ObjectMessage setArray(ByteArray buf) {throw new UnsupportedOperationException();} @@ -132,10 +132,6 @@ public T getObject() { return isWrapped() || obj instanceof ObjectWrapperPrimitive? ((ObjectWrapperPrimitive)obj).getObject() : (T)obj; } - public int size() { - return super.size() + objSize(); - } - public void writePayload(DataOutput out) throws IOException { Util.writeGenericStreamable(obj, out); } @@ -157,7 +153,7 @@ public String toString() { return super.toString() + String.format(", obj: %s", obj); } - protected int objSize() { + @Override protected int payloadSize() { return Util.size(obj); } } diff --git a/src/org/jgroups/protocols/RingBufferBundler.java b/src/org/jgroups/protocols/RingBufferBundler.java index b0b48bf63c5..40f753c25fe 100644 --- a/src/org/jgroups/protocols/RingBufferBundler.java +++ b/src/org/jgroups/protocols/RingBufferBundler.java @@ -204,7 +204,7 @@ protected int marshalMessagesToSameDestination(Address dest, Message[] buf, int if(msg != null && Objects.equals(dest, msg.getDest())) { if(list != null) list.add(msg); - int size=msg.size() + Global.SHORT_SIZE; + int size=msg.sizeNoAddrs(msg.getSrc()) + Global.SHORT_SIZE; if(bytes + size > max_bundle_size) break; bytes+=size; diff --git a/src/org/jgroups/protocols/RingBufferBundlerLockless.java b/src/org/jgroups/protocols/RingBufferBundlerLockless.java index 0540ba8fdb9..6bea7b1afb2 100644 --- a/src/org/jgroups/protocols/RingBufferBundlerLockless.java +++ b/src/org/jgroups/protocols/RingBufferBundlerLockless.java @@ -222,7 +222,7 @@ protected int marshalMessagesToSameDestination(Address dest, Message[] buf, while(available_msgs > 0) { Message msg=buf[start_index]; if(msg != null && Objects.equals(dest, msg.getDest())) { - int msg_size=msg.size() + Global.SHORT_SIZE;; + int msg_size=msg.sizeNoAddrs(msg.getSrc()) + Global.SHORT_SIZE; if(bytes + msg_size > max_bundle_size) break; bytes+=msg_size; diff --git a/src/org/jgroups/protocols/RingBufferBundlerLockless2.java b/src/org/jgroups/protocols/RingBufferBundlerLockless2.java index 02c6e1eeb3b..4e1f4bc7c88 100644 --- a/src/org/jgroups/protocols/RingBufferBundlerLockless2.java +++ b/src/org/jgroups/protocols/RingBufferBundlerLockless2.java @@ -207,7 +207,7 @@ protected int marshalMessagesToSameDestination(Address dest, Message[] buf, fina for(int i=start_index; i != end_index; i=increment(i)) { Message msg=buf[i]; if(msg != null && msg != NULL_MSG && Objects.equals(dest, msg.getDest())) { - int msg_size=msg.size() + Global.SHORT_SIZE; + int msg_size=msg.sizeNoAddrs(msg.getSrc()) + Global.SHORT_SIZE; if(bytes + msg_size > max_bundle_size) break; bytes+=msg_size; diff --git a/tests/junit-functional/org/jgroups/tests/MessageFactoryTest.java b/tests/junit-functional/org/jgroups/tests/MessageFactoryTest.java index 2154d9b5328..0c798b62005 100644 --- a/tests/junit-functional/org/jgroups/tests/MessageFactoryTest.java +++ b/tests/junit-functional/org/jgroups/tests/MessageFactoryTest.java @@ -87,5 +87,9 @@ public void writePayload(DataOutput out) throws IOException { public void readPayload(DataInput in) throws IOException, ClassNotFoundException { } + + protected int payloadSize() { + return 0; + } } }