Skip to content

Commit

Permalink
[ISSUE apache#7208] fix: when deleting topic also delete its pop retr…
Browse files Browse the repository at this point in the history
…y topic (apache#7209)
  • Loading branch information
HScarb authored Aug 31, 2023
1 parent d000ef9 commit f82718a
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageUtil;
import org.apache.rocketmq.common.AclConfig;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.LockCallback;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.MixAll;
Expand Down Expand Up @@ -542,16 +543,29 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
}
}

this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic());
this.brokerController.getTopicQueueMappingManager().delete(requestHeader.getTopic());
this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(requestHeader.getTopic());
this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(requestHeader.getTopic());
this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(requestHeader.getTopic()));
final Set<String> groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic);
// delete pop retry topics first
for (String group : groups) {
final String popRetryTopic = KeyBuilder.buildPopRetryTopic(topic, group);
if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopic) != null) {
deleteTopicInBroker(popRetryTopic);
}
}
// delete topic
deleteTopicInBroker(topic);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
}

private void deleteTopicInBroker(String topic) {
this.brokerController.getTopicConfigManager().deleteTopicConfig(topic);
this.brokerController.getTopicQueueMappingManager().delete(topic);
this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(topic);
this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(topic);
this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(topic));
}

private synchronized RemotingCommand updateAndCreateAccessConfig(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;
import org.apache.rocketmq.broker.BrokerController;
Expand All @@ -41,6 +42,7 @@
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.KeyBuilder;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
Expand Down Expand Up @@ -90,8 +92,11 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
Expand Down Expand Up @@ -321,6 +326,37 @@ public void testDeleteTopicOnSlave() throws Exception {
"please execute it from master broker.");
}

@Test
public void testDeleteWithPopRetryTopic() throws Exception {
String topic = "topicA";
String anotherTopic = "another_topicA";

topicConfigManager = mock(TopicConfigManager.class);
when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
final ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<>();
topicConfigTable.put(topic, new TopicConfig());
topicConfigTable.put(KeyBuilder.buildPopRetryTopic(topic, "cid1"), new TopicConfig());

topicConfigTable.put(anotherTopic, new TopicConfig());
topicConfigTable.put(KeyBuilder.buildPopRetryTopic(anotherTopic, "cid2"), new TopicConfig());
when(topicConfigManager.getTopicConfigTable()).thenReturn(topicConfigTable);
when(topicConfigManager.selectTopicConfig(anyString())).thenAnswer(invocation -> {
final String selectTopic = invocation.getArgument(0);
return topicConfigManager.getTopicConfigTable().get(selectTopic);
});

when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
when(consumerOffsetManager.whichGroupByTopic(topic)).thenReturn(Sets.newHashSet("cid1"));

RemotingCommand request = buildDeleteTopicRequest(topic);
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);

verify(topicConfigManager).deleteTopicConfig(topic);
verify(topicConfigManager).deleteTopicConfig(KeyBuilder.buildPopRetryTopic(topic, "cid1"));
verify(messageStore, times(2)).deleteTopics(anySet());
}

@Test
public void testGetAllTopicConfigInRocksdb() throws Exception {
if (notToBeExecuted()) {
Expand Down

0 comments on commit f82718a

Please sign in to comment.