Merge branch 'main' into CaptureReplayDesignDoc
gregschohn authored Sep 19, 2024
2 parents dd4773b + f7d6fb6 commit 6fc74c0
Showing 150 changed files with 3,093 additions and 1,206 deletions.
68 changes: 36 additions & 32 deletions CreateSnapshot/src/main/java/com/rfs/CreateSnapshot.java
@@ -1,7 +1,5 @@
package com.rfs;

import java.util.function.Function;

import org.opensearch.migrations.snapshot.creation.tracing.RootSnapshotContext;
import org.opensearch.migrations.tracing.ActiveContextTracker;
import org.opensearch.migrations.tracing.ActiveContextTrackerByActivityType;
@@ -17,13 +15,14 @@
import com.rfs.common.OpenSearchClient;
import com.rfs.common.S3SnapshotCreator;
import com.rfs.common.SnapshotCreator;
import com.rfs.common.TryHandlePhaseFailure;
import com.rfs.common.http.ConnectionContext;
import com.rfs.tracing.IRfsContexts.ICreateSnapshotContext;
import com.rfs.worker.SnapshotRunner;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

@AllArgsConstructor
@Slf4j
public class CreateSnapshot {
public static class Args {
@@ -100,39 +99,44 @@ public static void main(String[] args) throws Exception {
}

log.info("Running CreateSnapshot with {}", String.join(" ", args));
run(
c -> ((arguments.fileSystemRepoPath != null)
? new FileSystemSnapshotCreator(
var snapshotCreator = new CreateSnapshot(arguments, rootContext.createSnapshotCreateContext());
snapshotCreator.run();
}

private Args arguments;
private ICreateSnapshotContext context;

public void run() {
var client = new OpenSearchClient(arguments.sourceArgs.toConnectionContext());
SnapshotCreator snapshotCreator;
if (arguments.fileSystemRepoPath != null) {
snapshotCreator = new FileSystemSnapshotCreator(
arguments.snapshotName,
c,
client,
arguments.fileSystemRepoPath,
rootContext.createSnapshotCreateContext()
)
: new S3SnapshotCreator(
arguments.snapshotName,
c,
arguments.s3RepoUri,
arguments.s3Region,
arguments.maxSnapshotRateMBPerNode,
arguments.s3RoleArn,
rootContext.createSnapshotCreateContext()
)),
new OpenSearchClient(arguments.sourceArgs.toConnectionContext()),
arguments.noWait
);
}
context
);
} else {
snapshotCreator = new S3SnapshotCreator(
arguments.snapshotName,
client,
arguments.s3RepoUri,
arguments.s3Region,
arguments.maxSnapshotRateMBPerNode,
arguments.s3RoleArn,
context
);
}

public static void run(
Function<OpenSearchClient, SnapshotCreator> snapshotCreatorFactory,
OpenSearchClient openSearchClient,
boolean noWait
) throws Exception {
TryHandlePhaseFailure.executeWithTryCatch(() -> {
if (noWait) {
SnapshotRunner.run(snapshotCreatorFactory.apply(openSearchClient));
try {
if (arguments.noWait) {
SnapshotRunner.run(snapshotCreator);
} else {
SnapshotRunner.runAndWaitForCompletion(snapshotCreatorFactory.apply(openSearchClient));
SnapshotRunner.runAndWaitForCompletion(snapshotCreator);
}
});
} catch (Exception e) {
log.atError().setMessage("Unexpected error running RfsWorker").setCause(e).log();
throw e;
}
}
}
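
The CreateSnapshot change above replaces the static run(Function<OpenSearchClient, SnapshotCreator>, OpenSearchClient, boolean) overload and its TryHandlePhaseFailure wrapper with an instance API: main() now builds a CreateSnapshot from the parsed Args plus a snapshot-create tracing context, and run() picks FileSystemSnapshotCreator when fileSystemRepoPath is set, S3SnapshotCreator otherwise, logging and rethrowing any failure itself. A minimal, self-contained sketch of the same @AllArgsConstructor refactor pattern, assuming only Lombok on the classpath (the class and fields below are invented stand-ins, not the project's types):

import lombok.AllArgsConstructor;

// Configuration and context become constructor-injected fields instead of
// parameters threaded through a static run(...) helper.
@AllArgsConstructor
public class InstanceRunnerSketch {
    private final String repoPath;  // stands in for Args.fileSystemRepoPath
    private final boolean noWait;   // stands in for Args.noWait

    public void run() {
        String creator = (repoPath != null) ? "file-system creator" : "s3 creator";
        System.out.println(noWait ? "started " + creator : "waiting on " + creator);
    }

    public static void main(String[] args) {
        new InstanceRunnerSketch("/snapshots/repo", false).run();
    }
}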
18 changes: 18 additions & 0 deletions CreateSnapshot/src/test/resources/log4j2.properties
@@ -0,0 +1,18 @@
status = WARN

property.ownedPackagesLogLevel=${sys:migrationLogLevel:-INFO}

appender.console.type = Console
appender.console.name = Console
appender.console.target = SYSTEM_OUT
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS}{UTC} %p %c{1.} [%t] %m%n

rootLogger.level = info
rootLogger.appenderRef.console.ref = Console

# Allow customization of owned package logs
logger.rfs.name = com.rfs
logger.rfs.level = ${ownedPackagesLogLevel}
logger.migration.name = org.opensearch.migrations
logger.migration.level = ${ownedPackagesLogLevel}
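
The ${sys:migrationLogLevel:-INFO} lookup makes the owned-package level tunable per run: pass -DmigrationLogLevel=DEBUG on the JVM command line (or set the system property before Log4j initializes) and the com.rfs and org.opensearch.migrations loggers drop to DEBUG while everything else stays at the INFO root level. A small self-contained sketch, assuming log4j-core is on the classpath and the properties file above is the active configuration (class and logger names invented):

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class LogLevelSketch {
    public static void main(String[] args) {
        // Must happen before the first LogManager call; the ${sys:...} lookup
        // is resolved when the configuration is loaded.
        System.setProperty("migrationLogLevel", "DEBUG");
        Logger owned = LogManager.getLogger("com.rfs.ExampleWorker"); // under logger.rfs.name
        owned.debug("printed: com.rfs.* now logs at DEBUG");
        Logger other = LogManager.getLogger("some.other.Library");
        other.debug("suppressed: rootLogger remains at INFO");
    }
}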
com/rfs/RfsMigrateDocuments.java
@@ -6,11 +6,13 @@
import java.time.Clock;
import java.time.Duration;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

import org.opensearch.migrations.Version;
import org.opensearch.migrations.VersionConverter;
import org.opensearch.migrations.cluster.ClusterProviderRegistry;
import org.opensearch.migrations.cluster.ClusterSnapshotReader;
import org.opensearch.migrations.reindexer.tracing.RootDocumentMigrationContext;
import org.opensearch.migrations.tracing.ActiveContextTracker;
import org.opensearch.migrations.tracing.ActiveContextTrackerByActivityType;
@@ -23,6 +25,7 @@
import com.beust.jcommander.Parameter;
import com.beust.jcommander.ParameterException;
import com.beust.jcommander.ParametersDelegate;
import com.rfs.RfsMigrateDocuments.RunParameters;
import com.rfs.cms.CoordinateWorkHttpClient;
import com.rfs.cms.IWorkCoordinator;
import com.rfs.cms.LeaseExpireTrigger;
@@ -37,12 +40,13 @@
import com.rfs.common.S3Uri;
import com.rfs.common.SnapshotShardUnpacker;
import com.rfs.common.SourceRepo;
import com.rfs.common.TryHandlePhaseFailure;
import com.rfs.common.http.ConnectionContext;
import com.rfs.models.IndexMetadata;
import com.rfs.models.ShardMetadata;
import com.rfs.worker.DocumentsRunner;
import com.rfs.worker.ShardWorkPreparer;
import lombok.Builder;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;

@@ -167,7 +171,7 @@ public static void validateArgs(Args args) {
}

public static void main(String[] args) throws Exception {
System.err.println("Got args: " + String.join("; ", args));
log.info("Got args: " + String.join("; ", args));
var workerId = ProcessHelpers.getNodeInstanceName();
log.info("Starting RfsMigrateDocuments with workerId =" + workerId);

@@ -182,63 +186,63 @@ public static void main(String[] args) throws Exception {

validateArgs(arguments);

var rootDocumentContext = makeRootContext(arguments, workerId);
var context = makeRootContext(arguments, workerId);
var luceneDirPath = Paths.get(arguments.luceneDir);
var snapshotLocalDirPath = arguments.snapshotLocalDir != null ? Paths.get(arguments.snapshotLocalDir) : null;

var connectionContext = arguments.targetArgs.toConnectionContext();
try (var processManager = new LeaseExpireTrigger(workItemId -> {
log.error("Terminating RfsMigrateDocuments because the lease has expired for " + workItemId);
System.exit(PROCESS_TIMED_OUT);
}, Clock.systemUTC())) {
ConnectionContext connectionContext = arguments.targetArgs.toConnectionContext();
}, Clock.systemUTC());
var workCoordinator = new OpenSearchWorkCoordinator(
new CoordinateWorkHttpClient(connectionContext),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS,
workerId
);
)) {
MDC.put(LOGGING_MDC_WORKER_ID, workerId); // I don't see a need to clean this up since we're in main
TryHandlePhaseFailure.executeWithTryCatch(() -> {
OpenSearchClient targetClient = new OpenSearchClient(connectionContext);
DocumentReindexer reindexer = new DocumentReindexer(targetClient,
arguments.numDocsPerBulkRequest,
arguments.numBytesPerBulkRequest,
arguments.maxConnections);

SourceRepo sourceRepo;
if (snapshotLocalDirPath == null) {
sourceRepo = S3Repo.create(
Paths.get(arguments.s3LocalDir),
new S3Uri(arguments.s3RepoUri),
arguments.s3Region
);
} else {
sourceRepo = new FileSystemRepo(snapshotLocalDirPath);
}
DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);

var sourceResourceProvider = ClusterProviderRegistry.getSnapshotReader(arguments.sourceVersion, sourceRepo);

SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(
repoAccessor,
luceneDirPath,
sourceResourceProvider.getBufferSizeInBytes()
OpenSearchClient targetClient = new OpenSearchClient(connectionContext);
DocumentReindexer reindexer = new DocumentReindexer(targetClient,
arguments.numDocsPerBulkRequest,
arguments.numBytesPerBulkRequest,
arguments.maxConnections);

SourceRepo sourceRepo;
if (snapshotLocalDirPath == null) {
sourceRepo = S3Repo.create(
Paths.get(arguments.s3LocalDir),
new S3Uri(arguments.s3RepoUri),
arguments.s3Region
);
} else {
sourceRepo = new FileSystemRepo(snapshotLocalDirPath);
}
var repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);

run(
LuceneDocumentsReader.getFactory(sourceResourceProvider),
reindexer,
workCoordinator,
arguments.initialLeaseDuration,
processManager,
sourceResourceProvider.getIndexMetadata(),
arguments.snapshotName,
arguments.indexAllowlist,
sourceResourceProvider.getShardMetadata(),
unpackerFactory,
arguments.maxShardSizeBytes,
rootDocumentContext
);
});
var sourceResourceProvider = ClusterProviderRegistry.getSnapshotReader(arguments.sourceVersion, sourceRepo);

var unpackerFactory = new SnapshotShardUnpacker.Factory(
repoAccessor,
luceneDirPath,
sourceResourceProvider.getBufferSizeInBytes()
);

run(RunParameters.builder()
.leaseExpireTrigger(processManager)
.workCoordinator(workCoordinator)
.reindexer(reindexer)
.snapshotName(arguments.snapshotName)
.snapshotReader(sourceResourceProvider)
.snapshotUnpacker(unpackerFactory)
.documentReader(LuceneDocumentsReader.getFactory(sourceResourceProvider))
.indexAllowlist(arguments.indexAllowlist)
.maxInitialLeaseDuration(arguments.initialLeaseDuration)
.maxShardSizeBytes(arguments.maxShardSizeBytes)
.tracingContext(context)
.build());
} catch (Exception e) {
log.atError().setMessage("Unexpected error running RfsWorker").setCause(e).log();
throw e;
}
}
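
Untangling the interleaved old/new lines above: the old code opened the try-with-resources on the LeaseExpireTrigger alone, while the new code appears to declare both the LeaseExpireTrigger and the OpenSearchWorkCoordinator as resources of a single try, then logs and rethrows from a top-level catch. A self-contained sketch of that shape (resource names invented; the real types are LeaseExpireTrigger and OpenSearchWorkCoordinator):

import java.time.Clock;

public class TryResourcesSketch {
    static AutoCloseable resource(String name) {
        return () -> System.out.println("closed " + name);
    }

    public static void main(String[] args) throws Exception {
        try (AutoCloseable leaseTrigger = resource("leaseExpireTrigger");
             AutoCloseable coordinator = resource("workCoordinator")) {
            System.out.println("working at " + Clock.systemUTC().instant());
        } catch (Exception e) {
            System.err.println("Unexpected error running worker: " + e);
            throw e; // log-and-rethrow, as the new main() does
        }
        // Resources close in reverse declaration order:
        // workCoordinator first, then leaseExpireTrigger.
    }
}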

@@ -255,41 +259,38 @@ private static RootDocumentMigrationContext makeRootContext(Args arguments, Stri
return new RootDocumentMigrationContext(otelSdk, compositeContextTracker);
}

public static DocumentsRunner.CompletionStatus run(
Function<Path, LuceneDocumentsReader> readerFactory,
DocumentReindexer reindexer,
IWorkCoordinator workCoordinator,
Duration maxInitialLeaseDuration,
LeaseExpireTrigger leaseExpireTrigger,
IndexMetadata.Factory indexMetadataFactory,
String snapshotName,
List<String> indexAllowlist,
ShardMetadata.Factory shardMetadataFactory,
SnapshotShardUnpacker.Factory unpackerFactory,
long maxShardSizeBytes,
RootDocumentMigrationContext rootDocumentContext
) throws IOException, InterruptedException, NoWorkLeftException {
var scopedWorkCoordinator = new ScopedWorkCoordinator(workCoordinator, leaseExpireTrigger);
public static DocumentsRunner.CompletionStatus run(RunParameters params) throws Exception {
var scopedWorkCoordinator = new ScopedWorkCoordinator(params.workCoordinator, params.leaseExpireTrigger);
confirmShardPrepIsComplete(
indexMetadataFactory,
snapshotName,
indexAllowlist,
params.snapshotReader.getIndexMetadata(),
params.snapshotName,
params.indexAllowlist,
scopedWorkCoordinator,
rootDocumentContext
params.tracingContext
);
if (!workCoordinator.workItemsArePending(
rootDocumentContext.getWorkCoordinationContext()::createItemsPendingContext
if (!params.workCoordinator.workItemsArePending(
params.tracingContext.getWorkCoordinationContext()::createItemsPendingContext
)) {
throw new NoWorkLeftException("No work items are pending/all work items have been processed. Returning.");
}
return new DocumentsRunner(scopedWorkCoordinator, maxInitialLeaseDuration, (name, shard) -> {
var shardMetadata = shardMetadataFactory.fromRepo(snapshotName, name, shard);
BiFunction<String, Integer, ShardMetadata> shardFactory = (name, shard) -> {
var shardMetadataFactory = params.snapshotReader.getShardMetadata();
var shardMetadata = shardMetadataFactory.fromRepo(params.snapshotName, name, shard);
log.info("Shard size: " + shardMetadata.getTotalSizeBytes());
if (shardMetadata.getTotalSizeBytes() > maxShardSizeBytes) {
throw new DocumentsRunner.ShardTooLargeException(shardMetadata.getTotalSizeBytes(), maxShardSizeBytes);
if (shardMetadata.getTotalSizeBytes() > params.maxShardSizeBytes) {
throw new DocumentsRunner.ShardTooLargeException(shardMetadata.getTotalSizeBytes(), params.maxShardSizeBytes);
}
return shardMetadata;
}, unpackerFactory, readerFactory, reindexer).migrateNextShard(rootDocumentContext::createReindexContext);
};
var runner = new DocumentsRunner(
scopedWorkCoordinator,
params.maxInitialLeaseDuration,
shardFactory,
params.snapshotUnpacker,
params.documentReader,
params.reindexer);
var migrationStatus = runner.migrateNextShard(params.tracingContext::createReindexContext);
return migrationStatus;
}

private static void confirmShardPrepIsComplete(
@@ -329,8 +330,32 @@ private static void confirmShardPrepIsComplete(
.log();
Thread.sleep(lockRenegotiationMillis);
lockRenegotiationMillis *= 2;
continue;
}
}
}

@Builder
static class RunParameters {
@NonNull
final LeaseExpireTrigger leaseExpireTrigger;
@NonNull
final IWorkCoordinator workCoordinator;
@NonNull
final String snapshotName;
@NonNull
final ClusterSnapshotReader snapshotReader;
@NonNull
final SnapshotShardUnpacker.Factory snapshotUnpacker;
@NonNull
final Function<Path, LuceneDocumentsReader> documentReader;
@NonNull
final DocumentReindexer reindexer;
@NonNull
final List<String> indexAllowlist;
@NonNull
final Duration maxInitialLeaseDuration;
final long maxShardSizeBytes;
@NonNull
final RootDocumentMigrationContext tracingContext;
}
}
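
The new RunParameters class replaces the old twelve-argument run(...) signature; Lombok's @Builder generates the fluent builder used in main(), and each @NonNull field is checked when build() executes, so a forgotten parameter throws a NullPointerException naming the field up front instead of failing mid-migration. A minimal, self-contained version of the pattern, assuming Lombok on the classpath (names invented):

import lombok.Builder;
import lombok.NonNull;

public class BuilderSketch {
    @Builder
    static class Params {
        @NonNull final String snapshotName;
        final long maxShardSizeBytes; // primitive: no null check, defaults to 0
    }

    public static void main(String[] args) {
        Params ok = Params.builder().snapshotName("snap").maxShardSizeBytes(80L).build();
        System.out.println(ok.snapshotName + " / " + ok.maxShardSizeBytes);
        // Omitting snapshotName makes build() throw:
        // java.lang.NullPointerException: snapshotName is marked non-null but is null
        Params bad = Params.builder().maxShardSizeBytes(80L).build();
    }
}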
EndToEndTest.java
@@ -5,6 +5,7 @@
import java.util.Random;
import java.util.concurrent.CompletableFuture;

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
@@ -25,6 +26,7 @@
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

@Tag("isolatedTest")
public class EndToEndTest extends SourceTestBase {
@TempDir
private File localDirectory;
@@ -110,7 +112,7 @@ private void migrateFrom_ES(
SnapshotRunner.runAndWaitForCompletion(snapshotCreator);
sourceCluster.copySnapshotData(localDirectory.toString());
var sourceRepo = new FileSystemRepo(localDirectory.toPath());

// === ACTION: Migrate the documents ===
final var clockJitter = new Random(1);
var result = migrateDocumentsWithOneWorker(
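
The @Tag("isolatedTest") annotation added above opts this end-to-end suite into JUnit 5 tag filtering, so the build can run or skip it independently of fast unit tests (for example via includeTags/excludeTags in Gradle's useJUnitPlatform, or --include-tag on the JUnit ConsoleLauncher). A self-contained sketch of the mechanism (class and method names invented):

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag("isolatedTest")
class IsolatedSuiteSketch {
    @Test
    void heavyEndToEndScenario() {
        // Executes only when the active tag filter selects "isolatedTest".
    }
}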
