Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Put plugin API behind a feature flag #142

Merged
merged 2 commits into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.IndexScopedSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.settings.SettingsFilter;
import org.opensearch.common.util.concurrent.OpenSearchExecutors;
Expand All @@ -24,6 +25,7 @@
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.indices.FlowFrameworkIndicesHandler;
import org.opensearch.flowframework.rest.RestCreateWorkflowAction;
import org.opensearch.flowframework.rest.RestProvisionWorkflowAction;
Expand Down Expand Up @@ -57,6 +59,8 @@
*/
public class FlowFrameworkPlugin extends Plugin implements ActionPlugin {

private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

/**
* Instantiate this plugin.
*/
Expand All @@ -76,6 +80,9 @@ public Collection<Object> createComponents(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<RepositoriesService> repositoriesServiceSupplier
) {
Settings settings = environment.settings();
flowFrameworkFeatureEnabledSetting = new FlowFrameworkFeatureEnabledSetting(clusterService, settings);

MachineLearningNodeClient mlClient = new MachineLearningNodeClient(client);
WorkflowStepFactory workflowStepFactory = new WorkflowStepFactory(clusterService, client, mlClient);
WorkflowProcessSorter workflowProcessSorter = new WorkflowProcessSorter(workflowStepFactory, threadPool);
Expand All @@ -95,7 +102,10 @@ public List<RestHandler> getRestHandlers(
IndexNameExpressionResolver indexNameExpressionResolver,
Supplier<DiscoveryNodes> nodesInCluster
) {
return ImmutableList.of(new RestCreateWorkflowAction(), new RestProvisionWorkflowAction());
return ImmutableList.of(
new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting),
new RestProvisionWorkflowAction(flowFrameworkFeatureEnabledSetting)
);
}

@Override
Expand All @@ -106,6 +116,12 @@ public List<RestHandler> getRestHandlers(
);
}

