From 27072107327e4cec37267c21d0904ffe99974cd1 Mon Sep 17 00:00:00 2001 From: Daniel Widdis Date: Fri, 8 Mar 2024 21:02:02 -0800 Subject: [PATCH] Add created, updated, and provisioned timestamps to saved template (#551) * Add created, last updated, and last provisioned fields to Template Signed-off-by: Daniel Widdis * Change last updated timestamp when updating workflow Signed-off-by: Daniel Widdis * Change last provisioned timestamp when provisioning workflow Signed-off-by: Daniel Widdis * Allow overriding template not started check Signed-off-by: Daniel Widdis * Use java.time and not joda time Signed-off-by: Daniel Widdis * Preserve timestamps when encrypting and redacting template Signed-off-by: Daniel Widdis * Add bwc tests, more timestamp testing Signed-off-by: Daniel Widdis * Build a Template from an existing one Signed-off-by: Daniel Widdis * Rename param, add comments Signed-off-by: Daniel Widdis --------- Signed-off-by: Daniel Widdis --- .github/workflows/test_bwc.yml | 45 ++++ CHANGELOG.md | 1 + build.gradle | 192 ++++++++++++++++++ .../flowframework/common/CommonValue.java | 10 +- .../indices/FlowFrameworkIndicesHandler.java | 18 +- .../flowframework/model/Template.java | 176 +++++++++++++--- .../CreateWorkflowTransportAction.java | 103 ++++++---- .../ProvisionWorkflowTransportAction.java | 27 ++- .../flowframework/util/EncryptorUtils.java | 28 +-- .../resources/mappings/global-context.json | 14 +- ...FlowFrameworkBackwardsCompatibilityIT.java | 189 +++++++++++++++++ .../FlowFrameworkIndicesHandlerTests.java | 5 +- .../flowframework/model/TemplateTests.java | 11 + .../rest/FlowFrameworkRestApiIT.java | 69 +++++-- .../rest/RestCreateWorkflowActionTests.java | 5 +- .../CreateWorkflowTransportActionTests.java | 60 +++++- .../GetWorkflowTransportActionTests.java | 5 +- ...ProvisionWorkflowTransportActionTests.java | 14 +- .../WorkflowRequestResponseTests.java | 11 +- .../util/EncryptorUtilsTests.java | 5 +- src/test/resources/template/noop.json | 13 ++ 21 files changed, 875 insertions(+), 126 deletions(-) create mode 100644 .github/workflows/test_bwc.yml create mode 100644 src/test/java/org/opensearch/flowframework/bwc/FlowFrameworkBackwardsCompatibilityIT.java create mode 100644 src/test/resources/template/noop.json diff --git a/.github/workflows/test_bwc.yml b/.github/workflows/test_bwc.yml new file mode 100644 index 000000000..b5f2e2d16 --- /dev/null +++ b/.github/workflows/test_bwc.yml @@ -0,0 +1,45 @@ +name: BWC +on: + push: + branches: + - "**" + pull_request: + branches: + - "**" + +jobs: + Build-ff-linux: + strategy: + matrix: + java: [11,17,21] + fail-fast: false + + name: Test Flow Framework BWC + runs-on: ubuntu-latest + + steps: + - name: Setup Java ${{ matrix.java }} + uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: ${{ matrix.java }} + + - name: Checkout Flow Framework + uses: actions/checkout@v4 + + - name: Assemble Flow Framework + run: | + plugin_version=`./gradlew properties -q | grep "opensearch_build:" | awk '{print $2}'` + echo plugin_version $plugin_version + ./gradlew assemble + echo "Creating ./src/test/resources/org/opensearch/flowframework/bwc/flow-framework/$plugin_version ..." + mkdir -p ./src/test/resources/org/opensearch/flowframework/bwc/flow-framework/$plugin_version + echo "Copying ./build/distributions/*.zip to ./src/test/resources/org/opensearch/flowframework/bwc/flow-framework/$plugin_version ..." + ls ./build/distributions/ + cp ./build/distributions/*.zip ./src/test/resources/org/opensearch/flowframework/bwc/flow-framework/$plugin_version + echo "Copied ./build/distributions/*.zip to ./src/test/resources/org/opensearch/flowframework/bwc/flow-framework/$plugin_version ..." + ls ./src/test/resources/org/opensearch/flowframework/bwc/flow-framework/$plugin_version + - name: Run Flow Framework Backwards Compatibility Tests + run: | + echo "Running backwards compatibility tests ..." + ./gradlew bwcTestSuite -Dtests.security.manager=false diff --git a/CHANGELOG.md b/CHANGELOG.md index 213d73bfd..dc4b74fbf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.1.0/) ### Enhancements - Substitute REST path or body parameters in Workflow Steps ([#525](https://github.com/opensearch-project/flow-framework/pull/525)) - Added an optional workflow_step param to the get workflow steps API ([#538](https://github.com/opensearch-project/flow-framework/pull/538)) +- Add created, updated, and provisioned timestamps to saved template ([#551](https://github.com/opensearch-project/flow-framework/pull/551)) - Enable Flow Framework by default ([#553](https://github.com/opensearch-project/flow-framework/pull/553)) ### Bug Fixes diff --git a/build.gradle b/build.gradle index a652cca1f..2f008a1c6 100644 --- a/build.gradle +++ b/build.gradle @@ -1,5 +1,6 @@ import java.nio.file.Files import org.opensearch.gradle.testclusters.OpenSearchCluster +import org.opensearch.gradle.testclusters.StandaloneRestIntegTestTask import org.opensearch.gradle.test.RestIntegTestTask import java.util.concurrent.Callable import java.nio.file.Paths @@ -23,6 +24,16 @@ buildscript { opensearch_no_snapshot = opensearch_build.replace("-SNAPSHOT","") System.setProperty('tests.security.manager', 'false') common_utils_version = System.getProperty("common_utils.version", opensearch_build) + + bwcVersionShort = "2.12.0" + bwcVersion = bwcVersionShort + ".0" + bwcOpenSearchFFDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + bwcVersionShort + '/latest/linux/x64/tar/builds/' + + 'opensearch/plugins/opensearch-flow-framework-' + bwcVersion + '.zip' + baseName = "ffBwcCluster" + bwcFilePath = "src/test/resources/org/opensearch/flowframework/bwc/" + bwcFlowFrameworkPath = bwcFilePath + "flowframework/" + + isSameMajorVersion = opensearch_version.split("\\.")[0] == bwcVersionShort.split("\\.")[0] } repositories { @@ -78,6 +89,9 @@ dependencyLicenses.enabled = false // This requires an additional Jar not published as part of build-tools loggerUsageCheck.enabled = false thirdPartyAudit.enabled = false +// Allow test cases to be named Tests without having to be inherited from LuceneTestCase. +// see https://github.com/elastic/elasticsearch/blob/323f312bbc829a63056a79ebe45adced5099f6e6/buildSrc/src/main/java/org/elasticsearch/gradle/precommit/TestingConventionsTasks.java +testingConventions.enabled = false // No need to validate pom, as we do not upload to maven/sonatype validateNebulaPom.enabled = false @@ -192,6 +206,12 @@ jacocoTestReport { } tasks.named("check").configure { dependsOn(jacocoTestReport) } +tasks.named("yamlRestTest").configure { + filter { + excludeTestsMatching "org.opensearch.flowframework.rest.*IT" + excludeTestsMatching "org.opensearch.flowframework.bwc.*IT" + } +} // Set up integration tests task integTest(type: RestIntegTestTask) { @@ -231,6 +251,13 @@ integTest { } } + // Exclude BWC tests, run separately + if (System.getProperty("tests.rest.bwcsuite") == null) { + filter { + excludeTestsMatching "org.opensearch.flowframework.bwc.*IT" + } + } + // Exclude integration tests that require security plugin if (System.getProperty("https") == null || System.getProperty("https") == "false") { filter { @@ -425,6 +452,166 @@ task integTestRemote(type: RestIntegTestTask) { } } +2.times {i -> + testClusters { + "${baseName}$i" { + testDistribution = "ARCHIVE" + versions = [bwcVersionShort, opensearch_version] + numberOfNodes = 3 + plugin(provider(new Callable(){ + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + if (new File("$project.rootDir/$bwcFilePath/flow-framework/$bwcVersion").exists()) { + project.delete(files("$project.rootDir/$bwcFilePath/flow-framework/$bwcVersion")) + } + project.mkdir bwcFlowFrameworkPath + bwcVersion + ant.get(src: bwcOpenSearchFFDownload, + dest: bwcFlowFrameworkPath + bwcVersion, + httpusecaches: false) + return fileTree(bwcFlowFrameworkPath + bwcVersion).getSingleFile() + } + } + } + })) + setting 'path.repo', "${buildDir}/cluster/shared/repo/${baseName}" + setting 'http.content_type.required', 'true' + } + } +} + +List> plugins = [ + provider(new Callable(){ + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + return configurations.zipArchive.asFileTree.getSingleFile() + } + } + } + }), + provider(new Callable(){ + @Override + RegularFile call() throws Exception { + return new RegularFile() { + @Override + File getAsFile() { + return fileTree(bwcFilePath + "flow-framework/" + project.version).getSingleFile() + } + } + } + }) + ] + +// Creates 2 test clusters with 3 nodes of the old version. +2.times {i -> + task "${baseName}#oldVersionClusterTask$i"(type: StandaloneRestIntegTestTask) { + onlyIf { isSameMajorVersion || (i == 1) } + useCluster testClusters."${baseName}$i" + filter { + includeTestsMatching "org.opensearch.flowframework.bwc.*IT" + } + systemProperty 'tests.rest.bwcsuite', 'old_cluster' + systemProperty 'tests.rest.bwcsuite_round', 'old' + systemProperty 'tests.plugin_bwc_version', bwcVersion + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}$i".allHttpSocketURI.join(",")}") + nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}$i".getName()}") + } +} + +// Upgrades one node of the old cluster to new OpenSearch version with upgraded plugin version +// This results in a mixed cluster with 2 nodes on the old version and 1 upgraded node. +// This is also used as a one third upgraded cluster for a rolling upgrade. +task "${baseName}#mixedClusterTask"(type: StandaloneRestIntegTestTask) { + onlyIf { isSameMajorVersion } + useCluster testClusters."${baseName}0" + dependsOn "${baseName}#oldVersionClusterTask0" + doFirst { + testClusters."${baseName}0".upgradeNodeAndPluginToNextVersion(plugins) + } + filter { + includeTestsMatching "org.opensearch.flowframework.bwc.*IT" + } + systemProperty 'tests.rest.bwcsuite', 'mixed_cluster' + systemProperty 'tests.rest.bwcsuite_round', 'first' + systemProperty 'tests.plugin_bwc_version', bwcVersion + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}") + nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}") +} + +// Upgrades the second node to new OpenSearch version with upgraded plugin version after the first node is upgraded. +// This results in a mixed cluster with 1 node on the old version and 2 upgraded nodes. +// This is used for rolling upgrade. +task "${baseName}#twoThirdsUpgradedClusterTask"(type: StandaloneRestIntegTestTask) { + onlyIf { isSameMajorVersion } + dependsOn "${baseName}#mixedClusterTask" + useCluster testClusters."${baseName}0" + doFirst { + testClusters."${baseName}0".upgradeNodeAndPluginToNextVersion(plugins) + } + filter { + includeTestsMatching "org.opensearch.flowframework.bwc.*IT" + } + systemProperty 'tests.rest.bwcsuite', 'mixed_cluster' + systemProperty 'tests.rest.bwcsuite_round', 'second' + systemProperty 'tests.plugin_bwc_version', bwcVersion + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}") + nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}") +} + +// Upgrades the third node to new OpenSearch version with upgraded plugin version after the second node is upgraded. +// This results in a fully upgraded cluster. +// This is used for rolling upgrade. +task "${baseName}#rollingUpgradeClusterTask"(type: StandaloneRestIntegTestTask) { + onlyIf { isSameMajorVersion } + dependsOn "${baseName}#twoThirdsUpgradedClusterTask" + useCluster testClusters."${baseName}0" + doFirst { + testClusters."${baseName}0".upgradeNodeAndPluginToNextVersion(plugins) + } + filter { + includeTestsMatching "org.opensearch.flowframework.bwc.*IT" + } + mustRunAfter "${baseName}#mixedClusterTask" + systemProperty 'tests.rest.bwcsuite', 'mixed_cluster' + systemProperty 'tests.rest.bwcsuite_round', 'third' + systemProperty 'tests.plugin_bwc_version', bwcVersion + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}0".allHttpSocketURI.join(",")}") + nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}0".getName()}") +} + +// Upgrades all the nodes of the old cluster to new OpenSearch version with upgraded plugin version +// at the same time resulting in a fully upgraded cluster. +task "${baseName}#fullRestartClusterTask"(type: StandaloneRestIntegTestTask) { + dependsOn "${baseName}#oldVersionClusterTask1" + useCluster testClusters."${baseName}1" + doFirst { + testClusters."${baseName}1".upgradeAllNodesAndPluginsToNextVersion(plugins) + } + filter { + includeTestsMatching "org.opensearch.flowframework.bwc.*IT" + } + systemProperty 'tests.rest.bwcsuite', 'upgraded_cluster' + systemProperty 'tests.plugin_bwc_version', bwcVersion + nonInputProperties.systemProperty('tests.rest.cluster', "${-> testClusters."${baseName}1".allHttpSocketURI.join(",")}") + nonInputProperties.systemProperty('tests.clustername', "${-> testClusters."${baseName}1".getName()}") +} + +// A bwc test suite which runs all the bwc tasks combined. +task bwcTestSuite(type: StandaloneRestIntegTestTask) { + filter { + excludeTestsMatching '**.*Test*' + excludeTestsMatching '**.*IT*' + setFailOnNoMatchingTests(false) + } + dependsOn tasks.named("${baseName}#mixedClusterTask") + dependsOn tasks.named("${baseName}#rollingUpgradeClusterTask") + dependsOn tasks.named("${baseName}#fullRestartClusterTask") +} // test retry configuration allprojects { @@ -438,6 +625,11 @@ allprojects { } } } + // Needed for Gradle 9.0 + tasks.withType(StandaloneRestIntegTestTask).configureEach { + testClassesDirs = sourceSets.test.output.classesDirs + classpath = sourceSets.test.runtimeClasspath + } } // Automatically sets up the integration test cluster locally diff --git a/src/main/java/org/opensearch/flowframework/common/CommonValue.java b/src/main/java/org/opensearch/flowframework/common/CommonValue.java index f4bec21a7..ec88a3778 100644 --- a/src/main/java/org/opensearch/flowframework/common/CommonValue.java +++ b/src/main/java/org/opensearch/flowframework/common/CommonValue.java @@ -48,6 +48,12 @@ private CommonValue() {} public static final String CREATE_TIME = "create_time"; /** The template field name for the user who created the workflow **/ public static final String USER_FIELD = "user"; + /** The created time field */ + public static final String CREATED_TIME = "created_time"; + /** The last updated time field */ + public static final String LAST_UPDATED_TIME_FIELD = "last_updated_time"; + /** The last provisioned time field */ + public static final String LAST_PROVISIONED_TIME_FIELD = "last_provisioned_time"; /* * Constants associated with Rest or Transport actions @@ -156,10 +162,6 @@ private CommonValue() {} public static final String APP_TYPE_FIELD = "app_type"; /** To include field for an agent response */ public static final String INCLUDE_OUTPUT_IN_AGENT_RESPONSE = "include_output_in_agent_response"; - /** The created time field for an agent */ - public static final String CREATED_TIME = "created_time"; - /** The last updated time field for an agent */ - public static final String LAST_UPDATED_TIME_FIELD = "last_updated_time"; /** HttpHost */ public static final String HTTP_HOST_FIELD = "http_host"; /** Http scheme */ diff --git a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java index b0a629216..0547c875a 100644 --- a/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java +++ b/src/main/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandler.java @@ -395,6 +395,22 @@ public void putInitialStateToWorkflowState(String workflowId, User user, ActionL * @param listener action listener */ public void updateTemplateInGlobalContext(String documentId, Template template, ActionListener listener) { + updateTemplateInGlobalContext(documentId, template, listener, false); + } + + /** + * Replaces a document in the global context index + * @param documentId the document Id + * @param template the use-case template + * @param listener action listener + * @param ignoreNotStartedCheck if set true, ignores the requirement that the provisioning is not started + */ + public void updateTemplateInGlobalContext( + String documentId, + Template template, + ActionListener listener, + boolean ignoreNotStartedCheck + ) { if (!doesIndexExist(GLOBAL_CONTEXT_INDEX)) { String errorMessage = "Failed to update template for workflow_id : " + documentId + ", global_context index does not exist."; logger.error(errorMessage); @@ -404,7 +420,7 @@ public void updateTemplateInGlobalContext(String documentId, Template template, doesTemplateExist(documentId, templateExists -> { if (templateExists) { isWorkflowNotStarted(documentId, workflowIsNotStarted -> { - if (workflowIsNotStarted) { + if (workflowIsNotStarted || ignoreNotStartedCheck) { IndexRequest request = new IndexRequest(GLOBAL_CONTEXT_INDEX).id(documentId); try ( XContentBuilder builder = XContentFactory.jsonBuilder(); diff --git a/src/main/java/org/opensearch/flowframework/model/Template.java b/src/main/java/org/opensearch/flowframework/model/Template.java index 8ef914d65..71632d003 100644 --- a/src/main/java/org/opensearch/flowframework/model/Template.java +++ b/src/main/java/org/opensearch/flowframework/model/Template.java @@ -19,16 +19,22 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.core.xcontent.XContentParser; import org.opensearch.flowframework.exception.FlowFrameworkException; +import org.opensearch.flowframework.util.ParseUtils; import java.io.IOException; +import java.time.Instant; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.flowframework.common.CommonValue.CREATED_TIME; import static org.opensearch.flowframework.common.CommonValue.DESCRIPTION_FIELD; +import static org.opensearch.flowframework.common.CommonValue.LAST_PROVISIONED_TIME_FIELD; +import static org.opensearch.flowframework.common.CommonValue.LAST_UPDATED_TIME_FIELD; import static org.opensearch.flowframework.common.CommonValue.NAME_FIELD; import static org.opensearch.flowframework.common.CommonValue.UI_METADATA_FIELD; import static org.opensearch.flowframework.common.CommonValue.USER_FIELD; @@ -48,14 +54,17 @@ public class Template implements ToXContentObject { /** The template field name for template use case */ public static final String USE_CASE_FIELD = "use_case"; - private String name; - private String description; - private String useCase; // probably an ENUM actually - private Version templateVersion; - private List compatibilityVersion; - private Map workflows; - private Map uiMetadata; - private User user; + private final String name; + private final String description; + private final String useCase; // probably an ENUM actually + private final Version templateVersion; + private final List compatibilityVersion; + private final Map workflows; + private final Map uiMetadata; + private final User user; + private final Instant createdTime; + private final Instant lastUpdatedTime; + private final Instant lastProvisionedTime; /** * Instantiate the object representing a use case template @@ -68,6 +77,9 @@ public class Template implements ToXContentObject { * @param workflows Workflow graph definitions corresponding to the defined operations. * @param uiMetadata The UI metadata related to the given workflow * @param user The user extracted from the thread context from the request + * @param createdTime Created time in milliseconds since the epoch + * @param lastUpdatedTime Last Updated time in milliseconds since the epoch + * @param lastProvisionedTime Last Provisioned time in milliseconds since the epoch */ public Template( String name, @@ -77,7 +89,10 @@ public Template( List compatibilityVersion, Map workflows, Map uiMetadata, - User user + User user, + Instant createdTime, + Instant lastUpdatedTime, + Instant lastProvisionedTime ) { this.name = name; this.description = description; @@ -87,10 +102,11 @@ public Template( this.workflows = Map.copyOf(workflows); this.uiMetadata = uiMetadata; this.user = user; + this.createdTime = createdTime; + this.lastUpdatedTime = lastUpdatedTime; + this.lastProvisionedTime = lastProvisionedTime; } - private Template() {} - /** * Class for constructing a Builder for Template */ @@ -99,16 +115,43 @@ public static class Builder { private String description = ""; private String useCase = ""; private Version templateVersion = null; - private List compatibilityVersion = new ArrayList<>(); - private Map workflows = new HashMap<>(); + private List compatibilityVersion = Collections.emptyList(); + private Map workflows = Collections.emptyMap(); private Map uiMetadata = null; private User user = null; + private Instant createdTime = null; + private Instant lastUpdatedTime = null; + private Instant lastProvisionedTime = null; /** * Empty Constructor for the Builder object */ public Builder() {} + /** + * Construct a Builder from an existing template + * @param t The existing template to copy + */ + public Builder(Template t) { + this.name = t.name(); + this.description = t.description(); + this.useCase = t.useCase(); + this.templateVersion = t.templateVersion; + if (t.compatibilityVersion() != null) { + this.compatibilityVersion = List.copyOf(t.compatibilityVersion()); + } + if (t.workflows() != null) { + this.workflows = Map.copyOf(t.workflows()); + } + if (t.getUiMetadata() != null) { + this.uiMetadata = Map.copyOf(t.getUiMetadata()); + } + this.user = t.getUser(); + this.createdTime = t.createdTime(); + this.lastUpdatedTime = t.lastUpdatedTime(); + this.lastProvisionedTime = t.lastProvisionedTime(); + } + /** * Builder method for adding template name * @param name template name @@ -189,23 +232,55 @@ public Builder user(User user) { return this; } + /** + * Builder method for adding createdTime + * @param createdTime created time in milliseconds since the epoch + * @return the Builder object + */ + public Builder createdTime(Instant createdTime) { + this.createdTime = createdTime; + return this; + } + + /** + * Builder method for adding lastUpdatedTime + * @param lastUpdatedTime last updated time in milliseconds since the epoch + * @return the Builder object + */ + public Builder lastUpdatedTime(Instant lastUpdatedTime) { + this.lastUpdatedTime = lastUpdatedTime; + return this; + } + + /** + * Builder method for adding lastProvisionedTime + * @param lastProvisionedTime last provisioned time in milliseconds since the epoch + * @return the Builder object + */ + public Builder lastProvisionedTime(Instant lastProvisionedTime) { + this.lastProvisionedTime = lastProvisionedTime; + return this; + } + /** * Allows building a template * @return Template Object containing all needed fields */ public Template build() { - Template template = new Template(); - template.name = this.name; - template.description = this.description; - template.useCase = this.useCase; - template.templateVersion = this.templateVersion; - template.compatibilityVersion = this.compatibilityVersion; - template.workflows = this.workflows; - template.uiMetadata = this.uiMetadata; - template.user = this.user; - return template; + return new Template( + this.name, + this.description, + this.useCase, + this.templateVersion, + this.compatibilityVersion, + this.workflows, + this.uiMetadata, + this.user, + this.createdTime, + this.lastUpdatedTime, + this.lastProvisionedTime + ); } - } @Override @@ -244,6 +319,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws xContentBuilder.field(USER_FIELD, user); } + if (createdTime != null) { + xContentBuilder.field(CREATED_TIME, createdTime.toEpochMilli()); + } + + if (lastUpdatedTime != null) { + xContentBuilder.field(LAST_UPDATED_TIME_FIELD, lastUpdatedTime.toEpochMilli()); + } + + if (lastProvisionedTime != null) { + xContentBuilder.field(LAST_PROVISIONED_TIME_FIELD, lastProvisionedTime.toEpochMilli()); + } + return xContentBuilder.endObject(); } @@ -263,6 +350,9 @@ public static Template parse(XContentParser parser) throws IOException { Map workflows = new HashMap<>(); Map uiMetadata = null; User user = null; + Instant createdTime = null; + Instant lastUpdatedTime = null; + Instant lastProvisionedTime = null; ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.currentToken(), parser); while (parser.nextToken() != XContentParser.Token.END_OBJECT) { @@ -315,6 +405,15 @@ public static Template parse(XContentParser parser) throws IOException { case USER_FIELD: user = User.parse(parser); break; + case CREATED_TIME: + createdTime = ParseUtils.parseInstant(parser); + break; + case LAST_UPDATED_TIME_FIELD: + lastUpdatedTime = ParseUtils.parseInstant(parser); + break; + case LAST_PROVISIONED_TIME_FIELD: + lastProvisionedTime = ParseUtils.parseInstant(parser); + break; default: throw new FlowFrameworkException( "Unable to parse field [" + fieldName + "] in a template object.", @@ -334,6 +433,9 @@ public static Template parse(XContentParser parser) throws IOException { .workflows(workflows) .uiMetadata(uiMetadata) .user(user) + .createdTime(createdTime) + .lastUpdatedTime(lastUpdatedTime) + .lastProvisionedTime(lastProvisionedTime) .build(); } @@ -449,6 +551,30 @@ public User getUser() { return user; } + /** + * Time the template was created + * @return the createdTime + */ + public Instant createdTime() { + return createdTime; + } + + /** + * Time the template was last updated + * @return the lastUpdatedTime + */ + public Instant lastUpdatedTime() { + return lastUpdatedTime; + } + + /** + * Time the template was last provisioned + * @return the lastProvisionedTime + */ + public Instant lastProvisionedTime() { + return lastProvisionedTime; + } + @Override public String toString() { return "Template [name=" @@ -464,7 +590,7 @@ public String toString() { + ", workflows=" + workflows + ", uiMedata=" - + uiMetadata + + (uiMetadata == null ? "{}" : uiMetadata) + "]"; } } diff --git a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java index cccbc26e2..677278a93 100644 --- a/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/CreateWorkflowTransportAction.java @@ -11,6 +11,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; +import org.opensearch.action.get.GetRequest; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; @@ -38,12 +39,14 @@ import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; +import java.time.Instant; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import static java.lang.Boolean.FALSE; +import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; import static org.opensearch.flowframework.common.CommonValue.PROVISIONING_PROGRESS_FIELD; import static org.opensearch.flowframework.common.CommonValue.STATE_FIELD; import static org.opensearch.flowframework.util.ParseUtils.getUserContext; @@ -93,6 +96,7 @@ public CreateWorkflowTransportAction( protected void doExecute(Task task, WorkflowRequest request, ActionListener listener) { User user = getUserContext(client); + Instant creationTime = Instant.now(); Template templateWithUser = new Template( request.getTemplate().name(), request.getTemplate().description(), @@ -101,7 +105,10 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { - flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( - request.getWorkflowId(), - Map.ofEntries( - Map.entry(STATE_FIELD, State.NOT_STARTED), - Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED) - ), - ActionListener.wrap(updateResponse -> { - logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.NOT_STARTED.name()); - listener.onResponse(new WorkflowResponse(request.getWorkflowId())); - }, exception -> { - String errorMessage = "Failed to update workflow " + request.getWorkflowId() + " in template index"; - logger.error(errorMessage, exception); - if (exception instanceof FlowFrameworkException) { - listener.onFailure(exception); - } else { - listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))); - } - }) - ); - }, exception -> { - String errorMessage = "Failed to update use case template " + request.getWorkflowId(); - logger.error(errorMessage, exception); - if (exception instanceof FlowFrameworkException) { - listener.onFailure(exception); + // This is an existing workflow (PUT) + // Fetch existing entry for time stamps + logger.info("Querying existing workflow from global context: {}", workflowId); + try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { + client.get(new GetRequest(GLOBAL_CONTEXT_INDEX, workflowId), ActionListener.wrap(getResponse -> { + context.restore(); + if (getResponse.isExists()) { + Template existingTemplate = Template.parse(getResponse.getSourceAsString()); + // Update existing entry, full document replacement + Template template = new Template.Builder(templateWithUser).createdTime(existingTemplate.createdTime()) + .lastUpdatedTime(Instant.now()) + .lastProvisionedTime(existingTemplate.lastProvisionedTime()) + .build(); + flowFrameworkIndicesHandler.updateTemplateInGlobalContext( + request.getWorkflowId(), + template, + ActionListener.wrap(response -> { + flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( + request.getWorkflowId(), + Map.ofEntries( + Map.entry(STATE_FIELD, State.NOT_STARTED), + Map.entry(PROVISIONING_PROGRESS_FIELD, ProvisioningProgress.NOT_STARTED) + ), + ActionListener.wrap(updateResponse -> { + logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.NOT_STARTED.name()); + listener.onResponse(new WorkflowResponse(request.getWorkflowId())); + }, exception -> { + String errorMessage = "Failed to update workflow " + request.getWorkflowId() + " in template index"; + logger.error(errorMessage, exception); + if (exception instanceof FlowFrameworkException) { + listener.onFailure(exception); + } else { + listener.onFailure( + new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)) + ); + } + }) + ); + }, exception -> { + String errorMessage = "Failed to update use case template " + request.getWorkflowId(); + logger.error(errorMessage, exception); + if (exception instanceof FlowFrameworkException) { + listener.onFailure(exception); + } else { + listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))); + } + }) + ); } else { - listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))); + String errorMessage = "Failed to retrieve template (" + workflowId + ") from global context."; + logger.error(errorMessage); + listener.onFailure(new FlowFrameworkException(errorMessage, RestStatus.NOT_FOUND)); } - }) - ); + }, exception -> { + String errorMessage = "Failed to retrieve template (" + workflowId + ") from global context."; + logger.error(errorMessage, exception); + listener.onFailure(new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception))); + })); + } } } diff --git a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java index 4ee10d557..205b749f9 100644 --- a/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java +++ b/src/main/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportAction.java @@ -118,10 +118,10 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { if (TRUE.equals(workflowIsNotStarted)) { + // update state index flowFrameworkIndicesHandler.updateFlowFrameworkSystemIndexDoc( workflowId, Map.ofEntries( @@ -145,7 +146,27 @@ protected void doExecute(Task task, WorkflowRequest request, ActionListener { logger.info("updated workflow {} state to {}", request.getWorkflowId(), State.PROVISIONING); executeWorkflowAsync(workflowId, provisionProcessSequence, listener); - listener.onResponse(new WorkflowResponse(workflowId)); + // update last provisioned field in template + Template newTemplate = new Template.Builder(template).lastProvisionedTime(Instant.now()).build(); + flowFrameworkIndicesHandler.updateTemplateInGlobalContext( + request.getWorkflowId(), + newTemplate, + ActionListener.wrap(templateResponse -> { + listener.onResponse(new WorkflowResponse(request.getWorkflowId())); + }, exception -> { + String errorMessage = "Failed to update use case template " + request.getWorkflowId(); + logger.error(errorMessage, exception); + if (exception instanceof FlowFrameworkException) { + listener.onFailure(exception); + } else { + listener.onFailure( + new FlowFrameworkException(errorMessage, ExceptionsHelper.status(exception)) + ); + } + }), + // We've already checked workflow is not started, ignore second check + true + ); }, exception -> { String errorMessage = "Failed to update workflow state: " + workflowId; logger.error(errorMessage, exception); diff --git a/src/main/java/org/opensearch/flowframework/util/EncryptorUtils.java b/src/main/java/org/opensearch/flowframework/util/EncryptorUtils.java index 52a43928a..df7e66e07 100644 --- a/src/main/java/org/opensearch/flowframework/util/EncryptorUtils.java +++ b/src/main/java/org/opensearch/flowframework/util/EncryptorUtils.java @@ -126,8 +126,6 @@ public Template decryptTemplateCredentials(Template template) { * @return template with encrypted credentials */ private Template processTemplateCredentials(Template template, Function cipherFunction) { - Template.Builder processedTemplateBuilder = new Template.Builder(); - Map processedWorkflows = new HashMap<>(); for (Map.Entry entry : template.workflows().entrySet()) { @@ -161,17 +159,7 @@ private Template processTemplateCredentials(Template template, Function processedWorkflows = new HashMap<>(); for (Map.Entry entry : template.workflows().entrySet()) { @@ -241,17 +227,7 @@ public Template redactTemplateCredentials(Template template) { processedWorkflows.put(entry.getKey(), new Workflow(entry.getValue().userParams(), processedNodes, entry.getValue().edges())); } - Template processedTemplate = redactedTemplateBuilder.name(template.name()) - .description(template.description()) - .useCase(template.useCase()) - .templateVersion(template.templateVersion()) - .compatibilityVersion(template.compatibilityVersion()) - .workflows(processedWorkflows) - .uiMetadata(template.getUiMetadata()) - .user(template.getUser()) - .build(); - - return processedTemplate; + return new Template.Builder(template).workflows(processedWorkflows).build(); } /** diff --git a/src/main/resources/mappings/global-context.json b/src/main/resources/mappings/global-context.json index dd282f40a..61b8a5487 100644 --- a/src/main/resources/mappings/global-context.json +++ b/src/main/resources/mappings/global-context.json @@ -1,7 +1,7 @@ { "dynamic": false, "_meta": { - "schema_version": 1 + "schema_version": 2 }, "properties": { "workflow_id": { @@ -73,6 +73,18 @@ } } } + }, + "created_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "last_updated_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + }, + "last_provisioned_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" } } } diff --git a/src/test/java/org/opensearch/flowframework/bwc/FlowFrameworkBackwardsCompatibilityIT.java b/src/test/java/org/opensearch/flowframework/bwc/FlowFrameworkBackwardsCompatibilityIT.java new file mode 100644 index 000000000..108252465 --- /dev/null +++ b/src/test/java/org/opensearch/flowframework/bwc/FlowFrameworkBackwardsCompatibilityIT.java @@ -0,0 +1,189 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ +package org.opensearch.flowframework.bwc; + +import org.apache.hc.core5.http.HttpHeaders; +import org.apache.hc.core5.http.ParseException; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.hc.core5.http.message.BasicHeader; +import org.opensearch.client.Response; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.flowframework.TestHelpers; +import org.opensearch.flowframework.model.Template; +import org.opensearch.test.rest.OpenSearchRestTestCase; +import org.junit.Before; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID; + +public class FlowFrameworkBackwardsCompatibilityIT extends OpenSearchRestTestCase { + + private static final ClusterType CLUSTER_TYPE = ClusterType.parse(System.getProperty("tests.rest.bwcsuite")); + private static final String CLUSTER_NAME = System.getProperty("tests.clustername"); + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + setupSettings(); + } + + private void setupSettings() throws IOException { + // Enable Flow Framework Plugin Rest APIs + Response response = TestHelpers.makeRequest( + client(), + "PUT", + "_cluster/settings", + null, + "{\"transient\":{\"plugins.flow_framework.enabled\":true}}", + List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) + ); + assertEquals(200, response.getStatusLine().getStatusCode()); + } + + @Override + protected final boolean preserveIndicesUponCompletion() { + return true; + } + + @Override + protected final boolean preserveReposUponCompletion() { + return true; + } + + @Override + protected boolean preserveTemplatesUponCompletion() { + return true; + } + + @Override + protected final Settings restClientSettings() { + return Settings.builder() + .put(super.restClientSettings()) + // increase the timeout here to 90 seconds to handle long waits for a green + // cluster health. the waits for green need to be longer than a minute to + // account for delayed shards + .put(OpenSearchRestTestCase.CLIENT_SOCKET_TIMEOUT, "90s") + .build(); + } + + private enum ClusterType { + OLD, + MIXED, + UPGRADED; + + public static ClusterType parse(String value) { + switch (value) { + case "old_cluster": + return OLD; + case "mixed_cluster": + return MIXED; + case "upgraded_cluster": + return UPGRADED; + default: + throw new AssertionError("unknown cluster type: " + value); + } + } + } + + @SuppressWarnings("unchecked") + public void testBackwardsCompatibility() throws Exception { + // This iteration of nodes is only to get plugins installed on each node. We don't currently use its functionality but in case we + // ever need version-based dependencies in future BWC tests it will be needed. It's directly copied from similar implementations + // in other plugins. + String uri = getUri(); + Map> responseMap = (Map>) getAsMap(uri).get("nodes"); + for (Map response : responseMap.values()) { + List> plugins = (List>) response.get("plugins"); + Set pluginNames = plugins.stream().map(map -> map.get("name")).collect(Collectors.toSet()); + assertTrue(pluginNames.contains("opensearch-flow-framework")); + String workflowId = createNoopTemplate(); + Template t = getTemplate(workflowId); + switch (CLUSTER_TYPE) { + case OLD: + // mapping for 2.12 does not include time stamps + assertNull(t.createdTime()); + assertNull(t.lastUpdatedTime()); + assertNull(t.lastProvisionedTime()); + break; + case MIXED: + // Time stamps may or may not be null depending on whether index has been accessed by new version node + assertNull(t.lastProvisionedTime()); + break; + case UPGRADED: + // mapping for 2.13+ includes time stamps + assertNotNull(t.createdTime()); + assertEquals(t.createdTime(), t.lastUpdatedTime()); + assertNull(t.lastProvisionedTime()); + break; + } + break; + } + } + + private String getUri() { + switch (CLUSTER_TYPE) { + case OLD: + return "_nodes/" + CLUSTER_NAME + "-0/plugins"; + case MIXED: + String round = System.getProperty("tests.rest.bwcsuite_round"); + if (round.equals("second")) { + return "_nodes/" + CLUSTER_NAME + "-1/plugins"; + } else if (round.equals("third")) { + return "_nodes/" + CLUSTER_NAME + "-2/plugins"; + } else { + return "_nodes/" + CLUSTER_NAME + "-0/plugins"; + } + case UPGRADED: + return "_nodes/plugins"; + default: + throw new AssertionError("unknown cluster type: " + CLUSTER_TYPE); + } + } + + private String createNoopTemplate() throws IOException, ParseException { + Response response = TestHelpers.makeRequest( + client(), + "POST", + "_plugins/_flow_framework/workflow", + null, + "{\"name\":\"test\", \"workflows\":{\"provision\": {\"nodes\": [{\"id\":\"test-step\", \"type\":\"noop\"}]}}}", + List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) + ); + assertEquals(RestStatus.CREATED.getStatus(), response.getStatusLine().getStatusCode()); + + Map responseMap = entityAsMap(response); + String workflowId = (String) responseMap.get(WORKFLOW_ID); + assertNotNull(workflowId); + return workflowId; + } + + private Template getTemplate(String workflowId) throws IOException, ParseException { + Response response = TestHelpers.makeRequest( + client(), + "GET", + "_plugins/_flow_framework/workflow/" + workflowId, + null, + "", + List.of(new BasicHeader(HttpHeaders.USER_AGENT, "")) + ); + assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); + + String body = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8); + return Template.parse(body); + } +} diff --git a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java index eda561f90..f80166b13 100644 --- a/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java +++ b/src/test/java/org/opensearch/flowframework/indices/FlowFrameworkIndicesHandlerTests.java @@ -120,7 +120,10 @@ public void setUp() throws Exception { compatibilityVersions, Map.of("workflow", workflow), Collections.emptyMap(), - TestHelpers.randomUser() + TestHelpers.randomUser(), + null, + null, + null ); } diff --git a/src/test/java/org/opensearch/flowframework/model/TemplateTests.java b/src/test/java/org/opensearch/flowframework/model/TemplateTests.java index e0b88d725..63dbf31f6 100644 --- a/src/test/java/org/opensearch/flowframework/model/TemplateTests.java +++ b/src/test/java/org/opensearch/flowframework/model/TemplateTests.java @@ -14,6 +14,7 @@ import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; +import java.time.Instant; import java.util.Collections; import java.util.List; import java.util.Map; @@ -40,6 +41,7 @@ public void testTemplate() throws IOException { Workflow workflow = new Workflow(Map.of("key", "value"), nodes, edges); Map uiMetadata = null; + Instant now = Instant.now(); Template template = new Template( "test", "a test template", @@ -48,6 +50,9 @@ public void testTemplate() throws IOException { compatibilityVersion, Map.of("workflow", workflow), uiMetadata, + null, + now, + now, null ); @@ -59,6 +64,9 @@ public void testTemplate() throws IOException { assertEquals(uiMetadata, template.getUiMetadata()); Workflow wf = template.workflows().get("workflow"); assertNotNull(wf); + assertEquals(now, template.createdTime()); + assertEquals(now, template.lastUpdatedTime()); + assertNull(template.lastProvisionedTime()); assertEquals("Workflow [userParams={key=value}, nodes=[A, B], edges=[A->B]]", wf.toString()); String json = TemplateTestJsonUtil.parseToJson(template); @@ -72,6 +80,9 @@ public void testTemplate() throws IOException { assertEquals(uiMetadata, templateX.getUiMetadata()); Workflow wfX = templateX.workflows().get("workflow"); assertNotNull(wfX); + assertEquals(now, template.createdTime()); + assertEquals(now, template.lastUpdatedTime()); + assertNull(template.lastProvisionedTime()); assertEquals("Workflow [userParams={key=value}, nodes=[A, B], edges=[A->B]]", wfX.toString()); } diff --git a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java index 5e65d8cb1..dbb966489 100644 --- a/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java +++ b/src/test/java/org/opensearch/flowframework/rest/FlowFrameworkRestApiIT.java @@ -8,6 +8,7 @@ */ package org.opensearch.flowframework.rest; +import org.apache.hc.core5.http.io.entity.EntityUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.action.search.SearchResponse; @@ -27,6 +28,8 @@ import org.junit.Before; import org.junit.ComparisonFailure; +import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.util.Collections; import java.util.HashSet; import java.util.List; @@ -80,7 +83,7 @@ public void testFailedUpdateWorkflow() throws Exception { Template template = TestHelpers.createTemplateFromFile("createconnector-registerremotemodel-deploymodel.json"); ResponseException exception = expectThrows(ResponseException.class, () -> updateWorkflow(client(), "123", template)); - assertTrue(exception.getMessage().contains("Failed to get template: 123")); + assertTrue(exception.getMessage().contains("Failed to retrieve template (123) from global context.")); Response response = createWorkflow(client(), template); assertEquals(RestStatus.CREATED, TestHelpers.restStatus(response)); @@ -128,15 +131,7 @@ public void testCreateAndProvisionLocalModelWorkflow() throws Exception { ) .collect(Collectors.toList()); Workflow missingInputs = new Workflow(originalWorkflow.userParams(), modifiednodes, originalWorkflow.edges()); - Template templateWithMissingInputs = new Template.Builder().name(template.name()) - .description(template.description()) - .useCase(template.useCase()) - .templateVersion(template.templateVersion()) - .compatibilityVersion(template.compatibilityVersion()) - .workflows(Map.of(PROVISION_WORKFLOW, missingInputs)) - .uiMetadata(template.getUiMetadata()) - .user(template.getUser()) - .build(); + Template templateWithMissingInputs = new Template.Builder(template).workflows(Map.of(PROVISION_WORKFLOW, missingInputs)).build(); // Hit Create Workflow API with invalid template Response response = createWorkflow(client(), templateWithMissingInputs); @@ -188,15 +183,7 @@ public void testCreateAndProvisionCyclicalTemplate() throws Exception { List.of(new WorkflowEdge("workflow_step_2", "workflow_step_3"), new WorkflowEdge("workflow_step_3", "workflow_step_2")) ); - Template cyclicalTemplate = new Template.Builder().name(template.name()) - .description(template.description()) - .useCase(template.useCase()) - .templateVersion(template.templateVersion()) - .compatibilityVersion(template.compatibilityVersion()) - .workflows(Map.of(PROVISION_WORKFLOW, cyclicalWorkflow)) - .uiMetadata(template.getUiMetadata()) - .user(template.getUser()) - .build(); + Template cyclicalTemplate = new Template.Builder(template).workflows(Map.of(PROVISION_WORKFLOW, cyclicalWorkflow)).build(); // Hit dry run ResponseException exception = expectThrows(ResponseException.class, () -> createWorkflowValidation(client(), cyclicalTemplate)); @@ -313,4 +300,48 @@ public void testCreateAndProvisionAgentFrameworkWorkflow() throws Exception { assertEquals(RestStatus.OK, TestHelpers.restStatus(deleteResponse)); } + public void testTimestamps() throws Exception { + Template noopTemplate = TestHelpers.createTemplateFromFile("noop.json"); + // Create the template, should have created and updated matching + Response response = createWorkflow(client(), noopTemplate); + Map responseMap = entityAsMap(response); + String workflowId = (String) responseMap.get(WORKFLOW_ID); + assertNotNull(workflowId); + + response = getWorkflow(client(), workflowId); + assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); + Template t = Template.parse(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8)); + Instant createdTime = t.createdTime(); + Instant lastUpdatedTime = t.lastUpdatedTime(); + assertNotNull(createdTime); + assertEquals(createdTime, lastUpdatedTime); + assertNull(t.lastProvisionedTime()); + + // Update the template, should have created same as before and updated newer + response = updateWorkflow(client(), workflowId, noopTemplate); + assertEquals(RestStatus.CREATED.getStatus(), response.getStatusLine().getStatusCode()); + + response = getWorkflow(client(), workflowId); + assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); + t = Template.parse(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8)); + assertEquals(createdTime, t.createdTime()); + assertTrue(t.lastUpdatedTime().isAfter(lastUpdatedTime)); + lastUpdatedTime = t.lastUpdatedTime(); + assertNull(t.lastProvisionedTime()); + + // Provision the template, should have created and updated same as before and provisioned newer + response = provisionWorkflow(client(), workflowId); + assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); + + response = getWorkflow(client(), workflowId); + assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); + t = Template.parse(EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8)); + assertEquals(createdTime, t.createdTime()); + assertEquals(lastUpdatedTime, t.lastUpdatedTime()); + assertTrue(t.lastProvisionedTime().isAfter(lastUpdatedTime)); + + // Clean up + response = deleteWorkflow(client(), workflowId); + assertEquals(RestStatus.OK.getStatus(), response.getStatusLine().getStatusCode()); + } } diff --git a/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java b/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java index d95f10375..250d00e87 100644 --- a/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java +++ b/src/test/java/org/opensearch/flowframework/rest/RestCreateWorkflowActionTests.java @@ -73,7 +73,10 @@ public void setUp() throws Exception { compatibilityVersions, Map.of("workflow", workflow), Collections.emptyMap(), - TestHelpers.randomUser() + TestHelpers.randomUser(), + null, + null, + null ); // Invalid template configuration, wrong field name diff --git a/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java index 7dd8267f1..f5e57c588 100644 --- a/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/CreateWorkflowTransportActionTests.java @@ -10,6 +10,8 @@ import org.apache.lucene.search.TotalHits; import org.opensearch.Version; +import org.opensearch.action.get.GetRequest; +import org.opensearch.action.get.GetResponse; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; @@ -123,7 +125,10 @@ public void setUp() throws Exception { compatibilityVersions, Map.of("workflow", workflow), Collections.emptyMap(), - TestHelpers.randomUser() + TestHelpers.randomUser(), + null, + null, + null ); } @@ -185,7 +190,10 @@ public void testValidation_Failed() throws Exception { List.of(Version.fromString("2.0.0"), Version.fromString("3.0.0")), Map.of("workflow", workflow), Collections.emptyMap(), - TestHelpers.randomUser() + TestHelpers.randomUser(), + null, + null, + null ); @SuppressWarnings("unchecked") @@ -315,6 +323,15 @@ public void testFailedToUpdateWorkflow() { ActionListener listener = mock(ActionListener.class); WorkflowRequest updateWorkflow = new WorkflowRequest("1", template); + doAnswer(invocation -> { + ActionListener getListener = invocation.getArgument(1); + GetResponse getResponse = mock(GetResponse.class); + when(getResponse.isExists()).thenReturn(true); + when(getResponse.getSourceAsString()).thenReturn(template.toJson()); + getListener.onResponse(getResponse); + return null; + }).when(client).get(any(GetRequest.class), any()); + doAnswer(invocation -> { ActionListener responseListener = invocation.getArgument(2); responseListener.onFailure(new Exception("failed")); @@ -327,11 +344,45 @@ public void testFailedToUpdateWorkflow() { assertEquals("Failed to update use case template 1", exceptionCaptor.getValue().getMessage()); } + public void testFailedToUpdateNonExistingWorkflow() { + @SuppressWarnings("unchecked") + ActionListener listener = mock(ActionListener.class); + WorkflowRequest updateWorkflow = new WorkflowRequest("2", template); + + doAnswer(invocation -> { + ActionListener getListener = invocation.getArgument(1); + GetResponse getResponse = mock(GetResponse.class); + when(getResponse.isExists()).thenReturn(false); + getListener.onResponse(getResponse); + return null; + }).when(client).get(any(GetRequest.class), any()); + + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(2); + responseListener.onFailure(new Exception("failed")); + return null; + }).when(flowFrameworkIndicesHandler).updateTemplateInGlobalContext(any(), any(Template.class), any()); + + createWorkflowTransportAction.doExecute(mock(Task.class), updateWorkflow, listener); + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + verify(listener, times(1)).onFailure(exceptionCaptor.capture()); + assertEquals("Failed to retrieve template (2) from global context.", exceptionCaptor.getValue().getMessage()); + } + public void testUpdateWorkflow() { @SuppressWarnings("unchecked") ActionListener listener = mock(ActionListener.class); WorkflowRequest updateWorkflow = new WorkflowRequest("1", template); + doAnswer(invocation -> { + ActionListener getListener = invocation.getArgument(1); + GetResponse getResponse = mock(GetResponse.class); + when(getResponse.isExists()).thenReturn(true); + when(getResponse.getSourceAsString()).thenReturn(new Template.Builder().name("test").build().toJson()); + getListener.onResponse(getResponse); + return null; + }).when(client).get(any(GetRequest.class), any()); + doAnswer(invocation -> { ActionListener responseListener = invocation.getArgument(2); responseListener.onResponse(new IndexResponse(new ShardId(GLOBAL_CONTEXT_INDEX, "", 1), "1", 1L, 1L, 1L, true)); @@ -501,7 +552,10 @@ private Template generateValidTemplate() { List.of(Version.fromString("2.0.0"), Version.fromString("3.0.0")), Map.of("workflow", workflow), Collections.emptyMap(), - TestHelpers.randomUser() + TestHelpers.randomUser(), + null, + null, + null ); return validTemplate; diff --git a/src/test/java/org/opensearch/flowframework/transport/GetWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/GetWorkflowTransportActionTests.java index a3b259ad9..cdc4e5814 100644 --- a/src/test/java/org/opensearch/flowframework/transport/GetWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/GetWorkflowTransportActionTests.java @@ -89,7 +89,10 @@ public void setUp() throws Exception { compatibilityVersions, Map.of("provision", workflow), Collections.emptyMap(), - TestHelpers.randomUser() + TestHelpers.randomUser(), + null, + null, + null ); ThreadPool clientThreadPool = mock(ThreadPool.class); diff --git a/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java b/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java index 2b2f58532..9e60c0407 100644 --- a/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/ProvisionWorkflowTransportActionTests.java @@ -11,6 +11,7 @@ import org.opensearch.Version; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; +import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.update.UpdateResponse; import org.opensearch.client.Client; @@ -19,6 +20,7 @@ import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.index.shard.ShardId; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.flowframework.TestHelpers; import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler; @@ -44,6 +46,7 @@ import static org.opensearch.flowframework.common.CommonValue.GLOBAL_CONTEXT_INDEX; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -99,7 +102,10 @@ public void setUp() throws Exception { compatibilityVersions, Map.of("provision", workflow), Collections.emptyMap(), - TestHelpers.randomUser() + TestHelpers.randomUser(), + null, + null, + null ); ThreadPool clientThreadPool = mock(ThreadPool.class); @@ -144,6 +150,12 @@ public void testProvisionWorkflow() { return null; }).when(flowFrameworkIndicesHandler).updateFlowFrameworkSystemIndexDoc(any(), any(), any()); + doAnswer(invocation -> { + ActionListener responseListener = invocation.getArgument(2); + responseListener.onResponse(new IndexResponse(new ShardId(GLOBAL_CONTEXT_INDEX, "", 1), "1", 1L, 1L, 1L, true)); + return null; + }).when(flowFrameworkIndicesHandler).updateTemplateInGlobalContext(any(), any(Template.class), any(), anyBoolean()); + provisionWorkflowTransportAction.doExecute(mock(Task.class), workflowRequest, listener); ArgumentCaptor responseCaptor = ArgumentCaptor.forClass(WorkflowResponse.class); verify(listener, times(1)).onResponse(responseCaptor.capture()); diff --git a/src/test/java/org/opensearch/flowframework/transport/WorkflowRequestResponseTests.java b/src/test/java/org/opensearch/flowframework/transport/WorkflowRequestResponseTests.java index 200312bec..98f9a1499 100644 --- a/src/test/java/org/opensearch/flowframework/transport/WorkflowRequestResponseTests.java +++ b/src/test/java/org/opensearch/flowframework/transport/WorkflowRequestResponseTests.java @@ -52,7 +52,10 @@ public void setUp() throws Exception { compatibilityVersions, Map.of("workflow", workflow), Collections.emptyMap(), - TestHelpers.randomUser() + TestHelpers.randomUser(), + null, + null, + null ); } @@ -71,7 +74,7 @@ public void testNullIdWorkflowRequest() throws IOException { WorkflowRequest streamInputRequest = new WorkflowRequest(in); assertEquals(nullIdRequest.getWorkflowId(), streamInputRequest.getWorkflowId()); - assertEquals(nullIdRequest.getTemplate().toJson(), streamInputRequest.getTemplate().toJson()); + assertEquals(nullIdRequest.getTemplate().toString(), streamInputRequest.getTemplate().toString()); assertNull(nullIdRequest.validate()); assertFalse(nullIdRequest.isProvision()); assertTrue(nullIdRequest.getParams().isEmpty()); @@ -113,7 +116,7 @@ public void testWorkflowRequest() throws IOException { WorkflowRequest streamInputRequest = new WorkflowRequest(in); assertEquals(workflowRequest.getWorkflowId(), streamInputRequest.getWorkflowId()); - assertEquals(workflowRequest.getTemplate().toJson(), streamInputRequest.getTemplate().toJson()); + assertEquals(workflowRequest.getTemplate().toString(), streamInputRequest.getTemplate().toString()); assertNull(workflowRequest.validate()); assertFalse(workflowRequest.isProvision()); assertTrue(workflowRequest.getParams().isEmpty()); @@ -134,7 +137,7 @@ public void testWorkflowRequestWithParams() throws IOException { WorkflowRequest streamInputRequest = new WorkflowRequest(in); assertEquals(workflowRequest.getWorkflowId(), streamInputRequest.getWorkflowId()); - assertEquals(workflowRequest.getTemplate().toJson(), streamInputRequest.getTemplate().toJson()); + assertEquals(workflowRequest.getTemplate().toString(), streamInputRequest.getTemplate().toString()); assertNull(workflowRequest.validate()); assertTrue(workflowRequest.isProvision()); assertEquals("bar", workflowRequest.getParams().get("foo")); diff --git a/src/test/java/org/opensearch/flowframework/util/EncryptorUtilsTests.java b/src/test/java/org/opensearch/flowframework/util/EncryptorUtilsTests.java index a952b4253..cae595430 100644 --- a/src/test/java/org/opensearch/flowframework/util/EncryptorUtilsTests.java +++ b/src/test/java/org/opensearch/flowframework/util/EncryptorUtilsTests.java @@ -77,7 +77,10 @@ public void setUp() throws Exception { compatibilityVersions, Map.of("provision", workflow), Collections.emptyMap(), - TestHelpers.randomUser() + TestHelpers.randomUser(), + null, + null, + null ); ClusterState clusterState = mock(ClusterState.class); diff --git a/src/test/resources/template/noop.json b/src/test/resources/template/noop.json new file mode 100644 index 000000000..c0675151c --- /dev/null +++ b/src/test/resources/template/noop.json @@ -0,0 +1,13 @@ +{ + "name": "noop", + "workflows": { + "provision": { + "nodes": [ + { + "id": "no-op", + "type": "noop" + } + ] + } + } +}