diff --git a/quarkus-solace-messaging-connector/runtime/pom.xml b/quarkus-solace-messaging-connector/runtime/pom.xml index 6246485..0cf22fa 100644 --- a/quarkus-solace-messaging-connector/runtime/pom.xml +++ b/quarkus-solace-messaging-connector/runtime/pom.xml @@ -11,10 +11,6 @@ quarkus-solace-messaging-connector Quarkus Solace Messaging Connector - Runtime - - io.smallrye.config - smallrye-config - io.quarkus quarkus-smallrye-reactive-messaging diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java index 9b5b9b7..634a05a 100644 --- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java +++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java @@ -119,18 +119,19 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i .call(() -> Uni.createFrom().completionStage(receiver.startAsync())) : m) .onItem().invoke(() -> alive.set(true)) - .onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3).onFailure().invoke((t) -> { - failures.add(t); - alive.set(false); - }); + .onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3).onFailure().invoke(this::reportFailure); if (!lazyStart) { receiver.start(); } } - private void reportFailure(Throwable throwable) { - failures.add(throwable); + private synchronized void reportFailure(Throwable throwable) { alive.set(false); + // Don't keep all the failures, there are only there for reporting. + if (failures.size() == 10) { + failures.remove(0); + } + failures.add(throwable); } private SolaceFailureHandler createFailureHandler(SolaceConnectorIncomingConfiguration ic, MessagingService solace) { @@ -208,12 +209,16 @@ public void close() { } closed.compareAndSet(false, true); if (this.pollerThread != null) { - this.pollerThread.shutdown(); - try { - this.pollerThread.awaitTermination(3000, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - SolaceLogging.log.shutdownException(e.getMessage()); - throw new RuntimeException(e); + if (this.gracefulShutdown) { + this.pollerThread.shutdown(); + try { + this.pollerThread.awaitTermination(3000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + SolaceLogging.log.shutdownException(e.getMessage()); + throw new RuntimeException(e); + } + } else { + this.pollerThread.shutdownNow(); } } receiver.terminate(3000); @@ -233,10 +238,8 @@ public void isAlive(HealthReport.HealthReportBuilder builder) { synchronized (this) { reportedFailures = new ArrayList<>(failures); } - builder.add(channel, solace.isConnected() && alive.get(), reportedFailures.stream().map(Throwable::getMessage).collect(Collectors.joining())); - failures.removeAll(reportedFailures); } else { builder.add(channel, solace.isConnected() && alive.get()); } diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java index ca01133..1b4d353 100644 --- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java +++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java @@ -76,10 +76,7 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o boolean lazyStart = oc.getClientLazyStart(); this.topic = Topic.of(oc.getProducerTopic().orElse(this.channel)); this.processor = new SenderProcessor(oc.getProducerMaxInflightMessages(), oc.getProducerWaitForPublishReceipt(), - m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt()).onFailure().invoke((t) -> { - failures.add(t); - alive.set(false); - })); + m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt()).onFailure().invoke(this::reportFailure)); this.subscriber = MultiUtils.via(processor, multi -> multi.plug( m -> lazyStart ? m.onSubscription().call(() -> Uni.createFrom().completionStage(publisher.startAsync())) : m)); if (!lazyStart) { @@ -106,11 +103,18 @@ private Uni sendMessage(MessagingService solace, Message m, boolean wai return Uni.createFrom().completionStage(m.getAck()); }) .onFailure().recoverWithUni(t -> { - failures.add(t); - alive.set(false); + reportFailure(t); return Uni.createFrom().completionStage(m.nack(t)); }); + } + private synchronized void reportFailure(Throwable throwable) { + alive.set(false); + // Don't keep all the failures, there are only there for reporting. + if (failures.size() == 10) { + failures.remove(0); + } + failures.add(throwable); } private Uni publishMessage(PersistentMessagePublisher publisher, Message m, @@ -249,9 +253,9 @@ public void isAlive(HealthReport.HealthReportBuilder builder) { synchronized (this) { reportedFailures = new ArrayList<>(failures); } + System.out.println(reportedFailures); builder.add(channel, solace.isConnected() && alive.get(), reportedFailures.stream().map(Throwable::getMessage).collect(Collectors.joining())); - failures.removeAll(reportedFailures); } else { builder.add(channel, solace.isConnected() && alive.get()); } diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceConsumerTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceConsumerTest.java index ca679c0..d5708b2 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceConsumerTest.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceConsumerTest.java @@ -46,7 +46,7 @@ private SolaceConsumerTest() { @Test @Order(1) void consumer() { - MapBasedConfig config = new MapBasedConfig() + MapBasedConfig config = commonConfig() .with("mp.messaging.incoming.in.connector", "quarkus-solace") .with("mp.messaging.incoming.in.consumer.queue.name", queue) .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") @@ -74,7 +74,7 @@ void consumer() { @Test @Order(2) void consumerReplay() { - MapBasedConfig config = new MapBasedConfig() + MapBasedConfig config = commonConfig() .with("mp.messaging.incoming.in.connector", "quarkus-solace") .with("mp.messaging.incoming.in.consumer.queue.name", queue) .with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive") @@ -94,7 +94,7 @@ void consumerReplay() { @Test @Order(3) void consumerWithSelectorQuery() { - MapBasedConfig config = new MapBasedConfig() + MapBasedConfig config = commonConfig() .with("mp.messaging.incoming.in.connector", "quarkus-solace") .with("mp.messaging.incoming.in.consumer.queue.name", queue) .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") @@ -124,7 +124,7 @@ void consumerWithSelectorQuery() { @Test @Order(4) void consumerFailedProcessingPublishToErrorTopic() { - MapBasedConfig config = new MapBasedConfig() + MapBasedConfig config = commonConfig() .with("mp.messaging.incoming.in.connector", "quarkus-solace") .with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME) .with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive") @@ -158,7 +158,7 @@ void consumerFailedProcessingPublishToErrorTopic() { @Test @Order(5) void consumerFailedProcessingMoveToDMQ() { - MapBasedConfig config = new MapBasedConfig() + MapBasedConfig config = commonConfig() .with("mp.messaging.incoming.in.connector", "quarkus-solace") .with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME) .with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive") @@ -193,7 +193,7 @@ void consumerFailedProcessingMoveToDMQ() { @Test @Order(6) void partitionedQueue() { - MapBasedConfig config = new MapBasedConfig() + MapBasedConfig config = commonConfig() .with("mp.messaging.incoming.consumer-1.connector", "quarkus-solace") .with("mp.messaging.incoming.consumer-1.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME) @@ -262,7 +262,7 @@ void partitionedQueue() { @Test @Order(7) void consumerPublishToErrorTopicPermissionException() { - MapBasedConfig config = new MapBasedConfig() + MapBasedConfig config = commonConfig() .with("mp.messaging.incoming.in.connector", "quarkus-solace") .with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME) .with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive") @@ -340,7 +340,7 @@ void consumerGracefulCloseTest() { @Test @Order(9) void consumerCreateMissingResourceAddSubscriptionPermissionException() { - MapBasedConfig config = new MapBasedConfig() + MapBasedConfig config = commonConfig() .with("mp.messaging.incoming.in.connector", "quarkus-solace") .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceProcessorTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceProcessorTest.java index a08872b..e9071e8 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceProcessorTest.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceProcessorTest.java @@ -28,7 +28,7 @@ public class SolaceProcessorTest extends WeldTestBase { @Test void consumer() { String processedTopic = topic + "/processed"; - MapBasedConfig config = new MapBasedConfig() + MapBasedConfig config = commonConfig() .with("mp.messaging.incoming.in.connector", "quarkus-solace") .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolacePublisherTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolacePublisherTest.java index 6a8c5f4..d9474db 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolacePublisherTest.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolacePublisherTest.java @@ -30,7 +30,7 @@ public class SolacePublisherTest extends WeldTestBase { @Test void publisher() { - MapBasedConfig config = new MapBasedConfig() + MapBasedConfig config = commonConfig() .with("mp.messaging.outgoing.out.connector", "quarkus-solace") .with("mp.messaging.outgoing.out.producer.topic", topic); @@ -53,7 +53,7 @@ void publisher() { @Test void publisherWithDynamicDestination() { - MapBasedConfig config = new MapBasedConfig() + MapBasedConfig config = commonConfig() .with("mp.messaging.outgoing.out.connector", "quarkus-solace") .with("mp.messaging.outgoing.out.producer.topic", topic); @@ -80,7 +80,7 @@ void publisherWithDynamicDestination() { @Test void publisherWithBackPressureReject() { - MapBasedConfig config = new MapBasedConfig() + MapBasedConfig config = commonConfig() .with("mp.messaging.outgoing.out.connector", "quarkus-solace") .with("mp.messaging.outgoing.out.producer.topic", topic) .with("mp.messaging.outgoing.out.producer.back-pressure.buffer-capacity", 1); diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/SolaceBrokerExtension.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/SolaceBrokerExtension.java index 2dab5f6..d3a4403 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/SolaceBrokerExtension.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/SolaceBrokerExtension.java @@ -44,7 +44,7 @@ public static SolaceContainer createSolaceContainer() { public void startSolaceBroker() { solace = createSolaceContainer() .withCredentials("user", "pass") - .withExposedPorts(SolaceContainer.Service.SMF.getPort()) + .withExposedPorts(SolaceContainer.Service.SMF.getPort(), 8080) .withPublishTopic("quarkus/integration/test/replay/messages", SolaceContainer.Service.SMF) .withPublishTopic("quarkus/integration/test/default/>", SolaceContainer.Service.SMF) .withPublishTopic("quarkus/integration/test/provisioned/>", SolaceContainer.Service.SMF) diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/WeldTestBase.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/WeldTestBase.java index 69ae66f..29f94c7 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/WeldTestBase.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/WeldTestBase.java @@ -139,4 +139,9 @@ public boolean isAlive() { return getHealth().getLiveness().isOk(); } + public MapBasedConfig commonConfig() { + return new MapBasedConfig() + .with("mp.messaging.connector.quarkus-solace.client.graceful-shutdown", false); + } + } diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/health/SolacePublisherHealthCheck.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/health/SolacePublisherHealthTest.java similarity index 96% rename from quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/health/SolacePublisherHealthCheck.java rename to quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/health/SolacePublisherHealthTest.java index 559d49e..9e61332 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/health/SolacePublisherHealthCheck.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/health/SolacePublisherHealthTest.java @@ -22,7 +22,7 @@ import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; -public class SolacePublisherHealthCheck extends WeldTestBase { +public class SolacePublisherHealthTest extends WeldTestBase { @Test void publisherHealthCheck() { MapBasedConfig config = new MapBasedConfig() @@ -73,7 +73,9 @@ void publisherLivenessCheck() { // Run app that publish messages MyApp app = runApplication(config, MyApp.class); - await().until(() -> isStarted() && isReady()); + await().until(() -> isStarted() && isReady() && !isAlive()); + + await().until(() -> !isAlive()); HealthReport startup = getHealth().getStartup(); HealthReport liveness = getHealth().getLiveness(); @@ -94,7 +96,6 @@ static class MyApp { @Outgoing("out") Multi> out() { - return Multi.createFrom().items("1", "2", "3", "4", "5") .map(payload -> Message.of(payload).withAck(() -> { acked.add(payload); diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/locals/LocalPropagationAckTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/locals/LocalPropagationAckTest.java index dbf8a3a..ea77aa9 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/locals/LocalPropagationAckTest.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/locals/LocalPropagationAckTest.java @@ -28,7 +28,7 @@ public class LocalPropagationAckTest extends WeldTestBase { private MapBasedConfig dataconfig() { - return new MapBasedConfig() + return commonConfig() .with("mp.messaging.incoming.data.connector", SolaceConnector.CONNECTOR_NAME) .with("mp.messaging.incoming.data.consumer.queue.subscriptions", topic) .with("mp.messaging.incoming.data.consumer.queue.add-additional-subscriptions", "true") diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/locals/LocalPropagationTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/locals/LocalPropagationTest.java index b1a6d96..3956aa7 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/locals/LocalPropagationTest.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/locals/LocalPropagationTest.java @@ -39,7 +39,7 @@ public class LocalPropagationTest extends WeldTestBase { private MapBasedConfig dataconfig() { - return new MapBasedConfig() + return commonConfig() .with("mp.messaging.incoming.data.connector", SolaceConnector.CONNECTOR_NAME) .with("mp.messaging.incoming.data.consumer.queue.subscriptions", topic) .with("mp.messaging.incoming.data.consumer.queue.add-additional-subscriptions", "true") diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/EndToEndPerformanceTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/EndToEndPerformanceTest.java index a20ef6e..1eb67cf 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/EndToEndPerformanceTest.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/EndToEndPerformanceTest.java @@ -37,7 +37,7 @@ public class EndToEndPerformanceTest extends WeldTestBase { @Test public void endToEndPerformanceTesttWithBackPressureWaitAndWaitForPublishReceipt() { String processedTopic = topic + "/processed"; - MapBasedConfig config = new MapBasedConfig() + MapBasedConfig config = commonConfig() .with("mp.messaging.incoming.in.connector", "quarkus-solace") .with("mp.messaging.incoming.in.consumer.queue.name", queue) .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/SolaceConsumerPerformanceTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/SolaceConsumerPerformanceTest.java index 22ce277..71276a1 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/SolaceConsumerPerformanceTest.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/SolaceConsumerPerformanceTest.java @@ -31,13 +31,12 @@ public void solaceConsumerPerformanceTest() { .build() .start(); - MapBasedConfig config = new MapBasedConfig() + MapBasedConfig config = commonConfig() .with("mp.messaging.incoming.in.connector", "quarkus-solace") .with("mp.messaging.incoming.in.consumer.queue.name", queue) .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic); - // .with("mp.messaging.incoming.in.client.graceful-shutdown", false); // Run app that consumes messages MyConsumer app = runApplication(config, MyConsumer.class); diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/SolacePublisherPerformanceTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/SolacePublisherPerformanceTest.java index 60e576b..cc4c397 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/SolacePublisherPerformanceTest.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/perf/SolacePublisherPerformanceTest.java @@ -28,7 +28,7 @@ public class SolacePublisherPerformanceTest extends WeldTestBase { @Test void publisherPerformanceTestWithBackPressureWaitAndWaitForPublishReceipt() { - MapBasedConfig config = new MapBasedConfig() + MapBasedConfig config = commonConfig() .with("mp.messaging.outgoing.out.connector", "quarkus-solace") .with("mp.messaging.outgoing.out.producer.topic", topic); @@ -65,7 +65,7 @@ void publisherPerformanceTestWithBackPressureWaitAndWaitForPublishReceipt() { @Test void publisherPerformanceTestWithBackPressureWaitAndNoWaitForPublishReceipt() { - MapBasedConfig config = new MapBasedConfig() + MapBasedConfig config = commonConfig() .with("mp.messaging.outgoing.out.connector", "quarkus-solace") .with("mp.messaging.outgoing.out.producer.topic", topic) .with("mp.messaging.outgoing.out.producer.waitForPublishReceipt", false); @@ -103,7 +103,7 @@ void publisherPerformanceTestWithBackPressureWaitAndNoWaitForPublishReceipt() { @Test void publisherPerformanceTestWithBackPressureElasticAndWaitForPublishReceipt() { - MapBasedConfig config = new MapBasedConfig() + MapBasedConfig config = commonConfig() .with("mp.messaging.outgoing.out.connector", "quarkus-solace") .with("mp.messaging.outgoing.out.producer.topic", topic) .with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic"); @@ -141,7 +141,7 @@ void publisherPerformanceTestWithBackPressureElasticAndWaitForPublishReceipt() { @Test void publisherPerformanceTestWithBackPressureElasticAndNoWaitForPublishReceipt() { - MapBasedConfig config = new MapBasedConfig() + MapBasedConfig config = commonConfig() .with("mp.messaging.outgoing.out.connector", "quarkus-solace") .with("mp.messaging.outgoing.out.producer.topic", topic) .with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic")