Skip to content

Commit

Permalink
[Backport 2.x] Incrementally remove resources from workflow state dur…
Browse files Browse the repository at this point in the history
…ing deprovisioning (#910)

Incrementally remove resources from workflow state during deprovisioning (#898)

* Add method to delete a resource from the resources_created field



* Update deprovisioned resources incrementally



* Use Log4j ParameterizedMessage for string substitutions



---------


(cherry picked from commit 2f52b7e)

Signed-off-by: Daniel Widdis <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 0cde785 commit d1d48b1
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 25 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/)
## [Unreleased 2.x](https://github.com/opensearch-project/flow-framework/compare/2.17...2.x)
### Features
### Enhancements
- Incrementally remove resources from workflow state during deprovisioning ([#898](https://github.com/opensearch-project/flow-framework/pull/898))

### Bug Fixes
### Infrastructure
### Documentation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessageFactory;
import org.opensearch.ExceptionsHelper;
import org.opensearch.action.DocWriteRequest.OpType;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
Expand Down Expand Up @@ -693,6 +695,7 @@ public void addResourceToStateIndex(
getAndUpdateResourceInStateDocumentWithRetries(
workflowId,
newResource,
OpType.INDEX,
RETRIES,
ActionListener.runBefore(listener, context::restore)
);
Expand All @@ -701,15 +704,41 @@ public void addResourceToStateIndex(
}

/**
* Performs a get and update of a State Index document adding a new resource with strong consistency and retries
* Removes a resource from the state index, including common exception handling
* @param workflowId The workflow document id in the state index
* @param resourceToDelete The resource to delete
* @param listener the ActionListener for this step to handle completing the future after update
*/
public void deleteResourceFromStateIndex(String workflowId, ResourceCreated resourceToDelete, ActionListener<WorkflowData> listener) {
if (!doesIndexExist(WORKFLOW_STATE_INDEX)) {
String errorMessage = "Failed to update state for " + workflowId + " due to missing " + WORKFLOW_STATE_INDEX + " index";
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.NOT_FOUND));
} else {
try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) {
getAndUpdateResourceInStateDocumentWithRetries(
workflowId,
resourceToDelete,
OpType.DELETE,
RETRIES,
ActionListener.runBefore(listener, context::restore)
);
}
}
}

/**
* Performs a get and update of a State Index document adding or removing a resource with strong consistency and retries
* @param workflowId The document id to update
* @param newResource The resource to add to the resources created list
* @param resource The resource to add or remove from the resources created list
* @param operation The operation to perform on the resource (INDEX to append to the list or DELETE to remove)
* @param retries The number of retries on update version conflicts
* @param listener The listener to complete on success or failure
*/
private void getAndUpdateResourceInStateDocumentWithRetries(
String workflowId,
ResourceCreated newResource,
ResourceCreated resource,
OpType operation,
int retries,
ActionListener<WorkflowData> listener
) {
Expand All @@ -721,7 +750,11 @@ private void getAndUpdateResourceInStateDocumentWithRetries(
}
WorkflowState currentState = WorkflowState.parse(getResponse.getSourceAsString());
List<ResourceCreated> resourcesCreated = new ArrayList<>(currentState.resourcesCreated());
resourcesCreated.add(newResource);
if (operation == OpType.DELETE) {
resourcesCreated.removeIf(r -> r.resourceMap().equals(resource.resourceMap()));
} else {
resourcesCreated.add(resource);
}
XContentBuilder builder = XContentFactory.jsonBuilder();
WorkflowState newState = WorkflowState.builder(currentState).resourcesCreated(resourcesCreated).build();
newState.toXContent(builder, null);
Expand All @@ -732,41 +765,54 @@ private void getAndUpdateResourceInStateDocumentWithRetries(
client.update(
updateRequest,
ActionListener.wrap(
r -> handleStateUpdateSuccess(workflowId, newResource, listener),
e -> handleStateUpdateException(workflowId, newResource, retries, listener, e)
r -> handleStateUpdateSuccess(workflowId, resource, operation, listener),
e -> handleStateUpdateException(workflowId, resource, operation, retries, listener, e)
)
);
}, ex -> handleStateUpdateException(workflowId, newResource, 0, listener, ex)));
}, ex -> handleStateUpdateException(workflowId, resource, operation, 0, listener, ex)));
}

