Skip to content

Commit

Permalink
Composite message additions
Browse files Browse the repository at this point in the history
This change borrows the optimization from BatchMessage to not serialize dest and src for each message
This also fixes incorrect size being reported for BatchMessage and in various places where Message.writeToNoAddrs is used.
  • Loading branch information
cfredri4 authored and belaban committed Nov 21, 2024
1 parent f23836f commit 569e574
Show file tree
Hide file tree
Showing 14 changed files with 44 additions and 37 deletions.
19 changes: 16 additions & 3 deletions src/org/jgroups/BaseMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ public String toString() {
transient_flags > 0? ", transient_flags=" + Util.transientFlagsToString(transient_flags) : "");
}

protected abstract int payloadSize();

public int serializedSize() {
return size();
}
Expand All @@ -238,6 +240,19 @@ public int size() {

retval+=Global.SHORT_SIZE; // number of headers
retval+=Headers.marshalledSize(this.headers);
retval+=payloadSize();
return retval;
}

public int sizeNoAddrs(Address src) {
int retval=Global.BYTE_SIZE // leading byte
+ Global.SHORT_SIZE; // flags
if(sender != null && !sender.equals(src))
retval+=Util.size(sender);

retval+=Global.SHORT_SIZE; // number of headers
retval+=Headers.marshalledSize(this.headers);
retval+=payloadSize();
return retval;
}

