Skip to content

Commit

Permalink
add multi node IT, fix update api bugs
Browse files Browse the repository at this point in the history
Signed-off-by: Amit Galitzky <[email protected]>
  • Loading branch information
amitgalitz committed Jan 17, 2024
1 parent c6e6ed2 commit 23c1897
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 59 deletions.
46 changes: 25 additions & 21 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,20 +43,20 @@ jobs:
name: Test JDK${{ matrix.java }}, ${{ matrix.os }}
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v4
with:
java-version: ${{ matrix.java }}
distribution: temurin
- name: Build and Run Tests
run: |
- uses: actions/checkout@v4
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v4
with:
java-version: ${{ matrix.java }}
distribution: temurin
- name: Build and Run Tests
run: |
./gradlew check -x integTest -x yamlRestTest -x spotlessJava
- name: Upload Coverage Report
if: ${{ matrix.codecov }}
uses: codecov/codecov-action@v3
with:
file: ./build/reports/jacoco/test/jacocoTestReport.xml
- name: Upload Coverage Report
if: ${{ matrix.codecov }}
uses: codecov/codecov-action@v3
with:
file: ./build/reports/jacoco/test/jacocoTestReport.xml
integTest:
needs: [spotless, javadoc]
strategy:
Expand All @@ -69,12 +69,16 @@ jobs:
name: Integ Test JDK${{ matrix.java }}, ${{ matrix.os }}
runs-on: ${{ matrix.os }}
steps:
- uses: actions/checkout@v4
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v4
with:
java-version: ${{ matrix.java }}
distribution: temurin
- name: Build and Run Tests
run: |
- uses: actions/checkout@v4
- name: Set up JDK ${{ matrix.java }}
uses: actions/setup-java@v4
with:
java-version: ${{ matrix.java }}
distribution: temurin
- name: Build and Run Tests
run: |
./gradlew integTest yamlRestTest
- name: Multi Nodes Integration Testing
if: matrix.java == 21
run: |
./gradlew integTest -PnumNodes=3
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,12 @@ public Collection<Object> createComponents(
flowFrameworkSettings = new FlowFrameworkSettings(clusterService, settings);
MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
EncryptorUtils encryptorUtils = new EncryptorUtils(clusterService, client);
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(client, clusterService, encryptorUtils);
FlowFrameworkIndicesHandler flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(
client,
clusterService,
encryptorUtils,
xContentRegistry
);
WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory(
threadPool,
clusterService,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
Expand All @@ -29,8 +30,10 @@
import org.opensearch.commons.authuser.User;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.model.ProvisioningProgress;
import org.opensearch.flowframework.model.ResourceCreated;
Expand All @@ -47,8 +50,10 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR;
import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.flowframework.common.CommonValue.CONFIG_INDEX_MAPPING;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX;
import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX_MAPPING;
Expand All @@ -70,20 +75,28 @@ public class FlowFrameworkIndicesHandler {
private final EncryptorUtils encryptorUtils;
private static final Map<String, AtomicBoolean> indexMappingUpdated = new HashMap<>();
private static final Map<String, Object> indexSettings = Map.of("index.auto_expand_replicas", "0-1");
private final NamedXContentRegistry xContentRegistry;

/**
* constructor
* @param client the open search client
* @param clusterService ClusterService
* @param encryptorUtils encryption utility
* @param xContentRegistry contentRegister to parse any response
*/
public FlowFrameworkIndicesHandler(Client client, ClusterService clusterService, EncryptorUtils encryptorUtils) {
public FlowFrameworkIndicesHandler(
Client client,
ClusterService clusterService,
EncryptorUtils encryptorUtils,
NamedXContentRegistry xContentRegistry
) {
this.client = client;
this.clusterService = clusterService;
this.encryptorUtils = encryptorUtils;
for (FlowFrameworkIndex mlIndex : FlowFrameworkIndex.values()) {
indexMappingUpdated.put(mlIndex.getIndexName(), new AtomicBoolean(false));
}
this.xContentRegistry = xContentRegistry;
}

static {
Expand Down Expand Up @@ -395,22 +408,89 @@ public void updateTemplateInGlobalContext(String documentId, Template template,
+ ", global_context index does not exist.";
logger.error(exceptionMessage);
listener.onFailure(new FlowFrameworkException(exceptionMessage, RestStatus.BAD_REQUEST));
} else {
IndexRequest request = new IndexRequest(GLOBAL_CONTEXT_INDEX).id(documentId);
try (
XContentBuilder builder = XContentFactory.jsonBuilder();
ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()
) {
Template encryptedTemplate = encryptorUtils.encryptTemplateCredentials(template);
request.source(encryptedTemplate.toXContent(builder, ToXContent.EMPTY_PARAMS))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request, ActionListener.runBefore(listener, context::restore));
return;
}
doesTemplateExists(documentId, templateExists -> {

Check warning on line 413 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L413

Added line #L413 was not covered by tests
if (templateExists) {
isWorkflowProvisioned(documentId, workflowIsProvisioned -> {

Check warning on line 415 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L415

Added line #L415 was not covered by tests
if (workflowIsProvisioned) {
IndexRequest request = new IndexRequest(GLOBAL_CONTEXT_INDEX).id(documentId);

Check warning on line 417 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L417

Added line #L417 was not covered by tests
try (
XContentBuilder builder = XContentFactory.jsonBuilder();
ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()

Check warning on line 420 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L419-L420

Added lines #L419 - L420 were not covered by tests
) {
Template encryptedTemplate = encryptorUtils.encryptTemplateCredentials(template);
request.source(encryptedTemplate.toXContent(builder, ToXContent.EMPTY_PARAMS))
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
client.index(request, ActionListener.runBefore(listener, context::restore));
} catch (Exception e) {
String errorMessage = "Failed to update global_context entry : " + documentId;
logger.error(errorMessage, e);
listener.onFailure(
new FlowFrameworkException(errorMessage + " : " + e.getMessage(), ExceptionsHelper.status(e))

Check warning on line 430 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L422-L430

Added lines #L422 - L430 were not covered by tests
);
}
} else {
String errorMessage = "The template has already been provisioned so it can't be updated: " + documentId;
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));

Check warning on line 436 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L432-L436

Added lines #L432 - L436 were not covered by tests
}
}, listener);

Check warning on line 438 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L438

Added line #L438 was not covered by tests

} else {
String errorMessage = "Failed to get template: " + documentId;
logger.error(errorMessage);
listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.BAD_REQUEST));

Check warning on line 443 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L441-L443

Added lines #L441 - L443 were not covered by tests
}
});
}

