Skip to content

Commit

Permalink
[ISSUE apache#8968] Introduce the clearRetryTopicWhenDeleteTopic opti…
Browse files Browse the repository at this point in the history
…on to enable precise external deletion of topics (apache#8969)

* Add the clearRetryTopicWhenDeleteTopic option to allow precise deletion of topics externally without the need to traverse consumerOffset

* Fix check style
  • Loading branch information
RongtongJin authored Nov 27, 2024
1 parent cd5071b commit 3ae0139
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ public class BrokerController {
private final NettyClientConfig nettyClientConfig;
protected final MessageStoreConfig messageStoreConfig;
private final AuthConfig authConfig;
protected final ConsumerOffsetManager consumerOffsetManager;
protected ConsumerOffsetManager consumerOffsetManager;
protected final BroadcastOffsetManager broadcastOffsetManager;
protected final ConsumerManager consumerManager;
protected final ConsumerFilterManager consumerFilterManager;
Expand Down Expand Up @@ -1313,6 +1313,11 @@ public ConsumerOffsetManager getConsumerOffsetManager() {
return consumerOffsetManager;
}

public void setConsumerOffsetManager(ConsumerOffsetManager consumerOffsetManager) {
this.consumerOffsetManager = consumerOffsetManager;
}


public BroadcastOffsetManager getBroadcastOffsetManager() {
return broadcastOffsetManager;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,7 @@ private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, R
response.setBody(JSON.toJSONBytes(result));
return response;
}

@Override
public boolean rejectRequest() {
return false;
Expand Down Expand Up @@ -559,18 +560,17 @@ private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(e.getMessage());
return response;
}
finally {
} finally {
executionTime = System.currentTimeMillis() - startTime;
InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
.put(LABEL_INVOCATION_STATUS, status.getName())
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic))
.build();
.put(LABEL_INVOCATION_STATUS, status.getName())
.put(LABEL_IS_SYSTEM, TopicValidator.isSystemTopic(topic))
.build();
BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, attributes);
}
LOGGER.info("executionTime of create topic:{} is {} ms" , topic, executionTime);
LOGGER.info("executionTime of create topic:{} is {} ms", topic, executionTime);
return response;
}

Expand Down Expand Up @@ -637,8 +637,7 @@ private synchronized RemotingCommand updateAndCreateTopicList(ChannelHandlerCont
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(e.getMessage());
return response;
}
finally {
} finally {
executionTime = System.currentTimeMillis() - startTime;
InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
Expand All @@ -648,7 +647,7 @@ private synchronized RemotingCommand updateAndCreateTopicList(ChannelHandlerCont
.build();
BrokerMetricsManager.topicCreateExecuteTime.record(executionTime, attributes);
}
LOGGER.info("executionTime of all topics:{} is {} ms" , topicNames, executionTime);
LOGGER.info("executionTime of all topics:{} is {} ms", topicNames, executionTime);
return response;
}

Expand Down Expand Up @@ -725,21 +724,28 @@ private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
}
}

final Set<String> groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic);
// delete pop retry topics first
try {
List<String> topicsToClean = new ArrayList<>();
topicsToClean.add(topic);

if (brokerController.getBrokerConfig().isClearRetryTopicWhenDeleteTopic()) {
final Set<String> groups = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(topic);
for (String group : groups) {
final String popRetryTopicV2 = KeyBuilder.buildPopRetryTopic(topic, group, true);
if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV2) != null) {
deleteTopicInBroker(popRetryTopicV2);
topicsToClean.add(popRetryTopicV2);
}
final String popRetryTopicV1 = KeyBuilder.buildPopRetryTopicV1(topic, group);
if (brokerController.getTopicConfigManager().selectTopicConfig(popRetryTopicV1) != null) {
deleteTopicInBroker(popRetryTopicV1);
topicsToClean.add(popRetryTopicV1);
}
}
// delete topic
deleteTopicInBroker(topic);
}