Expand Down Expand Up @@ -274,7 +289,7 @@ public void writeTo(DataOutput out) throws IOException {
public void writeToNoAddrs(Address src, DataOutput out) throws IOException {
byte leading=0;

boolean write_src_addr=src == null || sender != null && !sender.equals(src);
boolean write_src_addr=sender != null && !sender.equals(src);

if(write_src_addr)
leading=Util.setFlag(leading, SRC_SET);
Expand Down Expand Up @@ -340,6 +355,4 @@ protected Message copyPayload(Message copy) {
protected static Header[] createHeaders(int size) {
return size > 0? new Header[size] : new Header[Util.DEFAULT_HEADERS];
}


}
8 changes: 4 additions & 4 deletions src/org/jgroups/BatchMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ public String toString() {
return String.format("%s, %d message(s)", super.toString(), getNumberOfMessages());
}

public int size() {
int retval=super.size() + Global.INT_SIZE;
@Override protected int payloadSize() {
int retval=Global.INT_SIZE; // count
retval+=Util.size(orig_src);
if(msgs != null) {
for(int i=0; i < index; i++)
retval+=msgs[i].size() + Global.SHORT_SIZE; // type
retval+=msgs[i].sizeNoAddrs(getSrc()) + Global.SHORT_SIZE; // type
}
return retval;
}
Expand All @@ -140,7 +140,7 @@ public void writePayload(DataOutput out) throws IOException {
for(int i=0; i < index; i++) {
Message msg=msgs[i];
out.writeShort(msg.getType());
msg.writeToNoAddrs(this.src(), out);
msg.writeToNoAddrs(getSrc(), out);
}
}
}
Expand Down
8 changes: 1 addition & 7 deletions src/org/jgroups/BytesMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,6 @@ public <T extends Object> T getObject(ClassLoader loader) {
}
}


public int size() {
return super.size() +sizeOfPayload();
}


/**
* Copies the byte array. If offset and length are used (to refer to another array), the copy will contain only
* the subset that offset and length point to, copying the subset into the new copy.<p/>
Expand All @@ -252,7 +246,7 @@ public int size() {
return copy;
}

protected int sizeOfPayload() {
@Override protected int payloadSize() {
int retval=Global.INT_SIZE; // length
if(array != null)
retval+=length; // number of bytes in the array
Expand Down
12 changes: 5 additions & 7 deletions src/org/jgroups/CompositeMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@


import org.jgroups.util.ByteArray;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
Expand Down Expand Up @@ -118,11 +117,11 @@ public String toString() {
return String.format("%s, %d message(s)", super.toString(), getNumberOfMessages());
}

public int size() {
int retval=super.size() + Global.INT_SIZE; // length
@Override protected int payloadSize() {
int retval=Global.INT_SIZE; // count
if(msgs != null) {
for(int i=0; i < index; i++)
retval+=msgs[i].size() + Global.SHORT_SIZE; // type
retval+=msgs[i].sizeNoAddrs(getSrc()) + Global.SHORT_SIZE; // type
}
return retval;
}
Expand All @@ -142,8 +141,7 @@ public void writePayload(DataOutput out) throws IOException {
for(int i=0; i < index; i++) {
Message msg=msgs[i];
out.writeShort(msg.getType());
// msg.writeToNoAddrs(src(), out);
msg.writeTo(out);
msg.writeToNoAddrs(getSrc(), out);
}
}
}
Expand All @@ -156,7 +154,7 @@ public void readPayload(DataInput in) throws IOException, ClassNotFoundException
short type=in.readShort();
Message msg=MessageFactory.create(type).setDest(getDest());
if(msg.getSrc() == null)
msg.setSrc(src());
msg.setSrc(getSrc());
msg.readFrom(in);
msgs[i]=msg;
}
Expand Down
2 changes: 2 additions & 0 deletions src/org/jgroups/EmptyMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,6 @@ public void writePayload(DataOutput out) throws IOException {
public void readPayload(DataInput in) throws IOException, ClassNotFoundException {
// no payload to read
}

protected int payloadSize() { return 0; }
}
2 changes: 1 addition & 1 deletion src/org/jgroups/FragmentedMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public FragmentedMessage(Message original_msg, int off, int len) {
public boolean hasArray() {return false;}
public boolean hasPayload() {return true;}
public Supplier<Message> create() {return FragmentedMessage::new;}
protected int sizeOfPayload() {return Global.INT_SIZE + length;}
protected int payloadSize() {return Global.INT_SIZE + length;}

@Override
protected Message copyPayload(Message copy) {
Expand Down
4 changes: 2 additions & 2 deletions src/org/jgroups/LongMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ public void readPayload(DataInput in) throws IOException, ClassNotFoundException
value=Bits.readLongCompressed(in);
}

public int size() {
return super.size() + Bits.size(value);
@Override protected int payloadSize() {
return Bits.size(value);
}

public String toString() {
Expand Down
4 changes: 3 additions & 1 deletion src/org/jgroups/Message.java
Original file line number Diff line number Diff line change
Expand Up @@ -197,13 +197,15 @@ default Message setFlag(short flag, boolean transient_flags) {
*/
Message setPayload(Object pl);


/**
* Returns the exact size of the marshalled message
* @return The number of bytes for the marshalled message
*/
int size();

/** Returns the exact size of the marshalled message without destination (and possibly source) address */
int sizeNoAddrs(Address src);

/** Writes the message to an output stream excluding the destination (and possibly source) address */
void writeToNoAddrs(Address src, DataOutput out) throws IOException;

Expand Down
4 changes: 1 addition & 3 deletions src/org/jgroups/NioMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,6 @@ public NioMessage copy(boolean copy_payload, boolean copy_headers) {

/* ----------------------------------- Interface Streamable ------------------------------- */

public int size() {return super.size() +sizeOfPayload();}

public String toString() {
return String.format("%s %s", super.toString(), use_direct_memory_for_allocations? "(direct)" : "");
}
Expand All @@ -191,7 +189,7 @@ public String toString() {
return copy;
}

protected int sizeOfPayload() {
@Override protected int payloadSize() {
return Global.INT_SIZE + getLength() + Global.BYTE_SIZE; // for use_direct_memory_for_allocations
}

Expand Down
8 changes: 2 additions & 6 deletions src/org/jgroups/ObjectMessage.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public ObjectMessage(Address dest, SizeStreamable obj) {
public boolean hasPayload() {return obj != null;}
public boolean hasArray() {return false;}
public int getOffset() {return 0;}
public int getLength() {return obj != null? objSize() : 0;}
public int getLength() {return obj != null? payloadSize() : 0;}
public byte[] getArray() {throw new UnsupportedOperationException();}
public ObjectMessage setArray(byte[] b, int off, int len) {throw new UnsupportedOperationException();}
public ObjectMessage setArray(ByteArray buf) {throw new UnsupportedOperationException();}
Expand Down Expand Up @@ -132,10 +132,6 @@ public <T extends Object> T getObject() {
return isWrapped() || obj instanceof ObjectWrapperPrimitive? ((ObjectWrapperPrimitive)obj).getObject() : (T)obj;
}

public int size() {
return super.size() + objSize();
}

public void writePayload(DataOutput out) throws IOException {
Util.writeGenericStreamable(obj, out);
}
Expand All @@ -157,7 +153,7 @@ public String toString() {
return super.toString() + String.format(", obj: %s", obj);
}

protected int objSize() {
@Override protected int payloadSize() {
return Util.size(obj);
}
}
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/RingBufferBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ protected int marshalMessagesToSameDestination(Address dest, Message[] buf, int
if(msg != null && Objects.equals(dest, msg.getDest())) {
if(list != null)
list.add(msg);
int size=msg.size() + Global.SHORT_SIZE;
int size=msg.sizeNoAddrs(msg.getSrc()) + Global.SHORT_SIZE;
if(bytes + size > max_bundle_size)
break;
bytes+=size;
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/RingBufferBundlerLockless.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ protected int marshalMessagesToSameDestination(Address dest, Message[] buf,
while(available_msgs > 0) {
Message msg=buf[start_index];
if(msg != null && Objects.equals(dest, msg.getDest())) {
int msg_size=msg.size() + Global.SHORT_SIZE;;
int msg_size=msg.sizeNoAddrs(msg.getSrc()) + Global.SHORT_SIZE;
if(bytes + msg_size > max_bundle_size)
break;
bytes+=msg_size;
Expand Down
2 changes: 1 addition & 1 deletion src/org/jgroups/protocols/RingBufferBundlerLockless2.java
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ protected int marshalMessagesToSameDestination(Address dest, Message[] buf, fina
for(int i=start_index; i != end_index; i=increment(i)) {
Message msg=buf[i];
if(msg != null && msg != NULL_MSG && Objects.equals(dest, msg.getDest())) {
int msg_size=msg.size() + Global.SHORT_SIZE;
int msg_size=msg.sizeNoAddrs(msg.getSrc()) + Global.SHORT_SIZE;
if(bytes + msg_size > max_bundle_size)
break;
bytes+=msg_size;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,9 @@ public void writePayload(DataOutput out) throws IOException {

public void readPayload(DataInput in) throws IOException, ClassNotFoundException {
}

protected int payloadSize() {
return 0;
}
}
}

0 comments on commit 569e574

Please sign in to comment.