diff --git a/CHANGELOG.md b/CHANGELOG.md index 76bf757083d15..fd3acf115dd23 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,8 +18,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote cluster state] Upload global metadata in cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404)) - [Remote cluster state] Download functionality of global metadata from remote store ([#10535](https://github.com/opensearch-project/OpenSearch/pull/10535)) - [Remote cluster state] Restore global metadata from remote store when local state is lost after quorum loss ([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404)) -- [AdmissionControl] Added changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286)) -- [AdmissionControl] Added changes to integrade cpu AC to ResourceUsageCollector and Emit Stats +- [Admission Control] Add changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286)) +- [Admission Control] Add changes to integrate CPU AC and ResourceUsageCollector with Stats ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index 1598fbaf3711a..bf057ff57c2d5 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -230,7 +230,8 @@ public NodeStats(StreamInput in) throws IOException { } else { repositoriesStats = null; } - if(in.getVersion().onOrAfter(Version.V_3_0_0)) { + // TODO: change to V_2_12_0 on main after backport to 2.x + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { admissionControlStats = in.readOptionalWriteable(AdmissionControlStats::new); } else { admissionControlStats = null; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 5efec8b876435..9c5dcc9e9de3f 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -171,6 +171,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq false, false, false, + false, false ); List shardsStats = new ArrayList<>(); diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index 7dd34fff1b159..11046e44b61e0 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -221,7 +221,8 @@ protected TransportReplicationAction( transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest); - if(transportPrimaryAction.equals(TransportShardBulkAction.ACTION_NAME + PRIMARY_ACTION_SUFFIX)){ + // Register only TransportShardBulkAction for admission control ( primary indexing action ) + if (transportPrimaryAction.equals(TransportShardBulkAction.ACTION_NAME + PRIMARY_ACTION_SUFFIX)) { transportService.registerRequestHandler( transportPrimaryAction, executor, diff --git a/server/src/main/java/org/opensearch/common/network/NetworkModule.java b/server/src/main/java/org/opensearch/common/network/NetworkModule.java index 7fa8ec771b488..5687b2f0a253a 100644 --- a/server/src/main/java/org/opensearch/common/network/NetworkModule.java +++ b/server/src/main/java/org/opensearch/common/network/NetworkModule.java @@ -300,16 +300,19 @@ public TransportRequestHandler interceptHandler( return actualHandler; } + /** + * Intercept the transport action and perform admission control if applicable + */ @Override public TransportRequestHandler interceptHandler( String action, String executor, boolean forceExecution, TransportRequestHandler actualHandler, - AdmissionControlActionType transportActionType + AdmissionControlActionType admissionControlActionType ) { for (TransportInterceptor interceptor : this.transportInterceptors) { - actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler, transportActionType); + actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler, admissionControlActionType); } return actualHandler; } diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 3c6dd15834f57..224061d09b2c6 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -269,7 +269,7 @@ public NodeStats stats( searchPipelineStats ? this.searchPipelineService.stats() : null, segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null, repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null, - admissionControl ? this.admissionControlService.stats(): null + admissionControl ? this.admissionControlService.stats() : null ); } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java index b71b062dc788d..3af9e2c453972 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java @@ -11,15 +11,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats; -import org.opensearch.ratelimitting.admissioncontrol.stats.BaseAdmissionControllerStats; -import org.opensearch.ratelimitting.admissioncontrol.stats.CPUBasedAdmissionControllerStats; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; import org.opensearch.threadpool.ThreadPool; import java.util.ArrayList; @@ -47,8 +45,14 @@ public class AdmissionControlService { * @param settings Immutable settings instance * @param clusterService ClusterService Instance * @param threadPool ThreadPool Instance + * @param resourceUsageCollectorService Instance used to get node resource usage stats */ - public AdmissionControlService(Settings settings, ClusterService clusterService, ThreadPool threadPool, ResourceUsageCollectorService resourceUsageCollectorService) { + public AdmissionControlService( + Settings settings, + ClusterService clusterService, + ThreadPool threadPool, + ResourceUsageCollectorService resourceUsageCollectorService + ) { this.threadPool = threadPool; this.admissionControlSettings = new AdmissionControlSettings(clusterService.getClusterSettings(), settings); this.ADMISSION_CONTROLLERS = new ConcurrentHashMap<>(); @@ -72,7 +76,9 @@ private void initialise() { * @param admissionControlActionType type of the admissionControllerActionType */ public void applyTransportAdmissionControl(String action, AdmissionControlActionType admissionControlActionType) { - this.ADMISSION_CONTROLLERS.forEach((name, admissionController) -> { admissionController.apply(action, admissionControlActionType); }); + this.ADMISSION_CONTROLLERS.forEach( + (name, admissionController) -> { admissionController.apply(action, admissionControlActionType); } + ); } /** @@ -90,7 +96,12 @@ public void registerAdmissionController(String admissionControllerName) { private AdmissionController controllerFactory(String admissionControllerName) { switch (admissionControllerName) { case CPU_BASED_ADMISSION_CONTROLLER: - return new CPUBasedAdmissionController(admissionControllerName, this.settings, this.clusterService, this.resourceUsageCollectorService); + return new CPUBasedAdmissionController( + admissionControllerName, + this.settings, + this.clusterService, + this.resourceUsageCollectorService + ); default: throw new IllegalArgumentException("Not Supported AdmissionController : " + admissionControllerName); } @@ -113,26 +124,15 @@ public AdmissionController getAdmissionController(String controllerName) { return this.ADMISSION_CONTROLLERS.getOrDefault(controllerName, null); } - public AdmissionControlStats stats(){ - List statsList = new ArrayList<>(); - if(this.ADMISSION_CONTROLLERS.size() > 0){ + public AdmissionControlStats stats() { + List statsList = new ArrayList<>(); + if (this.ADMISSION_CONTROLLERS.size() > 0) { this.ADMISSION_CONTROLLERS.forEach((controllerName, admissionController) -> { - BaseAdmissionControllerStats admissionControllerStats = controllerStatsFactory(admissionController); - if(admissionControllerStats != null) { - statsList.add(admissionControllerStats); - } + AdmissionControllerStats admissionControllerStats = new AdmissionControllerStats(admissionController, controllerName); + statsList.add(admissionControllerStats); }); return new AdmissionControlStats(statsList); } return null; } - - private BaseAdmissionControllerStats controllerStatsFactory(AdmissionController admissionController) { - switch (admissionController.getName()) { - case CPU_BASED_ADMISSION_CONTROLLER: - return new CPUBasedAdmissionControllerStats(admissionController); - default: - return null; - } - } } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java index 794a70f7a7483..b17c59ae307cc 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java @@ -11,7 +11,6 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.node.ResourceUsageCollectorService; -import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; @@ -35,9 +34,15 @@ public abstract class AdmissionController { /** * @param rejectionCount initialised rejectionCount value for AdmissionController * @param admissionControllerName name of the admissionController + * @param resourceUsageCollectorService instance used to get resource usage stats of the node * @param clusterService */ - public AdmissionController(AtomicLong rejectionCount, String admissionControllerName, ResourceUsageCollectorService resourceUsageCollectorService, ClusterService clusterService) { + public AdmissionController( + AtomicLong rejectionCount, + String admissionControllerName, + ResourceUsageCollectorService resourceUsageCollectorService, + ClusterService clusterService + ) { this.rejectionCount = rejectionCount; this.admissionControllerName = admissionControllerName; this.resourceUsageCollectorService = resourceUsageCollectorService; @@ -62,8 +67,7 @@ public Boolean isAdmissionControllerEnforced(AdmissionControlMode admissionContr } /** - * Increment the tracking-objects and apply the admission control if threshold is breached. - * Mostly applicable while applying admission controller + * Apply admission control based on the resource usage for an action */ public abstract void apply(String action, AdmissionControlActionType admissionControlActionType); @@ -74,9 +78,12 @@ public String getName() { return this.admissionControllerName; } + /** + * Add rejection count to the rejection count metric tracked by the admission-controller + */ public void addRejectionCount(String admissionControlActionType, long count) { AtomicLong updatedCount = new AtomicLong(0); - if(this.rejectionCountMap.containsKey(admissionControlActionType)){ + if (this.rejectionCountMap.containsKey(admissionControlActionType)) { updatedCount.addAndGet(this.rejectionCountMap.get(admissionControlActionType).get()); } updatedCount.addAndGet(count); @@ -91,6 +98,9 @@ public long getRejectionCount(String admissionControlActionType) { return rejectionCount.get(); } + /** + * Get rejection stats of the admission controller + */ public Map getRejectionStats() { Map rejectionStats = new HashMap<>(); rejectionCountMap.forEach((actionType, count) -> rejectionStats.put(actionType, count.get())); diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java index 2514b1e83fd04..7ca82f00784e7 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java @@ -16,11 +16,8 @@ import org.opensearch.node.NodeResourceUsageStats; import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; -import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings; -import java.util.HashMap; -import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; @@ -35,54 +32,85 @@ public class CPUBasedAdmissionController extends AdmissionController { /** * * @param admissionControllerName State of the admission controller + * @param settings Immutable settings instance + * @param clusterService ClusterService Instance + * @param resourceUsageCollectorService Instance used to get node resource usage stats */ - public CPUBasedAdmissionController(String admissionControllerName, Settings settings, ClusterService clusterService, ResourceUsageCollectorService resourceUsageCollectorService) { + public CPUBasedAdmissionController( + String admissionControllerName, + Settings settings, + ClusterService clusterService, + ResourceUsageCollectorService resourceUsageCollectorService + ) { super(new AtomicLong(0), admissionControllerName, resourceUsageCollectorService, clusterService); this.settings = new CPUBasedAdmissionControllerSettings(clusterService.getClusterSettings(), settings); } /** - * This function will take of applying admission controller based on CPU usage + * Apply admission control based on process CPU usage * @param action is the transport action */ @Override public void apply(String action, AdmissionControlActionType admissionControlActionType) { - // TODO Will extend this logic further currently just incrementing rejectionCount if (this.isEnabledForTransportLayer(this.settings.getTransportLayerAdmissionControllerMode())) { this.applyForTransportLayer(action, admissionControlActionType); } } + /** + * Apply transport layer admission control if configured limit has been reached + */ private void applyForTransportLayer(String actionName, AdmissionControlActionType admissionControlActionType) { if (isLimitsBreached(admissionControlActionType)) { this.addRejectionCount(admissionControlActionType.getType(), 1); if (this.isAdmissionControllerEnforced(this.settings.getTransportLayerAdmissionControllerMode())) { - throw new OpenSearchRejectedExecutionException("Action ["+ actionName +"] was rejected due to CPU usage admission controller limit breached"); + throw new OpenSearchRejectedExecutionException( + String.format("CPU usage admission controller limit reached for action [%s]", actionName) + ); } } } - private boolean isLimitsBreached(AdmissionControlActionType transportActionType) { - long maxCpuLimit = this.getCpuRejectionThreshold(transportActionType); - Optional nodePerformanceStatistics = this.resourceUsageCollectorService.getNodeStatistics(this.clusterService.state().nodes().getLocalNodeId()); - if(nodePerformanceStatistics.isPresent()) { - double cpuUsage = nodePerformanceStatistics.get().getCpuUtilizationPercent(); - if (cpuUsage >= maxCpuLimit){ - LOGGER.warn("CpuBasedAdmissionController rejected the request as the current CPU usage [" + - cpuUsage + "%] exceeds the allowed limit [" + maxCpuLimit + "%]"); - return true; + /** + * Check if resource usage limits are breached for an admission control + */ + private boolean isLimitsBreached(AdmissionControlActionType admissionControlActionType) { + // check if cluster state is ready + if (clusterService.state() != null && clusterService.state().nodes() != null) { + long maxCpuLimit = this.getCpuRejectionThreshold(admissionControlActionType); + Optional nodePerformanceStatistics = this.resourceUsageCollectorService.getNodeStatistics( + this.clusterService.state().nodes().getLocalNodeId() + ); + if (nodePerformanceStatistics.isPresent()) { + double cpuUsage = nodePerformanceStatistics.get().getCpuUtilizationPercent(); + if (cpuUsage >= maxCpuLimit) { + LOGGER.warn( + "CpuBasedAdmissionController rejected the request as the current CPU usage [" + + cpuUsage + + "%] exceeds the allowed limit [" + + maxCpuLimit + + "%]" + ); + return true; + } } } return false; } - private long getCpuRejectionThreshold(AdmissionControlActionType transportActionType) { - switch (transportActionType) { + + /** + * Get CPU rejection threshold for action type + */ + private long getCpuRejectionThreshold(AdmissionControlActionType admissionControlActionType) { + switch (admissionControlActionType) { case SEARCH: return this.settings.getSearchCPULimit(); case INDEXING: return this.settings.getIndexingCPULimit(); default: - throw new IllegalArgumentException("Not Supported TransportAction Type: " + transportActionType.getType()); + throw new IllegalArgumentException( + "Admission control not Supported for AdmissionControlActionType: " + admissionControlActionType.getType() + ); } } } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java index 188feb77318e4..f4def5f41574f 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java @@ -20,13 +20,13 @@ public class AdmissionControlStats implements ToXContentFragment, Writeable { - List admissionControllerStatsList; + List admissionControllerStatsList; /** * * @param admissionControllerStatsList list of admissionControllerStats */ - public AdmissionControlStats(List admissionControllerStatsList){ + public AdmissionControlStats(List admissionControllerStatsList) { this.admissionControllerStatsList = admissionControllerStatsList; } @@ -37,7 +37,7 @@ public AdmissionControlStats(List admissionControl */ public AdmissionControlStats(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_3_0_0)) { - this.admissionControllerStatsList = in.readNamedWriteableList(BaseAdmissionControllerStats.class); + this.admissionControllerStatsList = in.readNamedWriteableList(AdmissionControllerStats.class); } else { this.admissionControllerStatsList = null; } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/CPUBasedAdmissionControllerStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java similarity index 73% rename from server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/CPUBasedAdmissionControllerStats.java rename to server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java index 7b4e4a9695509..c6d93170b0088 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/CPUBasedAdmissionControllerStats.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java @@ -8,35 +8,38 @@ package org.opensearch.ratelimitting.admissioncontrol.stats; +import org.opensearch.core.common.io.stream.NamedWriteable; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; -import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController; import java.io.IOException; import java.util.Map; -import static org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER; -public class CPUBasedAdmissionControllerStats extends BaseAdmissionControllerStats { - - /** - * Returns the name of the writeable object - */ - @Override - public String getWriteableName() { - return CPU_BASED_ADMISSION_CONTROLLER; - } - +/** + * Class for admission controller stats + */ +public class AdmissionControllerStats implements NamedWriteable, ToXContentFragment { public Map rejectionCount; + public String admissionControllerName; - public CPUBasedAdmissionControllerStats(AdmissionController admissionController){ + public AdmissionControllerStats(AdmissionController admissionController, String admissionControllerName) { this.rejectionCount = admissionController.getRejectionStats(); + this.admissionControllerName = admissionControllerName; } - public CPUBasedAdmissionControllerStats(StreamInput in) throws IOException { + public AdmissionControllerStats(StreamInput in) throws IOException { this.rejectionCount = in.readMap(StreamInput::readString, StreamInput::readLong); + this.admissionControllerName = in.readString(); + } + + @Override + public String getWriteableName() { + return admissionControllerName; } + /** * Write this into the {@linkplain StreamOutput}. * @@ -45,6 +48,7 @@ public CPUBasedAdmissionControllerStats(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { out.writeMap(this.rejectionCount, StreamOutput::writeString, StreamOutput::writeLong); + out.writeString(this.admissionControllerName); } /** diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/BaseAdmissionControllerStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/BaseAdmissionControllerStats.java deleted file mode 100644 index 0ee1807bf80da..0000000000000 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/BaseAdmissionControllerStats.java +++ /dev/null @@ -1,15 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ratelimitting.admissioncontrol.stats; - -import org.opensearch.core.common.io.stream.NamedWriteable; -import org.opensearch.core.xcontent.ToXContentFragment; - -public abstract class BaseAdmissionControllerStats implements NamedWriteable, ToXContentFragment { -} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java index dfe286d9b9537..6561a670f0794 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java @@ -54,15 +54,17 @@ public AdmissionControlTransportHandler( */ @Override public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { - // intercept all the transport requests here and apply admission control - try { - // TODO Need to evaluate if we need to apply admission control or not if force Execution is true will update in next PR. - this.admissionControlService.applyTransportAdmissionControl(this.action, this.admissionControlActionType); - } catch (final OpenSearchRejectedExecutionException openSearchRejectedExecutionException) { - log.warn(openSearchRejectedExecutionException.getMessage()); - channel.sendResponse(openSearchRejectedExecutionException); - } catch (final Exception e) { - throw e; + // skip admission control if force execution is true + if (!this.forceExecution) { + // intercept the transport requests here and apply admission control + try { + this.admissionControlService.applyTransportAdmissionControl(this.action, this.admissionControlActionType); + } catch (final OpenSearchRejectedExecutionException openSearchRejectedExecutionException) { + log.warn(openSearchRejectedExecutionException.getMessage()); + channel.sendResponse(openSearchRejectedExecutionException); + } catch (final Exception e) { + throw e; + } } actualHandler.messageReceived(request, channel, task); } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java index c725af821ac8f..ae1520bca769d 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java @@ -15,7 +15,7 @@ import org.opensearch.transport.TransportRequestHandler; /** - * This class allows throttling to intercept requests on both the sender and the receiver side. + * This class allows throttling by intercepting requests on both the sender and the receiver side. */ public class AdmissionControlTransportInterceptor implements TransportInterceptor { @@ -37,6 +37,12 @@ public TransportRequestHandler interceptHandler( TransportRequestHandler actualHandler, AdmissionControlActionType admissionControlActionType ) { - return new AdmissionControlTransportHandler<>(action, actualHandler, this.admissionControlService, forceExecution, admissionControlActionType); + return new AdmissionControlTransportHandler<>( + action, + actualHandler, + this.admissionControlService, + forceExecution, + admissionControlActionType + ); } } diff --git a/server/src/main/java/org/opensearch/transport/TransportInterceptor.java b/server/src/main/java/org/opensearch/transport/TransportInterceptor.java index 12b0990a5d692..e8efbeb7de3f9 100644 --- a/server/src/main/java/org/opensearch/transport/TransportInterceptor.java +++ b/server/src/main/java/org/opensearch/transport/TransportInterceptor.java @@ -59,21 +59,14 @@ default TransportRequestHandler interceptHandler } /** - * - * @param action - * @param executor - * @param forceExecution - * @param actualHandler - * @param transportActionType - * @return - * @param + * This is called for handlers that needs admission control support */ default TransportRequestHandler interceptHandler( String action, String executor, boolean forceExecution, TransportRequestHandler actualHandler, - AdmissionControlActionType transportActionType + AdmissionControlActionType admissionControlActionType ) { return interceptHandler(action, executor, forceExecution, actualHandler); } diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index a55a20478aa3d..211564c6bb4ac 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -1243,13 +1243,14 @@ public void registerRequestHandler( } /** - * Registers a new request handler + * Registers a new request handler with admission control support * * @param action The action the request handler is associated with - * @param requestReader The request class that will be used to construct new instances for streaming * @param executor The executor the request handling will be executed on * @param forceExecution Force execution on the executor queue and never reject it - * @param transportActionType Check the request size and raise an exception in case the limit is breached. + * @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached. + * @param admissionControlActionType Admission control based on resource usage limits of provided action type + * @param requestReader The request class that will be used to construct new instances for streaming * @param handler The handler itself that implements the request handling */ public void registerRequestHandler( @@ -1257,12 +1258,12 @@ public void registerRequestHandler( String executor, boolean forceExecution, boolean canTripCircuitBreaker, - AdmissionControlActionType transportActionType, + AdmissionControlActionType admissionControlActionType, Writeable.Reader requestReader, TransportRequestHandler handler ) { validateActionName(action); - handler = interceptor.interceptHandler(action, executor, forceExecution, handler, transportActionType); + handler = interceptor.interceptHandler(action, executor, forceExecution, handler, admissionControlActionType); RequestHandlerRegistry reg = new RequestHandlerRegistry<>( action, requestReader, diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java index abd38a3cbf1fb..95caa8b1a6a22 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java @@ -109,12 +109,14 @@ public void testApplyAdmissionControllerDisabled() { admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); admissionControlService.applyTransportAdmissionControl(this.action, null); List admissionControllerList = admissionControlService.getAdmissionControllers(); - admissionControllerList.forEach(admissionController -> { assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); }); + admissionControllerList.forEach(admissionController -> { + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); + }); } public void testApplyAdmissionControllerEnabled() { this.action = "indices:data/write/bulk[s][p]"; - admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool,null); + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); admissionControlService.applyTransportAdmissionControl(this.action, null); assertEquals( admissionControlService.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER) diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java index 419e9ea8d4827..3923048376d69 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java @@ -10,7 +10,7 @@ import org.opensearch.test.OpenSearchTestCase; -public class TransportActionTypeTests extends OpenSearchTestCase { +public class admissionControlActionTypeTests extends OpenSearchTestCase { public void testValidActionType() { assertEquals(AdmissionControlActionType.SEARCH.getType(), "search");