diff --git a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamSizeUtil.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamSizeUtil.java index 994a99186..8293fbca3 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamSizeUtil.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/CodedOutputStreamSizeUtil.java @@ -2,6 +2,7 @@ import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Timestamp; +import org.opensearch.migrations.trafficcapture.protos.EndOfSegmentsIndication; import org.opensearch.migrations.trafficcapture.protos.TrafficObservation; import org.opensearch.migrations.trafficcapture.protos.TrafficStream; @@ -22,10 +23,11 @@ public static int getSizeOfTimestamp(Instant t) { } /** - * This function calculates the maximum bytes needed to store a message ByteBuffer and its associated - * Traffic Stream overhead into a CodedOutputStream. The actual required bytes could be marginally smaller. + * This function calculates the maximum bytes needed to store a message ByteBuffer that needs to be segmented into + * different ReadSegmentObservation or WriteSegmentObservation and its associated Traffic Stream overhead into a + * CodedOutputStream. The actual required bytes could be marginally smaller. 
*/ - public static int maxBytesNeededForMessage(Instant timestamp, int observationFieldNumber, int dataFieldNumber, + public static int maxBytesNeededForSegmentedMessage(Instant timestamp, int observationFieldNumber, int dataFieldNumber, int dataCountFieldNumber, int dataCount, ByteBuffer buffer, int flushes) { // Timestamp closure bytes int tsContentSize = getSizeOfTimestamp(timestamp); @@ -40,10 +42,27 @@ public static int maxBytesNeededForMessage(Instant timestamp, int observationFie // Observation tag and closure size needed bytes int observationTagAndClosureSize = CodedOutputStream.computeInt32Size(TrafficStream.SUBSTREAM_FIELD_NUMBER, tsClosureSize + captureClosureSize); + // Size for additional SegmentEndObservation to signify end of segments + int segmentEndBytes = bytesNeededForSegmentEndObservation(timestamp); + // Size for closing index, use arbitrary field to calculate int indexSize = CodedOutputStream.computeInt32Size(TrafficStream.NUMBER_FIELD_NUMBER, flushes); - return observationTagAndClosureSize + tsClosureSize + captureClosureSize + indexSize; + return observationTagAndClosureSize + tsClosureSize + captureClosureSize + segmentEndBytes + indexSize; + } + + public static int bytesNeededForSegmentEndObservation(Instant timestamp) { + // Timestamp closure bytes + int tsContentSize = getSizeOfTimestamp(timestamp); + int tsClosureSize = CodedOutputStream.computeInt32Size(TrafficObservation.TS_FIELD_NUMBER, tsContentSize) + tsContentSize; + + // Capture closure bytes + int captureClosureSize = CodedOutputStream.computeMessageSize(TrafficObservation.SEGMENTEND_FIELD_NUMBER, EndOfSegmentsIndication.getDefaultInstance()); + + // Observation tag and closure size needed bytes + int observationTagAndClosureSize = CodedOutputStream.computeInt32Size(TrafficStream.SUBSTREAM_FIELD_NUMBER, tsClosureSize + captureClosureSize); + + return observationTagAndClosureSize + tsClosureSize + captureClosureSize; } diff --git 
a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java index 14ab68e9b..83b549bb2 100644 --- a/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java +++ b/TrafficCapture/captureOffloader/src/main/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializer.java @@ -9,6 +9,7 @@ import org.opensearch.migrations.trafficcapture.protos.CloseObservation; import org.opensearch.migrations.trafficcapture.protos.ConnectionExceptionObservation; import org.opensearch.migrations.trafficcapture.protos.EndOfMessageIndication; +import org.opensearch.migrations.trafficcapture.protos.EndOfSegmentsIndication; import org.opensearch.migrations.trafficcapture.protos.ReadObservation; import org.opensearch.migrations.trafficcapture.protos.ReadSegmentObservation; import org.opensearch.migrations.trafficcapture.protos.TrafficObservation; @@ -253,10 +254,10 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant } // The message bytes here are not optimizing for space and instead are calculated on the worst case estimate of - // the potentially required bytes for simplicity. This could leave ~5 bytes of unused space in the CodedOutputStream - // when considering the case of a message that does not need segments or the case of a smaller segment created + // the potentially required bytes for simplicity. 
This could leave ~25 bytes of unused space in the CodedOutputStream + // when considering the case of a message that does not need segments or ~5 for the case of a smaller segment created // from a much larger message - int messageAndOverheadBytesLeft = CodedOutputStreamSizeUtil.maxBytesNeededForMessage(timestamp, + int messageAndOverheadBytesLeft = CodedOutputStreamSizeUtil.maxBytesNeededForSegmentedMessage(timestamp, segmentFieldNumber, segmentDataFieldNumber, segmentCountFieldNumber, 2, byteBuffer, numFlushesSoFar + 1); int trafficStreamOverhead = messageAndOverheadBytesLeft - byteBuffer.capacity(); @@ -298,6 +299,7 @@ private void addDataMessage(int captureFieldNumber, int dataFieldNumber, Instant messageAndOverheadBytesLeft = messageAndOverheadBytesLeft - chunkBytes; } } + writeEndOfSegmentMessage(timestamp); } @@ -425,4 +427,9 @@ private void writeEndOfHttpMessage(Instant timestamp) throws IOException { getOrCreateCodedOutputStream().writeInt32(EndOfMessageIndication.FIRSTLINEBYTELENGTH_FIELD_NUMBER, firstLineByteLength); getOrCreateCodedOutputStream().writeInt32(EndOfMessageIndication.HEADERSBYTELENGTH_FIELD_NUMBER, headersByteLength); } + + private void writeEndOfSegmentMessage(Instant timestamp) throws IOException { + beginSubstreamObservation(timestamp, TrafficObservation.SEGMENTEND_FIELD_NUMBER, 1); + getOrCreateCodedOutputStream().writeMessage(TrafficObservation.SEGMENTEND_FIELD_NUMBER, EndOfSegmentsIndication.getDefaultInstance()); + } } diff --git a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java index 15c4f7f8b..6e86d8322 100644 --- a/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java +++ 
b/TrafficCapture/captureOffloader/src/test/java/org/opensearch/migrations/trafficcapture/StreamChannelConnectionCaptureSerializerTest.java @@ -10,6 +10,7 @@ import org.opensearch.migrations.trafficcapture.protos.CloseObservation; import org.opensearch.migrations.trafficcapture.protos.ConnectionExceptionObservation; import org.opensearch.migrations.trafficcapture.protos.EndOfMessageIndication; +import org.opensearch.migrations.trafficcapture.protos.EndOfSegmentsIndication; import org.opensearch.migrations.trafficcapture.protos.ReadObservation; import org.opensearch.migrations.trafficcapture.protos.TrafficObservation; import org.opensearch.migrations.trafficcapture.protos.TrafficStream; @@ -129,7 +130,7 @@ public void testBasicDataConsistencyWhenChunking() throws IOException, Execution var outputBuffersCreated = new ConcurrentLinkedQueue(); // Arbitrarily picking small buffer that can hold the overhead TrafficStream bytes as well as some // data bytes but not all the data bytes and require chunking - var serializer = createSerializerWithTestHandler(outputBuffersCreated, 55); + var serializer = createSerializerWithTestHandler(outputBuffersCreated, 85); var bb = Unpooled.wrappedBuffer(packetBytes); serializer.addWriteEvent(referenceTimestamp, bb); @@ -161,7 +162,7 @@ public void testEmptyPacketIsHandledForSmallCodedOutputStream() var outputBuffersCreated = new ConcurrentLinkedQueue(); // Arbitrarily picking small buffer size that can only hold one empty message var serializer = createSerializerWithTestHandler(outputBuffersCreated, - TEST_NODE_ID_STRING.length() + 40); + TEST_NODE_ID_STRING.length() + 60); var bb = Unpooled.buffer(0); serializer.addWriteEvent(referenceTimestamp, bb); serializer.addWriteEvent(referenceTimestamp, bb); @@ -214,6 +215,75 @@ public void testThatReadCanBeDeserialized() throws IOException, ExecutionExcepti Assertions.assertEquals(groundTruth, reconstitutedTrafficStream); } + @Test + public void testEndOfSegmentsIndicationAddedWhenChunking() 
throws IOException, ExecutionException, InterruptedException { + final var referenceTimestamp = Instant.ofEpochMilli(1686593191L*1000); + String packetData = ""; + for (int i = 0; i < 500; i++) { + packetData += "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + } + byte[] packetBytes = packetData.getBytes(StandardCharsets.UTF_8); + var outputBuffersCreated = new ConcurrentLinkedQueue(); + // Arbitrarily picking small buffer that can hold the overhead TrafficStream bytes as well as some + // data bytes but not all the data bytes and require chunking + var serializer = createSerializerWithTestHandler(outputBuffersCreated, 85); + + var bb = Unpooled.wrappedBuffer(packetBytes); + serializer.addWriteEvent(referenceTimestamp, bb); + CompletableFuture future = serializer.flushCommitAndResetStream(true); + future.get(); + bb.release(); + + List observations = new ArrayList<>(); + for (ByteBuffer buffer : outputBuffersCreated) { + var trafficStream = TrafficStream.parseFrom(buffer); + observations.addAll(trafficStream.getSubStreamList()); + } + + int foundEndOfSegments = 0; + for (TrafficObservation observation : observations) { + if (observation.hasSegmentEnd()) { + foundEndOfSegments++; + EndOfSegmentsIndication endOfSegment = observation.getSegmentEnd(); + Assertions.assertEquals(EndOfSegmentsIndication.getDefaultInstance(), endOfSegment); + } + } + Assertions.assertEquals(1, foundEndOfSegments); + } + + @Test + public void testEndOfSegmentsIndicationNotAddedWhenNotChunking() throws IOException, ExecutionException, InterruptedException { + final var referenceTimestamp = Instant.ofEpochMilli(1686593191L*1000); + String packetData = ""; + for (int i = 0; i < 10; i++) { + packetData += "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + } + byte[] packetBytes = packetData.getBytes(StandardCharsets.UTF_8); + var outputBuffersCreated = new ConcurrentLinkedQueue(); + // Buffer size should be large enough to hold all packetData and overhead + var serializer = createSerializerWithTestHandler(outputBuffersCreated, 
500); + + var bb = Unpooled.wrappedBuffer(packetBytes); + serializer.addWriteEvent(referenceTimestamp, bb); + CompletableFuture future = serializer.flushCommitAndResetStream(true); + future.get(); + bb.release(); + + List observations = new ArrayList<>(); + for (ByteBuffer buffer : outputBuffersCreated) { + var trafficStream = TrafficStream.parseFrom(buffer); + observations.addAll(trafficStream.getSubStreamList()); + } + + int foundEndOfSegments = 0; + for (TrafficObservation observation : observations) { + if (observation.hasSegmentEnd()) { + foundEndOfSegments++; + } + } + Assertions.assertEquals(0, foundEndOfSegments); + } + private StreamChannelConnectionCaptureSerializer createSerializerWithTestHandler(ConcurrentLinkedQueue outputBuffers, int bufferSize) throws IOException { diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java index f9bde7d4e..9781c71c6 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java @@ -30,10 +30,7 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; import java.util.List; -import java.util.Set; -import java.util.StringJoiner; import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; @@ -529,8 +526,7 @@ private static String formatWorkItem(DiagnosticTrackableCompletableFuture trafficChunkStream, CapturedTrafficToHttpTransactionAccumulator trafficToHttpTransactionAccumulator) { - trafficChunkStream - .forEach(ts-> trafficToHttpTransactionAccumulator.accept(ts)); + trafficChunkStream.forEach(ts-> trafficToHttpTransactionAccumulator.accept(ts)); } }