Skip to content

Commit

Permalink
Perform code refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Sumit Bansal <[email protected]>
  • Loading branch information
sumitasr committed Sep 3, 2024
1 parent 27b68c5 commit 6249251
Showing 1 changed file with 26 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* <p>
* Set specific setting to for setting the threshold of throttling of particular task type.
* e.g : Set "cluster_manager.throttling.thresholds.put_mapping" to set throttling limit of "put mapping" tasks,
* Set it to default value(-1) to disable the throttling for this task type.
* Set it to default value(-1) to disable the throttling for this task type.
*/
public class ClusterManagerTaskThrottler implements TaskBatcherListener {
private static final Logger logger = LogManager.getLogger(ClusterManagerTaskThrottler.class);
Expand Down Expand Up @@ -210,41 +210,44 @@ Long getThrottlingLimit(final String taskKey) {
}

private void checkForClusterManagerThrottling(
final ThrottlingKey clusterManagerThrottlingKey,
final String taskThrottlingKey,
final Long threshold,
final long taskCount,
final int tasksSize
) {
if (clusterManagerThrottlingKey.isThrottlingEnabled()) {
Long threshold = tasksThreshold.get(taskThrottlingKey);
if (threshold != null && shouldThrottle(threshold, taskCount, tasksSize)) {
clusterManagerTaskThrottlerListener.onThrottle(taskThrottlingKey, tasksSize);
logger.warn(
"Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]",
taskThrottlingKey,
tasksSize,
threshold
);
throw new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for " + taskThrottlingKey);
}
if (threshold != null && shouldThrottle(threshold, taskCount, tasksSize)) {
throw new ClusterManagerThrottlingException("Throttling Exception : Limit exceeded for " + taskThrottlingKey);
}
}

@Override
public void onBeginSubmit(List<? extends TaskBatcher.BatchedTask> tasks) {
final ThrottlingKey clusterManagerThrottlingKey = ((ClusterStateTaskExecutor<Object>) tasks.get(0).batchingKey)
.getClusterManagerThrottlingKey();
if (!clusterManagerThrottlingKey.isThrottlingEnabled()) {
return;
}
final String taskThrottlingKey = clusterManagerThrottlingKey.getTaskThrottlingKey();
final Long threshold = getThrottlingLimit(taskThrottlingKey);
tasksCount.putIfAbsent(taskThrottlingKey, 0L);

// Performing shallow check before taking lock, performing throttle check and computing new count
checkForClusterManagerThrottling(clusterManagerThrottlingKey, taskThrottlingKey, tasksCount.get(taskThrottlingKey), tasks.size());

tasksCount.computeIfPresent(taskThrottlingKey, (key, count) -> {
int size = tasks.size();
checkForClusterManagerThrottling(clusterManagerThrottlingKey, taskThrottlingKey, count, size);
return count + size;
});
int tasksSize = tasks.size();

try {
checkForClusterManagerThrottling(taskThrottlingKey, threshold, tasksCount.get(taskThrottlingKey), tasksSize);
tasksCount.computeIfPresent(taskThrottlingKey, (key, count) -> {
checkForClusterManagerThrottling(taskThrottlingKey, threshold, count, tasksSize);
return count + tasksSize;
});
} catch (final ClusterManagerThrottlingException e) {
clusterManagerTaskThrottlerListener.onThrottle(taskThrottlingKey, tasksSize);
logger.trace(
"Throwing Throttling Exception for [{}]. Trying to add [{}] tasks to queue, limit is set to [{}]",
taskThrottlingKey,
tasksSize,
threshold
);
throw e;
}
}

/**
Expand Down

0 comments on commit 6249251

Please sign in to comment.