Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add RetryableConnection for the case for wait a moment when The channelMaxlimit is reached #2556

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,6 +63,8 @@
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import org.springframework.util.backoff.BackOff;
import org.springframework.util.backoff.FixedBackOff;

/**
* @author Dave Syer
Expand Down Expand Up @@ -162,6 +164,8 @@ public void handleRecovery(Recoverable recoverable) {

private volatile boolean contextStopped;

private BackOff connectionCreatingBackOff;
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved

/**
* Create a new AbstractConnectionFactory for the given target ConnectionFactory, with no publisher connection
* factory.
Expand Down Expand Up @@ -257,7 +261,6 @@ public void setPassword(String password) {
public void setHost(String host) {
this.rabbitConnectionFactory.setHost(host);
}

/**
* Set the {@link ThreadFactory} on the underlying rabbit connection factory.
* @param threadFactory the thread factory.
Expand Down Expand Up @@ -556,6 +559,24 @@ public boolean hasPublisherConnectionFactory() {
return this.publisherConnectionFactory != null;
}

public void setConnectionCreatingBackOff(BackOff backOff) {
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
this.connectionCreatingBackOff = backOff;
}

public void setConnectionCreatingAttempts(long maxAttempts) {
this.connectionCreatingBackOff = new FixedBackOff(FixedBackOff.DEFAULT_INTERVAL, maxAttempts);
}

public void setConnectionCreatingInterval(long interval) {
this.connectionCreatingBackOff = new FixedBackOff(interval, FixedBackOff.UNLIMITED_ATTEMPTS);
}

public void setConnectionCreatingBackOff(long interval, long maxAttempts) {
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
this.connectionCreatingBackOff = new FixedBackOff(interval, maxAttempts);
}



@Override
public ConnectionFactory getPublisherConnectionFactory() {
return this.publisherConnectionFactory;
Expand All @@ -566,8 +587,8 @@ protected final Connection createBareConnection() {
String connectionName = this.connectionNameStrategy.obtainNewConnectionName(this);

com.rabbitmq.client.Connection rabbitConnection = connect(connectionName);

Connection connection = new SimpleConnection(rabbitConnection, this.closeTimeout);
Connection connection = new SimpleConnection(rabbitConnection, this.closeTimeout,
connectionCreatingBackOff == null ? null : connectionCreatingBackOff.start());
if (rabbitConnection instanceof AutorecoveringConnection auto) {
auto.addRecoveryListener(new RecoveryListener() {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the
* License at
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
*
* https://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
*/

package org.springframework.amqp.rabbit.connection;
Expand All @@ -20,6 +16,7 @@
import java.net.InetAddress;

import org.springframework.amqp.AmqpResourceNotAvailableException;
import org.springframework.amqp.AmqpTimeoutException;
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.util.ObjectUtils;

Expand All @@ -28,6 +25,7 @@
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.impl.NetworkConnection;
import com.rabbitmq.client.impl.recovery.AutorecoveringConnection;
import org.springframework.util.backoff.BackOffExecution;

