Skip to content

Commit

Permalink
Review fixes:
Browse files Browse the repository at this point in the history
* removed PubSubBasedAdminClientManagerFactory
* added try catch block to Pub/Sub calls

Signed-off-by: Matthias Kaemmer <[email protected]>
  • Loading branch information
mattkaem committed Oct 31, 2023
1 parent 0ab4653 commit b5c0e27
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
import org.eclipse.hono.client.kafka.producer.KafkaProducerOptions;
import org.eclipse.hono.client.kafka.producer.MessagingKafkaProducerConfigProperties;
import org.eclipse.hono.client.notification.kafka.NotificationKafkaConsumerConfigProperties;
import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManagerFactoryImpl;
import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager;
import org.eclipse.hono.client.pubsub.PubSubConfigProperties;
import org.eclipse.hono.client.pubsub.PubSubMessageHelper;
import org.eclipse.hono.client.pubsub.PubSubPublisherOptions;
Expand Down Expand Up @@ -442,7 +442,7 @@ protected void setCollaborators(final AbstractProtocolAdapterBase<?> adapter) {
vertx,
pubSubConfigProperties.getProjectId(),
provider);
final var pubSubBasedAdminClientManagerFactory = new PubSubBasedAdminClientManagerFactoryImpl(
final var pubSubBasedAdminClientManager = new PubSubBasedAdminClientManager(
pubSubConfigProperties,
provider,
vertx);
Expand All @@ -456,7 +456,7 @@ protected void setCollaborators(final AbstractProtocolAdapterBase<?> adapter) {
tenantClient,
tracer,
subscriberFactory,
pubSubBasedAdminClientManagerFactory,
pubSubBasedAdminClientManager,
null
));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.eclipse.hono.client.command.CommandResponseSender;
import org.eclipse.hono.client.command.InternalCommandConsumer;
import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager;
import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManagerFactory;
import org.eclipse.hono.client.pubsub.PubSubMessageHelper;
import org.eclipse.hono.client.pubsub.subscriber.PubSubSubscriberFactory;
import org.eclipse.hono.client.pubsub.tracing.PubSubTracingHelper;
Expand Down Expand Up @@ -62,7 +61,7 @@ public class PubSubBasedInternalCommandConsumer implements InternalCommandConsum
private final Tracer tracer;
private final PubSubSubscriberFactory subscriberFactory;
private final LifecycleStatus lifecycleStatus = new LifecycleStatus();
private final PubSubBasedAdminClientManagerFactory adminClientManagerFactory;
private final PubSubBasedAdminClientManager adminClientManager;
private final Vertx vertx;
private final MessageReceiver receiver;

