Skip to content

Commit

Permalink
fix UT
Browse files Browse the repository at this point in the history
Signed-off-by: Ruirui Zhang <[email protected]>
  • Loading branch information
ruai0511 committed Oct 9, 2024
1 parent ff4145c commit 7dcd545
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 130 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923))
- Add _list/indices API as paginated alternate to _cat/indices ([#14718](https://github.com/opensearch-project/OpenSearch/pull/14718))
- Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976))
- Add new metric REMOTE_STORE to NodeStatNode.javas API response ([#15611](https://github.com/opensearch-project/OpenSearch/pull/15611))
- Add new metric REMOTE_STORE to NodeStats API response ([#15611](https://github.com/opensearch-project/OpenSearch/pull/15611))
- [S3 Repository] Change default retry mechanism of s3 clients to Standard Mode ([#15978](https://github.com/opensearch-project/OpenSearch/pull/15978))
- Add changes to block calls in cat shards, indices and segments based on dynamic limit settings ([#15986](https://github.com/opensearch-project/OpenSearch/pull/15986))
- New `phone` & `phone-search` analyzer + tokenizer ([#15915](https://github.com/opensearch-project/OpenSearch/pull/15915))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ protected WlmStats newNodeResponse(StreamInput in) throws IOException {

@Override
protected WlmStats nodeOperation(WlmStatsRequest wlmStatsRequest) {
return queryGroupService.nodeStats(wlmStatsRequest.getQueryGroupIds(), wlmStatsRequest.isBreach());
assert transportService.getLocalNode() != null;
return new WlmStats(
transportService.getLocalNode(),
queryGroupService.nodeStats(wlmStatsRequest.getQueryGroupIds(), wlmStatsRequest.isBreach())
);
}
}
91 changes: 43 additions & 48 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -354,12 +354,12 @@ public class Node implements Closeable {
);

/**
* controls whether the node is allowed to persist things like metadata to disk
* Note that this does not control whether the node stores actual indices (see
* {@link #NODE_DATA_SETTING}). However, if this is false, {@link #NODE_DATA_SETTING}
* and {@link #NODE_MASTER_SETTING} must also be false.
*
*/
* controls whether the node is allowed to persist things like metadata to disk
* Note that this does not control whether the node stores actual indices (see
* {@link #NODE_DATA_SETTING}). However, if this is false, {@link #NODE_DATA_SETTING}
* and {@link #NODE_MASTER_SETTING} must also be false.
*
*/
public static final Setting<Boolean> NODE_LOCAL_STORAGE_SETTING = Setting.boolSetting(
"node.local_storage",
true,
Expand Down Expand Up @@ -1037,6 +1037,41 @@ protected Node(

final QueryGroupsStateAccessor queryGroupsStateAccessor = new QueryGroupsStateAccessor();

final QueryGroupService queryGroupService = new QueryGroupService(
new QueryGroupTaskCancellationService(
workloadManagementSettings,
new MaximumResourceTaskSelectionStrategy(),
queryGroupResourceUsageTrackerService,
queryGroupsStateAccessor
),
clusterService,
threadPool,
workloadManagementSettings,
queryGroupsStateAccessor
);
taskResourceTrackingService.addTaskCompletionListener(queryGroupService);

final QueryGroupRequestOperationListener queryGroupRequestOperationListener = new QueryGroupRequestOperationListener(
queryGroupService,
threadPool
);

// register all standard SearchRequestOperationsListeners (plus any provided by plugins) with the SearchRequestOperationsCompositeListenerFactory
final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory =
new SearchRequestOperationsCompositeListenerFactory(
Stream.concat(
Stream.of(
searchRequestStats,
searchRequestSlowLog,
searchTaskRequestOperationsListener,
queryGroupRequestOperationListener
),
pluginComponents.stream()
.filter(p -> p instanceof SearchRequestOperationsListener)
.map(p -> (SearchRequestOperationsListener) p)
).toArray(SearchRequestOperationsListener[]::new)
);

ActionModule actionModule = new ActionModule(
settings,
clusterModule.getIndexNameExpressionResolver(),
Expand Down Expand Up @@ -1079,11 +1114,9 @@ protected Node(
admissionControlService
);

SetOnce<QueryGroupService> queryGroupServiceSetOnce = new SetOnce<>();

WorkloadManagementTransportInterceptor workloadManagementTransportInterceptor = new WorkloadManagementTransportInterceptor(
threadPool,
queryGroupServiceSetOnce // We will need to replace this with actual implementation
queryGroupService
);

final Collection<SecureSettingsFactory> secureSettingsFactories = pluginsService.filterPlugins(Plugin.class)
Expand Down Expand Up @@ -1145,45 +1178,6 @@ protected Node(
taskHeaders,
tracer
);

final QueryGroupService queryGroupService = new QueryGroupService(
new QueryGroupTaskCancellationService(
workloadManagementSettings,
new MaximumResourceTaskSelectionStrategy(),
queryGroupResourceUsageTrackerService,
queryGroupsStateAccessor
),
transportService,
clusterService,
threadPool,
workloadManagementSettings,
queryGroupsStateAccessor
);

queryGroupServiceSetOnce.set(queryGroupService);
taskResourceTrackingService.addTaskCompletionListener(queryGroupService);

final QueryGroupRequestOperationListener queryGroupRequestOperationListener = new QueryGroupRequestOperationListener(
queryGroupService,
threadPool
);

// register all standard SearchRequestOperationsListeners (plus any provided by plugins) with the SearchRequestOperationsCompositeListenerFactory
final SearchRequestOperationsCompositeListenerFactory searchRequestOperationsCompositeListenerFactory =
new SearchRequestOperationsCompositeListenerFactory(
Stream.concat(
Stream.of(
searchRequestStats,
searchRequestSlowLog,
searchTaskRequestOperationsListener,
queryGroupRequestOperationListener
),
pluginComponents.stream()
.filter(p -> p instanceof SearchRequestOperationsListener)
.map(p -> (SearchRequestOperationsListener) p)
).toArray(SearchRequestOperationsListener[]::new)
);

TopNSearchTasksLogger taskConsumer = new TopNSearchTasksLogger(settings, settingsModule.getClusterSettings());
transportService.getTaskManager().registerTaskResourceConsumer(taskConsumer);
this.extensionsManager.initializeServicesAndRestHandler(
Expand Down Expand Up @@ -1509,6 +1503,7 @@ protected Node(
b.bind(SegmentReplicationStatsTracker.class).toInstance(segmentReplicationStatsTracker);
b.bind(SearchRequestOperationsCompositeListenerFactory.class).toInstance(searchRequestOperationsCompositeListenerFactory);
b.bind(SegmentReplicator.class).toInstance(segmentReplicator);

taskManagerClientOptional.ifPresent(value -> b.bind(TaskManagerClient.class).toInstance(value));
});
injector = modules.createInjector();
Expand Down
72 changes: 40 additions & 32 deletions server/src/main/java/org/opensearch/wlm/QueryGroupService.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.QueryGroup;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.monitor.jvm.JvmStats;
Expand All @@ -28,12 +27,10 @@
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
import org.opensearch.wlm.cancellation.QueryGroupTaskCancellationService;
import org.opensearch.wlm.stats.QueryGroupState;
import org.opensearch.wlm.stats.QueryGroupStats;
import org.opensearch.wlm.stats.QueryGroupStats.QueryGroupStatsHolder;
import org.opensearch.wlm.stats.WlmStats;

import java.io.IOException;
import java.util.HashMap;
Expand Down Expand Up @@ -63,11 +60,9 @@ public class QueryGroupService extends AbstractLifecycleComponent
private final Set<QueryGroup> deletedQueryGroups;
private final NodeDuressTrackers nodeDuressTrackers;
private final QueryGroupsStateAccessor queryGroupsStateAccessor;
private final TransportService transportService;

public QueryGroupService(
QueryGroupTaskCancellationService taskCancellationService,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
WorkloadManagementSettings workloadManagementSettings,
Expand All @@ -76,7 +71,6 @@ public QueryGroupService(

this(
taskCancellationService,
transportService,
clusterService,
threadPool,
workloadManagementSettings,
Expand Down Expand Up @@ -105,7 +99,6 @@ public QueryGroupService(

public QueryGroupService(
QueryGroupTaskCancellationService taskCancellationService,
TransportService transportService,
ClusterService clusterService,
ThreadPool threadPool,
WorkloadManagementSettings workloadManagementSettings,
Expand All @@ -115,7 +108,6 @@ public QueryGroupService(
Set<QueryGroup> deletedQueryGroups
) {
this.taskCancellationService = taskCancellationService;
this.transportService = transportService;
this.clusterService = clusterService;
this.threadPool = threadPool;
this.workloadManagementSettings = workloadManagementSettings;
Expand Down Expand Up @@ -214,53 +206,48 @@ public void incrementFailuresFor(final String queryGroupId) {
/**
* @return node level query group stats
*/
public WlmStats nodeStats(Set<String> queryGroupIds, Boolean requestedBreached) {
public QueryGroupStats nodeStats(Set<String> queryGroupIds, Boolean requestedBreached) {
final Map<String, QueryGroupStatsHolder> statsHolderMap = new HashMap<>();
Map<String, QueryGroup> existingGroups = clusterService.state().metadata().queryGroups();
Map<String, QueryGroupState> existingStateMap = queryGroupsStateAccessor.getQueryGroupStateMap();
if (!queryGroupIds.contains("_all")) {
for (String id : queryGroupIds) {
if (!existingGroups.containsKey(id)) {
if (!existingStateMap.containsKey(id)) {
throw new ResourceNotFoundException("QueryGroup with id " + id + " does not exist");
}
}
}
Map<String, QueryGroupState> stateMap = queryGroupsStateAccessor.getQueryGroupStateMap();
if (stateMap != null) {
stateMap.forEach((queryGroupId, currentState) -> {
if (existingStateMap != null) {
existingStateMap.forEach((queryGroupId, currentState) -> {
boolean shouldInclude = queryGroupIds.contains("_all") || queryGroupIds.contains(queryGroupId);
if (shouldInclude) {
if (requestedBreached == null
|| requestedBreached == (resourceLimitBreached(existingGroups.get(queryGroupId), currentState).v1()
.length() != 0)) {
if (requestedBreached == null || requestedBreached == resourceLimitBreached(queryGroupId, currentState)) {
statsHolderMap.put(queryGroupId, QueryGroupStatsHolder.from(currentState));
}
}
});
}
return new WlmStats(transportService.getLocalNode(), new QueryGroupStats(statsHolderMap));
return new QueryGroupStats(statsHolderMap);
}

/**
* @return whether the QueryGroup breaches any resource limit, based on its last recorded usage
*/
public Tuple<StringBuilder, ResourceType> resourceLimitBreached(QueryGroup queryGroup, QueryGroupState queryGroupState) {
StringBuilder reason = new StringBuilder();
public boolean resourceLimitBreached(String id, QueryGroupState currentState) {
QueryGroup queryGroup = clusterService.state().metadata().queryGroups().get(id);
if (queryGroup == null) {
throw new ResourceNotFoundException("QueryGroup with id " + id + " does not exist");
}

for (ResourceType resourceType : TRACKED_RESOURCES) {
if (queryGroup.getResourceLimits().containsKey(resourceType)) {
final double threshold = getNormalisedRejectionThreshold(queryGroup.getResourceLimits().get(resourceType), resourceType);
final double lastRecordedUsage = queryGroupState.getResourceState().get(resourceType).getLastRecordedUsage();
final double lastRecordedUsage = currentState.getResourceState().get(resourceType).getLastRecordedUsage();
if (threshold < lastRecordedUsage) {
reason.append(resourceType)
.append(" limit is breaching for ENFORCED type QueryGroup: (")
.append(threshold)
.append(" < ")
.append(lastRecordedUsage)
.append("). ");
return new Tuple<>(reason, resourceType);
return true;
}
}
}
return new Tuple<>(reason, null);
return false;
}

/**
Expand All @@ -287,9 +274,30 @@ public void rejectIfNeeded(String queryGroupId) {
return;

optionalQueryGroup.ifPresent(queryGroup -> {
Tuple<StringBuilder, ResourceType> reason = resourceLimitBreached(queryGroup, queryGroupState);
if (reason.v1().length() != 0) {
queryGroupState.getResourceState().get(reason.v2()).rejections.inc();
boolean reject = false;
final StringBuilder reason = new StringBuilder();
for (ResourceType resourceType : TRACKED_RESOURCES) {
if (queryGroup.getResourceLimits().containsKey(resourceType)) {
final double threshold = getNormalisedRejectionThreshold(
queryGroup.getResourceLimits().get(resourceType),
resourceType
);
final double lastRecordedUsage = queryGroupState.getResourceState().get(resourceType).getLastRecordedUsage();
if (threshold < lastRecordedUsage) {
reject = true;
reason.append(resourceType)
.append(" limit is breaching for ENFORCED type QueryGroup: (")
.append(threshold)
.append(" < ")
.append(lastRecordedUsage)
.append("). ");
queryGroupState.getResourceState().get(resourceType).rejections.inc();
// should not double count even if both the resource limits are breaching
break;
}
}
}
if (reject) {
queryGroupState.totalRejections.inc();
throw new OpenSearchRejectedExecutionException(
"QueryGroup " + queryGroupId + " is already contended. " + reason.toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.wlm;

import org.opensearch.common.SetOnce;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportChannel;
Expand All @@ -21,11 +20,11 @@
*/
public class WorkloadManagementTransportInterceptor implements TransportInterceptor {
private final ThreadPool threadPool;
private final SetOnce<QueryGroupService> queryGroupService;
private final QueryGroupService queryGroupService;

public WorkloadManagementTransportInterceptor(final ThreadPool threadPool, final SetOnce<QueryGroupService> queryGroupServiceSetOnce) {
public WorkloadManagementTransportInterceptor(final ThreadPool threadPool, final QueryGroupService queryGroupService) {
this.threadPool = threadPool;
this.queryGroupService = queryGroupServiceSetOnce;
this.queryGroupService = queryGroupService;
}

@Override
Expand All @@ -46,13 +45,9 @@ public static class RequestHandler<T extends TransportRequest> implements Transp

private final ThreadPool threadPool;
TransportRequestHandler<T> actualHandler;
private final SetOnce<QueryGroupService> queryGroupService;
private final QueryGroupService queryGroupService;

public RequestHandler(
ThreadPool threadPool,
TransportRequestHandler<T> actualHandler,
SetOnce<QueryGroupService> queryGroupService
) {
public RequestHandler(ThreadPool threadPool, TransportRequestHandler<T> actualHandler, QueryGroupService queryGroupService) {
this.threadPool = threadPool;
this.actualHandler = actualHandler;
this.queryGroupService = queryGroupService;
Expand All @@ -63,8 +58,7 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro
if (isSearchWorkloadRequest(task)) {
((QueryGroupTask) task).setQueryGroupId(threadPool.getThreadContext());
final String queryGroupId = ((QueryGroupTask) (task)).getQueryGroupId();
assert queryGroupService.get() != null;
queryGroupService.get().rejectIfNeeded(queryGroupId);
queryGroupService.rejectIfNeeded(queryGroupId);
}
actualHandler.messageReceived(request, channel, task);
}
Expand Down
Loading

0 comments on commit 7dcd545

Please sign in to comment.