diff --git a/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java b/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java index c1303adc0..7c50cee66 100644 --- a/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java +++ b/src/main/java/org/opensearch/flowframework/common/WorkflowResources.java @@ -18,6 +18,7 @@ import org.opensearch.flowframework.workflow.CreateSearchPipelineStep; import org.opensearch.flowframework.workflow.DeleteAgentStep; import org.opensearch.flowframework.workflow.DeleteConnectorStep; +import org.opensearch.flowframework.workflow.DeleteIndexStep; import org.opensearch.flowframework.workflow.DeleteModelStep; import org.opensearch.flowframework.workflow.DeployModelStep; import org.opensearch.flowframework.workflow.NoOpStep; @@ -57,7 +58,7 @@ public enum WorkflowResources { /** Workflow steps for creating an ingest-pipeline and associated created resource */ CREATE_SEARCH_PIPELINE(CreateSearchPipelineStep.NAME, WorkflowResources.PIPELINE_ID, null), // TODO delete step /** Workflow steps for creating an index and associated created resource */ - CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, null), // TODO delete step + CREATE_INDEX(CreateIndexStep.NAME, WorkflowResources.INDEX_NAME, DeleteIndexStep.NAME), /** Workflow steps for registering/deleting an agent and the associated created resource */ REGISTER_AGENT(RegisterAgentStep.NAME, WorkflowResources.AGENT_ID, DeleteAgentStep.NAME); diff --git a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java index b53f5b84d..509d4b417 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java +++ b/src/main/java/org/opensearch/flowframework/workflow/CreateIndexStep.java @@ -24,9 +24,9 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.Map; import java.util.Set; -import java.util.Collections; import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS; import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME; @@ -86,7 +86,7 @@ public PlainActionFuture execute( byte[] byteArr = configurations.getBytes(StandardCharsets.UTF_8); BytesReference configurationsBytes = new BytesArray(byteArr); - CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).mapping(configurationsBytes, XContentType.JSON); + CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).source(configurationsBytes, XContentType.JSON); client.admin().indices().create(createIndexRequest, ActionListener.wrap(acknowledgedResponse -> { String resourceName = getResourceByWorkflowStep(getName()); logger.info("Created index: {}", indexName); @@ -118,7 +118,7 @@ public PlainActionFuture execute( createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(ex))); } }, e -> { - String errorMessage = "Failed to create index"; + String errorMessage = "Failed to create the index " + indexName; logger.error(errorMessage, e); createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); })); diff --git a/src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java b/src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java new file mode 100644 index 000000000..c2ed86696 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/workflow/DeleteIndexStep.java @@ -0,0 +1,105 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.ExceptionsHelper; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.Client; +import org.opensearch.core.action.ActionListener; +import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.util.ParseUtils; + +import java.util.Collections; +import java.util.Map; +import java.util.Set; + +import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME; + +/** + * Step to delete an index + */ +public class DeleteIndexStep implements WorkflowStep { + private static final Logger logger = LogManager.getLogger(DeleteConnectorStep.class); + + private final Client client; + + /** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */ + public static final String NAME = "delete_index"; + + /** + * Instantiate this class + * + * @param client Client to create an index + */ + public DeleteIndexStep(Client client) { + this.client = client; + } + + @Override + public PlainActionFuture execute( + String currentNodeId, + WorkflowData currentNodeInputs, + Map outputs, + Map previousNodeInputs, + Map params + ) { + + PlainActionFuture deleteIndexFuture = PlainActionFuture.newFuture(); + + Set requiredKeys = Set.of(INDEX_NAME); + Set optionalKeys = Collections.emptySet(); + + try { + Map inputs = ParseUtils.getInputsFromPreviousSteps( + requiredKeys, + optionalKeys, + currentNodeInputs, + outputs, + previousNodeInputs, + params + ); + String indexName = (String) inputs.get(INDEX_NAME); + + DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indexName); + client.admin().indices().delete(deleteIndexRequest, new ActionListener() { + @Override + public void onResponse(AcknowledgedResponse response) { + logger.info("Deleted index: {}", indexName); + deleteIndexFuture.onResponse( + new WorkflowData( + Map.ofEntries(Map.entry(INDEX_NAME, indexName)), + currentNodeInputs.getWorkflowId(), + currentNodeInputs.getNodeId() + ) + ); + } + + @Override + public void onFailure(Exception e) { + String errorMessage = "Failed to delete the index " + indexName; + logger.error(errorMessage, e); + deleteIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + } + }); + } catch (FlowFrameworkException e) { + deleteIndexFuture.onFailure(e); + } + + return deleteIndexFuture; + } + + @Override + public String getName() { + return NAME; + } +} diff --git a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java index 4bd23d25d..92f212fdc 100644 --- a/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java +++ b/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java @@ -52,7 +52,11 @@ import static org.opensearch.flowframework.common.CommonValue.TYPE; import static org.opensearch.flowframework.common.CommonValue.URL; import static org.opensearch.flowframework.common.CommonValue.VERSION_FIELD; -import static org.opensearch.flowframework.common.WorkflowResources.*; +import static org.opensearch.flowframework.common.WorkflowResources.AGENT_ID; +import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID; +import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME; +import static org.opensearch.flowframework.common.WorkflowResources.MODEL_GROUP_ID; +import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID; /** * Generates instances implementing {@link WorkflowStep}. @@ -80,6 +84,7 @@ public WorkflowStepFactory( ) { stepMap.put(NoOpStep.NAME, NoOpStep::new); stepMap.put(CreateIndexStep.NAME, () -> new CreateIndexStep(client, flowFrameworkIndicesHandler)); + stepMap.put(DeleteIndexStep.NAME, () -> new DeleteIndexStep(client)); stepMap.put( RegisterLocalCustomModelStep.NAME, () -> new RegisterLocalCustomModelStep(threadPool, mlClient, flowFrameworkIndicesHandler, flowFrameworkSettings) @@ -185,6 +190,9 @@ public enum WorkflowSteps { null ), + /** Delete Index Step */ + DELETE_INDEX(DeleteIndexStep.NAME, List.of(INDEX_NAME), List.of(INDEX_NAME), Collections.emptyList(), null), + /** Deploy Model Step */ DEPLOY_MODEL(DeployModelStep.NAME, List.of(MODEL_ID), List.of(MODEL_ID), List.of(OPENSEARCH_ML), TimeValue.timeValueSeconds(15)), diff --git a/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java b/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java index 3266820e2..1beb2b66f 100644 --- a/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java +++ b/src/test/java/org/opensearch/flowframework/model/WorkflowValidatorTests.java @@ -46,7 +46,7 @@ public void testParseWorkflowValidator() throws IOException { WorkflowValidator validator = new WorkflowValidator(workflowStepValidators); - assertEquals(16, validator.getWorkflowStepValidators().size()); + assertEquals(18, validator.getWorkflowStepValidators().size()); assertTrue(validator.getWorkflowStepValidators().keySet().contains("create_connector")); assertEquals(7, validator.getWorkflowStepValidators().get("create_connector").getInputs().size()); diff --git a/src/test/java/org/opensearch/flowframework/workflow/CreateIndexStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/CreateIndexStepTests.java index 2f2eda803..56a26429a 100644 --- a/src/test/java/org/opensearch/flowframework/workflow/CreateIndexStepTests.java +++ b/src/test/java/org/opensearch/flowframework/workflow/CreateIndexStepTests.java @@ -15,11 +15,8 @@ import org.opensearch.client.AdminClient; import org.opensearch.client.Client; import org.opensearch.client.IndicesAdminClient; -import org.opensearch.cluster.ClusterName; -import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; -import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.core.action.ActionListener; @@ -38,6 +35,7 @@ import org.mockito.MockitoAnnotations; import static org.opensearch.action.DocWriteResponse.Result.UPDATED; +import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS; import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_STATE_INDEX; import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME; @@ -58,8 +56,6 @@ public class CreateIndexStepTests extends OpenSearchTestCase { private ThreadContext threadContext; private Metadata metadata; - @Mock - private ClusterService clusterService; @Mock private IndicesAdminClient indicesAdminClient; @Mock @@ -73,12 +69,14 @@ public void setUp() throws Exception { super.setUp(); this.flowFrameworkIndicesHandler = mock(FlowFrameworkIndicesHandler.class); MockitoAnnotations.openMocks(this); + String configurations = + "{\"settings\":{\"index\":{\"number_of_shards\":2,\"number_of_replicas\":1}},\"mappings\":{\"_doc\":{\"properties\":{\"age\":{\"type\":\"integer\"}}}},\"aliases\":{\"sample-alias1\":{}}}"; + inputData = new WorkflowData( - Map.ofEntries(Map.entry(INDEX_NAME, "demo"), Map.entry("default_mapping_option", "knn")), + Map.ofEntries(Map.entry(INDEX_NAME, "demo"), Map.entry(CONFIGURATIONS, configurations)), "test-id", "test-node-id" ); - clusterService = mock(ClusterService.class); client = mock(Client.class); adminClient = mock(AdminClient.class); metadata = mock(Metadata.class); @@ -89,7 +87,6 @@ public void setUp() throws Exception { when(threadPool.getThreadContext()).thenReturn(threadContext); when(client.admin()).thenReturn(adminClient); when(adminClient.indices()).thenReturn(indicesAdminClient); - when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test cluster")).build()); when(metadata.indices()).thenReturn(Map.of(GLOBAL_CONTEXT_INDEX, indexMetadata)); createIndexStep = new CreateIndexStep(client, flowFrameworkIndicesHandler); @@ -141,6 +138,6 @@ public void testCreateIndexStepFailure() throws ExecutionException, InterruptedE assertTrue(future.isDone()); ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); assertTrue(ex.getCause() instanceof Exception); - assertEquals("Failed to create an index", ex.getCause().getMessage()); + assertEquals("Failed to create the index demo", ex.getCause().getMessage()); } } diff --git a/src/test/java/org/opensearch/flowframework/workflow/DeleteIndexStepTests.java b/src/test/java/org/opensearch/flowframework/workflow/DeleteIndexStepTests.java new file mode 100644 index 000000000..f9de33dd5 --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/workflow/DeleteIndexStepTests.java @@ -0,0 +1,118 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.workflow; + +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.client.AdminClient; +import org.opensearch.client.Client; +import org.opensearch.client.IndicesAdminClient; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.action.ActionListener; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; +import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class DeleteIndexStepTests extends OpenSearchTestCase { + + private WorkflowData inputData = WorkflowData.EMPTY; + private Client client; + private AdminClient adminClient; + private DeleteIndexStep deleteIndexStep; + private ThreadContext threadContext; + private Metadata metadata; + + @Mock + private IndicesAdminClient indicesAdminClient; + @Mock + private ThreadPool threadPool; + @Mock + IndexMetadata indexMetadata; + + @Override + public void setUp() throws Exception { + super.setUp(); + MockitoAnnotations.openMocks(this); + inputData = new WorkflowData(Map.ofEntries(Map.entry(INDEX_NAME, "demo")), "test-id", "test-node-id"); + client = mock(Client.class); + adminClient = mock(AdminClient.class); + metadata = mock(Metadata.class); + Settings settings = Settings.builder().build(); + threadContext = new ThreadContext(settings); + + when(client.threadPool()).thenReturn(threadPool); + when(threadPool.getThreadContext()).thenReturn(threadContext); + when(client.admin()).thenReturn(adminClient); + when(adminClient.indices()).thenReturn(indicesAdminClient); + when(metadata.indices()).thenReturn(Map.of(GLOBAL_CONTEXT_INDEX, indexMetadata)); + + deleteIndexStep = new DeleteIndexStep(client); + } + + public void testDeleteIndex() throws IOException, ExecutionException, InterruptedException { + + @SuppressWarnings({ "unchecked" }) + ArgumentCaptor> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + PlainActionFuture future = deleteIndexStep.execute( + inputData.getNodeId(), + inputData, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + assertFalse(future.isDone()); + verify(indicesAdminClient, times(1)).delete(any(DeleteIndexRequest.class), actionListenerCaptor.capture()); + actionListenerCaptor.getValue().onResponse(new AcknowledgedResponse(true)); + + assertTrue(future.isDone()); + + Map outputData = Map.of(INDEX_NAME, "demo"); + assertEquals(outputData, future.get().getContent()); + } + + public void testDeleteIndexStepFailure() throws ExecutionException, InterruptedException { + @SuppressWarnings({ "unchecked" }) + ArgumentCaptor> actionListenerCaptor = ArgumentCaptor.forClass(ActionListener.class); + PlainActionFuture future = deleteIndexStep.execute( + inputData.getNodeId(), + inputData, + Collections.emptyMap(), + Collections.emptyMap(), + Collections.emptyMap() + ); + assertFalse(future.isDone()); + verify(indicesAdminClient, times(1)).delete(any(DeleteIndexRequest.class), actionListenerCaptor.capture()); + actionListenerCaptor.getValue().onFailure(new Exception("Failed to delete the index")); + + assertTrue(future.isDone()); + ExecutionException ex = assertThrows(ExecutionException.class, () -> future.get().getContent()); + assertTrue(ex.getCause() instanceof Exception); + assertEquals("Failed to delete the index demo", ex.getCause().getMessage()); + } +}