From d135246d713be2fdd063440ea39be005d85388b3 Mon Sep 17 00:00:00 2001 From: Salk Lee Date: Wed, 13 Mar 2024 22:21:53 +0800 Subject: [PATCH] Add BackOff support into `ConnectionFactory` Make a `connection.createChannel()` as retryable operation based on the provided `BackOff` --- .../connection/AbstractConnectionFactory.java | 22 +++++++++-- .../rabbit/connection/SimpleConnection.java | 38 +++++++++++++++++-- .../AbstractConnectionFactoryTests.java | 25 +++++++++++- .../modules/ROOT/pages/amqp/connections.adoc | 3 ++ .../antora/modules/ROOT/pages/whats-new.adoc | 8 ++++ 5 files changed, 89 insertions(+), 7 deletions(-) diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java index 2acc7e2546..33cb11dccf 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -53,6 +53,7 @@ import org.springframework.util.Assert; import org.springframework.util.ObjectUtils; import org.springframework.util.StringUtils; +import org.springframework.util.backoff.BackOff; import com.rabbitmq.client.Address; import com.rabbitmq.client.AddressResolver; @@ -71,6 +72,7 @@ * @author Artem Bilan * @author Will Droste * @author Christian Tzolov + * @author Salk Lee * */ public abstract class AbstractConnectionFactory implements ConnectionFactory, DisposableBean, BeanNameAware, @@ -162,6 +164,8 @@ public void handleRecovery(Recoverable recoverable) { private volatile boolean contextStopped; + @Nullable + private BackOff connectionCreatingBackOff; /** * Create a new AbstractConnectionFactory for the given target ConnectionFactory, with no publisher connection * factory. @@ -556,6 +560,18 @@ public boolean hasPublisherConnectionFactory() { return this.publisherConnectionFactory != null; } + /** + * Set the backoff strategy for creating connections. This enhancement supports custom + * retry policies within the connection module, particularly useful when the maximum + * channel limit is reached. The {@link SimpleConnection#createChannel(boolean)} method + * utilizes this backoff strategy to gracefully handle such limit exceptions. + * @param backOff the backoff strategy to be applied during connection creation + * @since 3.1.3 + */ + public void setConnectionCreatingBackOff(@Nullable BackOff backOff) { + this.connectionCreatingBackOff = backOff; + } + @Override public ConnectionFactory getPublisherConnectionFactory() { return this.publisherConnectionFactory; @@ -566,8 +582,8 @@ protected final Connection createBareConnection() { String connectionName = this.connectionNameStrategy.obtainNewConnectionName(this); com.rabbitmq.client.Connection rabbitConnection = connect(connectionName); - - Connection connection = new SimpleConnection(rabbitConnection, this.closeTimeout); + Connection connection = new SimpleConnection(rabbitConnection, this.closeTimeout, + this.connectionCreatingBackOff == null ? null : this.connectionCreatingBackOff.start()); if (rabbitConnection instanceof AutorecoveringConnection auto) { auto.addRecoveryListener(new RecoveryListener() { diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/SimpleConnection.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/SimpleConnection.java index 4c14f1d02f..9815f2c8cf 100755 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/SimpleConnection.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/SimpleConnection.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,9 +19,13 @@ import java.io.IOException; import java.net.InetAddress; +import javax.annotation.Nullable; + import org.springframework.amqp.AmqpResourceNotAvailableException; +import org.springframework.amqp.AmqpTimeoutException; import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator; import org.springframework.util.ObjectUtils; +import org.springframework.util.backoff.BackOffExecution; import com.rabbitmq.client.AlreadyClosedException; import com.rabbitmq.client.BlockedListener; @@ -35,6 +39,7 @@ * @author Dave Syer * @author Gary Russell * @author Artem Bilan + * @author Salk Lee * * @since 1.0 */ @@ -46,16 +51,39 @@ public class SimpleConnection implements Connection, NetworkConnection { private volatile boolean explicitlyClosed; - public SimpleConnection(com.rabbitmq.client.Connection delegate, - int closeTimeout) { + @Nullable + private final BackOffExecution backOffExecution; + + public SimpleConnection(com.rabbitmq.client.Connection delegate, int closeTimeout) { + this(delegate, closeTimeout, null); + } + + /** + * Construct an instance with the {@link org.springframework.util.backoff.BackOffExecution} arguments. + * @param delegate delegate connection + * @param closeTimeout the time of physical close time out + * @param backOffExecution backOffExecution is nullable + * @since 3.1.3 + */ + public SimpleConnection(com.rabbitmq.client.Connection delegate, int closeTimeout, + @Nullable BackOffExecution backOffExecution) { this.delegate = delegate; this.closeTimeout = closeTimeout; + this.backOffExecution = backOffExecution; } @Override public Channel createChannel(boolean transactional) { try { Channel channel = this.delegate.createChannel(); + while (channel == null && this.backOffExecution != null) { + long interval = this.backOffExecution.nextBackOff(); + if (interval == BackOffExecution.STOP) { + break; + } + Thread.sleep(interval); + channel = this.delegate.createChannel(); + } if (channel == null) { throw new AmqpResourceNotAvailableException("The channelMax limit is reached. Try later."); } @@ -65,6 +93,10 @@ public Channel createChannel(boolean transactional) { } return channel; } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new AmqpTimeoutException("Interrupted while creating a new channel", e); + } catch (IOException e) { throw RabbitExceptionTranslator.convertRabbitAccessException(e); } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactoryTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactoryTests.java index 60e2591b72..58546447c8 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactoryTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactoryTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2010-2023 the original author or authors. + * Copyright 2010-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.amqp.rabbit.connection; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; @@ -39,10 +40,13 @@ import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; +import org.springframework.amqp.AmqpResourceNotAvailableException; import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.AddressShuffleMode; import org.springframework.amqp.utils.test.TestUtils; import org.springframework.beans.DirectFieldAccessor; import org.springframework.scheduling.concurrent.CustomizableThreadFactory; +import org.springframework.util.StopWatch; +import org.springframework.util.backoff.FixedBackOff; import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConnectionFactory; @@ -52,6 +56,7 @@ * @author Gary Russell * @author Dmitry Dbrazhnikov * @author Artem Bilan + * @author Salk Lee */ public abstract class AbstractConnectionFactoryTests { @@ -212,4 +217,22 @@ public void testCreatesConnectionWithGivenFactory() { assertThat(mockConnectionFactory.getThreadFactory()).isEqualTo(connectionThreadFactory); } + @Test + public void testConnectionCreatingBackOff() throws Exception { + int maxAttempts = 2; + long interval = 100L; + com.rabbitmq.client.Connection mockConnection = mock(com.rabbitmq.client.Connection.class); + given(mockConnection.createChannel()).willReturn(null); + SimpleConnection simpleConnection = new SimpleConnection(mockConnection, 5, + new FixedBackOff(interval, maxAttempts).start()); + StopWatch stopWatch = new StopWatch(); + stopWatch.start(); + assertThatExceptionOfType(AmqpResourceNotAvailableException.class).isThrownBy(() -> { + simpleConnection.createChannel(false); + }); + stopWatch.stop(); + assertThat(stopWatch.getTotalTimeMillis()).isGreaterThanOrEqualTo(maxAttempts * interval); + verify(mockConnection, times(maxAttempts + 1)).createChannel(); + } + } diff --git a/src/reference/antora/modules/ROOT/pages/amqp/connections.adoc b/src/reference/antora/modules/ROOT/pages/amqp/connections.adoc index 44abca5502..42a4b0a199 100644 --- a/src/reference/antora/modules/ROOT/pages/amqp/connections.adoc +++ b/src/reference/antora/modules/ROOT/pages/amqp/connections.adoc @@ -28,6 +28,9 @@ Simple publisher confirmations are supported by all three factories. When configuring a `RabbitTemplate` to use a xref:amqp/template.adoc#separate-connection[separate connection], you can now, starting with version 2.3.2, configure the publishing connection factory to be a different type. By default, the publishing factory is the same type and any properties set on the main factory are also propagated to the publishing factory. +Starting with version 3.1, the `AbstractConnectionFactory` includes the `connectionCreatingBackOff` property, which supports a backoff policy in the connection module. +Currently, there is support in the behavior of `createChannel()` to handle exceptions that occur when the `channelMax` limit is reached, implementing a backoff strategy based on attempts and intervals. + [[pooledchannelconnectionfactory]] === `PooledChannelConnectionFactory` diff --git a/src/reference/antora/modules/ROOT/pages/whats-new.adoc b/src/reference/antora/modules/ROOT/pages/whats-new.adoc index ff9eb62ad4..4c933d4213 100644 --- a/src/reference/antora/modules/ROOT/pages/whats-new.adoc +++ b/src/reference/antora/modules/ROOT/pages/whats-new.adoc @@ -18,3 +18,11 @@ It remains possible to configure your own logging behavior by setting the `exclu In addition, the `SimpleMessageListenerContainer` consumer restart after such an exception is now logged at DEBUG level by default (previously INFO). A new method `logRestart()` has been added to the `ConditionalExceptionLogger` to allow this to be changed. See xref:amqp/receiving-messages/consumer-events.adoc[Consumer Events] and <> for more information. + +[[x31-conn-backoff]] +=== Connections Enhancement + +Connection Factory supported backoff policy when creating connection channel. +See xref:amqp/connections.adoc[Choosing a Connection Factory] for more information. + +