diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 1b29ff173cc..c5419a62df7 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -1941,7 +1941,7 @@ private RemotingCommand getAllMessageRequestMode(ChannelHandlerContext ctx, Remo final RemotingCommand response = RemotingCommand.createResponseCommand(null); String content = this.brokerController.getQueryAssignmentProcessor().getMessageRequestModeManager().encode(); - if (content != null && content.length() > 0) { + if (content != null && !content.isEmpty()) { try { response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java index e66703e5653..04324043fb8 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java @@ -20,21 +20,6 @@ import com.google.common.collect.Sets; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; -import java.net.InetSocketAddress; -import java.net.SocketAddress; -import java.net.UnknownHostException; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.LongAdder; import org.apache.rocketmq.auth.authentication.enums.UserType; import org.apache.rocketmq.auth.authentication.manager.AuthenticationMetadataManager; import org.apache.rocketmq.auth.authentication.model.Subject; @@ -45,8 +30,10 @@ import org.apache.rocketmq.auth.authorization.model.Environment; import org.apache.rocketmq.auth.authorization.model.Resource; import org.apache.rocketmq.broker.BrokerController; +import org.apache.rocketmq.broker.client.ClientChannelInfo; import org.apache.rocketmq.broker.client.ConsumerGroupInfo; import org.apache.rocketmq.broker.client.ConsumerManager; +import org.apache.rocketmq.broker.client.net.Broker2Client; import org.apache.rocketmq.broker.offset.ConsumerOffsetManager; import org.apache.rocketmq.broker.schedule.ScheduleMessageService; import org.apache.rocketmq.broker.subscription.RocksDBSubscriptionGroupManager; @@ -55,11 +42,13 @@ import org.apache.rocketmq.common.BoundaryType; import org.apache.rocketmq.common.BrokerConfig; import org.apache.rocketmq.common.KeyBuilder; +import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.TopicFilterType; import org.apache.rocketmq.common.TopicQueueId; import org.apache.rocketmq.common.action.Action; +import org.apache.rocketmq.common.constant.FIleReadaheadMode; import org.apache.rocketmq.common.constant.PermName; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageAccessor; @@ -67,13 +56,20 @@ import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.topic.TopicValidator; import org.apache.rocketmq.remoting.exception.RemotingCommandException; +import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; +import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; import org.apache.rocketmq.remoting.netty.NettyClientConfig; import org.apache.rocketmq.remoting.netty.NettyServerConfig; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; import org.apache.rocketmq.remoting.protocol.RequestCode; import org.apache.rocketmq.remoting.protocol.ResponseCode; import org.apache.rocketmq.remoting.protocol.body.AclInfo; +import org.apache.rocketmq.remoting.protocol.body.CreateTopicListRequestBody; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.body.HARuntimeInfo; import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody; +import org.apache.rocketmq.remoting.protocol.body.QueryCorrectionOffsetBody; import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody; import org.apache.rocketmq.remoting.protocol.body.UserInfo; import org.apache.rocketmq.remoting.protocol.header.CreateAclRequestHeader; @@ -82,8 +78,11 @@ import org.apache.rocketmq.remoting.protocol.header.DeleteAclRequestHeader; import org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader; import org.apache.rocketmq.remoting.protocol.header.DeleteUserRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.ExchangeHAInfoResponseHeader; import org.apache.rocketmq.remoting.protocol.header.GetAclRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetAllTopicConfigResponseHeader; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerStatusRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetEarliestMsgStoretimeRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.GetMinOffsetRequestHeader; @@ -91,6 +90,13 @@ import org.apache.rocketmq.remoting.protocol.header.GetUserRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ListAclsRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ListUsersRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.NotifyMinBrokerIdChangeRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryCorrectionOffsetHeader; +import org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryTopicsByConsumerRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.ResetMasterFlushOffsetHeader; +import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.ResumeCheckHalfMessageRequestHeader; import org.apache.rocketmq.remoting.protocol.header.SearchOffsetRequestHeader; import org.apache.rocketmq.remoting.protocol.header.UpdateAclRequestHeader; @@ -98,12 +104,17 @@ import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType; import org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel; import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import org.apache.rocketmq.store.CommitLog; import org.apache.rocketmq.store.DefaultMessageStore; import org.apache.rocketmq.store.MessageStore; import org.apache.rocketmq.store.SelectMappedBufferResult; import org.apache.rocketmq.store.config.MessageStoreConfig; import org.apache.rocketmq.store.logfile.DefaultMappedFile; import org.apache.rocketmq.store.stats.BrokerStats; +import org.apache.rocketmq.store.timer.TimerCheckpoint; +import org.apache.rocketmq.store.timer.TimerMessageStore; +import org.apache.rocketmq.store.timer.TimerMetrics; +import org.apache.rocketmq.store.util.LibC; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -112,6 +123,26 @@ import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; +import java.io.IOException; +import java.lang.reflect.Field; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.LongAdder; + import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -170,11 +201,32 @@ public class AdminBrokerProcessorTest { @Mock private AuthorizationMetadataManager authorizationMetadataManager; + @Mock + private TimerMessageStore timerMessageStore; + + @Mock + private TimerMetrics timerMetrics; + + @Mock + private MessageStoreConfig messageStoreConfig; + + @Mock + private CommitLog commitLog; + + @Mock + private Broker2Client broker2Client; + + @Mock + private ClientChannelInfo clientChannelInfo; + @Before public void init() throws Exception { brokerController.setMessageStore(messageStore); brokerController.setAuthenticationMetadataManager(authenticationMetadataManager); brokerController.setAuthorizationMetadataManager(authorizationMetadataManager); + Field field = BrokerController.class.getDeclaredField("broker2Client"); + field.setAccessible(true); + field.set(brokerController, broker2Client); //doReturn(sendMessageProcessor).when(brokerController).getSendMessageProcessor(); @@ -280,6 +332,31 @@ public void testUpdateAndCreateTopic() throws Exception { assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); } + @Test + public void testUpdateAndCreateTopicList() throws RemotingCommandException { + List systemTopicList = new ArrayList<>(systemTopicSet); + RemotingCommand request = buildCreateTopicListRequest(systemTopicList); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + assertThat(response.getRemark()).isEqualTo("The topic[" + systemTopicList.get(0) + "] is conflict with system topic."); + + List inValidTopicList = new ArrayList<>(); + inValidTopicList.add(""); + request = buildCreateTopicListRequest(inValidTopicList); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + + List topicList = new ArrayList<>(); + topicList.add("TEST_CREATE_TOPIC"); + topicList.add("TEST_CREATE_TOPIC1"); + request = buildCreateTopicListRequest(topicList); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + //test no changes + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + @Test public void testDeleteTopicInRocksdb() throws Exception { if (notToBeExecuted()) { @@ -815,7 +892,6 @@ public void testCreateAcl() throws RemotingCommandException { request.setBody(JSON.toJSONBytes(aclInfo)); RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); - } @Test @@ -833,7 +909,6 @@ public void testUpdateAcl() throws RemotingCommandException { request.setBody(JSON.toJSONBytes(aclInfo)); RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); - } @Test @@ -893,6 +968,349 @@ public void testListAcl() throws RemotingCommandException { assertThat(aclInfoData.get(0).getPolicies().get(0).getEntries().get(0).getDecision()).isEqualTo("Allow"); } + @Test + public void testGetTimeCheckPoint() throws RemotingCommandException { + when(this.brokerController.getTimerCheckpoint()).thenReturn(null); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TIMER_CHECK_POINT, null); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + assertThat(response.getRemark()).isEqualTo("The checkpoint is null"); + + when(this.brokerController.getTimerCheckpoint()).thenReturn(new TimerCheckpoint()); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + @Test + public void testGetTimeMetrics() throws RemotingCommandException, IOException { + when(this.brokerController.getMessageStore().getTimerMessageStore()).thenReturn(null); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TIMER_METRICS, null); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + + when(this.brokerController.getMessageStore().getTimerMessageStore()).thenReturn(timerMessageStore); + when(this.timerMessageStore.getTimerMetrics()).thenReturn(timerMetrics); + when(this.timerMetrics.encode()).thenReturn(new TimerMetrics.TimerMetricsSerializeWrapper().toJson(false)); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testUpdateColdDataFlowCtrGroupConfig() throws RemotingCommandException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_COLD_DATA_FLOW_CTR_CONFIG, null); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + request.setBody("consumerGroup1=1".getBytes()); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + request.setBody("".getBytes()); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testRemoveColdDataFlowCtrGroupConfig() throws RemotingCommandException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REMOVE_COLD_DATA_FLOW_CTR_CONFIG, null); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + request.setBody("consumerGroup1".getBytes()); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testGetColdDataFlowCtrInfo() throws RemotingCommandException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_COLD_DATA_FLOW_CTR_INFO, null); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testSetCommitLogReadAheadMode() throws RemotingCommandException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SET_COMMITLOG_READ_MODE, null); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + + HashMap extfields = new HashMap<>(); + extfields.put(FIleReadaheadMode.READ_AHEAD_MODE, String.valueOf(LibC.MADV_DONTNEED)); + request.setExtFields(extfields); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + + extfields.clear(); + extfields.put(FIleReadaheadMode.READ_AHEAD_MODE, String.valueOf(LibC.MADV_NORMAL)); + request.setExtFields(extfields); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + this.brokerController.setMessageStore(defaultMessageStore); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + + when(this.defaultMessageStore.getMessageStoreConfig()).thenReturn(messageStoreConfig); + when(this.defaultMessageStore.getCommitLog()).thenReturn(commitLog); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testGetUnknownCmdResponse() throws RemotingCommandException { + RemotingCommand request = RemotingCommand.createRequestCommand(10000, null); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.REQUEST_CODE_NOT_SUPPORTED); + } + + @Test + public void testGetAllMessageRequestMode() throws RemotingCommandException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_MESSAGE_REQUEST_MODE, null); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testResetOffset() throws RemotingCommandException { + ResetOffsetRequestHeader requestHeader = + createRequestHeader("topic","group",-1,false,-1,-1); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader); + request.makeCustomHeaderToNet(); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.TOPIC_NOT_EXIST); + + this.brokerController.getTopicConfigManager().getTopicConfigTable().put("topic", new TopicConfig("topic")); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); + + this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().put("group", new SubscriptionGroupConfig()); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + requestHeader.setQueueId(0); + request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader); + request.makeCustomHeaderToNet(); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + requestHeader.setOffset(2L); + request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader); + request.makeCustomHeaderToNet(); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + } + + @Test + public void testGetConsumerStatus() throws RemotingCommandException { + GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader(); + requestHeader.setGroup("group"); + requestHeader.setTopic("topic"); + requestHeader.setClientAddr(""); + RemotingCommand request = RemotingCommand + .createRequestCommand(RequestCode.INVOKE_BROKER_TO_GET_CONSUMER_STATUS, requestHeader); + RemotingCommand responseCommand = RemotingCommand.createResponseCommand(null); + responseCommand.setCode(ResponseCode.SUCCESS); + when(broker2Client.getConsumeStatus(anyString(),anyString(),anyString())).thenReturn(responseCommand); + request.makeCustomHeaderToNet(); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testQueryTopicConsumeByWho() throws RemotingCommandException { + QueryTopicConsumeByWhoRequestHeader requestHeader = new QueryTopicConsumeByWhoRequestHeader(); + requestHeader.setTopic("topic"); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPIC_CONSUME_BY_WHO, requestHeader); + request.makeCustomHeaderToNet(); + HashSet groups = new HashSet<>(); + groups.add("group"); + when(brokerController.getConsumerManager()).thenReturn(consumerManager); + when(consumerManager.queryTopicConsumeByWho(anyString())).thenReturn(groups); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + assertThat(RemotingSerializable.decode(response.getBody(), GroupList.class) + .getGroupList().contains("group")) + .isEqualTo(groups.contains("group")); + } + + @Test + public void testQueryTopicByConsumer() throws RemotingCommandException { + QueryTopicsByConsumerRequestHeader requestHeader = new QueryTopicsByConsumerRequestHeader(); + requestHeader.setGroup("group"); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_TOPICS_BY_CONSUMER, requestHeader); + request.makeCustomHeaderToNet(); + when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testQuerySubscriptionByConsumer() throws RemotingCommandException { + QuerySubscriptionByConsumerRequestHeader requestHeader = new QuerySubscriptionByConsumerRequestHeader(); + requestHeader.setGroup("group"); + requestHeader.setTopic("topic"); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_SUBSCRIPTION_BY_CONSUMER, requestHeader); + request.makeCustomHeaderToNet(); + when(brokerController.getConsumerManager()).thenReturn(consumerManager); + when(consumerManager.findSubscriptionData(anyString(),anyString())).thenReturn(null); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testGetSystemTopicListFromBroker() throws RemotingCommandException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_BROKER, null); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testCleanExpiredConsumeQueue() throws RemotingCommandException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_EXPIRED_CONSUMEQUEUE, null); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testDeleteExpiredCommitLog() throws RemotingCommandException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_EXPIRED_COMMITLOG, null); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testCleanUnusedTopic() throws RemotingCommandException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CLEAN_UNUSED_TOPIC, null); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testGetConsumerRunningInfo() throws RemotingCommandException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { + when(brokerController.getConsumerManager()).thenReturn(consumerManager); + when(consumerManager.findChannel(anyString(),anyString())).thenReturn(null); + GetConsumerRunningInfoRequestHeader requestHeader = new GetConsumerRunningInfoRequestHeader(); + requestHeader.setClientId("client"); + requestHeader.setConsumerGroup("group"); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_RUNNING_INFO, requestHeader); + request.makeCustomHeaderToNet(); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + + when(consumerManager.findChannel(anyString(),anyString())).thenReturn(clientChannelInfo); + when(clientChannelInfo.getVersion()).thenReturn(MQVersion.Version.V3_0_0_SNAPSHOT.ordinal()); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + + when(clientChannelInfo.getVersion()).thenReturn(MQVersion.Version.V5_2_3.ordinal()); + when(brokerController.getBroker2Client()).thenReturn(broker2Client); + when(clientChannelInfo.getChannel()).thenReturn(channel); + RemotingCommand responseCommand = RemotingCommand.createResponseCommand(null); + responseCommand.setCode(ResponseCode.SUCCESS); + when(broker2Client.callClient(any(Channel.class),any(RemotingCommand.class))).thenReturn(responseCommand); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + when(broker2Client.callClient(any(Channel.class),any(RemotingCommand.class))).thenThrow(new RemotingTimeoutException("timeout")); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.CONSUME_MSG_TIMEOUT); + } + + @Test + public void testQueryCorrectionOffset() throws RemotingCommandException { + Map correctionOffsetMap = new HashMap<>(); + correctionOffsetMap.put(0, 100L); + correctionOffsetMap.put(1, 200L); + Map compareOffsetMap = new HashMap<>(); + compareOffsetMap.put(0, 80L); + compareOffsetMap.put(1, 300L); + when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager); + when(consumerOffsetManager.queryMinOffsetInAllGroup(anyString(),anyString())).thenReturn(correctionOffsetMap); + when(consumerOffsetManager.queryOffset(anyString(),anyString())).thenReturn(compareOffsetMap); + QueryCorrectionOffsetHeader queryCorrectionOffsetHeader = new QueryCorrectionOffsetHeader(); + queryCorrectionOffsetHeader.setTopic("topic"); + queryCorrectionOffsetHeader.setCompareGroup("group"); + queryCorrectionOffsetHeader.setFilterGroups(""); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.QUERY_CORRECTION_OFFSET, queryCorrectionOffsetHeader); + request.makeCustomHeaderToNet(); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + QueryCorrectionOffsetBody body = RemotingSerializable.decode(response.getBody(), QueryCorrectionOffsetBody.class); + Map correctionOffsets = body.getCorrectionOffsets(); + assertThat(correctionOffsets.get(0)).isEqualTo(Long.MAX_VALUE); + assertThat(correctionOffsets.get(1)).isEqualTo(200L); + } + + @Test + public void testNotifyMinBrokerIdChange() throws RemotingCommandException { + NotifyMinBrokerIdChangeRequestHeader requestHeader = new NotifyMinBrokerIdChangeRequestHeader(); + requestHeader.setMinBrokerId(1L); + requestHeader.setMinBrokerAddr("127.0.0.1:10912"); + requestHeader.setOfflineBrokerAddr("127.0.0.1:10911"); + requestHeader.setHaBrokerAddr(""); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.NOTIFY_MIN_BROKER_ID_CHANGE, requestHeader); + request.makeCustomHeaderToNet(); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testUpdateBrokerHaInfo() throws RemotingCommandException { + ExchangeHAInfoResponseHeader requestHeader = new ExchangeHAInfoResponseHeader(); + requestHeader.setMasterAddress("127.0.0.1:10911"); + requestHeader.setMasterFlushOffset(0L); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.EXCHANGE_BROKER_HA_INFO, requestHeader); + request.makeCustomHeaderToNet(); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + when(brokerController.getMessageStore()).thenReturn(messageStore); + requestHeader.setMasterHaAddress("127.0.0.1:10912"); + request = RemotingCommand.createRequestCommand(RequestCode.EXCHANGE_BROKER_HA_INFO, requestHeader); + request.makeCustomHeaderToNet(); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + when(messageStore.getMasterFlushedOffset()).thenReturn(0L); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testGetBrokerHaStatus() throws RemotingCommandException { + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_HA_STATUS,null); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR); + + when(brokerController.getMessageStore()).thenReturn(messageStore); + when(messageStore.getHARuntimeInfo()).thenReturn(new HARuntimeInfo()); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + @Test + public void testResetMasterFlushOffset() throws RemotingCommandException { + ResetMasterFlushOffsetHeader requestHeader = new ResetMasterFlushOffsetHeader(); + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RESET_MASTER_FLUSH_OFFSET,requestHeader); + RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + + requestHeader.setMasterFlushOffset(0L); + request.makeCustomHeaderToNet(); + response = adminBrokerProcessor.processRequest(handlerContext, request); + assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS); + } + + private ResetOffsetRequestHeader createRequestHeader(String topic,String group,long timestamp,boolean force,long offset,int queueId) { + ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader(); + requestHeader.setTopic(topic); + requestHeader.setGroup(group); + requestHeader.setTimestamp(timestamp); + requestHeader.setForce(force); + requestHeader.setOffset(offset); + requestHeader.setQueueId(queueId); + return requestHeader; + } + private RemotingCommand buildCreateTopicRequest(String topic) { CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader(); requestHeader.setTopic(topic); @@ -900,12 +1318,29 @@ private RemotingCommand buildCreateTopicRequest(String topic) { requestHeader.setReadQueueNums(8); requestHeader.setWriteQueueNums(8); requestHeader.setPerm(PermName.PERM_READ | PermName.PERM_WRITE); - RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader); request.makeCustomHeaderToNet(); return request; } + private RemotingCommand buildCreateTopicListRequest(List topicList) { + List topicConfigList = new ArrayList<>(); + for (String topic:topicList) { + TopicConfig topicConfig = new TopicConfig(topic); + topicConfig.setReadQueueNums(8); + topicConfig.setWriteQueueNums(8); + topicConfig.setTopicFilterType(TopicFilterType.SINGLE_TAG); + topicConfig.setPerm(PermName.PERM_READ | PermName.PERM_WRITE); + topicConfig.setTopicSysFlag(0); + topicConfig.setOrder(false); + topicConfigList.add(topicConfig); + } + RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC_LIST, null); + CreateTopicListRequestBody createTopicListRequestBody = new CreateTopicListRequestBody(topicConfigList); + request.setBody(createTopicListRequestBody.encode()); + return request; + } + private RemotingCommand buildDeleteTopicRequest(String topic) { DeleteTopicRequestHeader requestHeader = new DeleteTopicRequestHeader(); requestHeader.setTopic(topic);