From aabafcef5571302dae30c3751c885eaa17a3891e Mon Sep 17 00:00:00 2001 From: Matthias Kaemmer Date: Fri, 2 Feb 2024 13:57:01 +0100 Subject: [PATCH] [#3585] Add acknowledgement required command feature for Google Pub/Sub based commands Signed-off-by: Matthias Kaemmer --- ...AbstractVertxBasedMqttProtocolAdapter.java | 18 +++- .../command/amqp/ProtonBasedCommand.java | 7 +- .../command/kafka/KafkaBasedCommand.java | 7 +- .../command/pubsub/PubSubBasedCommand.java | 18 +++- .../command/AbstractCommandContext.java | 84 +++++++++++++++---- .../eclipse/hono/client/command/Command.java | 9 +- .../client/pubsub/PubSubMessageHelper.java | 17 +++- .../eclipse/hono/util/CommandConstants.java | 7 +- .../api/command-and-control-pubsub/index.md | 21 +++-- 9 files changed, 153 insertions(+), 35 deletions(-) diff --git a/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java b/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java index d35eafd549..b02fb04c22 100644 --- a/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java +++ b/adapters/mqtt-base/src/main/java/org/eclipse/hono/adapter/mqtt/AbstractVertxBasedMqttProtocolAdapter.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2016, 2024 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -59,6 +59,7 @@ import org.eclipse.hono.client.NoConsumerException; import org.eclipse.hono.client.ServerErrorException; import org.eclipse.hono.client.ServiceInvocationException; +import org.eclipse.hono.client.command.AbstractCommandContext; import org.eclipse.hono.client.command.Command; import org.eclipse.hono.client.command.CommandContext; import org.eclipse.hono.client.command.CommandResponse; @@ -1734,8 +1735,19 @@ private void afterCommandPublished( reportPublishedCommand(tenantObject, subscription, commandContext, ProcessingOutcome.FORWARDED); log.debug("received PUBACK [packet-id: {}] for command [tenant-id: {}, device-id: {}, MQTT client-id: {}]", msgId, subscription.getTenant(), subscription.getDeviceId(), endpoint.clientIdentifier()); - commandContext.getTracingSpan().log("received PUBACK from device"); - commandContext.accept(); + final Span span = commandContext.getTracingSpan(); + span.log("received PUBACK from device"); + final Command command = commandContext.getCommand(); + if (command.isAckRequired() && command.isValid() + && commandContext instanceof AbstractCommandContext abstractCommandContext) { + abstractCommandContext + .sendDeliverySuccessCommandResponseMessage(HttpURLConnection.HTTP_ACCEPTED, + "Command successfully received", span, command.getCorrelationId(), + command.getMessagingType()) + .onComplete(v -> commandContext.accept()); + } else { + commandContext.accept(); + } }; final Handler onAckTimeoutHandler = v -> { diff --git a/clients/command-amqp/src/main/java/org/eclipse/hono/client/command/amqp/ProtonBasedCommand.java b/clients/command-amqp/src/main/java/org/eclipse/hono/client/command/amqp/ProtonBasedCommand.java index 3f9f65383a..c31def5fda 100644 --- a/clients/command-amqp/src/main/java/org/eclipse/hono/client/command/amqp/ProtonBasedCommand.java +++ b/clients/command-amqp/src/main/java/org/eclipse/hono/client/command/amqp/ProtonBasedCommand.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2016, 2024 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -193,6 +193,11 @@ public boolean isOneWay() { return replyToId == null; } + @Override + public boolean isAckRequired() { + return false; + } + @Override public boolean isValid() { return !validationError.isPresent(); diff --git a/clients/command-kafka/src/main/java/org/eclipse/hono/client/command/kafka/KafkaBasedCommand.java b/clients/command-kafka/src/main/java/org/eclipse/hono/client/command/kafka/KafkaBasedCommand.java index a8c5d2fd25..0bfb3ea937 100644 --- a/clients/command-kafka/src/main/java/org/eclipse/hono/client/command/kafka/KafkaBasedCommand.java +++ b/clients/command-kafka/src/main/java/org/eclipse/hono/client/command/kafka/KafkaBasedCommand.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2021, 2022 Contributors to the Eclipse Foundation + * Copyright (c) 2021, 2024 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -213,6 +213,11 @@ public boolean isOneWay() { return !responseRequired; } + @Override + public boolean isAckRequired() { + return false; + } + @Override public boolean isValid() { return !validationError.isPresent(); diff --git a/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedCommand.java b/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedCommand.java index 84f0d289ce..8bb6d29d7c 100644 --- a/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedCommand.java +++ b/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedCommand.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2023, 2024 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -47,6 +47,7 @@ public final class PubSubBasedCommand implements Command { private final String contentType; private final String requestId; private final boolean responseRequired; + private final boolean ackRequired; private String gatewayId; @@ -58,7 +59,8 @@ private PubSubBasedCommand( final String correlationId, final String subject, final String contentType, - final boolean responseRequired) { + final boolean responseRequired, + final boolean ackRequired) { this.validationError = validationError; this.pubsubMessage = pubsubMessage; @@ -68,6 +70,7 @@ private PubSubBasedCommand( this.subject = subject; this.contentType = contentType; this.responseRequired = responseRequired; + this.ackRequired = ackRequired; this.requestId = Commands.encodeRequestIdParameters(correlationId, MessagingType.pubsub); } @@ -134,10 +137,11 @@ private static PubSubBasedCommand getCommand(final PubsubMessage pubsubMessage, final StringJoiner validationErrorJoiner = new StringJoiner(", "); final boolean responseRequired = PubSubMessageHelper.isResponseRequired(attributes); + final boolean ackRequired = PubSubMessageHelper.isAckRequired(attributes); final String correlationId = PubSubMessageHelper.getCorrelationId(attributes) .filter(id -> !id.isEmpty()) .orElseGet(() -> { - if (responseRequired) { + if (responseRequired || ackRequired) { validationErrorJoiner.add("correlation-id is not set"); } return null; @@ -157,7 +161,8 @@ private static PubSubBasedCommand getCommand(final PubsubMessage pubsubMessage, correlationId, subject, contentType, - responseRequired); + responseRequired, + ackRequired); } /** @@ -169,6 +174,11 @@ public PubsubMessage getPubsubMessage() { return pubsubMessage; } + @Override + public boolean isAckRequired() { + return !responseRequired && ackRequired; + } + @Override public boolean isOneWay() { return !responseRequired; diff --git a/clients/command/src/main/java/org/eclipse/hono/client/command/AbstractCommandContext.java b/clients/command/src/main/java/org/eclipse/hono/client/command/AbstractCommandContext.java index 69bf4754d2..898e1e2ca9 100644 --- a/clients/command/src/main/java/org/eclipse/hono/client/command/AbstractCommandContext.java +++ b/clients/command/src/main/java/org/eclipse/hono/client/command/AbstractCommandContext.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2023, 2024 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -14,12 +14,14 @@ import java.net.HttpURLConnection; import java.util.Collections; +import java.util.Map; import java.util.Objects; import java.util.Optional; import org.eclipse.hono.tracing.TracingHelper; import org.eclipse.hono.util.CommandConstants; import org.eclipse.hono.util.MapBasedExecutionContext; +import org.eclipse.hono.util.MessageHelper; import org.eclipse.hono.util.MessagingType; import org.eclipse.hono.util.RegistrationAssertion; import org.eclipse.hono.util.TenantObject; @@ -36,7 +38,8 @@ * in a Pub/Sub and Kafka based command context. * @param The type of Command. */ -public abstract class AbstractCommandContext extends MapBasedExecutionContext implements CommandContext { +public abstract class AbstractCommandContext extends MapBasedExecutionContext + implements CommandContext { protected static final Logger LOG = LoggerFactory.getLogger(AbstractCommandContext.class); @@ -157,24 +160,73 @@ protected Future sendDeliveryFailureCommandResponseMessage( commandResponse.setAdditionalProperties( Collections.unmodifiableMap(command.getDeliveryFailureNotificationProperties())); - return commandResponseSender.sendCommandResponse( - // try to retrieve tenant configuration from context - Optional.ofNullable(get(KEY_TENANT_CONFIG)) - .filter(TenantObject.class::isInstance) - .map(TenantObject.class::cast) - // and fall back to default configuration - .orElseGet(() -> TenantObject.from(command.getTenant())), - new RegistrationAssertion(command.getDeviceId()), - commandResponse, - span.context()) - .onFailure(thr -> { - LOG.debug("failed to publish command response [{}]", commandResponse, thr); - TracingHelper.logError(span, "failed to publish command response message", thr); - }) + return sendCommandResponse(commandResponse, span) .onSuccess(v -> { LOG.debug("published error command response [{}, cause: {}]", commandResponse, cause != null ? cause.getMessage() : error); span.log("published error command response"); }); } + + /** + * Sends a command response as a command acknowledgement. + * + * @param status The HTTP status code indicating the outcome of processing the command. + * @param successMessage The message for the response message body. + * @param span The active OpenTracing span to use for tracking this operation. + * @param correlationId The correlation ID of the command that this is the response for. + * @param messagingType The type of the messaging system via which the command message was received. + * @return A future indicating the outcome of the operation. + *

+ * The future will be succeeded if the command response has been sent. + *

+ * The future will be failed if the command response could not be sent. + */ + public Future sendDeliverySuccessCommandResponseMessage( + final int status, + final String successMessage, + final Span span, + final String correlationId, + final MessagingType messagingType) { + if (correlationId == null) { + TracingHelper.logError(span, "can't send command response message - no correlation id set"); + return Future.failedFuture("missing correlation id"); + } + final JsonObject payloadJson = new JsonObject(); + payloadJson.put("acknowledgement", successMessage != null ? successMessage : ""); + + final CommandResponse commandResponse = new CommandResponse( + command.getTenant(), + command.getDeviceId(), + payloadJson.toBuffer(), + CommandConstants.CONTENT_TYPE_DELIVERY_SUCCESS_NOTIFICATION, + status, + correlationId, + "", + messagingType); + commandResponse.setAdditionalProperties(Map.of(MessageHelper.SYS_PROPERTY_SUBJECT, command.getName())); + + return sendCommandResponse(commandResponse, span) + .onSuccess(v -> { + LOG.debug("published ack command response [{}]", commandResponse); + span.log("published ack command response"); + }); + } + + private Future sendCommandResponse(final CommandResponse commandResponse, final Span span) { + return commandResponseSender.sendCommandResponse( + // try to retrieve tenant configuration from context + Optional.ofNullable(get(KEY_TENANT_CONFIG)) + .filter(TenantObject.class::isInstance) + .map(TenantObject.class::cast) + // and fall back to default configuration + .orElseGet(() -> TenantObject.from(command.getTenant())), + new RegistrationAssertion(command.getDeviceId()), + commandResponse, + span.context()) + .onFailure(thr -> { + LOG.debug("failed to publish command response [{}]", commandResponse, thr); + TracingHelper.logError(span, "failed to publish command response message", thr); + }); + } } diff --git a/clients/command/src/main/java/org/eclipse/hono/client/command/Command.java b/clients/command/src/main/java/org/eclipse/hono/client/command/Command.java index b08fdb2bdb..aea1e9c461 100644 --- a/clients/command/src/main/java/org/eclipse/hono/client/command/Command.java +++ b/clients/command/src/main/java/org/eclipse/hono/client/command/Command.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2020, 2021 Contributors to the Eclipse Foundation + * Copyright (c) 2020, 2024 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -35,6 +35,13 @@ public interface Command { */ boolean isOneWay(); + /** + * Checks if an acknowledgement of this command should be sent to the messaging infrastructure. + * + * @return {@code true} if an acknowledgement is required. + */ + boolean isAckRequired(); + /** * Checks if this command contains all required information. * diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubMessageHelper.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubMessageHelper.java index 4289d6f523..89a1ac9b3d 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubMessageHelper.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubMessageHelper.java @@ -1,5 +1,5 @@ /** - * Copyright (c) 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2023, 2024 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -43,6 +43,10 @@ public final class PubSubMessageHelper { * The name of the Pub/Sub message property indicating whether a response to the message is expected/required. */ public static final String PUBSUB_PROPERTY_RESPONSE_REQUIRED = "response-required"; + /** + * The name of the Pub/Sub message property indicating whether an acknowledgement to the message is expected/required. + */ + public static final String PUBSUB_PROPERTY_ACK_REQUIRED = "ack-required"; /** * Prefix to use in the Pub/Sub message properties for marking properties of command messages that should be @@ -202,6 +206,17 @@ public static boolean isResponseRequired(final Map attributesMap .parseBoolean(getAttributesValue(attributesMap, PUBSUB_PROPERTY_RESPONSE_REQUIRED).orElse("false")); } + /** + * Gets the value of the {@value PUBSUB_PROPERTY_ACK_REQUIRED} attribute. + * + * @param attributesMap The attributes map to get the value from. + * @return The attributes value. + */ + public static boolean isAckRequired(final Map attributesMap) { + return Boolean + .parseBoolean(getAttributesValue(attributesMap, PUBSUB_PROPERTY_ACK_REQUIRED).orElse("false")); + } + /** * Gets the value of the {@value MessageHelper#SYS_PROPERTY_CONTENT_TYPE} attribute. * diff --git a/core/src/main/java/org/eclipse/hono/util/CommandConstants.java b/core/src/main/java/org/eclipse/hono/util/CommandConstants.java index 4dd05d9dd9..42f3ecc4a7 100644 --- a/core/src/main/java/org/eclipse/hono/util/CommandConstants.java +++ b/core/src/main/java/org/eclipse/hono/util/CommandConstants.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2016, 2023 Contributors to the Eclipse Foundation + * Copyright (c) 2016, 2024 Contributors to the Eclipse Foundation * * See the NOTICE file(s) distributed with this work for additional * information regarding copyright ownership. @@ -76,6 +76,11 @@ public class CommandConstants { */ public static final String COMMAND_RESPONSE_RESPONSE_PART_SHORT = "s"; + /** + * The content type that is defined for acknowledgement command response messages. + */ + public static final String CONTENT_TYPE_DELIVERY_SUCCESS_NOTIFICATION = "application/vnd.eclipse-hono-delivery-success-notification+json"; + /** * The content type that is defined for error command response messages sent by a protocol adapter or Command Router. */ diff --git a/site/documentation/content/api/command-and-control-pubsub/index.md b/site/documentation/content/api/command-and-control-pubsub/index.md index 609c191b79..10d9b5bdaa 100644 --- a/site/documentation/content/api/command-and-control-pubsub/index.md +++ b/site/documentation/content/api/command-and-control-pubsub/index.md @@ -63,16 +63,23 @@ project and `${tenant_id}` is the ID of the tenant that the client wants to send Metadata MUST be set as Pub/Sub attributes on a message. The following table provides an overview of the attributes the *Business Application* needs to set on a one-way command message. -| Name | Mandatory | Type | Description | -|:---------------|:---------:|:---------|:--------------------------------------------------------------| -| *device_id* | yes | *string* | The identifier of the device that the command is targeted at. | -| *subject* | yes | *string* | The name of the command to be executed by the device. | -| *content-type* | no | *string* | If present, MUST contain a *Media Type* as defined by [RFC 2046](https://tools.ietf.org/html/rfc2046) which describes the semantics and format of the command's input data contained in the message payload. However, not all protocol adapters will support this property as not all transport protocols provide means to convey this information, e.g. MQTT 3.1.1 has no notion of message headers. | +| Name | Mandatory | Type | Description | +|:-----------------|:---------:|:----------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| *device_id* | yes | *string* | The identifier of the device that the command is targeted at. | +| *subject* | yes | *string* | The name of the command to be executed by the device. | +| *content-type* | no | *string* | If present, MUST contain a *Media Type* as defined by [RFC 2046](https://tools.ietf.org/html/rfc2046) which describes the semantics and format of the command's input data contained in the message payload. However, not all protocol adapters will support this property as not all transport protocols provide means to convey this information, e.g. MQTT 3.1.1 has no notion of message headers. | +| *ack-required* | no | *boolean* | If set to `true` a command acknowledgement message will be sent on the command-response topic once the device acknowledges the command. Currently this only works with MQTT devices which have a QoS 1 subscription on the command topic. | +| *correlation-id* | no | *string* | MUST be set if *ack-required* is set to `true`. The identifier used to correlate a response message to the original request. It is used as the *correlation-id* attribute in the response. | The command message MAY contain arbitrary payload, set as message value, to be sent to the device. The value of the message's *subject* attribute may provide a hint to the device regarding the format, encoding and semantics of the -payload -data. +payload data. + +{{% notice info %}} +Currently the acknowledgement mechanism only works with devices connected via the MQTT protocol which have a +subscription on the command topic with a QoS level of 1. Getting an acknowledgement indicates that the device has +successfully received the command. However, it does not confirm whether the device has successfully processed the command. +{{% /notice %}} ## Send a (Request/Response) Command