diff --git a/CHANGELOG.md b/CHANGELOG.md index dc4b74fbf..5b95b4a7f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.12...2.x) ### Features - Add HttpHost WorkflowStep ([#530](https://github.com/opensearch-project/flow-framework/pull/530)) +- Adding create ingest pipeline step ([#558](https://github.com/opensearch-project/flow-framework/pull/558)) ### Enhancements - Substitute REST path or body parameters in Workflow Steps ([#525](https://github.com/opensearch-project/flow-framework/pull/525)) diff --git a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java index 3a27448b4..140f0a4af 100644 --- a/src/main/java/org/opensearch/flowframework/util/ParseUtils.java +++ b/src/main/java/org/opensearch/flowframework/util/ParseUtils.java @@ -374,7 +374,7 @@ private static Object conditionallySubstitute(Object value, Map execute( byte[] byteArr = configurations.getBytes(StandardCharsets.UTF_8); BytesReference configurationsBytes = new BytesArray(byteArr); - ActionListener actionListener = new ActionListener<>() { - - @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { - String resourceName = getResourceByWorkflowStep(getName()); - try { - flowFrameworkIndicesHandler.updateResourceInStateIndex( - currentNodeInputs.getWorkflowId(), - currentNodeId, - getName(), - pipelineId, - ActionListener.wrap(updateResponse -> { - logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex()); - // PutPipelineRequest returns only an AcknowledgeResponse, saving pipelineId instead - // TODO: revisit this concept of pipeline_id to be consistent with what makes most sense to end user here - createIngestPipelineFuture.onResponse( - new WorkflowData( - Map.of(resourceName, pipelineId), - currentNodeInputs.getWorkflowId(), - currentNodeInputs.getNodeId() - ) - ); - }, exception -> { - String errorMessage = "Failed to update new created " - + currentNodeId - + " resource " - + getName() - + " id " - + pipelineId; - logger.error(errorMessage, exception); - createIngestPipelineFuture.onFailure( - new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)) - ); - }) - ); - - } catch (Exception e) { - String errorMessage = "Failed to parse and update new created resource"; - logger.error(errorMessage, e); - createIngestPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); - } - } - - @Override - public void onFailure(Exception e) { - String errorMessage = "Failed to create ingest pipeline"; + // Create PutPipelineRequest and execute + PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, configurationsBytes, XContentType.JSON); + clusterAdminClient.putPipeline(putPipelineRequest, ActionListener.wrap(acknowledgedResponse -> { + String resourceName = getResourceByWorkflowStep(getName()); + try { + flowFrameworkIndicesHandler.updateResourceInStateIndex( + currentNodeInputs.getWorkflowId(), + currentNodeId, + getName(), + pipelineId, + ActionListener.wrap(updateResponse -> { + logger.info("successfully updated resources created in state index: {}", updateResponse.getIndex()); + // PutPipelineRequest returns only an AcknowledgeResponse, saving pipelineId instead + // TODO: revisit this concept of pipeline_id to be consistent with what makes most sense to end user here + createIngestPipelineFuture.onResponse( + new WorkflowData( + Map.of(resourceName, pipelineId), + currentNodeInputs.getWorkflowId(), + currentNodeInputs.getNodeId() + ) + ); + }, exception -> { + String errorMessage = "Failed to update new created " + + currentNodeId + + " resource " + + getName() + + " id " + + pipelineId; + logger.error(errorMessage, exception); + createIngestPipelineFuture.onFailure( + new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)) + ); + }) + ); + + } catch (Exception e) { + String errorMessage = "Failed to parse and update new created resource"; logger.error(errorMessage, e); createIngestPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); } - - }; - - // Create PutPipelineRequest and execute - PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineId, configurationsBytes, XContentType.JSON); - clusterAdminClient.putPipeline(putPipelineRequest, actionListener); + }, e -> { + String errorMessage = "Failed to create ingest pipeline"; + logger.error(errorMessage, e); + createIngestPipelineFuture.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e))); + })); } catch (FlowFrameworkException e) { createIngestPipelineFuture.onFailure(e);