diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpMessageAndTimestamp.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpMessageAndTimestamp.java index f0f9e9366..3251717b5 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpMessageAndTimestamp.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/HttpMessageAndTimestamp.java @@ -61,9 +61,7 @@ public ByteBuf asByteBuf() { var compositeBuf = Unpooled.compositeBuffer(); packetBytes.stream() .map(Unpooled::wrappedBuffer) - .forEach(buffer -> { - compositeBuf.addComponent(true, buffer); - }); + .forEach(buffer -> compositeBuf.addComponent(true, buffer)); return compositeBuf.asReadOnly(); } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java index 34b20a237..7962c5000 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/ParsedHttpMessagesAsDicts.java @@ -35,6 +35,7 @@ public class ParsedHttpMessagesAsDicts { public static final String STATUS_CODE_KEY = "Status-Code"; public static final String RESPONSE_TIME_MS_KEY = "response_time_ms"; public static final int MAX_PAYLOAD_BYTES_TO_CAPTURE = 256 * 1024 * 1024; + public static final String EXCEPTION_KEY_STRING = "Exception"; public final Optional> sourceRequestOp; public final Optional> sourceResponseOp; @@ -160,7 +161,7 @@ private static Map makeSafeMap( ) .setCause(e) .log(); - return Map.of("Exception", (Object) e.toString()); + return Map.of(EXCEPTION_KEY_STRING, (Object) e.toString()); } } @@ -185,7 +186,7 @@ private static Map convertRequest( context.setHttpVersion(message.protocolVersion().toString()); return fillMap(map, message.headers(), message.content()); } else { - return Map.of("Exception", "Message couldn't be parsed as a full http message"); + return Map.of(EXCEPTION_KEY_STRING, "Message couldn't be parsed as a full http message"); } } }); @@ -211,7 +212,7 @@ private static Map convertResponse( map.put(RESPONSE_TIME_MS_KEY, latency.toMillis()); return fillMap(map, message.headers(), message.content()); } else { - return Map.of("Exception", "Message couldn't be parsed as a full http message"); + return Map.of(EXCEPTION_KEY_STRING, "Message couldn't be parsed as a full http message"); } } }); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java index 084aa2801..45f8f732d 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayer.java @@ -52,7 +52,7 @@ public class TrafficReplayer { public static final String PACKET_TIMEOUT_SECONDS_PARAMETER_NAME = "--packet-timeout-seconds"; public static final String LOOKAHEAD_TIME_WINDOW_PARAMETER_NAME = "--lookahead-time-window"; - private static final long ACTIVE_WORK_MONITOR_CADENCE_MS = 30 * 1000; + private static final long ACTIVE_WORK_MONITOR_CADENCE_MS = 30 * 1000L; public static class DualException extends Exception { public final Throwable originalCause; @@ -357,7 +357,8 @@ public static void main(String[] args) throws Exception { params, Duration.ofSeconds(params.lookaheadTimeSeconds) ); - var authTransformer = buildAuthTransformerFactory(params) + var authTransformer = buildAuthTransformerFactory(params); + var trafficStreamLimiter = new TrafficStreamLimiter(params.maxConcurrentRequests) ) { var timeShifter = new TimeShifter(params.speedupFactor); var serverTimeout = Duration.ofSeconds(params.targetServerResponseTimeoutSeconds); @@ -379,7 +380,7 @@ public static void main(String[] args) throws Exception { params.allowInsecureConnections, params.numClientThreads ), - new TrafficStreamLimiter(params.maxConcurrentRequests), + trafficStreamLimiter, orderedRequestTracker ); activeContextMonitor = new ActiveContextMonitor( @@ -429,20 +430,20 @@ private static void setupShutdownHookForReplayer(TrafficReplayerTopLevel tr) { // both Log4J and the java builtin loggers add shutdown hooks. // The API for addShutdownHook says that those hooks registered will run in an undetermined order. // Hence, the reason that this code logs via slf4j logging AND stderr. - { - var beforeMsg = "Running TrafficReplayer Shutdown. " + Optional.of("Running TrafficReplayer Shutdown. " + "The logging facilities may also be shutting down concurrently, " - + "resulting in missing logs messages."; - log.atWarn().setMessage(beforeMsg).log(); - System.err.println(beforeMsg); - } + + "resulting in missing logs messages.") + .ifPresent(beforeMsg -> { + log.atWarn().setMessage(beforeMsg).log(); + System.err.println(beforeMsg); + }); Optional.ofNullable(weakTrafficReplayer.get()).ifPresent(o -> o.shutdown(null)); - { - var afterMsg = "Done shutting down TrafficReplayer (due to Runtime shutdown). " - + "Logs may be missing for events that have happened after the Shutdown event was received."; - log.atWarn().setMessage(afterMsg).log(); - System.err.println(afterMsg); - } + Optional.of("Done shutting down TrafficReplayer (due to Runtime shutdown). " + + "Logs may be missing for events that have happened after the Shutdown event was received.") + .ifPresent(afterMsg -> { + log.atWarn().setMessage(afterMsg).log(); + System.err.println(afterMsg); + }); })); } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayerCore.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayerCore.java index 42788b4a8..4acf7d6ae 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayerCore.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayerCore.java @@ -281,7 +281,6 @@ private void packageAndWriteResponse( ) { log.trace("done sending and finalizing data to the packet handler"); - SourceTargetCaptureTuple requestResponseTuple1; if (t != null) { log.error("Got exception in CompletableFuture callback: ", t); } @@ -318,6 +317,7 @@ public TrackedFuture transformA request.packetBytes::stream); } + @Override protected void perResponseConsumer(AggregatedRawResponse summary, HttpRequestTransformationStatus transformationStatus, IReplayContexts.IReplayerHttpTransactionContext context) { @@ -330,7 +330,7 @@ protected void perResponseConsumer(AggregatedRawResponse summary, exceptionRequestCount.incrementAndGet(); } else if (transformationStatus.isError()) { log.atInfo() - .setCause(summary.getError()) + .setCause(Optional.ofNullable(summary).map(AggregatedRawResponse::getError).orElse(null)) .setMessage("Unknown error transforming {}: ") .addArgument(context) .log(); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayerTopLevel.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayerTopLevel.java index 9f21db0b4..7b0c32037 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayerTopLevel.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/TrafficReplayerTopLevel.java @@ -177,6 +177,16 @@ public void setupRunAndWaitForReplayToFinish( } } + /** + * Called after the TrafficReplayer has finished accumulating and reconstructing every transaction from + * the incoming stream. This implementation will NOT wait for the ReplayEngine independently to complete, + * but rather call waitForRemainingWork. If a subclass wants more details from either of the two main + * non-field components of a TrafficReplayer, they have access to each of them here. + * + * @param replayEngine The ReplayEngine that may still be working to send the accumulated requests. + * @param trafficToHttpTransactionAccumulator The accumulator that had reconstructed the incoming records and + * has now finished + */ protected void wrapUpWorkAndEmitSummary( ReplayEngine replayEngine, CapturedTrafficToHttpTransactionAccumulator trafficToHttpTransactionAccumulator diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java index 4527d1f5d..57fa4a4a3 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/NettyPacketToHttpConsumer.java @@ -167,9 +167,7 @@ public IReplayContexts.ITargetRequestContext getParentContext() { return (eventLoop, ctx) -> NettyPacketToHttpConsumer.createClientConnection(eventLoop, sslContext, uri, ctx); } - public static class ChannelNotActiveException extends IOException { - public ChannelNotActiveException() {} - } + public static class ChannelNotActiveException extends IOException { } public static TrackedFuture createClientConnection( EventLoop eventLoop, @@ -382,9 +380,8 @@ public TrackedFuture consumeBytes(ByteBuf packetData) { + packetData.toString(StandardCharsets.UTF_8) ) .log(); - return writePacketAndUpdateFuture(packetData).whenComplete((v2, t2) -> { - log.atTrace().setMessage(() -> "finished writing " + httpContext() + " t=" + t2).log(); - }, () -> ""); + return writePacketAndUpdateFuture(packetData).whenComplete((v2, t2) -> + log.atTrace().setMessage(() -> "finished writing " + httpContext() + " t=" + t2).log(), () -> ""); } else { log.atWarn() .setMessage( diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/PayloadAccessFaultingMap.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/PayloadAccessFaultingMap.java index 763cbd868..ff8c3f665 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/PayloadAccessFaultingMap.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/PayloadAccessFaultingMap.java @@ -70,6 +70,7 @@ public int size() { return underlyingMap.entrySet(); } } + @Override public Object put(String key, Object value) { return underlyingMap.put(key, value); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/http/retries/DefaultRetry.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/http/retries/DefaultRetry.java index 50a6ac6ef..c25dbe5dd 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/http/retries/DefaultRetry.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/http/retries/DefaultRetry.java @@ -71,7 +71,7 @@ public static boolean retryIsUnnecessaryGivenStatusCode(int statusCode) { Optional.ofNullable(HttpByteBufFormatter.processHttpMessageFromBufs( HttpByteBufFormatter.HttpMessageType.RESPONSE, Stream.of(sourceResponse.asByteBuf())))) - .filter(o -> o instanceof HttpResponse) + .filter(HttpResponse.class::isInstance) .map(responseMsg -> shouldRetry(((HttpResponse)responseMsg).status().code(), rr.status().code())) .orElse(RequestSenderOrchestrator.RetryDirective.RETRY), diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/http/retries/OpenSearchDefaultRetry.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/http/retries/OpenSearchDefaultRetry.java index b1039c777..47865ad0f 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/http/retries/OpenSearchDefaultRetry.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/http/retries/OpenSearchDefaultRetry.java @@ -30,7 +30,7 @@ @Slf4j public class OpenSearchDefaultRetry extends DefaultRetry { - private static final Pattern bulkPathMatcher = Pattern.compile("^(/[^/]*)?/_bulk([/?]+.*)*$"); + private static final Pattern bulkPathMatcher = Pattern.compile("^(/[^/]*)?/_bulk(/.*)?$"); private static class BulkErrorFindingHandler extends ChannelInboundHandlerAdapter { private final JsonParser parser; @@ -82,11 +82,12 @@ private void consumeInput() throws IOException { break; } else if (parser.getParsingContext().inRoot() && token == JsonToken.END_OBJECT) { break; - } else if (token != JsonToken.START_OBJECT && token != JsonToken.END_OBJECT) { + } else if (token != JsonToken.START_OBJECT && + token != JsonToken.END_OBJECT && + !parser.getParsingContext().inRoot()) + { // Skip non-root level content - if (!parser.getParsingContext().inRoot()) { - parser.skipChildren(); - } + parser.skipChildren(); } } } @@ -109,24 +110,27 @@ boolean bulkResponseHadNoErrors(ByteBuf responseByteBuf) { var targetRequestByteBuf = Unpooled.wrappedBuffer(targetRequestBytes); var parsedRequest = HttpByteBufFormatter.parseHttpRequestFromBufs(Stream.of(targetRequestByteBuf), 0); if (parsedRequest != null && - bulkPathMatcher.matcher(parsedRequest.uri()).matches()) { + bulkPathMatcher.matcher(parsedRequest.uri()).matches() && // do a more granular check. If the raw response wasn't present, then just push it to the superclass // since it isn't going to be any kind of response, let alone a bulk one - if (Optional.ofNullable(currentResponse.getRawResponse()).map(r->r.status().code() == 200).orElse(false)) { - if (bulkResponseHadNoErrors(currentResponse.getResponseAsByteBuf())) { - return TextTrackedFuture.completedFuture(RequestSenderOrchestrator.RetryDirective.DONE, - () -> "no errors found in the target response, so not retrying"); - } else { - return reconstructedSourceTransactionFuture.thenCompose(rrp -> - TextTrackedFuture.completedFuture( - bulkResponseHadNoErrors(rrp.getResponseData().asByteBuf()) ? - RequestSenderOrchestrator.RetryDirective.RETRY : - RequestSenderOrchestrator.RetryDirective.DONE, - () -> "evaluating retry status dependent upon source error field"), - () -> "checking the accumulated source response value"); - } + Optional.ofNullable(currentResponse.getRawResponse()) + .map(r->r.status().code() == 200) + .orElse(false)) + { + if (bulkResponseHadNoErrors(currentResponse.getResponseAsByteBuf())) { + return TextTrackedFuture.completedFuture(RequestSenderOrchestrator.RetryDirective.DONE, + () -> "no errors found in the target response, so not retrying"); + } else { + return reconstructedSourceTransactionFuture.thenCompose(rrp -> + TextTrackedFuture.completedFuture( + bulkResponseHadNoErrors(rrp.getResponseData().asByteBuf()) ? + RequestSenderOrchestrator.RetryDirective.RETRY : + RequestSenderOrchestrator.RetryDirective.DONE, + () -> "evaluating retry status dependent upon source error field"), + () -> "checking the accumulated source response value"); } } + return super.shouldRetry(targetRequestBytes, currentResponse, reconstructedSourceTransactionFuture); } } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/OffsetLifecycleTracker.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/OffsetLifecycleTracker.java index a75888f4b..f96f02d4f 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/OffsetLifecycleTracker.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/kafka/OffsetLifecycleTracker.java @@ -42,15 +42,17 @@ void add(long offset) { Optional removeAndReturnNewHead(long offsetToRemove) { synchronized (pQueue) { var topCursor = pQueue.peek(); - assert topCursor != null : "Expected pQueue to be non-empty but it was when asked to remove " - + offsetToRemove; - var didRemove = pQueue.remove(offsetToRemove); - assert didRemove : "Expected all live records to have an entry and for them to be removed only once"; if (topCursor == null) { throw new IllegalStateException( "pQueue looks to have been empty by the time we tried to remove " + offsetToRemove ); } + var didRemove = pQueue.remove(offsetToRemove); + if (!didRemove) { + throw new IllegalStateException( + "Expected all live records to have an entry and for them to be removed only once"); + } + if (offsetToRemove == topCursor) { topCursor = Optional.ofNullable(pQueue.peek()).orElse(cursorHighWatermark + 1); // most recent cursor // was previously popped diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/ChannelContextManager.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/ChannelContextManager.java index ab31b450c..711b1d1de 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/ChannelContextManager.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/ChannelContextManager.java @@ -57,7 +57,7 @@ public IReplayContexts.IChannelKeyContext retainOrCreateContext(ITrafficStreamKe public IReplayContexts.IChannelKeyContext releaseContextFor(IReplayContexts.IChannelKeyContext ctx) { var connId = ctx.getConnectionId(); var refCountedCtx = connectionToChannelContextMap.get(connId); - assert ctx == refCountedCtx.context; + assert ctx == refCountedCtx.context : "consistency mismatch"; var finalRelease = refCountedCtx.release(); if (finalRelease) { ctx.close(); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/KafkaConsumerContexts.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/KafkaConsumerContexts.java index 024e1e2c4..c1f97580d 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/KafkaConsumerContexts.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/tracing/KafkaConsumerContexts.java @@ -24,7 +24,7 @@ private KafkaConsumerContexts() {} public static class AsyncListeningContext implements IKafkaConsumerContexts.IAsyncListeningContext { @Getter @NonNull - public final RootReplayerContext enclosingScope; // TODO - rename this to rootScope + public final RootReplayerContext enclosingScope; @Getter @Setter Exception observedExceptionToIncludeInMetrics; diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeHolder.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeHolder.java index a67a3602e..a9a4d4ef6 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeHolder.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/util/RefSafeHolder.java @@ -14,7 +14,7 @@ private RefSafeHolder(@Nullable T resource) { } @MustBeClosed - static public RefSafeHolder create(@Nullable T resource) { + public static RefSafeHolder create(@Nullable T resource) { return new RefSafeHolder<>(resource); } diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/trafficcapture/protos/TrafficStreamUtils.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/trafficcapture/protos/TrafficStreamUtils.java index 096082467..5f942e64b 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/trafficcapture/protos/TrafficStreamUtils.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/trafficcapture/protos/TrafficStreamUtils.java @@ -35,7 +35,7 @@ public static String summarizeTrafficStream(TrafficStream ts) { return ts.getConnectionId() + " (#" + getTrafficStreamIndex(ts) + ")[" + listSummaryStr + "]"; } - private static Object getOptionalContext(TrafficObservation tso) { + private static String getOptionalContext(TrafficObservation tso) { return Optional.ofNullable(getByteArrayForDataOf(tso)) .map(b -> " " + new String(b, 0, Math.min(3, b.length), StandardCharsets.UTF_8)) .orElse(""); diff --git a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/transform/StaticAuthTransformerFactory.java b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/transform/StaticAuthTransformerFactory.java index 7e319f495..dfdae7de0 100644 --- a/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/transform/StaticAuthTransformerFactory.java +++ b/TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/transform/StaticAuthTransformerFactory.java @@ -15,7 +15,6 @@ public IAuthTransformer getAuthTransformer(HttpJsonMessageWithFaultingPayload ht @Override public void rewriteHeaders(HttpJsonMessageWithFaultingPayload msg) { msg.headers().put("authorization", authHeaderValue); - // TODO - wipe out more headers too? } }; }