Skip to content

Commit

Permalink
Merge pull request opensearch-project#977 from gregschohn/ReplayerCod…
Browse files Browse the repository at this point in the history
…eSmells
  • Loading branch information
gregschohn authored Sep 20, 2024
2 parents 75171fb + d5edad8 commit df5b2d6
Show file tree
Hide file tree
Showing 15 changed files with 71 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,7 @@ public ByteBuf asByteBuf() {
var compositeBuf = Unpooled.compositeBuffer();
packetBytes.stream()
.map(Unpooled::wrappedBuffer)
.forEach(buffer -> {
compositeBuf.addComponent(true, buffer);
});
.forEach(buffer -> compositeBuf.addComponent(true, buffer));
return compositeBuf.asReadOnly();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ParsedHttpMessagesAsDicts {
public static final String STATUS_CODE_KEY = "Status-Code";
public static final String RESPONSE_TIME_MS_KEY = "response_time_ms";
public static final int MAX_PAYLOAD_BYTES_TO_CAPTURE = 256 * 1024 * 1024;
public static final String EXCEPTION_KEY_STRING = "Exception";

public final Optional<Map<String, Object>> sourceRequestOp;
public final Optional<Map<String, Object>> sourceResponseOp;
Expand Down Expand Up @@ -160,7 +161,7 @@ private static Map<String, Object> makeSafeMap(
)
.setCause(e)
.log();
return Map.of("Exception", (Object) e.toString());
return Map.of(EXCEPTION_KEY_STRING, (Object) e.toString());
}
}

