Skip to content

Commit

Permalink
Merge pull request opensearch-project#1047 from gregschohn/RfsEntrypo…
Browse files Browse the repository at this point in the history
…intLoopOptimization

Run the RFS Container's DocumentMigration application repeatedly as long as it's successful
  • Loading branch information
AndreKurait authored Oct 7, 2024
2 parents 4ad6d93 + 863f034 commit 5709add
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 16 deletions.
44 changes: 42 additions & 2 deletions DocumentsFromSnapshotMigration/docker/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,45 @@ if [[ $RFS_COMMAND != *"--target-password"* ]]; then
fi
fi

echo "Executing RFS Command"
eval $RFS_COMMAND
# Extract the value passed after --s3-local-dir
S3_LOCAL_DIR=$(echo "$RFS_COMMAND" | sed -n 's/.*--s3-local-dir\s\+\("[^"]\+"\|[^ ]\+\).*/\1/p' | tr -d '"')
# Extract the value passed after --lucene-dir
LUCENE_DIR=$(echo "$RFS_COMMAND" | sed -n 's/.*--lucene-dir\s\+\("[^"]\+"\|[^ ]\+\).*/\1/p' | tr -d '"')
if [[ -n "$S3_LOCAL_DIR" ]]; then
echo "Will delete S3 local directory between runs: $S3_LOCAL_DIR"
else
echo "--s3-local-dir argument not found in RFS_COMMAND. Will not delete S3 local directory between runs."
fi

if [[ -n "$LUCENE_DIR" ]]; then
echo "Will delete lucene local directory between runs: $LUCENE_DIR"
else
echo "--lucene-dir argument not found in RFS_COMMAND. This is required."
exit 1
fi

cleanup_directories() {
if [[ -n "$S3_LOCAL_DIR" ]]; then
echo "Cleaning up S3 local directory: $S3_LOCAL_DIR"
rm -rf "$S3_LOCAL_DIR"
echo "Directory $S3_LOCAL_DIR has been cleaned up."
fi

if [[ -n "$LUCENE_DIR" ]]; then
echo "Cleaning up Lucene local directory: $LUCENE_DIR"
rm -rf "$LUCENE_DIR"
echo "Directory $LUCENE_DIR has been cleaned up."
fi
}



[ -z "$RFS_COMMAND" ] && \
{ echo "Warning: RFS_COMMAND is empty! Exiting."; exit 1; } || \
until ! {
echo "Running command $RFS_COMMAND"
eval "$RFS_COMMAND"
}; do
echo "Cleaning up directories before the next run."
cleanup_directories
done
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@

