Skip to content

Commit

Permalink
Merge remote-tracking branch 'opensearch-main/2.x' into 2.x
Browse files Browse the repository at this point in the history
  • Loading branch information
Himshikha Gupta committed Jun 8, 2024
2 parents e52f5a2 + fe61816 commit 7505b6a
Show file tree
Hide file tree
Showing 11 changed files with 1,071 additions and 442 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote Store] Add support to disable flush based on translog reader count ([#14027](https://github.com/opensearch-project/OpenSearch/pull/14027))
- [Query Insights] Add exporter support for top n queries ([#12982](https://github.com/opensearch-project/OpenSearch/pull/12982))
- [Query Insights] Add X-Opaque-Id to search request metadata for top n queries ([#13374](https://github.com/opensearch-project/OpenSearch/pull/13374))
- [Remote State] Add async remote state deletion task running on an interval, configurable by a setting ([#13995](https://github.com/opensearch-project/OpenSearch/pull/13995))
- Add remote routing table for remote state publication with experimental feature flag ([#13304](https://github.com/opensearch-project/OpenSearch/pull/13304))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.gateway.remote;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.RETAINED_MANIFESTS;
import static org.opensearch.gateway.remote.RemoteClusterStateCleanupManager.SKIP_CLEANUP_STATE_CHANGES;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.indices.IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteClusterStateCleanupManagerIT extends RemoteStoreBaseIntegTestCase {

private static final String INDEX_NAME = "test-index";

@Before
public void setup() {
asyncUploadMockFsRepo = false;
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
}

private Map<String, Long> initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) {
prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
Map<String, Long> indexStats = indexData(1, false, INDEX_NAME);
assertEquals(shardCount * (replicaCount + 1), getNumShards(INDEX_NAME).totalNumShards);
ensureGreen(INDEX_NAME);
return indexStats;
}

public void testRemoteCleanupTaskUpdated() {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;

initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
RemoteClusterStateCleanupManager remoteClusterStateCleanupManager = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateCleanupManager.class
);

assertEquals(CLUSTER_STATE_CLEANUP_INTERVAL_DEFAULT, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval());
assertTrue(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled());

// now disable
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), -1))
.get();

assertEquals(-1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMillis());
assertFalse(remoteClusterStateCleanupManager.getStaleFileDeletionTask().isScheduled());

// now set Clean up interval to 1 min
client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "1m"))
.get();
assertEquals(1, remoteClusterStateCleanupManager.getStaleFileDeletionTask().getInterval().getMinutes());
}

public void testRemoteCleanupDeleteStale() throws Exception {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;

initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);

// update cluster state 21 times to ensure that clean up has run after this will upload 42 manifest files
// to repository, if manifest files are less than that it means clean up has run
updateClusterStateNTimes(RETAINED_MANIFESTS + SKIP_CLEANUP_STATE_CHANGES + 1);

RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);
BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
BlobPath baseMetadataPath = repository.basePath()
.add(
Base64.getUrlEncoder()
.withoutPadding()
.encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
)
.add("cluster-state")
.add(getClusterState().metadata().clusterUUID());
BlobPath manifestContainerPath = baseMetadataPath.add("manifest");

// set cleanup interval to 100 ms to make the test faster
ClusterUpdateSettingsResponse response = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "100ms"))
.get();

assertTrue(response.isAcknowledged());

assertBusy(() -> {
int manifestFiles = repository.blobStore().blobContainer(manifestContainerPath).listBlobsByPrefix("manifest").size();
logger.info("number of current manifest file: {}", manifestFiles);
// we can't guarantee that we have same number of manifest as Retained manifest in our repo as there can be other queued task
// other than replica count change which can upload new manifest files, that's why we check that number of manifests is between
// Retained manifests and Retained manifests + 2 * Skip cleanup state changes (each cluster state update uploads 2 manifests)
assertTrue(
"Current number of manifest files: " + manifestFiles,
manifestFiles >= RETAINED_MANIFESTS && manifestFiles < RETAINED_MANIFESTS + 2 * SKIP_CLEANUP_STATE_CHANGES
);
}, 500, TimeUnit.MILLISECONDS);

// disable the clean up to avoid race condition during shutdown
response = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING.getKey(), "-1"))
.get();

assertTrue(response.isAcknowledged());
}