Expand All @@ -76,7 +75,7 @@ public class PubSubBasedInternalCommandConsumer implements InternalCommandConsum
* @param tenantClient The client to use for retrieving tenant configuration data.
* @param tracer The OpenTracing tracer.
* @param subscriberFactory The subscriber factory for creating Pub/Sub subscribers for receiving messages.
* @param adminClientManagerFactory The factory to create Pub/Sub based admin client manager to manage topics and
* @param adminClientManager The factory to create Pub/Sub based admin client manager to manage topics and
* subscriptions.
* @param receiver The message receiver used to process the received message.
* @throws NullPointerException If any of these parameters except receiver are {@code null}.
Expand All @@ -89,7 +88,7 @@ public PubSubBasedInternalCommandConsumer(
final TenantClient tenantClient,
final Tracer tracer,
final PubSubSubscriberFactory subscriberFactory,
final PubSubBasedAdminClientManagerFactory adminClientManagerFactory,
final PubSubBasedAdminClientManager adminClientManager,
final MessageReceiver receiver) {
this.vertx = Objects.requireNonNull(vertx);
this.commandResponseSender = Objects.requireNonNull(commandResponseSender);
Expand All @@ -98,7 +97,7 @@ public PubSubBasedInternalCommandConsumer(
this.tenantClient = Objects.requireNonNull(tenantClient);
this.tracer = Objects.requireNonNull(tracer);
this.subscriberFactory = Objects.requireNonNull(subscriberFactory);
this.adminClientManagerFactory = Objects.requireNonNull(adminClientManagerFactory);
this.adminClientManager = Objects.requireNonNull(adminClientManager);
this.receiver = receiver != null ? receiver : createReceiver();
}

Expand Down Expand Up @@ -143,7 +142,6 @@ public Future<Void> start() {
} else if (!lifecycleStatus.setStarting()) {
return Future.failedFuture(new IllegalStateException("subscriber is already started/stopping"));
}
final PubSubBasedAdminClientManager adminClientManager = adminClientManagerFactory.createAdminClientManager();

return adminClientManager
.getOrCreateTopic(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.eclipse.hono.client.command.CommandResponse;
import org.eclipse.hono.client.command.CommandResponseSender;
import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManager;
import org.eclipse.hono.client.pubsub.PubSubBasedAdminClientManagerFactory;
import org.eclipse.hono.client.pubsub.PubSubMessageHelper;
import org.eclipse.hono.client.pubsub.subscriber.PubSubSubscriberClient;
import org.eclipse.hono.client.pubsub.subscriber.PubSubSubscriberFactory;
Expand Down Expand Up @@ -120,9 +119,6 @@ void setUp() {
.getOrCreateSubscription(CommandConstants.INTERNAL_COMMAND_ENDPOINT, adapterInstanceId))
.thenReturn(Future.succeededFuture(topicAndSubscription));

final var adminClientManagerFactory = mock(PubSubBasedAdminClientManagerFactory.class);
when(adminClientManagerFactory.createAdminClientManager()).thenReturn(adminClientManager);

subscriber = mock(PubSubSubscriberClient.class);
when(subscriber.subscribe(true)).thenReturn(Future.succeededFuture());

Expand All @@ -139,7 +135,7 @@ void setUp() {
tenantClient,
tracer,
subscriberFactory,
adminClientManagerFactory,
adminClientManager,
receiver);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.slf4j.LoggerFactory;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.NotFoundException;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
Expand All @@ -31,6 +32,7 @@
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;

import io.vertx.core.Future;
Expand All @@ -49,7 +51,6 @@ public class PubSubBasedAdminClientManager {
*/
private static final long MESSAGE_RETENTION = 600000;
private final String projectId;

private final CredentialsProvider credentialsProvider;
private final Vertx vertx;
private SubscriptionAdminClient subscriptionAdminClient;
Expand All @@ -58,14 +59,15 @@ public class PubSubBasedAdminClientManager {
/**
* Creates a new PubSubBasedAdminClientManager.
*
* @param projectId The identifier of the Google Cloud Project to connect to.
* @param pubSubConfigProperties The Pub/Sub config properties containing the Google project ID.
* @param credentialsProvider The provider for credentials to use for authenticating to the Pub/Sub service.
* @param vertx The Vert.x instance to use.
* @throws NullPointerException if vertx, credentialsProvider or projectId is {@code null}.
*/
PubSubBasedAdminClientManager(final String projectId, final CredentialsProvider credentialsProvider,
final Vertx vertx) {
this.projectId = Objects.requireNonNull(projectId);
public PubSubBasedAdminClientManager(final PubSubConfigProperties pubSubConfigProperties,
final CredentialsProvider credentialsProvider, final Vertx vertx) {
Objects.requireNonNull(pubSubConfigProperties);
this.projectId = Objects.requireNonNull(pubSubConfigProperties.getProjectId());
this.credentialsProvider = Objects.requireNonNull(credentialsProvider);
this.vertx = Objects.requireNonNull(vertx);
}
Expand Down Expand Up @@ -113,30 +115,43 @@ private Future<SubscriptionAdminClient> getOrCreateSubscriptionAdminClient() {
* not be created.
*/
public Future<String> getOrCreateTopic(final String endpoint, final String prefix) {
final String topicName = PubSubMessageHelper.getTopicName(endpoint, prefix);
final TopicName topic = TopicName.of(projectId, topicName);
final TopicName topicName = TopicName.of(projectId, PubSubMessageHelper.getTopicName(endpoint, prefix));

return getOrCreateTopicAdminClient()
.onFailure(thr -> LOG.debug("admin client creation failed", thr))
.compose(client -> getTopic(topic, client)
.compose(client -> getTopic(topicName, client)
.recover(thr -> {
if (thr instanceof NotFoundException) {
return createTopic(topic, client);
return createTopic(topicName, client);
} else {
return Future.failedFuture(thr);
}
}));
}

private Future<String> getTopic(final TopicName topic, final TopicAdminClient client) {
return vertx.executeBlocking(promise -> promise.tryComplete(client.getTopic(topic).getName()));
private Future<String> getTopic(final TopicName topicName, final TopicAdminClient client) {
return vertx.executeBlocking(promise -> {
try {
final Topic topic = client.getTopic(topicName);
promise.complete(topic.getName());
} catch (ApiException e) {
promise.fail(e);
}
});
}

private Future<String> createTopic(final TopicName topic, final TopicAdminClient client) {
private Future<String> createTopic(final TopicName topicName, final TopicAdminClient client) {
final Future<String> createdTopic = vertx
.executeBlocking(promise -> promise.tryComplete(client.createTopic(topic).getName()));
createdTopic.onSuccess(top -> LOG.debug("Topic {} created successfully.", topic))
.onFailure(thr -> LOG.debug("Creating topic failed [topic: {}, projectId: {}]", topic, projectId));
.executeBlocking(promise -> {
try {
final Topic topic = client.createTopic(topicName);
promise.complete(topic.getName());
} catch (ApiException e) {
promise.fail(e);
}
});
createdTopic.onSuccess(top -> LOG.debug("Topic {} created successfully.", topicName))
.onFailure(thr -> LOG.debug("Creating topic failed [topic: {}, projectId: {}]", topicName, projectId));
return createdTopic;
}

Expand All @@ -151,41 +166,56 @@ private Future<String> createTopic(final TopicName topic, final TopicAdminClient
*/
public Future<String> getOrCreateSubscription(final String endpoint, final String prefix) {
final String topicAndSubscriptionName = PubSubMessageHelper.getTopicName(endpoint, prefix);
final TopicName topic = TopicName.of(projectId, topicAndSubscriptionName);
final SubscriptionName subscription = SubscriptionName.of(projectId, topicAndSubscriptionName);
final TopicName topicName = TopicName.of(projectId, topicAndSubscriptionName);
final SubscriptionName subscriptionName = SubscriptionName.of(projectId, topicAndSubscriptionName);

return getOrCreateSubscriptionAdminClient()
.onFailure(thr -> LOG.debug("admin client creation failed", thr))
.compose(client -> getSubscription(subscription, client)
.compose(client -> getSubscription(subscriptionName, client)
.recover(thr -> {
if (thr instanceof NotFoundException) {
return createSubscription(subscription, topic, client);
return createSubscription(subscriptionName, topicName, client);
} else {
return Future.failedFuture(thr);
}
}));
}

private Future<String> getSubscription(final SubscriptionName subscription, final SubscriptionAdminClient client) {
return vertx.executeBlocking(promise -> promise.tryComplete(client.getSubscription(subscription).getName()));
private Future<String> getSubscription(final SubscriptionName subscriptionName,
final SubscriptionAdminClient client) {
return vertx.executeBlocking(promise -> {
try {
final Subscription subscription = client.getSubscription(subscriptionName);
promise.complete(subscription.getName());
} catch (ApiException e) {
promise.fail(e);
}
});
}

private Future<String> createSubscription(final SubscriptionName subscription, final TopicName topic,
private Future<String> createSubscription(final SubscriptionName subscriptionName, final TopicName topicName,
final SubscriptionAdminClient client) {
final Subscription request = Subscription.newBuilder()
.setName(subscription.toString())
.setTopic(topic.toString())
.setName(subscriptionName.toString())
.setTopic(topicName.toString())
.setPushConfig(PushConfig.getDefaultInstance())
.setAckDeadlineSeconds(0)
.setMessageRetentionDuration(Durations.fromMillis(MESSAGE_RETENTION))
.setExpirationPolicy(ExpirationPolicy.getDefaultInstance())
.build();
final Future<String> createdSubscription = vertx
.executeBlocking(promise -> promise.tryComplete(client.createSubscription(request).getName()));
createdSubscription.onSuccess(sub -> LOG.debug("Subscription {} created successfully.", subscription))
.executeBlocking(promise -> {
try {
final Subscription subscription = client.createSubscription(request);
promise.complete(subscription.getName());
} catch (ApiException e) {
promise.fail(e);
}
});
createdSubscription.onSuccess(sub -> LOG.debug("Subscription {} created successfully.", subscriptionName))
.onFailure(
thr -> LOG.debug("Creating subscription failed [subscription: {}, topic: {}, project: {}]",
subscription, topic, projectId));
subscriptionName, topicName, projectId));
return createdSubscription;
}

Expand Down

This file was deleted.

This file was deleted.

0 comments on commit b5c0e27

Please sign in to comment.