private void handleStateUpdateSuccess(String workflowId, ResourceCreated newResource, ActionListener<WorkflowData> listener) {
private void handleStateUpdateSuccess(
String workflowId,
ResourceCreated newResource,
OpType operation,
ActionListener<WorkflowData> listener
) {
String resourceName = newResource.resourceType();
String resourceId = newResource.resourceId();
String nodeId = newResource.workflowStepId();
logger.info("Updated resources created for {} on step {} with {} {}", workflowId, nodeId, resourceName, resourceId);
logger.info(
"Updated resources created for {} on step {} to {} resource {} {}",
workflowId,
nodeId,
operation.equals(OpType.DELETE) ? "delete" : "add",
resourceName,
resourceId
);
listener.onResponse(new WorkflowData(Map.of(resourceName, resourceId), workflowId, nodeId));
}

private void handleStateUpdateException(
String workflowId,
ResourceCreated newResource,
OpType operation,
int retries,
ActionListener<WorkflowData> listener,
Exception e
) {
if (e instanceof VersionConflictEngineException && retries > 0) {
// Retry if we haven't exhausted retries
getAndUpdateResourceInStateDocumentWithRetries(workflowId, newResource, retries - 1, listener);
getAndUpdateResourceInStateDocumentWithRetries(workflowId, newResource, operation, retries - 1, listener);
return;
}
String errorMessage = "Failed to update workflow state for "
+ workflowId
+ " on step "
+ newResource.workflowStepId()
+ " with "
+ newResource.resourceType()
+ " "
+ newResource.resourceId();
String errorMessage = ParameterizedMessageFactory.INSTANCE.newMessage(
"Failed to update workflow state for {} on step {} to {} resource {} {}",
workflowId,
newResource.workflowStepId(),
operation.equals(OpType.DELETE) ? "delete" : "add",
newResource.resourceType(),
newResource.resourceId()
).getFormattedMessage();
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(e)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.opensearch.flowframework.common.CommonValue.ALLOW_DELETE;
Expand Down Expand Up @@ -214,19 +215,32 @@ private void executeDeprovisionSequence(
// Repeat attempting to delete resources as long as at least one is successful
int resourceCount = deprovisionProcessSequence.size();
while (resourceCount > 0) {
PlainActionFuture<WorkflowData> stateUpdateFuture;
Iterator<ProcessNode> iter = deprovisionProcessSequence.iterator();
while (iter.hasNext()) {
do {
ProcessNode deprovisionNode = iter.next();
ResourceCreated resource = getResourceFromDeprovisionNode(deprovisionNode, resourcesCreated);
String resourceNameAndId = getResourceNameAndId(resource);
PlainActionFuture<WorkflowData> deprovisionFuture = deprovisionNode.execute();
stateUpdateFuture = PlainActionFuture.newFuture();
try {
deprovisionFuture.get();
logger.info("Successful {} for {}", deprovisionNode.id(), resourceNameAndId);
// Remove from state index resource list
flowFrameworkIndicesHandler.deleteResourceFromStateIndex(workflowId, resource, stateUpdateFuture);
try {
// Wait at most 1 second for state index update.
stateUpdateFuture.actionGet(1, TimeUnit.SECONDS);
} catch (Exception e) {
// Ignore incremental resource removal failures (or timeouts) as we catch up at the end with remainingResources
}
// Remove from list so we don't try again
iter.remove();
// Pause briefly before next step
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Throwable t) {
// If any deprovision fails due to not found, it's a success
if (t.getCause() instanceof OpenSearchStatusException
Expand All @@ -238,7 +252,7 @@ private void executeDeprovisionSequence(
logger.info("Failed {} for {}", deprovisionNode.id(), resourceNameAndId);
}
}
}
} while (iter.hasNext());
if (deprovisionProcessSequence.size() < resourceCount) {
// If we've deleted something, decrement and try again if not zero
resourceCount = deprovisionProcessSequence.size();
Expand All @@ -259,6 +273,7 @@ private void executeDeprovisionSequence(
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
} else {
Expand All @@ -274,6 +289,7 @@ private void executeDeprovisionSequence(
if (!deleteNotAllowed.isEmpty()) {
logger.info("Resources requiring allow_delete: {}.", deleteNotAllowed);
}
// This is a redundant best-effort backup to the incremental deletion done earlier
updateWorkflowState(workflowId, remainingResources, deleteNotAllowed, listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,17 @@ protected List<ResourceCreated> getResourcesCreated(RestClient client, String wo
TimeUnit.SECONDS
);

return getResourcesCreated(client, workflowId);
}

/**
* Helper method retrieve any resources created incrementally without waiting for completion
* @param client the rest client
* @param workflowId the workflow id to retrieve resources from
* @return a list of created resources
* @throws Exception if the request fails
*/
protected List<ResourceCreated> getResourcesCreated(RestClient client, String workflowId) throws Exception {
Response response = getWorkflowStatus(client, workflowId, true);

// Parse workflow state from response and retrieve resources created
Expand Down
Loading

0 comments on commit d1d48b1

Please sign in to comment.