/**
* Simply a Connection.
Expand All @@ -46,16 +44,32 @@ public class SimpleConnection implements Connection, NetworkConnection {

private volatile boolean explicitlyClosed;

public SimpleConnection(com.rabbitmq.client.Connection delegate,
int closeTimeout) {
private BackOffExecution backOffExecution;
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved

public SimpleConnection(com.rabbitmq.client.Connection delegate, int closeTimeout) {
this(delegate, closeTimeout, null);

}

public SimpleConnection(com.rabbitmq.client.Connection delegate, int closeTimeout,
BackOffExecution backOffExecution) {
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
this.delegate = delegate;
this.closeTimeout = closeTimeout;
this.backOffExecution = backOffExecution;
}

@Override
public Channel createChannel(boolean transactional) {
try {
Channel channel = this.delegate.createChannel();
while (channel == null && backOffExecution != null) {
long interval = backOffExecution.nextBackOff();
if (interval == BackOffExecution.STOP) {
break;
}
Thread.sleep(interval);
channel = this.delegate.createChannel();
}
if (channel == null) {
throw new AmqpResourceNotAvailableException("The channelMax limit is reached. Try later.");
}
Expand All @@ -65,6 +79,10 @@ public Channel createChannel(boolean transactional) {
}
return channel;
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AmqpTimeoutException("Interrupted while creating a new channel", e);
}
catch (IOException e) {
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
}
Expand All @@ -87,16 +105,17 @@ public void close() {

/**
* True if the connection is open.
*
* @return true if the connection is open
* @throws AutoRecoverConnectionNotCurrentlyOpenException if the connection is an
* {@link AutorecoveringConnection} and is currently closed; this is required to
* prevent the {@link CachingConnectionFactory} from discarding this connection
* and opening a new one, in which case the "old" connection would eventually be recovered
* and orphaned - also any consumers belonging to it might be recovered too
* and the broker will deliver messages to them when there is no code actually running
* to deal with those messages (when using the {@code SimpleMessageListenerContainer}).
* If we have actually closed the connection
* (e.g. via {@link CachingConnectionFactory#resetConnection()}) this will return false.
* {@link AutorecoveringConnection} and is currently closed; this is required to prevent
* the {@link CachingConnectionFactory} from discarding this connection and opening a new
* one, in which case the "old" connection would eventually be recovered and orphaned -
* also any consumers belonging to it might be recovered too and the broker will deliver
* messages to them when there is no code actually running to deal with those messages
* (when using the {@code SimpleMessageListenerContainer}). If we have actually closed the
* connection (e.g. via {@link CachingConnectionFactory#resetConnection()}) this will
* return false.
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
*/
@Override
public boolean isOpen() {
Expand All @@ -106,7 +125,6 @@ public boolean isOpen() {
return this.delegate != null && (this.delegate.isOpen());
}


@Override
public int getLocalPort() {
if (this.delegate instanceof NetworkConnection networkConn) {
Expand Down Expand Up @@ -150,9 +168,7 @@ public com.rabbitmq.client.Connection getDelegate() {

@Override
public String toString() {
return "SimpleConnection@"
+ ObjectUtils.getIdentityHexString(this)
+ " [delegate=" + this.delegate + ", localPort=" + getLocalPort() + "]";
return "SimpleConnection@" + ObjectUtils.getIdentityHexString(this) + " [delegate=" + this.delegate
+ ", localPort=" + getLocalPort() + "]";
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
}

javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2023 the original author or authors.
* Copyright 2010-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.amqp.rabbit.connection;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
Expand All @@ -39,13 +40,15 @@
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import org.springframework.amqp.AmqpResourceNotAvailableException;
import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.AddressShuffleMode;
import org.springframework.amqp.utils.test.TestUtils;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.util.StopWatch;
import org.springframework.util.backoff.FixedBackOff;

/**
* @author Dave Syer
Expand Down Expand Up @@ -168,8 +171,8 @@ public void testCloseInvalidConnection() throws Exception {
com.rabbitmq.client.Connection mockConnection1 = mock(com.rabbitmq.client.Connection.class);
com.rabbitmq.client.Connection mockConnection2 = mock(com.rabbitmq.client.Connection.class);

given(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString()))
.willReturn(mockConnection1, mockConnection2);
given(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).willReturn(mockConnection1,
mockConnection2);
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
// simulate a dead connection
given(mockConnection1.isOpen()).willReturn(false);
given(mockConnection2.createChannel()).willReturn(mock(Channel.class));
Expand Down Expand Up @@ -212,4 +215,20 @@ public void testCreatesConnectionWithGivenFactory() {
assertThat(mockConnectionFactory.getThreadFactory()).isEqualTo(connectionThreadFactory);
}

@Test
public void testConnectionCreatingBackOff() throws Exception {
int maxAttempts = 3;
long interval = 3000L;
com.rabbitmq.client.Connection mockConnection = mock(com.rabbitmq.client.Connection.class);
given(mockConnection.createChannel()).willReturn(null);
SimpleConnection simpleConnection = new SimpleConnection(mockConnection, 5,
new FixedBackOff(interval, maxAttempts).start());
StopWatch stopWatch = new StopWatch();
stopWatch.start();
assertThatExceptionOfType(AmqpResourceNotAvailableException.class)
.isThrownBy(() -> simpleConnection.createChannel(false));
stopWatch.stop();
assertThat(stopWatch.getTotalTimeMillis()).isGreaterThanOrEqualTo(maxAttempts * interval);
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
import com.rabbitmq.client.ShutdownSignalException;

/**
* @author Mark Pollack
* @author Dave Syer
Expand Down Expand Up @@ -1967,4 +1966,5 @@ public void onShutDown(ShutdownSignalException signal) {
assertThat(chanShutDown.get()).isFalse();
}


javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
}
5 changes: 5 additions & 0 deletions src/reference/antora/modules/ROOT/pages/whats-new.adoc
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// suppress inspection "AsciiDocLinkResolve" for whole file
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
[[whats-new]]
= What's New
:page-section-summary-toc: 1
Expand All @@ -18,3 +19,7 @@ It remains possible to configure your own logging behavior by setting the `exclu
In addition, the `SimpleMessageListenerContainer` consumer restart after such an exception is now logged at DEBUG level by default (previously INFO).
A new method `logRestart()` has been added to the `ConditionalExceptionLogger` to allow this to be changed.
See xref:amqp/receiving-messages/consumer-events.adoc[Consumer Events] and <<channel-close-logging>> for more information.

==== `@AbstractConnectionFactory` Changes
A new property `connectionCreatingBackOff` allows to create `SimpleConnection` with backoff policy to support for retry mechanism at the connection level for creating channels to avoid the exception in connection module
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved

Loading