Check warning on line 446 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L445-L446

Added lines #L445 - L446 were not covered by tests

/**
* Check if the given template exists in the template index
*
* @param documentId document id
* @param booleanResultConsumer boolean consumer function
*/
public void doesTemplateExists(String documentId, Consumer<Boolean> booleanResultConsumer) {
GetRequest getRequest = new GetRequest(GLOBAL_CONTEXT_INDEX, documentId);
client.get(getRequest, ActionListener.wrap(response -> { booleanResultConsumer.accept(response.isExists()); }, exception -> {
logger.error("Failed to get template " + documentId, exception);
booleanResultConsumer.accept(false);
}));
}

Check warning on line 460 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L455-L460

Added lines #L455 - L460 were not covered by tests

/**
* Check if the workflow has been provisioned and executes the consumer by passing a boolean
*
* @param documentId document id
* @param booleanResultConsumer boolean consumer function
* @param listener action listener
* @param <T> action listener response type
*/
public <T> void isWorkflowProvisioned(String documentId, Consumer<Boolean> booleanResultConsumer, ActionListener<T> listener) {
GetRequest getRequest = new GetRequest(WORKFLOW_STATE_INDEX, documentId);
client.get(getRequest, ActionListener.wrap(response -> {

Check warning on line 472 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L471-L472

Added lines #L471 - L472 were not covered by tests
if (!response.isExists()) {
booleanResultConsumer.accept(false);
return;

Check warning on line 475 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L474-L475

Added lines #L474 - L475 were not covered by tests
}
try (XContentParser parser = ParseUtils.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())) {
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
WorkflowState workflowState = WorkflowState.parse(parser);

Check warning on line 479 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L477-L479

Added lines #L477 - L479 were not covered by tests
if (workflowState.getProvisioningProgress().equals(ProvisioningProgress.NOT_STARTED.name())) {
booleanResultConsumer.accept(true);

Check warning on line 481 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L481

Added line #L481 was not covered by tests
} else {
booleanResultConsumer.accept(false);

Check warning on line 483 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L483

Added line #L483 was not covered by tests
}
} catch (Exception e) {
String errorMessage = "Failed to update global_context entry : " + documentId;
logger.error(errorMessage, e);
listener.onFailure(new FlowFrameworkException(errorMessage + " : " + e.getMessage(), ExceptionsHelper.status(e)));
String message = "Failed to parse workflow state" + documentId;
logger.error(message, e);
listener.onFailure(new FlowFrameworkException(message, RestStatus.INTERNAL_SERVER_ERROR));

Check warning on line 488 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L486-L488

Added lines #L486 - L488 were not covered by tests
}
}
}, exception -> {
logger.error("Failed to get workflow state " + documentId, exception);
booleanResultConsumer.accept(false);
}));

Check warning on line 493 in src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java#L490-L493

