Skip to content

Commit

Permalink
Improve provisioning exceptions with step ID and hide exception detai…
Browse files Browse the repository at this point in the history
…ls (#515)

Signed-off-by: Daniel Widdis <[email protected]>
(cherry picked from commit 861616a)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
github-actions[bot] committed Feb 9, 2024
1 parent cb2f454 commit 7af06e7
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -522,11 +522,10 @@ public void updateFlowFrameworkSystemIndexDoc(
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
UpdateRequest updateRequest = new UpdateRequest(WORKFLOW_STATE_INDEX, documentId);
Map<String, Object> updatedContent = new HashMap<>();
updatedContent.putAll(updatedFields);
Map<String, Object> updatedContent = new HashMap<>(updatedFields);
updateRequest.doc(updatedContent);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.retryOnConflict(3);
updateRequest.retryOnConflict(5);
// TODO: decide what condition can be considered as an update conflict and add retry strategy
client.update(updateRequest, ActionListener.runBefore(listener, context::restore));
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@
import org.opensearch.transport.TransportService;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.common.CommonValue.ERROR_FIELD;
Expand Down Expand Up @@ -192,9 +191,9 @@ private void executeWorkflowAsync(String workflowId, List<ProcessNode> workflowS
* @param workflowId The workflowId associated with the workflow that is executing
*/
private void executeWorkflow(List<ProcessNode> workflowSequence, String workflowId) {
String currentStepId = "";
try {

List<PlainActionFuture<?>> workflowFutureList = new ArrayList<>();
Map<String, PlainActionFuture<?>> workflowFutureMap = new LinkedHashMap<>();
for (ProcessNode processNode : workflowSequence) {
List<ProcessNode> predecessors = processNode.predecessors();

Expand All @@ -210,11 +209,14 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, String workflow
)
);

workflowFutureList.add(processNode.execute());
workflowFutureMap.put(processNode.id(), processNode.execute());
}

// Attempt to join each workflow step future, may throw a ExecutionException if any step completes exceptionally
workflowFutureList.forEach(PlainActionFuture::actionGet);
// Attempt to complete each workflow step future, may throw a ExecutionException if any step completes exceptionally
for (Map.Entry<String, PlainActionFuture<?>> e : workflowFutureMap.entrySet()) {
currentStepId = e.getKey();
e.getValue().actionGet();
}

logger.info("Provisioning completed successfully for workflow {}", workflowId);
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
Expand All @@ -229,15 +231,10 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, String workflow
}, exception -> { logger.error("Failed to update workflow state : {}", exception.getMessage(), exception); })
);
} catch (Exception ex) {
logger.error("Provisioning failed for workflow: {}", workflowId, ex);
String errorMessage;
if (ex instanceof CancellationException) {
errorMessage = "A step in the workflow was cancelled.";
} else if (ex.getCause() != null) {
errorMessage = ex.getCause().getMessage();
} else {
errorMessage = ex.getMessage();
}
logger.error("Provisioning failed for workflow {} during step {}.", workflowId, currentStepId, ex);
String errorMessage = (ex.getCause() == null ? ex.getClass().getName() : ex.getCause().getClass().getName())
+ " during step "
+ currentStepId;
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
workflowId,
Map.ofEntries(
Expand Down

0 comments on commit 7af06e7

Please sign in to comment.