Skip to content

Commit

Permalink
Implemented Create Index Step
Browse files Browse the repository at this point in the history
Signed-off-by: Owais Kazi <[email protected]>
  • Loading branch information
owaiskazi19 committed Mar 13, 2024
1 parent 7198573 commit a182b2d
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,24 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesArray;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.util.ParseUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Set;
import java.util.Collections;

import static org.opensearch.flowframework.common.CommonValue.DEFAULT_MAPPING_OPTION;
import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.WorkflowResources.INDEX_NAME;
import static org.opensearch.flowframework.common.WorkflowResources.getResourceByWorkflowStep;

/**
Expand All @@ -37,23 +38,19 @@
public class CreateIndexStep implements WorkflowStep {

private static final Logger logger = LogManager.getLogger(CreateIndexStep.class);
private final ClusterService clusterService;
private final Client client;
private final FlowFrameworkIndicesHandler flowFrameworkIndicesHandler;

/** The name of this step, used as a key in the template and the {@link WorkflowStepFactory} */
public static final String NAME = "create_index";
static Map<String, AtomicBoolean> indexMappingUpdated = new HashMap<>();

/**
* Instantiate this class
*
* @param clusterService The OpenSearch cluster service
* @param client Client to create an index
* @param flowFrameworkIndicesHandler FlowFrameworkIndicesHandler class to update system indices
*/
public CreateIndexStep(ClusterService clusterService, Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) {
this.clusterService = clusterService;
public CreateIndexStep(Client client, FlowFrameworkIndicesHandler flowFrameworkIndicesHandler) {
this.client = client;
this.flowFrameworkIndicesHandler = flowFrameworkIndicesHandler;
}
Expand All @@ -67,89 +64,66 @@ public PlainActionFuture<WorkflowData> execute(
Map<String, String> params
) {
PlainActionFuture<WorkflowData> createIndexFuture = PlainActionFuture.newFuture();
ActionListener<CreateIndexResponse> actionListener = new ActionListener<>() {

@Override
public void onResponse(CreateIndexResponse createIndexResponse) {
Set<String> requiredKeys = Set.of(INDEX_NAME, CONFIGURATIONS);

Set<String> optionalKeys = Collections.emptySet();

try {
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
requiredKeys,
optionalKeys,
currentNodeInputs,
outputs,
previousNodeInputs,
params
);

String indexName = (String) inputs.get(INDEX_NAME);

String configurations = (String) inputs.get(CONFIGURATIONS);

byte[] byteArr = configurations.getBytes(StandardCharsets.UTF_8);
BytesReference configurationsBytes = new BytesArray(byteArr);

CreateIndexRequest createIndexRequest = new CreateIndexRequest(indexName).mapping(configurationsBytes, XContentType.JSON);
client.admin().indices().create(createIndexRequest, ActionListener.wrap(acknowledgedResponse -> {
String resourceName = getResourceByWorkflowStep(getName());
logger.info("Created index: {}", indexName);
try {
String resourceName = getResourceByWorkflowStep(getName());
logger.info("created index: {}", createIndexResponse.index());
flowFrameworkIndicesHandler.updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
currentNodeId,
getName(),
createIndexResponse.index(),
indexName,
ActionListener.wrap(response -> {
logger.info("successfully updated resource created in state index: {}", response.getIndex());
createIndexFuture.onResponse(
new WorkflowData(
Map.of(resourceName, createIndexResponse.index()),
currentNodeInputs.getWorkflowId(),
currentNodeId
)
new WorkflowData(Map.of(resourceName, indexName), currentNodeInputs.getWorkflowId(), currentNodeId)
);
}, exception -> {
String errorMessage = "Failed to update new created "
+ currentNodeId
+ " resource "
+ getName()
+ " id "
+ createIndexResponse.index();
+ indexName;
logger.error(errorMessage, exception);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)));
})
);
} catch (Exception e) {
} catch (IOException ex) {
String errorMessage = "Failed to parse and update new created resource";
logger.error(errorMessage, e);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
logger.error(errorMessage, ex);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(ex)));
}
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to create an index";
}, e -> {
String errorMessage = "Failed to create index";
logger.error(errorMessage, e);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
};

