diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImplTest.java new file mode 100644 index 00000000000..71682fb52c0 --- /dev/null +++ b/client/src/test/java/org/apache/rocketmq/client/impl/admin/MqClientAdminImplTest.java @@ -0,0 +1,559 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.client.impl.admin; + +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.common.message.MessageConst; +import org.apache.rocketmq.common.message.MessageDecoder; +import org.apache.rocketmq.common.message.MessageExt; +import org.apache.rocketmq.common.message.MessageQueue; +import org.apache.rocketmq.remoting.RemotingClient; +import org.apache.rocketmq.remoting.exception.RemotingException; +import org.apache.rocketmq.remoting.protocol.RemotingCommand; +import org.apache.rocketmq.remoting.protocol.RemotingSerializable; +import org.apache.rocketmq.remoting.protocol.ResponseCode; +import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; +import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; +import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; +import org.apache.rocketmq.remoting.protocol.body.ConsumeMessageDirectlyResult; +import org.apache.rocketmq.remoting.protocol.body.ConsumerConnection; +import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo; +import org.apache.rocketmq.remoting.protocol.body.GroupList; +import org.apache.rocketmq.remoting.protocol.body.QueryConsumeTimeSpanBody; +import org.apache.rocketmq.remoting.protocol.body.QueueTimeSpan; +import org.apache.rocketmq.remoting.protocol.body.ResetOffsetBody; +import org.apache.rocketmq.remoting.protocol.body.TopicList; +import org.apache.rocketmq.remoting.protocol.header.ConsumeMessageDirectlyResultRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.CreateTopicRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.DeleteSubscriptionGroupRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.DeleteTopicRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetConsumeStatsRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerConnectionListRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetConsumerRunningInfoRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.GetTopicStatsInfoRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryConsumeTimeSpanRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QuerySubscriptionByConsumerRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryTopicConsumeByWhoRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.QueryTopicsByConsumerRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.ResetOffsetRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.ViewMessageRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteKVConfigRequestHeader; +import org.apache.rocketmq.remoting.protocol.header.namesrv.DeleteTopicFromNamesrvRequestHeader; +import org.apache.rocketmq.remoting.protocol.heartbeat.SubscriptionData; +import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class MqClientAdminImplTest { + + @Mock + private RemotingClient remotingClient; + + @Mock + private RemotingCommand response; + + private MqClientAdminImpl mqClientAdminImpl; + + private final String defaultTopic = "defaultTopic"; + + private final String defaultBrokerAddr = "127.0.0.1:10911"; + + private final long defaultTimeout = 3000L; + + @Before + public void init() throws RemotingException, InterruptedException, MQClientException { + mqClientAdminImpl = new MqClientAdminImpl(remotingClient); + when(remotingClient.invoke(any(String.class), any(RemotingCommand.class), any(Long.class))).thenReturn(CompletableFuture.completedFuture(response)); + } + + @Test + public void assertQueryMessageWithSuccess() throws Exception { + setResponseSuccess(getMessageResult()); + QueryMessageRequestHeader requestHeader = mock(QueryMessageRequestHeader.class); + when(requestHeader.getTopic()).thenReturn(defaultTopic); + when(requestHeader.getKey()).thenReturn("keys"); + CompletableFuture> actual = mqClientAdminImpl.queryMessage(defaultBrokerAddr, false, false, requestHeader, defaultTimeout); + List messageExtList = actual.get(); + assertNotNull(messageExtList); + assertEquals(1, messageExtList.size()); + } + + @Test + public void assertQueryMessageWithNotFound() throws Exception { + when(response.getCode()).thenReturn(ResponseCode.QUERY_NOT_FOUND); + QueryMessageRequestHeader requestHeader = mock(QueryMessageRequestHeader.class); + CompletableFuture> actual = mqClientAdminImpl.queryMessage(defaultBrokerAddr, false, false, requestHeader, defaultTimeout); + List messageExtList = actual.get(); + assertNotNull(messageExtList); + assertEquals(0, messageExtList.size()); + } + + @Test + public void assertQueryMessageWithError() { + setResponseError(); + QueryMessageRequestHeader requestHeader = mock(QueryMessageRequestHeader.class); + CompletableFuture> actual = mqClientAdminImpl.queryMessage(defaultBrokerAddr, false, false, requestHeader, defaultTimeout); + Throwable thrown = assertThrows(ExecutionException.class, actual::get); + assertTrue(thrown.getCause() instanceof MQClientException); + MQClientException mqException = (MQClientException) thrown.getCause(); + assertEquals(ResponseCode.SYSTEM_ERROR, mqException.getResponseCode()); + assertTrue(mqException.getMessage().contains("CODE: 1 DESC: null")); + } + + @Test + public void assertGetTopicStatsInfoWithSuccess() throws Exception { + TopicStatsTable responseBody = new TopicStatsTable(); + setResponseSuccess(RemotingSerializable.encode(responseBody)); + GetTopicStatsInfoRequestHeader requestHeader = mock(GetTopicStatsInfoRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.getTopicStatsInfo(defaultBrokerAddr, requestHeader, defaultTimeout); + TopicStatsTable topicStatsTable = actual.get(); + assertNotNull(topicStatsTable); + assertEquals(0, topicStatsTable.getOffsetTable().size()); + } + + @Test + public void assertGetTopicStatsInfoWithError() { + setResponseError(); + GetTopicStatsInfoRequestHeader requestHeader = mock(GetTopicStatsInfoRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.getTopicStatsInfo(defaultBrokerAddr, requestHeader, defaultTimeout); + Throwable thrown = assertThrows(ExecutionException.class, actual::get); + assertTrue(thrown.getCause() instanceof MQClientException); + MQClientException mqException = (MQClientException) thrown.getCause(); + assertEquals(ResponseCode.SYSTEM_ERROR, mqException.getResponseCode()); + assertTrue(mqException.getMessage().contains("CODE: 1 DESC: null")); + } + + @Test + public void assertQueryConsumeTimeSpanWithSuccess() throws Exception { + QueryConsumeTimeSpanBody responseBody = new QueryConsumeTimeSpanBody(); + setResponseSuccess(RemotingSerializable.encode(responseBody)); + QueryConsumeTimeSpanRequestHeader requestHeader = mock(QueryConsumeTimeSpanRequestHeader.class); + CompletableFuture> actual = mqClientAdminImpl.queryConsumeTimeSpan(defaultBrokerAddr, requestHeader, defaultTimeout); + List queueTimeSpans = actual.get(); + assertNotNull(queueTimeSpans); + assertEquals(0, queueTimeSpans.size()); + } + + @Test + public void assertQueryConsumeTimeSpanWithError() { + setResponseError(); + QueryConsumeTimeSpanRequestHeader requestHeader = mock(QueryConsumeTimeSpanRequestHeader.class); + CompletableFuture> actual = mqClientAdminImpl.queryConsumeTimeSpan(defaultBrokerAddr, requestHeader, defaultTimeout); + Throwable thrown = assertThrows(ExecutionException.class, actual::get); + assertTrue(thrown.getCause() instanceof MQClientException); + MQClientException mqException = (MQClientException) thrown.getCause(); + assertEquals(ResponseCode.SYSTEM_ERROR, mqException.getResponseCode()); + assertTrue(mqException.getMessage().contains("CODE: 1 DESC: null")); + } + + @Test + public void assertUpdateOrCreateTopicWithSuccess() throws Exception { + setResponseSuccess(null); + CreateTopicRequestHeader requestHeader = mock(CreateTopicRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.updateOrCreateTopic(defaultBrokerAddr, requestHeader, defaultTimeout); + assertNull(actual.get()); + } + + @Test + public void assertUpdateOrCreateTopicWithError() { + setResponseError(); + CreateTopicRequestHeader requestHeader = mock(CreateTopicRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.updateOrCreateTopic(defaultBrokerAddr, requestHeader, defaultTimeout); + Throwable thrown = assertThrows(ExecutionException.class, actual::get); + assertTrue(thrown.getCause() instanceof MQClientException); + MQClientException mqException = (MQClientException) thrown.getCause(); + assertEquals(ResponseCode.SYSTEM_ERROR, mqException.getResponseCode()); + assertTrue(mqException.getMessage().contains("CODE: 1 DESC: null")); + } + + @Test + public void assertUpdateOrCreateSubscriptionGroupWithSuccess() throws Exception { + setResponseSuccess(null); + SubscriptionGroupConfig config = mock(SubscriptionGroupConfig.class); + CompletableFuture actual = mqClientAdminImpl.updateOrCreateSubscriptionGroup(defaultBrokerAddr, config, defaultTimeout); + assertNull(actual.get()); + } + + @Test + public void assertUpdateOrCreateSubscriptionGroupWithError() { + setResponseError(); + SubscriptionGroupConfig config = mock(SubscriptionGroupConfig.class); + CompletableFuture actual = mqClientAdminImpl.updateOrCreateSubscriptionGroup(defaultBrokerAddr, config, defaultTimeout); + Throwable thrown = assertThrows(ExecutionException.class, actual::get); + assertTrue(thrown.getCause() instanceof MQClientException); + MQClientException mqException = (MQClientException) thrown.getCause(); + assertEquals(ResponseCode.SYSTEM_ERROR, mqException.getResponseCode()); + assertTrue(mqException.getMessage().contains("CODE: 1 DESC: null")); + } + + @Test + public void assertDeleteTopicInBrokerWithSuccess() throws Exception { + setResponseSuccess(null); + DeleteTopicRequestHeader requestHeader = mock(DeleteTopicRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.deleteTopicInBroker(defaultBrokerAddr, requestHeader, defaultTimeout); + assertNull(actual.get()); + } + + @Test + public void assertDeleteTopicInBrokerWithError() { + setResponseError(); + DeleteTopicRequestHeader requestHeader = mock(DeleteTopicRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.deleteTopicInBroker(defaultBrokerAddr, requestHeader, defaultTimeout); + Throwable thrown = assertThrows(ExecutionException.class, actual::get); + assertTrue(thrown.getCause() instanceof MQClientException); + MQClientException mqException = (MQClientException) thrown.getCause(); + assertEquals(ResponseCode.SYSTEM_ERROR, mqException.getResponseCode()); + assertTrue(mqException.getMessage().contains("CODE: 1 DESC: null")); + } + + @Test + public void assertDeleteTopicInNameserverWithSuccess() throws Exception { + setResponseSuccess(null); + DeleteTopicFromNamesrvRequestHeader requestHeader = mock(DeleteTopicFromNamesrvRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.deleteTopicInNameserver(defaultBrokerAddr, requestHeader, defaultTimeout); + assertNull(actual.get()); + } + + @Test + public void assertDeleteTopicInNameserverWithError() { + setResponseError(); + DeleteTopicFromNamesrvRequestHeader requestHeader = mock(DeleteTopicFromNamesrvRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.deleteTopicInNameserver(defaultBrokerAddr, requestHeader, defaultTimeout); + Throwable thrown = assertThrows(ExecutionException.class, actual::get); + assertTrue(thrown.getCause() instanceof MQClientException); + MQClientException mqException = (MQClientException) thrown.getCause(); + assertEquals(ResponseCode.SYSTEM_ERROR, mqException.getResponseCode()); + assertTrue(mqException.getMessage().contains("CODE: 1 DESC: null")); + } + + @Test + public void assertDeleteKvConfigWithSuccess() throws Exception { + setResponseSuccess(null); + DeleteKVConfigRequestHeader requestHeader = mock(DeleteKVConfigRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.deleteKvConfig(defaultBrokerAddr, requestHeader, defaultTimeout); + assertNull(actual.get()); + } + + @Test + public void assertDeleteKvConfigWithError() { + setResponseError(); + DeleteKVConfigRequestHeader requestHeader = mock(DeleteKVConfigRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.deleteKvConfig(defaultBrokerAddr, requestHeader, defaultTimeout); + Throwable thrown = assertThrows(ExecutionException.class, actual::get); + assertTrue(thrown.getCause() instanceof MQClientException); + MQClientException mqException = (MQClientException) thrown.getCause(); + assertEquals(ResponseCode.SYSTEM_ERROR, mqException.getResponseCode()); + assertTrue(mqException.getMessage().contains("CODE: 1 DESC: null")); + } + + @Test + public void assertDeleteSubscriptionGroupWithSuccess() throws Exception { + setResponseSuccess(null); + DeleteSubscriptionGroupRequestHeader requestHeader = mock(DeleteSubscriptionGroupRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.deleteSubscriptionGroup(defaultBrokerAddr, requestHeader, defaultTimeout); + assertNull(actual.get()); + } + + @Test + public void assertDeleteSubscriptionGroupWithError() { + setResponseError(); + DeleteSubscriptionGroupRequestHeader requestHeader = mock(DeleteSubscriptionGroupRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.deleteSubscriptionGroup(defaultBrokerAddr, requestHeader, defaultTimeout); + Throwable thrown = assertThrows(ExecutionException.class, actual::get); + assertTrue(thrown.getCause() instanceof MQClientException); + MQClientException mqException = (MQClientException) thrown.getCause(); + assertEquals(ResponseCode.SYSTEM_ERROR, mqException.getResponseCode()); + assertTrue(mqException.getMessage().contains("CODE: 1 DESC: null")); + } + + @Test + public void assertInvokeBrokerToResetOffsetWithSuccess() throws Exception { + ResetOffsetBody responseBody = new ResetOffsetBody(); + setResponseSuccess(RemotingSerializable.encode(responseBody)); + ResetOffsetRequestHeader requestHeader = mock(ResetOffsetRequestHeader.class); + CompletableFuture> actual = mqClientAdminImpl.invokeBrokerToResetOffset(defaultBrokerAddr, requestHeader, defaultTimeout); + assertEquals(0, actual.get().size()); + } + + @Test + public void assertInvokeBrokerToResetOffsetWithError() { + setResponseError(); + ResetOffsetRequestHeader requestHeader = mock(ResetOffsetRequestHeader.class); + CompletableFuture> actual = mqClientAdminImpl.invokeBrokerToResetOffset(defaultBrokerAddr, requestHeader, defaultTimeout); + Throwable thrown = assertThrows(ExecutionException.class, actual::get); + assertTrue(thrown.getCause() instanceof MQClientException); + MQClientException mqException = (MQClientException) thrown.getCause(); + assertEquals(ResponseCode.SYSTEM_ERROR, mqException.getResponseCode()); + assertTrue(mqException.getMessage().contains("CODE: 1 DESC: null")); + } + + @Test + public void assertViewMessageWithSuccess() throws Exception { + setResponseSuccess(getMessageResult()); + ViewMessageRequestHeader requestHeader = mock(ViewMessageRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.viewMessage(defaultBrokerAddr, requestHeader, defaultTimeout); + MessageExt result = actual.get(); + assertNotNull(result); + assertEquals(defaultTopic, result.getTopic()); + } + + @Test + public void assertViewMessageWithError() { + setResponseError(); + ViewMessageRequestHeader requestHeader = mock(ViewMessageRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.viewMessage(defaultBrokerAddr, requestHeader, defaultTimeout); + Throwable thrown = assertThrows(ExecutionException.class, actual::get); + assertTrue(thrown.getCause() instanceof MQClientException); + MQClientException mqException = (MQClientException) thrown.getCause(); + assertEquals(ResponseCode.SYSTEM_ERROR, mqException.getResponseCode()); + assertTrue(mqException.getMessage().contains("CODE: 1 DESC: null")); + } + + @Test + public void assertGetBrokerClusterInfoWithSuccess() throws Exception { + ClusterInfo responseBody = new ClusterInfo(); + setResponseSuccess(RemotingSerializable.encode(responseBody)); + CompletableFuture actual = mqClientAdminImpl.getBrokerClusterInfo(defaultBrokerAddr, defaultTimeout); + ClusterInfo result = actual.get(); + assertNotNull(result); + } + + @Test + public void assertGetBrokerClusterInfoWithError() { + setResponseError(); + CompletableFuture actual = mqClientAdminImpl.getBrokerClusterInfo(defaultBrokerAddr, defaultTimeout); + Throwable thrown = assertThrows(ExecutionException.class, actual::get); + assertTrue(thrown.getCause() instanceof MQClientException); + MQClientException mqException = (MQClientException) thrown.getCause(); + assertEquals(ResponseCode.SYSTEM_ERROR, mqException.getResponseCode()); + assertTrue(mqException.getMessage().contains("CODE: 1 DESC: null")); + } + + @Test + public void assertGetConsumerConnectionListWithSuccess() throws Exception { + ConsumerConnection responseBody = new ConsumerConnection(); + setResponseSuccess(RemotingSerializable.encode(responseBody)); + GetConsumerConnectionListRequestHeader requestHeader = mock(GetConsumerConnectionListRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.getConsumerConnectionList(defaultBrokerAddr, requestHeader, defaultTimeout); + ConsumerConnection result = actual.get(); + assertNotNull(result); + assertEquals(0, result.getConnectionSet().size()); + } + + @Test + public void assertGetConsumerConnectionListWithError() { + setResponseError(); + GetConsumerConnectionListRequestHeader requestHeader = mock(GetConsumerConnectionListRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.getConsumerConnectionList(defaultBrokerAddr, requestHeader, defaultTimeout); + Throwable thrown = assertThrows(ExecutionException.class, actual::get); + assertTrue(thrown.getCause() instanceof MQClientException); + MQClientException mqException = (MQClientException) thrown.getCause(); + assertEquals(ResponseCode.SYSTEM_ERROR, mqException.getResponseCode()); + assertTrue(mqException.getMessage().contains("CODE: 1 DESC: null")); + } + + @Test + public void assertQueryTopicsByConsumerWithSuccess() throws Exception { + TopicList responseBody = new TopicList(); + setResponseSuccess(RemotingSerializable.encode(responseBody)); + QueryTopicsByConsumerRequestHeader requestHeader = mock(QueryTopicsByConsumerRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.queryTopicsByConsumer(defaultBrokerAddr, requestHeader, defaultTimeout); + TopicList result = actual.get(); + assertNotNull(result); + assertEquals(0, result.getTopicList().size()); + } + + @Test + public void assertQueryTopicsByConsumerWithError() { + setResponseError(); + QueryTopicsByConsumerRequestHeader requestHeader = mock(QueryTopicsByConsumerRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.queryTopicsByConsumer(defaultBrokerAddr, requestHeader, defaultTimeout); + Throwable thrown = assertThrows(ExecutionException.class, actual::get); + assertTrue(thrown.getCause() instanceof MQClientException); + MQClientException mqException = (MQClientException) thrown.getCause(); + assertEquals(ResponseCode.SYSTEM_ERROR, mqException.getResponseCode()); + assertTrue(mqException.getMessage().contains("CODE: 1 DESC: null")); + } + + @Test + public void assertQuerySubscriptionByConsumerWithSuccess() throws Exception { + SubscriptionData responseBody = new SubscriptionData(); + setResponseSuccess(RemotingSerializable.encode(responseBody)); + QuerySubscriptionByConsumerRequestHeader requestHeader = mock(QuerySubscriptionByConsumerRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.querySubscriptionByConsumer(defaultBrokerAddr, requestHeader, defaultTimeout); + assertNull(actual.get()); + } + + @Test + public void assertQuerySubscriptionByConsumerWithError() { + setResponseError(); + QuerySubscriptionByConsumerRequestHeader requestHeader = mock(QuerySubscriptionByConsumerRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.querySubscriptionByConsumer(defaultBrokerAddr, requestHeader, defaultTimeout); + Throwable thrown = assertThrows(ExecutionException.class, actual::get); + assertTrue(thrown.getCause() instanceof MQClientException); + MQClientException mqException = (MQClientException) thrown.getCause(); + assertEquals(ResponseCode.SYSTEM_ERROR, mqException.getResponseCode()); + assertTrue(mqException.getMessage().contains("CODE: 1 DESC: null")); + } + + @Test + public void assertGetConsumeStatsWithSuccess() throws Exception { + ConsumeStats responseBody = new ConsumeStats(); + setResponseSuccess(RemotingSerializable.encode(responseBody)); + GetConsumeStatsRequestHeader requestHeader = mock(GetConsumeStatsRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.getConsumeStats(defaultBrokerAddr, requestHeader, defaultTimeout); + ConsumeStats result = actual.get(); + assertNotNull(result); + assertEquals(0, result.getOffsetTable().size()); + } + + @Test + public void assertGetConsumeStatsWithError() { + setResponseError(); + GetConsumeStatsRequestHeader requestHeader = mock(GetConsumeStatsRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.getConsumeStats(defaultBrokerAddr, requestHeader, defaultTimeout); + Throwable thrown = assertThrows(ExecutionException.class, actual::get); + assertTrue(thrown.getCause() instanceof MQClientException); + MQClientException mqException = (MQClientException) thrown.getCause(); + assertEquals(ResponseCode.SYSTEM_ERROR, mqException.getResponseCode()); + assertTrue(mqException.getMessage().contains("CODE: 1 DESC: null")); + } + + @Test + public void assertQueryTopicConsumeByWhoWithSuccess() throws Exception { + GroupList responseBody = new GroupList(); + setResponseSuccess(RemotingSerializable.encode(responseBody)); + QueryTopicConsumeByWhoRequestHeader requestHeader = mock(QueryTopicConsumeByWhoRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.queryTopicConsumeByWho(defaultBrokerAddr, requestHeader, defaultTimeout); + GroupList result = actual.get(); + assertNotNull(result); + assertEquals(0, result.getGroupList().size()); + } + + @Test + public void assertQueryTopicConsumeByWhoWithError() { + setResponseError(); + QueryTopicConsumeByWhoRequestHeader requestHeader = mock(QueryTopicConsumeByWhoRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.queryTopicConsumeByWho(defaultBrokerAddr, requestHeader, defaultTimeout); + Throwable thrown = assertThrows(ExecutionException.class, actual::get); + assertTrue(thrown.getCause() instanceof MQClientException); + MQClientException mqException = (MQClientException) thrown.getCause(); + assertEquals(ResponseCode.SYSTEM_ERROR, mqException.getResponseCode()); + assertTrue(mqException.getMessage().contains("CODE: 1 DESC: null")); + } + + @Test + public void assertGetConsumerRunningInfoWithSuccess() throws Exception { + ConsumerRunningInfo responseBody = new ConsumerRunningInfo(); + setResponseSuccess(RemotingSerializable.encode(responseBody)); + GetConsumerRunningInfoRequestHeader requestHeader = mock(GetConsumerRunningInfoRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.getConsumerRunningInfo(defaultBrokerAddr, requestHeader, defaultTimeout); + ConsumerRunningInfo result = actual.get(); + assertNotNull(result); + assertEquals(0, result.getProperties().size()); + } + + @Test + public void assertGetConsumerRunningInfoWithError() { + setResponseError(); + GetConsumerRunningInfoRequestHeader requestHeader = mock(GetConsumerRunningInfoRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.getConsumerRunningInfo(defaultBrokerAddr, requestHeader, defaultTimeout); + Throwable thrown = assertThrows(ExecutionException.class, actual::get); + assertTrue(thrown.getCause() instanceof MQClientException); + MQClientException mqException = (MQClientException) thrown.getCause(); + assertEquals(ResponseCode.SYSTEM_ERROR, mqException.getResponseCode()); + assertTrue(mqException.getMessage().contains("CODE: 1 DESC: null")); + } + + @Test + public void assertConsumeMessageDirectlyWithSuccess() throws Exception { + ConsumeMessageDirectlyResult responseBody = new ConsumeMessageDirectlyResult(); + setResponseSuccess(RemotingSerializable.encode(responseBody)); + ConsumeMessageDirectlyResultRequestHeader requestHeader = mock(ConsumeMessageDirectlyResultRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.consumeMessageDirectly(defaultBrokerAddr, requestHeader, defaultTimeout); + ConsumeMessageDirectlyResult result = actual.get(); + assertNotNull(result); + assertTrue(result.isAutoCommit()); + } + + @Test + public void assertConsumeMessageDirectlyWithError() { + setResponseError(); + ConsumeMessageDirectlyResultRequestHeader requestHeader = mock(ConsumeMessageDirectlyResultRequestHeader.class); + CompletableFuture actual = mqClientAdminImpl.consumeMessageDirectly(defaultBrokerAddr, requestHeader, defaultTimeout); + Throwable thrown = assertThrows(ExecutionException.class, actual::get); + assertTrue(thrown.getCause() instanceof MQClientException); + MQClientException mqException = (MQClientException) thrown.getCause(); + assertEquals(ResponseCode.SYSTEM_ERROR, mqException.getResponseCode()); + assertTrue(mqException.getMessage().contains("CODE: 1 DESC: null")); + } + + private byte[] getMessageResult() throws Exception { + byte[] bytes = MessageDecoder.encode(createMessageExt(), false); + ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length); + byteBuffer.put(bytes); + return byteBuffer.array(); + } + + private MessageExt createMessageExt() { + MessageExt result = new MessageExt(); + result.setBody("body".getBytes(StandardCharsets.UTF_8)); + result.setTopic(defaultTopic); + result.setBrokerName("defaultBroker"); + result.putUserProperty("key", "value"); + result.getProperties().put(MessageConst.PROPERTY_PRODUCER_GROUP, "defaultGroup"); + result.getProperties().put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "TX1"); + result.setKeys("keys"); + SocketAddress bornHost = new InetSocketAddress("127.0.0.1", 12911); + SocketAddress storeHost = new InetSocketAddress("127.0.0.1", 10911); + result.setBornHost(bornHost); + result.setStoreHost(storeHost); + return result; + } + + private void setResponseSuccess(byte[] body) { + when(response.getCode()).thenReturn(ResponseCode.SUCCESS); + when(response.getBody()).thenReturn(body); + } + + private void setResponseError() { + when(response.getCode()).thenReturn(ResponseCode.SYSTEM_ERROR); + } +}