Skip to content

Commit

Permalink
Add an HttpHost Workflow Step
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Feb 21, 2024
1 parent 24bf51a commit f826920
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 61 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.12...2.x)
### Features
- Add HttpHost WorkflowStep ([#530](https://github.com/opensearch-project/flow-framework/pull/530))

### Enhancements
### Bug Fixes
### Infrastructure
Expand Down
2 changes: 1 addition & 1 deletion DEVELOPER_GUIDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ snapshots/
### Adding Workflow Steps

To add functionality to workflows, add new Workflow Steps to the [`org.opensearch.flowframework.workflow`](https://github.com/opensearch-project/flow-framework/tree/main/src/main/java/org/opensearch/flowframework/workflow) package.
1. Implement the [Workflow](https://github.com/opensearch-project/flow-framework/blob/main/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java) interface. See existing steps for examples for input, output, and API execution.
1. Implement the [WorkflowStep](https://github.com/opensearch-project/flow-framework/blob/main/src/main/java/org/opensearch/flowframework/workflow/WorkflowStep.java) interface. See existing steps for examples for input, output, and API execution.
2. Choose a unique name for the step which is not used by other steps. This will align with the `step_type` field in the templates and should be descriptive of what the step does.
3. Add a constructor and call it from the [WorkflowStepFactory](https://github.com/opensearch-project/flow-framework/blob/main/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java).
4. Add an entry to the [WorkflowStepFactory](https://github.com/opensearch-project/flow-framework/blob/main/src/main/java/org/opensearch/flowframework/workflow/WorkflowStepFactory.java) enum specifying required inputs, outputs, required plugins, and optionally a different timeout than the default.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,14 @@ private CommonValue() {}
public static final String CREATED_TIME = "created_time";
/** The last updated time field for an agent */
public static final String LAST_UPDATED_TIME_FIELD = "last_updated_time";
/** HttpHost */
public static final String HTTP_HOST_FIELD = "http_host";
/** Http scheme */
public static final String SCHEME_FIELD = "scheme";
/** Http hostname */
public static final String HOSTNAME_FIELD = "hostname";
/** Http port */
public static final String PORT_FIELD = "port";

/*
* Constants associated with resource provisioning / state
Expand Down
103 changes: 103 additions & 0 deletions src/main/java/org/opensearch/flowframework/workflow/HttpHostStep.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.workflow;

import org.apache.hc.core5.http.HttpHost;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.util.ParseUtils;

import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

import static org.opensearch.flowframework.common.CommonValue.HOSTNAME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.HTTP_HOST_FIELD;
import static org.opensearch.flowframework.common.CommonValue.PORT_FIELD;
import static org.opensearch.flowframework.common.CommonValue.SCHEME_FIELD;

/**
* Step to register parameters for an HTTP Connection to a Host
*/
public class HttpHostStep implements WorkflowStep {

private static final Logger logger = LogManager.getLogger(HttpHostStep.class);
PlainActionFuture<WorkflowData> hostFuture = PlainActionFuture.newFuture();
static final String NAME = "http_host";

@Override
public PlainActionFuture<WorkflowData> execute(
String currentNodeId,
WorkflowData currentNodeInputs,
Map<String, WorkflowData> outputs,
Map<String, String> previousNodeInputs
) {
Set<String> requiredKeys = Set.of(SCHEME_FIELD, HOSTNAME_FIELD, PORT_FIELD);
Set<String> optionalKeys = Collections.emptySet();

try {
Map<String, Object> inputs = ParseUtils.getInputsFromPreviousSteps(
requiredKeys,
optionalKeys,
currentNodeInputs,
outputs,
previousNodeInputs
);

String scheme = validScheme(inputs.get(SCHEME_FIELD));
String hostname = inputs.get(HOSTNAME_FIELD).toString();
int port = validPort(inputs.get(PORT_FIELD));

HttpHost httpHost = new HttpHost(scheme, hostname, port);

hostFuture.onResponse(
new WorkflowData(
Map.ofEntries(Map.entry(HTTP_HOST_FIELD, httpHost)),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);

logger.info("Http Host registered successfully {}", httpHost);

} catch (FlowFrameworkException e) {
hostFuture.onFailure(e);
}
return hostFuture;
}

private String validScheme(Object o) {
String scheme = o.toString().toLowerCase(Locale.ROOT);
if ("http".equals(scheme) || "https".equals(scheme)) {
return scheme;
}
throw new FlowFrameworkException("http_host scheme must be http or https", RestStatus.BAD_REQUEST);
}

private int validPort(Object o) {
try {
int port = Integer.parseInt(o.toString());
if ((port & 0xffff0000) != 0) {
throw new FlowFrameworkException("http_host port number must be between 0 and 65535", RestStatus.BAD_REQUEST);
}
return port;
} catch (NumberFormatException e) {
throw new FlowFrameworkException("http_host port must be a number between 0 and 65535", RestStatus.BAD_REQUEST);
}
}

@Override
public String getName() {
return NAME;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,19 @@
import static org.opensearch.flowframework.common.CommonValue.EMBEDDING_DIMENSION;
import static org.opensearch.flowframework.common.CommonValue.FRAMEWORK_TYPE;
import static org.opensearch.flowframework.common.CommonValue.FUNCTION_NAME;
import static org.opensearch.flowframework.common.CommonValue.HOSTNAME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.HTTP_HOST_FIELD;
import static org.opensearch.flowframework.common.CommonValue.MODEL_CONTENT_HASH_VALUE;
import static org.opensearch.flowframework.common.CommonValue.MODEL_FORMAT;
import static org.opensearch.flowframework.common.CommonValue.MODEL_GROUP_STATUS;
import static org.opensearch.flowframework.common.CommonValue.MODEL_TYPE;
import static org.opensearch.flowframework.common.CommonValue.NAME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.OPENSEARCH_ML;
import static org.opensearch.flowframework.common.CommonValue.PARAMETERS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.PORT_FIELD;
import static org.opensearch.flowframework.common.CommonValue.PROTOCOL_FIELD;
import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS;
import static org.opensearch.flowframework.common.CommonValue.SCHEME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.SUCCESS;
import static org.opensearch.flowframework.common.CommonValue.TOOLS_FIELD;
import static org.opensearch.flowframework.common.CommonValue.TYPE;
Expand Down Expand Up @@ -100,6 +104,7 @@ public WorkflowStepFactory(
stepMap.put(ToolStep.NAME, ToolStep::new);
stepMap.put(RegisterAgentStep.NAME, () -> new RegisterAgentStep(mlClient, flowFrameworkIndicesHandler));
stepMap.put(DeleteAgentStep.NAME, () -> new DeleteAgentStep(mlClient));
stepMap.put(HttpHostStep.NAME, HttpHostStep::new);
}

/**
Expand Down Expand Up @@ -194,7 +199,16 @@ public enum WorkflowSteps {
DELETE_AGENT(DeleteAgentStep.NAME, List.of(AGENT_ID), List.of(AGENT_ID), List.of(OPENSEARCH_ML), null),

/** Create Tool Step */
CREATE_TOOL(ToolStep.NAME, List.of(TYPE), List.of(TOOLS_FIELD), List.of(OPENSEARCH_ML), null);
CREATE_TOOL(ToolStep.NAME, List.of(TYPE), List.of(TOOLS_FIELD), List.of(OPENSEARCH_ML), null),

/** Http Host Step */
HTTP_HOST(
HttpHostStep.NAME,
List.of(SCHEME_FIELD, HOSTNAME_FIELD, PORT_FIELD),
List.of(HTTP_HOST_FIELD),
Collections.emptyList(),
null
);

private final String workflowStepName;
private final List<String> inputs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@
import org.opensearch.flowframework.common.FlowFrameworkSettings;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.workflow.WorkflowStepFactory;
import org.opensearch.flowframework.workflow.WorkflowStepFactory.WorkflowSteps;
import org.opensearch.ml.client.MachineLearningNodeClient;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -37,67 +39,12 @@ public void setUp() throws Exception {
}

public void testParseWorkflowValidator() throws IOException {
Map<String, WorkflowStepValidator> workflowStepValidators = new HashMap<>();
workflowStepValidators.put(
WorkflowStepFactory.WorkflowSteps.CREATE_CONNECTOR.getWorkflowStepName(),
WorkflowStepFactory.WorkflowSteps.CREATE_CONNECTOR.getWorkflowStepValidator()
);
workflowStepValidators.put(
WorkflowStepFactory.WorkflowSteps.DELETE_MODEL.getWorkflowStepName(),
WorkflowStepFactory.WorkflowSteps.DELETE_MODEL.getWorkflowStepValidator()
);
workflowStepValidators.put(
WorkflowStepFactory.WorkflowSteps.DEPLOY_MODEL.getWorkflowStepName(),
WorkflowStepFactory.WorkflowSteps.DEPLOY_MODEL.getWorkflowStepValidator()
);
workflowStepValidators.put(
WorkflowStepFactory.WorkflowSteps.REGISTER_LOCAL_CUSTOM_MODEL.getWorkflowStepName(),
WorkflowStepFactory.WorkflowSteps.REGISTER_LOCAL_CUSTOM_MODEL.getWorkflowStepValidator()
);
workflowStepValidators.put(
WorkflowStepFactory.WorkflowSteps.REGISTER_LOCAL_PRETRAINED_MODEL.getWorkflowStepName(),
WorkflowStepFactory.WorkflowSteps.REGISTER_LOCAL_PRETRAINED_MODEL.getWorkflowStepValidator()
);
workflowStepValidators.put(
WorkflowStepFactory.WorkflowSteps.REGISTER_LOCAL_SPARSE_ENCODING_MODEL.getWorkflowStepName(),
WorkflowStepFactory.WorkflowSteps.REGISTER_LOCAL_SPARSE_ENCODING_MODEL.getWorkflowStepValidator()
);
workflowStepValidators.put(
WorkflowStepFactory.WorkflowSteps.REGISTER_REMOTE_MODEL.getWorkflowStepName(),
WorkflowStepFactory.WorkflowSteps.REGISTER_REMOTE_MODEL.getWorkflowStepValidator()
);
workflowStepValidators.put(
WorkflowStepFactory.WorkflowSteps.REGISTER_MODEL_GROUP.getWorkflowStepName(),
WorkflowStepFactory.WorkflowSteps.REGISTER_MODEL_GROUP.getWorkflowStepValidator()
);
workflowStepValidators.put(
WorkflowStepFactory.WorkflowSteps.REGISTER_AGENT.getWorkflowStepName(),
WorkflowStepFactory.WorkflowSteps.REGISTER_AGENT.getWorkflowStepValidator()
);
workflowStepValidators.put(
WorkflowStepFactory.WorkflowSteps.CREATE_TOOL.getWorkflowStepName(),
WorkflowStepFactory.WorkflowSteps.CREATE_TOOL.getWorkflowStepValidator()
);
workflowStepValidators.put(
WorkflowStepFactory.WorkflowSteps.UNDEPLOY_MODEL.getWorkflowStepName(),
WorkflowStepFactory.WorkflowSteps.UNDEPLOY_MODEL.getWorkflowStepValidator()
);
workflowStepValidators.put(
WorkflowStepFactory.WorkflowSteps.DELETE_CONNECTOR.getWorkflowStepName(),
WorkflowStepFactory.WorkflowSteps.DELETE_CONNECTOR.getWorkflowStepValidator()
);
workflowStepValidators.put(
WorkflowStepFactory.WorkflowSteps.DELETE_AGENT.getWorkflowStepName(),
WorkflowStepFactory.WorkflowSteps.DELETE_AGENT.getWorkflowStepValidator()
);
workflowStepValidators.put(
WorkflowStepFactory.WorkflowSteps.NOOP.getWorkflowStepName(),
WorkflowStepFactory.WorkflowSteps.NOOP.getWorkflowStepValidator()
);
Map<String, WorkflowStepValidator> workflowStepValidators = Arrays.stream(WorkflowSteps.values())
.collect(Collectors.toMap(WorkflowSteps::getWorkflowStepName, WorkflowSteps::getWorkflowStepValidator));

WorkflowValidator validator = new WorkflowValidator(workflowStepValidators);

assertEquals(14, validator.getWorkflowStepValidators().size());
assertEquals(15, validator.getWorkflowStepValidators().size());

assertTrue(validator.getWorkflowStepValidators().keySet().contains("create_connector"));
assertEquals(7, validator.getWorkflowStepValidators().get("create_connector").getInputs().size());
Expand Down Expand Up @@ -155,6 +102,9 @@ public void testParseWorkflowValidator() throws IOException {
assertEquals(0, validator.getWorkflowStepValidators().get("noop").getInputs().size());
assertEquals(0, validator.getWorkflowStepValidators().get("noop").getOutputs().size());

assertTrue(validator.getWorkflowStepValidators().keySet().contains("http_host"));
assertEquals(3, validator.getWorkflowStepValidators().get("http_host").getInputs().size());
assertEquals(1, validator.getWorkflowStepValidators().get("http_host").getOutputs().size());
}

public void testWorkflowStepFactoryHasValidators() throws IOException {
Expand Down
Loading

0 comments on commit f826920

Please sign in to comment.