@Slf4j
public class RfsMigrateDocuments {
public static final int PROCESS_TIMED_OUT = 2;
public static final int PROCESS_TIMED_OUT_EXIT_CODE = 2;
public static final int NO_WORK_LEFT_EXIT_CODE = 3;
public static final int TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS = 5;
public static final String LOGGING_MDC_WORKER_ID = "workerId";

Expand Down Expand Up @@ -184,15 +185,12 @@ public static void main(String[] args) throws Exception {
var snapshotLocalDirPath = arguments.snapshotLocalDir != null ? Paths.get(arguments.snapshotLocalDir) : null;

var connectionContext = arguments.targetArgs.toConnectionContext();
try (var processManager = new LeaseExpireTrigger(workItemId -> {
log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId);
System.exit(PROCESS_TIMED_OUT);
}, Clock.systemUTC());
var workCoordinator = new OpenSearchWorkCoordinator(
new CoordinateWorkHttpClient(connectionContext),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS,
workerId
)) {
try (var processManager = new LeaseExpireTrigger(RfsMigrateDocuments::exitOnLeaseTimeout, Clock.systemUTC());
var workCoordinator = new OpenSearchWorkCoordinator(
new CoordinateWorkHttpClient(connectionContext),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS,
workerId)
) {
MDC.put(LOGGING_MDC_WORKER_ID, workerId); // I don't see a need to clean this up since we're in main
OpenSearchClient targetClient = new OpenSearchClient(connectionContext);
DocumentReindexer reindexer = new DocumentReindexer(targetClient,
Expand Down Expand Up @@ -233,12 +231,20 @@ public static void main(String[] args) throws Exception {
unpackerFactory,
arguments.maxShardSizeBytes,
context);
} catch (NoWorkLeftException e) {
log.atWarn().setMessage("No work left to acquire. Exiting with error code to signal that.").log();
System.exit(NO_WORK_LEFT_EXIT_CODE);
} catch (Exception e) {
log.atError().setMessage("Unexpected error running RfsWorker").setCause(e).log();
throw e;
}
}

private static void exitOnLeaseTimeout(String workItemId) {
log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId);
System.exit(PROCESS_TIMED_OUT_EXIT_CODE);
}

private static RootDocumentMigrationContext makeRootContext(Args arguments, String workerId) {
var compositeContextTracker = new CompositeContextTracker(
new ActiveContextTracker(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

Expand All @@ -24,6 +26,9 @@
import org.opensearch.testcontainers.OpensearchContainer;

import eu.rekawek.toxiproxy.model.ToxicDirection;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.testcontainers.containers.Network;
Expand All @@ -47,6 +52,37 @@ enum FailHow {
WITH_DELAYS
}

@AllArgsConstructor
@Getter
private static class RunData {
Path tempDirSnapshot;
Path tempDirLucene;
ToxiProxyWrapper proxyContainer;
}

@Test
@Tag("longTest")
public void testExitsZeroThenThreeForSimpleSetup() throws Exception {
testProcess(3,
d -> {
var firstExitCode =
runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, FailHow.NEVER);
Assertions.assertEquals(0, firstExitCode);
for (int i=0; i<10; ++i) {
var secondExitCode =
runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, FailHow.NEVER);
if (secondExitCode != 0) {
var lastErrorCode =
runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, FailHow.NEVER);
Assertions.assertEquals(secondExitCode, lastErrorCode);
return lastErrorCode;
}
}
Assertions.fail("Ran for many test iterations and didn't get a No Work Available exit code");
return -1; // won't be evaluated
});
}

@ParameterizedTest
@CsvSource(value = {
// This test will go through a proxy that doesn't add any defects and the process will use defaults
Expand All @@ -62,6 +98,12 @@ enum FailHow {
"WITH_DELAYS, 2" })
public void testProcessExitsAsExpected(String failAfterString, int expectedExitCode) throws Exception {
final var failHow = FailHow.valueOf(failAfterString);
testProcess(expectedExitCode,
d -> runProcessAgainstToxicTarget(d.tempDirSnapshot, d.tempDirLucene, d.proxyContainer, failHow));
}

@SneakyThrows
private void testProcess(int expectedExitCode, Function<RunData, Integer> processRunner) {
final var testSnapshotContext = SnapshotTestContext.factory().noOtelTracking();

var sourceImageArgs = makeParamsForBase(SearchClusterContainer.ES_V7_10_2);
Expand Down Expand Up @@ -108,7 +150,7 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitC

esSourceContainer.copySnapshotData(tempDirSnapshot.toString());

int actualExitCode = runProcessAgainstToxicTarget(tempDirSnapshot, tempDirLucene, proxyContainer, failHow);
int actualExitCode = processRunner.apply(new RunData(tempDirSnapshot, tempDirLucene, proxyContainer));
log.atInfo().setMessage("Process exited with code: " + actualExitCode).log();

// Check if the exit code is as expected
Expand All @@ -123,12 +165,13 @@ public void testProcessExitsAsExpected(String failAfterString, int expectedExitC
}
}

@SneakyThrows
private static int runProcessAgainstToxicTarget(
Path tempDirSnapshot,
Path tempDirLucene,
ToxiProxyWrapper proxyContainer,
FailHow failHow
) throws IOException, InterruptedException {
FailHow failHow)
{
String targetAddress = proxyContainer.getProxyUriAsString();
var tp = proxyContainer.getProxy();
if (failHow == FailHow.AT_STARTUP) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ def setup_backfill(request):
assert metadata_result.success
backfill_start_result: CommandResult = backfill.start()
assert backfill_start_result.success
backfill_scale_result: CommandResult = backfill.scale(units=10)
# small enough to allow containers to be reused, big enough to test scaling out
backfill_scale_result: CommandResult = backfill.scale(units=2)
assert backfill_scale_result.success


Expand Down

0 comments on commit 5709add

Please sign in to comment.