Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Combined register local model and get ml task #220

Merged
merged 4 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.FutureUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.ml.client.MachineLearningNodeClient;
import org.opensearch.ml.common.MLTaskState;
import org.opensearch.ml.common.model.MLModelConfig;
import org.opensearch.ml.common.model.MLModelFormat;
import org.opensearch.ml.common.model.TextEmbeddingModelConfig;
Expand All @@ -37,17 +41,17 @@
import static org.opensearch.flowframework.common.CommonValue.MODEL_CONTENT_HASH_VALUE;
import static org.opensearch.flowframework.common.CommonValue.MODEL_FORMAT;
import static org.opensearch.flowframework.common.CommonValue.MODEL_GROUP_ID;
import static org.opensearch.flowframework.common.CommonValue.MODEL_ID;
import static org.opensearch.flowframework.common.CommonValue.MODEL_TYPE;
import static org.opensearch.flowframework.common.CommonValue.NAME_FIELD;
import static org.opensearch.flowframework.common.CommonValue.REGISTER_MODEL_STATUS;
import static org.opensearch.flowframework.common.CommonValue.TASK_ID;
import static org.opensearch.flowframework.common.CommonValue.URL;
import static org.opensearch.flowframework.common.CommonValue.VERSION_FIELD;

/**
* Step to register a local model
*/
public class RegisterLocalModelStep implements WorkflowStep {
public class RegisterLocalModelStep extends AbstractRetryableWorkflowStep {

private static final Logger logger = LogManager.getLogger(RegisterLocalModelStep.class);

Expand All @@ -57,9 +61,12 @@ public class RegisterLocalModelStep implements WorkflowStep {

/**
* Instantiate this class
* @param settings The OpenSearch settings
* @param clusterService The cluster service
* @param mlClient client to instantiate MLClient
*/
public RegisterLocalModelStep(MachineLearningNodeClient mlClient) {
public RegisterLocalModelStep(Settings settings, ClusterService clusterService, MachineLearningNodeClient mlClient) {
super(settings, clusterService);
this.mlClient = mlClient;
}

Expand All @@ -72,15 +79,12 @@ public CompletableFuture<WorkflowData> execute(List<WorkflowData> data) {
@Override
public void onResponse(MLRegisterModelResponse mlRegisterModelResponse) {
logger.info("Local Model registration task creation successful");
registerLocalModelFuture.complete(
new WorkflowData(
Map.ofEntries(
Map.entry(TASK_ID, mlRegisterModelResponse.getTaskId()),
Map.entry(REGISTER_MODEL_STATUS, mlRegisterModelResponse.getStatus())
),
data.get(0).getWorkflowId()
)
);

String workflowId = data.get(0).getWorkflowId();
String taskId = mlRegisterModelResponse.getTaskId();

// Attempt to retrieve the model ID
retryableGetMlTask(workflowId, registerLocalModelFuture, taskId, 0);
}

@Override
Expand Down Expand Up @@ -198,4 +202,55 @@ public void onFailure(Exception e) {
public String getName() {
return NAME;
}

/**
* Retryable get ml task
* @param workflowId the workflow id
* @param getMLTaskFuture the workflow step future
* @param taskId the ml task id
* @param retries the current number of request retries
*/
void retryableGetMlTask(String workflowId, CompletableFuture<WorkflowData> registerLocalModelFuture, String taskId, int retries) {
amitgalitz marked this conversation as resolved.
Show resolved Hide resolved
mlClient.getTask(taskId, ActionListener.wrap(response -> {
MLTaskState currentState = response.getState();
if (currentState != MLTaskState.COMPLETED) {
if (Stream.of(MLTaskState.FAILED, MLTaskState.COMPLETED_WITH_ERROR).anyMatch(x -> x == currentState)) {
// Model registration failed or completed with errors
String errorMessage = "Local model registration failed with error : " + response.getError();
logger.error(errorMessage);
registerLocalModelFuture.completeExceptionally(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));
} else {
// Task still in progress, attempt retry
throw new IllegalStateException("Local model registration is not yet completed");
}
} else {
logger.info("Local model registeration successful");
registerLocalModelFuture.complete(
new WorkflowData(
Map.ofEntries(
Map.entry(MODEL_ID, response.getModelId()),
Map.entry(REGISTER_MODEL_STATUS, response.getState().name())
),
workflowId
)
);
}
}, exception -> {
if (retries < maxRetry) {
// Sleep thread prior to retrying request
try {
Thread.sleep(5000);
} catch (Exception e) {
FutureUtils.cancel(registerLocalModelFuture);
}
final int retryAdd = retries + 1;
retryableGetMlTask(workflowId, registerLocalModelFuture, taskId, retryAdd);
} else {
logger.error("Failed to retrieve local model registration task, maximum retries exceeded");
joshpalis marked this conversation as resolved.
Show resolved Hide resolved
registerLocalModelFuture.completeExceptionally(
new FlowFrameworkException(exception.getMessage(), ExceptionsHelper.status(exception))
);
}
}));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,11 @@ private void populateMap(
stepMap.put(NoOpStep.NAME, new NoOpStep());
stepMap.put(CreateIndexStep.NAME, new CreateIndexStep(clusterService, client));
stepMap.put(CreateIngestPipelineStep.NAME, new CreateIngestPipelineStep(client));
stepMap.put(RegisterLocalModelStep.NAME, new RegisterLocalModelStep(mlClient));
stepMap.put(RegisterLocalModelStep.NAME, new RegisterLocalModelStep(settings, clusterService, mlClient));
stepMap.put(RegisterRemoteModelStep.NAME, new RegisterRemoteModelStep(mlClient));
stepMap.put(DeployModelStep.NAME, new DeployModelStep(mlClient));
stepMap.put(CreateConnectorStep.NAME, new CreateConnectorStep(mlClient, flowFrameworkIndicesHandler));
stepMap.put(ModelGroupStep.NAME, new ModelGroupStep(mlClient));
stepMap.put(GetMLTaskStep.NAME, new GetMLTaskStep(settings, clusterService, mlClient));
}

/**
Expand Down
11 changes: 1 addition & 10 deletions src/main/resources/mappings/workflow-steps.json
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
"url"
],
"outputs":[
"task_id",
"model_id",
"register_model_status"
]
},
Expand Down Expand Up @@ -83,14 +83,5 @@
"model_group_id",
"model_group_status"
]
},
"get_ml_task": {
"inputs":[
"task_id"
],
"outputs":[
"model_id",
"register_model_status"
]
}
}
Loading