From d99a9d08aa2d6aeab0e5a89f067006e615b0f701 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Mon, 30 Sep 2024 11:37:28 +0530 Subject: [PATCH] Bugfix in RemoteFsTimestampAwareTranslog.trimUnreferencedReaders (#16078) (#16126) (cherry picked from commit 1bddf2f7255cf5facf17568a631cb605d78058a1) Signed-off-by: Sachin Kale Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] --- ...rePinnedTimestampsGarbageCollectionIT.java | 75 +++++++++++++ .../snapshots/DeleteSnapshotV2IT.java | 102 ++++++++++++++++++ .../RemoteFsTimestampAwareTranslog.java | 2 +- .../index/translog/RemoteFsTranslog.java | 8 ++ 4 files changed, 186 insertions(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java index 0a2668c60d3bd..08ece7df457cc 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStorePinnedTimestampsGarbageCollectionIT.java @@ -8,6 +8,7 @@ package org.opensearch.remotestore; +import org.opensearch.action.support.IndicesOptions; import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.Settings; @@ -32,6 +33,7 @@ import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA; import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class RemoteStorePinnedTimestampsGarbageCollectionIT extends RemoteStoreBaseIntegTestCase { @@ -288,6 +290,79 @@ public void testLiveIndexWithPinnedTimestamps() throws Exception { }); } + public void testLiveIndexWithPinnedTimestampsMultiplePrimaryTerms() throws Exception { + prepareCluster(1, 2, Settings.EMPTY); + Settings indexSettings = Settings.builder() + .put(remoteStoreIndexSettings(1, 1)) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 3) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + primaryNodeName(INDEX_NAME) + ); + + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + + int numDocs = randomIntBetween(5, 10); + for (int i = 0; i < numDocs; i++) { + keepPinnedTimestampSchedulerUpdated(); + indexSingleDoc(INDEX_NAME, true); + if (i == 2) { + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.timeValueMinutes(1)); + remoteStorePinnedTimestampService.pinTimestamp(System.currentTimeMillis(), "xyz", noOpActionListener); + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + } + } + + ingestDocs(); + + internalCluster().restartNode(primaryNodeName(INDEX_NAME)); + ensureGreen(INDEX_NAME); + + ingestDocs(); + + String translogPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_PATH_PREFIX.get(getNodeSettings()); + String shardDataPath = getShardLevelBlobPath( + client(), + INDEX_NAME, + BlobPath.cleanPath(), + "0", + TRANSLOG, + DATA, + translogPathFixedPrefix + ).buildAsString(); + Path translogDataPath = Path.of(translogRepoPath + "/" + shardDataPath + "/1"); + + assertBusy(() -> { + List dataFiles = Files.list(translogDataPath).collect(Collectors.toList()); + assertFalse(dataFiles.isEmpty()); + }); + } + + private void ingestDocs() { + int numDocs = randomIntBetween(15, 20); + for (int i = 0; i < numDocs; i++) { + indexSingleDoc(INDEX_NAME, false); + } + + assertNoFailures(client().admin().indices().prepareRefresh(INDEX_NAME).setIndicesOptions(IndicesOptions.lenientExpandOpen()).get()); + flushAndRefresh(INDEX_NAME); + + int numDocsPostFailover = randomIntBetween(15, 20); + for (int i = 0; i < numDocsPostFailover; i++) { + indexSingleDoc(INDEX_NAME, false); + } + + flushAndRefresh(INDEX_NAME); + assertNoFailures(client().admin().indices().prepareRefresh(INDEX_NAME).setIndicesOptions(IndicesOptions.lenientExpandOpen()).get()); + } + public void testIndexDeletionNoPinnedTimestamps() throws Exception { prepareCluster(1, 1, Settings.EMPTY); Settings indexSettings = Settings.builder() diff --git a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java index c4e3a478c8540..7b2ad2bccd2b1 100644 --- a/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java +++ b/server/src/internalClusterTest/java/org/opensearch/snapshots/DeleteSnapshotV2IT.java @@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.opensearch.index.IndexSettings.INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; @@ -312,6 +313,107 @@ public void testRemoteStoreCleanupForDeletedIndexForSnapshotV2MultipleSnapshots( // translogPostDeletionOfSnapshot1.size()), 60, TimeUnit.SECONDS); } + public void testRemoteStoreCleanupMultiplePrimaryOnSnapshotDeletion() throws Exception { + disableRepoConsistencyCheck("Remote store repository is being used in the test"); + final Path remoteStoreRepoPath = randomRepoPath(); + Settings settings = remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath); + settings = Settings.builder() + .put(settings) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PINNED_TIMESTAMP_ENABLED.getKey(), true) + .put(RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING.getKey(), RemoteStoreEnums.PathType.FIXED.toString()) + .build(); + String clusterManagerName = internalCluster().startClusterManagerOnlyNode(settings); + internalCluster().startDataOnlyNodes(3, settings); + final Client clusterManagerClient = internalCluster().clusterManagerClient(); + ensureStableCluster(4); + + RemoteStorePinnedTimestampService remoteStorePinnedTimestampService = internalCluster().getInstance( + RemoteStorePinnedTimestampService.class, + clusterManagerName + ); + remoteStorePinnedTimestampService.rescheduleAsyncUpdatePinnedTimestampTask(TimeValue.timeValueSeconds(1)); + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + + final String snapshotRepoName = "snapshot-repo-name"; + final Path snapshotRepoPath = randomRepoPath(); + createRepository(snapshotRepoName, "mock", snapshotRepoSettingsForShallowV2(snapshotRepoPath)); + + final String remoteStoreEnabledIndexName = "remote-index-1"; + final Settings remoteStoreEnabledIndexSettings = Settings.builder() + .put(getRemoteStoreBackedIndexSettings()) + .put(INDEX_REMOTE_TRANSLOG_KEEP_EXTRA_GEN_SETTING.getKey(), 2) + .build(); + createIndex(remoteStoreEnabledIndexName, remoteStoreEnabledIndexSettings); + ensureGreen(remoteStoreEnabledIndexName); + + // Create 2 snapshots for primary term 1 + keepPinnedTimestampSchedulerUpdated(); + indexRandomDocs(remoteStoreEnabledIndexName, 5); + createSnapshot(snapshotRepoName, "snap1"); + keepPinnedTimestampSchedulerUpdated(); + indexRandomDocs(remoteStoreEnabledIndexName, 5); + createSnapshot(snapshotRepoName, "snap2"); + + // Restart current primary to change the primary term + internalCluster().restartNode(primaryNodeName(remoteStoreEnabledIndexName)); + ensureGreen(remoteStoreEnabledIndexName); + + // Create 2 snapshots for primary term 2 + keepPinnedTimestampSchedulerUpdated(); + indexRandomDocs(remoteStoreEnabledIndexName, 5); + createSnapshot(snapshotRepoName, "snap3"); + keepPinnedTimestampSchedulerUpdated(); + indexRandomDocs(remoteStoreEnabledIndexName, 5); + createSnapshot(snapshotRepoName, "snap4"); + + String indexUUID = client().admin() + .indices() + .prepareGetSettings(remoteStoreEnabledIndexName) + .get() + .getSetting(remoteStoreEnabledIndexName, IndexMetadata.SETTING_INDEX_UUID); + + Path indexPath = Path.of(String.valueOf(remoteStoreRepoPath), indexUUID); + Path shardPath = Path.of(String.valueOf(indexPath), "0"); + Path translogPath = Path.of(String.valueOf(shardPath), "translog", "data", "1"); + + // Deleting snap1 will still keep files in primary term 1 due to snap2 + deleteSnapshot(clusterManagerClient, snapshotRepoName, "snap1"); + assertTrue(RemoteStoreBaseIntegTestCase.getFileCount(translogPath) > 0); + + // Deleting snap2 will not remove primary term 1 as we need to trigger trimUnreferencedReaders once + deleteSnapshot(clusterManagerClient, snapshotRepoName, "snap2"); + assertTrue(RemoteStoreBaseIntegTestCase.getFileCount(translogPath) > 0); + + // Index a doc to trigger trimUnreferencedReaders + RemoteStoreSettings.setPinnedTimestampsLookbackInterval(TimeValue.ZERO); + keepPinnedTimestampSchedulerUpdated(); + indexRandomDocs(remoteStoreEnabledIndexName, 5); + + assertBusy(() -> assertFalse(Files.exists(translogPath)), 30, TimeUnit.SECONDS); + } + + private void createSnapshot(String repoName, String snapshotName) { + CreateSnapshotResponse createSnapshotResponse = client().admin() + .cluster() + .prepareCreateSnapshot(repoName, snapshotName) + .setWaitForCompletion(true) + .get(); + SnapshotInfo snapshotInfo = createSnapshotResponse.getSnapshotInfo(); + + assertThat(snapshotInfo.state(), equalTo(SnapshotState.SUCCESS)); + assertThat(snapshotInfo.successfulShards(), greaterThan(0)); + assertThat(snapshotInfo.successfulShards(), equalTo(snapshotInfo.totalShards())); + assertThat(snapshotInfo.snapshotId().getName(), equalTo(snapshotName)); + } + + private void deleteSnapshot(Client clusterManagerClient, String repoName, String snapshotName) { + AcknowledgedResponse deleteSnapshotResponse = clusterManagerClient.admin() + .cluster() + .prepareDeleteSnapshot(repoName, snapshotName) + .get(); + assertAcked(deleteSnapshotResponse); + } + private Settings snapshotV2Settings(Path remoteStoreRepoPath) { Settings settings = Settings.builder() .put(remoteStoreClusterSettings(REMOTE_REPO_NAME, remoteStoreRepoPath)) diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java index e61a9606175ee..ede422ea3c4f7 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTimestampAwareTranslog.java @@ -121,7 +121,7 @@ public void trimUnreferencedReaders() throws IOException { protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal) throws IOException { if (trimLocal) { // clean up local translog files and updates readers - super.trimUnreferencedReaders(); + super.trimUnreferencedReaders(true); } // Update file tracker to reflect local translog state diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index f22236eb0ca1c..82fc149cf086b 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -549,9 +549,17 @@ protected Releasable drainSync() { @Override public void trimUnreferencedReaders() throws IOException { + trimUnreferencedReaders(false); + } + + protected void trimUnreferencedReaders(boolean onlyTrimLocal) throws IOException { // clean up local translog files and updates readers super.trimUnreferencedReaders(); + if (onlyTrimLocal) { + return; + } + // This is to ensure that after the permits are acquired during primary relocation, there are no further modification on remote // store. if (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get()) {