Skip to content

Commit

Permalink
Review fixes:
Browse files Browse the repository at this point in the history
changed code to execute calls to pubsub on worker thread plus some additional refactoring

Signed-off-by: Matthias Kaemmer <[email protected]>
  • Loading branch information
mattkaem committed Oct 27, 2023
1 parent 72b66f4 commit 0ab4653
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.eclipse.hono.client.kafka.producer.KafkaProducerOptions;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.client.notification.kafka.NotificationKafkaConsumerConfigProperties;
import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManagerFactoryImpl;
import org.eclipse.hono.client.pubsub.PubSubConfigProperties;
import org.eclipse.hono.client.pubsub.PubSubMessageHelper;
import org.eclipse.hono.client.pubsub.PubSubPublisherOptions;
Expand Down Expand Up @@ -441,6 +442,10 @@ protected void setCollaborators(final AbstractProtocolAdapterBase<?> adapter) {
vertx,
pubSubConfigProperties.getProjectId(),
provider);
final var pubSubBasedAdminClientManagerFactory = new PubSubBasedAdminClientManagerFactoryImpl(
pubSubConfigProperties,
provider,
vertx);

commandConsumerFactory.registerInternalCommandConsumer(
(id, handlers) -> new PubSubBasedInternalCommandConsumer(
Expand All @@ -451,8 +456,9 @@ protected void setCollaborators(final AbstractProtocolAdapterBase<?> adapter) {
tenantClient,
tracer,
subscriberFactory,
pubSubConfigProperties.getProjectId(),
provider));
pubSubBasedAdminClientManagerFactory,
null
));
}
}, () -> LOG.error("Could not initialize Pub/Sub based internal command consumer, no Credentials Provider present."));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.hono.client.command.pubsub;

import java.net.HttpURLConnection;
Expand All @@ -23,6 +24,7 @@
import org.eclipse.hono.client.command.CommandResponseSender;
import org.eclipse.hono.client.command.InternalCommandConsumer;
import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager;
import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManagerFactory;
import org.eclipse.hono.client.pubsub.PubSubMessageHelper;
import org.eclipse.hono.client.pubsub.subscriber.PubSubSubscriberFactory;
import org.eclipse.hono.client.pubsub.tracing.PubSubTracingHelper;
Expand All @@ -33,7 +35,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.api.gax.core.CredentialsProvider;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.pubsub.v1.PubsubMessage;
Expand Down Expand Up @@ -61,9 +62,9 @@ public class PubSubBasedInternalCommandConsumer implements InternalCommandConsum
private final Tracer tracer;
private final PubSubSubscriberFactory subscriberFactory;
private final LifecycleStatus lifecycleStatus = new LifecycleStatus();
private final PubSubBasedAdminClientManager adminClientManager;
private final PubSubBasedAdminClientManagerFactory adminClientManagerFactory;
private final Vertx vertx;
private MessageReceiver receiver;
private final MessageReceiver receiver;

/**
* Creates a Pub/Sub based internal command consumer.
Expand All @@ -75,54 +76,10 @@ public class PubSubBasedInternalCommandConsumer implements InternalCommandConsum
* @param tenantClient The client to use for retrieving tenant configuration data.
* @param tracer The OpenTracing tracer.
* @param subscriberFactory The subscriber factory for creating Pub/Sub subscribers for receiving messages.
* @param projectId The identifier of the Google Cloud Project to connect to.
* @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service.
* @throws NullPointerException If any of these parameters are {@code null}.
*/
public PubSubBasedInternalCommandConsumer(
final CommandResponseSender commandResponseSender,
final Vertx vertx,
final String adapterInstanceId,
final CommandHandlers commandHandlers,
final TenantClient tenantClient,
final Tracer tracer,
final PubSubSubscriberFactory subscriberFactory,
final String projectId,
final CredentialsProvider credentialsProvider) {
Objects.requireNonNull(projectId);
Objects.requireNonNull(credentialsProvider);
this.vertx = Objects.requireNonNull(vertx);
this.commandResponseSender = Objects.requireNonNull(commandResponseSender);
this.adapterInstanceId = Objects.requireNonNull(adapterInstanceId);
this.commandHandlers = Objects.requireNonNull(commandHandlers);
this.tenantClient = Objects.requireNonNull(tenantClient);
this.tracer = Objects.requireNonNull(tracer);
this.subscriberFactory = Objects.requireNonNull(subscriberFactory);
this.adminClientManager = new PubSubBasedAdminClientManager(projectId, credentialsProvider);
createReceiver();
adminClientManager
.getOrCreateTopic(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId)
.onFailure(thr -> log.error("Could not create topic for endpoint {} and {}",
CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr))
.compose(t -> adminClientManager.getOrCreateSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId))
.onFailure(thr -> log.error("Could not create subscription for endpoint {} and {}",
CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr))
.onSuccess(s -> subscriberFactory.getOrCreateSubscriber(s, receiver));
}

