Skip to content

Commit

Permalink
cleaning up comments
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz committed Mar 12, 2024
1 parent 88404eb commit d07b542
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 54 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)

## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.12...2.x)
### Features
- Adding create ingest pipeline step ([#558](https://github.com/opensearch-project/flow-framework/pull/558))

### Enhancements
- Substitute REST path or body parameters in Workflow Steps ([#525](https://github.com/opensearch-project/flow-framework/pull/525))
- Added an optional workflow_step param to the get workflow steps API ([#538](https://github.com/opensearch-project/flow-framework/pull/538))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ private static Object conditionallySubstitute(Object value, Map<String, Workflow
}

/**
* Generates a string based on an arbitrary String to object map sing Jackson
* Generates a string based on an arbitrary String to object map using Jackson
* @param map content map
* @return instance of the string
* @throws JsonProcessingException JsonProcessingException from Jackson for issues processing map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.ingest.PutPipelineRequest;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.common.xcontent.XContentType;
Expand Down Expand Up @@ -89,61 +88,51 @@ public PlainActionFuture<WorkflowData> execute(
byte[] byteArr = configurations.getBytes(StandardCharsets.UTF_8);
BytesReference configurationsBytes = new BytesArray(byteArr);

ActionListener<AcknowledgedResponse> actionListener = new ActionListener<>() {

@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
String resourceName = getResourceByWorkflowStep(getName());
try {
flowFrameworkIndicesHandler.updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
currentNodeId,
getName(),
pipelineId,
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
// PutPipelineRequest returns only an AcknowledgeResponse, saving pipelineId instead
// TODO: revisit this concept of pipeline_id to be consistent with what makes most sense to end user here
createIngestPipelineFuture.onResponse(
new WorkflowData(
Map.of(resourceName, pipelineId),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);
}, exception -> {
String errorMessage = "Failed to update new created "
+ currentNodeId
+ " resource "
+ getName()
+ " id "
+ pipelineId;
logger.error(errorMessage, exception);
createIngestPipelineFuture.onFailure(
new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))
);
})
);

} catch (Exception e) {
String errorMessage = "Failed to parse and update new created resource";
logger.error(errorMessage, e);
createIngestPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
}

@Override
public void onFailure(Exception e) {
String errorMessage = "Failed to create ingest pipeline";
// Create PutPipelineRequest and execute
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, configurationsBytes, XContentType.JSON);
clusterAdminClient.putPipeline(putPipelineRequest, ActionListener.wrap(acknowledgedResponse -> {
String resourceName = getResourceByWorkflowStep(getName());
try {
flowFrameworkIndicesHandler.updateResourceInStateIndex(
currentNodeInputs.getWorkflowId(),
currentNodeId,
getName(),
pipelineId,
ActionListener.wrap(updateResponse -> {
logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex());
// PutPipelineRequest returns only an AcknowledgeResponse, saving pipelineId instead
// TODO: revisit this concept of pipeline_id to be consistent with what makes most sense to end user here
createIngestPipelineFuture.onResponse(
new WorkflowData(
Map.of(resourceName, pipelineId),
currentNodeInputs.getWorkflowId(),
currentNodeInputs.getNodeId()
)
);
}, exception -> {
String errorMessage = "Failed to update new created "
+ currentNodeId
+ " resource "
+ getName()
+ " id "
+ pipelineId;
logger.error(errorMessage, exception);
createIngestPipelineFuture.onFailure(
new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))
);
})
);

} catch (Exception e) {
String errorMessage = "Failed to parse and update new created resource";
logger.error(errorMessage, e);
createIngestPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}

};

// Create PutPipelineRequest and execute
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, configurationsBytes, XContentType.JSON);
clusterAdminClient.putPipeline(putPipelineRequest, actionListener);
}, e -> {
String errorMessage = "Failed to create ingest pipeline";
logger.error(errorMessage, e);
createIngestPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}));

} catch (FlowFrameworkException e) {
createIngestPipelineFuture.onFailure(e);
Expand Down

0 comments on commit d07b542

Please sign in to comment.