diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index 1bc6d75c5..76c543883 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -522,11 +522,10 @@ public void updateFlowFrameworkSystemIndexDoc( } else { try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { UpdateRequest updateRequest = new UpdateRequest(WORKFLOW_STATE_INDEX, documentId); - Map updatedContent = new HashMap<>(); - updatedContent.putAll(updatedFields); + Map updatedContent = new HashMap<>(updatedFields); updateRequest.doc(updatedContent); updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - updateRequest.retryOnConflict(3); + updateRequest.retryOnConflict(5); // TODO: decide what condition can be considered as an update conflict and add retry strategy client.update(updateRequest, ActionListener.runBefore(listener, context::restore)); } catch (Exception e) { diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java index 2ed203c63..efa4b8e6b 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java @@ -35,12 +35,11 @@ import org.opensearch.transport.TransportService; import java.time.Instant; -import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashMap; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.concurrent.CancellationException; import java.util.stream.Collectors; import static org.opensearch.flowframework.common.CommonValue.ERROR_FIELD; @@ -192,9 +191,9 @@ private void executeWorkflowAsync(String workflowId, List workflowS * @param workflowId The workflowId associated with the workflow that is executing */ private void executeWorkflow(List workflowSequence, String workflowId) { + String currentStepId = ""; try { - - List> workflowFutureList = new ArrayList<>(); + Map> workflowFutureMap = new LinkedHashMap<>(); for (ProcessNode processNode : workflowSequence) { List predecessors = processNode.predecessors(); @@ -210,11 +209,14 @@ private void executeWorkflow(List workflowSequence, String workflow ) ); - workflowFutureList.add(processNode.execute()); + workflowFutureMap.put(processNode.id(), processNode.execute()); } - // Attempt to join each workflow step future, may throw a ExecutionException if any step completes exceptionally - workflowFutureList.forEach(PlainActionFuture::actionGet); + // Attempt to complete each workflow step future, may throw a ExecutionException if any step completes exceptionally + for (Map.Entry> e : workflowFutureMap.entrySet()) { + currentStepId = e.getKey(); + e.getValue().actionGet(); + } logger.info("Provisioning completed successfully for workflow {}", workflowId); flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( @@ -229,15 +231,10 @@ private void executeWorkflow(List workflowSequence, String workflow }, exception -> { logger.error("Failed to update workflow state : {}", exception.getMessage(), exception); }) ); } catch (Exception ex) { - logger.error("Provisioning failed for workflow: {}", workflowId, ex); - String errorMessage; - if (ex instanceof CancellationException) { - errorMessage = "A step in the workflow was cancelled."; - } else if (ex.getCause() != null) { - errorMessage = ex.getCause().getMessage(); - } else { - errorMessage = ex.getMessage(); - } + logger.error("Provisioning failed for workflow {} during step {}.", workflowId, currentStepId, ex); + String errorMessage = (ex.getCause() == null ? ex.getClass().getName() : ex.getCause().getClass().getName()) + + " during step " + + currentStepId; flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( workflowId, Map.ofEntries(