private void updateClusterStateNTimes(int n) {
int newReplicaCount = randomIntBetween(0, 3);
for (int i = n; i > 0; i--) {
ClusterUpdateSettingsResponse response = client().admin()
.cluster()
.prepareUpdateSettings()
.setPersistentSettings(Settings.builder().put(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.getKey(), i, TimeUnit.SECONDS))
.get();
assertTrue(response.isAcknowledged());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.settings.Settings;
import org.opensearch.discovery.DiscoveryStats;
Expand All @@ -27,7 +26,6 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.CUSTOM_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER;
Expand All @@ -51,16 +49,6 @@ protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder().put(super.nodeSettings(nodeOrdinal)).put(REMOTE_CLUSTER_STATE_ENABLED_SETTING.getKey(), true).build();
}

private void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
internalCluster().startDataOnlyNodes(numDataOnlyNodes);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
}

private Map<String, Long> initialTestSetup(int shardCount, int replicaCount, int dataNodeCount, int clusterManagerNodeCount) {
prepareCluster(clusterManagerNodeCount, dataNodeCount, INDEX_NAME, replicaCount, shardCount);
Map<String, Long> indexStats = indexData(1, false, INDEX_NAME);
Expand All @@ -69,49 +57,6 @@ private Map<String, Long> initialTestSetup(int shardCount, int replicaCount, int
return indexStats;
}

public void testFullClusterRestoreStaleDelete() throws Exception {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
int dataNodeCount = shardCount * (replicaCount + 1);
int clusterManagerNodeCount = 1;

initialTestSetup(shardCount, replicaCount, dataNodeCount, clusterManagerNodeCount);
setReplicaCount(0);
setReplicaCount(2);
setReplicaCount(0);
setReplicaCount(1);
setReplicaCount(0);
setReplicaCount(1);
setReplicaCount(0);
setReplicaCount(2);
setReplicaCount(0);

RemoteClusterStateService remoteClusterStateService = internalCluster().getClusterManagerNodeInstance(
RemoteClusterStateService.class
);

RepositoriesService repositoriesService = internalCluster().getClusterManagerNodeInstance(RepositoriesService.class);

BlobStoreRepository repository = (BlobStoreRepository) repositoriesService.repository(REPOSITORY_NAME);
BlobPath baseMetadataPath = repository.basePath()
.add(
Base64.getUrlEncoder()
.withoutPadding()
.encodeToString(getClusterState().getClusterName().value().getBytes(StandardCharsets.UTF_8))
)
.add("cluster-state")
.add(getClusterState().metadata().clusterUUID());

assertEquals(10, repository.blobStore().blobContainer(baseMetadataPath.add("manifest")).listBlobsByPrefix("manifest").size());

Map<String, IndexMetadata> indexMetadataMap = remoteClusterStateService.getLatestClusterState(
cluster().getClusterName(),
getClusterState().metadata().clusterUUID()
).getMetadata().getIndices();
assertEquals(0, indexMetadataMap.values().stream().findFirst().get().getNumberOfReplicas());
assertEquals(shardCount, indexMetadataMap.values().stream().findFirst().get().getNumberOfShards());
}

public void testRemoteStateStats() {
int shardCount = randomIntBetween(1, 2);
int replicaCount = 1;
Expand Down Expand Up @@ -241,12 +186,4 @@ private void validateNodesStatsResponse(NodesStatsResponse nodesStatsResponse) {
assertNotNull(nodesStatsResponse.getNodes().get(0));
assertNotNull(nodesStatsResponse.getNodes().get(0).getDiscoveryStats());
}

private void setReplicaCount(int replicaCount) {
client().admin()
.indices()
.prepareUpdateSettings(INDEX_NAME)
.setSettings(Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, replicaCount))
.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -350,4 +350,14 @@ protected void restore(boolean restoreAllShards, String... indices) {
PlainActionFuture.newFuture()
);
}

protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
internalCluster().startDataOnlyNodes(numDataOnlyNodes);
for (String index : indices.split(",")) {
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
ensureYellowAndNoInitializingShards(index);
ensureGreen(index);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
import org.opensearch.gateway.GatewayService;
import org.opensearch.gateway.PersistedClusterStateService;
import org.opensearch.gateway.ShardsBatchGatewayAllocator;
import org.opensearch.gateway.remote.RemoteClusterStateCleanupManager;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.http.HttpTransportSettings;
import org.opensearch.index.IndexModule;
Expand Down Expand Up @@ -713,6 +714,7 @@ public void apply(Settings value, Settings current, Settings previous) {
SearchRequestSlowLog.CLUSTER_SEARCH_REQUEST_SLOWLOG_LEVEL,

// Remote cluster state settings
RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING,
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
Expand Down
Loading

0 comments on commit 7505b6a

Please sign in to comment.