diff --git a/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java b/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java index 468d100464..11292e1a26 100644 --- a/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java +++ b/adapter-base/src/main/java/org/eclipse/hono/adapter/AbstractProtocolAdapterApplication.java @@ -59,6 +59,7 @@ import org.eclipse.hono.client.kafka.producer.KafkaProducerOptions; import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties; import org.eclipse.hono.client.notification.kafka.NotificationKafkaConsumerConfigProperties; +import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager; import org.eclipse.hono.client.pubsub.PubSubConfigProperties; import org.eclipse.hono.client.pubsub.PubSubMessageHelper; import org.eclipse.hono.client.pubsub.PubSubPublisherOptions; @@ -441,6 +442,10 @@ protected void setCollaborators(final AbstractProtocolAdapterBase adapter) { vertx, pubSubConfigProperties.getProjectId(), provider); + final var pubSubBasedAdminClientManager = new PubSubBasedAdminClientManager( + pubSubConfigProperties, + provider, + vertx); commandConsumerFactory.registerInternalCommandConsumer( (id, handlers) -> new PubSubBasedInternalCommandConsumer( @@ -451,8 +456,9 @@ protected void setCollaborators(final AbstractProtocolAdapterBase adapter) { tenantClient, tracer, subscriberFactory, - pubSubConfigProperties.getProjectId(), - provider)); + pubSubBasedAdminClientManager, + null + )); } }, () -> LOG.error("Could not initialize Pub/Sub based internal command consumer, no Credentials Provider present.")); } diff --git a/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumer.java b/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumer.java index 946ef3da7d..24021d4545 100644 --- a/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumer.java +++ b/clients/command-pubsub/src/main/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumer.java @@ -10,6 +10,7 @@ * * SPDX-License-Identifier: EPL-2.0 */ + package org.eclipse.hono.client.command.pubsub; import java.net.HttpURLConnection; @@ -33,7 +34,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.api.gax.core.CredentialsProvider; import com.google.cloud.pubsub.v1.AckReplyConsumer; import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.pubsub.v1.PubsubMessage; @@ -63,7 +63,7 @@ public class PubSubBasedInternalCommandConsumer implements InternalCommandConsum private final LifecycleStatus lifecycleStatus = new LifecycleStatus(); private final PubSubBasedAdminClientManager adminClientManager; private final Vertx vertx; - private MessageReceiver receiver; + private final MessageReceiver receiver; /** * Creates a Pub/Sub based internal command consumer. @@ -75,54 +75,10 @@ public class PubSubBasedInternalCommandConsumer implements InternalCommandConsum * @param tenantClient The client to use for retrieving tenant configuration data. * @param tracer The OpenTracing tracer. * @param subscriberFactory The subscriber factory for creating Pub/Sub subscribers for receiving messages. - * @param projectId The identifier of the Google Cloud Project to connect to. - * @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service. - * @throws NullPointerException If any of these parameters are {@code null}. - */ - public PubSubBasedInternalCommandConsumer( - final CommandResponseSender commandResponseSender, - final Vertx vertx, - final String adapterInstanceId, - final CommandHandlers commandHandlers, - final TenantClient tenantClient, - final Tracer tracer, - final PubSubSubscriberFactory subscriberFactory, - final String projectId, - final CredentialsProvider credentialsProvider) { - Objects.requireNonNull(projectId); - Objects.requireNonNull(credentialsProvider); - this.vertx = Objects.requireNonNull(vertx); - this.commandResponseSender = Objects.requireNonNull(commandResponseSender); - this.adapterInstanceId = Objects.requireNonNull(adapterInstanceId); - this.commandHandlers = Objects.requireNonNull(commandHandlers); - this.tenantClient = Objects.requireNonNull(tenantClient); - this.tracer = Objects.requireNonNull(tracer); - this.subscriberFactory = Objects.requireNonNull(subscriberFactory); - this.adminClientManager = new PubSubBasedAdminClientManager(projectId, credentialsProvider); - createReceiver(); - adminClientManager - .getOrCreateTopic(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId) - .onFailure(thr -> log.error("Could not create topic for endpoint {} and {}", - CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr)) - .compose(t -> adminClientManager.getOrCreateSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId)) - .onFailure(thr -> log.error("Could not create subscription for endpoint {} and {}", - CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr)) - .onSuccess(s -> subscriberFactory.getOrCreateSubscriber(s, receiver)); - } - - /** - * Creates a Pub/Sub based internal command consumer. To be used for Unittests. - * - * @param commandResponseSender The sender used to send command responses. - * @param vertx The Vert.x instance to use. - * @param adapterInstanceId The adapter instance id. - * @param commandHandlers The command handlers to choose from for handling a received command. - * @param tenantClient The client to use for retrieving tenant configuration data. - * @param tracer The OpenTracing tracer. - * @param subscriberFactory The subscriber factory for creating Pub/Sub subscribers for receiving messages. - * @param adminClientManager The Pub/Sub based admin client manager to manage topics and subscriptions. + * @param adminClientManager The factory to create Pub/Sub based admin client manager to manage topics and + * subscriptions. * @param receiver The message receiver used to process the received message. - * @throws NullPointerException If any of these parameters are {@code null}. + * @throws NullPointerException If any of these parameters except receiver are {@code null}. */ public PubSubBasedInternalCommandConsumer( final CommandResponseSender commandResponseSender, @@ -142,15 +98,11 @@ public PubSubBasedInternalCommandConsumer( this.tracer = Objects.requireNonNull(tracer); this.subscriberFactory = Objects.requireNonNull(subscriberFactory); this.adminClientManager = Objects.requireNonNull(adminClientManager); - this.receiver = Objects.requireNonNull(receiver); - adminClientManager - .getOrCreateTopic(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId) - .onFailure(thr -> log.error("Could not create topic for endpoint {} and {}", - CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr)) - .compose(t -> adminClientManager.getOrCreateSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId)) - .onFailure(thr -> log.error("Could not create subscription for endpoint {} and {}", - CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr)) - .onSuccess(s -> subscriberFactory.getOrCreateSubscriber(s, receiver)); + this.receiver = receiver != null ? receiver : createReceiver(); + } + + private MessageReceiver createReceiver() { + return this::handleCommandMessage; } @Override @@ -191,21 +143,26 @@ public Future start() { return Future.failedFuture(new IllegalStateException("subscriber is already started/stopping")); } - final String subscriptionId = PubSubMessageHelper.getTopicName( - CommandConstants.INTERNAL_COMMAND_ENDPOINT, - adapterInstanceId); - return subscriberFactory - .getOrCreateSubscriber(subscriptionId, receiver) - .subscribe(true) + return adminClientManager + .getOrCreateTopic(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId) + .onFailure(thr -> log.error("Could not create topic for endpoint {} and {}", + CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr)) + .compose(t -> adminClientManager.getOrCreateSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, + adapterInstanceId)) + .onComplete(v -> vertx.executeBlocking(promise -> { + adminClientManager.closeAdminClients(); + promise.complete(); + })) + .onFailure(thr -> log.error("Could not create subscription for endpoint {} and {}", + CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr)) + .compose(s -> subscriberFactory.getOrCreateSubscriber(PubSubMessageHelper.getTopicName( + CommandConstants.INTERNAL_COMMAND_ENDPOINT, + adapterInstanceId), receiver).subscribe(true)) .onSuccess(s -> lifecycleStatus.setStarted()) .onFailure( e -> log.warn("Error starting Internal Command Consumer for adapter {}", adapterInstanceId, e)); } - private void createReceiver() { - receiver = this::handleCommandMessage; - } - Future handleCommandMessage(final PubsubMessage message, final AckReplyConsumer consumer) { consumer.ack(); final PubSubBasedCommand command; @@ -260,13 +217,8 @@ Future handleCommandMessage(final PubsubMessage message, final AckReplyCon @Override public Future stop() { - return lifecycleStatus.runStopAttempt(() -> Future.all( - subscriberFactory.closeSubscriber(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId), - vertx.executeBlocking(promise -> { - adminClientManager.closeAdminClients(); - promise.complete(); - }) - ).mapEmpty()); + return lifecycleStatus.runStopAttempt( + () -> subscriberFactory.closeSubscriber(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId)); } } diff --git a/clients/command-pubsub/src/test/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumerTest.java b/clients/command-pubsub/src/test/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumerTest.java index 98b4ff625a..3abe813a3f 100644 --- a/clients/command-pubsub/src/test/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumerTest.java +++ b/clients/command-pubsub/src/test/java/org/eclipse/hono/client/command/pubsub/PubSubBasedInternalCommandConsumerTest.java @@ -10,6 +10,7 @@ * * SPDX-License-Identifier: EPL-2.0 */ + package org.eclipse.hono.client.command.pubsub; import static org.mockito.ArgumentMatchers.any; diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManager.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManager.java index 93e9484472..8b0c890ba7 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManager.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubBasedAdminClientManager.java @@ -10,25 +10,25 @@ * * SPDX-License-Identifier: EPL-2.0 *******************************************************************************/ + package org.eclipse.hono.client.pubsub; import java.io.IOException; -import java.util.HashSet; import java.util.Objects; -import java.util.Set; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.api.gax.core.CredentialsProvider; -import com.google.api.gax.rpc.AlreadyExistsException; import com.google.api.gax.rpc.ApiException; +import com.google.api.gax.rpc.NotFoundException; import com.google.cloud.pubsub.v1.SubscriptionAdminClient; import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; import com.google.cloud.pubsub.v1.TopicAdminClient; import com.google.cloud.pubsub.v1.TopicAdminSettings; import com.google.protobuf.util.Durations; +import com.google.pubsub.v1.ExpirationPolicy; import com.google.pubsub.v1.PushConfig; import com.google.pubsub.v1.Subscription; import com.google.pubsub.v1.SubscriptionName; @@ -36,6 +36,7 @@ import com.google.pubsub.v1.TopicName; import io.vertx.core.Future; +import io.vertx.core.Vertx; /** * A Pub/Sub based admin client manager to manage topics and subscriptions. Wraps a TopicAdminClient and a @@ -50,29 +51,25 @@ public class PubSubBasedAdminClientManager { */ private static final long MESSAGE_RETENTION = 600000; private final String projectId; - - /** - * A set of existing subscriptions. It contains subscriptions with the format - * `"projects/{project}/subscriptions/{subscription}"`. - */ - private final Set subscriptions = new HashSet<>(); - /** - * A set of existing topics. It contains topics with the format `"projects/{project}/topics/{topic}"`. - */ - private final Set topics = new HashSet<>(); private final CredentialsProvider credentialsProvider; + private final Vertx vertx; private SubscriptionAdminClient subscriptionAdminClient; private TopicAdminClient topicAdminClient; /** * Creates a new PubSubBasedAdminClientManager. * - * @param projectId The identifier of the Google Cloud Project to connect to. + * @param pubSubConfigProperties The Pub/Sub config properties containing the Google project ID. * @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service. + * @param vertx The Vert.x instance to use. + * @throws NullPointerException if vertx, credentialsProvider or projectId is {@code null}. */ - public PubSubBasedAdminClientManager(final String projectId, final CredentialsProvider credentialsProvider) { - this.projectId = Objects.requireNonNull(projectId); + public PubSubBasedAdminClientManager(final PubSubConfigProperties pubSubConfigProperties, + final CredentialsProvider credentialsProvider, final Vertx vertx) { + Objects.requireNonNull(pubSubConfigProperties); + this.projectId = Objects.requireNonNull(pubSubConfigProperties.getProjectId()); this.credentialsProvider = Objects.requireNonNull(credentialsProvider); + this.vertx = Objects.requireNonNull(vertx); } private Future getOrCreateTopicAdminClient() { @@ -114,96 +111,118 @@ private Future getOrCreateSubscriptionAdminClient() { * * @param endpoint The endpoint name of the topic, e.g. command_internal. * @param prefix The prefix of the topic, e.g. the adapter instance ID. - * @return A succeeded Future if the topic is successfully created or already exists, or a failed - * Future if it could not be created. + * @return A succeeded Future if the topic is successfully created or already exists, or a failed Future if it could + * not be created. */ public Future getOrCreateTopic(final String endpoint, final String prefix) { - final String topicName = PubSubMessageHelper.getTopicName(endpoint, prefix); - final TopicName topic = TopicName.of(projectId, topicName); + final TopicName topicName = TopicName.of(projectId, PubSubMessageHelper.getTopicName(endpoint, prefix)); - if (topics.contains(topic.toString())) { - LOG.debug("Topic {} already exists, continue", topic); - return Future.succeededFuture(topic.getTopic()); - } return getOrCreateTopicAdminClient() .onFailure(thr -> LOG.debug("admin client creation failed", thr)) - .compose(client -> getOrCreateTopic(projectId, topic, client)); + .compose(client -> getTopic(topicName, client) + .recover(thr -> { + if (thr instanceof NotFoundException) { + return createTopic(topicName, client); + } else { + return Future.failedFuture(thr); + } + })); } - private Future getOrCreateTopic(final String projectId, final TopicName topic, final TopicAdminClient client) { - try { - final Topic createdTopic = client.createTopic(topic); - if (createdTopic == null) { - LOG.debug("Creating topic failed [topic: {}, projectId: {}]", topic, projectId); - return Future.failedFuture("Topic creation failed."); + private Future getTopic(final TopicName topicName, final TopicAdminClient client) { + return vertx.executeBlocking(promise -> { + try { + final Topic topic = client.getTopic(topicName); + promise.complete(topic.getName()); + } catch (ApiException e) { + promise.fail(e); } - topics.add(createdTopic.getName()); - return Future.succeededFuture(topic.getTopic()); - } catch (AlreadyExistsException ex) { - return Future.succeededFuture(topic.getTopic()); - } catch (ApiException e) { - LOG.debug("Error creating topic {} on project {}", topic, projectId, e); - return Future.failedFuture("Topic creation failed."); - } + }); + } + private Future createTopic(final TopicName topicName, final TopicAdminClient client) { + final Future createdTopic = vertx + .executeBlocking(promise -> { + try { + final Topic topic = client.createTopic(topicName); + promise.complete(topic.getName()); + } catch (ApiException e) { + promise.fail(e); + } + }); + createdTopic.onSuccess(top -> LOG.debug("Topic {} created successfully.", topicName)) + .onFailure(thr -> LOG.debug("Creating topic failed [topic: {}, projectId: {}]", topicName, projectId)); + return createdTopic; } /** - * Gets an existing subscription or creates a new one on Pub/Sub based on the given subscription endpoint and prefix. + * Gets an existing subscription or creates a new one on Pub/Sub based on the given subscription endpoint and + * prefix. * * @param endpoint The endpoint name of the subscription, e.g. command_internal. * @param prefix The prefix of the subscription, e.g. the adapter instance ID. - * @return A succeeded Future if the subscription is successfully created or already exists, or a failed - * Future if it could not be created. + * @return A succeeded Future if the subscription is successfully created or already exists, or a failed Future if + * it could not be created. */ public Future getOrCreateSubscription(final String endpoint, final String prefix) { final String topicAndSubscriptionName = PubSubMessageHelper.getTopicName(endpoint, prefix); - final TopicName topic = TopicName.of(projectId, topicAndSubscriptionName); - final SubscriptionName subscription = SubscriptionName.of(projectId, topicAndSubscriptionName); + final TopicName topicName = TopicName.of(projectId, topicAndSubscriptionName); + final SubscriptionName subscriptionName = SubscriptionName.of(projectId, topicAndSubscriptionName); - if (subscriptions.contains(subscription.toString())) { - LOG.debug("Subscription {} already exists, continue", subscription); - return Future.succeededFuture(subscription.getSubscription()); - } return getOrCreateSubscriptionAdminClient() .onFailure(thr -> LOG.debug("admin client creation failed", thr)) - .compose(client -> getOrCreateSubscription(projectId, subscription, topic, client)); + .compose(client -> getSubscription(subscriptionName, client) + .recover(thr -> { + if (thr instanceof NotFoundException) { + return createSubscription(subscriptionName, topicName, client); + } else { + return Future.failedFuture(thr); + } + })); } - private Future getOrCreateSubscription( - final String projectId, - final SubscriptionName subscription, - final TopicName topic, + private Future getSubscription(final SubscriptionName subscriptionName, final SubscriptionAdminClient client) { - try { - final Subscription request = Subscription.newBuilder() - .setName(subscription.toString()) - .setTopic(topic.toString()) - .setPushConfig(PushConfig.getDefaultInstance()) - .setAckDeadlineSeconds(0) - .setMessageRetentionDuration(Durations.fromMillis(MESSAGE_RETENTION)) - .build(); - final Subscription createdSubscription = client.createSubscription(request); - if (createdSubscription == null) { - LOG.debug("Creating subscription failed [subscription: {}, topic: {}, project: {}]", subscription, - topic, - projectId); - return Future.failedFuture("Subscription creation failed."); + return vertx.executeBlocking(promise -> { + try { + final Subscription subscription = client.getSubscription(subscriptionName); + promise.complete(subscription.getName()); + } catch (ApiException e) { + promise.fail(e); } - subscriptions.add(createdSubscription.getName()); - return Future.succeededFuture(subscription.getSubscription()); - } catch (AlreadyExistsException ex) { - return Future.succeededFuture(subscription.getSubscription()); - } catch (ApiException e) { - LOG.debug("Error creating subscription {} for topic {} on project {}", subscription, topic, projectId, e); - return Future.failedFuture("Subscription creation failed."); - } + }); + } + + private Future createSubscription(final SubscriptionName subscriptionName, final TopicName topicName, + final SubscriptionAdminClient client) { + final Subscription request = Subscription.newBuilder() + .setName(subscriptionName.toString()) + .setTopic(topicName.toString()) + .setPushConfig(PushConfig.getDefaultInstance()) + .setAckDeadlineSeconds(0) + .setMessageRetentionDuration(Durations.fromMillis(MESSAGE_RETENTION)) + .setExpirationPolicy(ExpirationPolicy.getDefaultInstance()) + .build(); + final Future createdSubscription = vertx + .executeBlocking(promise -> { + try { + final Subscription subscription = client.createSubscription(request); + promise.complete(subscription.getName()); + } catch (ApiException e) { + promise.fail(e); + } + }); + createdSubscription.onSuccess(sub -> LOG.debug("Subscription {} created successfully.", subscriptionName)) + .onFailure( + thr -> LOG.debug("Creating subscription failed [subscription: {}, topic: {}, project: {}]", + subscriptionName, topicName, projectId)); + return createdSubscription; } /** * Closes the TopicAdminClient and the SubscriptionAdminClient if they exist. This method is expected to be invoked - * as soon as the TopicAdminClient and the SubscriptionAdminClient is no longer needed. - * This method will bock the current thread for up to 10 seconds! + * as soon as the TopicAdminClient and the SubscriptionAdminClient is no longer needed. This method will block the + * current thread for up to 10 seconds! */ public void closeAdminClients() { if (topicAdminClient == null && subscriptionAdminClient == null) { diff --git a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubConfigProperties.java b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubConfigProperties.java index d77278561e..39e807b3a9 100644 --- a/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubConfigProperties.java +++ b/clients/pubsub-common/src/main/java/org/eclipse/hono/client/pubsub/PubSubConfigProperties.java @@ -10,11 +10,15 @@ * * SPDX-License-Identifier: EPL-2.0 *******************************************************************************/ + package org.eclipse.hono.client.pubsub; +import jakarta.inject.Singleton; + /** * Common configuration properties required for access to Pub/Sub. */ +@Singleton public final class PubSubConfigProperties { private String projectId = null; diff --git a/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/app/CommonConfigPropertiesProducer.java b/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/app/CommonConfigPropertiesProducer.java index 531dd98d26..0afe5ed92f 100644 --- a/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/app/CommonConfigPropertiesProducer.java +++ b/services/device-registry-base/src/main/java/org/eclipse/hono/deviceregistry/app/CommonConfigPropertiesProducer.java @@ -19,8 +19,6 @@ import org.eclipse.hono.client.kafka.producer.KafkaProducerOptions; import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties; import org.eclipse.hono.client.notification.kafka.NotificationKafkaProducerConfigProperties; -import org.eclipse.hono.client.pubsub.PubSubConfigProperties; -import org.eclipse.hono.client.pubsub.PubSubPublisherOptions; import org.eclipse.hono.deviceregistry.service.device.AutoProvisionerConfigOptions; import org.eclipse.hono.deviceregistry.service.device.AutoProvisionerConfigProperties; @@ -49,12 +47,6 @@ ClientConfigProperties downstreamSenderProperties( return result; } - @Produces - @Singleton - PubSubConfigProperties pubSubConfigProperties(final PubSubPublisherOptions options) { - return new PubSubConfigProperties(options); - } - @Produces @Singleton MessagingKafkaProducerConfigProperties eventKafkaProducerClientOptions(