From 8e32ed736372aa90db4c0ce3b85888b7b473a337 Mon Sep 17 00:00:00 2001 From: Shivansh Arora Date: Thu, 20 Jun 2024 00:21:52 +0530 Subject: [PATCH] Add tests for RemoteGlobalMetadataManager (#14394) * Add tests for RemoteGlobalMetadataManager Signed-off-by: Shivansh Arora * Add TestCapturingListener Signed-off-by: Shivansh Arora * Move TestCapturingListener to test/framework Signed-off-by: Shivansh Arora * Added javadoc Signed-off-by: Shivansh Arora --------- Signed-off-by: Shivansh Arora --- .../RemoteGlobalMetadataManagerTests.java | 532 +++++++++++++++++- .../RemoteCoordinationMetadataTests.java | 2 +- .../model/RemoteCustomMetadataTests.java | 2 +- .../model/RemoteGlobalMetadataTests.java | 2 +- ...RemoteHashesOfConsistentSettingsTests.java | 2 +- ...RemotePersistentSettingsMetadataTests.java | 2 +- .../model/RemoteTemplatesMetadataTests.java | 2 +- .../common/util/TestCapturingListener.java | 39 ++ 8 files changed, 573 insertions(+), 10 deletions(-) create mode 100644 test/framework/src/main/java/org/opensearch/common/util/TestCapturingListener.java diff --git a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java index bd01bc1ab0cdb..c543f986b3e86 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/RemoteGlobalMetadataManagerTests.java @@ -9,19 +9,36 @@ package org.opensearch.gateway.remote; import org.opensearch.Version; +import org.opensearch.action.LatchedActionListener; import org.opensearch.cluster.ClusterModule; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.DiffableUtils; +import org.opensearch.cluster.coordination.CoordinationMetadata; +import org.opensearch.cluster.metadata.DiffableStringMap; import org.opensearch.cluster.metadata.IndexGraveyard; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.Metadata.XContentContext; +import org.opensearch.cluster.metadata.TemplatesMetadata; +import org.opensearch.common.blobstore.BlobPath; import org.opensearch.common.network.NetworkModule; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.TestCapturingListener; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.compress.Compressor; import org.opensearch.core.compress.NoneCompressor; import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.gateway.remote.model.RemoteCoordinationMetadata; +import org.opensearch.gateway.remote.model.RemoteCustomMetadata; +import org.opensearch.gateway.remote.model.RemoteGlobalMetadata; +import org.opensearch.gateway.remote.model.RemoteHashesOfConsistentSettings; +import org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata; +import org.opensearch.gateway.remote.model.RemoteReadResult; +import org.opensearch.gateway.remote.model.RemoteTemplatesMetadata; +import org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata; +import org.opensearch.index.remote.RemoteStoreUtils; import org.opensearch.index.translog.transfer.BlobStoreTransferService; import org.opensearch.indices.IndicesModule; import org.opensearch.repositories.blobstore.BlobStoreRepository; @@ -32,14 +49,48 @@ import org.junit.After; import org.junit.Before; +import java.io.IOException; +import java.io.InputStream; import java.util.EnumSet; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; import java.util.function.Function; import java.util.stream.Stream; import static java.util.stream.Collectors.toList; +import static org.opensearch.cluster.metadata.Metadata.isGlobalStateEquals; +import static org.opensearch.common.blobstore.stream.write.WritePriority.URGENT; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CLUSTER_STATE_PATH_TOKEN; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.CUSTOM_DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.FORMAT_PARAMS; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_CURRENT_CODEC_VERSION; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.GLOBAL_METADATA_PATH_TOKEN; +import static org.opensearch.gateway.remote.RemoteClusterStateUtils.PATH_DELIMITER; +import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA; +import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA_FORMAT; +import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadataTests.getCoordinationMetadata; +import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_METADATA; +import static org.opensearch.gateway.remote.model.RemoteCustomMetadataTests.getCustomMetadata; +import static org.opensearch.gateway.remote.model.RemoteGlobalMetadata.GLOBAL_METADATA; +import static org.opensearch.gateway.remote.model.RemoteGlobalMetadata.GLOBAL_METADATA_FORMAT; +import static org.opensearch.gateway.remote.model.RemoteGlobalMetadataTests.getGlobalMetadata; +import static org.opensearch.gateway.remote.model.RemoteHashesOfConsistentSettings.HASHES_OF_CONSISTENT_SETTINGS; +import static org.opensearch.gateway.remote.model.RemoteHashesOfConsistentSettings.HASHES_OF_CONSISTENT_SETTINGS_FORMAT; +import static org.opensearch.gateway.remote.model.RemoteHashesOfConsistentSettingsTests.getHashesOfConsistentSettings; +import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA; +import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadataTests.getSettings; +import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA; +import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA_FORMAT; +import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadataTests.getTemplatesMetadata; +import static org.opensearch.gateway.remote.model.RemoteTransientSettingsMetadata.TRANSIENT_SETTING_METADATA; import static org.hamcrest.Matchers.is; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyIterable; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -47,26 +98,36 @@ public class RemoteGlobalMetadataManagerTests extends OpenSearchTestCase { private RemoteGlobalMetadataManager remoteGlobalMetadataManager; private ClusterSettings clusterSettings; private BlobStoreRepository blobStoreRepository; + private BlobStoreTransferService blobStoreTransferService; + private Compressor compressor; + private NamedXContentRegistry xContentRegistry; + private NamedWriteableRegistry namedWriteableRegistry; private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + private final long METADATA_VERSION = 7331L; + private final String CLUSTER_NAME = "test-cluster"; + private final String CLUSTER_UUID = "test-cluster-uuid"; @Before public void setup() { clusterSettings = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); blobStoreRepository = mock(BlobStoreRepository.class); - BlobStoreTransferService blobStoreTransferService = mock(BlobStoreTransferService.class); - NamedXContentRegistry xContentRegistry = new NamedXContentRegistry( + blobStoreTransferService = mock(BlobStoreTransferService.class); + compressor = new NoneCompressor(); + xContentRegistry = new NamedXContentRegistry( Stream.of( NetworkModule.getNamedXContents().stream(), IndicesModule.getNamedXContents().stream(), ClusterModule.getNamedXWriteables().stream() ).flatMap(Function.identity()).collect(toList()) ); - Compressor compressor = new NoneCompressor(); + namedWriteableRegistry = writableRegistry(); + BlobPath blobPath = new BlobPath(); when(blobStoreRepository.getCompressor()).thenReturn(compressor); when(blobStoreRepository.getNamedXContentRegistry()).thenReturn(xContentRegistry); + when(blobStoreRepository.basePath()).thenReturn(blobPath); remoteGlobalMetadataManager = new RemoteGlobalMetadataManager( clusterSettings, - "test-cluster", + CLUSTER_NAME, blobStoreRepository, blobStoreTransferService, writableRegistry(), @@ -96,6 +157,469 @@ public void testGlobalMetadataUploadWaitTimeSetting() { assertEquals(globalMetadataUploadTimeout, remoteGlobalMetadataManager.getGlobalMetadataUploadTimeout().seconds()); } + public void testGetReadMetadataAsyncAction_CoordinationMetadata() throws Exception { + CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); + String fileName = randomAlphaOfLength(10); + RemoteCoordinationMetadata coordinationMetadataForDownload = new RemoteCoordinationMetadata( + fileName, + CLUSTER_UUID, + compressor, + xContentRegistry + ); + when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( + COORDINATION_METADATA_FORMAT.serialize(coordinationMetadata, fileName, compressor, FORMAT_PARAMS).streamInput() + ); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + coordinationMetadataForDownload, + COORDINATION_METADATA, + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + assertEquals(coordinationMetadata, listener.getResult().getObj()); + assertEquals(COORDINATION_METADATA, listener.getResult().getComponent()); + assertEquals(COORDINATION_METADATA, listener.getResult().getComponentName()); + } + + public void testGetAsyncMetadataWriteAction_CoordinationMetadata() throws Exception { + CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); + RemoteCoordinationMetadata remoteCoordinationMetadata = new RemoteCoordinationMetadata( + coordinationMetadata, + METADATA_VERSION, + CLUSTER_UUID, + compressor, + xContentRegistry + ); + doAnswer(invocationOnMock -> { + invocationOnMock.getArgument(4, ActionListener.class).onResponse(null); + return null; + }).when(blobStoreTransferService) + .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + + remoteGlobalMetadataManager.getAsyncMetadataWriteAction(remoteCoordinationMetadata, new LatchedActionListener<>(listener, latch)) + .run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.getResult(); + assertEquals(COORDINATION_METADATA, uploadedMetadata.getComponent()); + String uploadedFileName = uploadedMetadata.getUploadedFilename(); + String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); + assertEquals(5, pathTokens.length); + assertEquals(RemoteClusterStateUtils.encodeString(CLUSTER_NAME), pathTokens[0]); + assertEquals(CLUSTER_STATE_PATH_TOKEN, pathTokens[1]); + assertEquals(CLUSTER_UUID, pathTokens[2]); + assertEquals(GLOBAL_METADATA_PATH_TOKEN, pathTokens[3]); + String[] splitFileName = pathTokens[4].split(DELIMITER); + assertEquals(4, splitFileName.length); + assertEquals(COORDINATION_METADATA, splitFileName[0]); + assertEquals(RemoteStoreUtils.invertLong(METADATA_VERSION), splitFileName[1]); + assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); + } + + public void testGetReadMetadataAsyncAction_PersistentSettings() throws Exception { + Settings settingsMetadata = getSettings(); + String fileName = randomAlphaOfLength(10); + RemotePersistentSettingsMetadata persistentSettings = new RemotePersistentSettingsMetadata( + fileName, + CLUSTER_UUID, + compressor, + xContentRegistry + ); + when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( + RemotePersistentSettingsMetadata.SETTINGS_METADATA_FORMAT.serialize(settingsMetadata, fileName, compressor, FORMAT_PARAMS) + .streamInput() + ); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + persistentSettings, + SETTING_METADATA, + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + assertEquals(settingsMetadata, listener.getResult().getObj()); + assertEquals(SETTING_METADATA, listener.getResult().getComponent()); + assertEquals(SETTING_METADATA, listener.getResult().getComponentName()); + } + + public void testGetAsyncMetadataWriteAction_PersistentSettings() throws Exception { + Settings settingsMetadata = getSettings(); + RemotePersistentSettingsMetadata persistentSettings = new RemotePersistentSettingsMetadata( + settingsMetadata, + METADATA_VERSION, + CLUSTER_UUID, + compressor, + xContentRegistry + ); + doAnswer(invocationOnMock -> { + invocationOnMock.getArgument(4, ActionListener.class).onResponse(null); + return null; + }).when(blobStoreTransferService) + .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataWriteAction(persistentSettings, new LatchedActionListener<>(listener, latch)).run(); + + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.getResult(); + assertEquals(SETTING_METADATA, uploadedMetadata.getComponent()); + String uploadedFileName = uploadedMetadata.getUploadedFilename(); + String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); + assertEquals(5, pathTokens.length); + assertEquals(RemoteClusterStateUtils.encodeString(CLUSTER_NAME), pathTokens[0]); + assertEquals(CLUSTER_STATE_PATH_TOKEN, pathTokens[1]); + assertEquals(CLUSTER_UUID, pathTokens[2]); + assertEquals(GLOBAL_METADATA_PATH_TOKEN, pathTokens[3]); + String[] splitFileName = pathTokens[4].split(DELIMITER); + assertEquals(4, splitFileName.length); + assertEquals(SETTING_METADATA, splitFileName[0]); + assertEquals(RemoteStoreUtils.invertLong(METADATA_VERSION), splitFileName[1]); + assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); + } + + public void testGetReadMetadataAsyncAction_TransientSettings() throws Exception { + Settings settingsMetadata = getSettings(); + String fileName = randomAlphaOfLength(10); + RemoteTransientSettingsMetadata transientSettings = new RemoteTransientSettingsMetadata( + fileName, + CLUSTER_UUID, + compressor, + xContentRegistry + ); + when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( + RemoteTransientSettingsMetadata.SETTINGS_METADATA_FORMAT.serialize(settingsMetadata, fileName, compressor, FORMAT_PARAMS) + .streamInput() + ); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + transientSettings, + TRANSIENT_SETTING_METADATA, + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + assertEquals(settingsMetadata, listener.getResult().getObj()); + assertEquals(TRANSIENT_SETTING_METADATA, listener.getResult().getComponent()); + assertEquals(TRANSIENT_SETTING_METADATA, listener.getResult().getComponentName()); + } + + public void testGetAsyncMetadataWriteAction_TransientSettings() throws Exception { + Settings settingsMetadata = getSettings(); + RemoteTransientSettingsMetadata transientSettings = new RemoteTransientSettingsMetadata( + settingsMetadata, + METADATA_VERSION, + CLUSTER_UUID, + compressor, + xContentRegistry + ); + doAnswer(invocationOnMock -> { + invocationOnMock.getArgument(4, ActionListener.class).onResponse(null); + return null; + }).when(blobStoreTransferService) + .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataWriteAction(transientSettings, new LatchedActionListener<>(listener, latch)).run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.getResult(); + assertEquals(TRANSIENT_SETTING_METADATA, uploadedMetadata.getComponent()); + String uploadedFileName = uploadedMetadata.getUploadedFilename(); + String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); + assertEquals(5, pathTokens.length); + assertEquals(RemoteClusterStateUtils.encodeString(CLUSTER_NAME), pathTokens[0]); + assertEquals(CLUSTER_STATE_PATH_TOKEN, pathTokens[1]); + assertEquals(CLUSTER_UUID, pathTokens[2]); + assertEquals(GLOBAL_METADATA_PATH_TOKEN, pathTokens[3]); + String[] splitFileName = pathTokens[4].split(DELIMITER); + assertEquals(4, splitFileName.length); + assertEquals(TRANSIENT_SETTING_METADATA, splitFileName[0]); + assertEquals(RemoteStoreUtils.invertLong(METADATA_VERSION), splitFileName[1]); + assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); + } + + public void testGetReadMetadataAsyncAction_HashesOfConsistentSettings() throws Exception { + DiffableStringMap hashesOfConsistentSettings = getHashesOfConsistentSettings(); + String fileName = randomAlphaOfLength(10); + RemoteHashesOfConsistentSettings hashesOfConsistentSettingsForDownload = new RemoteHashesOfConsistentSettings( + fileName, + CLUSTER_UUID, + compressor + ); + when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( + HASHES_OF_CONSISTENT_SETTINGS_FORMAT.serialize(hashesOfConsistentSettings, fileName, compressor).streamInput() + ); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + hashesOfConsistentSettingsForDownload, + HASHES_OF_CONSISTENT_SETTINGS, + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + assertEquals(hashesOfConsistentSettings, listener.getResult().getObj()); + assertEquals(HASHES_OF_CONSISTENT_SETTINGS, listener.getResult().getComponent()); + assertEquals(HASHES_OF_CONSISTENT_SETTINGS, listener.getResult().getComponentName()); + } + + public void testGetAsyncMetadataWriteAction_HashesOfConsistentSettings() throws Exception { + DiffableStringMap hashesOfConsistentSettings = getHashesOfConsistentSettings(); + RemoteHashesOfConsistentSettings hashesOfConsistentSettingsForUpload = new RemoteHashesOfConsistentSettings( + hashesOfConsistentSettings, + METADATA_VERSION, + CLUSTER_UUID, + compressor + ); + doAnswer(invocationOnMock -> { + invocationOnMock.getArgument(4, ActionListener.class).onResponse(null); + return null; + }).when(blobStoreTransferService) + .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataWriteAction( + hashesOfConsistentSettingsForUpload, + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.getResult(); + assertEquals(HASHES_OF_CONSISTENT_SETTINGS, uploadedMetadata.getComponent()); + String uploadedFileName = uploadedMetadata.getUploadedFilename(); + String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); + assertEquals(5, pathTokens.length); + assertEquals(RemoteClusterStateUtils.encodeString(CLUSTER_NAME), pathTokens[0]); + assertEquals(CLUSTER_STATE_PATH_TOKEN, pathTokens[1]); + assertEquals(CLUSTER_UUID, pathTokens[2]); + assertEquals(GLOBAL_METADATA_PATH_TOKEN, pathTokens[3]); + String[] splitFileName = pathTokens[4].split(DELIMITER); + assertEquals(4, splitFileName.length); + assertEquals(HASHES_OF_CONSISTENT_SETTINGS, splitFileName[0]); + assertEquals(RemoteStoreUtils.invertLong(METADATA_VERSION), splitFileName[1]); + assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); + } + + public void testGetReadMetadataAsyncAction_TemplatesMetadata() throws Exception { + TemplatesMetadata templatesMetadata = getTemplatesMetadata(); + String fileName = randomAlphaOfLength(10); + RemoteTemplatesMetadata templatesMetadataForDownload = new RemoteTemplatesMetadata( + fileName, + CLUSTER_UUID, + compressor, + xContentRegistry + ); + when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( + TEMPLATES_METADATA_FORMAT.serialize(templatesMetadata, fileName, compressor, FORMAT_PARAMS).streamInput() + ); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + templatesMetadataForDownload, + TEMPLATES_METADATA, + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + assertEquals(templatesMetadata, listener.getResult().getObj()); + assertEquals(TEMPLATES_METADATA, listener.getResult().getComponent()); + assertEquals(TEMPLATES_METADATA, listener.getResult().getComponentName()); + } + + public void testGetAsyncMetadataWriteAction_TemplatesMetadata() throws Exception { + TemplatesMetadata templatesMetadata = getTemplatesMetadata(); + RemoteTemplatesMetadata templateMetadataForUpload = new RemoteTemplatesMetadata( + templatesMetadata, + METADATA_VERSION, + CLUSTER_UUID, + compressor, + xContentRegistry + ); + doAnswer(invocationOnMock -> { + invocationOnMock.getArgument(4, ActionListener.class).onResponse(null); + return null; + }).when(blobStoreTransferService) + .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataWriteAction(templateMetadataForUpload, new LatchedActionListener<>(listener, latch)) + .run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.getResult(); + assertEquals(TEMPLATES_METADATA, uploadedMetadata.getComponent()); + String uploadedFileName = uploadedMetadata.getUploadedFilename(); + String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); + assertEquals(5, pathTokens.length); + assertEquals(RemoteClusterStateUtils.encodeString(CLUSTER_NAME), pathTokens[0]); + assertEquals(CLUSTER_STATE_PATH_TOKEN, pathTokens[1]); + assertEquals(CLUSTER_UUID, pathTokens[2]); + assertEquals(GLOBAL_METADATA_PATH_TOKEN, pathTokens[3]); + String[] splitFileName = pathTokens[4].split(DELIMITER); + assertEquals(4, splitFileName.length); + assertEquals(TEMPLATES_METADATA, splitFileName[0]); + assertEquals(RemoteStoreUtils.invertLong(METADATA_VERSION), splitFileName[1]); + assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); + } + + public void testGetReadMetadataAsyncAction_CustomMetadata() throws Exception { + Metadata.Custom customMetadata = getCustomMetadata(); + String fileName = randomAlphaOfLength(10); + RemoteCustomMetadata customMetadataForDownload = new RemoteCustomMetadata( + fileName, + IndexGraveyard.TYPE, + CLUSTER_UUID, + compressor, + namedWriteableRegistry + ); + when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( + customMetadataForDownload.customBlobStoreFormat.serialize(customMetadata, fileName, compressor).streamInput() + ); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + customMetadataForDownload, + IndexGraveyard.TYPE, + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + assertEquals(customMetadata, listener.getResult().getObj()); + assertEquals(CUSTOM_METADATA, listener.getResult().getComponent()); + assertEquals(IndexGraveyard.TYPE, listener.getResult().getComponentName()); + } + + public void testGetAsyncMetadataWriteAction_CustomMetadata() throws Exception { + Metadata.Custom customMetadata = getCustomMetadata(); + RemoteCustomMetadata customMetadataForUpload = new RemoteCustomMetadata( + customMetadata, + IndexGraveyard.TYPE, + METADATA_VERSION, + CLUSTER_UUID, + compressor, + namedWriteableRegistry + ); + doAnswer(invocationOnMock -> { + invocationOnMock.getArgument(4, ActionListener.class).onResponse(null); + return null; + }).when(blobStoreTransferService) + .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataWriteAction(customMetadataForUpload, new LatchedActionListener<>(listener, latch)) + .run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + ClusterMetadataManifest.UploadedMetadata uploadedMetadata = listener.getResult(); + assertEquals(String.join(CUSTOM_DELIMITER, CUSTOM_METADATA, IndexGraveyard.TYPE), uploadedMetadata.getComponent()); + String uploadedFileName = uploadedMetadata.getUploadedFilename(); + String[] pathTokens = uploadedFileName.split(PATH_DELIMITER); + assertEquals(5, pathTokens.length); + assertEquals(RemoteClusterStateUtils.encodeString(CLUSTER_NAME), pathTokens[0]); + assertEquals(CLUSTER_STATE_PATH_TOKEN, pathTokens[1]); + assertEquals(CLUSTER_UUID, pathTokens[2]); + assertEquals(GLOBAL_METADATA_PATH_TOKEN, pathTokens[3]); + String[] splitFileName = pathTokens[4].split(DELIMITER); + assertEquals(4, splitFileName.length); + assertEquals(String.join(CUSTOM_DELIMITER, CUSTOM_METADATA, IndexGraveyard.TYPE), splitFileName[0]); + assertEquals(RemoteStoreUtils.invertLong(METADATA_VERSION), splitFileName[1]); + assertEquals(GLOBAL_METADATA_CURRENT_CODEC_VERSION, Integer.parseInt(splitFileName[3])); + } + + public void testGetReadMetadataAsyncAction_GlobalMetadata() throws Exception { + Metadata metadata = getGlobalMetadata(); + String fileName = randomAlphaOfLength(10); + RemoteGlobalMetadata globalMetadataForDownload = new RemoteGlobalMetadata(fileName, CLUSTER_UUID, compressor, xContentRegistry); + when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenReturn( + GLOBAL_METADATA_FORMAT.serialize(metadata, fileName, compressor, FORMAT_PARAMS).streamInput() + ); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + globalMetadataForDownload, + GLOBAL_METADATA, + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.getFailure()); + assertNotNull(listener.getResult()); + assertTrue(isGlobalStateEquals(metadata, (Metadata) listener.getResult().getObj())); + assertEquals(GLOBAL_METADATA, listener.getResult().getComponent()); + assertEquals(GLOBAL_METADATA, listener.getResult().getComponentName()); + } + + public void testGetReadMetadataAsyncAction_IOException() throws Exception { + String fileName = randomAlphaOfLength(10); + RemoteCoordinationMetadata coordinationMetadataForDownload = new RemoteCoordinationMetadata( + fileName, + CLUSTER_UUID, + compressor, + xContentRegistry + ); + IOException ioException = new IOException("mock test exception"); + when(blobStoreTransferService.downloadBlob(anyIterable(), anyString())).thenThrow(ioException); + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataReadAction( + coordinationMetadataForDownload, + COORDINATION_METADATA, + new LatchedActionListener<>(listener, latch) + ).run(); + latch.await(); + assertNull(listener.getResult()); + assertNotNull(listener.getFailure()); + assertEquals(ioException, listener.getFailure()); + } + + public void testGetAsyncMetadataWriteAction_IOException() throws Exception { + CoordinationMetadata coordinationMetadata = getCoordinationMetadata(); + RemoteCoordinationMetadata remoteCoordinationMetadata = new RemoteCoordinationMetadata( + coordinationMetadata, + METADATA_VERSION, + CLUSTER_UUID, + compressor, + xContentRegistry + ); + IOException ioException = new IOException("mock test exception"); + doAnswer(invocationOnMock -> { + invocationOnMock.getArgument(4, ActionListener.class).onFailure(ioException); + return null; + }).when(blobStoreTransferService) + .uploadBlob(any(InputStream.class), anyIterable(), anyString(), eq(URGENT), any(ActionListener.class)); + + TestCapturingListener listener = new TestCapturingListener<>(); + CountDownLatch latch = new CountDownLatch(1); + remoteGlobalMetadataManager.getAsyncMetadataWriteAction(remoteCoordinationMetadata, new LatchedActionListener<>(listener, latch)) + .run(); + assertNull(listener.getResult()); + assertNotNull(listener.getFailure()); + assertTrue(listener.getFailure() instanceof RemoteStateTransferException); + assertEquals(ioException, listener.getFailure().getCause()); + } + public void testGetUpdatedCustoms() { Map previousCustoms = Map.of( CustomMetadata1.TYPE, diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadataTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadataTests.java index 9484afe6b7d6c..63d6de05a737c 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadataTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCoordinationMetadataTests.java @@ -229,7 +229,7 @@ public void testSerDe() throws IOException { } } - private CoordinationMetadata getCoordinationMetadata() { + public static CoordinationMetadata getCoordinationMetadata() { return CoordinationMetadata.builder() .term(TERM) .lastAcceptedConfiguration(new VotingConfiguration(Set.of("node1"))) diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCustomMetadataTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCustomMetadataTests.java index 1bce176273270..1e28817be79f2 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCustomMetadataTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteCustomMetadataTests.java @@ -232,7 +232,7 @@ public void testSerDe() throws IOException { } } - private Custom getCustomMetadata() { + public static Custom getCustomMetadata() { return IndexGraveyard.builder().addTombstone(new Index("test-index", "3q2423")).build(); } diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteGlobalMetadataTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteGlobalMetadataTests.java index 02ddc8ba93071..23de485357547 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteGlobalMetadataTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteGlobalMetadataTests.java @@ -180,7 +180,7 @@ public void testSerDe() throws IOException { } } - private Metadata getGlobalMetadata() { + public static Metadata getGlobalMetadata() { return Metadata.builder() .templates( TemplatesMetadata.builder() diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettingsTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettingsTests.java index d883eabf9fbc9..b931f24f98631 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettingsTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteHashesOfConsistentSettingsTests.java @@ -185,7 +185,7 @@ public void testSerDe() throws IOException { } } - private DiffableStringMap getHashesOfConsistentSettings() { + public static DiffableStringMap getHashesOfConsistentSettings() { Map hashesOfConsistentSettings = new HashMap<>(); hashesOfConsistentSettings.put("secure-setting-key", "secure-setting-value"); return new DiffableStringMap(hashesOfConsistentSettings); diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsMetadataTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsMetadataTests.java index 850c18f03fa49..5e4d5d66ca1b7 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsMetadataTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemotePersistentSettingsMetadataTests.java @@ -224,7 +224,7 @@ public void testSerDe() throws IOException { } } - private Settings getSettings() { + public static Settings getSettings() { return Settings.builder().put("random_index_setting_" + randomAlphaOfLength(3), randomAlphaOfLength(5)).build(); } } diff --git a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadataTests.java b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadataTests.java index b86044003aa55..d7ecd2ad3f44a 100644 --- a/server/src/test/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadataTests.java +++ b/server/src/test/java/org/opensearch/gateway/remote/model/RemoteTemplatesMetadataTests.java @@ -227,7 +227,7 @@ public void testSerDe() throws IOException { } } - private TemplatesMetadata getTemplatesMetadata() { + public static TemplatesMetadata getTemplatesMetadata() { return TemplatesMetadata.builder() .put( IndexTemplateMetadata.builder("template" + randomAlphaOfLength(3)) diff --git a/test/framework/src/main/java/org/opensearch/common/util/TestCapturingListener.java b/test/framework/src/main/java/org/opensearch/common/util/TestCapturingListener.java new file mode 100644 index 0000000000000..a3c8cc15de927 --- /dev/null +++ b/test/framework/src/main/java/org/opensearch/common/util/TestCapturingListener.java @@ -0,0 +1,39 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.util; + +import org.opensearch.core.action.ActionListener; + +/** + * A simple implementation of {@link ActionListener} that captures the response and failures used for testing purposes. + * + * @param the result type + */ +public class TestCapturingListener implements ActionListener { + private T result; + private Exception failure; + + @Override + public void onResponse(T result) { + this.result = result; + } + + @Override + public void onFailure(Exception e) { + this.failure = e; + } + + public T getResult() { + return result; + } + + public Exception getFailure() { + return failure; + } +}