
Commit

URI path filtering support in cluster stats API (opensearch-project#15938)

* URI path filtering support in cluster stats API

Signed-off-by: Swetha Guptha <[email protected]>
(cherry picked from commit 1982427)
Signed-off-by: Swetha Guptha <[email protected]>
SwethaGuptha authored and Swetha Guptha committed Oct 22, 2024
1 parent 4ed6a47 commit d6fa5f3
Showing 14 changed files with 1,483 additions and 115 deletions.
CHANGELOG.md: 2 additions & 0 deletions
@@ -27,6 +27,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add method to return dynamic SecureTransportParameters from SecureTransportSettingsProvider interface ([#16387](https://github.com/opensearch-project/OpenSearch/pull/16387)
- Add _list/shards API as paginated alternate to _cat/shards ([#14641](https://github.com/opensearch-project/OpenSearch/pull/14641))
- [Star Tree - Search] Add support for metric aggregations with/without term query ([15289](https://github.com/opensearch-project/OpenSearch/pull/15289))
- URI path filtering support in cluster stats API ([#15938](https://github.com/opensearch-project/OpenSearch/pull/15938))


### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.23.1 to 2.24.0 ([#15858](https://github.com/opensearch-project/OpenSearch/pull/15858))
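The CHANGELOG line above is terse, so a usage sketch may help: the change lets callers narrow `/_cluster/stats` through the URI path instead of always receiving the full payload. The sketch below uses the Java low-level REST client; only the three filtered paths are taken from the rolling-upgrade test added in this commit, while the client setup (host, port) is illustrative.

```java
// Sketch: calling the filtered cluster stats endpoints with the low-level REST client.
// Only the three paths below are confirmed by the ClusterStatsIT test in this commit;
// the host/port and client wiring are illustrative.
import org.apache.http.HttpHost;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.RestClient;

public class FilteredClusterStatsExample {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(new HttpHost("localhost", 9200, "http")).build()) {
            // Node-level "os" section only.
            Response os = client.performRequest(new Request("GET", "/_cluster/stats/os/nodes/_all"));
            // Index-level "mappings" section only.
            Response mappings = client.performRequest(new Request("GET", "/_cluster/stats/indices/mappings/nodes/_all"));
            // Mixed: node "os" plus index "mappings".
            Response mixed = client.performRequest(new Request("GET", "/_cluster/stats/os,indices/mappings/nodes/_all"));
            System.out.println(os.getStatusLine() + " " + mappings.getStatusLine() + " " + mixed.getStatusLine());
        }
    }
}
```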
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.upgrades;

import org.opensearch.Version;
import org.opensearch.client.Request;
import org.opensearch.client.Response;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class ClusterStatsIT extends AbstractRollingTestCase {

private final List<String> nodeStatsMetrics = List.of("os", "process", "jvm", "fs", "plugins", "ingest", "network_types", "discovery_types", "packaging_types");

private final List<String> indicesStatsMetrics = List.of("shards", "docs", "store", "fielddata", "query_cache", "completion", "segments", "analysis", "mappings");

public void testClusterStats() throws IOException {
Response response = client().performRequest(new Request("GET", "/_cluster/stats"));
validateClusterStatsWithFilterResponse(response, nodeStatsMetrics, indicesStatsMetrics);
if (AbstractRollingTestCase.UPGRADE_FROM_VERSION.onOrAfter(Version.V_2_18_0) || (
CLUSTER_TYPE == ClusterType.UPGRADED && Version.CURRENT.onOrAfter(Version.V_2_18_0))) {
response = client().performRequest(new Request("GET", "/_cluster/stats/os/nodes/_all"));
validateClusterStatsWithFilterResponse(response, List.of("os"), Collections.emptyList());
response = client().performRequest(new Request("GET", "/_cluster/stats/indices/mappings/nodes/_all"));
validateClusterStatsWithFilterResponse(response, Collections.emptyList(), List.of("mappings"));
response = client().performRequest(new Request("GET", "/_cluster/stats/os,indices/mappings/nodes/_all"));
validateClusterStatsWithFilterResponse(response, List.of("os"), List.of("mappings"));
}
}

private void validateClusterStatsWithFilterResponse(Response response, List<String> requestedNodesStatsMetrics, List<String> requestedIndicesStatsMetrics) throws IOException {
assertEquals(200, response.getStatusLine().getStatusCode());
Map<String, Object> entity = entityAsMap(response);
if (requestedNodesStatsMetrics != null && !requestedNodesStatsMetrics.isEmpty()) {
assertTrue(entity.containsKey("nodes"));
Map<?, ?> nodesStats = (Map<?, ?>) entity.get("nodes");
for (String metric : nodeStatsMetrics) {
if (requestedNodesStatsMetrics.contains(metric)) {
assertTrue(nodesStats.containsKey(metric));
} else {
assertFalse(nodesStats.containsKey(metric));
}
}
}

if (requestedIndicesStatsMetrics != null && !requestedIndicesStatsMetrics.isEmpty()) {
assertTrue(entity.containsKey("indices"));
Map<?, ?> indicesStats = (Map<?, ?>) entity.get("indices");
for (String metric : indicesStatsMetrics) {
if (requestedIndicesStatsMetrics.contains(metric)) {
assertTrue(indicesStats.containsKey(metric));
} else {
assertFalse(indicesStats.containsKey(metric));
}
}
}
}
}
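From the paths the test exercises, the filtered form appears to follow `/_cluster/stats/{metrics}/nodes/{nodeId}`, where the `indices` keyword opens an extra `/{index_metrics}` segment. The helper below is hypothetical (it is not part of this commit) and only encodes that inferred grammar; the unfiltered form remains a plain `GET /_cluster/stats`.

```java
import java.util.List;

// Hypothetical helper (not part of this commit) that builds a filtered cluster stats path
// following the grammar inferred from ClusterStatsIT:
//   /_cluster/stats/{metrics}/nodes/{nodeId}
//   /_cluster/stats/{metrics},indices/{index_metrics}/nodes/{nodeId}
// The unfiltered form used by the test is simply GET /_cluster/stats.
public final class ClusterStatsPathBuilder {
    private ClusterStatsPathBuilder() {}

    public static String build(List<String> nodeMetrics, List<String> indexMetrics, String nodeId) {
        StringBuilder path = new StringBuilder("/_cluster/stats");
        if (!nodeMetrics.isEmpty() || !indexMetrics.isEmpty()) {
            path.append('/').append(String.join(",", nodeMetrics));
            if (!indexMetrics.isEmpty()) {
                // "indices" is itself a metric keyword that unlocks the index-metric segment.
                if (!nodeMetrics.isEmpty()) {
                    path.append(',');
                }
                path.append("indices/").append(String.join(",", indexMetrics));
            }
        }
        return path.append("/nodes/").append(nodeId).toString();
    }
}
```

For example, `build(List.of("os"), List.of("mappings"), "_all")` yields `/_cluster/stats/os,indices/mappings/nodes/_all`, the mixed path the test validates.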

Large diffs are not rendered by default.

@@ -32,6 +32,7 @@

package org.opensearch.action.admin.cluster.stats;

import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest.IndexMetric;
import org.opensearch.action.admin.indices.stats.CommonStats;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.core.xcontent.ToXContentFragment;
@@ -47,6 +48,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Cluster Stats per index
@@ -68,14 +70,23 @@ public class ClusterStatsIndices implements ToXContentFragment {
private MappingStats mappings;

public ClusterStatsIndices(List<ClusterStatsNodeResponse> nodeResponses, MappingStats mappingStats, AnalysisStats analysisStats) {
Map<String, ShardStats> countsPerIndex = new HashMap<>();
this(Set.of(IndexMetric.values()), nodeResponses, mappingStats, analysisStats);

}

this.docs = new DocsStats();
this.store = new StoreStats();
this.fieldData = new FieldDataStats();
this.queryCache = new QueryCacheStats();
this.completion = new CompletionStats();
this.segments = new SegmentsStats();
public ClusterStatsIndices(
Set<IndexMetric> indicesMetrics,
List<ClusterStatsNodeResponse> nodeResponses,
MappingStats mappingStats,
AnalysisStats analysisStats
) {
Map<String, ShardStats> countsPerIndex = new HashMap<>();
this.docs = indicesMetrics.contains(IndexMetric.DOCS) ? new DocsStats() : null;
this.store = indicesMetrics.contains(IndexMetric.STORE) ? new StoreStats() : null;
this.fieldData = indicesMetrics.contains(IndexMetric.FIELDDATA) ? new FieldDataStats() : null;
this.queryCache = indicesMetrics.contains(IndexMetric.QUERY_CACHE) ? new QueryCacheStats() : null;
this.completion = indicesMetrics.contains(IndexMetric.COMPLETION) ? new CompletionStats() : null;
this.segments = indicesMetrics.contains(IndexMetric.SEGMENTS) ? new SegmentsStats() : null;

for (ClusterStatsNodeResponse r : nodeResponses) {
// Aggregated response from the node
@@ -92,12 +103,24 @@ public ClusterStatsIndices(List<ClusterStatsNodeResponse> nodeResponses, Mapping
}
}

docs.add(r.getAggregatedNodeLevelStats().commonStats.docs);
store.add(r.getAggregatedNodeLevelStats().commonStats.store);
fieldData.add(r.getAggregatedNodeLevelStats().commonStats.fieldData);
queryCache.add(r.getAggregatedNodeLevelStats().commonStats.queryCache);
completion.add(r.getAggregatedNodeLevelStats().commonStats.completion);
segments.add(r.getAggregatedNodeLevelStats().commonStats.segments);
if (indicesMetrics.contains(IndexMetric.DOCS)) {
docs.add(r.getAggregatedNodeLevelStats().commonStats.docs);
}
if (indicesMetrics.contains(IndexMetric.STORE)) {
store.add(r.getAggregatedNodeLevelStats().commonStats.store);
}
if (indicesMetrics.contains(IndexMetric.FIELDDATA)) {
fieldData.add(r.getAggregatedNodeLevelStats().commonStats.fieldData);
}
if (indicesMetrics.contains(IndexMetric.QUERY_CACHE)) {
queryCache.add(r.getAggregatedNodeLevelStats().commonStats.queryCache);
}
if (indicesMetrics.contains(IndexMetric.COMPLETION)) {
completion.add(r.getAggregatedNodeLevelStats().commonStats.completion);
}
if (indicesMetrics.contains(IndexMetric.SEGMENTS)) {
segments.add(r.getAggregatedNodeLevelStats().commonStats.segments);
}
} else {
// Default response from the node
for (org.opensearch.action.admin.indices.stats.ShardStats shardStats : r.shardsStats()) {
@@ -113,21 +136,35 @@ public ClusterStatsIndices(List<ClusterStatsNodeResponse> nodeResponses, Mapping

if (shardStats.getShardRouting().primary()) {
indexShardStats.primaries++;
docs.add(shardCommonStats.docs);
if (indicesMetrics.contains(IndexMetric.DOCS)) {
docs.add(shardCommonStats.docs);
}
}
if (indicesMetrics.contains(IndexMetric.STORE)) {
store.add(shardCommonStats.store);
}
if (indicesMetrics.contains(IndexMetric.FIELDDATA)) {
fieldData.add(shardCommonStats.fieldData);
}
if (indicesMetrics.contains(IndexMetric.QUERY_CACHE)) {
queryCache.add(shardCommonStats.queryCache);
}
if (indicesMetrics.contains(IndexMetric.COMPLETION)) {
completion.add(shardCommonStats.completion);
}
if (indicesMetrics.contains(IndexMetric.SEGMENTS)) {
segments.add(shardCommonStats.segments);
}
store.add(shardCommonStats.store);
fieldData.add(shardCommonStats.fieldData);
queryCache.add(shardCommonStats.queryCache);
completion.add(shardCommonStats.completion);
segments.add(shardCommonStats.segments);
}
}
}

shards = new ShardStats();
indexCount = countsPerIndex.size();
for (final ShardStats indexCountsCursor : countsPerIndex.values()) {
shards.addIndexShardCount(indexCountsCursor);
if (indicesMetrics.contains(IndexMetric.SHARDS)) {
shards = new ShardStats();
for (final ShardStats indexCountsCursor : countsPerIndex.values()) {
shards.addIndexShardCount(indexCountsCursor);
}
}

this.mappings = mappingStats;
@@ -186,13 +223,27 @@ static final class Fields {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(Fields.COUNT, indexCount);
shards.toXContent(builder, params);
docs.toXContent(builder, params);
store.toXContent(builder, params);
fieldData.toXContent(builder, params);
queryCache.toXContent(builder, params);
completion.toXContent(builder, params);
segments.toXContent(builder, params);
if (shards != null) {
shards.toXContent(builder, params);
}
if (docs != null) {
docs.toXContent(builder, params);
}
if (store != null) {
store.toXContent(builder, params);
}
if (fieldData != null) {
fieldData.toXContent(builder, params);
}
if (queryCache != null) {
queryCache.toXContent(builder, params);
}
if (completion != null) {
completion.toXContent(builder, params);
}
if (segments != null) {
segments.toXContent(builder, params);
}
if (mappings != null) {
mappings.toXContent(builder, params);
}
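The pattern throughout this file: each index-level stats object is created only when its `IndexMetric` is requested, accumulation is guarded by the same check, and `toXContent` skips `null` sections. A minimal sketch of the new `Set<IndexMetric>` constructor in use follows; the node responses are a placeholder, and passing `null` for mapping and analysis stats is an assumption that leans on the existing null checks in `toXContent`.

```java
// Sketch only: exercising the metric-gated constructor added in this commit.
// ClusterStatsIndices and IndexMetric are taken from the diff above; passing null
// for mapping/analysis stats relies on the existing null checks in toXContent and
// is an assumption made for illustration.
import java.util.List;
import java.util.Set;

import org.opensearch.action.admin.cluster.stats.ClusterStatsIndices;
import org.opensearch.action.admin.cluster.stats.ClusterStatsNodeResponse;
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest.IndexMetric;

class ClusterStatsIndicesSketch {
    static ClusterStatsIndices docsAndStoreOnly(List<ClusterStatsNodeResponse> nodeResponses) {
        // Only DOCS and STORE are computed and serialized; fielddata, query cache,
        // completion, segments, and shards stay null and are omitted from the output.
        return new ClusterStatsIndices(Set.of(IndexMetric.DOCS, IndexMetric.STORE), nodeResponses, null, null);
    }
}
```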
@@ -36,6 +36,7 @@
import org.opensearch.action.admin.cluster.node.info.NodeInfo;
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest.Metric;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.common.annotation.PublicApi;
@@ -89,10 +90,29 @@ public class ClusterStatsNodes implements ToXContentFragment {
private final PackagingTypes packagingTypes;
private final IngestStats ingestStats;

public static final Set<Metric> NODE_STATS_METRICS = Set.of(
// Stats computed from node info and node stat
Metric.OS,
Metric.JVM,
// Stats computed from node stat
Metric.FS,
Metric.PROCESS,
Metric.INGEST,
// Stats computed from node info
Metric.PLUGINS,
Metric.NETWORK_TYPES,
Metric.DISCOVERY_TYPES,
Metric.PACKAGING_TYPES
);

ClusterStatsNodes(List<ClusterStatsNodeResponse> nodeResponses) {
this(Set.of(Metric.values()), nodeResponses);
}

ClusterStatsNodes(Set<Metric> requestedMetrics, List<ClusterStatsNodeResponse> nodeResponses) {
this.versions = new HashSet<>();
this.fs = new FsInfo.Path();
this.plugins = new HashSet<>();
this.fs = requestedMetrics.contains(ClusterStatsRequest.Metric.FS) ? new FsInfo.Path() : null;
this.plugins = requestedMetrics.contains(ClusterStatsRequest.Metric.PLUGINS) ? new HashSet<>() : null;

Set<InetAddress> seenAddresses = new HashSet<>(nodeResponses.size());
List<NodeInfo> nodeInfos = new ArrayList<>(nodeResponses.size());
@@ -101,26 +121,29 @@ public class ClusterStatsNodes implements ToXContentFragment {
nodeInfos.add(nodeResponse.nodeInfo());
nodeStats.add(nodeResponse.nodeStats());
this.versions.add(nodeResponse.nodeInfo().getVersion());
this.plugins.addAll(nodeResponse.nodeInfo().getInfo(PluginsAndModules.class).getPluginInfos());
if (requestedMetrics.contains(ClusterStatsRequest.Metric.PLUGINS)) {
this.plugins.addAll(nodeResponse.nodeInfo().getInfo(PluginsAndModules.class).getPluginInfos());
}

// now do the stats that should be deduped by hardware (implemented by ip deduping)
TransportAddress publishAddress = nodeResponse.nodeInfo().getInfo(TransportInfo.class).address().publishAddress();
final InetAddress inetAddress = publishAddress.address().getAddress();
if (!seenAddresses.add(inetAddress)) {
continue;
}
if (nodeResponse.nodeStats().getFs() != null) {
if (requestedMetrics.contains(ClusterStatsRequest.Metric.FS) && nodeResponse.nodeStats().getFs() != null) {
this.fs.add(nodeResponse.nodeStats().getFs().getTotal());
}
}

this.counts = new Counts(nodeInfos);
this.os = new OsStats(nodeInfos, nodeStats);
this.process = new ProcessStats(nodeStats);
this.jvm = new JvmStats(nodeInfos, nodeStats);
this.networkTypes = new NetworkTypes(nodeInfos);
this.discoveryTypes = new DiscoveryTypes(nodeInfos);
this.packagingTypes = new PackagingTypes(nodeInfos);
this.ingestStats = new IngestStats(nodeStats);
this.networkTypes = requestedMetrics.contains(ClusterStatsRequest.Metric.NETWORK_TYPES) ? new NetworkTypes(nodeInfos) : null;
this.discoveryTypes = requestedMetrics.contains(ClusterStatsRequest.Metric.DISCOVERY_TYPES) ? new DiscoveryTypes(nodeInfos) : null;
this.packagingTypes = requestedMetrics.contains(ClusterStatsRequest.Metric.PACKAGING_TYPES) ? new PackagingTypes(nodeInfos) : null;
this.ingestStats = requestedMetrics.contains(ClusterStatsRequest.Metric.INGEST) ? new IngestStats(nodeStats) : null;
this.process = requestedMetrics.contains(ClusterStatsRequest.Metric.PROCESS) ? new ProcessStats(nodeStats) : null;
this.os = requestedMetrics.contains(ClusterStatsRequest.Metric.OS) ? new OsStats(nodeInfos, nodeStats) : null;
this.jvm = requestedMetrics.contains(ClusterStatsRequest.Metric.JVM) ? new JvmStats(nodeInfos, nodeStats) : null;
}

public Counts getCounts() {
@@ -179,36 +202,54 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}
builder.endArray();

builder.startObject(Fields.OS);
os.toXContent(builder, params);
builder.endObject();
if (os != null) {
builder.startObject(Fields.OS);
os.toXContent(builder, params);
builder.endObject();
}

builder.startObject(Fields.PROCESS);
process.toXContent(builder, params);
builder.endObject();
if (process != null) {
builder.startObject(Fields.PROCESS);
process.toXContent(builder, params);
builder.endObject();
}

builder.startObject(Fields.JVM);
jvm.toXContent(builder, params);
builder.endObject();
if (jvm != null) {
builder.startObject(Fields.JVM);
jvm.toXContent(builder, params);
builder.endObject();
}

builder.field(Fields.FS);
fs.toXContent(builder, params);
if (fs != null) {
builder.field(Fields.FS);
fs.toXContent(builder, params);
}

builder.startArray(Fields.PLUGINS);
for (PluginInfo pluginInfo : plugins) {
pluginInfo.toXContent(builder, params);
if (plugins != null) {
builder.startArray(Fields.PLUGINS);
for (PluginInfo pluginInfo : plugins) {
pluginInfo.toXContent(builder, params);
}
builder.endArray();
}
builder.endArray();

builder.startObject(Fields.NETWORK_TYPES);
networkTypes.toXContent(builder, params);
builder.endObject();
if (networkTypes != null) {
builder.startObject(Fields.NETWORK_TYPES);
networkTypes.toXContent(builder, params);
builder.endObject();
}

discoveryTypes.toXContent(builder, params);
if (discoveryTypes != null) {
discoveryTypes.toXContent(builder, params);
}

packagingTypes.toXContent(builder, params);
if (packagingTypes != null) {
packagingTypes.toXContent(builder, params);
}

ingestStats.toXContent(builder, params);
if (ingestStats != null) {
ingestStats.toXContent(builder, params);
}

return builder;
}
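`ClusterStatsNodes` follows the same gating pattern: `NODE_STATS_METRICS` lists the `Metric` values this class can serve, the requested subset decides which stats objects are built, and `toXContent` writes only the non-null ones. The new two-argument constructor is package-private, so the sketch below assumes it lives in `org.opensearch.action.admin.cluster.stats`; the method name and call site are illustrative.

```java
package org.opensearch.action.admin.cluster.stats;

// Sketch only: building a ClusterStatsNodes view restricted to OS and JVM stats.
// The package-private constructor is taken from the diff above; everything else
// (class name, method name, where this would be called from) is illustrative.
import java.util.List;
import java.util.Set;

import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest.Metric;

final class ClusterStatsNodesSketch {
    private ClusterStatsNodesSketch() {}

    static ClusterStatsNodes osAndJvmOnly(List<ClusterStatsNodeResponse> nodeResponses) {
        // Counts and versions are always computed; fs, plugins, process, ingest, and the
        // network/discovery/packaging type sections stay null and are skipped by toXContent.
        return new ClusterStatsNodes(Set.of(Metric.OS, Metric.JVM), nodeResponses);
    }
}
```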