From 6249251b892db9e83027c0092d39f333fcd03e52 Mon Sep 17 00:00:00 2001 From: Sumit Bansal Date: Tue, 3 Sep 2024 10:47:52 +0530 Subject: [PATCH] Perform code refactor Signed-off-by: Sumit Bansal --- .../service/ClusterManagerTaskThrottler.java | 49 ++++++++++--------- 1 file changed, 26 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java index a507c62418994..1df7a761129ac 100644 --- a/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java +++ b/server/src/main/java/org/opensearch/cluster/service/ClusterManagerTaskThrottler.java @@ -33,7 +33,7 @@ *

* Set specific setting to for setting the threshold of throttling of particular task type. * e.g : Set "cluster_manager.throttling.thresholds.put_mapping" to set throttling limit of "put mapping" tasks, - * Set it to default value(-1) to disable the throttling for this task type. + * Set it to default value(-1) to disable the throttling for this task type. */ public class ClusterManagerTaskThrottler implements TaskBatcherListener { private static final Logger logger = LogManager.getLogger(ClusterManagerTaskThrottler.class); @@ -210,23 +210,13 @@ Long getThrottlingLimit(final String taskKey) { } private void checkForClusterManagerThrottling( - final ThrottlingKey clusterManagerThrottlingKey, final String taskThrottlingKey, + final Long threshold, final long taskCount, final int tasksSize ) { - if (clusterManagerThrottlingKey.isThrottlingEnabled()) { - Long threshold = tasksThreshold.get(taskThrottlingKey); - if (threshold != null && shouldThrottle(threshold, taskCount, tasksSize)) { - clusterManagerTaskThrottlerListener.onThrottle(taskThrottlingKey, tasksSize); - logger.warn( - "Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]", - taskThrottlingKey, - tasksSize, - threshold - ); - throw new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for " + taskThrottlingKey); - } + if (threshold != null && shouldThrottle(threshold, taskCount, tasksSize)) { + throw new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for " + taskThrottlingKey); } } @@ -234,17 +224,30 @@ private void checkForClusterManagerThrottling( public void onBeginSubmit(List tasks) { final ThrottlingKey clusterManagerThrottlingKey = ((ClusterStateTaskExecutor) tasks.get(0).batchingKey) .getClusterManagerThrottlingKey(); + if (!clusterManagerThrottlingKey.isThrottlingEnabled()) { + return; + } final String taskThrottlingKey = clusterManagerThrottlingKey.getTaskThrottlingKey(); + final Long threshold = getThrottlingLimit(taskThrottlingKey); tasksCount.putIfAbsent(taskThrottlingKey, 0L); - - // Performing shallow check before taking lock, performing throttle check and computing new count - checkForClusterManagerThrottling(clusterManagerThrottlingKey, taskThrottlingKey, tasksCount.get(taskThrottlingKey), tasks.size()); - - tasksCount.computeIfPresent(taskThrottlingKey, (key, count) -> { - int size = tasks.size(); - checkForClusterManagerThrottling(clusterManagerThrottlingKey, taskThrottlingKey, count, size); - return count + size; - }); + int tasksSize = tasks.size(); + + try { + checkForClusterManagerThrottling(taskThrottlingKey, threshold, tasksCount.get(taskThrottlingKey), tasksSize); + tasksCount.computeIfPresent(taskThrottlingKey, (key, count) -> { + checkForClusterManagerThrottling(taskThrottlingKey, threshold, count, tasksSize); + return count + tasksSize; + }); + } catch (final ClusterManagerThrottlingException e) { + clusterManagerTaskThrottlerListener.onThrottle(taskThrottlingKey, tasksSize); + logger.trace( + "Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]", + taskThrottlingKey, + tasksSize, + threshold + ); + throw e; + } } /**