Skip to content

Commit

Permalink
Merge branch 'main' into ReplayerCodeSmells
Browse files Browse the repository at this point in the history
Signed-off-by: Greg Schohn <[email protected]>

# Conflicts:
#	TrafficCapture/trafficReplayer/src/main/java/org/opensearch/migrations/replay/datahandlers/PayloadAccessFaultingMap.java
  • Loading branch information
gregschohn committed Sep 19, 2024
2 parents 681c87d + 802ae21 commit d5edad8
Show file tree
Hide file tree
Showing 37 changed files with 1,131 additions and 543 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ services:
condition: service_started
opensearchtarget:
condition: service_started
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317"
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317" #--transformer-config-base64 W3sgIkpzb25Kb2x0VHJhbnNmb3JtZXJQcm92aWRlciI6ClsKICB7CiAgICAic2NyaXB0IjogewogICAgICAib3BlcmF0aW9uIjogInNoaWZ0IiwKICAgICAgInNwZWMiOiB7CiAgICAgICAgIm1ldGhvZCI6ICJtZXRob2QiLAogICAgICAgICJVUkkiOiAiVVJJIiwKICAgICAgICAiaGVhZGVycyI6ICJoZWFkZXJzIiwKICAgICAgICAicGF5bG9hZCI6IHsKICAgICAgICAgICJpbmxpbmVkSnNvbkJvZHkiOiB7CiAgICAgICAgICAgICJ0b3AiOiB7CiAgICAgICAgICAgICAgInRhZ1RvRXhjaXNlIjogewogICAgICAgICAgICAgICAgIioiOiAicGF5bG9hZC5pbmxpbmVkSnNvbkJvZHkudG9wLiYiIAogICAgICAgICAgICAgIH0sCiAgICAgICAgICAgICAgIioiOiAicGF5bG9hZC5pbmxpbmVkSnNvbkJvZHkudG9wLiYiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAiKiI6ICJwYXlsb2FkLmlubGluZWRKc29uQm9keS4mIgogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfQogICAgfQogIH0sIAogewogICAic2NyaXB0IjogewogICAgICJvcGVyYXRpb24iOiAibW9kaWZ5LW92ZXJ3cml0ZS1iZXRhIiwKICAgICAic3BlYyI6IHsKICAgICAgICJVUkkiOiAiPXNwbGl0KCcvZXh0cmFUaGluZ1RvUmVtb3ZlJyxAKDEsJikpIgogICAgIH0KICB9CiB9LAogewogICAic2NyaXB0IjogewogICAgICJvcGVyYXRpb24iOiAibW9kaWZ5LW92ZXJ3cml0ZS1iZXRhIiwKICAgICAic3BlYyI6IHsKICAgICAgICJVUkkiOiAiPWpvaW4oJycsQCgxLCYpKSIKICAgICB9CiAgfQogfQpdCn1dCg=="

opensearchtarget:
image: 'opensearchproject/opensearch:2.15.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
## Monitoring Progress via Instrumentation

The replayer and capture proxy (if started with the `--otelCollectorEndpoint` argument) emit metrics through an
otel-collector endpoint, which is deployed within Migrations Assistant tasks as a sidecar container. The
otel-collectors will publish metrics and traces to Amazon CloudWatch and AWS X-Ray.

Some of these metrics will show simple progress, such as bytes or records transmitted. Other records can show higher
level information, such as the number of responses with status codes that match vs. those that don't. To observe those,
search for `statusCodesMatch` in the CloudWatch Console. That's emitted as an attribute along with the method and
the source/target status code (rounded down to the last hundred; i.e. a status code of 201 has a 200 attribute).

Other metrics will show latencies, the number of requests, unique connections at a time and more. Low-level and
high-level metrics are being improved and added. For the latest information, see the
[README.md](../../../../../../coreUtilities/README.md).

