Skip to content

Commit

Permalink
[ISSUE apache#7381] Fix the problem of inaccurate timer message metric (
Browse files Browse the repository at this point in the history
apache#7382)

* correct the timerMetrics' result.

* for further extension.

* checkstyle.

* use toLong.
  • Loading branch information
GenerousMan authored Sep 23, 2023
1 parent 73b3fde commit 88a9d93
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicFilterType;
Expand Down Expand Up @@ -599,7 +600,12 @@ public void addMetric(MessageExt msg, int value) {
if (null == msg || null == msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC)) {
return;
}
timerMetrics.addAndGet(msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC), value);
if (msg.getProperty(TIMER_ENQUEUE_MS) != null
&& NumberUtils.toLong(msg.getProperty(TIMER_ENQUEUE_MS)) == Long.MAX_VALUE) {
return;
}
// pass msg into addAndGet, for further more judgement extension.
timerMetrics.addAndGet(msg, value);
} catch (Throwable t) {
if (frequency.incrementAndGet() % 1000 == 0) {
LOGGER.error("error in adding metric", t);
Expand Down Expand Up @@ -1323,6 +1329,7 @@ protected void putMessageToTimerWheel(TimerRequest req) {
perfCounterTicks.startTick(ENQUEUE_PUT);
DefaultStoreMetricsManager.incTimerEnqueueCount(getRealTopic(req.getMsg()));
if (shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs) {
req.setEnqueueTime(Long.MAX_VALUE);
dequeuePutQueue.put(req);
} else {
boolean doEnqueueRes = doEnqueue(
Expand Down Expand Up @@ -1452,9 +1459,14 @@ public void run() {
}
try {
perfCounterTicks.startTick(DEQUEUE_PUT);
DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(tr.getMsg()));
addMetric(tr.getMsg(), -1);
MessageExtBrokerInner msg = convert(tr.getMsg(), tr.getEnqueueTime(), needRoll(tr.getMagic()));
MessageExt msgExt = tr.getMsg();
DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(msgExt));
if (tr.getEnqueueTime() == Long.MAX_VALUE) {
// never enqueue, mark it.
MessageAccessor.putProperty(msgExt, TIMER_ENQUEUE_MS, String.valueOf(Long.MAX_VALUE));
}
addMetric(msgExt, -1);
MessageExtBrokerInner msg = convert(msgExt, tr.getEnqueueTime(), needRoll(tr.getMagic()));
doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));
while (!doRes && !isStopped()) {
if (!isRunningDequeue()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -78,7 +80,8 @@ public long updateDistPair(int period, int value) {
return distPair.getCount().addAndGet(value);
}

public long addAndGet(String topic, int value) {
public long addAndGet(MessageExt msg, int value) {
String topic = msg.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
Metric pair = getTopicPair(topic);
getDataVersion().nextVersion();
pair.setTimeStamp(System.currentTimeMillis());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ public class TimerRequest {
private final int sizePy;
private final long delayTime;

private final long enqueueTime;
private final int magic;

private long enqueueTime;
private MessageExt msg;


Expand Down Expand Up @@ -94,7 +95,9 @@ public void setDeleteList(Set<String> deleteList) {
public void setLatch(CountDownLatch latch) {
this.latch = latch;
}

public void setEnqueueTime(long enqueueTime) {
this.enqueueTime = enqueueTime;
}
public void idempotentRelease() {
idempotentRelease(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.apache.rocketmq.store.timer;

import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.junit.Assert;
import org.junit.Test;

Expand All @@ -31,8 +34,11 @@ public void testTimingCount() {

TimerMetrics first = new TimerMetrics(baseDir);
Assert.assertTrue(first.load());
first.addAndGet("AAA", 1000);
first.addAndGet("BBB", 2000);
MessageExt msg = new MessageExt();
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, "AAA");
first.addAndGet(msg, 1000);
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, "BBB");
first.addAndGet(msg, 2000);
Assert.assertEquals(1000, first.getTimingCount("AAA"));
Assert.assertEquals(2000, first.getTimingCount("BBB"));
long curr = System.currentTimeMillis();
Expand Down

0 comments on commit 88a9d93

Please sign in to comment.