Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add RetryableConnection for the case for wait a moment when The channelMaxlimit is reached #2556

Merged
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -53,6 +53,7 @@
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.backoff.BackOff;

import com.rabbitmq.client.Address;
import com.rabbitmq.client.AddressResolver;
Expand Down Expand Up @@ -162,6 +163,7 @@ public void handleRecovery(Recoverable recoverable) {

private volatile boolean contextStopped;

private BackOff connectionCreatingBackOff;
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
/**
* Create a new AbstractConnectionFactory for the given target ConnectionFactory, with no publisher connection
* factory.
Expand Down Expand Up @@ -556,6 +558,16 @@ public boolean hasPublisherConnectionFactory() {
return this.publisherConnectionFactory != null;
}

/**
* Support backoff policy when get an empty channel from connection.
* @param backOff {@link BackOff}
* @since 3.1.3
* @see BackOff#start()
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
*/
public void setConnectionCreatingBackOff(BackOff backOff) {
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
this.connectionCreatingBackOff = backOff;
}

@Override
public ConnectionFactory getPublisherConnectionFactory() {
return this.publisherConnectionFactory;
Expand All @@ -566,8 +578,8 @@ protected final Connection createBareConnection() {
String connectionName = this.connectionNameStrategy.obtainNewConnectionName(this);

com.rabbitmq.client.Connection rabbitConnection = connect(connectionName);

Connection connection = new SimpleConnection(rabbitConnection, this.closeTimeout);
Connection connection = new SimpleConnection(rabbitConnection, this.closeTimeout,
this.connectionCreatingBackOff == null ? null : this.connectionCreatingBackOff.start());
if (rabbitConnection instanceof AutorecoveringConnection auto) {
auto.addRecoveryListener(new RecoveryListener() {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2022 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,9 +19,13 @@
import java.io.IOException;
import java.net.InetAddress;

import javax.annotation.Nullable;

import org.springframework.amqp.AmqpResourceNotAvailableException;
import org.springframework.amqp.AmqpTimeoutException;
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.util.ObjectUtils;
import org.springframework.util.backoff.BackOffExecution;

import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.BlockedListener;
Expand All @@ -46,16 +50,38 @@ public class SimpleConnection implements Connection, NetworkConnection {

private volatile boolean explicitlyClosed;

public SimpleConnection(com.rabbitmq.client.Connection delegate,
int closeTimeout) {
private final BackOffExecution backOffExecution;
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved

public SimpleConnection(com.rabbitmq.client.Connection delegate, int closeTimeout) {
this(delegate, closeTimeout, null);
}

/**
* Construct an instance with the {@link org.springframework.util.backoff.BackOffExecution} arguments.
* @param delegate delegate connection
* @param closeTimeout the time of physical close time out
* @param backOffExecution backOffExecution is nullable
* @since 3.1.3
*/
public SimpleConnection(com.rabbitmq.client.Connection delegate, int closeTimeout,
@Nullable BackOffExecution backOffExecution) {
this.delegate = delegate;
this.closeTimeout = closeTimeout;
this.backOffExecution = backOffExecution;
}

@Override
public Channel createChannel(boolean transactional) {
try {
Channel channel = this.delegate.createChannel();
while (channel == null && this.backOffExecution != null) {
long interval = this.backOffExecution.nextBackOff();
if (interval == BackOffExecution.STOP) {
break;
}
Thread.sleep(interval);
channel = this.delegate.createChannel();
}
if (channel == null) {
throw new AmqpResourceNotAvailableException("The channelMax limit is reached. Try later.");
}
Expand All @@ -65,6 +91,10 @@ public Channel createChannel(boolean transactional) {
}
return channel;
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AmqpTimeoutException("Interrupted while creating a new channel", e);
}
catch (IOException e) {
throw RabbitExceptionTranslator.convertRabbitAccessException(e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2023 the original author or authors.
* Copyright 2010-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,6 +17,7 @@
package org.springframework.amqp.rabbit.connection;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
Expand All @@ -39,10 +40,13 @@
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import org.springframework.amqp.AmqpResourceNotAvailableException;
import org.springframework.amqp.rabbit.connection.AbstractConnectionFactory.AddressShuffleMode;
import org.springframework.amqp.utils.test.TestUtils;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.util.StopWatch;
import org.springframework.util.backoff.FixedBackOff;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
Expand Down Expand Up @@ -212,4 +216,21 @@ public void testCreatesConnectionWithGivenFactory() {
assertThat(mockConnectionFactory.getThreadFactory()).isEqualTo(connectionThreadFactory);
}

@Test
public void testConnectionCreatingBackOff() throws Exception {
int maxAttempts = 2;
long interval = 500L;
com.rabbitmq.client.Connection mockConnection = mock(com.rabbitmq.client.Connection.class);
given(mockConnection.createChannel()).willReturn(null);
SimpleConnection simpleConnection = new SimpleConnection(mockConnection, 5,
new FixedBackOff(interval, maxAttempts).start());
StopWatch stopWatch = new StopWatch();
stopWatch.start();
assertThatExceptionOfType(AmqpResourceNotAvailableException.class).isThrownBy(() -> {
simpleConnection.createChannel(false);
});
stopWatch.stop();
assertThat(stopWatch.getTotalTimeMillis()).isGreaterThanOrEqualTo(maxAttempts * interval);
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved
}

}
2 changes: 2 additions & 0 deletions src/reference/antora/modules/ROOT/pages/amqp/connections.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ Simple publisher confirmations are supported by all three factories.
When configuring a `RabbitTemplate` to use a xref:amqp/template.adoc#separate-connection[separate connection], you can now, starting with version 2.3.2, configure the publishing connection factory to be a different type.
By default, the publishing factory is the same type and any properties set on the main factory are also propagated to the publishing factory.

Starting with version 3.1, the connectionCreatingBackOff property is supported to define backoff policy when creating connection channel.
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved

[[pooledchannelconnectionfactory]]
=== `PooledChannelConnectionFactory`

Expand Down
6 changes: 6 additions & 0 deletions src/reference/antora/modules/ROOT/pages/whats-new.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,9 @@ It remains possible to configure your own logging behavior by setting the `exclu
In addition, the `SimpleMessageListenerContainer` consumer restart after such an exception is now logged at DEBUG level by default (previously INFO).
A new method `logRestart()` has been added to the `ConditionalExceptionLogger` to allow this to be changed.
See xref:amqp/receiving-messages/consumer-events.adoc[Consumer Events] and <<channel-close-logging>> for more information.

=== Connections Enhancement

Connection Factory supported backoff policy when creating connection channel.See xref:amqp/connections.adoc[Choosing a Connection Factory] and https://github.com/spring-projects/spring-amqp/pull/2556[2556] for more information.
javaecrainbow marked this conversation as resolved.
Show resolved Hide resolved


Loading