From db4eafaca3749a1d2665699f5a953c3a96e6b96a Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Mon, 16 Oct 2023 23:48:50 +0000 Subject: [PATCH 1/2] Fixing bug, handling case in which a GC entry does not exist prior to attempting to parse from source Signed-off-by: Joshua Palis --- .../transport/ProvisionWorkflowTransportAction.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java index 0dbec5bf2..3b2ce2492 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java @@ -84,6 +84,16 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { context.restore(); + if (!response.isExists()) { + listener.onFailure( + new FlowFrameworkException( + "Failed to retrieve template (" + workflowId + ") from global context.", + RestStatus.NOT_FOUND + ) + ); + return; + } + // Parse template from document source Template template = Template.parseFromDocumentSource(response.getSourceAsString()); From 2af5686d71e23bedc91388378ba328311053dc85 Mon Sep 17 00:00:00 2001 From: Joshua Palis Date: Tue, 17 Oct 2023 17:17:14 +0000 Subject: [PATCH 2/2] Catching topological sorting exceptions when executing the workflow Signed-off-by: Joshua Palis --- .../ProvisionWorkflowTransportAction.java | 47 ++++++++++--------- 1 file changed, 26 insertions(+), 21 deletions(-) diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java index 3b2ce2492..e03a1b4d8 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java @@ -141,33 +141,38 @@ private void executeWorkflowAsync(String workflowId, Workflow workflow) { * @param workflowListener The listener that updates the status of a workflow execution */ private void executeWorkflow(Workflow workflow, ActionListener workflowListener) { - - List processSequence = workflowProcessSorter.sortProcessNodes(workflow); - List> workflowFutureList = new ArrayList<>(); - - for (ProcessNode processNode : processSequence) { - List predecessors = processNode.predecessors(); - - logger.info( - "Queueing process [{}].{}", - processNode.id(), - predecessors.isEmpty() - ? " Can start immediately!" - : String.format( - Locale.getDefault(), - " Must wait for [%s] to complete first.", - predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", ")) - ) - ); - - workflowFutureList.add(processNode.execute()); - } try { + + // Attempt to topologically sort the workflow graph + List processSequence = workflowProcessSorter.sortProcessNodes(workflow); + List> workflowFutureList = new ArrayList<>(); + + for (ProcessNode processNode : processSequence) { + List predecessors = processNode.predecessors(); + + logger.info( + "Queueing process [{}].{}", + processNode.id(), + predecessors.isEmpty() + ? " Can start immediately!" + : String.format( + Locale.getDefault(), + " Must wait for [%s] to complete first.", + predecessors.stream().map(p -> p.id()).collect(Collectors.joining(", ")) + ) + ); + + workflowFutureList.add(processNode.execute()); + } + // Attempt to join each workflow step future, may throw a CompletionException if any step completes exceptionally workflowFutureList.forEach(CompletableFuture::join); // TODO : Create State Index request with provisioning state, start time, end time, etc, pending implementation. String for now workflowListener.onResponse("READY"); + + } catch (IllegalArgumentException e) { + workflowListener.onFailure(new FlowFrameworkException(e.getMessage(), RestStatus.BAD_REQUEST)); } catch (CancellationException | CompletionException ex) { workflowListener.onFailure(new FlowFrameworkException(ex.getMessage(), RestStatus.INTERNAL_SERVER_ERROR)); }