Skip to content

Commit

Permalink
adding create ingest pipeline step
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz committed Mar 11, 2024
1 parent 2707210 commit a1839e3
Show file tree
Hide file tree
Showing 12 changed files with 179 additions and 163 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ dependencies {
implementation 'com.amazonaws:aws-encryption-sdk-java:2.4.1'
implementation 'org.bouncycastle:bcprov-jdk18on:1.77'
api "org.apache.httpcomponents.core5:httpcore5:5.2.2"
implementation("com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}")
implementation("com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}")

// ZipArchive dependencies used for integration tests
zipArchive group: 'org.opensearch.plugin', name:'opensearch-ml-plugin', version: "${opensearch_build}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ public Collection<Object> createComponents(
threadPool,
mlClient,
flowFrameworkIndicesHandler,
flowFrameworkSettings
flowFrameworkSettings,
client
);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(
workflowStepFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,10 @@ private CommonValue() {}
public static final String HOSTNAME_FIELD = "hostname";
/** Http port */
public static final String PORT_FIELD = "port";
/** Pipeline ID, also corresponds to pipeline name */
public static final String PIPELINE_ID = "pipeline_id";
/** Pipeline Configurations */
public static final String CONFIGURATIONS = "configurations";

/*
* Constants associated with resource provisioning / state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,15 @@
*/
package org.opensearch.flowframework.model;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.util.ParseUtils;
import org.opensearch.flowframework.workflow.ProcessNode;
import org.opensearch.flowframework.workflow.WorkflowData;
import org.opensearch.flowframework.workflow.WorkflowStep;
Expand All @@ -28,6 +31,7 @@

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.CONFIGURATIONS;
import static org.opensearch.flowframework.common.CommonValue.TOOLS_ORDER_FIELD;
import static org.opensearch.flowframework.util.ParseUtils.buildStringToObjectMap;
import static org.opensearch.flowframework.util.ParseUtils.buildStringToStringMap;
Expand Down Expand Up @@ -60,6 +64,7 @@ public class WorkflowNode implements ToXContentObject {
private final String type; // maps to a WorkflowStep
private final Map<String, String> previousNodeInputs;
private final Map<String, Object> userInputs; // maps to WorkflowData
private static final Logger logger = LogManager.getLogger(WorkflowNode.class);

/**
* Create this node with the id and type, and any user input.
Expand Down Expand Up @@ -151,7 +156,20 @@ public static WorkflowNode parse(XContentParser parser) throws IOException {
userInputs.put(inputFieldName, parser.text());
break;
case START_OBJECT:
userInputs.put(inputFieldName, parseStringToStringMap(parser));
if (CONFIGURATIONS.equals(inputFieldName)) {
Map<String, Object> configurationsMap = parser.map();
try {
String configurationsString = ParseUtils.parseArbitraryStringToObjectMapToString(configurationsMap);
userInputs.put(inputFieldName, configurationsString);
} catch (Exception ex) {
String errorMessage = "Failed to parse configuration map";
logger.error(errorMessage, ex);
throw new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST);
}
break;
} else {
userInputs.put(inputFieldName, parseStringToStringMap(parser));
}
break;
case START_ARRAY:
if (PROCESSORS_FIELD.equals(inputFieldName)) {
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/opensearch/flowframework/util/ParseUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
*/
package org.opensearch.flowframework.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
Expand Down Expand Up @@ -356,4 +359,17 @@ private static Object conditionallySubstitute(Object value, Map<String, Workflow
}
return value;
}

/**
* Generates a string based on an arbitrary String to object map sing Jackson
* @param map content map
* @return instance of the string
* @throws JsonProcessingException JsonProcessingException from Jackson for issues processing map
*/
public static String parseArbitraryStringToObjectMapToString(Map<String, Object> map) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
// Convert the map to a JSON string
String mappedString = mapper.writeValueAsString(map);
return mappedString;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,10 @@ public void onFailure(Exception e) {
credentials = getStringToStringMap(inputs.get(CREDENTIAL_FIELD), CREDENTIAL_FIELD);
actions = getConnectorActionList(inputs.get(ACTIONS_FIELD));
} catch (IllegalArgumentException iae) {
logger.error("IllegalArgumentException in connector configuration", iae);
throw new FlowFrameworkException("IllegalArgumentException in connector configuration", RestStatus.BAD_REQUEST);
} catch (PrivilegedActionException pae) {
logger.error("PrivilegedActionException in connector configuration", pae);
throw new FlowFrameworkException("PrivilegedActionException in connector configuration", RestStatus.UNAUTHORIZED);
}

Expand Down
Loading

0 comments on commit a1839e3

Please sign in to comment.