MIGRATIONS-1289: Add end of segment message to capture serializer (opensearch-project#276)

* MIGRATIONS-1289: Add end of segment message to capture serializer

Signed-off-by: Tanner Lewis <[email protected]>
lewijacn authored Aug 22, 2023
1 parent 4165dc4 commit 74b249f
Showing 4 changed files with 106 additions and 14 deletions.
File 1 of 4:
@@ -2,6 +2,7 @@

 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.Timestamp;
+import org.opensearch.migrations.trafficcapture.protos.EndOfSegmentsIndication;
 import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
 import org.opensearch.migrations.trafficcapture.protos.TrafficStream;

@@ -22,10 +23,11 @@ public static int getSizeOfTimestamp(Instant t) {
 }
 
 /**
- * This function calculates the maximum bytes needed to store a message ByteBuffer and its associated
- * Traffic Stream overhead into a CodedOutputStream. The actual required bytes could be marginally smaller.
+ * This function calculates the maximum bytes needed to store a message ByteBuffer that needs to be segmented into
+ * different ReadSegmentObservation or WriteSegmentObservation and its associated Traffic Stream overhead into a
+ * CodedOutputStream. The actual required bytes could be marginally smaller.
 */
-public static int maxBytesNeededForMessage(Instant timestamp, int observationFieldNumber, int dataFieldNumber,
+public static int maxBytesNeededForSegmentedMessage(Instant timestamp, int observationFieldNumber, int dataFieldNumber,
 int dataCountFieldNumber, int dataCount, ByteBuffer buffer, int flushes) {
 // Timestamp closure bytes
 int tsContentSize = getSizeOfTimestamp(timestamp);
@@ -40,10 +42,27 @@ public static int maxBytesNeededForMessage(Instant timestamp, int observationFie
 // Observation tag and closure size needed bytes
 int observationTagAndClosureSize = CodedOutputStream.computeInt32Size(TrafficStream.SUBSTREAM_FIELD_NUMBER, tsClosureSize + captureClosureSize);
 
+// Size for additional SegmentEndObservation to signify end of segments
+int segmentEndBytes = bytesNeededForSegmentEndObservation(timestamp);
+
 // Size for closing index, use arbitrary field to calculate
 int indexSize = CodedOutputStream.computeInt32Size(TrafficStream.NUMBER_FIELD_NUMBER, flushes);
 
-return observationTagAndClosureSize + tsClosureSize + captureClosureSize + indexSize;
+return observationTagAndClosureSize + tsClosureSize + captureClosureSize + segmentEndBytes + indexSize;
 }
 
+public static int bytesNeededForSegmentEndObservation(Instant timestamp) {
+// Timestamp closure bytes
+int tsContentSize = getSizeOfTimestamp(timestamp);
+int tsClosureSize = CodedOutputStream.computeInt32Size(TrafficObservation.TS_FIELD_NUMBER, tsContentSize) + tsContentSize;
+
+// Capture closure bytes
+int captureClosureSize = CodedOutputStream.computeMessageSize(TrafficObservation.SEGMENTEND_FIELD_NUMBER, EndOfSegmentsIndication.getDefaultInstance());
+
+// Observation tag and closure size needed bytes
+int observationTagAndClosureSize = CodedOutputStream.computeInt32Size(TrafficStream.SUBSTREAM_FIELD_NUMBER, tsClosureSize + captureClosureSize);
+
+return observationTagAndClosureSize + tsClosureSize + captureClosureSize;
+}
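Both size helpers above lean on the same protobuf rule: a length-delimited field costs its tag, plus a varint holding the content length, plus the content bytes, which is why the code adds tsContentSize back after computing computeInt32Size(TS_FIELD_NUMBER, tsContentSize). A minimal standalone sketch of that arithmetic, assuming only protobuf-java on the classpath (the field number below is a placeholder, not one of the real proto constants):

import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.Timestamp;

class FieldSizeDemo {
    public static void main(String[] args) {
        Timestamp ts = Timestamp.newBuilder().setSeconds(1686593191).build();
        int content = ts.getSerializedSize();
        int fieldNumber = 1; // placeholder field number, for illustration only
        // A length-delimited field costs: tag + length varint + content bytes
        int manual = CodedOutputStream.computeTagSize(fieldNumber)
                + CodedOutputStream.computeUInt32SizeNoTag(content)
                + content;
        // computeMessageSize performs the same three-part sum in one call
        int viaHelper = CodedOutputStream.computeMessageSize(fieldNumber, ts);
        System.out.println(manual + " == " + viaHelper); // prints "8 == 8"
    }
}

The commit's computeInt32Size(field, contentSize) + contentSize idiom computes the same tag-plus-length overhead, since a non-negative int32 encodes to the same varint as the equivalent uint32 length.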


File 2 of 4:
@@ -9,6 +9,7 @@
 import org.opensearch.migrations.trafficcapture.protos.CloseObservation;
 import org.opensearch.migrations.trafficcapture.protos.ConnectionExceptionObservation;
 import org.opensearch.migrations.trafficcapture.protos.EndOfMessageIndication;
+import org.opensearch.migrations.trafficcapture.protos.EndOfSegmentsIndication;
 import org.opensearch.migrations.trafficcapture.protos.ReadObservation;
 import org.opensearch.migrations.trafficcapture.protos.ReadSegmentObservation;
 import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
@@ -253,10 +254,10 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant
 }
 
 // The message bytes here are not optimizing for space and instead are calculated on the worst case estimate of
-// the potentially required bytes for simplicity. This could leave ~5 bytes of unused space in the CodedOutputStream
-// when considering the case of a message that does not need segments or the case of a smaller segment created
+// the potentially required bytes for simplicity. This could leave ~25 bytes of unused space in the CodedOutputStream
+// when considering the case of a message that does not need segments or ~5 for the case of a smaller segment created
 // from a much larger message
-int messageAndOverheadBytesLeft = CodedOutputStreamSizeUtil.maxBytesNeededForMessage(timestamp,
+int messageAndOverheadBytesLeft = CodedOutputStreamSizeUtil.maxBytesNeededForSegmentedMessage(timestamp,
 segmentFieldNumber, segmentDataFieldNumber, segmentCountFieldNumber, 2, byteBuffer, numFlushesSoFar + 1);
 int trafficStreamOverhead = messageAndOverheadBytesLeft - byteBuffer.capacity();
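The ~25-byte case comes from reserving room for the segment-end observation that an unsegmented message never actually writes; the smaller ~5-byte slack is mostly varint width, since the estimate assumes the widest possible length prefixes while a short final chunk encodes shorter ones. A standalone sketch of that varint effect (protobuf-java only, sizes chosen for illustration):

import com.google.protobuf.CodedOutputStream;

class VarintSlackDemo {
    public static void main(String[] args) {
        // Length prefixes are varints: a 1 MiB payload needs a 3-byte length,
        // while a 100-byte final chunk needs only 1, so a buffer sized for the
        // worst case ends up with the difference left unused.
        int reserved = CodedOutputStream.computeUInt32SizeNoTag(1 << 20); // 3 bytes
        int actual = CodedOutputStream.computeUInt32SizeNoTag(100);       // 1 byte
        System.out.println("unused length-prefix bytes: " + (reserved - actual));
    }
}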

@@ -298,6 +299,7 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant
 messageAndOverheadBytesLeft = messageAndOverheadBytesLeft - chunkBytes;
 }
 }
+writeEndOfSegmentMessage(timestamp);
 
 }

@@ -425,4 +427,9 @@ private void writeEndOfHttpMessage(Instant timestamp) throws IOException {
 getOrCreateCodedOutputStream().writeInt32(EndOfMessageIndication.FIRSTLINEBYTELENGTH_FIELD_NUMBER, firstLineByteLength);
 getOrCreateCodedOutputStream().writeInt32(EndOfMessageIndication.HEADERSBYTELENGTH_FIELD_NUMBER, headersByteLength);
 }
+
+private void writeEndOfSegmentMessage(Instant timestamp) throws IOException {
+beginSubstreamObservation(timestamp, TrafficObservation.SEGMENTEND_FIELD_NUMBER, 1);
+getOrCreateCodedOutputStream().writeMessage(TrafficObservation.SEGMENTEND_FIELD_NUMBER, EndOfSegmentsIndication.getDefaultInstance());
+}
 }
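For context on how the new marker gets consumed (a sketch, not code from this commit): a reader that reassembles segmented observations can treat segmentEnd as the signal that a chunked message is complete. The accessors on the segment observation (getReadSegment, getData) are inferred from the protos and should be checked against the generated classes:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
import org.opensearch.migrations.trafficcapture.protos.TrafficStream;

class SegmentReassemblySketch {
    // Accumulate chunked read bytes until the end-of-segments marker appears.
    static byte[] reassembleReads(TrafficStream trafficStream) throws IOException {
        ByteArrayOutputStream reassembled = new ByteArrayOutputStream();
        for (TrafficObservation obs : trafficStream.getSubStreamList()) {
            if (obs.hasReadSegment()) {
                // assumed accessor for ReadSegmentObservation's bytes field
                obs.getReadSegment().getData().writeTo(reassembled);
            } else if (obs.hasSegmentEnd()) {
                break; // EndOfSegmentsIndication: the segmented message is complete
            }
        }
        return reassembled.toByteArray();
    }
}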
File 3 of 4:
@@ -10,6 +10,7 @@
 import org.opensearch.migrations.trafficcapture.protos.CloseObservation;
 import org.opensearch.migrations.trafficcapture.protos.ConnectionExceptionObservation;
 import org.opensearch.migrations.trafficcapture.protos.EndOfMessageIndication;
+import org.opensearch.migrations.trafficcapture.protos.EndOfSegmentsIndication;
 import org.opensearch.migrations.trafficcapture.protos.ReadObservation;
 import org.opensearch.migrations.trafficcapture.protos.TrafficObservation;
 import org.opensearch.migrations.trafficcapture.protos.TrafficStream;
@@ -129,7 +130,7 @@ public void testBasicDataConsistencyWhenChunking() throws IOException, Execution
 var outputBuffersCreated = new ConcurrentLinkedQueue<ByteBuffer>();
 // Arbitrarily picking small buffer that can hold the overhead TrafficStream bytes as well as some
 // data bytes but not all the data bytes and require chunking
-var serializer = createSerializerWithTestHandler(outputBuffersCreated, 55);
+var serializer = createSerializerWithTestHandler(outputBuffersCreated, 85);
 
 var bb = Unpooled.wrappedBuffer(packetBytes);
 serializer.addWriteEvent(referenceTimestamp, bb);
@@ -161,7 +162,7 @@ public void testEmptyPacketIsHandledForSmallCodedOutputStream()
 var outputBuffersCreated = new ConcurrentLinkedQueue<ByteBuffer>();
 // Arbitrarily picking small buffer size that can only hold one empty message
 var serializer = createSerializerWithTestHandler(outputBuffersCreated,
-    TEST_NODE_ID_STRING.length() + 40);
+    TEST_NODE_ID_STRING.length() + 60);
 var bb = Unpooled.buffer(0);
 serializer.addWriteEvent(referenceTimestamp, bb);
 serializer.addWriteEvent(referenceTimestamp, bb);
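Both bumped minimums track the new reservation: 85 - 55 = 30 and 60 - 40 = 20 extra bytes, roughly the cost of the segment-end observation (its substream tag, timestamp, and empty EndOfSegmentsIndication) that maxBytesNeededForSegmentedMessage now budgets for, so buffers that previously just fit need the extra headroom.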
@@ -214,6 +215,75 @@ public void testThatReadCanBeDeserialized() throws IOException, ExecutionExcepti
 Assertions.assertEquals(groundTruth, reconstitutedTrafficStream);
 }
 
+@Test
+public void testEndOfSegmentsIndicationAddedWhenChunking() throws IOException, ExecutionException, InterruptedException {
+final var referenceTimestamp = Instant.ofEpochMilli(1686593191L * 1000);
+String packetData = "";
+for (int i = 0; i < 500; i++) {
+packetData += "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+}
+byte[] packetBytes = packetData.getBytes(StandardCharsets.UTF_8);
+var outputBuffersCreated = new ConcurrentLinkedQueue<ByteBuffer>();
+// Arbitrarily picking small buffer that can hold the overhead TrafficStream bytes as well as some
+// data bytes but not all the data bytes and require chunking
+var serializer = createSerializerWithTestHandler(outputBuffersCreated, 85);
+
+var bb = Unpooled.wrappedBuffer(packetBytes);
+serializer.addWriteEvent(referenceTimestamp, bb);
+CompletableFuture future = serializer.flushCommitAndResetStream(true);
+future.get();
+bb.release();
+
+List<TrafficObservation> observations = new ArrayList<>();
+for (ByteBuffer buffer : outputBuffersCreated) {
+var trafficStream = TrafficStream.parseFrom(buffer);
+observations.addAll(trafficStream.getSubStreamList());
+}
+
+int foundEndOfSegments = 0;
+for (TrafficObservation observation : observations) {
+if (observation.hasSegmentEnd()) {
+foundEndOfSegments++;
+EndOfSegmentsIndication endOfSegment = observation.getSegmentEnd();
+Assertions.assertEquals(EndOfSegmentsIndication.getDefaultInstance(), endOfSegment);
+}
+}
+Assertions.assertEquals(1, foundEndOfSegments);
+}
+
+@Test
+public void testEndOfSegmentsIndicationNotAddedWhenNotChunking() throws IOException, ExecutionException, InterruptedException {
+final var referenceTimestamp = Instant.ofEpochMilli(1686593191L * 1000);
+String packetData = "";
+for (int i = 0; i < 10; i++) {
+packetData += "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
+}
+byte[] packetBytes = packetData.getBytes(StandardCharsets.UTF_8);
+var outputBuffersCreated = new ConcurrentLinkedQueue<ByteBuffer>();
+// Buffer size should be large enough to hold all packetData and overhead
+var serializer = createSerializerWithTestHandler(outputBuffersCreated, 500);
+
+var bb = Unpooled.wrappedBuffer(packetBytes);
+serializer.addWriteEvent(referenceTimestamp, bb);
+CompletableFuture future = serializer.flushCommitAndResetStream(true);
+future.get();
+bb.release();
+
+List<TrafficObservation> observations = new ArrayList<>();
+for (ByteBuffer buffer : outputBuffersCreated) {
+var trafficStream = TrafficStream.parseFrom(buffer);
+observations.addAll(trafficStream.getSubStreamList());
+}
+
+int foundEndOfSegments = 0;
+for (TrafficObservation observation : observations) {
+if (observation.hasSegmentEnd()) {
+foundEndOfSegments++;
+}
+}
+Assertions.assertEquals(0, foundEndOfSegments);
+}
+
 private StreamChannelConnectionCaptureSerializer
 createSerializerWithTestHandler(ConcurrentLinkedQueue<ByteBuffer> outputBuffers, int bufferSize)
 throws IOException {
File 4 of 4:
@@ -30,10 +30,7 @@
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
-import java.util.StringJoiner;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -529,8 +526,7 @@ private static String formatWorkItem(DiagnosticTrackableCompletableFuture<String

 public void runReplay(Stream<TrafficStream> trafficChunkStream,
 CapturedTrafficToHttpTransactionAccumulator trafficToHttpTransactionAccumulator) {
-trafficChunkStream
-.forEach(ts-> trafficToHttpTransactionAccumulator.accept(ts));
+trafficChunkStream.forEach(ts-> trafficToHttpTransactionAccumulator.accept(ts));
 }
 
 }
