diff --git a/core/src/main/scala/kafka/server/FairLimiter.java b/core/src/main/scala/kafka/server/FairLimiter.java index d6ad1edc6c..86ac171494 100644 --- a/core/src/main/scala/kafka/server/FairLimiter.java +++ b/core/src/main/scala/kafka/server/FairLimiter.java @@ -13,6 +13,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -27,13 +28,32 @@ public class FairLimiter implements Limiter { private final Lock lock = new ReentrantLock(true); private final Semaphore permits; - public FairLimiter(int size) { - maxPermits = size; - permits = new Semaphore(size); + /** + * The name of this limiter, used for metrics. + */ + private final String name; + /** + * The number of threads waiting for permits, used for metrics. + */ + private final AtomicInteger waitingThreads = new AtomicInteger(0); + + public FairLimiter(int size, String name) { + this.maxPermits = size; + this.permits = new Semaphore(size); + this.name = name; } @Override public Handler acquire(int permit) throws InterruptedException { + waitingThreads.incrementAndGet(); + try { + return acquire0(permit); + } finally { + waitingThreads.decrementAndGet(); + } + } + + private Handler acquire0(int permit) throws InterruptedException { lock.lock(); try { permits.acquire(permit); @@ -45,6 +65,15 @@ public Handler acquire(int permit) throws InterruptedException { @Override public Handler acquire(int permit, long timeoutMs) throws InterruptedException { + waitingThreads.incrementAndGet(); + try { + return acquire0(permit, timeoutMs); + } finally { + waitingThreads.decrementAndGet(); + } + } + + private Handler acquire0(int permit, long timeoutMs) throws InterruptedException { long start = System.nanoTime(); if (lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) { try { @@ -72,6 +101,16 @@ public int availablePermits() { return permits.availablePermits(); } + @Override + public int waitingThreads() { + return waitingThreads.get(); + } + + @Override + public String name() { + return name; + } + private Handler acquireLocked(int permit, long timeoutNs) throws InterruptedException { if (permit > maxPermits) { permit = maxPermits; diff --git a/core/src/main/scala/kafka/server/Limiter.java b/core/src/main/scala/kafka/server/Limiter.java index 3e3511de77..b6742adc0b 100644 --- a/core/src/main/scala/kafka/server/Limiter.java +++ b/core/src/main/scala/kafka/server/Limiter.java @@ -49,6 +49,16 @@ public interface Limiter { */ int availablePermits(); + /** + * Return the number of threads waiting for permits. + */ + int waitingThreads(); + + /** + * Return the name of this limiter. + */ + String name(); + /** * A handler to release acquired permits. */ diff --git a/core/src/main/scala/kafka/server/NoopLimiter.java b/core/src/main/scala/kafka/server/NoopLimiter.java index 1fab234492..aaaa63ab7d 100644 --- a/core/src/main/scala/kafka/server/NoopLimiter.java +++ b/core/src/main/scala/kafka/server/NoopLimiter.java @@ -38,6 +38,16 @@ public int availablePermits() { return Integer.MAX_VALUE; } + @Override + public int waitingThreads() { + return 0; + } + + @Override + public String name() { + return "noop"; + } + public static class NoopHandler implements Handler { @Override public void close() { diff --git a/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala b/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala index 772e804ece..b9b5bd3ddd 100644 --- a/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala +++ b/core/src/main/scala/kafka/server/streamaspect/BrokerQuotaManager.scala @@ -11,6 +11,7 @@ package kafka.server.streamaspect +import com.automq.stream.s3.metrics.S3StreamMetricsManager import kafka.network.RequestChannel import kafka.server._ import kafka.utils.QuotaUtils @@ -42,6 +43,13 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig, override def delayQueueSensor: Sensor = brokerDelayQueueSensor + S3StreamMetricsManager.registerBrokerQuotaLimitSupplier(() => java.util.Map.of( + QuotaType.RequestRate.toString, quotaLimit(QuotaType.RequestRate), + QuotaType.Produce.toString, quotaLimit(QuotaType.Produce), + QuotaType.Fetch.toString, quotaLimit(QuotaType.Fetch), + QuotaType.SlowFetch.toString, quotaLimit(QuotaType.SlowFetch) + )) + def getMaxValueInQuotaWindow(quotaType: QuotaType, request: RequestChannel.Request): Double = { if (shouldThrottle(request)) { quotaLimit(quotaType) diff --git a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala index 96da3503d0..358c27d3f1 100644 --- a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala +++ b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala @@ -1,6 +1,7 @@ package kafka.server.streamaspect import com.automq.stream.api.exceptions.FastReadFailFastException +import com.automq.stream.s3.metrics.{MetricsLevel, TimerUtil} import com.automq.stream.utils.FutureUtil import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor import kafka.cluster.Partition @@ -114,14 +115,28 @@ class ElasticReplicaManager( fetchExecutorQueueSizeGaugeMap }) - private val fastFetchLimiter = new FairLimiter(200 * 1024 * 1024) // 200MiB - private val slowFetchLimiter = new FairLimiter(200 * 1024 * 1024) // 200MiB - private val fetchLimiterGaugeMap = new util.HashMap[String, Integer]() + private val fastFetchLimiter = new FairLimiter(200 * 1024 * 1024, FETCH_LIMITER_FAST_NAME) // 200MiB + private val slowFetchLimiter = new FairLimiter(200 * 1024 * 1024, FETCH_LIMITER_SLOW_NAME) // 200MiB + private val fetchLimiterWaitingTasksGaugeMap = new util.HashMap[String, Integer]() + S3StreamKafkaMetricsManager.setFetchLimiterWaitingTaskNumSupplier(() => { + fetchLimiterWaitingTasksGaugeMap.put(FETCH_LIMITER_FAST_NAME, fastFetchLimiter.waitingThreads()) + fetchLimiterWaitingTasksGaugeMap.put(FETCH_LIMITER_SLOW_NAME, slowFetchLimiter.waitingThreads()) + fetchLimiterWaitingTasksGaugeMap + }) + private val fetchLimiterPermitsGaugeMap = new util.HashMap[String, Integer]() S3StreamKafkaMetricsManager.setFetchLimiterPermitNumSupplier(() => { - fetchLimiterGaugeMap.put(FETCH_LIMITER_FAST_NAME, fastFetchLimiter.availablePermits()) - fetchLimiterGaugeMap.put(FETCH_LIMITER_SLOW_NAME, slowFetchLimiter.availablePermits()) - fetchLimiterGaugeMap + fetchLimiterPermitsGaugeMap.put(FETCH_LIMITER_FAST_NAME, fastFetchLimiter.availablePermits()) + fetchLimiterPermitsGaugeMap.put(FETCH_LIMITER_SLOW_NAME, slowFetchLimiter.availablePermits()) + fetchLimiterPermitsGaugeMap }) + private val fetchLimiterTimeoutCounterMap = util.Map.of( + fastFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeoutMetric(fastFetchLimiter.name), + slowFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeoutMetric(slowFetchLimiter.name) + ) + private val fetchLimiterTimeHistogramMap = util.Map.of( + fastFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeMetric(MetricsLevel.INFO, fastFetchLimiter.name), + slowFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeMetric(MetricsLevel.INFO, slowFetchLimiter.name) + ) /** * Used to reduce allocation in [[readFromLocalLogV2]] @@ -561,14 +576,16 @@ class ElasticReplicaManager( math.min(bytesNeedFromParam, limiter.maxPermits()) } + val timer: TimerUtil = new TimerUtil() val handler: Handler = timeoutMs match { case t if t > 0 => limiter.acquire(bytesNeed(), t) case _ => limiter.acquire(bytesNeed()) } + fetchLimiterTimeHistogramMap.get(limiter.name).record(timer.elapsedAs(TimeUnit.NANOSECONDS)) if (handler == null) { - // handler maybe null if it timed out to acquire from limiter - // TODO add metrics for this + // the handler will be null if it timed out to acquire from limiter + fetchLimiterTimeoutCounterMap.get(limiter.name).add(MetricsLevel.INFO, 1) // warn(s"Returning emtpy fetch response for fetch request $readPartitionInfo since the wait time exceeds $timeoutMs ms.") ElasticReplicaManager.emptyReadResults(readPartitionInfo.map(_._1)) } else { diff --git a/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java b/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java index 5934972722..3ccc553a2e 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/backpressure/DefaultBackPressureManager.java @@ -11,6 +11,7 @@ package com.automq.stream.s3.backpressure; +import com.automq.stream.s3.metrics.S3StreamMetricsManager; import com.automq.stream.utils.ThreadUtils; import com.automq.stream.utils.Threads; @@ -49,6 +50,11 @@ public class DefaultBackPressureManager implements BackPressureManager { * Note: It should only be accessed in the {@link #checkerScheduler} thread. */ private long lastRegulateTime = System.currentTimeMillis(); + /** + * The last load level to trigger the regulator. + * Only used for logging and monitoring. + */ + private LoadLevel lastRegulateLevel = LoadLevel.NORMAL; public DefaultBackPressureManager(Regulator regulator) { this(regulator, DEFAULT_COOLDOWN_MS); @@ -62,11 +68,12 @@ public DefaultBackPressureManager(Regulator regulator, long cooldownMs) { @Override public void start() { this.checkerScheduler = Threads.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("back-pressure-checker-%d", false), LOGGER); + S3StreamMetricsManager.registerBackPressureStateSupplier(this::currentLoadLevel); } @Override public void registerChecker(Checker checker) { - checkerScheduler.scheduleAtFixedRate(() -> { + checkerScheduler.scheduleWithFixedDelay(() -> { loadLevels.put(checker.source(), checker.check()); maybeRegulate(); }, 0, checker.intervalMs(), TimeUnit.MILLISECONDS); @@ -113,6 +120,9 @@ private LoadLevel currentLoadLevel() { private void regulate(LoadLevel loadLevel, long now) { if (LoadLevel.NORMAL.equals(loadLevel)) { + if (!LoadLevel.NORMAL.equals(lastRegulateLevel)) { + LOGGER.info("The system is back to a normal state, checkers: {}", loadLevels); + } if (LOGGER.isDebugEnabled()) { LOGGER.debug("The system is in a normal state, checkers: {}", loadLevels); } @@ -122,5 +132,6 @@ private void regulate(LoadLevel loadLevel, long now) { loadLevel.regulate(regulator); lastRegulateTime = now; + lastRegulateLevel = loadLevel; } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopObservableDoubleGauge.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopObservableDoubleGauge.java new file mode 100644 index 0000000000..6486d55a9d --- /dev/null +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/NoopObservableDoubleGauge.java @@ -0,0 +1,17 @@ +/* + * Copyright 2024, AutoMQ HK Limited. + * + * The use of this file is governed by the Business Source License, + * as detailed in the file "/LICENSE.S3Stream" included in this repository. + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +package com.automq.stream.s3.metrics; + +import io.opentelemetry.api.metrics.ObservableDoubleGauge; + +public class NoopObservableDoubleGauge implements ObservableDoubleGauge { +} diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java index 38f07a4117..592b579719 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsConstant.java @@ -151,4 +151,12 @@ public class S3StreamMetricsConstant { public static final String LABEL_STAGE_GET_OBJECTS = "get_objects"; public static final String LABEL_STAGE_FIND_INDEX = "find_index"; public static final String LABEL_STAGE_COMPUTE = "compute"; + + // Back Pressure + public static final String BACK_PRESSURE_STATE_METRIC_NAME = "back_pressure_state"; + public static final AttributeKey LABEL_BACK_PRESSURE_STATE = AttributeKey.stringKey("state"); + + // Broker Quota + public static final String BROKER_QUOTA_LIMIT_METRIC_NAME = "broker_quota_limit"; + public static final AttributeKey LABEL_BROKER_QUOTA_TYPE = AttributeKey.stringKey("type"); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java index 32228a8f28..056340b5df 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java +++ b/s3stream/src/main/java/com/automq/stream/s3/metrics/S3StreamMetricsManager.java @@ -12,6 +12,7 @@ package com.automq.stream.s3.metrics; import com.automq.stream.s3.ByteBufAlloc; +import com.automq.stream.s3.backpressure.LoadLevel; import com.automq.stream.s3.metrics.operations.S3ObjectStage; import com.automq.stream.s3.metrics.operations.S3Stage; import com.automq.stream.s3.metrics.wrapper.ConfigListener; @@ -34,6 +35,7 @@ import io.opentelemetry.api.common.Attributes; import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.api.metrics.ObservableDoubleGauge; import io.opentelemetry.api.metrics.ObservableLongGauge; import static com.automq.stream.s3.metrics.S3StreamMetricsConstant.LABEL_CACHE_NAME; @@ -139,10 +141,23 @@ public class S3StreamMetricsManager { private static final MultiAttributes OPERATOR_INDEX_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(), S3StreamMetricsConstant.LABEL_INDEX); + // Back Pressure + private static final MultiAttributes BACK_PRESSURE_STATE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(), + S3StreamMetricsConstant.LABEL_BACK_PRESSURE_STATE); + private static ObservableLongGauge backPressureState = new NoopObservableLongGauge(); + private static Supplier backPressureStateSupplier = () -> LoadLevel.NORMAL; + + // Broker Quota + private static final MultiAttributes BROKER_QUOTA_TYPE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(), + S3StreamMetricsConstant.LABEL_BROKER_QUOTA_TYPE); + private static ObservableDoubleGauge brokerQuotaLimit = new NoopObservableDoubleGauge(); + private static Supplier> brokerQuotaLimitSupplier = () -> new ConcurrentHashMap<>(); static { BASE_ATTRIBUTES_LISTENERS.add(ALLOC_TYPE_ATTRIBUTES); BASE_ATTRIBUTES_LISTENERS.add(OPERATOR_INDEX_ATTRIBUTES); + BASE_ATTRIBUTES_LISTENERS.add(BACK_PRESSURE_STATE_ATTRIBUTES); + BASE_ATTRIBUTES_LISTENERS.add(BROKER_QUOTA_TYPE_ATTRIBUTES); } public static void configure(MetricsConfig metricsConfig) { @@ -400,6 +415,8 @@ public static void initMetrics(Meter meter, String prefix) { }); initAsyncCacheMetrics(meter, prefix); + initBackPressureMetrics(meter, prefix); + initBrokerQuotaMetrics(meter, prefix); } private static void initAsyncCacheMetrics(Meter meter, String prefix) { @@ -475,6 +492,43 @@ private static void initAsyncCacheMetrics(Meter meter, String prefix) { }); } + private static void initBackPressureMetrics(Meter meter, String prefix) { + backPressureState = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.BACK_PRESSURE_STATE_METRIC_NAME) + .setDescription("Back pressure state") + .ofLongs() + .buildWithCallback(result -> { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) { + LoadLevel state = backPressureStateSupplier.get(); + result.record(state.ordinal(), BACK_PRESSURE_STATE_ATTRIBUTES.get(state.name())); + // To beautify Grafana dashboard, we record -1 for other states + for (LoadLevel l : LoadLevel.values()) { + if (l != state) { + result.record(-1, BACK_PRESSURE_STATE_ATTRIBUTES.get(l.name())); + } + } + } + }); + } + + private static void initBrokerQuotaMetrics(Meter meter, String prefix) { + brokerQuotaLimit = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.BROKER_QUOTA_LIMIT_METRIC_NAME) + .setDescription("Broker quota limit") + .buildWithCallback(result -> { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) { + Map brokerQuotaLimitMap = brokerQuotaLimitSupplier.get(); + for (Map.Entry entry : brokerQuotaLimitMap.entrySet()) { + String quotaType = entry.getKey(); + Double quotaLimit = entry.getValue(); + // drop too large values + if (quotaLimit > 1e15) { + continue; + } + result.record(quotaLimit, BROKER_QUOTA_TYPE_ATTRIBUTES.get(quotaType)); + } + } + }); + } + public static void registerNetworkLimiterQueueSizeSupplier(AsyncNetworkBandwidthLimiter.Type type, Supplier networkLimiterQueueSizeSupplier) { switch (type) { @@ -907,4 +961,12 @@ public static void registerLocalStreamRangeIndexCacheSizeSupplier(Supplier localStreamRangeIndexCacheStreamNum) { S3StreamMetricsManager.localStreamRangeIndexCacheStreamNum = localStreamRangeIndexCacheStreamNum; } + + public static void registerBackPressureStateSupplier(Supplier backPressureStateSupplier) { + S3StreamMetricsManager.backPressureStateSupplier = backPressureStateSupplier; + } + + public static void registerBrokerQuotaLimitSupplier(Supplier> brokerQuotaLimitSupplier) { + S3StreamMetricsManager.brokerQuotaLimitSupplier = brokerQuotaLimitSupplier; + } } diff --git a/s3stream/src/test/java/com/automq/stream/s3/backpressure/DefaultBackPressureManagerTest.java b/s3stream/src/test/java/com/automq/stream/s3/backpressure/DefaultBackPressureManagerTest.java index 41d1690e3f..9e5b2bbed5 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/backpressure/DefaultBackPressureManagerTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/backpressure/DefaultBackPressureManagerTest.java @@ -58,7 +58,7 @@ public void setup() { Runnable runnable = invocation.getArgument(0); runnable.run(); return null; - }).when(scheduler).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); + }).when(scheduler).scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class)); doAnswer(invocation -> { Runnable runnable = invocation.getArgument(0); runnable.run(); diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java index 98ea1b5bf6..e4dd6b6f93 100644 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsConstants.java @@ -20,7 +20,10 @@ public class S3StreamKafkaMetricsConstants { public static final String STREAM_SET_OBJECT_NUM = "stream_set_object_num"; public static final String STREAM_OBJECT_NUM = "stream_object_num"; public static final String FETCH_LIMITER_PERMIT_NUM = "fetch_limiter_permit_num"; + public static final String FETCH_LIMITER_WAITING_TASK_NUM = "fetch_limiter_waiting_task_num"; public static final String FETCH_PENDING_TASK_NUM = "fetch_pending_task_num"; + public static final String FETCH_LIMITER_TIMEOUT_COUNT = "fetch_limiter_timeout_count"; + public static final String FETCH_LIMITER_TIME = "fetch_limiter_time"; public static final String LOG_APPEND_PERMIT_NUM = "log_append_permit_num"; public static final String SLOW_BROKER_METRIC_NAME = "slow_broker_count"; public static final String TOPIC_PARTITION_COUNT_METRIC_NAME = "topic_partition_count"; diff --git a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java index 27d2426eae..19fbca4e07 100644 --- a/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java +++ b/server-common/src/main/java/org/apache/kafka/server/metrics/s3stream/S3StreamKafkaMetricsManager.java @@ -13,23 +13,32 @@ import com.automq.stream.s3.metrics.MetricsConfig; import com.automq.stream.s3.metrics.MetricsLevel; +import com.automq.stream.s3.metrics.NoopLongCounter; import com.automq.stream.s3.metrics.NoopObservableLongGauge; import com.automq.stream.s3.metrics.wrapper.ConfigListener; +import com.automq.stream.s3.metrics.wrapper.CounterMetric; +import com.automq.stream.s3.metrics.wrapper.HistogramInstrument; +import com.automq.stream.s3.metrics.wrapper.HistogramMetric; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.Function; import java.util.function.Supplier; import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.metrics.LongCounter; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.api.metrics.ObservableLongGauge; public class S3StreamKafkaMetricsManager { private static final List BASE_ATTRIBUTES_LISTENERS = new ArrayList<>(); + + public static final List FETCH_LIMITER_TIME_METRICS = new CopyOnWriteArrayList<>(); + private static final MultiAttributes BROKER_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(), S3StreamKafkaMetricsConstants.LABEL_NODE_ID); private static final MultiAttributes S3_OBJECT_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(), @@ -59,10 +68,16 @@ public class S3StreamKafkaMetricsManager { private static Supplier> streamSetObjectNumSupplier = Collections::emptyMap; private static ObservableLongGauge streamObjectNumMetrics = new NoopObservableLongGauge(); private static Supplier streamObjectNumSupplier = () -> 0; + private static ObservableLongGauge fetchLimiterPermitNumMetrics = new NoopObservableLongGauge(); private static Supplier> fetchLimiterPermitNumSupplier = Collections::emptyMap; + private static ObservableLongGauge fetchLimiterWaitingTaskNumMetrics = new NoopObservableLongGauge(); + private static Supplier> fetchLimiterWaitingTaskNumSupplier = Collections::emptyMap; private static ObservableLongGauge fetchPendingTaskNumMetrics = new NoopObservableLongGauge(); private static Supplier> fetchPendingTaskNumSupplier = Collections::emptyMap; + private static LongCounter fetchLimiterTimeoutCount = new NoopLongCounter(); + private static HistogramInstrument fetchLimiterTime; + private static ObservableLongGauge logAppendPermitNumMetrics = new NoopObservableLongGauge(); private static Supplier logAppendPermitNumSupplier = () -> 0; private static MetricsConfig metricsConfig = new MetricsConfig(MetricsLevel.INFO, Attributes.empty()); @@ -193,6 +208,18 @@ private static void initFetchMetrics(Meter meter, String prefix) { } } }); + fetchLimiterWaitingTaskNumMetrics = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.FETCH_LIMITER_WAITING_TASK_NUM) + .setDescription("The number of tasks waiting for permits in fetch limiters") + .ofLongs() + .buildWithCallback(result -> { + if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) { + Map fetchLimiterWaitingTaskNumMap = fetchLimiterWaitingTaskNumSupplier.get(); + for (Map.Entry entry : fetchLimiterWaitingTaskNumMap.entrySet()) { + result.record(entry.getValue(), FETCH_LIMITER_ATTRIBUTES.get(entry.getKey())); + } + } + }); + fetchPendingTaskNumMetrics = meter.gaugeBuilder(prefix + S3StreamKafkaMetricsConstants.FETCH_PENDING_TASK_NUM) .setDescription("The number of pending tasks in fetch executors") .ofLongs() @@ -204,6 +231,12 @@ private static void initFetchMetrics(Meter meter, String prefix) { } } }); + + fetchLimiterTimeoutCount = meter.counterBuilder(prefix + S3StreamKafkaMetricsConstants.FETCH_LIMITER_TIMEOUT_COUNT) + .setDescription("The number of acquire permits timeout in fetch limiters") + .build(); + fetchLimiterTime = new HistogramInstrument(meter, prefix + S3StreamKafkaMetricsConstants.FETCH_LIMITER_TIME, + "The time cost of acquire permits in fetch limiters", "nanoseconds", () -> FETCH_LIMITER_TIME_METRICS); } private static void initLogAppendMetrics(Meter meter, String prefix) { @@ -258,10 +291,31 @@ public static void setFetchLimiterPermitNumSupplier(Supplier> fetchLimiterWaitingTaskNumSupplier) { + S3StreamKafkaMetricsManager.fetchLimiterWaitingTaskNumSupplier = fetchLimiterWaitingTaskNumSupplier; + } + public static void setFetchPendingTaskNumSupplier(Supplier> fetchPendingTaskNumSupplier) { S3StreamKafkaMetricsManager.fetchPendingTaskNumSupplier = fetchPendingTaskNumSupplier; } + public static CounterMetric buildFetchLimiterTimeoutMetric(String limiterName) { + synchronized (BASE_ATTRIBUTES_LISTENERS) { + CounterMetric metric = new CounterMetric(metricsConfig, FETCH_LIMITER_ATTRIBUTES.get(limiterName), () -> fetchLimiterTimeoutCount); + BASE_ATTRIBUTES_LISTENERS.add(metric); + return metric; + } + } + + public static HistogramMetric buildFetchLimiterTimeMetric(MetricsLevel metricsLevel, String limiterName) { + synchronized (BASE_ATTRIBUTES_LISTENERS) { + HistogramMetric metric = new HistogramMetric(metricsLevel, metricsConfig, FETCH_LIMITER_ATTRIBUTES.get(limiterName)); + BASE_ATTRIBUTES_LISTENERS.add(metric); + FETCH_LIMITER_TIME_METRICS.add(metric); + return metric; + } + } + public static void setLogAppendPermitNumSupplier(Supplier logAppendPermitNumSupplier) { S3StreamKafkaMetricsManager.logAppendPermitNumSupplier = logAppendPermitNumSupplier; }