Added lines #L490 - L493 were not covered by tests
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener<Work
logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.NOT_STARTED.name());
listener.onResponse(new WorkflowResponse(request.getWorkflowId()));
}, exception -> {
logger.error("Failed to update workflow state : {}", exception.getMessage());
logger.error("Failed to update workflow in template index: ", exception);

Check warning on line 234 in src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java#L234

Added line #L234 was not covered by tests
if (exception instanceof FlowFrameworkException) {
listener.onFailure(exception);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void setUp() throws Exception {
threadContext = new ThreadContext(settings);
when(client.threadPool()).thenReturn(threadPool);
when(threadPool.getThreadContext()).thenReturn(threadContext);
flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(client, clusterService, encryptorUtils);
flowFrameworkIndicesHandler = new FlowFrameworkIndicesHandler(client, clusterService, encryptorUtils, xContentRegistry());
adminClient = mock(AdminClient.class);
indicesAdminClient = mock(IndicesAdminClient.class);
metadata = mock(Metadata.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,33 @@ public void testSearchWorkflows() throws Exception {
}
}

public void testFailedUpdateWorkflow() throws Exception {
// Create a Workflow that has a credential 12345
Template template = TestHelpers.createTemplateFromFile("createconnector-registerremotemodel-deploymodel.json");

ResponseException exception = expectThrows(ResponseException.class, () -> updateWorkflow(client(), "123", template));
assertTrue(exception.getMessage().contains("Failed to get template: 123"));

Response response = createWorkflow(client(), template);
assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response));

Map<String, Object> responseMap = entityAsMap(response);
String workflowId = (String) responseMap.get(WORKFLOW_ID);

// Provision Template
Response provisionResponse = provisionWorkflow(client(), workflowId);
assertEquals(RestStatus.OK, TestHelpers.restStatus(provisionResponse));
getAndAssertWorkflowStatus(client(), workflowId, State.PROVISIONING, ProvisioningProgress.IN_PROGRESS);

// Failed update since provisioning has started
ResponseException exceptionProvisioned = expectThrows(
ResponseException.class,
() -> updateWorkflow(client(), workflowId, template)
);
assertTrue(exceptionProvisioned.getMessage().contains("The template has already been provisioned so it can't be updated"));

}

public void testCreateAndProvisionLocalModelWorkflow() throws Exception {
// Using a 1 step template to register a local model and deploy model
Template template = TestHelpers.createTemplateFromFile("register-deploylocalsparseencodingmodel.json");
Expand Down Expand Up @@ -134,14 +161,14 @@ public void testCreateAndProvisionLocalModelWorkflow() throws Exception {
assertEquals("deploy_model", resourcesCreated.get(1).workflowStepName());
assertNotNull(resourcesCreated.get(1).resourceId());

// Deprovision the workflow to avoid opening circut breaker when running additional tests
Response deprovisionResponse = deprovisionWorkflow(client(), workflowId);

// wait for deprovision to complete
Thread.sleep(5000);
// // Deprovision the workflow to avoid opening circut breaker when running additional tests
// Response deprovisionResponse = deprovisionWorkflow(client(), workflowId);
//
// // wait for deprovision to complete
// Thread.sleep(5000);
}

public void testCreateAndProvisionRemoteModelWorkflow() throws Exception {
public void testCreateAndProvisionCyclicalTemplate() throws Exception {

// Using a 3 step template to create a connector, register remote model and deploy model
Template template = TestHelpers.createTemplateFromFile("createconnector-registerremotemodel-deploymodel.json");
Expand Down Expand Up @@ -170,6 +197,12 @@ public void testCreateAndProvisionRemoteModelWorkflow() throws Exception {
assertTrue(exception.getMessage().contains("Cycle detected"));
assertTrue(exception.getMessage().contains("workflow_step_2->workflow_step_3"));
assertTrue(exception.getMessage().contains("workflow_step_3->workflow_step_2"));
}

public void testCreateAndProvisionRemoteModelWorkflow() throws Exception {

// Using a 3 step template to create a connector, register remote model and deploy model
Template template = TestHelpers.createTemplateFromFile("createconnector-registerremotemodel-deploymodel.json");

// Hit Create Workflow API with original template
Response response = createWorkflow(client(), template);
Expand Down Expand Up @@ -197,10 +230,10 @@ public void testCreateAndProvisionRemoteModelWorkflow() throws Exception {
assertNotNull(resourcesCreated.get(2).resourceId());

// Deprovision the workflow to avoid opening circut breaker when running additional tests
Response deprovisionResponse = deprovisionWorkflow(client(), workflowId);

// wait for deprovision to complete
Thread.sleep(5000);
// Response deprovisionResponse = deprovisionWorkflow(client(), workflowId);
//
// // wait for deprovision to complete
// Thread.sleep(5000);
}

public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception {
Expand Down Expand Up @@ -239,16 +272,16 @@ public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception {
assertNotNull(resourcesCreated.get(0).resourceId());

// Hit Deprovision API
Response deprovisionResponse = deprovisionWorkflow(client(), workflowId);
assertBusy(
() -> { getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); },
60,
TimeUnit.SECONDS
);

// Hit Delete API
Response deleteResponse = deleteWorkflow(client(), workflowId);
assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse));
// Response deprovisionResponse = deprovisionWorkflow(client(), workflowId);
// assertBusy(
// () -> { getAndAssertWorkflowStatus(client(), workflowId, State.NOT_STARTED, ProvisioningProgress.NOT_STARTED); },
// 60,
// TimeUnit.SECONDS
// );
//
// // Hit Delete API
// Response deleteResponse = deleteWorkflow(client(), workflowId);
// assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse));
}

}

0 comments on commit 23c1897

Please sign in to comment.