Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port feature/agent_framework commits to main branch #297

Merged
merged 27 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
5afa4bc
[Feature/agent_framework] Registers a single agent with multiple tool…
owaiskazi19 Nov 30, 2023
f9c4622
[Feature/agent_framework] Add Delete Connector Step (#211)
dbwiddis Dec 1, 2023
062834b
[feature/agent_framework] Changing resources created format (#231)
amitgalitz Dec 1, 2023
9cb621d
[Feature/agent_framework] Fetches modelID for RegisterAgent and Tools…
owaiskazi19 Dec 2, 2023
83af2ba
Add Util method to fetch inputs from parameters, content, and previou…
dbwiddis Dec 2, 2023
1d98b07
[Feature/agent_framework] Add Undeploy Model Step (#236)
dbwiddis Dec 2, 2023
ecf4a60
[Feature/agent_framework] Actually make the WorkflowStepFactory a Fac…
dbwiddis Dec 3, 2023
664b41a
[Feature/agent_framework] Add Delete Model Step (#237)
dbwiddis Dec 4, 2023
5ce79e0
[Feature/agent_framework] Registers root agent with an agentId in Too…
owaiskazi19 Dec 4, 2023
a047038
Updating state index after register agent (#250)
amitgalitz Dec 4, 2023
e13c8a9
[Feature/agent_framework] Added Retry functionality for Deploy Model …
owaiskazi19 Dec 5, 2023
d82da9a
Only update state index on register agent (#253)
amitgalitz Dec 5, 2023
ac507af
[Feature/agent_framework] Add Delete Agent Step (#246)
dbwiddis Dec 6, 2023
fe43586
Includ workflow id and current node id in the exception message (#262)
jackiehanyang Dec 8, 2023
1b37073
Change thread queue to 100 and fix headers parsing bug (#265)
amitgalitz Dec 12, 2023
9c23092
Update resources_created with deploy model: (#275)
amitgalitz Dec 12, 2023
16b0a59
[feature/agent_framework] Changing resources created format (#231)
dbwiddis Dec 12, 2023
53070a1
[Feature/agent_framework] Add Get Workflow API to retrieve a stored
joshpalis Dec 13, 2023
5045aae
[Feature/agent_framework] Adds a Search Workflow State API (#284)
joshpalis Dec 13, 2023
13ce2f7
Permit ordering of tools in register agent step (#283)
dbwiddis Dec 13, 2023
2f760f4
Fix tools ordering class casting bug (#289)
dbwiddis Dec 14, 2023
4852878
Combine create api with provision api by adding a provision param (#282)
jackiehanyang Dec 14, 2023
3c45cb5
[Feature/agent_framework] Deprovision API (#271)
dbwiddis Dec 15, 2023
4cbf9dc
[Feature/agent_framework] Adding installed plugins validation (#290)
joshpalis Dec 15, 2023
856631d
Add Delete Workflow API (#294)
dbwiddis Dec 15, 2023
280ebc9
Consume REST params and consistently handle error messages (#295)
dbwiddis Dec 15, 2023
ad8e142
Allow YAML Templates (#296)
dbwiddis Dec 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,28 @@
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.rest.RestCreateWorkflowAction;
import org.opensearch.flowframework.rest.RestDeleteWorkflowAction;
import org.opensearch.flowframework.rest.RestDeprovisionWorkflowAction;
import org.opensearch.flowframework.rest.RestGetWorkflowAction;
import org.opensearch.flowframework.rest.RestGetWorkflowStateAction;
import org.opensearch.flowframework.rest.RestProvisionWorkflowAction;
import org.opensearch.flowframework.rest.RestSearchWorkflowAction;
import org.opensearch.flowframework.rest.RestSearchWorkflowStateAction;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
import org.opensearch.flowframework.transport.CreateWorkflowTransportAction;
import org.opensearch.flowframework.transport.DeleteWorkflowAction;
import org.opensearch.flowframework.transport.DeleteWorkflowTransportAction;
import org.opensearch.flowframework.transport.DeprovisionWorkflowAction;
import org.opensearch.flowframework.transport.DeprovisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.GetWorkflowAction;
import org.opensearch.flowframework.transport.GetWorkflowStateAction;
import org.opensearch.flowframework.transport.GetWorkflowStateTransportAction;
import org.opensearch.flowframework.transport.GetWorkflowTransportAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.ProvisionWorkflowTransportAction;
import org.opensearch.flowframework.transport.SearchWorkflowAction;
import org.opensearch.flowframework.transport.SearchWorkflowStateAction;
import org.opensearch.flowframework.transport.SearchWorkflowStateTransportAction;
import org.opensearch.flowframework.transport.SearchWorkflowTransportAction;
import org.opensearch.flowframework.util.EncryptorUtils;
import org.opensearch.flowframework.workflow.WorkflowProcessSorter;
Expand All @@ -63,6 +75,7 @@
import static org.opensearch.flowframework.common.FlowFrameworkSettings.FLOW_FRAMEWORK_ENABLED;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_GET_TASK_REQUEST_RETRY;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOWS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.MAX_WORKFLOW_STEPS;
import static org.opensearch.flowframework.common.FlowFrameworkSettings.WORKFLOW_REQUEST_TIMEOUT;

/**
Expand Down Expand Up @@ -106,7 +119,13 @@ public Collection<Object> createComponents(
mlClient,
flowFrameworkIndicesHandler
);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(
workflowStepFactory,
threadPool,
clusterService,
client,
settings
);

return ImmutableList.of(workflowStepFactory, workflowProcessSorter, encryptorUtils, flowFrameworkIndicesHandler);
}
Expand All @@ -123,19 +142,27 @@ public List<RestHandler> getRestHandlers(
) {
return ImmutableList.of(
new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting, settings, clusterService),
new RestDeleteWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestDeprovisionWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestSearchWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestGetWorkflowAction(flowFrameworkFeatureEnabledSetting)
new RestGetWorkflowStateAction(flowFrameworkFeatureEnabledSetting),
new RestGetWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestSearchWorkflowStateAction(flowFrameworkFeatureEnabledSetting)
);
}

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return ImmutableList.of(
new ActionHandler<>(CreateWorkflowAction.INSTANCE, CreateWorkflowTransportAction.class),
new ActionHandler<>(DeleteWorkflowAction.INSTANCE, DeleteWorkflowTransportAction.class),
new ActionHandler<>(ProvisionWorkflowAction.INSTANCE, ProvisionWorkflowTransportAction.class),
new ActionHandler<>(DeprovisionWorkflowAction.INSTANCE, DeprovisionWorkflowTransportAction.class),
new ActionHandler<>(SearchWorkflowAction.INSTANCE, SearchWorkflowTransportAction.class),
new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class)
new ActionHandler<>(GetWorkflowStateAction.INSTANCE, GetWorkflowStateTransportAction.class),
new ActionHandler<>(GetWorkflowAction.INSTANCE, GetWorkflowTransportAction.class),
new ActionHandler<>(SearchWorkflowStateAction.INSTANCE, SearchWorkflowStateTransportAction.class)
);
}

Expand All @@ -144,6 +171,7 @@ public List<Setting<?>> getSettings() {
List<Setting<?>> settings = ImmutableList.of(
FLOW_FRAMEWORK_ENABLED,
MAX_WORKFLOWS,
MAX_WORKFLOW_STEPS,
WORKFLOW_REQUEST_TIMEOUT,
MAX_GET_TASK_REQUEST_RETRY
);
Expand All @@ -158,7 +186,7 @@ public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
settings,
PROVISION_THREAD_POOL,
OpenSearchExecutors.allocatedProcessors(settings),
10,
100,
FLOW_FRAMEWORK_THREAD_POOL_PREFIX + PROVISION_THREAD_POOL
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,14 @@ private CommonValue() {}
/** The provision workflow thread pool name */
public static final String PROVISION_THREAD_POOL = "opensearch_workflow_provision";

/** Success name field */
public static final String SUCCESS = "success";
/** Index name field */
public static final String INDEX_NAME = "index_name";
/** Type field */
public static final String TYPE = "type";
/** default_mapping_option filed */
public static final String DEFAULT_MAPPING_OPTION = "default_mapping_option";
/** ID Field */
public static final String ID = "id";
/** Pipeline Id field */
Expand All @@ -103,6 +107,8 @@ private CommonValue() {}
public static final String MODEL_VERSION = "model_version";
/** Model Group Id field */
public static final String MODEL_GROUP_ID = "model_group_id";
/** Model Group Id field */
public static final String MODEL_GROUP_STATUS = "model_group_status";
/** Description field */
public static final String DESCRIPTION_FIELD = "description";
/** Connector Id field */
Expand Down Expand Up @@ -158,9 +164,28 @@ private CommonValue() {}
public static final String USER_OUTPUTS_FIELD = "user_outputs";
/** The template field name for template resources created */
public static final String RESOURCES_CREATED_FIELD = "resources_created";
/** The field name for the ResourceCreated's resource ID */
public static final String RESOURCE_ID_FIELD = "resource_id";
/** The field name for the ResourceCreated's resource name */
/** The field name for the step name where a resource is created */
public static final String WORKFLOW_STEP_NAME = "workflow_step_name";

/** The field name for the step ID where a resource is created */
public static final String WORKFLOW_STEP_ID = "workflow_step_id";
/** The field name for the resource type */
public static final String RESOURCE_TYPE = "resource_type";
/** The field name for the resource id */
public static final String RESOURCE_ID = "resource_id";
/** The tools field for an agent */
public static final String TOOLS_FIELD = "tools";
/** The tools order field for an agent */
public static final String TOOLS_ORDER_FIELD = "tools_order";
/** The memory field for an agent */
public static final String MEMORY_FIELD = "memory";
/** The app type field for an agent */
public static final String APP_TYPE_FIELD = "app_type";
/** The agent id of an agent */
public static final String AGENT_ID = "agent_id";
/** To include field for an agent response */
public static final String INCLUDE_OUTPUT_IN_AGENT_RESPONSE = "include_output_in_agent_response";
/** The created time field for an agent */
public static final String CREATED_TIME = "created_time";
/** The last updated time field for an agent */
public static final String LAST_UPDATED_TIME_FIELD = "last_updated_time";
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ private FlowFrameworkSettings() {}

/** The upper limit of max workflows that can be created */
public static final int MAX_WORKFLOWS_LIMIT = 10000;
/** The upper limit of max workflow steps that can be in a single workflow */
public static final int MAX_WORKFLOW_STEPS_LIMIT = 500;

/** This setting sets max workflows that can be created */
public static final Setting<Integer> MAX_WORKFLOWS = Setting.intSetting(
Expand All @@ -29,6 +31,16 @@ private FlowFrameworkSettings() {}
Setting.Property.Dynamic
);

/** This setting sets max workflows that can be created */
public static final Setting<Integer> MAX_WORKFLOW_STEPS = Setting.intSetting(
"plugins.flow_framework.max_workflow_steps",
50,
1,
MAX_WORKFLOW_STEPS_LIMIT,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

/** This setting sets the timeout for the request */
public static final Setting<TimeValue> WORKFLOW_REQUEST_TIMEOUT = Setting.positiveTimeSetting(
"plugins.flow_framework.request_timeout",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.common;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.exception.FlowFrameworkException;

import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
* Enum encapsulating the different step names and the resources they create
*/
public enum WorkflowResources {

/** official workflow step name for creating a connector and associated created resource */
CREATE_CONNECTOR("create_connector", "connector_id", "delete_connector"),
/** official workflow step name for registering a remote model and associated created resource */
REGISTER_REMOTE_MODEL("register_remote_model", "model_id", "delete_model"),
/** official workflow step name for registering a local model and associated created resource */
REGISTER_LOCAL_MODEL("register_local_model", "model_id", "delete_model"),
/** official workflow step name for registering a model group and associated created resource */
REGISTER_MODEL_GROUP("register_model_group", "model_group_id", null), // TODO
/** official workflow step name for deploying a model and associated created resource */
DEPLOY_MODEL("deploy_model", "model_id", "undeploy_model"),
/** official workflow step name for creating an ingest-pipeline and associated created resource */
CREATE_INGEST_PIPELINE("create_ingest_pipeline", "pipeline_id", null), // TODO
/** official workflow step name for creating an index and associated created resource */
CREATE_INDEX("create_index", "index_name", null), // TODO
/** official workflow step name for register an agent and the associated created resource */
REGISTER_AGENT("register_agent", "agent_id", "delete_agent");

private final String workflowStep;
private final String resourceCreated;
private final String deprovisionStep;
private static final Logger logger = LogManager.getLogger(WorkflowResources.class);
private static final Set<String> allResources = Stream.of(values())
.map(WorkflowResources::getResourceCreated)
.collect(Collectors.toSet());

WorkflowResources(String workflowStep, String resourceCreated, String deprovisionStep) {
this.workflowStep = workflowStep;
this.resourceCreated = resourceCreated;
this.deprovisionStep = deprovisionStep;
}

/**
* Returns the workflowStep for the given enum Constant
* @return the workflowStep of this data.
*/
public String getWorkflowStep() {
return workflowStep;
}

/**
* Returns the resourceCreated for the given enum Constant
* @return the resourceCreated of this data.
*/
public String getResourceCreated() {
return resourceCreated;
}

/**
* Returns the deprovisionStep for the given enum Constant
* @return the deprovisionStep of this data.
*/
public String getDeprovisionStep() {
return deprovisionStep;
}

/**
* Gets the resources created type based on the workflowStep.
* @param workflowStep workflow step name
* @return the resource that will be created
* @throws FlowFrameworkException if workflow step doesn't exist in enum
*/
public static String getResourceByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (workflowStep.equals(mapping.getWorkflowStep()) || workflowStep.equals(mapping.getDeprovisionStep())) {
return mapping.getResourceCreated();
}
}
}
logger.error("Unable to find resource type for step: " + workflowStep);
throw new FlowFrameworkException("Unable to find resource type for step: " + workflowStep, RestStatus.BAD_REQUEST);
}

/**
* Gets the deprovision step type based on the workflowStep.
* @param workflowStep workflow step name
* @return the corresponding step to deprovision
* @throws FlowFrameworkException if workflow step doesn't exist in enum
*/
public static String getDeprovisionStepByWorkflowStep(String workflowStep) throws FlowFrameworkException {
if (workflowStep != null && !workflowStep.isEmpty()) {
for (WorkflowResources mapping : values()) {
if (mapping.getWorkflowStep().equals(workflowStep)) {
return mapping.getDeprovisionStep();
}
}
}
logger.error("Unable to find deprovision step for step: " + workflowStep);
throw new FlowFrameworkException("Unable to find deprovision step for step: " + workflowStep, RestStatus.BAD_REQUEST);
}

/**
* Returns all the possible resource created types in enum
* @return a set of all the resource created types
*/
public static Set<String> getAllResourcesCreated() {
return allResources;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,18 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.WorkflowResources;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.model.ProvisioningProgress;
import org.opensearch.flowframework.model.ResourceCreated;
import org.opensearch.flowframework.model.State;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.WorkflowState;
import org.opensearch.flowframework.util.EncryptorUtils;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;

import java.io.IOException;
import java.net.URL;
Expand Down Expand Up @@ -435,6 +439,7 @@ public void updateFlowFrameworkSystemIndexDoc(
updatedContent.putAll(updatedFields);
updateRequest.doc(updatedContent);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.retryOnConflict(3);
// TODO: decide what condition can be considered as an update conflict and add retry strategy
client.update(updateRequest, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
Expand Down Expand Up @@ -468,7 +473,8 @@ public void updateFlowFrameworkSystemIndexDocWithScript(
// TODO: Also add ability to change other fields at the same time when adding detailed provision progress
updateRequest.script(script);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
// TODO: decide what condition can be considered as an update conflict and add retry strategy
updateRequest.retryOnConflict(3);
// TODO: Implement our own concurrency control to improve on retry mechanism
client.update(updateRequest, ActionListener.runBefore(listener, () -> context.restore()));
} catch (Exception e) {
logger.error("Failed to update {} entry : {}. {}", indexName, documentId, e.getMessage());
Expand All @@ -478,4 +484,43 @@ public void updateFlowFrameworkSystemIndexDocWithScript(
}
}
}

/**
* Creates a new ResourceCreated object and a script to update the state index
* @param workflowId workflowId for the relevant step
* @param nodeId WorkflowData object with relevent step information
* @param workflowStepName the workflowstep name that created the resource
* @param resourceId the id of the newly created resource
* @param listener the ActionListener for this step to handle completing the future after update
* @throws IOException if parsing fails on new resource
*/
public void updateResourceInStateIndex(
String workflowId,
String nodeId,
String workflowStepName,
String resourceId,
ActionListener<UpdateResponse> listener
) throws IOException {
ResourceCreated newResource = new ResourceCreated(
workflowStepName,
nodeId,
WorkflowResources.getResourceByWorkflowStep(workflowStepName),
resourceId
);
XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent());
newResource.toXContent(builder, ToXContentObject.EMPTY_PARAMS);

// The script to append a new object to the resources_created array
Script script = new Script(
ScriptType.INLINE,
"painless",
"ctx._source.resources_created.add(params.newResource)",
Collections.singletonMap("newResource", newResource)
);

updateFlowFrameworkSystemIndexDocWithScript(WORKFLOW_STATE_INDEX, workflowId, script, ActionListener.wrap(updateResponse -> {
logger.info("updated resources created of {}", workflowId);
listener.onResponse(updateResponse);
}, exception -> { listener.onFailure(exception); }));
}
}
Loading
Loading