From 01e320ff34f365b95daf07a01506019e46d8f3a7 Mon Sep 17 00:00:00 2001 From: Ajay Kumar Movva Date: Sat, 21 Oct 2023 02:50:15 +0530 Subject: [PATCH] Added changes to integrade cpu AC to ResourceUsageCollector and Emit Stats Signed-off-by: Ajay Kumar Movva --- CHANGELOG.md | 1 + .../admin/cluster/node/stats/NodeStats.java | 21 ++++- .../cluster/node/stats/NodesStatsRequest.java | 3 +- .../node/stats/TransportNodesStatsAction.java | 3 +- .../action/search/SearchTransportService.java | 11 +++ .../TransportReplicationAction.java | 30 ++++++-- .../common/network/NetworkModule.java | 15 ++++ .../main/java/org/opensearch/node/Node.java | 29 +++---- .../java/org/opensearch/node/NodeService.java | 12 ++- .../AdmissionControlService.java | 52 ++++++++++--- .../controllers/AdmissionController.java | 53 ++++++++++--- .../CPUBasedAdmissionController.java | 55 +++++++++++--- ...e.java => AdmissionControlActionType.java} | 6 +- .../stats/AdmissionControlStats.java | 76 +++++++++++++++++++ .../stats/BaseAdmissionControllerStats.java | 15 ++++ .../CPUBasedAdmissionControllerStats.java | 76 +++++++++++++++++++ .../AdmissionControlTransportHandler.java | 8 +- .../AdmissionControlTransportInterceptor.java | 6 +- .../transport/TransportInterceptor.java | 21 +++++ .../transport/TransportService.java | 34 +++++++++ .../AdmissionControlServiceTests.java | 23 +++--- .../CPUBasedAdmissionControllerTests.java | 29 ++++--- .../enums/TransportActionTypeTests.java | 10 +-- ...AdmissionControlTransportHandlerTests.java | 13 ++-- .../MockInternalClusterInfoService.java | 3 +- 25 files changed, 505 insertions(+), 100 deletions(-) rename server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/{TransportActionType.java => AdmissionControlActionType.java} (85%) create mode 100644 server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java create mode 100644 server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/BaseAdmissionControllerStats.java create mode 100644 server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/CPUBasedAdmissionControllerStats.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 374dd4ab57ee6..76bf757083d15 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote cluster state] Download functionality of global metadata from remote store ([#10535](https://github.com/opensearch-project/OpenSearch/pull/10535)) - [Remote cluster state] Restore global metadata from remote store when local state is lost after quorum loss ([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404)) - [AdmissionControl] Added changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286)) +- [AdmissionControl] Added changes to integrade cpu AC to ResourceUsageCollector and Emit Stats ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index 3d37056956c69..1598fbaf3711a 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -58,6 +58,7 @@ import org.opensearch.monitor.process.ProcessStats; import org.opensearch.node.AdaptiveSelectionStats; import org.opensearch.node.NodesResourceUsageStats; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats; import org.opensearch.repositories.RepositoriesStats; import org.opensearch.script.ScriptCacheStats; import org.opensearch.script.ScriptStats; @@ -154,6 +155,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { @Nullable private RepositoriesStats repositoriesStats; + @Nullable + private AdmissionControlStats admissionControlStats; + public NodeStats(StreamInput in) throws IOException { super(in); timestamp = in.readVLong(); @@ -226,6 +230,11 @@ public NodeStats(StreamInput in) throws IOException { } else { repositoriesStats = null; } + if(in.getVersion().onOrAfter(Version.V_3_0_0)) { + admissionControlStats = in.readOptionalWriteable(AdmissionControlStats::new); + } else { + admissionControlStats = null; + } } public NodeStats( @@ -255,7 +264,8 @@ public NodeStats( @Nullable TaskCancellationStats taskCancellationStats, @Nullable SearchPipelineStats searchPipelineStats, @Nullable SegmentReplicationRejectionStats segmentReplicationRejectionStats, - @Nullable RepositoriesStats repositoriesStats + @Nullable RepositoriesStats repositoriesStats, + @Nullable AdmissionControlStats admissionControlStats ) { super(node); this.timestamp = timestamp; @@ -284,6 +294,7 @@ public NodeStats( this.searchPipelineStats = searchPipelineStats; this.segmentReplicationRejectionStats = segmentReplicationRejectionStats; this.repositoriesStats = repositoriesStats; + this.admissionControlStats = admissionControlStats; } public long getTimestamp() { @@ -435,6 +446,11 @@ public RepositoriesStats getRepositoriesStats() { return repositoriesStats; } + @Nullable + public AdmissionControlStats getAdmissionControlStats() { + return admissionControlStats; + } + @Override public void writeTo(StreamOutput out) throws IOException { super.writeTo(out); @@ -588,6 +604,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (getRepositoriesStats() != null) { getRepositoriesStats().toXContent(builder, params); } + if (getAdmissionControlStats() != null) { + getAdmissionControlStats().toXContent(builder, params); + } return builder; } } diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java index fc72668d36413..95c96ffb20757 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodesStatsRequest.java @@ -216,7 +216,8 @@ public enum Metric { SEARCH_PIPELINE("search_pipeline"), RESOURCE_USAGE_STATS("resource_usage_stats"), SEGMENT_REPLICATION_BACKPRESSURE("segment_replication_backpressure"), - REPOSITORIES("repositories"); + REPOSITORIES("repositories"), + ADMISSION_CONTROL("admission_control"); private String metricName; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java index 99cf42cfdc4d0..1df73d3b4394d 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/TransportNodesStatsAction.java @@ -127,7 +127,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) { NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics), NodesStatsRequest.Metric.RESOURCE_USAGE_STATS.containedIn(metrics), NodesStatsRequest.Metric.SEGMENT_REPLICATION_BACKPRESSURE.containedIn(metrics), - NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics) + NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics), + NodesStatsRequest.Metric.ADMISSION_CONTROL.containedIn(metrics) ); } diff --git a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java index a723937afd2ed..64c738f633f2e 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java @@ -45,6 +45,7 @@ import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.SearchService; import org.opensearch.search.dfs.DfsSearchResult; @@ -542,6 +543,9 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService.registerRequestHandler( DFS_ACTION_NAME, ThreadPool.Names.SAME, + false, + true, + AdmissionControlActionType.SEARCH, ShardSearchRequest::new, (request, channel, task) -> searchService.executeDfsPhase( request, @@ -556,6 +560,9 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService.registerRequestHandler( QUERY_ACTION_NAME, ThreadPool.Names.SAME, + false, + true, + AdmissionControlActionType.SEARCH, ShardSearchRequest::new, (request, channel, task) -> { searchService.executeQueryPhase( @@ -575,6 +582,9 @@ public static void registerRequestHandler(TransportService transportService, Sea transportService.registerRequestHandler( QUERY_ID_ACTION_NAME, ThreadPool.Names.SAME, + false, + true, + AdmissionControlActionType.SEARCH, QuerySearchRequest::new, (request, channel, task) -> { searchService.executeQueryPhase( @@ -633,6 +643,7 @@ public static void registerRequestHandler(TransportService transportService, Sea ThreadPool.Names.SAME, true, true, + AdmissionControlActionType.SEARCH, ShardFetchSearchRequest::new, (request, channel, task) -> { searchService.executeFetchPhase( diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index ddebdc5530e70..7dd34fff1b159 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -38,6 +38,7 @@ import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListenerResponseHandler; import org.opensearch.action.UnavailableShardsException; +import org.opensearch.action.bulk.TransportShardBulkAction; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.ActiveShardCount; import org.opensearch.action.support.ChannelActionListener; @@ -82,6 +83,7 @@ import org.opensearch.indices.IndexClosedException; import org.opensearch.indices.IndicesService; import org.opensearch.node.NodeClosedException; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.tasks.Task; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.ConnectTransportException; @@ -219,14 +221,26 @@ protected TransportReplicationAction( transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest); - transportService.registerRequestHandler( - transportPrimaryAction, - executor, - forceExecutionOnPrimary, - true, - in -> new ConcreteShardRequest<>(requestReader, in), - this::handlePrimaryRequest - ); + if(transportPrimaryAction.equals(TransportShardBulkAction.ACTION_NAME + PRIMARY_ACTION_SUFFIX)){ + transportService.registerRequestHandler( + transportPrimaryAction, + executor, + forceExecutionOnPrimary, + true, + AdmissionControlActionType.INDEXING, + in -> new ConcreteShardRequest<>(requestReader, in), + this::handlePrimaryRequest + ); + } else { + transportService.registerRequestHandler( + transportPrimaryAction, + executor, + forceExecutionOnPrimary, + true, + in -> new ConcreteShardRequest<>(requestReader, in), + this::handlePrimaryRequest + ); + } // we must never reject on because of thread pool capacity on replicas transportService.registerRequestHandler( diff --git a/server/src/main/java/org/opensearch/common/network/NetworkModule.java b/server/src/main/java/org/opensearch/common/network/NetworkModule.java index 821d48fccf48c..7fa8ec771b488 100644 --- a/server/src/main/java/org/opensearch/common/network/NetworkModule.java +++ b/server/src/main/java/org/opensearch/common/network/NetworkModule.java @@ -55,6 +55,7 @@ import org.opensearch.http.HttpServerTransport; import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask; import org.opensearch.plugins.NetworkPlugin; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.tasks.RawTaskStatus; import org.opensearch.tasks.Task; import org.opensearch.telemetry.tracing.Tracer; @@ -299,6 +300,20 @@ public TransportRequestHandler interceptHandler( return actualHandler; } + @Override + public TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler, + AdmissionControlActionType transportActionType + ) { + for (TransportInterceptor interceptor : this.transportInterceptors) { + actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler, transportActionType); + } + return actualHandler; + } + @Override public AsyncSender interceptSender(AsyncSender sender) { for (TransportInterceptor interceptor : this.transportInterceptors) { diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index e80b768074fc7..3fb75089b9865 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -894,12 +894,24 @@ protected Node( final RestController restController = actionModule.getRestController(); - final AdmissionControlService admissionControlService = new AdmissionControlService( + final NodeResourceUsageTracker nodeResourceUsageTracker = new NodeResourceUsageTracker( + threadPool, settings, - clusterService.getClusterSettings(), + clusterService.getClusterSettings() + ); + final ResourceUsageCollectorService resourceUsageCollectorService = new ResourceUsageCollectorService( + nodeResourceUsageTracker, + clusterService, threadPool ); + final AdmissionControlService admissionControlService = new AdmissionControlService( + settings, + clusterService, + threadPool, + resourceUsageCollectorService + ); + AdmissionControlTransportInterceptor admissionControlTransportInterceptor = new AdmissionControlTransportInterceptor( admissionControlService ); @@ -1101,16 +1113,6 @@ protected Node( transportService.getTaskManager(), taskCancellationMonitoringSettings ); - final NodeResourceUsageTracker nodeResourceUsageTracker = new NodeResourceUsageTracker( - threadPool, - settings, - clusterService.getClusterSettings() - ); - final ResourceUsageCollectorService resourceUsageCollectorService = new ResourceUsageCollectorService( - nodeResourceUsageTracker, - clusterService, - threadPool - ); this.nodeService = new NodeService( settings, threadPool, @@ -1135,7 +1137,8 @@ protected Node( taskCancellationMonitoringService, resourceUsageCollectorService, segmentReplicationStatsTracker, - repositoryService + repositoryService, + admissionControlService ); final SearchService searchService = newSearchService( diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 49dde0b81cac7..3c6dd15834f57 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -54,6 +54,7 @@ import org.opensearch.ingest.IngestService; import org.opensearch.monitor.MonitorService; import org.opensearch.plugins.PluginsService; +import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; import org.opensearch.repositories.RepositoriesService; import org.opensearch.script.ScriptService; import org.opensearch.search.aggregations.support.AggregationUsageService; @@ -96,6 +97,7 @@ public class NodeService implements Closeable { private final FileCache fileCache; private final TaskCancellationMonitoringService taskCancellationMonitoringService; private final RepositoriesService repositoriesService; + AdmissionControlService admissionControlService; private final SegmentReplicationStatsTracker segmentReplicationStatsTracker; @@ -123,7 +125,8 @@ public class NodeService implements Closeable { TaskCancellationMonitoringService taskCancellationMonitoringService, ResourceUsageCollectorService resourceUsageCollectorService, SegmentReplicationStatsTracker segmentReplicationStatsTracker, - RepositoriesService repositoriesService + RepositoriesService repositoriesService, + AdmissionControlService admissionControlService ) { this.settings = settings; this.threadPool = threadPool; @@ -148,6 +151,7 @@ public class NodeService implements Closeable { this.taskCancellationMonitoringService = taskCancellationMonitoringService; this.resourceUsageCollectorService = resourceUsageCollectorService; this.repositoriesService = repositoriesService; + this.admissionControlService = admissionControlService; clusterService.addStateApplier(ingestService); clusterService.addStateApplier(searchPipelineService); this.segmentReplicationStatsTracker = segmentReplicationStatsTracker; @@ -232,7 +236,8 @@ public NodeStats stats( boolean searchPipelineStats, boolean resourceUsageStats, boolean segmentReplicationTrackerStats, - boolean repositoriesStats + boolean repositoriesStats, + boolean admissionControl ) { // for indices stats we want to include previous allocated shards stats as well (it will // only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats) @@ -263,7 +268,8 @@ public NodeStats stats( taskCancellation ? this.taskCancellationMonitoringService.stats() : null, searchPipelineStats ? this.searchPipelineService.stats() : null, segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null, - repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null + repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null, + admissionControl ? this.admissionControlService.stats(): null ); } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java index 2cc409b0e4465..b71b062dc788d 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java @@ -10,10 +10,16 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats; +import org.opensearch.ratelimitting.admissioncontrol.stats.BaseAdmissionControllerStats; +import org.opensearch.ratelimitting.admissioncontrol.stats.CPUBasedAdmissionControllerStats; import org.opensearch.threadpool.ThreadPool; import java.util.ArrayList; @@ -31,21 +37,24 @@ public class AdmissionControlService { public final AdmissionControlSettings admissionControlSettings; private final ConcurrentMap ADMISSION_CONTROLLERS; private static final Logger logger = LogManager.getLogger(AdmissionControlService.class); - private final ClusterSettings clusterSettings; + private final ClusterService clusterService; private final Settings settings; + private ResourceUsageCollectorService resourceUsageCollectorService; + /** * * @param settings Immutable settings instance - * @param clusterSettings ClusterSettings Instance + * @param clusterService ClusterService Instance * @param threadPool ThreadPool Instance */ - public AdmissionControlService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { + public AdmissionControlService(Settings settings, ClusterService clusterService, ThreadPool threadPool, ResourceUsageCollectorService resourceUsageCollectorService) { this.threadPool = threadPool; - this.admissionControlSettings = new AdmissionControlSettings(clusterSettings, settings); + this.admissionControlSettings = new AdmissionControlSettings(clusterService.getClusterSettings(), settings); this.ADMISSION_CONTROLLERS = new ConcurrentHashMap<>(); - this.clusterSettings = clusterSettings; + this.clusterService = clusterService; this.settings = settings; + this.resourceUsageCollectorService = resourceUsageCollectorService; this.initialise(); } @@ -58,10 +67,12 @@ private void initialise() { } /** - * Handler to trigger registered admissionController + * + * @param action transport action that is being executed. we are using it for logging while request is rejected + * @param admissionControlActionType type of the admissionControllerActionType */ - public void applyTransportAdmissionControl(String action) { - this.ADMISSION_CONTROLLERS.forEach((name, admissionController) -> { admissionController.apply(action); }); + public void applyTransportAdmissionControl(String action, AdmissionControlActionType admissionControlActionType) { + this.ADMISSION_CONTROLLERS.forEach((name, admissionController) -> { admissionController.apply(action, admissionControlActionType); }); } /** @@ -79,7 +90,7 @@ public void registerAdmissionController(String admissionControllerName) { private AdmissionController controllerFactory(String admissionControllerName) { switch (admissionControllerName) { case CPU_BASED_ADMISSION_CONTROLLER: - return new CPUBasedAdmissionController(admissionControllerName, this.settings, this.clusterSettings); + return new CPUBasedAdmissionController(admissionControllerName, this.settings, this.clusterService, this.resourceUsageCollectorService); default: throw new IllegalArgumentException("Not Supported AdmissionController : " + admissionControllerName); } @@ -101,4 +112,27 @@ public List getAdmissionControllers() { public AdmissionController getAdmissionController(String controllerName) { return this.ADMISSION_CONTROLLERS.getOrDefault(controllerName, null); } + + public AdmissionControlStats stats(){ + List statsList = new ArrayList<>(); + if(this.ADMISSION_CONTROLLERS.size() > 0){ + this.ADMISSION_CONTROLLERS.forEach((controllerName, admissionController) -> { + BaseAdmissionControllerStats admissionControllerStats = controllerStatsFactory(admissionController); + if(admissionControllerStats != null) { + statsList.add(admissionControllerStats); + } + }); + return new AdmissionControlStats(statsList); + } + return null; + } + + private BaseAdmissionControllerStats controllerStatsFactory(AdmissionController admissionController) { + switch (admissionController.getName()) { + case CPU_BASED_ADMISSION_CONTROLLER: + return new CPUBasedAdmissionControllerStats(admissionController); + default: + return null; + } + } } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java index 00564a9967f31..794a70f7a7483 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java @@ -8,8 +8,15 @@ package org.opensearch.ratelimitting.admissioncontrol.controllers; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.node.ResourceUsageCollectorService; +import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; /** @@ -21,15 +28,21 @@ public abstract class AdmissionController { private final AtomicLong rejectionCount; private final String admissionControllerName; + final ResourceUsageCollectorService resourceUsageCollectorService; + public final Map rejectionCountMap; + public final ClusterService clusterService; /** - * - * @param rejectionCount initialised rejectionCount value for AdmissionController + * @param rejectionCount initialised rejectionCount value for AdmissionController * @param admissionControllerName name of the admissionController + * @param clusterService */ - public AdmissionController(AtomicLong rejectionCount, String admissionControllerName) { + public AdmissionController(AtomicLong rejectionCount, String admissionControllerName, ResourceUsageCollectorService resourceUsageCollectorService, ClusterService clusterService) { this.rejectionCount = rejectionCount; this.admissionControllerName = admissionControllerName; + this.resourceUsageCollectorService = resourceUsageCollectorService; + this.clusterService = clusterService; + this.rejectionCountMap = ConcurrentCollections.newConcurrentMap(); } /** @@ -40,11 +53,19 @@ public boolean isEnabledForTransportLayer(AdmissionControlMode admissionControlM return admissionControlMode != AdmissionControlMode.DISABLED; } + /** + * + * @return true if admissionController is Enforced Mode else false + */ + public Boolean isAdmissionControllerEnforced(AdmissionControlMode admissionControlMode) { + return admissionControlMode == AdmissionControlMode.ENFORCED; + } + /** * Increment the tracking-objects and apply the admission control if threshold is breached. * Mostly applicable while applying admission controller */ - public abstract void apply(String action); + public abstract void apply(String action, AdmissionControlActionType admissionControlActionType); /** * @return name of the admission-controller @@ -53,18 +74,26 @@ public String getName() { return this.admissionControllerName; } - /** - * Adds the rejection count for the controller. Primarily used when copying controller states. - * @param count To add the value of the tracking resource object as the provided count - */ - public void addRejectionCount(long count) { - this.rejectionCount.addAndGet(count); + public void addRejectionCount(String admissionControlActionType, long count) { + AtomicLong updatedCount = new AtomicLong(0); + if(this.rejectionCountMap.containsKey(admissionControlActionType)){ + updatedCount.addAndGet(this.rejectionCountMap.get(admissionControlActionType).get()); + } + updatedCount.addAndGet(count); + this.rejectionCountMap.put(admissionControlActionType, updatedCount); } /** * @return current value of the rejection count metric tracked by the admission-controller. */ - public long getRejectionCount() { - return this.rejectionCount.get(); + public long getRejectionCount(String admissionControlActionType) { + AtomicLong rejectionCount = this.rejectionCountMap.getOrDefault(admissionControlActionType, new AtomicLong()); + return rejectionCount.get(); + } + + public Map getRejectionStats() { + Map rejectionStats = new HashMap<>(); + rejectionCountMap.forEach((actionType, count) -> rejectionStats.put(actionType, count.get())); + return rejectionStats; } } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java index 3a8956b2cce87..2514b1e83fd04 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java @@ -10,10 +10,18 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.node.NodeResourceUsageStats; +import org.opensearch.node.ResourceUsageCollectorService; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; /** @@ -28,9 +36,9 @@ public class CPUBasedAdmissionController extends AdmissionController { * * @param admissionControllerName State of the admission controller */ - public CPUBasedAdmissionController(String admissionControllerName, Settings settings, ClusterSettings clusterSettings) { - super(new AtomicLong(0), admissionControllerName); - this.settings = new CPUBasedAdmissionControllerSettings(clusterSettings, settings); + public CPUBasedAdmissionController(String admissionControllerName, Settings settings, ClusterService clusterService, ResourceUsageCollectorService resourceUsageCollectorService) { + super(new AtomicLong(0), admissionControllerName, resourceUsageCollectorService, clusterService); + this.settings = new CPUBasedAdmissionControllerSettings(clusterService.getClusterSettings(), settings); } /** @@ -38,18 +46,43 @@ public CPUBasedAdmissionController(String admissionControllerName, Settings sett * @param action is the transport action */ @Override - public void apply(String action) { + public void apply(String action, AdmissionControlActionType admissionControlActionType) { // TODO Will extend this logic further currently just incrementing rejectionCount if (this.isEnabledForTransportLayer(this.settings.getTransportLayerAdmissionControllerMode())) { - this.applyForTransportLayer(action); + this.applyForTransportLayer(action, admissionControlActionType); } } - private void applyForTransportLayer(String actionName) { - // currently incrementing counts to evaluate the controller triggering as expected and using in testing so limiting to 10 - // TODO will update rejection logic further in next PR's - if (this.getRejectionCount() < 10) { - this.addRejectionCount(1); + private void applyForTransportLayer(String actionName, AdmissionControlActionType admissionControlActionType) { + if (isLimitsBreached(admissionControlActionType)) { + this.addRejectionCount(admissionControlActionType.getType(), 1); + if (this.isAdmissionControllerEnforced(this.settings.getTransportLayerAdmissionControllerMode())) { + throw new OpenSearchRejectedExecutionException("Action ["+ actionName +"] was rejected due to CPU usage admission controller limit breached"); + } + } + } + + private boolean isLimitsBreached(AdmissionControlActionType transportActionType) { + long maxCpuLimit = this.getCpuRejectionThreshold(transportActionType); + Optional nodePerformanceStatistics = this.resourceUsageCollectorService.getNodeStatistics(this.clusterService.state().nodes().getLocalNodeId()); + if(nodePerformanceStatistics.isPresent()) { + double cpuUsage = nodePerformanceStatistics.get().getCpuUtilizationPercent(); + if (cpuUsage >= maxCpuLimit){ + LOGGER.warn("CpuBasedAdmissionController rejected the request as the current CPU usage [" + + cpuUsage + "%] exceeds the allowed limit [" + maxCpuLimit + "%]"); + return true; + } + } + return false; + } + private long getCpuRejectionThreshold(AdmissionControlActionType transportActionType) { + switch (transportActionType) { + case SEARCH: + return this.settings.getSearchCPULimit(); + case INDEXING: + return this.settings.getIndexingCPULimit(); + default: + throw new IllegalArgumentException("Not Supported TransportAction Type: " + transportActionType.getType()); } } } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionType.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java similarity index 85% rename from server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionType.java rename to server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java index f2fdca0cfe49b..8cf6e973ceb64 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionType.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionType.java @@ -13,13 +13,13 @@ /** * Enums that defines the type of the transport requests */ -public enum TransportActionType { +public enum AdmissionControlActionType { INDEXING("indexing"), SEARCH("search"); private final String type; - TransportActionType(String uriType) { + AdmissionControlActionType(String uriType) { this.type = uriType; } @@ -31,7 +31,7 @@ public String getType() { return type; } - public static TransportActionType fromName(String name) { + public static AdmissionControlActionType fromName(String name) { name = name.toLowerCase(Locale.ROOT); switch (name) { case "indexing": diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java new file mode 100644 index 0000000000000..188feb77318e4 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.stats; + +import org.opensearch.Version; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; + +public class AdmissionControlStats implements ToXContentFragment, Writeable { + + List admissionControllerStatsList; + + /** + * + * @param admissionControllerStatsList list of admissionControllerStats + */ + public AdmissionControlStats(List admissionControllerStatsList){ + this.admissionControllerStatsList = admissionControllerStatsList; + } + + /** + * + * @param in the stream to read from + * @throws IOException if an I/O error occurs + */ + public AdmissionControlStats(StreamInput in) throws IOException { + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + this.admissionControllerStatsList = in.readNamedWriteableList(BaseAdmissionControllerStats.class); + } else { + this.admissionControllerStatsList = null; + } + } + + /** + * Write this into the {@linkplain StreamOutput}. + * + * @param out the output stream to write entity content to + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeList(this.admissionControllerStatsList); + } + } + + /** + * @param builder + * @param params + * @return + * @throws IOException + */ + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject("admission_control"); + this.admissionControllerStatsList.forEach(stats -> { + try { + builder.field(stats.getWriteableName(), stats); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + return builder.endObject(); + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/BaseAdmissionControllerStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/BaseAdmissionControllerStats.java new file mode 100644 index 0000000000000..0ee1807bf80da --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/BaseAdmissionControllerStats.java @@ -0,0 +1,15 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.stats; + +import org.opensearch.core.common.io.stream.NamedWriteable; +import org.opensearch.core.xcontent.ToXContentFragment; + +public abstract class BaseAdmissionControllerStats implements NamedWriteable, ToXContentFragment { +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/CPUBasedAdmissionControllerStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/CPUBasedAdmissionControllerStats.java new file mode 100644 index 0000000000000..7b4e4a9695509 --- /dev/null +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/CPUBasedAdmissionControllerStats.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol.stats; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController; + +import java.io.IOException; +import java.util.Map; + +import static org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER; +public class CPUBasedAdmissionControllerStats extends BaseAdmissionControllerStats { + + /** + * Returns the name of the writeable object + */ + @Override + public String getWriteableName() { + return CPU_BASED_ADMISSION_CONTROLLER; + } + + public Map rejectionCount; + + public CPUBasedAdmissionControllerStats(AdmissionController admissionController){ + this.rejectionCount = admissionController.getRejectionStats(); + } + + public CPUBasedAdmissionControllerStats(StreamInput in) throws IOException { + this.rejectionCount = in.readMap(StreamInput::readString, StreamInput::readLong); + } + /** + * Write this into the {@linkplain StreamOutput}. + * + * @param out + */ + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeMap(this.rejectionCount, StreamOutput::writeString, StreamOutput::writeLong); + } + + /** + * @param builder + * @param params + * @return + * @throws IOException + */ + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startObject("transport"); + { + builder.startObject("rejection_count"); + { + this.rejectionCount.forEach((actionType, count) -> { + try { + builder.field(actionType, count); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + } + builder.endObject(); + } + builder.endObject(); + return builder.endObject(); + } +} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java index 7d0f5fbc17a51..dfe286d9b9537 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java @@ -12,6 +12,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportChannel; import org.opensearch.transport.TransportRequest; @@ -28,18 +29,21 @@ public class AdmissionControlTransportHandler implem protected final Logger log = LogManager.getLogger(this.getClass()); AdmissionControlService admissionControlService; boolean forceExecution; + AdmissionControlActionType admissionControlActionType; public AdmissionControlTransportHandler( String action, TransportRequestHandler actualHandler, AdmissionControlService admissionControlService, - boolean forceExecution + boolean forceExecution, + AdmissionControlActionType admissionControlActionType ) { super(); this.action = action; this.actualHandler = actualHandler; this.admissionControlService = admissionControlService; this.forceExecution = forceExecution; + this.admissionControlActionType = admissionControlActionType; } /** @@ -53,7 +57,7 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro // intercept all the transport requests here and apply admission control try { // TODO Need to evaluate if we need to apply admission control or not if force Execution is true will update in next PR. - this.admissionControlService.applyTransportAdmissionControl(this.action); + this.admissionControlService.applyTransportAdmissionControl(this.action, this.admissionControlActionType); } catch (final OpenSearchRejectedExecutionException openSearchRejectedExecutionException) { log.warn(openSearchRejectedExecutionException.getMessage()); channel.sendResponse(openSearchRejectedExecutionException); diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java index 01cfcbd780006..c725af821ac8f 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java @@ -9,6 +9,7 @@ package org.opensearch.ratelimitting.admissioncontrol.transport; import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.transport.TransportInterceptor; import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportRequestHandler; @@ -33,8 +34,9 @@ public TransportRequestHandler interceptHandler( String action, String executor, boolean forceExecution, - TransportRequestHandler actualHandler + TransportRequestHandler actualHandler, + AdmissionControlActionType admissionControlActionType ) { - return new AdmissionControlTransportHandler<>(action, actualHandler, this.admissionControlService, forceExecution); + return new AdmissionControlTransportHandler<>(action, actualHandler, this.admissionControlService, forceExecution, admissionControlActionType); } } diff --git a/server/src/main/java/org/opensearch/transport/TransportInterceptor.java b/server/src/main/java/org/opensearch/transport/TransportInterceptor.java index 9ee2db6d39893..12b0990a5d692 100644 --- a/server/src/main/java/org/opensearch/transport/TransportInterceptor.java +++ b/server/src/main/java/org/opensearch/transport/TransportInterceptor.java @@ -35,6 +35,7 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.core.common.io.stream.Writeable.Reader; import org.opensearch.core.transport.TransportResponse; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; /** * This interface allows plugins to intercept requests on both the sender and the receiver side. @@ -57,6 +58,26 @@ default TransportRequestHandler interceptHandler return actualHandler; } + /** + * + * @param action + * @param executor + * @param forceExecution + * @param actualHandler + * @param transportActionType + * @return + * @param + */ + default TransportRequestHandler interceptHandler( + String action, + String executor, + boolean forceExecution, + TransportRequestHandler actualHandler, + AdmissionControlActionType transportActionType + ) { + return interceptHandler(action, executor, forceExecution, actualHandler); + } + /** * This is called up-front providing the actual low level {@link AsyncSender} that performs the low level send request. * The returned sender is used to send all requests that come in via diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index de88c3619abe8..a55a20478aa3d 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -64,6 +64,7 @@ import org.opensearch.core.service.ReportingService; import org.opensearch.core.transport.TransportResponse; import org.opensearch.node.NodeClosedException; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; import org.opensearch.telemetry.tracing.Span; @@ -1241,6 +1242,39 @@ public void registerRequestHandler( transport.registerRequestHandler(reg); } + /** + * Registers a new request handler + * + * @param action The action the request handler is associated with + * @param requestReader The request class that will be used to construct new instances for streaming + * @param executor The executor the request handling will be executed on + * @param forceExecution Force execution on the executor queue and never reject it + * @param transportActionType Check the request size and raise an exception in case the limit is breached. + * @param handler The handler itself that implements the request handling + */ + public void registerRequestHandler( + String action, + String executor, + boolean forceExecution, + boolean canTripCircuitBreaker, + AdmissionControlActionType transportActionType, + Writeable.Reader requestReader, + TransportRequestHandler handler + ) { + validateActionName(action); + handler = interceptor.interceptHandler(action, executor, forceExecution, handler, transportActionType); + RequestHandlerRegistry reg = new RequestHandlerRegistry<>( + action, + requestReader, + taskManager, + handler, + executor, + forceExecution, + canTripCircuitBreaker + ); + transport.registerRequestHandler(reg); + } + /** * called by the {@link Transport} implementation when an incoming request arrives but before * any parsing of it has happened (with the exception of the requestId and action) diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java index bac4eaf3fd677..abd38a3cbf1fb 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java @@ -13,6 +13,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings; import org.opensearch.test.OpenSearchTestCase; @@ -46,13 +47,13 @@ public void tearDown() throws Exception { } public void testWhenAdmissionControllerRegistered() { - admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); assertEquals(admissionControlService.getAdmissionControllers().size(), 1); } public void testRegisterInvalidAdmissionController() { String test = "TEST"; - admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); assertEquals(admissionControlService.getAdmissionControllers().size(), 1); IllegalArgumentException ex = expectThrows( IllegalArgumentException.class, @@ -62,7 +63,7 @@ public void testRegisterInvalidAdmissionController() { } public void testAdmissionControllerSettings() { - admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); AdmissionControlSettings admissionControlSettings = admissionControlService.admissionControlSettings; List admissionControllerList = admissionControlService.getAdmissionControllers(); assertEquals(admissionControllerList.size(), 1); @@ -105,19 +106,19 @@ public void testAdmissionControllerSettings() { public void testApplyAdmissionControllerDisabled() { this.action = "indices:data/write/bulk[s][p]"; - admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); - admissionControlService.applyTransportAdmissionControl(this.action); + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); + admissionControlService.applyTransportAdmissionControl(this.action, null); List admissionControllerList = admissionControlService.getAdmissionControllers(); - admissionControllerList.forEach(admissionController -> { assertEquals(admissionController.getRejectionCount(), 0); }); + admissionControllerList.forEach(admissionController -> { assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); }); } public void testApplyAdmissionControllerEnabled() { this.action = "indices:data/write/bulk[s][p]"; - admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService.getClusterSettings(), threadPool); - admissionControlService.applyTransportAdmissionControl(this.action); + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool,null); + admissionControlService.applyTransportAdmissionControl(this.action, null); assertEquals( admissionControlService.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER) - .getRejectionCount(), + .getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0 ); @@ -128,12 +129,12 @@ public void testApplyAdmissionControllerEnabled() { ) .build(); clusterService.getClusterSettings().applySettings(settings); - admissionControlService.applyTransportAdmissionControl(this.action); + admissionControlService.applyTransportAdmissionControl(this.action, null); List admissionControllerList = admissionControlService.getAdmissionControllers(); assertEquals(admissionControllerList.size(), 1); assertEquals( admissionControlService.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER) - .getRejectionCount(), + .getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 1 ); } diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java index af6ec0749e709..2473b242f71b5 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java @@ -11,6 +11,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings; import org.opensearch.test.OpenSearchTestCase; @@ -45,10 +46,11 @@ public void testCheckDefaultParameters() { admissionController = new CPUBasedAdmissionController( CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, Settings.EMPTY, - clusterService.getClusterSettings() + clusterService, + null ); assertEquals(admissionController.getName(), CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER); - assertEquals(admissionController.getRejectionCount(), 0); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); assertFalse( admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode()) @@ -59,7 +61,8 @@ public void testCheckUpdateSettings() { admissionController = new CPUBasedAdmissionController( CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, Settings.EMPTY, - clusterService.getClusterSettings() + clusterService, + null ); Settings settings = Settings.builder() .put( @@ -70,7 +73,7 @@ public void testCheckUpdateSettings() { clusterService.getClusterSettings().applySettings(settings); assertEquals(admissionController.getName(), CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER); - assertEquals(admissionController.getRejectionCount(), 0); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())); } @@ -79,13 +82,14 @@ public void testApplyControllerWithDefaultSettings() { admissionController = new CPUBasedAdmissionController( CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, Settings.EMPTY, - clusterService.getClusterSettings() + clusterService, + null ); - assertEquals(admissionController.getRejectionCount(), 0); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); action = "indices:data/write/bulk[s][p]"; - admissionController.apply(action); - assertEquals(admissionController.getRejectionCount(), 0); + admissionController.apply(action, AdmissionControlActionType.INDEXING); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); } public void testApplyControllerWhenSettingsEnabled() { @@ -98,12 +102,13 @@ public void testApplyControllerWhenSettingsEnabled() { admissionController = new CPUBasedAdmissionController( CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, settings, - clusterService.getClusterSettings() + clusterService, + null ); assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())); - assertEquals(admissionController.getRejectionCount(), 0); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); action = "indices:data/write/bulk[s][p]"; - admissionController.apply(action); - assertEquals(admissionController.getRejectionCount(), 1); + admissionController.apply(action, AdmissionControlActionType.INDEXING); + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 1); } } diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java index 02f582c26f54e..419e9ea8d4827 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java @@ -13,15 +13,15 @@ public class TransportActionTypeTests extends OpenSearchTestCase { public void testValidActionType() { - assertEquals(TransportActionType.SEARCH.getType(), "search"); - assertEquals(TransportActionType.INDEXING.getType(), "indexing"); - assertEquals(TransportActionType.fromName("search"), TransportActionType.SEARCH); - assertEquals(TransportActionType.fromName("indexing"), TransportActionType.INDEXING); + assertEquals(AdmissionControlActionType.SEARCH.getType(), "search"); + assertEquals(AdmissionControlActionType.INDEXING.getType(), "indexing"); + assertEquals(AdmissionControlActionType.fromName("search"), AdmissionControlActionType.SEARCH); + assertEquals(AdmissionControlActionType.fromName("indexing"), AdmissionControlActionType.INDEXING); } public void testInValidActionType() { String name = "test"; - IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> TransportActionType.fromName(name)); + IllegalArgumentException ex = expectThrows(IllegalArgumentException.class, () -> AdmissionControlActionType.fromName(name)); assertEquals(ex.getMessage(), "Not Supported TransportAction Type: " + name); } } diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandlerTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandlerTests.java index 03d4819a94045..057cf35a12f6b 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandlerTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandlerTests.java @@ -29,7 +29,8 @@ public void testHandlerInvoked() throws Exception { action, handler, mock(AdmissionControlService.class), - false + false, + null ); admissionControlTransportHandler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); assertEquals(1, handler.count); @@ -38,13 +39,14 @@ public void testHandlerInvoked() throws Exception { public void testHandlerInvokedRejectedException() throws Exception { String action = "TEST"; AdmissionControlService admissionControlService = mock(AdmissionControlService.class); - doThrow(new OpenSearchRejectedExecutionException()).when(admissionControlService).applyTransportAdmissionControl(action); + doThrow(new OpenSearchRejectedExecutionException()).when(admissionControlService).applyTransportAdmissionControl(action, null); InterceptingRequestHandler handler = new InterceptingRequestHandler<>(action); admissionControlTransportHandler = new AdmissionControlTransportHandler( action, handler, admissionControlService, - false + false, + null ); try { admissionControlTransportHandler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); @@ -58,13 +60,14 @@ public void testHandlerInvokedRejectedException() throws Exception { public void testHandlerInvokedRandomException() throws Exception { String action = "TEST"; AdmissionControlService admissionControlService = mock(AdmissionControlService.class); - doThrow(new NullPointerException()).when(admissionControlService).applyTransportAdmissionControl(action); + doThrow(new NullPointerException()).when(admissionControlService).applyTransportAdmissionControl(action, null); InterceptingRequestHandler handler = new InterceptingRequestHandler<>(action); admissionControlTransportHandler = new AdmissionControlTransportHandler( action, handler, admissionControlService, - false + false, + null ); try { admissionControlTransportHandler.messageReceived(mock(TransportRequest.class), mock(TransportChannel.class), mock(Task.class)); diff --git a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java index 2ba4de5e54a67..1ad6083074025 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/opensearch/cluster/MockInternalClusterInfoService.java @@ -123,7 +123,8 @@ List adjustNodesStats(List nodesStats) { nodeStats.getTaskCancellationStats(), nodeStats.getSearchPipelineStats(), nodeStats.getSegmentReplicationRejectionStats(), - nodeStats.getRepositoriesStats() + nodeStats.getRepositoriesStats(), + nodeStats.getAdmissionControlStats() ); }).collect(Collectors.toList()); }