feat(backpressure): add metrics #2198

Merged · 11 commits · Dec 2, 2024
45 changes: 42 additions & 3 deletions core/src/main/scala/kafka/server/FairLimiter.java
@@ -13,6 +13,7 @@

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

@@ -27,13 +28,32 @@ public class FairLimiter implements Limiter {
private final Lock lock = new ReentrantLock(true);
private final Semaphore permits;

public FairLimiter(int size) {
maxPermits = size;
permits = new Semaphore(size);
/**
* The name of this limiter, used for metrics.
*/
private final String name;
/**
* The number of threads waiting for permits, used for metrics.
*/
private final AtomicInteger waitingThreads = new AtomicInteger(0);

public FairLimiter(int size, String name) {
this.maxPermits = size;
this.permits = new Semaphore(size);
this.name = name;
}

@Override
public Handler acquire(int permit) throws InterruptedException {
waitingThreads.incrementAndGet();
try {
return acquire0(permit);
} finally {
waitingThreads.decrementAndGet();
}
}

private Handler acquire0(int permit) throws InterruptedException {
lock.lock();
try {
permits.acquire(permit);
@@ -45,6 +65,15 @@ public Handler acquire(int permit) throws InterruptedException {

@Override
public Handler acquire(int permit, long timeoutMs) throws InterruptedException {
waitingThreads.incrementAndGet();
try {
return acquire0(permit, timeoutMs);
} finally {
waitingThreads.decrementAndGet();
}
}

private Handler acquire0(int permit, long timeoutMs) throws InterruptedException {
long start = System.nanoTime();
if (lock.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
try {
@@ -72,6 +101,16 @@ public int availablePermits() {
return permits.availablePermits();
}

@Override
public int waitingThreads() {
return waitingThreads.get();
}

@Override
public String name() {
return name;
}

private Handler acquireLocked(int permit, long timeoutNs) throws InterruptedException {
if (permit > maxPermits) {
permit = maxPermits;
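Aside (not part of this diff): the counter is incremented only for the duration of acquire(), so waitingThreads() reads the number of callers currently parked on the limiter rather than a cumulative count. A rough sketch of that behaviour under the API shown above; the 1 MiB size, the "demo" name, and the class itself are made up for illustration, and it assumes Handler.close() returns the permits:

package kafka.server;

// Illustrative sketch only, not part of this PR.
public class FairLimiterWaitingSketch {
    public static void main(String[] args) throws Exception {
        FairLimiter limiter = new FairLimiter(1024 * 1024, "demo"); // 1 MiB, hypothetical name

        // First caller takes every permit and holds on to them.
        Limiter.Handler first = limiter.acquire(1024 * 1024);

        // Second caller parks inside acquire(); while it waits, the gauge reads 1.
        Thread blocked = new Thread(() -> {
            try {
                Limiter.Handler h = limiter.acquire(1, 5000); // returns null if it times out after 5s
                if (h != null) {
                    h.close(); // release the permit once acquired
                }
            } catch (Exception e) {
                // ignore in this sketch
            }
        });
        blocked.start();
        Thread.sleep(100); // give the second caller time to park on the limiter

        System.out.println(limiter.waitingThreads());   // expected: 1
        System.out.println(limiter.availablePermits()); // expected: 0

        first.close(); // return the permits; the blocked caller wakes up and finishes
        blocked.join();
        System.out.println(limiter.waitingThreads());   // expected: 0
    }
}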
10 changes: 10 additions & 0 deletions core/src/main/scala/kafka/server/Limiter.java
@@ -49,6 +49,16 @@ public interface Limiter {
*/
int availablePermits();

/**
* Return the number of threads waiting for permits.
*/
int waitingThreads();

/**
* Return the name of this limiter.
*/
String name();

/**
* A handler to release acquired permits.
*/
10 changes: 10 additions & 0 deletions core/src/main/scala/kafka/server/NoopLimiter.java
@@ -38,6 +38,16 @@ public int availablePermits() {
return Integer.MAX_VALUE;
}

@Override
public int waitingThreads() {
return 0;
}

@Override
public String name() {
return "noop";
}

public static class NoopHandler implements Handler {
@Override
public void close() {
@@ -11,6 +11,7 @@

package kafka.server.streamaspect

import com.automq.stream.s3.metrics.S3StreamMetricsManager
import kafka.network.RequestChannel
import kafka.server._
import kafka.utils.QuotaUtils
@@ -42,6 +43,13 @@ class BrokerQuotaManager(private val config: BrokerQuotaManagerConfig,

override def delayQueueSensor: Sensor = brokerDelayQueueSensor

S3StreamMetricsManager.registerBrokerQuotaLimitSupplier(() => java.util.Map.of(
QuotaType.RequestRate.toString, quotaLimit(QuotaType.RequestRate),
QuotaType.Produce.toString, quotaLimit(QuotaType.Produce),
QuotaType.Fetch.toString, quotaLimit(QuotaType.Fetch),
QuotaType.SlowFetch.toString, quotaLimit(QuotaType.SlowFetch)
))

def getMaxValueInQuotaWindow(quotaType: QuotaType, request: RequestChannel.Request): Double = {
if (shouldThrottle(request)) {
quotaLimit(quotaType)
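Aside (not part of this diff): the metrics side only stores the supplier and polls it inside the gauge callback at scrape time, so this registration is cheap and the reported quota values stay current without any push logic in the quota manager. A small stand-alone sketch of the pattern; the quota type labels and limit values below are invented for illustration:

import java.util.Map;
import java.util.function.Supplier;

// Illustrative sketch only, not part of this PR.
public class QuotaLimitSupplierSketch {
    public static void main(String[] args) {
        // The quota manager hands the metrics layer a supplier rather than concrete values.
        Supplier<Map<String, Double>> brokerQuotaLimitSupplier = () -> Map.of(
            "RequestRate", 1000.0,            // hypothetical limits, not real configuration
            "Produce", 100.0 * 1024 * 1024,
            "Fetch", 100.0 * 1024 * 1024
        );

        // At scrape time the gauge callback calls get() and records one data point
        // per quota type, labelled with the "type" attribute.
        brokerQuotaLimitSupplier.get().forEach((type, limit) ->
            System.out.printf("broker_quota_limit{type=\"%s\"} = %.1f%n", type, limit));
    }
}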
@@ -1,6 +1,7 @@
package kafka.server.streamaspect

import com.automq.stream.api.exceptions.FastReadFailFastException
import com.automq.stream.s3.metrics.{MetricsLevel, TimerUtil}
import com.automq.stream.utils.FutureUtil
import com.automq.stream.utils.threads.S3StreamThreadPoolMonitor
import kafka.cluster.Partition
@@ -114,14 +115,28 @@ class ElasticReplicaManager(
fetchExecutorQueueSizeGaugeMap
})

private val fastFetchLimiter = new FairLimiter(200 * 1024 * 1024) // 200MiB
private val slowFetchLimiter = new FairLimiter(200 * 1024 * 1024) // 200MiB
private val fetchLimiterGaugeMap = new util.HashMap[String, Integer]()
private val fastFetchLimiter = new FairLimiter(200 * 1024 * 1024, FETCH_LIMITER_FAST_NAME) // 200MiB
private val slowFetchLimiter = new FairLimiter(200 * 1024 * 1024, FETCH_LIMITER_SLOW_NAME) // 200MiB
private val fetchLimiterWaitingTasksGaugeMap = new util.HashMap[String, Integer]()
S3StreamKafkaMetricsManager.setFetchLimiterWaitingTaskNumSupplier(() => {
fetchLimiterWaitingTasksGaugeMap.put(FETCH_LIMITER_FAST_NAME, fastFetchLimiter.waitingThreads())
fetchLimiterWaitingTasksGaugeMap.put(FETCH_LIMITER_SLOW_NAME, slowFetchLimiter.waitingThreads())
fetchLimiterWaitingTasksGaugeMap
})
private val fetchLimiterPermitsGaugeMap = new util.HashMap[String, Integer]()
S3StreamKafkaMetricsManager.setFetchLimiterPermitNumSupplier(() => {
fetchLimiterGaugeMap.put(FETCH_LIMITER_FAST_NAME, fastFetchLimiter.availablePermits())
fetchLimiterGaugeMap.put(FETCH_LIMITER_SLOW_NAME, slowFetchLimiter.availablePermits())
fetchLimiterGaugeMap
fetchLimiterPermitsGaugeMap.put(FETCH_LIMITER_FAST_NAME, fastFetchLimiter.availablePermits())
fetchLimiterPermitsGaugeMap.put(FETCH_LIMITER_SLOW_NAME, slowFetchLimiter.availablePermits())
fetchLimiterPermitsGaugeMap
})
private val fetchLimiterTimeoutCounterMap = util.Map.of(
fastFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeoutMetric(fastFetchLimiter.name),
slowFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeoutMetric(slowFetchLimiter.name)
)
private val fetchLimiterTimeHistogramMap = util.Map.of(
fastFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeMetric(MetricsLevel.INFO, fastFetchLimiter.name),
slowFetchLimiter.name, S3StreamKafkaMetricsManager.buildFetchLimiterTimeMetric(MetricsLevel.INFO, slowFetchLimiter.name)
)

/**
* Used to reduce allocation in [[readFromLocalLogV2]]
@@ -561,14 +576,16 @@
math.min(bytesNeedFromParam, limiter.maxPermits())
}

val timer: TimerUtil = new TimerUtil()
val handler: Handler = timeoutMs match {
case t if t > 0 => limiter.acquire(bytesNeed(), t)
case _ => limiter.acquire(bytesNeed())
}
fetchLimiterTimeHistogramMap.get(limiter.name).record(timer.elapsedAs(TimeUnit.NANOSECONDS))

if (handler == null) {
// handler maybe null if it timed out to acquire from limiter
// TODO add metrics for this
// the handler will be null if it timed out to acquire from limiter
fetchLimiterTimeoutCounterMap.get(limiter.name).add(MetricsLevel.INFO, 1)
// warn(s"Returning emtpy fetch response for fetch request $readPartitionInfo since the wait time exceeds $timeoutMs ms.")
ElasticReplicaManager.emptyReadResults(readPartitionInfo.map(_._1))
} else {
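Aside (not part of this diff): the acquisition path now follows a measure-then-branch shape — time the acquire call, record the latency whether or not permits were granted, and bump a counter only when the limiter handed back null (the timeout case). A stripped-down sketch of that control flow, with plain JDK types standing in for TimerUtil and the metric wrappers; the class and field names are made up:

package kafka.server;

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;

// Illustrative sketch only, not part of this PR.
public class FetchLimiterInstrumentationSketch {
    private final AtomicLong lastAcquireNanos = new AtomicLong(); // stand-in for the latency histogram
    private final LongAdder timeoutCount = new LongAdder();       // stand-in for the timeout counter

    Limiter.Handler acquireWithMetrics(Limiter limiter, int bytesNeed, long timeoutMs)
            throws InterruptedException {
        long start = System.nanoTime();
        Limiter.Handler handler = timeoutMs > 0
            ? limiter.acquire(bytesNeed, timeoutMs)
            : limiter.acquire(bytesNeed);
        // Record how long the caller waited on the limiter, regardless of the outcome.
        lastAcquireNanos.set(System.nanoTime() - start);
        if (handler == null) {
            // acquire(permit, timeoutMs) returns null when the wait exceeded timeoutMs,
            // in which case the read is answered with an empty fetch response.
            timeoutCount.increment();
        }
        return handler;
    }
}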
@@ -11,6 +11,7 @@

package com.automq.stream.s3.backpressure;

import com.automq.stream.s3.metrics.S3StreamMetricsManager;
import com.automq.stream.utils.ThreadUtils;
import com.automq.stream.utils.Threads;

@@ -49,6 +50,11 @@ public class DefaultBackPressureManager implements BackPressureManager {
* Note: It should only be accessed in the {@link #checkerScheduler} thread.
*/
private long lastRegulateTime = System.currentTimeMillis();
/**
* The last load level to trigger the regulator.
* Only used for logging and monitoring.
*/
private LoadLevel lastRegulateLevel = LoadLevel.NORMAL;

public DefaultBackPressureManager(Regulator regulator) {
this(regulator, DEFAULT_COOLDOWN_MS);
@@ -62,11 +68,12 @@ public DefaultBackPressureManager(Regulator regulator, long cooldownMs) {
@Override
public void start() {
this.checkerScheduler = Threads.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("back-pressure-checker-%d", false), LOGGER);
S3StreamMetricsManager.registerBackPressureStateSupplier(this::currentLoadLevel);
}

@Override
public void registerChecker(Checker checker) {
checkerScheduler.scheduleAtFixedRate(() -> {
checkerScheduler.scheduleWithFixedDelay(() -> {
loadLevels.put(checker.source(), checker.check());
maybeRegulate();
}, 0, checker.intervalMs(), TimeUnit.MILLISECONDS);
@@ -113,6 +120,9 @@ private LoadLevel currentLoadLevel() {

private void regulate(LoadLevel loadLevel, long now) {
if (LoadLevel.NORMAL.equals(loadLevel)) {
if (!LoadLevel.NORMAL.equals(lastRegulateLevel)) {
LOGGER.info("The system is back to a normal state, checkers: {}", loadLevels);
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("The system is in a normal state, checkers: {}", loadLevels);
}
@@ -122,5 +132,6 @@

loadLevel.regulate(regulator);
lastRegulateTime = now;
lastRegulateLevel = loadLevel;
}
}
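Aside (not part of this diff): the switch from scheduleAtFixedRate to scheduleWithFixedDelay changes what happens when a check runs longer than its interval. Fixed-rate scheduling treats missed runs as overdue and fires them back to back, while fixed-delay always waits the full interval after the previous run finishes — the safer choice for a checker that may itself slow down exactly when the system is overloaded. A minimal JDK-only illustration; the 100 ms interval and 150 ms sleep are arbitrary:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only, not part of this PR.
public class SchedulingDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        Runnable slowCheck = () -> {
            System.out.println("check at " + System.currentTimeMillis());
            try {
                Thread.sleep(150); // the check itself takes longer than the interval
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        // With scheduleAtFixedRate(slowCheck, 0, 100, MILLISECONDS) the next run would
        // start immediately after the previous one, because runs are already overdue.
        // With scheduleWithFixedDelay the scheduler waits the full 100 ms after each
        // run finishes, so consecutive checks start roughly 250 ms apart.
        scheduler.scheduleWithFixedDelay(slowCheck, 0, 100, TimeUnit.MILLISECONDS);
        Thread.sleep(1000);
        scheduler.shutdownNow();
    }
}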
@@ -0,0 +1,17 @@
/*
* Copyright 2024, AutoMQ HK Limited.
*
* The use of this file is governed by the Business Source License,
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.metrics;

import io.opentelemetry.api.metrics.ObservableDoubleGauge;

public class NoopObservableDoubleGauge implements ObservableDoubleGauge {
}
@@ -151,4 +151,12 @@ public class S3StreamMetricsConstant {
public static final String LABEL_STAGE_GET_OBJECTS = "get_objects";
public static final String LABEL_STAGE_FIND_INDEX = "find_index";
public static final String LABEL_STAGE_COMPUTE = "compute";

// Back Pressure
public static final String BACK_PRESSURE_STATE_METRIC_NAME = "back_pressure_state";
public static final AttributeKey<String> LABEL_BACK_PRESSURE_STATE = AttributeKey.stringKey("state");

// Broker Quota
public static final String BROKER_QUOTA_LIMIT_METRIC_NAME = "broker_quota_limit";
public static final AttributeKey<String> LABEL_BROKER_QUOTA_TYPE = AttributeKey.stringKey("type");
}
@@ -12,6 +12,7 @@
package com.automq.stream.s3.metrics;

import com.automq.stream.s3.ByteBufAlloc;
import com.automq.stream.s3.backpressure.LoadLevel;
import com.automq.stream.s3.metrics.operations.S3ObjectStage;
import com.automq.stream.s3.metrics.operations.S3Stage;
import com.automq.stream.s3.metrics.wrapper.ConfigListener;
@@ -34,6 +35,7 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.LongCounter;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.api.metrics.ObservableDoubleGauge;
import io.opentelemetry.api.metrics.ObservableLongGauge;

import static com.automq.stream.s3.metrics.S3StreamMetricsConstant.LABEL_CACHE_NAME;
@@ -139,10 +141,23 @@ public class S3StreamMetricsManager {
private static final MultiAttributes<String> OPERATOR_INDEX_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(),
S3StreamMetricsConstant.LABEL_INDEX);

// Back Pressure
private static final MultiAttributes<String> BACK_PRESSURE_STATE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(),
S3StreamMetricsConstant.LABEL_BACK_PRESSURE_STATE);
private static ObservableLongGauge backPressureState = new NoopObservableLongGauge();
private static Supplier<LoadLevel> backPressureStateSupplier = () -> LoadLevel.NORMAL;

// Broker Quota
private static final MultiAttributes<String> BROKER_QUOTA_TYPE_ATTRIBUTES = new MultiAttributes<>(Attributes.empty(),
S3StreamMetricsConstant.LABEL_BROKER_QUOTA_TYPE);
private static ObservableDoubleGauge brokerQuotaLimit = new NoopObservableDoubleGauge();
private static Supplier<Map<String, Double>> brokerQuotaLimitSupplier = () -> new ConcurrentHashMap<>();

static {
BASE_ATTRIBUTES_LISTENERS.add(ALLOC_TYPE_ATTRIBUTES);
BASE_ATTRIBUTES_LISTENERS.add(OPERATOR_INDEX_ATTRIBUTES);
BASE_ATTRIBUTES_LISTENERS.add(BACK_PRESSURE_STATE_ATTRIBUTES);
BASE_ATTRIBUTES_LISTENERS.add(BROKER_QUOTA_TYPE_ATTRIBUTES);
}

public static void configure(MetricsConfig metricsConfig) {
@@ -400,6 +415,8 @@ public static void initMetrics(Meter meter, String prefix) {
});

initAsyncCacheMetrics(meter, prefix);
initBackPressureMetrics(meter, prefix);
initBrokerQuotaMetrics(meter, prefix);
}

private static void initAsyncCacheMetrics(Meter meter, String prefix) {
@@ -475,6 +492,43 @@ private static void initAsyncCacheMetrics(Meter meter, String prefix) {
});
}

private static void initBackPressureMetrics(Meter meter, String prefix) {
backPressureState = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.BACK_PRESSURE_STATE_METRIC_NAME)
.setDescription("Back pressure state")
.ofLongs()
.buildWithCallback(result -> {
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
LoadLevel state = backPressureStateSupplier.get();
result.record(state.ordinal(), BACK_PRESSURE_STATE_ATTRIBUTES.get(state.name()));
// To beautify Grafana dashboard, we record -1 for other states
for (LoadLevel l : LoadLevel.values()) {
if (l != state) {
result.record(-1, BACK_PRESSURE_STATE_ATTRIBUTES.get(l.name()));
}
}
}
});
}

private static void initBrokerQuotaMetrics(Meter meter, String prefix) {
brokerQuotaLimit = meter.gaugeBuilder(prefix + S3StreamMetricsConstant.BROKER_QUOTA_LIMIT_METRIC_NAME)
.setDescription("Broker quota limit")
.buildWithCallback(result -> {
if (MetricsLevel.INFO.isWithin(metricsConfig.getMetricsLevel())) {
Map<String, Double> brokerQuotaLimitMap = brokerQuotaLimitSupplier.get();
for (Map.Entry<String, Double> entry : brokerQuotaLimitMap.entrySet()) {
String quotaType = entry.getKey();
Double quotaLimit = entry.getValue();
// drop too large values
if (quotaLimit > 1e15) {
continue;
}
result.record(quotaLimit, BROKER_QUOTA_TYPE_ATTRIBUTES.get(quotaType));
}
}
});
}

public static void registerNetworkLimiterQueueSizeSupplier(AsyncNetworkBandwidthLimiter.Type type,
Supplier<Integer> networkLimiterQueueSizeSupplier) {
switch (type) {
@@ -907,4 +961,12 @@ public static void registerLocalStreamRangeIndexCacheSizeSupplier(Supplier<Integ
public static void registerLocalStreamRangeIndexCacheStreamNumSupplier(Supplier<Integer> localStreamRangeIndexCacheStreamNum) {
S3StreamMetricsManager.localStreamRangeIndexCacheStreamNum = localStreamRangeIndexCacheStreamNum;
}

public static void registerBackPressureStateSupplier(Supplier<LoadLevel> backPressureStateSupplier) {
S3StreamMetricsManager.backPressureStateSupplier = backPressureStateSupplier;
}

public static void registerBrokerQuotaLimitSupplier(Supplier<Map<String, Double>> brokerQuotaLimitSupplier) {
S3StreamMetricsManager.brokerQuotaLimitSupplier = brokerQuotaLimitSupplier;
}
}
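Aside (not part of this diff): the back-pressure gauge records the current level's ordinal under its own state label and -1 under every other state label, so a dashboard always sees a complete set of series and can pick out the active state without gaps. A stand-alone sketch of the values that end up being recorded; the HIGH and CRITICAL levels are invented here, since only NORMAL is visible in this diff:

// Illustrative sketch only, not part of this PR.
public class BackPressureGaugeSketch {
    enum LoadLevel { NORMAL, HIGH, CRITICAL } // stand-in for com.automq.stream.s3.backpressure.LoadLevel

    public static void main(String[] args) {
        LoadLevel current = LoadLevel.HIGH;
        for (LoadLevel level : LoadLevel.values()) {
            long value = (level == current) ? current.ordinal() : -1;
            // Mirrors result.record(value, BACK_PRESSURE_STATE_ATTRIBUTES.get(level.name()))
            System.out.printf("back_pressure_state{state=\"%s\"} = %d%n", level.name(), value);
        }
        // Prints: NORMAL = -1, HIGH = 1, CRITICAL = -1
    }
}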
@@ -58,7 +58,7 @@ public void setup() {
Runnable runnable = invocation.getArgument(0);
runnable.run();
return null;
}).when(scheduler).scheduleAtFixedRate(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
}).when(scheduler).scheduleWithFixedDelay(any(Runnable.class), anyLong(), anyLong(), any(TimeUnit.class));
doAnswer(invocation -> {
Runnable runnable = invocation.getArgument(0);
runnable.run();