Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote State] fix lock release before deletion is completed #10611

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1072,7 +1072,8 @@ public void onFailure(Exception e) {
* @param clusterUUID uuid of cluster state to refer to in remote
* @param manifestsToRetain number of latest manifest files to keep in remote
*/
private void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) {
// package private for testing
void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int manifestsToRetain) {
if (deleteStaleMetadataRunning.compareAndSet(false, true) == false) {
logger.info("Delete stale cluster metadata task is already in progress.");
return;
Expand Down Expand Up @@ -1109,8 +1110,9 @@ public void onFailure(Exception e) {
}
}
);
} finally {
} catch (Exception e) {
deleteStaleMetadataRunning.set(false);
throw e;
}
}

Expand Down Expand Up @@ -1190,7 +1192,7 @@ private void deleteStalePaths(String clusterName, String clusterUUID, List<Strin
public void deleteStaleClusterUUIDs(ClusterState clusterState, ClusterMetadataManifest committedManifest) {
threadpool.executor(ThreadPool.Names.REMOTE_PURGE).execute(() -> {
String clusterName = clusterState.getClusterName().value();
logger.info("Deleting stale cluster UUIDs data from remote [{}]", clusterName);
logger.debug("Deleting stale cluster UUIDs data from remote [{}]", clusterName);
Set<String> allClustersUUIDsInRemote;
try {
allClustersUUIDsInRemote = new HashSet<>(getAllClusterUUIDs(clusterState.getClusterName().value()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import org.mockito.ArgumentCaptor;
Expand All @@ -73,6 +76,7 @@
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_CURRENT_CODEC_VERSION;
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX;
import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX;
import static org.opensearch.gateway.remote.RemoteClusterStateService.RETAINED_MANIFESTS;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_CLUSTER_STATE_REPOSITORY_NAME_ATTRIBUTE_KEY;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_SETTINGS_ATTRIBUTE_KEY_PREFIX;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_REPOSITORY_TYPE_ATTRIBUTE_KEY_FORMAT;
Expand Down Expand Up @@ -1004,6 +1008,36 @@ public void testFileNames() {
assertThat(splittedName[3], is("P"));
}

/**
 * Checks that the stale manifest cleanup does not run more than once concurrently.
 *
 * The blob listing call is stubbed to count its invocations and then wait on a latch.
 * {@code deleteStaleClusterMetadata} is triggered twice before the latch is released, and the
 * test expects the stubbed listing to have been invoked exactly once.
 *
 * @throws Exception if {@code assertBusy} or the stubbed listing fails
 */
public void testSingleConcurrentExecutionOfStaleManifestCleanup() throws Exception {
    BlobContainer blobContainer = mock(BlobContainer.class);
    BlobPath blobPath = new BlobPath().add("random-path");
    when((blobStoreRepository.basePath())).thenReturn(blobPath);
    when(blobStore.blobContainer(any())).thenReturn(blobContainer);

    CountDownLatch latch = new CountDownLatch(1);
    AtomicInteger callCount = new AtomicInteger(0);
    doAnswer(invocation -> {
        callCount.incrementAndGet();
        // Bounded wait of 5000 ms: long enough for the second trigger below to be issued,
        // short enough that a stuck latch fails the test quickly instead of stalling the suite.
        if (latch.await(5000, TimeUnit.MILLISECONDS) == false) {
            throw new Exception("Timed out waiting for delete task queuing to complete");
        }
        return null;
    }).when(blobContainer)
        .listBlobsByPrefixInSortedOrder(
            any(String.class),
            any(int.class),
            any(BlobContainer.BlobNameSortOrder.class),
            any(ActionListener.class)
        );

    remoteClusterStateService.start();
    // Trigger the cleanup twice; only one listing call is expected to be counted.
    remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS);
    remoteClusterStateService.deleteStaleClusterMetadata("cluster-name", "cluster-uuid", RETAINED_MANIFESTS);

    // Release the stubbed listing call, then verify it was entered exactly once.
    latch.countDown();
    assertBusy(() -> assertEquals(1, callCount.get()));
}

private void mockObjectsForGettingPreviousClusterUUID(Map<String, String> clusterUUIDsPointers) throws IOException {
final BlobPath blobPath = mock(BlobPath.class);
when((blobStoreRepository.basePath())).thenReturn(blobPath);
Expand Down
Loading