diff --git a/build.gradle b/build.gradle index e09f533bd..8b2845a3b 100644 --- a/build.gradle +++ b/build.gradle @@ -136,6 +136,7 @@ dependencies { implementation 'org.junit.jupiter:junit-jupiter:5.10.1' implementation "com.google.guava:guava:32.1.3-jre" api group: 'org.opensearch', name:'opensearch-ml-client', version: "${opensearch_build}" + implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.10' implementation "org.opensearch:common-utils:${common_utils_version}" configurations.all { diff --git a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java index 15e61f558..c52bd2fef 100644 --- a/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java +++ b/src/main/java/org/opensearch/flowframework/FlowFrameworkPlugin.java @@ -29,10 +29,13 @@ import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; import org.opensearch.flowframework.rest.RestCreateWorkflowAction; import org.opensearch.flowframework.rest.RestProvisionWorkflowAction; +import org.opensearch.flowframework.rest.RestSearchWorkflowAction; import org.opensearch.flowframework.transport.CreateWorkflowAction; import org.opensearch.flowframework.transport.CreateWorkflowTransportAction; import org.opensearch.flowframework.transport.ProvisionWorkflowAction; import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction; +import org.opensearch.flowframework.transport.SearchWorkflowAction; +import org.opensearch.flowframework.transport.SearchWorkflowTransportAction; import org.opensearch.flowframework.workflow.WorkflowProcessSorter; import org.opensearch.flowframework.workflow.WorkflowStepFactory; import org.opensearch.ml.client.MachineLearningNodeClient; @@ -104,7 +107,8 @@ public List getRestHandlers( ) { return ImmutableList.of( new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting), - new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting) + new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting), + new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting) ); } @@ -112,7 +116,8 @@ public List getRestHandlers( public List> getActions() { return ImmutableList.of( new ActionHandler<>(CreateWorkflowAction.INSTANCE, CreateWorkflowTransportAction.class), - new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class) + new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class), + new ActionHandler<>(SearchWorkflowAction.INSTANCE, SearchWorkflowTransportAction.class) ); } diff --git a/src/main/java/org/opensearch/flowframework/rest/AbstractSearchWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/AbstractSearchWorkflowAction.java new file mode 100644 index 000000000..0aed4348c --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/rest/AbstractSearchWorkflowAction.java @@ -0,0 +1,115 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.rest; + +import org.opensearch.action.ActionType; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.action.RestResponseListener; +import org.opensearch.search.builder.SearchSourceBuilder; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS; +import static org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED; +import static org.opensearch.flowframework.util.RestHandlerUtils.getSourceContext; + +/** + * Abstract class to handle search request. + */ +public abstract class AbstractSearchWorkflowAction extends BaseRestHandler { + + protected final List urlPaths; + protected final String index; + protected final Class clazz; + protected final ActionType actionType; + protected final FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; + + /** + * Instantiates a new AbstractSearchWorkflowAction + * @param urlPaths urlPaths to create routes + * @param index index the search should be done on + * @param clazz model class + * @param actionType from which action abstract class is called + * @param flowFrameworkFeatureEnabledSetting Whether this API is enabled + */ + public AbstractSearchWorkflowAction( + List urlPaths, + String index, + Class clazz, + ActionType actionType, + FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting + ) { + this.urlPaths = urlPaths; + this.index = index; + this.clazz = clazz; + this.actionType = actionType; + this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) { + FlowFrameworkException ffe = new FlowFrameworkException( + "This API is disabled. To enable it, update the setting [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.", + RestStatus.FORBIDDEN + ); + return channel -> channel.sendResponse( + new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS)) + ); + } + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.parseXContent(request.contentOrSourceParamParser()); + searchSourceBuilder.fetchSource(getSourceContext(request, searchSourceBuilder)); + searchSourceBuilder.seqNoAndPrimaryTerm(true).version(true); + SearchRequest searchRequest = new SearchRequest().source(searchSourceBuilder).indices(index); + return channel -> client.execute(actionType, searchRequest, search(channel)); + } + + /** + * Builds the action response for the Search Request + * + * @param channel the REST channel + * @return the action response + */ + protected RestResponseListener search(RestChannel channel) { + return new RestResponseListener(channel) { + @Override + public RestResponse buildResponse(SearchResponse response) throws Exception { + if (response.isTimedOut()) { + return new BytesRestResponse(RestStatus.REQUEST_TIMEOUT, response.toString()); + } + return new BytesRestResponse(RestStatus.OK, response.toXContent(channel.newBuilder(), EMPTY_PARAMS)); + } + }; + } + + @Override + public List routes() { + List routes = new ArrayList<>(); + for (String path : urlPaths) { + routes.add(new Route(RestRequest.Method.POST, path)); + routes.add(new Route(RestRequest.Method.GET, path)); + } + return routes; + } +} diff --git a/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java index 645e8ad2b..bd872b4ff 100644 --- a/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java +++ b/src/main/java/org/opensearch/flowframework/rest/RestCreateWorkflowAction.java @@ -45,7 +45,7 @@ public class RestCreateWorkflowAction extends BaseRestHandler { private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting; /** - * Intantiates a new RestCreateWorkflowAction + * Instantiates a new RestCreateWorkflowAction * * @param flowFrameworkFeatureEnabledSetting Whether this API is enabled */ diff --git a/src/main/java/org/opensearch/flowframework/rest/RestSearchWorkflowAction.java b/src/main/java/org/opensearch/flowframework/rest/RestSearchWorkflowAction.java new file mode 100644 index 000000000..dc0739643 --- /dev/null +++ b/src/main/java/org/opensearch/flowframework/rest/RestSearchWorkflowAction.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.rest; + +import com.google.common.collect.ImmutableList; +import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting; +import org.opensearch.flowframework.model.Template; +import org.opensearch.flowframework.transport.SearchWorkflowAction; + +import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI; + +/** + * Rest Action to facilitate requests to search workflows + */ +public class RestSearchWorkflowAction extends AbstractSearchWorkflowAction