Along with metrics, traces are emitted by the replayer and the proxy (when proxy is run with metrics enabled, e.g. by
launching with --otelCollectorEndpoint set to the otel-collector sidecar). Traces will include very granular data for
each connection, including how long the TCP connections are open, how long the source and target clusters took to send
a response, as well as other internal details that can explain the progress of each request.

Notice that traces for the replayer will show connections and Kafka records open, in some cases, much longer than their
representative HTTP transactions. This is because records are considered 'active' to the replayer until they are
committed, and records are only committed once _all_ previous records have also been committed. Such details are
preserved deliberately so they are available when further diagnosis is necessary.
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers;

import org.opensearch.migrations.testutils.SharedDockerImageNames;

import org.testcontainers.containers.GenericContainer;

public class HttpdContainerTestBase extends TestContainerTestBase<GenericContainer<?>> {

private static final GenericContainer<?> httpd = new GenericContainer("httpd:alpine").withExposedPorts(80); // Container
// Port
private static final GenericContainer<?> httpd = new GenericContainer(SharedDockerImageNames.HTTPD)
.withExposedPorts(80); // Container Port

public GenericContainer<?> getContainer() {
return httpd;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers;

import org.opensearch.migrations.testutils.SharedDockerImageNames;

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class KafkaContainerTestBase extends TestContainerTestBase<KafkaContainer> {

private static final KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:latest")
);
private static final KafkaContainer kafka = new KafkaContainer(SharedDockerImageNames.KAFKA);

public KafkaContainer getContainer() {
return kafka;
Expand Down
29 changes: 19 additions & 10 deletions TrafficCapture/trafficReplayer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ which has comments throughout it to indicate how data percolates and is converte

## Handlers

With the exception of the preparation around JSON model and its transformation, all the other handlers (compression,
Except for the conversions around JSON payloads, all the other handlers (compression,
chunked, and JSON parsing/serialization) use streaming data models via mostly custom handlers. This should minimize the
memory load (working set size, cache misses, etc). However, attempts have not yet been made to reduce the number of
allocations. Those optimizations may not have extremely high value, especially when JSON parsing will need to create
Expand All @@ -80,12 +80,20 @@ Transformations are performed via a simple interface defined by
[IJsonTransformer](../transformationPlugins/jsonMessageTransformers/jsonMessageTransformerInterface/src/main/java/org/opensearch/migrations/transform/IJsonTransformer.java) ('transformer'). They are loaded dynamically and are designed to allow for easy extension
of the TrafficReplayer to support a diverse set of needs.

The input to the transformer will be an HTTP message represented as a json-like `Map<String,Object>` with
The input to the transformer is an HTTP message represented as a json-like `Map<String,Object>` with
top-level key-value pairs defined in
[JsonKeysForHttpMessage.java](../transformationPlugins/jsonMessageTransformers/jsonMessageTransformerInterface/src/main/java/org/opensearch/migrations/transform/JsonKeysForHttpMessage.java).
Only bodies that are json-formatted will be accessible, and they will be accessible as a fully-parsed Map (at
the keypath `'payload'->'inlinedJsonBody'`). Transformers have the option to rewrite none, or any of the keys and
values within the original message. The transformer can return either the original message or a completely new message.
Bodies that are json-formatted will be accessible via the path `payload.inlinedJsonBody` and they will be accessible
as a fully-parsed Map. Newline-delimited json (ndjson) sequences will be accessible via
`payload.inlinedJsonSequenceBodies` as a List of json Maps. These two payload entries are mutually exclusive.
Any additional bytes that follow a json object (or all of the bytes if there wasn't a json object at all) will
be available as a ByteBuf in `payload.inlinedBinaryBody`.

Transformers have the option to rewrite none, or any of the keys and values within the original message.
The transformer can return either the original message or a completely new message. Notice that one json payload
could be broken into multiple ndjson entries or vice-versa by changing the payload key and supplying an appropriately
typed object as its value (e.g. a single Map or a List of Maps respectively for `inlinedJsonBody` and
`inlinedJsonSequenceBodies`).
Transformers may be used simultaneously from concurrent threads over the lifetime of the replayer. However,
a message will only be processed by one transformer at a time.

Expand All @@ -108,10 +116,10 @@ The name is defined by the `IJsonTransformerProvider::getName()`, which unless o
(e.g. 'JsonJoltTransformerProvider'). The value corresponding to that key is then passed to instantiate an
IJsonTransformer object.

The base [jsonJoltMessageTransformerProvider](../transformationPlugins/jsonMessageTransformers/jsonJoltMessageTransformerProvider)
package includes [JsonCompositeTransformer.java]
The jsonMessageTransformerInterface package includes [JsonCompositeTransformer.java]
(../transformationPlugins/jsonMessageTransformers/jsonMessageTransformerInterface/src/main/java/org/opensearch/migrations/transform/JsonCompositeTransformer.java),
which run transformers in serial. That composite transformer is also utilized by the TrafficReplayer to combine the
which runs configured transformers in serial.
That composite transformer is also utilized by the TrafficReplayer to combine the
list of loaded transformations with a transformer to rewrite the 'Host' header. That host transformation changes the
host header of every HTTP message to use the target domain-name rather than the source's. That will be run after
all loaded/specified transformations.
Expand Down Expand Up @@ -140,8 +148,9 @@ To run only one transformer without any configuration, the `--transformer-config
be set to the name of the transformer (e.g. 'JsonTransformerForOpenSearch23PlusTargetTransformerProvider',
without quotes or any json surrounding it).

The user can also specify a file to read the transformations from using the `--transformer-config-file`, but can't use
both transformer options.
The user can also specify a file to read the transformations from using the `--transformer-config-file`. Users can
also pass the script as an argument via `--transformer-config-base64`. These `transformer-config` options
are mutually exclusive; only one may be specified.

Some simple transformations are included to change headers to add compression or to force an HTTP message payload to
be chunked. Another transformer, [JsonTypeMappingTransformer.java](../transformationPlugins/jsonMessageTransformers/openSearch23PlusTargetTransformerProvider/src/main/java/org/opensearch/migrations/transform/JsonTypeMappingTransformer.java),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,17 @@
import java.time.Duration;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Stream;

import org.opensearch.migrations.replay.datatypes.ByteBufList;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpResponse;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@Getter
@Slf4j
public class AggregatedRawResponse {
public class AggregatedRawResponse extends AggregatedRawResult {

@Getter
protected final HttpResponse rawResponse;
@Getter
protected final int responseSizeInBytes;
@Getter
protected final Duration responseDuration;
protected final ArrayList<AbstractMap.SimpleEntry<Instant, byte[]>> responsePackets;
@Getter
protected final Throwable error;

public static Builder builder(Instant i) {
return new Builder(i);
}

public AggregatedRawResponse(
HttpResponse rawResponse,
Expand All @@ -43,82 +22,35 @@ public AggregatedRawResponse(
List<AbstractMap.SimpleEntry<Instant, byte[]>> responsePackets,
Throwable error
) {
super(responseSizeInBytes, responseDuration, responsePackets, error);
this.rawResponse = rawResponse;
this.responseSizeInBytes = responseSizeInBytes;
this.responseDuration = responseDuration;
this.responsePackets = responsePackets == null ? null : new ArrayList<>(responsePackets);
this.error = error;
}

public byte[][] getCopyOfPackets() {
return responsePackets.stream()
.map(Map.Entry::getValue)
.map(x -> Arrays.copyOf(x, x.length))
.toArray(byte[][]::new);
}

public ByteBuf getResponseAsByteBuf() {
return responsePackets == null ? Unpooled.EMPTY_BUFFER :
ByteBufList.asCompositeByteBufRetained(responsePackets.stream()
.map(Map.Entry::getValue).map(Unpooled::wrappedBuffer))
.asReadOnly();
}

public static class Builder {
private final ArrayList<AbstractMap.SimpleEntry<Instant, byte[]>> receiptTimeAndResponsePackets;
private final Instant requestSendTime;
public static class Builder extends AggregatedRawResult.Builder<Builder> {
protected HttpResponse rawResponse;
protected Throwable error;

public Builder(Instant requestSendTime) {
receiptTimeAndResponsePackets = new ArrayList<>();
this.requestSendTime = requestSendTime;
rawResponse = null;
super(requestSendTime);
}

public AggregatedRawResponse build() {
var totalBytes = receiptTimeAndResponsePackets.stream().mapToInt(kvp -> kvp.getValue().length).sum();
return new AggregatedRawResponse(
rawResponse,
totalBytes,
Duration.between(requestSendTime, Instant.now()),
getTotalBytes(),
Duration.between(startTime, Instant.now()),
receiptTimeAndResponsePackets,
error
);
}

public AggregatedRawResponse.Builder addResponsePacket(byte[] packet) {
return addResponsePacket(packet, Instant.now());
}

public AggregatedRawResponse.Builder addHttpParsedResponseObject(HttpResponse r) {
public Builder addHttpParsedResponseObject(HttpResponse r) {
this.rawResponse = r;
return this;
}

public AggregatedRawResponse.Builder addErrorCause(Throwable t) {
error = t;
return this;
}

public AggregatedRawResponse.Builder addResponsePacket(byte[] packet, Instant timestamp) {
receiptTimeAndResponsePackets.add(new AbstractMap.SimpleEntry<>(timestamp, packet));
return this;
}
}

Stream<AbstractMap.SimpleEntry<Instant, byte[]>> getReceiptTimeAndResponsePackets() {
return Optional.ofNullable(this.responsePackets).stream().flatMap(Collection::stream);
}

@Override
public String toString() {
final StringBuilder sb = new StringBuilder("IResponseSummary{");
sb.append("responseSizeInBytes=").append(responseSizeInBytes);
sb.append(", responseDuration=").append(responseDuration);
sb.append(", # of responsePackets=")
.append((this.responsePackets == null ? "-1" : "" + this.responsePackets.size()));
sb.append('}');
return sb.toString();
public static Builder builder(Instant i) {
return new Builder(i);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package org.opensearch.migrations.replay;

import java.time.Duration;
import java.time.Instant;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.opensearch.migrations.replay.datatypes.ByteBufList;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import lombok.Getter;

/**
 * Base class for an accumulated result of reading timestamped packets off the wire:
 * the packets themselves (in arrival order), the total payload size, how long the
 * exchange took, and the first error encountered, if any.
 *
 * Instances are built via the self-typed {@link Builder} so that subclasses
 * (e.g. AggregatedRawResponse) can reuse the chaining add* methods and still get
 * their own builder type back.
 */
public class AggregatedRawResult {
    // Total number of payload bytes across all accumulated packets.
    @Getter
    protected final int sizeInBytes;
    // Wall-clock time from the builder's start instant until build() was called.
    @Getter
    protected final Duration duration;
    // Timestamped packet payloads in arrival order; may be null when none were supplied.
    protected final ArrayList<AbstractMap.SimpleEntry<Instant, byte[]>> packets;
    // First error recorded while accumulating this result, or null on success.
    @Getter
    protected final Throwable error;

    /**
     * Self-typed (recursive generic) builder: {@code B} is bound to the concrete
     * builder subtype so chained calls on a subclass builder return the subtype.
     */
    public static class Builder<B extends Builder<B>> {
        protected final ArrayList<AbstractMap.SimpleEntry<Instant, byte[]>> receiptTimeAndResponsePackets;
        protected final Instant startTime;
        protected Throwable error;

        /**
         * @param startTime the instant the exchange began; build() measures duration from here
         */
        public Builder(Instant startTime) {
            receiptTimeAndResponsePackets = new ArrayList<>();
            this.startTime = startTime;
        }

        public AggregatedRawResult build() {
            return new AggregatedRawResult(
                getTotalBytes(),
                Duration.between(startTime, Instant.now()),
                receiptTimeAndResponsePackets,
                error
            );
        }

        /** Sum of the payload lengths of every packet added so far. */
        protected int getTotalBytes() {
            return receiptTimeAndResponsePackets.stream().mapToInt(kvp -> kvp.getValue().length).sum();
        }

        // Single, narrowly-scoped suppression for the self-type cast; B is bound to the
        // concrete builder subtype by the class's recursive type parameter.
        @SuppressWarnings("unchecked")
        protected B self() {
            return (B) this;
        }

        public B addErrorCause(Throwable t) {
            error = t;
            return self();
        }

        /** Records a packet stamped with the current time. */
        public B addResponsePacket(byte[] packet) {
            return addResponsePacket(packet, Instant.now());
        }

        /** Records a packet with an explicit receipt timestamp. */
        public B addResponsePacket(byte[] packet, Instant timestamp) {
            receiptTimeAndResponsePackets.add(new AbstractMap.SimpleEntry<>(timestamp, packet));
            return self();
        }
    }

    /**
     * @param sizeInBytes total payload bytes across all packets
     * @param duration    elapsed time for the exchange
     * @param packets     timestamped payloads; defensively copied, null permitted
     * @param error       first error encountered, or null
     */
    public AggregatedRawResult(int sizeInBytes,
                               Duration duration,
                               List<AbstractMap.SimpleEntry<Instant, byte[]>> packets,
                               Throwable error)
    {
        this.sizeInBytes = sizeInBytes;
        this.duration = duration;
        // Defensive copy so later mutation of the caller's list can't change this result.
        this.packets = packets == null ? null : new ArrayList<>(packets);
        this.error = error;
    }

    public static Builder<?> builder(Instant i) {
        return new Builder<>(i);
    }

    /**
     * Returns deep copies of the packet payloads in arrival order, so callers can't
     * mutate the stored bytes. Returns an empty array when no packets were recorded
     * (consistent with getResponseAsByteBuf's tolerance of a null packet list; the
     * previous behavior was an NPE).
     */
    public byte[][] getCopyOfPackets() {
        if (packets == null) {
            return new byte[0][];
        }
        return packets.stream()
            .map(Map.Entry::getValue)
            .map(x -> Arrays.copyOf(x, x.length))
            .toArray(byte[][]::new);
    }

    /**
     * Returns all packet payloads concatenated as a single read-only composite ByteBuf
     * (retained — the caller is responsible for releasing it), or an empty buffer when
     * no packets were recorded.
     */
    public ByteBuf getResponseAsByteBuf() {
        return packets == null ? Unpooled.EMPTY_BUFFER :
            ByteBufList.asCompositeByteBufRetained(packets.stream()
                .map(Map.Entry::getValue).map(Unpooled::wrappedBuffer))
            .asReadOnly();
    }

    @Override
    public String toString() {
        // Label fixed from the leftover "IResponseSummary" to this class's actual name.
        final StringBuilder sb = new StringBuilder("AggregatedRawResult{");
        sb.append("responseSizeInBytes=").append(sizeInBytes);
        sb.append(", responseDuration=").append(duration);
        sb.append(", # of responsePackets=")
            .append((this.packets == null ? "-1" : "" + this.packets.size()));
        sb.append('}');
        return sb.toString();
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public SourceTargetCaptureTuple(
transformedTargetRequestAndResponseList.getTransformationStatus();
this.responseList = transformedTargetRequestAndResponseList == null ? List.of() :
transformedTargetRequestAndResponseList.responses().stream()
.map(arr -> new Response(arr.responsePackets.stream().map(AbstractMap.SimpleEntry::getValue)
.collect(Collectors.toList()), arr.error, arr.responseDuration))
.map(arr -> new Response(arr.packets.stream().map(AbstractMap.SimpleEntry::getValue)
.collect(Collectors.toList()), arr.error, arr.duration))
.collect(Collectors.toList());
this.topLevelErrorCause = topLevelErrorCause;
}
Expand Down
Loading

0 comments on commit d5edad8

Please sign in to comment.