Skip to content

Commit

Permalink
Merge pull request #54 from ozangunalp/turn_off_graceful_shutdown_for…
Browse files Browse the repository at this point in the history
…_tests

Turn off graceful shutdown for connector tests by default
  • Loading branch information
ozangunalp authored Feb 5, 2024
2 parents c34bfcd + 0b1cc5d commit 7e8b624
Show file tree
Hide file tree
Showing 14 changed files with 58 additions and 50 deletions.
4 changes: 0 additions & 4 deletions quarkus-solace-messaging-connector/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,6 @@
<artifactId>quarkus-solace-messaging-connector</artifactId>
<name>Quarkus Solace Messaging Connector - Runtime</name>
<dependencies>
<dependency>
<groupId>io.smallrye.config</groupId>
<artifactId>smallrye-config</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-reactive-messaging</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,19 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i
.call(() -> Uni.createFrom().completionStage(receiver.startAsync()))
: m)
.onItem().invoke(() -> alive.set(true))
.onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3).onFailure().invoke((t) -> {
failures.add(t);
alive.set(false);
});
.onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3).onFailure().invoke(this::reportFailure);
if (!lazyStart) {
receiver.start();
}
}

private void reportFailure(Throwable throwable) {
failures.add(throwable);
private synchronized void reportFailure(Throwable throwable) {
alive.set(false);
// Don't keep all the failures, there are only there for reporting.
if (failures.size() == 10) {
failures.remove(0);
}
failures.add(throwable);
}

private SolaceFailureHandler createFailureHandler(SolaceConnectorIncomingConfiguration ic, MessagingService solace) {
Expand Down Expand Up @@ -208,12 +209,16 @@ public void close() {
}
closed.compareAndSet(false, true);
if (this.pollerThread != null) {
this.pollerThread.shutdown();
try {
this.pollerThread.awaitTermination(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
SolaceLogging.log.shutdownException(e.getMessage());
throw new RuntimeException(e);
if (this.gracefulShutdown) {
this.pollerThread.shutdown();
try {
this.pollerThread.awaitTermination(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
SolaceLogging.log.shutdownException(e.getMessage());
throw new RuntimeException(e);
}
} else {
this.pollerThread.shutdownNow();
}
}
receiver.terminate(3000);
Expand All @@ -233,10 +238,8 @@ public void isAlive(HealthReport.HealthReportBuilder builder) {
synchronized (this) {
reportedFailures = new ArrayList<>(failures);
}

builder.add(channel, solace.isConnected() && alive.get(),
reportedFailures.stream().map(Throwable::getMessage).collect(Collectors.joining()));
failures.removeAll(reportedFailures);
} else {
builder.add(channel, solace.isConnected() && alive.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,7 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o
boolean lazyStart = oc.getClientLazyStart();
this.topic = Topic.of(oc.getProducerTopic().orElse(this.channel));
this.processor = new SenderProcessor(oc.getProducerMaxInflightMessages(), oc.getProducerWaitForPublishReceipt(),
m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt()).onFailure().invoke((t) -> {
failures.add(t);
alive.set(false);
}));
m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt()).onFailure().invoke(this::reportFailure));
this.subscriber = MultiUtils.via(processor, multi -> multi.plug(
m -> lazyStart ? m.onSubscription().call(() -> Uni.createFrom().completionStage(publisher.startAsync())) : m));
if (!lazyStart) {
Expand All @@ -106,11 +103,18 @@ private Uni<Void> sendMessage(MessagingService solace, Message<?> m, boolean wai
return Uni.createFrom().completionStage(m.getAck());
})
.onFailure().recoverWithUni(t -> {
failures.add(t);
alive.set(false);
reportFailure(t);
return Uni.createFrom().completionStage(m.nack(t));
});
}

private synchronized void reportFailure(Throwable throwable) {
alive.set(false);
// Don't keep all the failures, there are only there for reporting.
if (failures.size() == 10) {
failures.remove(0);
}
failures.add(throwable);
}

private Uni<PublishReceipt> publishMessage(PersistentMessagePublisher publisher, Message<?> m,
Expand Down Expand Up @@ -249,9 +253,9 @@ public void isAlive(HealthReport.HealthReportBuilder builder) {
synchronized (this) {
reportedFailures = new ArrayList<>(failures);
}
System.out.println(reportedFailures);
builder.add(channel, solace.isConnected() && alive.get(),
reportedFailures.stream().map(Throwable::getMessage).collect(Collectors.joining()));
failures.removeAll(reportedFailures);
} else {
builder.add(channel, solace.isConnected() && alive.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private SolaceConsumerTest() {
@Test
@Order(1)
void consumer() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
Expand Down Expand Up @@ -74,7 +74,7 @@ void consumer() {
@Test
@Order(2)
void consumerReplay() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
Expand All @@ -94,7 +94,7 @@ void consumerReplay() {
@Test
@Order(3)
void consumerWithSelectorQuery() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
Expand Down Expand Up @@ -124,7 +124,7 @@ void consumerWithSelectorQuery() {
@Test
@Order(4)
void consumerFailedProcessingPublishToErrorTopic() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME)
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
Expand Down Expand Up @@ -158,7 +158,7 @@ void consumerFailedProcessingPublishToErrorTopic() {
@Test
@Order(5)
void consumerFailedProcessingMoveToDMQ() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME)
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
Expand Down Expand Up @@ -193,7 +193,7 @@ void consumerFailedProcessingMoveToDMQ() {
@Test
@Order(6)
void partitionedQueue() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.consumer-1.connector", "quarkus-solace")
.with("mp.messaging.incoming.consumer-1.consumer.queue.name",
SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME)
Expand Down Expand Up @@ -262,7 +262,7 @@ void partitionedQueue() {
@Test
@Order(7)
void consumerPublishToErrorTopicPermissionException() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME)
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
Expand Down Expand Up @@ -340,7 +340,7 @@ void consumerGracefulCloseTest() {
@Test
@Order(9)
void consumerCreateMissingResourceAddSubscriptionPermissionException() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class SolaceProcessorTest extends WeldTestBase {
@Test
void consumer() {
String processedTopic = topic + "/processed";
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public class SolacePublisherTest extends WeldTestBase {

@Test
void publisher() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", topic);

Expand All @@ -53,7 +53,7 @@ void publisher() {

@Test
void publisherWithDynamicDestination() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", topic);

Expand All @@ -80,7 +80,7 @@ void publisherWithDynamicDestination() {

@Test
void publisherWithBackPressureReject() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", topic)
.with("mp.messaging.outgoing.out.producer.back-pressure.buffer-capacity", 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public static SolaceContainer createSolaceContainer() {
public void startSolaceBroker() {
solace = createSolaceContainer()
.withCredentials("user", "pass")
.withExposedPorts(SolaceContainer.Service.SMF.getPort())
.withExposedPorts(SolaceContainer.Service.SMF.getPort(), 8080)
.withPublishTopic("quarkus/integration/test/replay/messages", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/default/>", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/provisioned/>", SolaceContainer.Service.SMF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,9 @@ public boolean isAlive() {
return getHealth().getLiveness().isOk();
}

public MapBasedConfig commonConfig() {
return new MapBasedConfig()
.with("mp.messaging.connector.quarkus-solace.client.graceful-shutdown", false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

public class SolacePublisherHealthCheck extends WeldTestBase {
public class SolacePublisherHealthTest extends WeldTestBase {
@Test
void publisherHealthCheck() {
MapBasedConfig config = new MapBasedConfig()
Expand Down Expand Up @@ -73,7 +73,9 @@ void publisherLivenessCheck() {
// Run app that publish messages
MyApp app = runApplication(config, MyApp.class);

await().until(() -> isStarted() && isReady());
await().until(() -> isStarted() && isReady() && !isAlive());

await().until(() -> !isAlive());

HealthReport startup = getHealth().getStartup();
HealthReport liveness = getHealth().getLiveness();
Expand All @@ -94,7 +96,6 @@ static class MyApp {

@Outgoing("out")
Multi<Message<String>> out() {

return Multi.createFrom().items("1", "2", "3", "4", "5")
.map(payload -> Message.of(payload).withAck(() -> {
acked.add(payload);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
public class LocalPropagationAckTest extends WeldTestBase {

private MapBasedConfig dataconfig() {
return new MapBasedConfig()
return commonConfig()
.with("mp.messaging.incoming.data.connector", SolaceConnector.CONNECTOR_NAME)
.with("mp.messaging.incoming.data.consumer.queue.subscriptions", topic)
.with("mp.messaging.incoming.data.consumer.queue.add-additional-subscriptions", "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
public class LocalPropagationTest extends WeldTestBase {

private MapBasedConfig dataconfig() {
return new MapBasedConfig()
return commonConfig()
.with("mp.messaging.incoming.data.connector", SolaceConnector.CONNECTOR_NAME)
.with("mp.messaging.incoming.data.consumer.queue.subscriptions", topic)
.with("mp.messaging.incoming.data.consumer.queue.add-additional-subscriptions", "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class EndToEndPerformanceTest extends WeldTestBase {
@Test
public void endToEndPerformanceTesttWithBackPressureWaitAndWaitForPublishReceipt() {
String processedTopic = topic + "/processed";
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@ public void solaceConsumerPerformanceTest() {
.build()
.start();

MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic);
// .with("mp.messaging.incoming.in.client.graceful-shutdown", false);

// Run app that consumes messages
MyConsumer app = runApplication(config, MyConsumer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class SolacePublisherPerformanceTest extends WeldTestBase {

@Test
void publisherPerformanceTestWithBackPressureWaitAndWaitForPublishReceipt() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", topic);

Expand Down Expand Up @@ -65,7 +65,7 @@ void publisherPerformanceTestWithBackPressureWaitAndWaitForPublishReceipt() {

@Test
void publisherPerformanceTestWithBackPressureWaitAndNoWaitForPublishReceipt() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", topic)
.with("mp.messaging.outgoing.out.producer.waitForPublishReceipt", false);
Expand Down Expand Up @@ -103,7 +103,7 @@ void publisherPerformanceTestWithBackPressureWaitAndNoWaitForPublishReceipt() {

@Test
void publisherPerformanceTestWithBackPressureElasticAndWaitForPublishReceipt() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", topic)
.with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic");
Expand Down Expand Up @@ -141,7 +141,7 @@ void publisherPerformanceTestWithBackPressureElasticAndWaitForPublishReceipt() {

@Test
void publisherPerformanceTestWithBackPressureElasticAndNoWaitForPublishReceipt() {
MapBasedConfig config = new MapBasedConfig()
MapBasedConfig config = commonConfig()
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", topic)
.with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic")
Expand Down

0 comments on commit 7e8b624

Please sign in to comment.