Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add RetryableConnection for the case for wait a moment when The channelMaxlimit is reached #2556

Merged
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,6 +53,7 @@
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.backoff.BackOff;

import com.rabbitmq.client.Address;
import com.rabbitmq.client.AddressResolver;
Expand Down Expand Up @@ -162,6 +163,7 @@ public void handleRecovery(Recoverable recoverable) {

private volatile boolean contextStopped;

private BackOff connectionCreatingBackOff;
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
/**
* Create a new AbstractConnectionFactory for the given target ConnectionFactory, with no publisher connection
* factory.
Expand Down Expand Up @@ -257,7 +259,7 @@ public void setPassword(String password) {
public void setHost(String host) {
this.rabbitConnectionFactory.setHost(host);
}

javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
/**
* Set the {@link ThreadFactory} on the underlying rabbit connection factory.
* @param threadFactory the thread factory.
Expand Down Expand Up @@ -556,6 +558,15 @@ public boolean hasPublisherConnectionFactory() {
return this.publisherConnectionFactory != null;
}

/**
* support backoff policy when get an empty channel from connection.
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
* @param backOff {@link BackOff}
* @since 3.1.3
*/
public void setConnectionCreatingBackOff(BackOff backOff) {
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
this.connectionCreatingBackOff = backOff;
}

@Override
public ConnectionFactory getPublisherConnectionFactory() {
return this.publisherConnectionFactory;
Expand All @@ -566,8 +577,8 @@ protected final Connection createBareConnection() {
String connectionName = this.connectionNameStrategy.obtainNewConnectionName(this);

com.rabbitmq.client.Connection rabbitConnection = connect(connectionName);

Connection connection = new SimpleConnection(rabbitConnection, this.closeTimeout);
Connection connection = new SimpleConnection(rabbitConnection, this.closeTimeout,
this.connectionCreatingBackOff == null ? null : this.connectionCreatingBackOff.start());
if (rabbitConnection instanceof AutorecoveringConnection auto) {
auto.addRecoveryListener(new RecoveryListener() {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,9 +19,13 @@
import java.io.IOException;
import java.net.InetAddress;

import javax.annotation.Nullable;

import org.springframework.amqp.AmqpResourceNotAvailableException;
import org.springframework.amqp.AmqpTimeoutException;
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.util.ObjectUtils;
import org.springframework.util.backoff.BackOffExecution;

import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.BlockedListener;
Expand All @@ -46,16 +50,40 @@ public class SimpleConnection implements Connection, NetworkConnection {

private volatile boolean explicitlyClosed;

private BackOffExecution backOffExecution;
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved

public SimpleConnection(com.rabbitmq.client.Connection delegate,
int closeTimeout) {
int closeTimeout) {
this.delegate = delegate;
this.closeTimeout = closeTimeout;
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Construct an instance with the {@link org.springframework.util.backoff.BackOffExecution} arguments.
* @param delegate delegate connection
* @param closeTimeout the time of physical close time out
* @param backOffExecution backOffExecution is nullable
* @since 3.1.3
*/
public SimpleConnection(com.rabbitmq.client.Connection delegate, int closeTimeout,
@Nullable BackOffExecution backOffExecution) {
this.delegate = delegate;
this.closeTimeout = closeTimeout;
this.backOffExecution = backOffExecution;
}

@Override
public Channel createChannel(boolean transactional) {
try {
Channel channel = this.delegate.createChannel();
while (channel == null && this.backOffExecution != null) {
long interval = this.backOffExecution.nextBackOff();
if (interval == BackOffExecution.STOP) {
break;
}
Thread.sleep(interval);
channel = this.delegate.createChannel();
}
if (channel == null) {
throw new AmqpResourceNotAvailableException("The channelMax limit is reached. Try later.");
}
Expand All @@ -65,6 +93,10 @@ public Channel createChannel(boolean transactional) {
}
return channel;
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AmqpTimeoutException("Interrupted while creating a new channel", e);
}
catch (IOException e) {
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2023 the original author or authors.
* Copyright 2010-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.amqp.rabbit.connection;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
Expand All @@ -39,10 +40,13 @@
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import org.springframework.amqp.AmqpResourceNotAvailableException;
import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.AddressShuffleMode;
import org.springframework.amqp.utils.test.TestUtils;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.util.StopWatch;
import org.springframework.util.backoff.FixedBackOff;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
Expand Down Expand Up @@ -212,4 +216,21 @@ public void testCreatesConnectionWithGivenFactory() {
assertThat(mockConnectionFactory.getThreadFactory()).isEqualTo(connectionThreadFactory);
}

@Test
public void testConnectionCreatingBackOff() throws Exception {
int maxAttempts = 3;
long interval = 3000L;
com.rabbitmq.client.Connection mockConnection = mock(com.rabbitmq.client.Connection.class);
given(mockConnection.createChannel()).willReturn(null);
SimpleConnection simpleConnection = new SimpleConnection(mockConnection, 5,
new FixedBackOff(interval, maxAttempts).start());
StopWatch stopWatch = new StopWatch();
stopWatch.start();
assertThatExceptionOfType(AmqpResourceNotAvailableException.class).isThrownBy(() -> {
simpleConnection.createChannel(false);
stopWatch.stop();
assertThat(stopWatch.getTotalTimeMillis()).isGreaterThanOrEqualTo(maxAttempts * interval);
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1859,7 +1859,7 @@ public void testShuffleRandom() throws IOException, TimeoutException {
Channel mockChannel = mock(Channel.class);

given(mockConnectionFactory.newConnection((ExecutorService) isNull(), any(List.class), anyString()))
.willReturn(mockConnection);
.willReturn(mockConnection);
given(mockConnection.createChannel()).willReturn(mockChannel);
given(mockChannel.isOpen()).willReturn(true);
given(mockConnection.isOpen()).willReturn(true);
Expand All @@ -1873,11 +1873,11 @@ public void testShuffleRandom() throws IOException, TimeoutException {
ArgumentCaptor<List<Address>> captor = ArgumentCaptor.forClass(List.class);
verify(mockConnectionFactory, times(100)).newConnection(isNull(), captor.capture(), anyString());
List<String> firstAddress = captor.getAllValues()
.stream()
.map(addresses -> addresses.get(0).getHost())
.distinct()
.sorted()
.collect(Collectors.toList());
.stream()
.map(addresses -> addresses.get(0).getHost())
.distinct()
.sorted()
.collect(Collectors.toList());
assertThat(firstAddress).containsExactly("host1", "host2", "host3");
}

Expand All @@ -1889,7 +1889,7 @@ public void testShuffleInOrder() throws IOException, TimeoutException {
Channel mockChannel = mock(Channel.class);

given(mockConnectionFactory.newConnection((ExecutorService) isNull(), any(List.class), anyString()))
.willReturn(mockConnection);
.willReturn(mockConnection);
given(mockConnection.createChannel()).willReturn(mockChannel);
given(mockChannel.isOpen()).willReturn(true);
given(mockConnection.isOpen()).willReturn(true);
Expand All @@ -1903,9 +1903,9 @@ public void testShuffleInOrder() throws IOException, TimeoutException {
ArgumentCaptor<List<Address>> captor = ArgumentCaptor.forClass(List.class);
verify(mockConnectionFactory, times(3)).newConnection(isNull(), captor.capture(), anyString());
List<String> connectAddresses = captor.getAllValues()
.stream()
.map(addresses -> addresses.get(0).getHost())
.collect(Collectors.toList());
.stream()
.map(addresses -> addresses.get(0).getHost())
.collect(Collectors.toList());
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
assertThat(connectAddresses).containsExactly("host1", "host2", "host3");
}

Expand Down
4 changes: 4 additions & 0 deletions src/reference/antora/modules/ROOT/pages/amqp/connections.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,10 @@ The `ConnectionNameStrategy` for the publisher connection is the same as the pri
Starting with version 1.7.7, an `AmqpResourceNotAvailableException` is provided, which is thrown when `SimpleConnection.createChannel()` cannot create a `Channel` (for example, because the `channelMax` limit is reached and there are no available channels in the cache).
You can use this exception in the `RetryPolicy` to recover the operation after some back-off.

Starting with version 3.1, The construct of `SimpleConnection` supports `BackOffExceution` to add retry policies when `SimpleConnection.createChannel()` get an empty channel at the connection layer
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved



[[connection-factory]]
== Configuring the Underlying Client Connection Factory

Expand Down
6 changes: 6 additions & 0 deletions src/reference/antora/modules/ROOT/pages/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,9 @@ It remains possible to configure your own logging behavior by setting the `exclu
In addition, the `SimpleMessageListenerContainer` consumer restart after such an exception is now logged at DEBUG level by default (previously INFO).
A new method `logRestart()` has been added to the `ConditionalExceptionLogger` to allow this to be changed.
See xref:amqp/receiving-messages/consumer-events.adoc[Consumer Events] and <<channel-close-logging>> for more information.

=== blocked-connections-and-resource-constraints

See xref:amqp/connections.adoc[Blocked Connections and Resource Constraints] and <<blocked-connections-and-resource-constraints>> for more information.
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved