Skip to content

Commit

Permalink
Improve provisioning exceptions with step ID and hide exception details
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Widdis <[email protected]>
  • Loading branch information
dbwiddis committed Feb 9, 2024
1 parent 33ff2f3 commit 1ec10df
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -522,11 +522,10 @@ public void updateFlowFrameworkSystemIndexDoc(
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
UpdateRequest updateRequest = new UpdateRequest(WORKFLOW_STATE_INDEX, documentId);
Map<String, Object> updatedContent = new HashMap<>();
updatedContent.putAll(updatedFields);
Map<String, Object> updatedContent = new HashMap<>(updatedFields);

Check warning on line 525 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L525

Added line #L525 was not covered by tests
updateRequest.doc(updatedContent);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.retryOnConflict(3);
updateRequest.retryOnConflict(5);

Check warning on line 528 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L528

Added line #L528 was not covered by tests
// TODO: decide what condition can be considered as an update conflict and add retry strategy
client.update(updateRequest, ActionListener.runBefore(listener, context::restore));
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@
import org.opensearch.transport.TransportService;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.common.CommonValue.ERROR_FIELD;
Expand Down Expand Up @@ -192,9 +191,9 @@ private void executeWorkflowAsync(String workflowId, List<ProcessNode> workflowS
* @param workflowId The workflowId associated with the workflow that is executing
*/
private void executeWorkflow(List<ProcessNode> workflowSequence, String workflowId) {
String currentStepId = "";

Check warning on line 194 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L194

Added line #L194 was not covered by tests
try {

List<PlainActionFuture<?>> workflowFutureList = new ArrayList<>();
Map<String, PlainActionFuture<?>> workflowFutureMap = new LinkedHashMap<>();

Check warning on line 196 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L196

Added line #L196 was not covered by tests
for (ProcessNode processNode : workflowSequence) {
List<ProcessNode> predecessors = processNode.predecessors();

Expand All @@ -210,11 +209,14 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, String workflow
)
);

workflowFutureList.add(processNode.execute());
workflowFutureMap.put(processNode.id(), processNode.execute());

Check warning on line 212 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L212

Added line #L212 was not covered by tests
}

// Attempt to join each workflow step future, may throw a ExecutionException if any step completes exceptionally
workflowFutureList.forEach(PlainActionFuture::actionGet);
// Attempt to complete each workflow step future, may throw a ExecutionException if any step completes exceptionally
for (Map.Entry<String, PlainActionFuture<?>> e : workflowFutureMap.entrySet()) {
currentStepId = e.getKey();
e.getValue().actionGet();
}

Check warning on line 219 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L217-L219

Added lines #L217 - L219 were not covered by tests

logger.info("Provisioning completed successfully for workflow {}", workflowId);
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
Expand All @@ -229,15 +231,10 @@ private void executeWorkflow(List<ProcessNode> workflowSequence, String workflow
}, exception -> { logger.error("Failed to update workflow state : {}", exception.getMessage(), exception); })
);
} catch (Exception ex) {
logger.error("Provisioning failed for workflow: {}", workflowId, ex);
String errorMessage;
if (ex instanceof CancellationException) {
errorMessage = "A step in the workflow was cancelled.";
} else if (ex.getCause() != null) {
errorMessage = ex.getCause().getMessage();
} else {
errorMessage = ex.getMessage();
}
logger.error("Provisioning failed for workflow {} during step {}.", workflowId, currentStepId, ex);

Check warning on line 234 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L234

Added line #L234 was not covered by tests
String errorMessage = (ex.getCause() == null ? ex.getClass().getName() : ex.getCause().getClass().getName())
+ " during step "
+ currentStepId;
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
workflowId,
Map.ofEntries(
Expand Down

0 comments on commit 1ec10df

Please sign in to comment.