Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Search APIs to fetch all the workflows present #152

Merged
merged 1 commit into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ dependencies {
implementation 'org.junit.jupiter:junit-jupiter:5.10.1'
implementation "com.google.guava:guava:32.1.3-jre"
api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}"
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.10'
implementation "org.opensearch:common-utils:${common_utils_version}"

configurations.all {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.rest.RestCreateWorkflowAction;
import org.opensearch.flowframework.rest.RestProvisionWorkflowAction;
import org.opensearch.flowframework.rest.RestSearchWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowTransportAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.SearchWorkflowAction;
import org.opensearch.flowframework.transport.SearchWorkflowTransportAction;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.ml.client.MachineLearningNodeClient;
Expand Down Expand Up @@ -104,15 +107,17 @@ public List<RestHandler> getRestHandlers(
) {
return ImmutableList.of(
new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting)
new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting)
);
}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return ImmutableList.of(
new ActionHandler<>(CreateWorkflowAction.INSTANCE, CreateWorkflowTransportAction.class),
new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class)
new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class),
new ActionHandler<>(SearchWorkflowAction.INSTANCE, SearchWorkflowTransportAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.rest;

import org.opensearch.action.ActionType;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestResponse;
import org.opensearch.rest.action.RestResponseListener;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;
import static org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.util.RestHandlerUtils.getSourceContext;

/**
* Abstract class to handle search request.
*/
public abstract class AbstractSearchWorkflowAction<T extends ToXContentObject> extends BaseRestHandler {

protected final List<String> urlPaths;
protected final String index;
protected final Class<T> clazz;
protected final ActionType<SearchResponse> actionType;
protected final FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

/**
* Instantiates a new AbstractSearchWorkflowAction
* @param urlPaths urlPaths to create routes
* @param index index the search should be done on
* @param clazz model class
* @param actionType from which action abstract class is called
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
*/
public AbstractSearchWorkflowAction(
List<String> urlPaths,
String index,
Class<T> clazz,
ActionType<SearchResponse> actionType,
FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting
) {
this.urlPaths = urlPaths;
this.index = index;
this.clazz = clazz;
this.actionType = actionType;
this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting;
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
FlowFrameworkException ffe = new FlowFrameworkException(
"This API is disabled. To enable it, update the setting [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
RestStatus.FORBIDDEN
);
return channel -> channel.sendResponse(
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.parseXContent(request.contentOrSourceParamParser());
searchSourceBuilder.fetchSource(getSourceContext(request, searchSourceBuilder));
searchSourceBuilder.seqNoAndPrimaryTerm(true).version(true);
SearchRequest searchRequest = new SearchRequest().source(searchSourceBuilder).indices(index);
return channel -> client.execute(actionType, searchRequest, search(channel));

Check warning on line 85 in src/main/java/org/opensearch/flowframework/rest/AbstractSearchWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/AbstractSearchWorkflowAction.java#L81-L85

Added lines #L81 - L85 were not covered by tests
}

/**
* Builds the action response for the Search Request
*
* @param channel the REST channel
* @return the action response
*/
protected RestResponseListener<SearchResponse> search(RestChannel channel) {
return new RestResponseListener<SearchResponse>(channel) {

Check warning on line 95 in src/main/java/org/opensearch/flowframework/rest/AbstractSearchWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/AbstractSearchWorkflowAction.java#L95

Added line #L95 was not covered by tests
@Override
public RestResponse buildResponse(SearchResponse response) throws Exception {
if (response.isTimedOut()) {
return new BytesRestResponse(RestStatus.REQUEST_TIMEOUT, response.toString());

Check warning on line 99 in src/main/java/org/opensearch/flowframework/rest/AbstractSearchWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/AbstractSearchWorkflowAction.java#L99

Added line #L99 was not covered by tests
}
return new BytesRestResponse(RestStatus.OK, response.toXContent(channel.newBuilder(), EMPTY_PARAMS));

Check warning on line 101 in src/main/java/org/opensearch/flowframework/rest/AbstractSearchWorkflowAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/rest/AbstractSearchWorkflowAction.java#L101

Added line #L101 was not covered by tests
}
};
}

@Override
public List<Route> routes() {
List<Route> routes = new ArrayList<>();
for (String path : urlPaths) {
routes.add(new Route(RestRequest.Method.POST, path));
routes.add(new Route(RestRequest.Method.GET, path));
}
return routes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public class RestCreateWorkflowAction extends BaseRestHandler {
private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

/**
* Intantiates a new RestCreateWorkflowAction
* Instantiates a new RestCreateWorkflowAction
*
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.rest;

import com.google.common.collect.ImmutableList;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.transport.SearchWorkflowAction;

import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;

/**
* Rest Action to facilitate requests to search workflows
*/
public class RestSearchWorkflowAction extends AbstractSearchWorkflowAction<Template> {

private static final String SEARCH_WORKFLOW_ACTION = "search_workflow_action";
private static final String SEARCH_WORKFLOW_PATH = WORKFLOW_URI + "/_search";

/**
* Instantiates a new RestSearchWorkflowAction
*
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
*/
public RestSearchWorkflowAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) {
super(
ImmutableList.of(SEARCH_WORKFLOW_PATH),
GLOBAL_CONTEXT_INDEX,
Template.class,
SearchWorkflowAction.INSTANCE,
flowFrameworkFeatureEnabledSetting
);
}

@Override
public String getName() {
return SEARCH_WORKFLOW_ACTION;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACION_NAME_PREFIX;

/**
* External Action for public facing RestCreateWorkflowActiom
* External Action for public facing RestCreateWorkflowAction
*/
public class CreateWorkflowAction extends ActionType<WorkflowResponse> {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.action.ActionType;
import org.opensearch.action.search.SearchResponse;

import static org.opensearch.flowframework.common.CommonValue.TRANSPORT_ACION_NAME_PREFIX;

/**
* External Action for public facing RestSearchWorkflowAction
*/
public class SearchWorkflowAction extends ActionType<SearchResponse> {

/** The name of this action */
public static final String NAME = TRANSPORT_ACION_NAME_PREFIX + "workflow/search";
/** An instance of this action */
public static final SearchWorkflowAction INSTANCE = new SearchWorkflowAction();

private SearchWorkflowAction() {
super(NAME, SearchResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.transport;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.client.Client;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.action.ActionListener;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportService;

/**
* Transport Action to search workflows created
*/
public class SearchWorkflowTransportAction extends HandledTransportAction<SearchRequest, SearchResponse> {
private Client client;

/**
* Intantiates a new CreateWorkflowTransportAction
* @param transportService the TransportService
* @param actionFilters action filters
* @param client The client used to make the request to OS
*/
@Inject
public SearchWorkflowTransportAction(TransportService transportService, ActionFilters actionFilters, Client client) {
super(SearchWorkflowAction.NAME, transportService, actionFilters, SearchRequest::new);
this.client = client;
}

@Override
protected void doExecute(Task task, SearchRequest request, ActionListener<SearchResponse> actionListener) {
// TODO: AccessController should take care of letting the user with right permission to view the workflow
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
client.search(request, ActionListener.runBefore(actionListener, () -> context.restore()));
} catch (Exception e) {
actionListener.onFailure(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.util;

import org.apache.commons.lang3.ArrayUtils;
import org.opensearch.flowframework.common.CommonValue;
import org.opensearch.rest.RestRequest;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.fetch.subphase.FetchSourceContext;

/**
* Utility methods for Rest Handlers
*/
public class RestHandlerUtils {

public static final String[] USER_EXCLUDE = new String[] { CommonValue.USER_FIELD };

Check warning on line 22 in src/main/java/org/opensearch/flowframework/util/RestHandlerUtils.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/util/RestHandlerUtils.java#L22

Added line #L22 was not covered by tests

private RestHandlerUtils() {}

/**
* Creates a source context and include/exclude information to be shared based on the user
*
* @param request the REST request
* @param searchSourceBuilder the search request source builder
* @return modified sources
*/
public static FetchSourceContext getSourceContext(RestRequest request, SearchSourceBuilder searchSourceBuilder) {
// TODO
// 1. Move UI_METADATA to GC Index
// 2. check if the request came from dashboard and exclude UI_METADATA
if (searchSourceBuilder.fetchSource() != null) {
String[] newArray = (String[]) ArrayUtils.addAll(searchSourceBuilder.fetchSource().excludes(), USER_EXCLUDE);
return new FetchSourceContext(true, searchSourceBuilder.fetchSource().includes(), newArray);

Check warning on line 39 in src/main/java/org/opensearch/flowframework/util/RestHandlerUtils.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/util/RestHandlerUtils.java#L38-L39

Added lines #L38 - L39 were not covered by tests
} else {
return null;

Check warning on line 41 in src/main/java/org/opensearch/flowframework/util/RestHandlerUtils.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/util/RestHandlerUtils.java#L41

Added line #L41 was not covered by tests
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ public void testPlugin() throws IOException {
3,
ffp.createComponents(client, clusterService, threadPool, null, null, null, environment, null, null, null, null).size()
);
assertEquals(2, ffp.getRestHandlers(null, null, null, null, null, null, null).size());
assertEquals(2, ffp.getActions().size());
assertEquals(3, ffp.getRestHandlers(null, null, null, null, null, null, null).size());
assertEquals(3, ffp.getActions().size());
assertEquals(1, ffp.getExecutorBuilders(settings).size());
assertEquals(1, ffp.getSettings().size());
}
Expand Down
Loading
Loading