Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve provisioning exceptions with step ID and hide exception details #515

Merged
merged 1 commit into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -522,11 +522,10 @@
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
UpdateRequest updateRequest = new UpdateRequest(WORKFLOW_STATE_INDEX, documentId);
Map<String, Object> updatedContent = new HashMap<>();
updatedContent.putAll(updatedFields);
Map<String, Object> updatedContent = new HashMap<>(updatedFields);

Check warning on line 525 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L525

Added line #L525 was not covered by tests
updateRequest.doc(updatedContent);
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.retryOnConflict(3);
updateRequest.retryOnConflict(5);

Check warning on line 528 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L528

Added line #L528 was not covered by tests
// TODO: decide what condition can be considered as an update conflict and add retry strategy
client.update(updateRequest, ActionListener.runBefore(listener, context::restore));
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,11 @@
import org.opensearch.transport.TransportService;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.common.CommonValue.ERROR_FIELD;
Expand Down Expand Up @@ -192,9 +191,9 @@
* @param workflowId The workflowId associated with the workflow that is executing
*/
private void executeWorkflow(List<ProcessNode> workflowSequence, String workflowId) {
String currentStepId = "";

Check warning on line 194 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L194

Added line #L194 was not covered by tests
try {

List<PlainActionFuture<?>> workflowFutureList = new ArrayList<>();
Map<String, PlainActionFuture<?>> workflowFutureMap = new LinkedHashMap<>();

Check warning on line 196 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L196

Added line #L196 was not covered by tests
for (ProcessNode processNode : workflowSequence) {
List<ProcessNode> predecessors = processNode.predecessors();

Expand All @@ -210,11 +209,14 @@
)
);

workflowFutureList.add(processNode.execute());
workflowFutureMap.put(processNode.id(), processNode.execute());

Check warning on line 212 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L212

Added line #L212 was not covered by tests
}

// Attempt to join each workflow step future, may throw a ExecutionException if any step completes exceptionally
workflowFutureList.forEach(PlainActionFuture::actionGet);
// Attempt to complete each workflow step future, may throw a ExecutionException if any step completes exceptionally
for (Map.Entry<String, PlainActionFuture<?>> e : workflowFutureMap.entrySet()) {
currentStepId = e.getKey();
e.getValue().actionGet();
}

Check warning on line 219 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L217-L219

Added lines #L217 - L219 were not covered by tests

logger.info("Provisioning completed successfully for workflow {}", workflowId);
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
Expand All @@ -229,15 +231,10 @@
}, exception -> { logger.error("Failed to update workflow state : {}", exception.getMessage(), exception); })
);
} catch (Exception ex) {
logger.error("Provisioning failed for workflow: {}", workflowId, ex);
String errorMessage;
if (ex instanceof CancellationException) {
errorMessage = "A step in the workflow was cancelled.";
} else if (ex.getCause() != null) {
errorMessage = ex.getCause().getMessage();
} else {
errorMessage = ex.getMessage();
}
logger.error("Provisioning failed for workflow {} during step {}.", workflowId, currentStepId, ex);

Check warning on line 234 in src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java#L234

Added line #L234 was not covered by tests
String errorMessage = (ex.getCause() == null ? ex.getClass().getName() : ex.getCause().getClass().getName())
+ " during step "
+ currentStepId;
flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc(
workflowId,
Map.ofEntries(
Expand Down
Loading