/**
* Creates a Pub/Sub based internal command consumer. To be used for Unittests.
*
* @param commandResponseSender The sender used to send command responses.
* @param vertx The Vert.x instance to use.
* @param adapterInstanceId The adapter instance id.
* @param commandHandlers The command handlers to choose from for handling a received command.
* @param tenantClient The client to use for retrieving tenant configuration data.
* @param tracer The OpenTracing tracer.
* @param subscriberFactory The subscriber factory for creating Pub/Sub subscribers for receiving messages.
* @param adminClientManager The Pub/Sub based admin client manager to manage topics and subscriptions.
* @param adminClientManagerFactory The factory to create Pub/Sub based admin client manager to manage topics and
* subscriptions.
* @param receiver The message receiver used to process the received message.
* @throws NullPointerException If any of these parameters are {@code null}.
* @throws NullPointerException If any of these parameters except receiver are {@code null}.
*/
public PubSubBasedInternalCommandConsumer(
final CommandResponseSender commandResponseSender,
Expand All @@ -132,7 +89,7 @@ public PubSubBasedInternalCommandConsumer(
final TenantClient tenantClient,
final Tracer tracer,
final PubSubSubscriberFactory subscriberFactory,
final PubSubBasedAdminClientManager adminClientManager,
final PubSubBasedAdminClientManagerFactory adminClientManagerFactory,
final MessageReceiver receiver) {
this.vertx = Objects.requireNonNull(vertx);
this.commandResponseSender = Objects.requireNonNull(commandResponseSender);
Expand All @@ -141,16 +98,12 @@ public PubSubBasedInternalCommandConsumer(
this.tenantClient = Objects.requireNonNull(tenantClient);
this.tracer = Objects.requireNonNull(tracer);
this.subscriberFactory = Objects.requireNonNull(subscriberFactory);
this.adminClientManager = Objects.requireNonNull(adminClientManager);
this.receiver = Objects.requireNonNull(receiver);
adminClientManager
.getOrCreateTopic(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId)
.onFailure(thr -> log.error("Could not create topic for endpoint {} and {}",
CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr))
.compose(t -> adminClientManager.getOrCreateSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId))
.onFailure(thr -> log.error("Could not create subscription for endpoint {} and {}",
CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr))
.onSuccess(s -> subscriberFactory.getOrCreateSubscriber(s, receiver));
this.adminClientManagerFactory = Objects.requireNonNull(adminClientManagerFactory);
this.receiver = receiver != null ? receiver : createReceiver();
}

private MessageReceiver createReceiver() {
return this::handleCommandMessage;
}

@Override
Expand Down Expand Up @@ -190,22 +143,28 @@ public Future<Void> start() {
} else if (!lifecycleStatus.setStarting()) {
return Future.failedFuture(new IllegalStateException("subscriber is already started/stopping"));
}
final PubSubBasedAdminClientManager adminClientManager = adminClientManagerFactory.createAdminClientManager();

final String subscriptionId = PubSubMessageHelper.getTopicName(
CommandConstants.INTERNAL_COMMAND_ENDPOINT,
adapterInstanceId);
return subscriberFactory
.getOrCreateSubscriber(subscriptionId, receiver)
.subscribe(true)
return adminClientManager
.getOrCreateTopic(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId)
.onFailure(thr -> log.error("Could not create topic for endpoint {} and {}",
CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr))
.compose(t -> adminClientManager.getOrCreateSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT,
adapterInstanceId))
.onComplete(v -> vertx.executeBlocking(promise -> {
adminClientManager.closeAdminClients();
promise.complete();
}))
.onFailure(thr -> log.error("Could not create subscription for endpoint {} and {}",
CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId, thr))
.compose(s -> subscriberFactory.getOrCreateSubscriber(PubSubMessageHelper.getTopicName(
CommandConstants.INTERNAL_COMMAND_ENDPOINT,
adapterInstanceId), receiver).subscribe(true))
.onSuccess(s -> lifecycleStatus.setStarted())
.onFailure(
e -> log.warn("Error starting Internal Command Consumer for adapter {}", adapterInstanceId, e));
}

private void createReceiver() {
receiver = this::handleCommandMessage;
}

Future<Void> handleCommandMessage(final PubsubMessage message, final AckReplyConsumer consumer) {
consumer.ack();
final PubSubBasedCommand command;
Expand Down Expand Up @@ -260,13 +219,8 @@ Future<Void> handleCommandMessage(final PubsubMessage message, final AckReplyCon

@Override
public Future<Void> stop() {
return lifecycleStatus.runStopAttempt(() -> Future.all(
subscriberFactory.closeSubscriber(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId),
vertx.executeBlocking(promise -> {
adminClientManager.closeAdminClients();
promise.complete();
})
).mapEmpty());
return lifecycleStatus.runStopAttempt(
() -> subscriberFactory.closeSubscriber(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*
* SPDX-License-Identifier: EPL-2.0
*/

package org.eclipse.hono.client.command.pubsub;

import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -37,6 +38,7 @@
import org.eclipse.hono.client.command.CommandResponse;
import org.eclipse.hono.client.command.CommandResponseSender;
import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager;
import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManagerFactory;
import org.eclipse.hono.client.pubsub.PubSubMessageHelper;
import org.eclipse.hono.client.pubsub.subscriber.PubSubSubscriberClient;
import org.eclipse.hono.client.pubsub.subscriber.PubSubSubscriberFactory;
Expand Down Expand Up @@ -118,6 +120,9 @@ void setUp() {
.getOrCreateSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId))
.thenReturn(Future.succeededFuture(topicAndSubscription));

final var adminClientManagerFactory = mock(PubSubBasedAdminClientManagerFactory.class);
when(adminClientManagerFactory.createAdminClientManager()).thenReturn(adminClientManager);

subscriber = mock(PubSubSubscriberClient.class);
when(subscriber.subscribe(true)).thenReturn(Future.succeededFuture());

Expand All @@ -134,7 +139,7 @@ void setUp() {
tenantClient,
tracer,
subscriberFactory,
adminClientManager,
adminClientManagerFactory,
receiver);

}
Expand Down
Loading

0 comments on commit 0ab4653

Please sign in to comment.