Skip to content

Commit

Permalink
Turn off graceful shutdown for connector tests by default
Browse files Browse the repository at this point in the history
Rename Solace publisher health test to enable it
  • Loading branch information
ozangunalp committed Feb 5, 2024
1 parent d86ea3f commit e1c0fc5
Show file tree
Hide file tree
Showing 14 changed files with 58 additions and 50 deletions.
4 changes: 0 additions & 4 deletions quarkus-solace-messaging-connector/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@
<artifactId>quarkus-solace-messaging-connector</artifactId>
<name>Quarkus Solace Messaging Connector - Runtime</name>
<dependencies>
<dependency>
<groupId>io.smallrye.config</groupId>
<artifactId>smallrye-config</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,19 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i
.call(() -> Uni.createFrom().completionStage(receiver.startAsync()))
: m)
.onItem().invoke(() -> alive.set(true))
.onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3).onFailure().invoke((t) -> {
failures.add(t);
alive.set(false);
});
.onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3).onFailure().invoke(this::reportFailure);
if (!lazyStart) {
receiver.start();
}
}

private void reportFailure(Throwable throwable) {
failures.add(throwable);
private synchronized void reportFailure(Throwable throwable) {
alive.set(false);
// Don't keep all the failures, there are only there for reporting.
if (failures.size() == 10) {
failures.remove(0);
}
failures.add(throwable);
}

private SolaceFailureHandler createFailureHandler(SolaceConnectorIncomingConfiguration ic, MessagingService solace) {
Expand Down Expand Up @@ -208,12 +209,16 @@ public void close() {
}
closed.compareAndSet(false, true);
if (this.pollerThread != null) {
this.pollerThread.shutdown();
try {
this.pollerThread.awaitTermination(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
SolaceLogging.log.shutdownException(e.getMessage());
throw new RuntimeException(e);
if (this.gracefulShutdown) {
this.pollerThread.shutdown();
try {
this.pollerThread.awaitTermination(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
SolaceLogging.log.shutdownException(e.getMessage());
throw new RuntimeException(e);
}
} else {
this.pollerThread.shutdownNow();
}
}
receiver.terminate(3000);
Expand All @@ -233,10 +238,8 @@ public void isAlive(HealthReport.HealthReportBuilder builder) {
synchronized (this) {
reportedFailures = new ArrayList<>(failures);
}

builder.add(channel, solace.isConnected() && alive.get(),
reportedFailures.stream().map(Throwable::getMessage).collect(Collectors.joining()));
failures.removeAll(reportedFailures);
} else {
builder.add(channel, solace.isConnected() && alive.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,7 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o
boolean lazyStart = oc.getClientLazyStart();
this.topic = Topic.of(oc.getProducerTopic().orElse(this.channel));
this.processor = new SenderProcessor(oc.getProducerMaxInflightMessages(), oc.getProducerWaitForPublishReceipt(),
m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt()).onFailure().invoke((t) -> {
failures.add(t);
alive.set(false);
}));
m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt()).onFailure().invoke(this::reportFailure));
this.subscriber = MultiUtils.via(processor, multi -> multi.plug(
m -> lazyStart ? m.onSubscription().call(() -> Uni.createFrom().completionStage(publisher.startAsync())) : m));
if (!lazyStart) {
Expand All @@ -106,11 +103,18 @@ private Uni<Void> sendMessage(MessagingService solace, Message<?> m, boolean wai
return Uni.createFrom().completionStage(m.getAck());
})
.onFailure().recoverWithUni(t -> {
failures.add(t);
alive.set(false);
reportFailure(t);
return Uni.createFrom().completionStage(m.nack(t));
});
}

private synchronized void reportFailure(Throwable throwable) {
alive.set(false);
// Don't keep all the failures, there are only there for reporting.
if (failures.size() == 10) {
failures.remove(0);
}
failures.add(throwable);
}

private Uni<PublishReceipt> publishMessage(PersistentMessagePublisher publisher, Message<?> m,
Expand Down Expand Up @@ -249,9 +253,9 @@ public void isAlive(HealthReport.HealthReportBuilder builder) {
synchronized (this) {
reportedFailures = new ArrayList<>(failures);
}
System.out.println(reportedFailures);
builder.add(channel, solace.isConnected() && alive.get(),
reportedFailures.stream().map(Throwable::getMessage).collect(Collectors.joining()));
failures.removeAll(reportedFailures);
} else {
builder.add(channel, solace.isConnected() && alive.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private SolaceConsumerTest() {
@Test
@Order(1)
void consumer() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
Expand Down Expand Up @@ -74,7 +74,7 @@ void consumer() {
@Test
@Order(2)
void consumerReplay() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
Expand All @@ -94,7 +94,7 @@ void consumerReplay() {
@Test
@Order(3)
void consumerWithSelectorQuery() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
Expand Down Expand Up @@ -124,7 +124,7 @@ void consumerWithSelectorQuery() {
@Test
@Order(4)
void consumerFailedProcessingPublishToErrorTopic() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME)
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
Expand Down Expand Up @@ -158,7 +158,7 @@ void consumerFailedProcessingPublishToErrorTopic() {
@Test
@Order(5)
void consumerFailedProcessingMoveToDMQ() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME)
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
Expand Down Expand Up @@ -193,7 +193,7 @@ void consumerFailedProcessingMoveToDMQ() {
@Test
@Order(6)
void partitionedQueue() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.consumer-1.connector", "quarkus-solace")
.with("mp.messaging.incoming.consumer-1.consumer.queue.name",
SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME)
Expand Down Expand Up @@ -262,7 +262,7 @@ void partitionedQueue() {
@Test
@Order(7)
void consumerPublishToErrorTopicPermissionException() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME)
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
Expand Down Expand Up @@ -340,7 +340,7 @@ void consumerGracefulCloseTest() {
@Test
@Order(9)
void consumerCreateMissingResourceAddSubscriptionPermissionException() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class SolaceProcessorTest extends WeldTestBase {
@Test
void consumer() {
String processedTopic = topic + "/processed";
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class SolacePublisherTest extends WeldTestBase {

@Test
void publisher() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", topic);

Expand All @@ -53,7 +53,7 @@ void publisher() {

@Test
void publisherWithDynamicDestination() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", topic);

Expand All @@ -80,7 +80,7 @@ void publisherWithDynamicDestination() {

@Test
void publisherWithBackPressureReject() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", topic)
.with("mp.messaging.outgoing.out.producer.back-pressure.buffer-capacity", 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public static SolaceContainer createSolaceContainer() {
public void startSolaceBroker() {
solace = createSolaceContainer()
.withCredentials("user", "pass")
.withExposedPorts(SolaceContainer.Service.SMF.getPort())
.withExposedPorts(SolaceContainer.Service.SMF.getPort(), 8080)
.withPublishTopic("quarkus/integration/test/replay/messages", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/default/>", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/provisioned/>", SolaceContainer.Service.SMF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,9 @@ public boolean isAlive() {
return getHealth().getLiveness().isOk();
}

public MapBasedConfig commonConfig() {
return new MapBasedConfig()
.with("mp.messaging.connector.quarkus-solace.client.graceful-shutdown", false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

public class SolacePublisherHealthCheck extends WeldTestBase {
public class SolacePublisherHealthTest extends WeldTestBase {
@Test
void publisherHealthCheck() {
MapBasedConfig config = new MapBasedConfig()
Expand Down Expand Up @@ -73,7 +73,9 @@ void publisherLivenessCheck() {
// Run app that publish messages
MyApp app = runApplication(config, MyApp.class);

await().until(() -> isStarted() && isReady());
await().until(() -> isStarted() && isReady() && !isAlive());

await().until(() -> !isAlive());

HealthReport startup = getHealth().getStartup();
HealthReport liveness = getHealth().getLiveness();
Expand All @@ -94,7 +96,6 @@ static class MyApp {

@Outgoing("out")
Multi<Message<String>> out() {

return Multi.createFrom().items("1", "2", "3", "4", "5")
.map(payload -> Message.of(payload).withAck(() -> {
acked.add(payload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
public class LocalPropagationAckTest extends WeldTestBase {

private MapBasedConfig dataconfig() {
return new MapBasedConfig()
return commonConfig()
.with("mp.messaging.incoming.data.connector", SolaceConnector.CONNECTOR_NAME)
.with("mp.messaging.incoming.data.consumer.queue.subscriptions", topic)
.with("mp.messaging.incoming.data.consumer.queue.add-additional-subscriptions", "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
public class LocalPropagationTest extends WeldTestBase {

private MapBasedConfig dataconfig() {
return new MapBasedConfig()
return commonConfig()
.with("mp.messaging.incoming.data.connector", SolaceConnector.CONNECTOR_NAME)
.with("mp.messaging.incoming.data.consumer.queue.subscriptions", topic)
.with("mp.messaging.incoming.data.consumer.queue.add-additional-subscriptions", "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class EndToEndPerformanceTest extends WeldTestBase {
@Test
public void endToEndPerformanceTesttWithBackPressureWaitAndWaitForPublishReceipt() {
String processedTopic = topic + "/processed";
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@ public void solaceConsumerPerformanceTest() {
.build()
.start();

MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic);
// .with("mp.messaging.incoming.in.client.graceful-shutdown", false);

// Run app that consumes messages
MyConsumer app = runApplication(config, MyConsumer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class SolacePublisherPerformanceTest extends WeldTestBase {

@Test
void publisherPerformanceTestWithBackPressureWaitAndWaitForPublishReceipt() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", topic);

Expand Down Expand Up @@ -65,7 +65,7 @@ void publisherPerformanceTestWithBackPressureWaitAndWaitForPublishReceipt() {

@Test
void publisherPerformanceTestWithBackPressureWaitAndNoWaitForPublishReceipt() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", topic)
.with("mp.messaging.outgoing.out.producer.waitForPublishReceipt", false);
Expand Down Expand Up @@ -103,7 +103,7 @@ void publisherPerformanceTestWithBackPressureWaitAndNoWaitForPublishReceipt() {

@Test
void publisherPerformanceTestWithBackPressureElasticAndWaitForPublishReceipt() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", topic)
.with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic");
Expand Down Expand Up @@ -141,7 +141,7 @@ void publisherPerformanceTestWithBackPressureElasticAndWaitForPublishReceipt() {

@Test
void publisherPerformanceTestWithBackPressureElasticAndNoWaitForPublishReceipt() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", topic)
.with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic")
Expand Down

0 comments on commit e1c0fc5

Please sign in to comment.