Skip to content

Commit

Permalink
[eclipse-hono#3543] Acknowledge Pub/Sub messages on command_internal …
Browse files Browse the repository at this point in the history
…subscription when they are being processed by Hono

Signed-off-by: Matthias Kaemmer <[email protected]>
  • Loading branch information
mattkaem committed Sep 26, 2023
1 parent a7ec5d6 commit 00a508e
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -204,10 +204,11 @@ public Future<Void> start() {
}

private void createReceiver() {
receiver = (PubsubMessage message, AckReplyConsumer consumer) -> handleCommandMessage(message);
receiver = this::handleCommandMessage;
}

Future<Void> handleCommandMessage(final PubsubMessage message) {
Future<Void> handleCommandMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
consumer.ack();
final PubSubBasedCommand command;
try {
command = PubSubBasedCommand.fromRoutedCommandMessage(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -51,6 +52,7 @@
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.pubsub.v1.PubsubMessage;

Expand Down Expand Up @@ -81,6 +83,8 @@ public class PubSubBasedInternalCommandConsumerTest {

private TenantClient tenantClient;

private AckReplyConsumer ackReplyConsumer;

private PubSubBasedInternalCommandConsumer internalCommandConsumer;

@BeforeEach
Expand All @@ -91,6 +95,9 @@ void setUp() {
final Tracer tracer = TracingMockSupport.mockTracer(TracingMockSupport.mockSpan());
commandHandlers = new CommandHandlers();
tenantClient = mock(TenantClient.class);
ackReplyConsumer = mock(AckReplyConsumer.class);

doNothing().when(ackReplyConsumer).ack();

doAnswer(invocation -> {
final String tenantId = invocation.getArgument(0);
Expand All @@ -109,7 +116,7 @@ void setUp() {
.thenReturn(Future.succeededFuture(topicAndSubscription));
when(adminClientManager
.getOrCreateSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId))
.thenReturn(Future.succeededFuture(topicAndSubscription));
.thenReturn(Future.succeededFuture(topicAndSubscription));

subscriber = mock(PubSubSubscriberClient.class);
when(subscriber.subscribe(true)).thenReturn(Future.succeededFuture());
Expand Down Expand Up @@ -156,7 +163,8 @@ public void testHandleCommandMessageSendErrorResponse() {
Future.failedFuture(
StatusCodeMapper.from(HttpURLConnection.HTTP_UNAVAILABLE, "failed to retrieve tenant")));

internalCommandConsumer.handleCommandMessage(message);
internalCommandConsumer.handleCommandMessage(message, ackReplyConsumer);
verify(ackReplyConsumer).ack();
verify(commandHandler, never()).apply(any(PubSubBasedCommandContext.class));
verify(commandResponseSender).sendCommandResponse(
argThat(t -> t.getTenantId().equals(tenantId)),
Expand All @@ -166,7 +174,8 @@ public void testHandleCommandMessageSendErrorResponse() {
}

/**
* Verifies that the consumer handles an invalid command message with missing subject by invoking the matching handler.
* Verifies that the consumer handles an invalid command message with missing subject by invoking the matching
* handler.
*/
@Test
public void testHandleCommandMessageWithInvalidAttribute() {
Expand All @@ -175,10 +184,11 @@ public void testHandleCommandMessageWithInvalidAttribute() {
final Context context = VertxMockSupport.mockContext(mock(Vertx.class));
commandHandlers.putCommandHandler(tenantId, deviceId, null, commandHandler, context);

internalCommandConsumer.handleCommandMessage(message);
internalCommandConsumer.handleCommandMessage(message, ackReplyConsumer);

final ArgumentCaptor<CommandContext> commandContextArgumentCaptor = ArgumentCaptor
.forClass(CommandContext.class);
verify(ackReplyConsumer).ack();
verify(commandHandler).apply(commandContextArgumentCaptor.capture());
assertThat(commandContextArgumentCaptor.getValue()).isNotNull();
assertThat(commandContextArgumentCaptor.getValue().getCommand().isValid()).isFalse();
Expand All @@ -196,10 +206,11 @@ public void testHandleCommandMessageWithHandlerForDevice() {
final Context context = VertxMockSupport.mockContext(mock(Vertx.class));
commandHandlers.putCommandHandler(tenantId, deviceId, null, commandHandler, context);

internalCommandConsumer.handleCommandMessage(message);
internalCommandConsumer.handleCommandMessage(message, ackReplyConsumer);

final ArgumentCaptor<CommandContext> commandContextArgumentCaptor = ArgumentCaptor
.forClass(CommandContext.class);
verify(ackReplyConsumer).ack();
verify(commandHandler).apply(commandContextArgumentCaptor.capture());
assertThat(commandContextArgumentCaptor.getValue()).isNotNull();
assertThat(commandContextArgumentCaptor.getValue().getCommand().isValid()).isTrue();
Expand All @@ -213,7 +224,8 @@ public void testHandleCommandMessageWithNoHandlerForDevice() {
final PubsubMessage message = getPubSubMessage(subject, "true");
final Function<CommandContext, Future<Void>> commandHandler = mock(Function.class);

internalCommandConsumer.handleCommandMessage(message);
internalCommandConsumer.handleCommandMessage(message, ackReplyConsumer);
verify(ackReplyConsumer).ack();
verify(commandHandler, never()).apply(any(PubSubBasedCommandContext.class));
verify(commandResponseSender).sendCommandResponse(
argThat(t -> t.getTenantId().equals(tenantId)),
Expand All @@ -223,8 +235,8 @@ public void testHandleCommandMessageWithNoHandlerForDevice() {
}

/**
* Verifies that a failed future is returned if an IllegalArgumentException is thrown when no command can be
* created by the PubSubMessage.
* Verifies that a failed future is returned if an IllegalArgumentException is thrown when no command can be created
* by the PubSubMessage.
*/
@Test
public void testHandleCommandMessageFailedWhenMessageContainsNoAttributes() {
Expand All @@ -233,8 +245,9 @@ public void testHandleCommandMessageFailedWhenMessageContainsNoAttributes() {
final Context context = VertxMockSupport.mockContext(mock(Vertx.class));
commandHandlers.putCommandHandler(tenantId, deviceId, null, commandHandler, context);

final Future<Void> result = internalCommandConsumer.handleCommandMessage(message);
final Future<Void> result = internalCommandConsumer.handleCommandMessage(message, ackReplyConsumer);
assertThat(result.failed()).isTrue();
verify(ackReplyConsumer).ack();
}

private PubsubMessage getPubSubMessage(final String subject, final String responseRequired) {
Expand All @@ -246,7 +259,8 @@ private PubsubMessage getPubSubMessage(final String subject, final String respon
Optional.ofNullable(subject)
.ifPresent(ok -> attributes.put(MessageHelper.SYS_PROPERTY_SUBJECT, subject));
Optional.ofNullable(responseRequired)
.ifPresent(ok -> attributes.put(PubSubMessageHelper.PUBSUB_PROPERTY_RESPONSE_REQUIRED, responseRequired));
.ifPresent(
ok -> attributes.put(PubSubMessageHelper.PUBSUB_PROPERTY_RESPONSE_REQUIRED, responseRequired));

return PubsubMessage.newBuilder().putAllAttributes(attributes).build();
}
Expand Down

0 comments on commit 00a508e

Please sign in to comment.