Skip to content

Commit

Permalink
feat(backpressure): add metrics (#2198)
Browse files Browse the repository at this point in the history
* feat(backpressure): log it on recovery from backpressure

Signed-off-by: Ning Yu <[email protected]>

* feat: add metric fetch_limiter_waiting_task_num

Signed-off-by: Ning Yu <[email protected]>

* feat: add metric fetch_limiter_timeout_count

Signed-off-by: Ning Yu <[email protected]>

* feat: add metric fetch_limiter_time

Signed-off-by: Ning Yu <[email protected]>

* feat: add metric back_pressure_state

Signed-off-by: Ning Yu <[email protected]>

* feat: add metric broker_quota_limit

Signed-off-by: Ning Yu <[email protected]>

* fix(backpressure): run checkers with fixed delay

Signed-off-by: Ning Yu <[email protected]>

* style: fix lint

Signed-off-by: Ning Yu <[email protected]>

* perf: drop too large values

Signed-off-by: Ning Yu <[email protected]>

* refactor: record -1 for other states

Signed-off-by: Ning Yu <[email protected]>

* test: fix tests

Signed-off-by: Ning Yu <[email protected]>

---------

Signed-off-by: Ning Yu <[email protected]>
  • Loading branch information
Chillax-0v0 authored Dec 2, 2024
1 parent 504ce4e commit b470ba4
Show file tree
Hide file tree
Showing 12 changed files with 252 additions and 13 deletions.
45 changes: 42 additions & 3 deletions core/src/main/scala/kafka/server/FairLimiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand All @@ -27,13 +28,32 @@ public class FairLimiter implements Limiter {
private final Lock lock = new ReentrantLock(true);
private final Semaphore permits;

public FairLimiter(int size) {
maxPermits = size;
permits = new Semaphore(size);
/**
* The name of this limiter, used for metrics.
*/
private final String name;
/**
* The number of threads waiting for permits, used for metrics.
*/
private final AtomicInteger waitingThreads = new AtomicInteger(0);

/**
 * Creates a limiter backed by a semaphore that initially holds {@code size} permits.
 *
 * @param size the initial number of permits; also stored as the maximum number of permits
 * @param name the name of this limiter, used for metrics
 */
public FairLimiter(int size, String name) {
    this.name = name;
    this.maxPermits = size;
    this.permits = new Semaphore(size);
}

/**
 * Acquires {@code permit} permits, counting the calling thread as waiting for the
 * whole duration of the call.
 *
 * @param permit the number of permits requested
 * @return the handler produced by {@code acquire0(int)}
 * @throws InterruptedException if {@code acquire0(int)} throws it
 */
@Override
public Handler acquire(int permit) throws InterruptedException {
    waitingThreads.incrementAndGet();
    final Handler handler;
    try {
        handler = acquire0(permit);
    } finally {
        // Always undo the increment, whether the acquisition returned or threw.
        waitingThreads.decrementAndGet();
    }
    return handler;
}

private Handler acquire0(int permit) throws InterruptedException {
lock.lock();
try {
permits.acquire(permit);
Expand All @@ -45,6 +65,15 @@ public Handler acquire(int permit) throws InterruptedException {

/**
 * Acquires {@code permit} permits with a timeout, counting the calling thread as
 * waiting for the whole duration of the call.
 *
 * @param permit    the number of permits requested
 * @param timeoutMs the timeout in milliseconds, passed through to {@code acquire0(int, long)}
 * @return the handler produced by {@code acquire0(int, long)}
 * @throws InterruptedException if {@code acquire0(int, long)} throws it
 */
@Override
public Handler acquire(int permit, long timeoutMs) throws InterruptedException {
    waitingThreads.incrementAndGet();
    final Handler handler;
    try {
        handler = acquire0(permit, timeoutMs);
    } finally {
        // Always undo the increment, whether the acquisition returned or threw.
        waitingThreads.decrementAndGet();
    }
    return handler;
}

private Handler acquire0(int permit, long timeoutMs) throws InterruptedException {
long start = System.nanoTime();
if (lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
try {
Expand Down Expand Up @@ -72,6 +101,16 @@ public int availablePermits() {
return permits.availablePermits();
}

/**
 * Returns the current value of the waiting-thread counter, which the
 * {@code acquire} methods increment on entry and decrement on exit.
 *
 * @return the number of threads currently inside an {@code acquire} call
 */
@Override
public int waitingThreads() {
    return waitingThreads.intValue();
}

/**
 * Returns the name given to this limiter at construction time.
 *
 * @return the limiter name, used for metrics
 */
@Override
public String name() {
    return this.name;
}

private Handler acquireLocked(int permit, long timeoutNs) throws InterruptedException {
if (permit > maxPermits) {
permit = maxPermits;
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/scala/kafka/server/Limiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ public interface Limiter {
*/
int availablePermits();

/**
 * Return the number of threads waiting for permits.
 *
 * @return the current number of threads waiting for permits
 */
int waitingThreads();

/**
 * Return the name of this limiter.
 *
 * @return the name identifying this limiter
 */
String name();

/**
* A handler to release acquired permits.
*/
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/scala/kafka/server/NoopLimiter.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,16 @@ public int availablePermits() {
return Integer.MAX_VALUE;
}

/**
 * Always reports that no threads are waiting.
 *
 * @return {@code 0}
 */
@Override
public int waitingThreads() {
return 0;
}

/**
 * Returns the fixed name of this limiter.
 *
 * @return the constant string {@code "noop"}
 */
@Override
public String name() {
return "noop";
}

public static class NoopHandler implements Handler {
@Override
public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package kafka.server.streamaspect

import com.automq.stream.s3.metrics.S3StreamMetricsManager
import kafka.network.RequestChannel
import kafka.server._
import kafka.utils.QuotaUtils
Expand Down Expand Up @@ -42,6 +43,13 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,

override def delayQueueSensor: Sensor = brokerDelayQueueSensor

// Expose the broker quota limits to the metrics manager. The supplier builds a new
// map on every invocation, keyed by the string form of each quota type, so each read
// reports whatever quotaLimit returns at that moment.
S3StreamMetricsManager.registerBrokerQuotaLimitSupplier(() => java.util.Map.of(
QuotaType.RequestRate.toString, quotaLimit(QuotaType.RequestRate),
QuotaType.Produce.toString, quotaLimit(QuotaType.Produce),
QuotaType.Fetch.toString, quotaLimit(QuotaType.Fetch),
QuotaType.SlowFetch.toString, quotaLimit(QuotaType.SlowFetch)
))

def getMaxValueInQuotaWindow(quotaType: QuotaType, request: RequestChannel.Request): Double = {
if (shouldThrottle(request)) {
quotaLimit(quotaType)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafka.server.streamaspect

import com.automq.stream.api.exceptions.FastReadFailFastException
import com.automq.stream.s3.metrics.{MetricsLevel, TimerUtil}
import com.automq.stream.utils.FutureUtil
import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor
import kafka.cluster.Partition
Expand Down Expand Up @@ -114,14 +115,28 @@ class ElasticReplicaManager(
fetchExecutorQueueSizeGaugeMap
})

private val fastFetchLimiter = new FairLimiter(200 * 1024 * 1024) // 200MiB
private val slowFetchLimiter = new FairLimiter(200 * 1024 * 1024) // 200MiB
private val fetchLimiterGaugeMap = new util.HashMap[String, Integer]()
private val fastFetchLimiter = new FairLimiter(200 * 1024 * 1024, FETCH_LIMITER_FAST_NAME) // 200MiB
private val slowFetchLimiter = new FairLimiter(200 * 1024 * 1024, FETCH_LIMITER_SLOW_NAME) // 200MiB
private val fetchLimiterWaitingTasksGaugeMap = new util.HashMap[String, Integer]()
S3StreamKafkaMetricsManager.setFetchLimiterWaitingTaskNumSupplier(() => {
fetchLimiterWaitingTasksGaugeMap.put(FETCH_LIMITER_FAST_NAME, fastFetchLimiter.waitingThreads())
fetchLimiterWaitingTasksGaugeMap.put(FETCH_LIMITER_SLOW_NAME, slowFetchLimiter.waitingThreads())
fetchLimiterWaitingTasksGaugeMap
})
private val fetchLimiterPermitsGaugeMap = new util.HashMap[String, Integer]()
S3StreamKafkaMetricsManager.setFetchLimiterPermitNumSupplier(() => {
fetchLimiterGaugeMap.put(FETCH_LIMITER_FAST_NAME, fastFetchLimiter.availablePermits())
fetchLimiterGaugeMap.put(FETCH_LIMITER_SLOW_NAME, slowFetchLimiter.availablePermits())
fetchLimiterGaugeMap
fetchLimiterPermitsGaugeMap.put(FETCH_LIMITER_FAST_NAME, fastFetchLimiter.availablePermits())
fetchLimiterPermitsGaugeMap.put(FETCH_LIMITER_SLOW_NAME, slowFetchLimiter.availablePermits())
fetchLimiterPermitsGaugeMap
})
// Timeout counters keyed by limiter name. Only the fast and slow fetch limiters have
// an entry.
// NOTE(review): a lookup with any other limiter name (e.g. NoopLimiter's "noop")
// returns null - confirm callers only pass these two limiters.
private val fetchLimiterTimeoutCounterMap = util.Map.of(
fastFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeoutMetric(fastFetchLimiter.name),
slowFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeoutMetric(slowFetchLimiter.name)
)
// Acquire-time histograms keyed by limiter name, built at the INFO metrics level.
// Only the fast and slow fetch limiters have an entry.
// NOTE(review): a lookup with any other limiter name (e.g. NoopLimiter's "noop")
// returns null - confirm callers only pass these two limiters.
private val fetchLimiterTimeHistogramMap = util.Map.of(
fastFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeMetric(MetricsLevel.INFO, fastFetchLimiter.name),
slowFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeMetric(MetricsLevel.INFO, slowFetchLimiter.name)
)

/**
* Used to reduce allocation in [[readFromLocalLogV2]]
Expand Down Expand Up @@ -561,14 +576,16 @@ class ElasticReplicaManager(
math.min(bytesNeedFromParam, limiter.maxPermits())
}

val timer: TimerUtil = new TimerUtil()
val handler: Handler = timeoutMs match {
case t if t > 0 => limiter.acquire(bytesNeed(), t)
case _ => limiter.acquire(bytesNeed())
}
fetchLimiterTimeHistogramMap.get(limiter.name).record(timer.elapsedAs(TimeUnit.NANOSECONDS))

if (handler == null) {
// handler maybe null if it timed out to acquire from limiter
// TODO add metrics for this
// the handler will be null if acquiring permits from the limiter timed out
fetchLimiterTimeoutCounterMap.get(limiter.name).add(MetricsLevel.INFO, 1)
// warn(s"Returning empty fetch response for fetch request $readPartitionInfo since the wait time exceeds $timeoutMs ms.")
ElasticReplicaManager.emptyReadResults(readPartitionInfo.map(_._1))
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package com.automq.stream.s3.backpressure;

import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.utils.ThreadUtils;
import com.automq.stream.utils.Threads;

Expand Down Expand Up @@ -49,6 +50,11 @@ public class DefaultBackPressureManager implements BackPressureManager {
* Note: It should only be accessed in the {@link #checkerScheduler} thread.
*/
private long lastRegulateTime = System.currentTimeMillis();
/**
* The last load level to trigger the regulator.
* Only used for logging and monitoring.
*/
private LoadLevel lastRegulateLevel = LoadLevel.NORMAL;

public DefaultBackPressureManager(Regulator regulator) {
this(regulator, DEFAULT_COOLDOWN_MS);
Expand All @@ -62,11 +68,12 @@ public DefaultBackPressureManager(Regulator regulator, long cooldownMs) {
/**
 * Starts the manager: creates the scheduler on which registered checkers run, then
 * registers {@code currentLoadLevel()} as the back pressure state supplier for metrics.
 */
@Override
public void start() {
this.checkerScheduler = Threads.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("back-pressure-checker-%d", false), LOGGER);
// NOTE(review): the supplier is presumably invoked from a metrics callback thread rather
// than the checker scheduler thread - confirm currentLoadLevel() only reads thread-safe state.
S3StreamMetricsManager.registerBackPressureStateSupplier(this::currentLoadLevel);
}

@Override
public void registerChecker(Checker checker) {
checkerScheduler.scheduleAtFixedRate(() -> {
checkerScheduler.scheduleWithFixedDelay(() -> {
loadLevels.put(checker.source(), checker.check());
maybeRegulate();
}, 0, checker.intervalMs(), TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -113,6 +120,9 @@ private LoadLevel currentLoadLevel() {

private void regulate(LoadLevel loadLevel, long now) {
if (LoadLevel.NORMAL.equals(loadLevel)) {
if (!LoadLevel.NORMAL.equals(lastRegulateLevel)) {
LOGGER.info("The system is back to a normal state, checkers: {}", loadLevels);
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("The system is in a normal state, checkers: {}", loadLevels);
}
Expand All @@ -122,5 +132,6 @@ private void regulate(LoadLevel loadLevel, long now) {

loadLevel.regulate(regulator);
lastRegulateTime = now;
lastRegulateLevel = loadLevel;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.metrics;

import io.opentelemetry.api.metrics.ObservableDoubleGauge;

/**
 * An {@link ObservableDoubleGauge} that declares no members of its own.
 * It is used as the initial value of gauge fields (for example the broker quota limit
 * gauge in {@code S3StreamMetricsManager}) before a real gauge is built.
 */
public class NoopObservableDoubleGauge implements ObservableDoubleGauge {
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,4 +151,12 @@ public class S3StreamMetricsConstant {
public static final String LABEL_STAGE_GET_OBJECTS = "get_objects";
public static final String LABEL_STAGE_FIND_INDEX = "find_index";
public static final String LABEL_STAGE_COMPUTE = "compute";

// Back Pressure
// Gauge name (appended to the metrics prefix) and the attribute key that carries the load level name.
public static final String BACK_PRESSURE_STATE_METRIC_NAME = "back_pressure_state";
public static final AttributeKey<String> LABEL_BACK_PRESSURE_STATE = AttributeKey.stringKey("state");

// Broker Quota
// Gauge name (appended to the metrics prefix) and the attribute key that carries the quota type.
public static final String BROKER_QUOTA_LIMIT_METRIC_NAME = "broker_quota_limit";
public static final AttributeKey<String> LABEL_BROKER_QUOTA_TYPE = AttributeKey.stringKey("type");
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
package com.automq.stream.s3.metrics;

import com.automq.stream.s3.ByteBufAlloc;
import com.automq.stream.s3.backpressure.LoadLevel;
import com.automq.stream.s3.metrics.operations.S3ObjectStage;
import com.automq.stream.s3.metrics.operations.S3Stage;
import com.automq.stream.s3.metrics.wrapper.ConfigListener;
Expand All @@ -34,6 +35,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableDoubleGauge;
import io.opentelemetry.api.metrics.ObservableLongGauge;

import static com.automq.stream.s3.metrics.S3StreamMetricsConstant.LABEL_CACHE_NAME;
Expand Down Expand Up @@ -139,10 +141,23 @@ public class S3StreamMetricsManager {
private static final MultiAttributes<String> OPERATOR_INDEX_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(),
S3StreamMetricsConstant.LABEL_INDEX);

// Back Pressure
// Attributes keyed by load level name, labelled with LABEL_BACK_PRESSURE_STATE.
private static final MultiAttributes<String> BACK_PRESSURE_STATE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(),
S3StreamMetricsConstant.LABEL_BACK_PRESSURE_STATE);
// Placeholder gauge; replaced by initBackPressureMetrics.
private static ObservableLongGauge backPressureState = new NoopObservableLongGauge();
// Reports NORMAL until a supplier is set through registerBackPressureStateSupplier.
private static Supplier<LoadLevel> backPressureStateSupplier = () -> LoadLevel.NORMAL;

// Broker Quota
// Attributes keyed by quota type, labelled with LABEL_BROKER_QUOTA_TYPE.
private static final MultiAttributes<String> BROKER_QUOTA_TYPE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(),
S3StreamMetricsConstant.LABEL_BROKER_QUOTA_TYPE);
// Placeholder gauge; replaced by initBrokerQuotaMetrics.
private static ObservableDoubleGauge brokerQuotaLimit = new NoopObservableDoubleGauge();
// Reports an empty map until a supplier is set through registerBrokerQuotaLimitSupplier.
private static Supplier<Map<String, Double>> brokerQuotaLimitSupplier = () -> new ConcurrentHashMap<>();

static {
BASE_ATTRIBUTES_LISTENERS.add(ALLOC_TYPE_ATTRIBUTES);
BASE_ATTRIBUTES_LISTENERS.add(OPERATOR_INDEX_ATTRIBUTES);
BASE_ATTRIBUTES_LISTENERS.add(BACK_PRESSURE_STATE_ATTRIBUTES);
BASE_ATTRIBUTES_LISTENERS.add(BROKER_QUOTA_TYPE_ATTRIBUTES);
}

public static void configure(MetricsConfig metricsConfig) {
Expand Down Expand Up @@ -400,6 +415,8 @@ public static void initMetrics(Meter meter, String prefix) {
});

initAsyncCacheMetrics(meter, prefix);
initBackPressureMetrics(meter, prefix);
initBrokerQuotaMetrics(meter, prefix);
}

private static void initAsyncCacheMetrics(Meter meter, String prefix) {
Expand Down Expand Up @@ -475,6 +492,43 @@ private static void initAsyncCacheMetrics(Meter meter, String prefix) {
});
}

/**
 * Builds the back pressure state gauge and stores it in {@code backPressureState}.
 * <p>
 * On each collection, when the INFO level is within the configured metrics level,
 * the callback records the ordinal of the current load level under that level's
 * attributes, and {@code -1} under the attributes of every other load level.
 *
 * @param meter  the meter used to build the gauge
 * @param prefix the prefix prepended to the metric name
 */
private static void initBackPressureMetrics(Meter meter, String prefix) {
    String gaugeName = prefix + S3StreamMetricsConstant.BACK_PRESSURE_STATE_METRIC_NAME;
    backPressureState = meter.gaugeBuilder(gaugeName)
        .setDescription("Back pressure state")
        .ofLongs()
        .buildWithCallback(measurement -> {
            if (!MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
                return;
            }
            LoadLevel current = backPressureStateSupplier.get();
            measurement.record(current.ordinal(), BACK_PRESSURE_STATE_ATTRIBUTES.get(current.name()));
            // To beautify Grafana dashboard, we record -1 for other states
            for (LoadLevel other : LoadLevel.values()) {
                if (other == current) {
                    continue;
                }
                measurement.record(-1, BACK_PRESSURE_STATE_ATTRIBUTES.get(other.name()));
            }
        });
}

/**
 * Builds the broker quota limit gauge and stores it in {@code brokerQuotaLimit}.
 * <p>
 * On each collection, when the INFO level is within the configured metrics level,
 * the callback records every entry returned by the quota limit supplier under the
 * attributes of its quota type, skipping values greater than {@code 1e15}.
 *
 * @param meter  the meter used to build the gauge
 * @param prefix the prefix prepended to the metric name
 */
private static void initBrokerQuotaMetrics(Meter meter, String prefix) {
    String gaugeName = prefix + S3StreamMetricsConstant.BROKER_QUOTA_LIMIT_METRIC_NAME;
    brokerQuotaLimit = meter.gaugeBuilder(gaugeName)
        .setDescription("Broker quota limit")
        .buildWithCallback(measurement -> {
            if (!MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
                return;
            }
            for (Map.Entry<String, Double> quota : brokerQuotaLimitSupplier.get().entrySet()) {
                double limit = quota.getValue();
                // drop too large values
                boolean tooLarge = limit > 1e15;
                if (!tooLarge) {
                    measurement.record(limit, BROKER_QUOTA_TYPE_ATTRIBUTES.get(quota.getKey()));
                }
            }
        });
}

public static void registerNetworkLimiterQueueSizeSupplier(AsyncNetworkBandwidthLimiter.Type type,
Supplier<Integer> networkLimiterQueueSizeSupplier) {
switch (type) {
Expand Down Expand Up @@ -907,4 +961,12 @@ public static void registerLocalStreamRangeIndexCacheSizeSupplier(Supplier<Integ
public static void registerLocalStreamRangeIndexCacheStreamNumSupplier(Supplier<Integer> localStreamRangeIndexCacheStreamNum) {
S3StreamMetricsManager.localStreamRangeIndexCacheStreamNum = localStreamRangeIndexCacheStreamNum;
}

/**
 * Sets the supplier that the back pressure state gauge reads the current load level from.
 *
 * @param backPressureStateSupplier the supplier of the current {@link LoadLevel}
 */
public static void registerBackPressureStateSupplier(Supplier<LoadLevel> backPressureStateSupplier) {
S3StreamMetricsManager.backPressureStateSupplier = backPressureStateSupplier;
}

/**
 * Sets the supplier that the broker quota limit gauge reads its values from.
 *
 * @param brokerQuotaLimitSupplier the supplier of a map from quota type to quota limit
 */
public static void registerBrokerQuotaLimitSupplier(Supplier<Map<String, Double>> brokerQuotaLimitSupplier) {
S3StreamMetricsManager.brokerQuotaLimitSupplier = brokerQuotaLimitSupplier;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void setup() {
Runnable runnable = invocation.getArgument(0);
runnable.run();
return null;
}).when(scheduler).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
}).when(scheduler).scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
doAnswer(invocation -> {
Runnable runnable = invocation.getArgument(0);
runnable.run();
Expand Down
Loading

0 comments on commit b470ba4

Please sign in to comment.