Expand All @@ -185,7 +186,7 @@ private static Map<String, Object> convertRequest(
context.setHttpVersion(message.protocolVersion().toString());
return fillMap(map, message.headers(), message.content());
} else {
return Map.of("Exception", "Message couldn't be parsed as a full http message");
return Map.of(EXCEPTION_KEY_STRING, "Message couldn't be parsed as a full http message");
}
}
});
Expand All @@ -211,7 +212,7 @@ private static Map<String, Object> convertResponse(
map.put(RESPONSE_TIME_MS_KEY, latency.toMillis());
return fillMap(map, message.headers(), message.content());
} else {
return Map.of("Exception", "Message couldn't be parsed as a full http message");
return Map.of(EXCEPTION_KEY_STRING, "Message couldn't be parsed as a full http message");
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class TrafficReplayer {
public static final String PACKET_TIMEOUT_SECONDS_PARAMETER_NAME = "--packet-timeout-seconds";

public static final String LOOKAHEAD_TIME_WINDOW_PARAMETER_NAME = "--lookahead-time-window";
private static final long ACTIVE_WORK_MONITOR_CADENCE_MS = 30 * 1000;
private static final long ACTIVE_WORK_MONITOR_CADENCE_MS = 30 * 1000L;

public static class DualException extends Exception {
public final Throwable originalCause;
Expand Down Expand Up @@ -357,7 +357,8 @@ public static void main(String[] args) throws Exception {
params,
Duration.ofSeconds(params.lookaheadTimeSeconds)
);
var authTransformer = buildAuthTransformerFactory(params)
var authTransformer = buildAuthTransformerFactory(params);
var trafficStreamLimiter = new TrafficStreamLimiter(params.maxConcurrentRequests)
) {
var timeShifter = new TimeShifter(params.speedupFactor);
var serverTimeout = Duration.ofSeconds(params.targetServerResponseTimeoutSeconds);
Expand All @@ -379,7 +380,7 @@ public static void main(String[] args) throws Exception {
params.allowInsecureConnections,
params.numClientThreads
),
new TrafficStreamLimiter(params.maxConcurrentRequests),
trafficStreamLimiter,
orderedRequestTracker
);
activeContextMonitor = new ActiveContextMonitor(
Expand Down Expand Up @@ -429,20 +430,20 @@ private static void setupShutdownHookForReplayer(TrafficReplayerTopLevel tr) {
// both Log4J and the java builtin loggers add shutdown hooks.
// The API for addShutdownHook says that those hooks registered will run in an undetermined order.
// Hence, the reason that this code logs via slf4j logging AND stderr.
{
var beforeMsg = "Running TrafficReplayer Shutdown. "
Optional.of("Running TrafficReplayer Shutdown. "
+ "The logging facilities may also be shutting down concurrently, "
+ "resulting in missing logs messages.";
log.atWarn().setMessage(beforeMsg).log();
System.err.println(beforeMsg);
}
+ "resulting in missing logs messages.")
.ifPresent(beforeMsg -> {
log.atWarn().setMessage(beforeMsg).log();
System.err.println(beforeMsg);
});
Optional.ofNullable(weakTrafficReplayer.get()).ifPresent(o -> o.shutdown(null));
{
var afterMsg = "Done shutting down TrafficReplayer (due to Runtime shutdown). "
+ "Logs may be missing for events that have happened after the Shutdown event was received.";
log.atWarn().setMessage(afterMsg).log();
System.err.println(afterMsg);
}
Optional.of("Done shutting down TrafficReplayer (due to Runtime shutdown). "
+ "Logs may be missing for events that have happened after the Shutdown event was received.")
.ifPresent(afterMsg -> {
log.atWarn().setMessage(afterMsg).log();
System.err.println(afterMsg);
});
}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ private void packageAndWriteResponse(
) {
log.trace("done sending and finalizing data to the packet handler");

SourceTargetCaptureTuple requestResponseTuple1;
if (t != null) {
log.error("Got exception in CompletableFuture callback: ", t);
}
Expand Down Expand Up @@ -318,6 +317,7 @@ public TrackedFuture<String, TransformedTargetRequestAndResponseList> transformA
request.packetBytes::stream);
}

@Override
protected void perResponseConsumer(AggregatedRawResponse summary,
HttpRequestTransformationStatus transformationStatus,
IReplayContexts.IReplayerHttpTransactionContext context) {
Expand All @@ -330,7 +330,7 @@ protected void perResponseConsumer(AggregatedRawResponse summary,
exceptionRequestCount.incrementAndGet();
} else if (transformationStatus.isError()) {
log.atInfo()
.setCause(summary.getError())
.setCause(Optional.ofNullable(summary).map(AggregatedRawResponse::getError).orElse(null))
.setMessage("Unknown error transforming {}: ")
.addArgument(context)
.log();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,16 @@ public void setupRunAndWaitForReplayToFinish(
}
}

/**
* Called after the TrafficReplayer has finished accumulating and reconstructing every transaction from
* the incoming stream. This implementation will NOT wait for the ReplayEngine independently to complete,
* but rather call waitForRemainingWork. If a subclass wants more details from either of the two main
* non-field components of a TrafficReplayer, they have access to each of them here.
*
* @param replayEngine The ReplayEngine that may still be working to send the accumulated requests.
* @param trafficToHttpTransactionAccumulator The accumulator that had reconstructed the incoming records and
* has now finished
*/
protected void wrapUpWorkAndEmitSummary(
ReplayEngine replayEngine,
CapturedTrafficToHttpTransactionAccumulator trafficToHttpTransactionAccumulator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,7 @@ public IReplayContexts.ITargetRequestContext getParentContext() {
return (eventLoop, ctx) -> NettyPacketToHttpConsumer.createClientConnection(eventLoop, sslContext, uri, ctx);
}

public static class ChannelNotActiveException extends IOException {
public ChannelNotActiveException() {}
}
public static class ChannelNotActiveException extends IOException { }

public static TrackedFuture<String, ChannelFuture> createClientConnection(
EventLoop eventLoop,
Expand Down Expand Up @@ -382,9 +380,8 @@ public TrackedFuture<String, Void> consumeBytes(ByteBuf packetData) {
+ packetData.toString(StandardCharsets.UTF_8)
)
.log();
return writePacketAndUpdateFuture(packetData).whenComplete((v2, t2) -> {
log.atTrace().setMessage(() -> "finished writing " + httpContext() + " t=" + t2).log();
}, () -> "");
return writePacketAndUpdateFuture(packetData).whenComplete((v2, t2) ->
log.atTrace().setMessage(() -> "finished writing " + httpContext() + " t=" + t2).log(), () -> "");
} else {
log.atWarn()
.setMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public int size() {
return underlyingMap.entrySet();
}
}

@Override
public Object put(String key, Object value) {
return underlyingMap.put(key, value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public static boolean retryIsUnnecessaryGivenStatusCode(int statusCode) {
Optional.ofNullable(HttpByteBufFormatter.processHttpMessageFromBufs(
HttpByteBufFormatter.HttpMessageType.RESPONSE,
Stream.of(sourceResponse.asByteBuf()))))
.filter(o -> o instanceof HttpResponse)
.filter(HttpResponse.class::isInstance)
.map(responseMsg -> shouldRetry(((HttpResponse)responseMsg).status().code(),
rr.status().code()))
.orElse(RequestSenderOrchestrator.RetryDirective.RETRY),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
@Slf4j
public class OpenSearchDefaultRetry extends DefaultRetry {

private static final Pattern bulkPathMatcher = Pattern.compile("^(/[^/]*)?/_bulk([/?]+.*)*$");
private static final Pattern bulkPathMatcher = Pattern.compile("^(/[^/]*)?/_bulk(/.*)?$");

private static class BulkErrorFindingHandler extends ChannelInboundHandlerAdapter {
private final JsonParser parser;
Expand Down Expand Up @@ -82,11 +82,12 @@ private void consumeInput() throws IOException {
break;
} else if (parser.getParsingContext().inRoot() && token == JsonToken.END_OBJECT) {
break;
} else if (token != JsonToken.START_OBJECT && token != JsonToken.END_OBJECT) {
} else if (token != JsonToken.START_OBJECT &&
token != JsonToken.END_OBJECT &&
!parser.getParsingContext().inRoot())
{
// Skip non-root level content
if (!parser.getParsingContext().inRoot()) {
parser.skipChildren();
}
parser.skipChildren();
}
}
}
Expand All @@ -109,24 +110,27 @@ boolean bulkResponseHadNoErrors(ByteBuf responseByteBuf) {
var targetRequestByteBuf = Unpooled.wrappedBuffer(targetRequestBytes);
var parsedRequest = HttpByteBufFormatter.parseHttpRequestFromBufs(Stream.of(targetRequestByteBuf), 0);
if (parsedRequest != null &&
bulkPathMatcher.matcher(parsedRequest.uri()).matches()) {
bulkPathMatcher.matcher(parsedRequest.uri()).matches() &&
// do a more granular check. If the raw response wasn't present, then just push it to the superclass
// since it isn't going to be any kind of response, let alone a bulk one
if (Optional.ofNullable(currentResponse.getRawResponse()).map(r->r.status().code() == 200).orElse(false)) {
if (bulkResponseHadNoErrors(currentResponse.getResponseAsByteBuf())) {
return TextTrackedFuture.completedFuture(RequestSenderOrchestrator.RetryDirective.DONE,
() -> "no errors found in the target response, so not retrying");
} else {
return reconstructedSourceTransactionFuture.thenCompose(rrp ->
TextTrackedFuture.completedFuture(
bulkResponseHadNoErrors(rrp.getResponseData().asByteBuf()) ?
RequestSenderOrchestrator.RetryDirective.RETRY :
RequestSenderOrchestrator.RetryDirective.DONE,
() -> "evaluating retry status dependent upon source error field"),
() -> "checking the accumulated source response value");
}
Optional.ofNullable(currentResponse.getRawResponse())
.map(r->r.status().code() == 200)
.orElse(false))
{
if (bulkResponseHadNoErrors(currentResponse.getResponseAsByteBuf())) {
return TextTrackedFuture.completedFuture(RequestSenderOrchestrator.RetryDirective.DONE,
() -> "no errors found in the target response, so not retrying");
} else {
return reconstructedSourceTransactionFuture.thenCompose(rrp ->
TextTrackedFuture.completedFuture(
bulkResponseHadNoErrors(rrp.getResponseData().asByteBuf()) ?
RequestSenderOrchestrator.RetryDirective.RETRY :
RequestSenderOrchestrator.RetryDirective.DONE,
() -> "evaluating retry status dependent upon source error field"),
() -> "checking the accumulated source response value");
}
}

return super.shouldRetry(targetRequestBytes, currentResponse, reconstructedSourceTransactionFuture);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,17 @@ void add(long offset) {
Optional<Long> removeAndReturnNewHead(long offsetToRemove) {
synchronized (pQueue) {
var topCursor = pQueue.peek();
assert topCursor != null : "Expected pQueue to be non-empty but it was when asked to remove "
+ offsetToRemove;
var didRemove = pQueue.remove(offsetToRemove);
assert didRemove : "Expected all live records to have an entry and for them to be removed only once";
if (topCursor == null) {
throw new IllegalStateException(
"pQueue looks to have been empty by the time we tried to remove " + offsetToRemove
);
}
var didRemove = pQueue.remove(offsetToRemove);
if (!didRemove) {
throw new IllegalStateException(
"Expected all live records to have an entry and for them to be removed only once");
}

if (offsetToRemove == topCursor) {
topCursor = Optional.ofNullable(pQueue.peek()).orElse(cursorHighWatermark + 1); // most recent cursor
// was previously popped
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public IReplayContexts.IChannelKeyContext retainOrCreateContext(ITrafficStreamKe
public IReplayContexts.IChannelKeyContext releaseContextFor(IReplayContexts.IChannelKeyContext ctx) {
var connId = ctx.getConnectionId();
var refCountedCtx = connectionToChannelContextMap.get(connId);
assert ctx == refCountedCtx.context;
assert ctx == refCountedCtx.context : "consistency mismatch";
var finalRelease = refCountedCtx.release();
if (finalRelease) {
ctx.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ private KafkaConsumerContexts() {}
public static class AsyncListeningContext implements IKafkaConsumerContexts.IAsyncListeningContext {
@Getter
@NonNull
public final RootReplayerContext enclosingScope; // TODO - rename this to rootScope
public final RootReplayerContext enclosingScope;
@Getter
@Setter
Exception observedExceptionToIncludeInMetrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ private RefSafeHolder(@Nullable T resource) {
}

@MustBeClosed
static public <T> RefSafeHolder<T> create(@Nullable T resource) {
public static <T> RefSafeHolder<T> create(@Nullable T resource) {
return new RefSafeHolder<>(resource);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static String summarizeTrafficStream(TrafficStream ts) {
return ts.getConnectionId() + " (#" + getTrafficStreamIndex(ts) + ")[" + listSummaryStr + "]";
}

private static Object getOptionalContext(TrafficObservation tso) {
private static String getOptionalContext(TrafficObservation tso) {
return Optional.ofNullable(getByteArrayForDataOf(tso))
.map(b -> " " + new String(b, 0, Math.min(3, b.length), StandardCharsets.UTF_8))
.orElse("");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ public IAuthTransformer getAuthTransformer(HttpJsonMessageWithFaultingPayload ht
@Override
public void rewriteHeaders(HttpJsonMessageWithFaultingPayload msg) {
msg.headers().put("authorization", authHeaderValue);
// TODO - wipe out more headers too?
}
};
}
Expand Down

0 comments on commit df5b2d6

Please sign in to comment.