diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFCacheConfigMetricsCollector.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFCacheConfigMetricsCollector.java index 8a5e3e5c..b5bc67a8 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFCacheConfigMetricsCollector.java +++ b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFCacheConfigMetricsCollector.java @@ -11,6 +11,7 @@ import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +import java.io.Closeable; import java.security.AccessController; import java.security.PrivilegedAction; import org.apache.commons.lang3.reflect.FieldUtils; @@ -34,10 +35,12 @@ public class RTFCacheConfigMetricsCollector extends PerformanceAnalyzerMetricsCollector implements TelemetryCollector { - private MetricsRegistry metricsRegistry; private static final Logger LOG = LogManager.getLogger(RTFCacheConfigMetricsCollector.class); private PerformanceAnalyzerController performanceAnalyzerController; private ConfigOverridesWrapper configOverridesWrapper; + private Closeable fieldDataCacheGauge; + private Closeable requestCacheGauge; + private boolean metricsInitialised; public RTFCacheConfigMetricsCollector( PerformanceAnalyzerController performanceAnalyzerController, @@ -56,11 +59,11 @@ public RTFCacheConfigMetricsCollector( public void collectMetrics(long l) { if (performanceAnalyzerController.isCollectorDisabled( configOverridesWrapper, getCollectorName())) { + closeOpenGaugeObservablesIfAny(); LOG.info("RTFCacheConfigMetricsCollector is disabled. Skipping collection."); return; } - - metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); + MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry(); if (metricsRegistry == null) { LOG.error("could not get the instance of MetricsRegistry class"); return; @@ -71,33 +74,59 @@ configOverridesWrapper, getCollectorName())) { LOG.error("could not get the instance of indicesService class"); return; } - LOG.debug("Executing collect metrics for RTFCacheConfigMetricsCollector"); - CacheMaxSizeStatus fieldDataCacheMaxSizeStatus = - AccessController.doPrivileged( - (PrivilegedAction) - () -> { - try { - Cache fieldDataCache = - indicesService - .getIndicesFieldDataCache() - .getCache(); - long fieldDataMaxSize = - (Long) - FieldUtils.readField( - fieldDataCache, - CACHE_MAX_WEIGHT, - true); - return new CacheMaxSizeStatus( - FIELD_DATA_CACHE.toString(), fieldDataMaxSize); - } catch (Exception e) { - LOG.debug( - "Error occurred while fetching fieldDataCacheMaxSizeStatus: " - + e.getMessage()); - return null; - } - }); + initialiseMetricsIfNeeded(metricsRegistry, indicesService); + } + + private void initialiseMetricsIfNeeded( + MetricsRegistry metricsRegistry, IndicesService indicesService) { + if (!metricsInitialised) { + fieldDataCacheGauge = + metricsRegistry.createGauge( + RTFMetrics.CacheConfigValue.Constants.CACHE_MAX_SIZE_VALUE, + "Cache Max Size metrics", + RTFMetrics.MetricUnits.BYTE.toString(), + () -> getFieldCacheMaxSizeStatus(indicesService), + Tags.create() + .addTag( + RTFMetrics.CacheConfigDimension.Constants.TYPE_VALUE, + FIELD_DATA_CACHE.toString())); + requestCacheGauge = + metricsRegistry.createGauge( + RTFMetrics.CacheConfigValue.Constants.CACHE_MAX_SIZE_VALUE, + "Cache Max Size metrics", + RTFMetrics.MetricUnits.BYTE.toString(), + () -> getRequestCacheMaxSizeStatus(indicesService), + Tags.create() + .addTag( + RTFMetrics.CacheConfigDimension.Constants.TYPE_VALUE, + SHARD_REQUEST_CACHE.toString())); + metricsInitialised = true; + } + } + + private void closeOpenGaugeObservablesIfAny() { + if (fieldDataCacheGauge != null) { + try { + fieldDataCacheGauge.close(); + } catch (Exception e) { + LOG.error("Unable to close the fieldDataCacheGauge observable"); + } finally { + fieldDataCacheGauge = null; + } + } + if (requestCacheGauge != null) { + try { + requestCacheGauge.close(); + } catch (Exception e) { + LOG.error("Unable to close the fieldDataCacheGauge observable"); + } finally { + requestCacheGauge = null; + } + } + } + private double getRequestCacheMaxSizeStatus(IndicesService indicesService) { CacheMaxSizeStatus shardRequestCacheMaxSizeStatus = AccessController.doPrivileged( (PrivilegedAction) @@ -132,28 +161,45 @@ configOverridesWrapper, getCollectorName())) { return null; } }); - - if (fieldDataCacheMaxSizeStatus != null - && fieldDataCacheMaxSizeStatus.getCacheMaxSize() > 0) { - recordMetrics(fieldDataCacheMaxSizeStatus); - } - if (shardRequestCacheMaxSizeStatus != null && shardRequestCacheMaxSizeStatus.getCacheMaxSize() > 0) { - recordMetrics(shardRequestCacheMaxSizeStatus); + return shardRequestCacheMaxSizeStatus.getCacheMaxSize(); + } else { + return 0.0; } } - private void recordMetrics(CacheMaxSizeStatus cacheMaxSizeStatus) { - metricsRegistry.createGauge( - RTFMetrics.CacheConfigValue.Constants.CACHE_MAX_SIZE_VALUE, - "Cache Max Size metrics", - RTFMetrics.MetricUnits.BYTE.toString(), - () -> (double) cacheMaxSizeStatus.getCacheMaxSize(), - Tags.create() - .addTag( - RTFMetrics.CacheConfigDimension.Constants.TYPE_VALUE, - cacheMaxSizeStatus.getCacheType())); + private static double getFieldCacheMaxSizeStatus(IndicesService indicesService) { + CacheMaxSizeStatus fieldDataCacheMaxSizeStatus = + AccessController.doPrivileged( + (PrivilegedAction) + () -> { + try { + Cache fieldDataCache = + indicesService + .getIndicesFieldDataCache() + .getCache(); + long fieldDataMaxSize = + (Long) + FieldUtils.readField( + fieldDataCache, + CACHE_MAX_WEIGHT, + true); + return new CacheMaxSizeStatus( + FIELD_DATA_CACHE.toString(), fieldDataMaxSize); + } catch (Exception e) { + LOG.debug( + "Error occurred while fetching fieldDataCacheMaxSizeStatus: " + + e.getMessage()); + return null; + } + }); + if (fieldDataCacheMaxSizeStatus != null + && fieldDataCacheMaxSizeStatus.getCacheMaxSize() > 0) { + return fieldDataCacheMaxSizeStatus.getCacheMaxSize(); + } else { + return 0.0; + } } static class CacheMaxSizeStatus extends MetricStatus { diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFHeapMetricsCollector.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFHeapMetricsCollector.java index 6872089d..fddd8985 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFHeapMetricsCollector.java +++ b/src/main/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFHeapMetricsCollector.java @@ -5,7 +5,9 @@ package org.opensearch.performanceanalyzer.collectors.telemetry; +import java.io.Closeable; import java.lang.management.MemoryUsage; +import java.util.HashMap; import java.util.Map; import java.util.function.Supplier; import org.apache.logging.log4j.LogManager; @@ -38,6 +40,7 @@ public class RTFHeapMetricsCollector extends PerformanceAnalyzerMetricsCollector private boolean metricsInitialised; private PerformanceAnalyzerController performanceAnalyzerController; private ConfigOverridesWrapper configOverridesWrapper; + private Map memTypeToGaugeObservableMap; public RTFHeapMetricsCollector( PerformanceAnalyzerController performanceAnalyzerController, @@ -50,6 +53,7 @@ public RTFHeapMetricsCollector( this.metricsInitialised = false; this.performanceAnalyzerController = performanceAnalyzerController; this.configOverridesWrapper = configOverridesWrapper; + this.memTypeToGaugeObservableMap = new HashMap<>(); } @Override @@ -57,6 +61,7 @@ public void collectMetrics(long startTime) { if (performanceAnalyzerController.isCollectorDisabled( configOverridesWrapper, getCollectorName())) { LOG.info("RTFDisksCollector is disabled. Skipping collection."); + closeOpenGaugeObservablesIfAny(); return; } @@ -72,6 +77,21 @@ configOverridesWrapper, getCollectorName())) { recordMetrics(); } + private void closeOpenGaugeObservablesIfAny() { + for (String key : memTypeToGaugeObservableMap.keySet()) { + if (memTypeToGaugeObservableMap.containsKey(key)) { + try { + Closeable observableGauge = memTypeToGaugeObservableMap.remove(key); + if (observableGauge != null) { + observableGauge.close(); + } + } catch (Exception e) { + LOG.error("Unable to close the observable gauge for key {}", key); + } + } + } + } + private void initialiseMetricsIfNeeded() { if (metricsInitialised == false) { gcCollectionEventMetrics = @@ -91,6 +111,7 @@ private void initialiseMetricsIfNeeded() { RTFMetrics.HeapValue.Constants.USED_VALUE, "GC Heap Used PA Metrics", RTFMetrics.MetricUnits.BYTE.toString()); + metricsInitialised = true; } } @@ -119,12 +140,31 @@ private void recordMetrics() { heapUsedMetrics.record( memoryUsage.getUsed(), Tags.create().addTag(memTypeAttributeKey, entry.getKey())); - metricsRegistry.createGauge( - RTFMetrics.HeapValue.Constants.MAX_VALUE, - "Heap Max PA metrics", - "", - () -> (double) memoryUsage.getMax(), - Tags.create().addTag(memTypeAttributeKey, entry.getKey())); + createGaugeInstanceIfNotAvailable(entry.getKey()); + } + } + + private void createGaugeInstanceIfNotAvailable(String key) { + if (!memTypeToGaugeObservableMap.containsKey(key)) { + LOG.info("Gauge doesn't exist for the mem type {}", key); + Closeable observableGauge = + metricsRegistry.createGauge( + RTFMetrics.HeapValue.Constants.MAX_VALUE, + "Heap Max PA metrics", + "", + () -> getValue(key), + Tags.create().addTag(memTypeAttributeKey, key)); + memTypeToGaugeObservableMap.put(key, observableGauge); + } + } + + private double getValue(String key) { + Map> memoryUsageSuppliers = + HeapMetrics.getMemoryUsageSuppliers(); + MemoryUsage memoryUsage = null; + if (memoryUsageSuppliers.get(key) != null) { + memoryUsage = memoryUsageSuppliers.get(key).get(); } + return memoryUsage != null ? memoryUsage.getMax() : 0.0; } } diff --git a/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFCacheConfigMetricsCollectorTests.java b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFCacheConfigMetricsCollectorTests.java index 27d55e36..c8d5b52a 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFCacheConfigMetricsCollectorTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFCacheConfigMetricsCollectorTests.java @@ -9,6 +9,7 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -29,6 +30,7 @@ public class RTFCacheConfigMetricsCollectorTests extends OpenSearchSingleNodeTes private static final String TEST_INDEX = "test"; private RTFCacheConfigMetricsCollector rtfCacheConfigMetricsCollector; private static MetricsRegistry metricsRegistry; + private static MetricsRegistry metricsRegistry1; private long startTimeInMills = 1153721339; @Before @@ -36,6 +38,7 @@ public void init() { MetricsConfiguration.CONFIG_MAP.put( RTFCacheConfigMetricsCollector.class, MetricsConfiguration.cdefault); metricsRegistry = mock(MetricsRegistry.class); + metricsRegistry1 = mock(MetricsRegistry.class); OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry); IndicesService indicesService = getInstanceFromNode(IndicesService.class); OpenSearchResources.INSTANCE.setIndicesService(indicesService); @@ -58,4 +61,17 @@ public void testCollectMetrics() throws IOException { verify(metricsRegistry, atLeastOnce()) .createGauge(anyString(), anyString(), anyString(), any(), any()); } + + @Test + public void testCollectMetricsRepeated() throws IOException { + createIndex(TEST_INDEX); + rtfCacheConfigMetricsCollector.collectMetrics(startTimeInMills); + verify(metricsRegistry, atLeastOnce()) + .createGauge(anyString(), anyString(), anyString(), any(), any()); + + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry1); + rtfCacheConfigMetricsCollector.collectMetrics(startTimeInMills); + verify(metricsRegistry1, never()) + .createGauge(anyString(), anyString(), anyString(), any(), any()); + } } diff --git a/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFHeapMetricsCollectorTests.java b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFHeapMetricsCollectorTests.java index 928a5a29..5aa5dcc7 100644 --- a/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFHeapMetricsCollectorTests.java +++ b/src/test/java/org/opensearch/performanceanalyzer/collectors/telemetry/RTFHeapMetricsCollectorTests.java @@ -25,6 +25,7 @@ public class RTFHeapMetricsCollectorTests extends CollectorTestBase { private RTFHeapMetricsCollector rtfHeapMetricsCollector; private static MetricsRegistry metricsRegistry; + private static MetricsRegistry metricsRegistry1; private static Histogram gcCollectionEventHistogram; private static Histogram gcCollectionTimeHistogram; private static Histogram heapUsedHistogram; @@ -35,6 +36,7 @@ public void init() { RTFHeapMetricsCollector.class, MetricsConfiguration.cdefault); metricsRegistry = mock(MetricsRegistry.class); + metricsRegistry1 = mock(MetricsRegistry.class); gcCollectionEventHistogram = mock(Histogram.class); gcCollectionTimeHistogram = mock(Histogram.class); heapUsedHistogram = mock(Histogram.class); @@ -65,4 +67,20 @@ public void testCollectMetrics() throws IOException { verify(metricsRegistry, atLeastOnce()) .createGauge(anyString(), anyString(), anyString(), any(), any()); } + + @Test + public void testCollectMetricsRepeated() throws IOException { + + rtfHeapMetricsCollector.collectMetrics(System.currentTimeMillis()); + verify(heapUsedHistogram, atLeastOnce()).record(anyDouble(), any()); + verify(gcCollectionTimeHistogram, atLeastOnce()).record(anyDouble(), any()); + verify(gcCollectionEventHistogram, atLeastOnce()).record(anyDouble(), any()); + verify(metricsRegistry, atLeastOnce()) + .createGauge(anyString(), anyString(), anyString(), any(), any()); + + OpenSearchResources.INSTANCE.setMetricsRegistry(metricsRegistry1); + rtfHeapMetricsCollector.collectMetrics(System.currentTimeMillis()); + verify(metricsRegistry1, never()) + .createGauge(anyString(), anyString(), anyString(), any(), any()); + } }