try {
for (String topicToClean : topicsToClean) {
// delete topic
deleteTopicInBroker(topicToClean);
}
} catch (Throwable t) {
return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
}
Expand Down Expand Up @@ -982,10 +988,10 @@ private synchronized RemotingCommand updateColdDataFlowCtrGroupConfig(ChannelHan
String consumerGroup = String.valueOf(key);
Long threshold = Long.valueOf(String.valueOf(value));
this.brokerController.getColdDataCgCtrService()
.addOrUpdateGroupConfig(consumerGroup, threshold);
.addOrUpdateGroupConfig(consumerGroup, threshold);
} catch (Exception e) {
LOGGER.error("updateColdDataFlowCtrGroupConfig properties on entry error, key: {}, val: {}",
key, value, e);
key, value, e);
}
});
} else {
Expand Down Expand Up @@ -1598,12 +1604,12 @@ private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext c
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
long executionTime = System.currentTimeMillis() - startTime;
LOGGER.info("executionTime of create subscriptionGroup:{} is {} ms" ,config.getGroupName() ,executionTime);
LOGGER.info("executionTime of create subscriptionGroup:{} is {} ms", config.getGroupName(), executionTime);
InvocationStatus status = response.getCode() == ResponseCode.SUCCESS ?
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
InvocationStatus.SUCCESS : InvocationStatus.FAILURE;
Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
.put(LABEL_INVOCATION_STATUS, status.getName())
.build();
.put(LABEL_INVOCATION_STATUS, status.getName())
.build();
BrokerMetricsManager.consumerGroupCreateExecuteTime.record(executionTime, attributes);
return response;
}
Expand Down Expand Up @@ -2083,13 +2089,13 @@ private Long searchOffsetByTimestamp(String topic, int queueId, long timestamp)
/**
* Reset consumer offset.
*
* @param topic Required, not null.
* @param group Required, not null.
* @param queueId if target queue ID is negative, all message queues will be reset; otherwise, only the target queue
* would get reset.
* @param topic Required, not null.
* @param group Required, not null.
* @param queueId if target queue ID is negative, all message queues will be reset; otherwise, only the target queue
* would get reset.
* @param timestamp if timestamp is negative, offset would be reset to broker offset at the time being; otherwise,
* binary search is performed to locate target offset.
* @param offset Target offset to reset to if target queue ID is properly provided.
* binary search is performed to locate target offset.
* @param offset Target offset to reset to if target queue ID is properly provided.
* @return Affected queues and their new offset
*/
private RemotingCommand resetOffsetInner(String topic, String group, int queueId, long timestamp, Long offset) {
Expand Down Expand Up @@ -3371,7 +3377,8 @@ private boolean validateBlackListConfigExist(Properties properties) {
return false;
}

private CheckRocksdbCqWriteResult doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
private CheckRocksdbCqWriteResult doCheckRocksdbCqWriteProgress(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
CheckRocksdbCqWriteProgressRequestHeader requestHeader = request.decodeCommandCustomHeader(CheckRocksdbCqWriteProgressRequestHeader.class);
String requestTopic = requestHeader.getTopic();
MessageStore messageStore = brokerController.getMessageStore();
Expand Down Expand Up @@ -3428,7 +3435,9 @@ private CheckRocksdbCqWriteResult doCheckRocksdbCqWriteProgress(ChannelHandlerCo
return result;
}

private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueueInterface> queueMap, String topic, RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, boolean printDetail, long checkpointByStoreTime) {
private boolean processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueueInterface> queueMap, String topic,
RocksDBMessageStore rocksDBMessageStore, StringBuilder diffResult, boolean printDetail,
long checkpointByStoreTime) {
boolean processResult = true;
for (Map.Entry<Integer, ConsumeQueueInterface> queueEntry : queueMap.entrySet()) {
Integer queueId = queueEntry.getKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ public class BrokerConfig extends BrokerIdentity {

private boolean appendCkAsync = false;

private boolean clearRetryTopicWhenDeleteTopic = true;

private boolean enableLmqStats = false;

Expand Down Expand Up @@ -1908,6 +1909,14 @@ public void setAppendCkAsync(boolean appendCkAsync) {
this.appendCkAsync = appendCkAsync;
}

public boolean isClearRetryTopicWhenDeleteTopic() {
return clearRetryTopicWhenDeleteTopic;
}

public void setClearRetryTopicWhenDeleteTopic(boolean clearRetryTopicWhenDeleteTopic) {
this.clearRetryTopicWhenDeleteTopic = clearRetryTopicWhenDeleteTopic;
}

public boolean isEnableLmqStats() {
return enableLmqStats;
}
Expand Down

0 comments on commit 3ae0139

Please sign in to comment.