@Override
public List<Setting<?>> getSettings() {
List<Setting<?>> settings = ImmutableList.of(FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED);
return settings;
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
// TODO : Determine final size/queueSize values for the provision thread pool
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.common;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;

/**
* Controls enabling or disabling features of this plugin
*/
public class FlowFrameworkFeatureEnabledSetting {

/** This setting enables/disables the Flow Framework REST API */
public static final Setting<Boolean> FLOW_FRAMEWORK_ENABLED = Setting.boolSetting(
"plugins.flow_framework.enabled",
false,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);

private volatile Boolean isFlowFrameworkEnabled;

/**
* Instantiate this class.
*
* @param clusterService OpenSearch cluster service
* @param settings OpenSearch settings
*/
public FlowFrameworkFeatureEnabledSetting(ClusterService clusterService, Settings settings) {
// Currently this is just an on/off switch for the entire plugin's API.
// If desired more fine-tuned feature settings can be added below.
isFlowFrameworkEnabled = FLOW_FRAMEWORK_ENABLED.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(FLOW_FRAMEWORK_ENABLED, it -> isFlowFrameworkEnabled = it);
}

/**
* Whether the flow framework feature is enabled. If disabled, no REST APIs will be availble.
* @return whether Flow Framework is enabled.
*/
public boolean isFlowFrameworkEnabled() {
return isFlowFrameworkEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.transport.CreateWorkflowAction;
Expand All @@ -31,6 +32,7 @@
import static org.opensearch.flowframework.common.CommonValue.DRY_RUN;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED;

/**
* Rest Action to facilitate requests to create and update a use case template
Expand All @@ -40,10 +42,16 @@ public class RestCreateWorkflowAction extends BaseRestHandler {
private static final Logger logger = LogManager.getLogger(RestCreateWorkflowAction.class);
private static final String CREATE_WORKFLOW_ACTION = "create_workflow_action";

private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

/**
* Intantiates a new RestCreateWorkflowAction
*
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
*/
public RestCreateWorkflowAction() {}
public RestCreateWorkflowAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) {
this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting;
}

@Override
public String getName() {
Expand All @@ -62,6 +70,15 @@ public List<Route> routes() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
FlowFrameworkException ffe = new FlowFrameworkException(
"This API is disabled. To enable it, set [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
RestStatus.FORBIDDEN
);
return channel -> channel.sendResponse(
new BytesRestResponse(ffe.getRestStatus(), ffe.toXContent(channel.newErrorBuilder(), ToXContent.EMPTY_PARAMS))
);
}
try {

String workflowId = request.param(WORKFLOW_ID);
Expand Down Expand Up @@ -89,5 +106,4 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli
);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.exception.FlowFrameworkException;
import org.opensearch.flowframework.transport.ProvisionWorkflowAction;
import org.opensearch.flowframework.transport.WorkflowRequest;
Expand All @@ -29,6 +30,7 @@

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_ID;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED;

/**
* Rest action to facilitate requests to provision a workflow from an inline defined or stored use case template
Expand All @@ -39,10 +41,16 @@ public class RestProvisionWorkflowAction extends BaseRestHandler {

private static final String PROVISION_WORKFLOW_ACTION = "provision_workflow_action";

private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

/**
* Instantiates a new RestProvisionWorkflowAction
*
* @param flowFrameworkFeatureEnabledSetting Whether this API is enabled
*/
public RestProvisionWorkflowAction() {}
public RestProvisionWorkflowAction(FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting) {
this.flowFrameworkFeatureEnabledSetting = flowFrameworkFeatureEnabledSetting;
}

@Override
public String getName() {
Expand All @@ -61,6 +69,12 @@ public List<Route> routes() {
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
String workflowId = request.param(WORKFLOW_ID);
try {
if (!flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()) {
throw new FlowFrameworkException(
"This API is disabled. To enable it, update the setting [" + FLOW_FRAMEWORK_ENABLED.getKey() + "] to true.",
RestStatus.FORBIDDEN
);
}
// Validate content
if (request.hasContent()) {
throw new FlowFrameworkException("Invalid request format", RestStatus.BAD_REQUEST);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,21 @@
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.env.Environment;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.TestThreadPool;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -31,6 +39,9 @@ public class FlowFrameworkPluginTests extends OpenSearchTestCase {
private ClusterAdminClient clusterAdminClient;
private ThreadPool threadPool;
private Settings settings;
private Environment environment;
private ClusterSettings clusterSettings;
private ClusterService clusterService;

@Override
public void setUp() throws Exception {
Expand All @@ -41,7 +52,18 @@ public void setUp() throws Exception {
when(client.admin()).thenReturn(adminClient);
when(adminClient.cluster()).thenReturn(clusterAdminClient);
threadPool = new TestThreadPool(FlowFrameworkPluginTests.class.getName());
settings = Settings.EMPTY;

environment = mock(Environment.class);
settings = Settings.builder().build();
when(environment.settings()).thenReturn(settings);

final Set<Setting<?>> settingsSet = Stream.concat(
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(),
Stream.of(FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED)
).collect(Collectors.toSet());
clusterSettings = new ClusterSettings(settings, settingsSet);
clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
}

@Override
Expand All @@ -52,10 +74,14 @@ public void tearDown() throws Exception {

public void testPlugin() throws IOException {
try (FlowFrameworkPlugin ffp = new FlowFrameworkPlugin()) {
assertEquals(3, ffp.createComponents(client, null, threadPool, null, null, null, null, null, null, null, null).size());
assertEquals(
3,
ffp.createComponents(client, clusterService, threadPool, null, null, null, environment, null, null, null, null).size()
);
assertEquals(2, ffp.getRestHandlers(null, null, null, null, null, null, null).size());
assertEquals(2, ffp.getActions().size());
assertEquals(1, ffp.getExecutorBuilders(settings).size());
assertEquals(1, ffp.getSettings().size());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework.common;

import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class FlowFrameworkFeatureEnabledSettingTests extends OpenSearchTestCase {

private Settings settings;
private ClusterSettings clusterSettings;
private ClusterService clusterService;

private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

@Override
public void setUp() throws Exception {
super.setUp();

settings = Settings.builder().build();
final Set<Setting<?>> settingsSet = Stream.concat(
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS.stream(),
Stream.of(FlowFrameworkFeatureEnabledSetting.FLOW_FRAMEWORK_ENABLED)
).collect(Collectors.toSet());
clusterSettings = new ClusterSettings(settings, settingsSet);
clusterService = mock(ClusterService.class);
when(clusterService.getClusterSettings()).thenReturn(clusterSettings);
flowFrameworkFeatureEnabledSetting = new FlowFrameworkFeatureEnabledSetting(clusterService, settings);
}

@Override
public void tearDown() throws Exception {
super.tearDown();
}

public void testSettings() throws IOException {
assertFalse(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.MediaTypeRegistry;
import org.opensearch.flowframework.TestHelpers;
import org.opensearch.flowframework.common.FlowFrameworkFeatureEnabledSetting;
import org.opensearch.flowframework.model.Template;
import org.opensearch.flowframework.model.Workflow;
import org.opensearch.flowframework.model.WorkflowEdge;
Expand All @@ -30,6 +31,7 @@

import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class RestCreateWorkflowActionTests extends OpenSearchTestCase {

Expand All @@ -38,10 +40,13 @@ public class RestCreateWorkflowActionTests extends OpenSearchTestCase {
private String createWorkflowPath;
private String updateWorkflowPath;
private NodeClient nodeClient;
private FlowFrameworkFeatureEnabledSetting flowFrameworkFeatureEnabledSetting;

@Override
public void setUp() throws Exception {
super.setUp();
flowFrameworkFeatureEnabledSetting = mock(FlowFrameworkFeatureEnabledSetting.class);
when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(true);

Version templateVersion = Version.fromString("1.0.0");
List<Version> compatibilityVersions = List.of(Version.fromString("2.0.0"), Version.fromString("3.0.0"));
Expand All @@ -64,7 +69,7 @@ public void setUp() throws Exception {

// Invalid template configuration, wrong field name
this.invalidTemplate = template.toJson().replace("use_case", "invalid");
this.createWorkflowRestAction = new RestCreateWorkflowAction();
this.createWorkflowRestAction = new RestCreateWorkflowAction(flowFrameworkFeatureEnabledSetting);
this.createWorkflowPath = String.format(Locale.ROOT, "%s", WORKFLOW_URI);
this.updateWorkflowPath = String.format(Locale.ROOT, "%s/{%s}", WORKFLOW_URI, "workflow_id");
this.nodeClient = mock(NodeClient.class);
Expand Down Expand Up @@ -95,4 +100,15 @@ public void testInvalidCreateWorkflowRequest() throws Exception {
assertEquals(RestStatus.BAD_REQUEST, channel.capturedResponse().status());
assertTrue(channel.capturedResponse().content().utf8ToString().contains("Unable to parse field [invalid] in a template object."));
}

public void testFeatureFlagNotEnabled() throws Exception {
when(flowFrameworkFeatureEnabledSetting.isFlowFrameworkEnabled()).thenReturn(false);
RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withMethod(RestRequest.Method.POST)
.withPath(this.createWorkflowPath)
.build();
FakeRestChannel channel = new FakeRestChannel(request, false, 1);
createWorkflowRestAction.handleRequest(request, channel, nodeClient);
assertEquals(RestStatus.FORBIDDEN, channel.capturedResponse().status());
assertTrue(channel.capturedResponse().content().utf8ToString().contains("This API is disabled."));
}
}
Loading
Loading