Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #8829] Support for Persisting LMQ Consumer Offsets in Config V1 Using RocksDB #8939

Merged
merged 1 commit into from
Nov 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
import org.apache.rocketmq.broker.config.v1.RocksDBLmqConsumerOffsetManager;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.plugin.BrokerAttachedPlugin;
import org.apache.rocketmq.broker.processor.AckMessageProcessor;
Expand Down Expand Up @@ -352,7 +351,7 @@ public BrokerController(
} else if (this.messageStoreConfig.isEnableRocksDBStore()) {
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqTopicConfigManager(this) : new RocksDBTopicConfigManager(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqSubscriptionGroupManager(this) : new RocksDBSubscriptionGroupManager(this);
this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new RocksDBLmqConsumerOffsetManager(this) : new RocksDBConsumerOffsetManager(this);
this.consumerOffsetManager = new RocksDBConsumerOffsetManager(this);
RongtongJin marked this conversation as resolved.
Show resolved Hide resolved
} else {
this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.rocketmq.broker.offset;

import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.config.v1.RocksDBLmqConsumerOffsetManager;
import org.apache.rocketmq.broker.config.v1.RocksDBConsumerOffsetManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.store.config.MessageStoreConfig;
Expand All @@ -28,56 +28,48 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.when;

public class RocksDBLmqConsumerOffsetManagerTest {
private static final String LMQ_GROUP = MixAll.LMQ_PREFIX + "FooBarGroup";
private static final String NON_LMQ_GROUP = "nonLmqGroup";
private static final String TOPIC = "FooBarTopic";

private static final String LMQ_TOPIC = MixAll.LMQ_PREFIX + "FooBarTopic";
private static final String NON_LMQ_TOPIC = "FooBarTopic";
private static final int QUEUE_ID = 0;
private static final long OFFSET = 12345;

private BrokerController brokerController;

private RocksDBLmqConsumerOffsetManager offsetManager;
private RocksDBConsumerOffsetManager offsetManager;

@Before
public void setUp() {
brokerController = Mockito.mock(BrokerController.class);
when(brokerController.getMessageStoreConfig()).thenReturn(Mockito.mock(MessageStoreConfig.class));
when(brokerController.getBrokerConfig()).thenReturn(Mockito.mock(BrokerConfig.class));
offsetManager = new RocksDBLmqConsumerOffsetManager(brokerController);
when(brokerController.getBrokerConfig()).thenReturn(new BrokerConfig());
offsetManager = new RocksDBConsumerOffsetManager(brokerController);
}

@Test
public void testQueryOffsetForLmq() {
// Setup
offsetManager.getLmqOffsetTable().put(getKey(), OFFSET);
// Execute
long actualOffset = offsetManager.queryOffset(LMQ_GROUP, TOPIC, QUEUE_ID);
// Verify
assertEquals("Offset should match the expected value.", OFFSET, actualOffset);
}

@Test
public void testQueryOffsetForNonLmq() {
long actualOffset = offsetManager.queryOffset(NON_LMQ_GROUP, TOPIC, QUEUE_ID);
long actualOffset = offsetManager.queryOffset(NON_LMQ_GROUP, NON_LMQ_TOPIC, QUEUE_ID);
// Verify
assertEquals("Offset should not be null.", -1, actualOffset);
}


@Test
public void testQueryOffsetForLmqGroupWithExistingOffset() {
offsetManager.getLmqOffsetTable().put(getKey(), OFFSET);
offsetManager.commitOffset("127.0.0.1",LMQ_GROUP, LMQ_TOPIC, QUEUE_ID, OFFSET);

// Act
Map<Integer, Long> actualOffsets = offsetManager.queryOffset(LMQ_GROUP, TOPIC);
Map<Integer, Long> actualOffsets = offsetManager.queryOffset(LMQ_GROUP, LMQ_TOPIC);

// Assert
assertNotNull(actualOffsets);
Expand All @@ -89,23 +81,20 @@ public void testQueryOffsetForLmqGroupWithExistingOffset() {
public void testQueryOffsetForLmqGroupWithoutExistingOffset() {
// Act
Map<Integer, Long> actualOffsets = offsetManager.queryOffset(LMQ_GROUP, "nonExistingTopic");

// Assert
assertNotNull(actualOffsets);
assertTrue("The map should be empty for non-existing offsets", actualOffsets.isEmpty());
assertNull(actualOffsets);
}

@Test
public void testQueryOffsetForNonLmqGroup() {
when(brokerController.getBrokerConfig().getConsumerOffsetUpdateVersionStep()).thenReturn(1L);
// Arrange
Map<Integer, Long> mockOffsets = new HashMap<>();
mockOffsets.put(QUEUE_ID, OFFSET);

offsetManager.commitOffset("clientHost", NON_LMQ_GROUP, TOPIC, QUEUE_ID, OFFSET);
offsetManager.commitOffset("clientHost", NON_LMQ_GROUP, NON_LMQ_TOPIC, QUEUE_ID, OFFSET);

// Act
Map<Integer, Long> actualOffsets = offsetManager.queryOffset(NON_LMQ_GROUP, TOPIC);
Map<Integer, Long> actualOffsets = offsetManager.queryOffset(NON_LMQ_GROUP, NON_LMQ_TOPIC);

// Assert
assertNotNull(actualOffsets);
Expand All @@ -115,21 +104,13 @@ public void testQueryOffsetForNonLmqGroup() {
@Test
public void testCommitOffsetForLmq() {
// Execute
offsetManager.commitOffset("clientHost", LMQ_GROUP, TOPIC, QUEUE_ID, OFFSET);
offsetManager.commitOffset("clientHost", LMQ_GROUP, LMQ_TOPIC, QUEUE_ID, OFFSET);
// Verify
Long expectedOffset = offsetManager.getLmqOffsetTable().get(getKey());
Long expectedOffset = offsetManager.getOffsetTable().get(getLMQKey()).get(QUEUE_ID);
assertEquals("Offset should be updated correctly.", OFFSET, expectedOffset.longValue());
}

@Test
public void testEncode() {
offsetManager.setLmqOffsetTable(new ConcurrentHashMap<>(512));
offsetManager.getLmqOffsetTable().put(getKey(), OFFSET);
String encodedData = offsetManager.encode();
assertTrue(encodedData.contains(String.valueOf(OFFSET)));
}

private String getKey() {
return TOPIC + "@" + LMQ_GROUP;
private String getLMQKey() {
return LMQ_TOPIC + "@" + LMQ_GROUP;
}
}
Loading