diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java index 0dbec5bf2..e03a1b4d8 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java @@ -84,6 +84,16 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { context.restore(); + if (!response.isExists()) { + listener.onFailure( + new FlowFrameworkException( + "Failed to retrieve template (" + workflowId + ") from global context.", + RestStatus.NOT_FOUND + ) + ); + return; + } + // Parse template from document source Template template = Template.parseFromDocumentSource(response.getSourceAsString()); @@ -131,33 +141,38 @@ private void executeWorkflowAsync(String workflowId, Workflow workflow) { * @param workflowListener The listener that updates the status of a workflow execution */ private void executeWorkflow(Workflow workflow, ActionListener workflowListener) { - - List processSequence = workflowProcessSorter.sortProcessNodes(workflow); - List> workflowFutureList = new ArrayList<>(); - - for (ProcessNode processNode : processSequence) { - List predecessors = processNode.predecessors(); - - logger.info( - "Queueing process [{}].{}", - processNode.id(), - predecessors.isEmpty() - ? " Can start immediately!" - : String.format( - Locale.getDefault(), - " Must wait for [%s] to complete first.", - predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", ")) - ) - ); - - workflowFutureList.add(processNode.execute()); - } try { + + // Attempt to topologically sort the workflow graph + List processSequence = workflowProcessSorter.sortProcessNodes(workflow); + List> workflowFutureList = new ArrayList<>(); + + for (ProcessNode processNode : processSequence) { + List predecessors = processNode.predecessors(); + + logger.info( + "Queueing process [{}].{}", + processNode.id(), + predecessors.isEmpty() + ? " Can start immediately!" + : String.format( + Locale.getDefault(), + " Must wait for [%s] to complete first.", + predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", ")) + ) + ); + + workflowFutureList.add(processNode.execute()); + } + // Attempt to join each workflow step future, may throw a CompletionException if any step completes exceptionally workflowFutureList.forEach(CompletableFuture::join); // TODO : Create State Index request with provisioning state, start time, end time, etc, pending implementation. String for now workflowListener.onResponse("READY"); + + } catch (IllegalArgumentException e) { + workflowListener.onFailure(new FlowFrameworkException(e.getMessage(), RestStatus.BAD_REQUEST)); } catch (CancellationException | CompletionException ex) { workflowListener.onFailure(new FlowFrameworkException(ex.getMessage(), RestStatus.INTERNAL_SERVER_ERROR)); }