String index = null;
String defaultMappingOption = null;
Settings settings = null;

// TODO: Recreating the list to get this compiling
// Need to refactor the below iteration to pull directly from the maps
List<WorkflowData> data = new ArrayList<>();
data.add(currentNodeInputs);
data.addAll(outputs.values());

try {
for (WorkflowData workflowData : data) {
Map<String, Object> content = workflowData.getContent();
index = (String) content.get(getResourceByWorkflowStep(getName()));
defaultMappingOption = (String) content.get(DEFAULT_MAPPING_OPTION);
if (index != null && defaultMappingOption != null && settings != null) {
break;
}
}
} catch (Exception e) {
String errorMessage = "Failed to find the correct resource for the workflow step " + NAME;
logger.error(errorMessage, e);
createIndexFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}

// TODO:
// 1. Create settings based on the index settings received from content

try {
CreateIndexRequest request = new CreateIndexRequest(index).mapping(
FlowFrameworkIndicesHandler.getIndexMappings("mappings/" + defaultMappingOption + ".json"),
JsonXContent.jsonXContent.mediaType()
);
client.admin().indices().create(request, actionListener);
}));
} catch (Exception e) {
logger.error("Failed to find the right mapping for the index", e);
createIndexFuture.onFailure(e);
}

return createIndexFuture;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,7 @@
import static org.opensearch.flowframework.common.CommonValue.TYPE;
import static org.opensearch.flowframework.common.CommonValue.URL;
import static org.opensearch.flowframework.common.CommonValue.VERSION_FIELD;
import static org.opensearch.flowframework.common.WorkflowResources.AGENT_ID;
import static org.opensearch.flowframework.common.WorkflowResources.CONNECTOR_ID;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_GROUP_ID;
import static org.opensearch.flowframework.common.WorkflowResources.MODEL_ID;
import static org.opensearch.flowframework.common.WorkflowResources.*;

/**
* Generates instances implementing {@link WorkflowStep}.
Expand All @@ -82,6 +79,7 @@ public WorkflowStepFactory(
Client client
) {
stepMap.put(NoOpStep.NAME, NoOpStep::new);
stepMap.put(CreateIndexStep.NAME, () -> new CreateIndexStep(client, flowFrameworkIndicesHandler));
stepMap.put(
RegisterLocalCustomModelStep.NAME,
() -> new RegisterLocalCustomModelStep(threadPool, mlClient, flowFrameworkIndicesHandler, flowFrameworkSettings)
Expand Down Expand Up @@ -120,6 +118,9 @@ public enum WorkflowSteps {
/** Noop Step */
NOOP("noop", Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), null),

/** Create Index Step */
CREATE_INDEX(CreateIndexStep.NAME, List.of(INDEX_NAME, CONFIGURATIONS), List.of(INDEX_NAME), Collections.emptyList(), null),

/** Create Connector Step */
CREATE_CONNECTOR(
CreateConnectorStep.NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,8 @@

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
Expand All @@ -59,7 +57,6 @@ public class CreateIndexStepTests extends OpenSearchTestCase {
private CreateIndexStep createIndexStep;
private ThreadContext threadContext;
private Metadata metadata;
private Map<String, AtomicBoolean> indexMappingUpdated = new HashMap<>();

@Mock
private ClusterService clusterService;
Expand Down Expand Up @@ -95,8 +92,7 @@ public void setUp() throws Exception {
when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test cluster")).build());
when(metadata.indices()).thenReturn(Map.of(GLOBAL_CONTEXT_INDEX, indexMetadata));

createIndexStep = new CreateIndexStep(clusterService, client, flowFrameworkIndicesHandler);
CreateIndexStep.indexMappingUpdated = indexMappingUpdated;
createIndexStep = new CreateIndexStep(client, flowFrameworkIndicesHandler);
}

public void testCreateIndexStep() throws ExecutionException, InterruptedException, IOException {
Expand Down

0 comments on commit a182b2d

Please sign in to comment.