diff --git a/src/main/java/org/springframework/data/redis/cache/RedisCache.java b/src/main/java/org/springframework/data/redis/cache/RedisCache.java index 0eae1a3c7b..bc17d7b7be 100644 --- a/src/main/java/org/springframework/data/redis/cache/RedisCache.java +++ b/src/main/java/org/springframework/data/redis/cache/RedisCache.java @@ -69,15 +69,15 @@ public class RedisCache extends AbstractValueAdaptingCache { private final String name; /** - * Create a new {@link RedisCache}. + * Create a new {@link RedisCache} with the given {@link String name}. * * @param name {@link String name} for this {@link Cache}; must not be {@literal null}. - * @param cacheWriter {@link RedisCacheWriter} used to perform {@link RedisCache} operations by executing the - * necessary Redis commands; must not be {@literal null}. - * @param cacheConfiguration {@link RedisCacheConfiguration} applied to this {@link RedisCache} on creation; must not - * be {@literal null}. + * @param cacheWriter {@link RedisCacheWriter} used to perform {@link RedisCache} operations by + * executing the necessary Redis commands; must not be {@literal null}. + * @param cacheConfiguration {@link RedisCacheConfiguration} applied to this {@link RedisCache} on creation; + * must not be {@literal null}. * @throws IllegalArgumentException if either the given {@link RedisCacheWriter} or {@link RedisCacheConfiguration} - * are {@literal null} or the given {@link String} name for this {@link RedisCache} is {@literal null}. + * are {@literal null} or the given {@link String} name for this {@link RedisCache} is {@literal null}. */ protected RedisCache(String name, RedisCacheWriter cacheWriter, RedisCacheConfiguration cacheConfiguration) { @@ -160,15 +160,14 @@ public T get(Object key, Callable valueLoader) { private T getSynchronized(Object key, Callable valueLoader) { lock.lock(); + try { ValueWrapper result = get(key); return result != null ? (T) result.get() : loadCacheValue(key, valueLoader); } finally { lock.unlock(); - } - } /** @@ -422,6 +421,7 @@ private String convertCollectionLikeOrMapKey(Object key, TypeDescriptor source) target.append("}"); return target.toString(); + } else if (source.isCollection() || source.isArray()) { StringJoiner stringJoiner = new StringJoiner(","); diff --git a/src/main/java/org/springframework/data/redis/connection/RedisConnection.java b/src/main/java/org/springframework/data/redis/connection/RedisConnection.java index 87714a6248..b6f8f32ba8 100644 --- a/src/main/java/org/springframework/data/redis/connection/RedisConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/RedisConnection.java @@ -29,7 +29,7 @@ * The methods follow as much as possible the Redis names and conventions. *

* {@link RedisConnection Redis connections}, unlike perhaps their underlying native connection are not Thread-safe and - * should not be shared across multiple threads. + * should not be shared across multiple threads, concurrently or simultaneously. * * @author Costin Leau * @author Christoph Strobl diff --git a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterConnection.java b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterConnection.java index 608034017d..d02bea66a5 100644 --- a/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/jedis/JedisClusterConnection.java @@ -805,12 +805,14 @@ public void returnResourceForSpecificNode(RedisClusterNode node, Object client) */ public static class JedisClusterTopologyProvider implements ClusterTopologyProvider { - private final Object lock = new Object(); - private final JedisCluster cluster; - private final long cacheTimeMs; private long time = 0; + + private final long cacheTimeMs; + private @Nullable ClusterTopology cached; + private final JedisCluster cluster; + /** * Create new {@link JedisClusterTopologyProvider}. Uses a default cache timeout of 100 milliseconds. * @@ -847,6 +849,7 @@ public ClusterTopology getTopology() { Map errors = new LinkedHashMap<>(); List> list = new ArrayList<>(cluster.getClusterNodes().entrySet()); + Collections.shuffle(list); for (Entry entry : list) { @@ -854,25 +857,26 @@ public ClusterTopology getTopology() { try (Connection connection = entry.getValue().getResource()) { time = System.currentTimeMillis(); + Set nodes = Converters.toSetOfRedisClusterNodes(new Jedis(connection).clusterNodes()); - synchronized (lock) { - cached = new ClusterTopology(nodes); - } + cached = new ClusterTopology(nodes); + return cached; - } catch (Exception ex) { - errors.put(entry.getKey(), ex); + + } catch (Exception cause) { + errors.put(entry.getKey(), cause); } } - StringBuilder sb = new StringBuilder(); + StringBuilder stringBuilder = new StringBuilder(); for (Entry entry : errors.entrySet()) { - sb.append(String.format("\r\n\t- %s failed: %s", entry.getKey(), entry.getValue().getMessage())); + stringBuilder.append(String.format("\r\n\t- %s failed: %s", entry.getKey(), entry.getValue().getMessage())); } throw new ClusterStateFailureException( - "Could not retrieve cluster information; CLUSTER NODES returned with error" + sb.toString()); + "Could not retrieve cluster information; CLUSTER NODES returned with error" + stringBuilder); } /** diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java b/src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java index 37c9d9669b..c30d0a9351 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/ClusterConnectionProvider.java @@ -33,22 +33,26 @@ import org.springframework.util.Assert; /** - * Connection provider for Cluster connections. + * {@link LettuceConnectionProvider} and {@link RedisClientProvider} for Redis Cluster connections. * * @author Mark Paluch * @author Christoph Strobl * @author Bruce Cloud + * @author John Blum * @since 2.0 */ class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClientProvider { - private final RedisClusterClient client; - private final RedisCodec codec; - private final Optional readFrom; + private volatile boolean initialized; private final Lock lock = new ReentrantLock(); - private volatile boolean initialized; + @Nullable + private final ReadFrom readFrom; + + private final RedisClusterClient client; + + private final RedisCodec codec; /** * Create new {@link ClusterConnectionProvider}. @@ -75,7 +79,11 @@ class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClien this.client = client; this.codec = codec; - this.readFrom = Optional.ofNullable(readFrom); + this.readFrom = readFrom; + } + + private Optional getReadFrom() { + return Optional.ofNullable(this.readFrom); } @Override @@ -83,10 +91,10 @@ class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClien if (!initialized) { - // partitions have to be initialized before asynchronous usage. - // Needs to happen only once. Initialize eagerly if - // blocking is not an options. + // Partitions have to be initialized before asynchronous usage. + // Needs to happen only once. Initialize eagerly if blocking is not an options. lock.lock(); + try { if (!initialized) { client.getPartitions(); @@ -100,27 +108,25 @@ class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClien if (connectionType.equals(StatefulRedisPubSubConnection.class) || connectionType.equals(StatefulRedisClusterPubSubConnection.class)) { - return client.connectPubSubAsync(codec) // - .thenApply(connectionType::cast); + return client.connectPubSubAsync(codec).thenApply(connectionType::cast); } if (StatefulRedisClusterConnection.class.isAssignableFrom(connectionType) || connectionType.equals(StatefulConnection.class)) { - return client.connectAsync(codec) // - .thenApply(connection -> { - - readFrom.ifPresent(connection::setReadFrom); + return client.connectAsync(codec).thenApply(connection -> { + getReadFrom().ifPresent(connection::setReadFrom); return connectionType.cast(connection); }); } - return LettuceFutureUtils - .failed(new InvalidDataAccessApiUsageException("Connection type " + connectionType + " not supported")); + String message = String.format("Connection type %s not supported", connectionType); + + return LettuceFutureUtils.failed(new InvalidDataAccessApiUsageException(message)); } @Override public RedisClusterClient getRedisClient() { - return client; + return this.client; } } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java index 422b45c18c..cfca71fba9 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceClusterConnection.java @@ -554,7 +554,9 @@ protected interface LettuceMultiKeyClusterCommandCallback static class LettuceClusterNodeResourceProvider implements ClusterNodeResourceProvider, DisposableBean { private final Lock lock = new ReentrantLock(); + private final LettuceConnectionProvider connectionProvider; + private volatile @Nullable StatefulRedisClusterConnection connection; LettuceClusterNodeResourceProvider(LettuceConnectionProvider connectionProvider) { @@ -567,14 +569,16 @@ public RedisClusterCommands getResourceForSpecificNode(RedisClus Assert.notNull(node, "Node must not be null"); - if (connection == null) { - lock.lock(); + if (this.connection == null) { + + this.lock.lock(); + try { - if (connection == null) { - this.connection = connectionProvider.getConnection(StatefulRedisClusterConnection.class); + if (this.connection == null) { + this.connection = this.connectionProvider.getConnection(StatefulRedisClusterConnection.class); } } finally { - lock.unlock(); + this.lock.unlock(); } } @@ -586,6 +590,7 @@ public void returnResourceForSpecificNode(RedisClusterNode node, Object resource @Override public void destroy() throws Exception { + if (this.connection != null) { this.connectionProvider.release(this.connection); } diff --git a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java index 46c92756f6..17e7b6c145 100644 --- a/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java +++ b/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java @@ -15,7 +15,8 @@ */ package org.springframework.data.redis.connection.lettuce; -import static org.springframework.data.redis.connection.lettuce.LettuceConnection.*; +import static org.springframework.data.redis.connection.lettuce.LettuceConnection.CODEC; +import static org.springframework.data.redis.connection.lettuce.LettuceConnection.PipeliningFlushPolicy; import io.lettuce.core.AbstractRedisClient; import io.lettuce.core.ClientOptions; @@ -44,6 +45,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.commons.logging.Log; @@ -57,10 +59,24 @@ import org.springframework.data.redis.ExceptionTranslationStrategy; import org.springframework.data.redis.PassThroughExceptionTranslationStrategy; import org.springframework.data.redis.RedisConnectionFailureException; -import org.springframework.data.redis.connection.*; +import org.springframework.data.redis.connection.ClusterCommandExecutor; +import org.springframework.data.redis.connection.ClusterTopologyProvider; +import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory; +import org.springframework.data.redis.connection.RedisClusterConfiguration; +import org.springframework.data.redis.connection.RedisClusterConnection; +import org.springframework.data.redis.connection.RedisConfiguration; import org.springframework.data.redis.connection.RedisConfiguration.ClusterConfiguration; import org.springframework.data.redis.connection.RedisConfiguration.WithDatabaseIndex; import org.springframework.data.redis.connection.RedisConfiguration.WithPassword; +import org.springframework.data.redis.connection.RedisConnection; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.RedisPassword; +import org.springframework.data.redis.connection.RedisSentinelConfiguration; +import org.springframework.data.redis.connection.RedisSentinelConnection; +import org.springframework.data.redis.connection.RedisSocketConfiguration; +import org.springframework.data.redis.connection.RedisStandaloneConfiguration; +import org.springframework.data.redis.connection.RedisStaticMasterReplicaConfiguration; +import org.springframework.data.redis.util.RedisAssertions; import org.springframework.data.util.Optionals; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -69,7 +85,7 @@ import org.springframework.util.StringUtils; /** - * Connection factory creating Lettuce-based connections. + * {@link RedisConnectionFactory Connection factory} creating Lettuce-based connections. *

* This factory creates a new {@link LettuceConnection} on each call to {@link #getConnection()}. While multiple * {@link LettuceConnection}s share a single thread-safe native connection by default, {@link LettuceConnection} and its @@ -109,6 +125,7 @@ * @author Luis De Bello * @author Andrea Como * @author Chris Bono + * @author John Blum */ public class LettuceConnectionFactory implements RedisConnectionFactory, ReactiveRedisConnectionFactory, InitializingBean, DisposableBean, SmartLifecycle { @@ -116,10 +133,10 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy( LettuceExceptionConverter.INSTANCE); - private boolean validateConnection = false; - private boolean shareNativeConnection = true; - private boolean eagerInitialization = false; private boolean convertPipelineAndTxResults = true; + private boolean eagerInitialization = false; + private boolean shareNativeConnection = true; + private boolean validateConnection = false; private int phase = 0; // in between min and max values @@ -138,9 +155,6 @@ public class LettuceConnectionFactory implements RedisConnectionFactory, Reactiv private final Log log = LogFactory.getLog(getClass()); - /** Synchronization monitor for the shared Connection */ - private final Object connectionMonitor = new Object(); - private final Lock lock = new ReentrantLock(); private PipeliningFlushPolicy pipeliningFlushPolicy = PipeliningFlushPolicy.flushEachCommand(); @@ -349,9 +363,7 @@ public static RedisConfiguration createRedisConfiguration(RedisURI redisUri) { ClusterCommandExecutor getRequiredClusterCommandExecutor() { - if (this.clusterCommandExecutor == null) { - throw new IllegalStateException("ClusterCommandExecutor not initialized"); - } + Assert.state(this.clusterCommandExecutor != null, "ClusterCommandExecutor not initialized"); return this.clusterCommandExecutor; } @@ -659,11 +671,8 @@ public AbstractRedisClient getNativeClient() { */ public AbstractRedisClient getRequiredNativeClient() { - AbstractRedisClient client = getNativeClient(); - - Assert.state(client != null, "Client not yet initialized; Did you forget to call initialize the bean"); - - return client; + return RedisAssertions.requireState(getNativeClient(), + "Client not yet initialized; Did you forget to call initialize the bean"); } @Nullable @@ -823,7 +832,7 @@ public void setConvertPipelineAndTxResults(boolean convertPipelineAndTxResults) * @since 2.1 */ private boolean isStaticMasterReplicaAware() { - return RedisConfiguration.isStaticMasterReplicaConfiguration(configuration); + return RedisConfiguration.isStaticMasterReplicaConfiguration(this.configuration); } /** @@ -831,7 +840,7 @@ private boolean isStaticMasterReplicaAware() { * @since 1.5 */ public boolean isRedisSentinelAware() { - return RedisConfiguration.isSentinelConfiguration(configuration); + return RedisConfiguration.isSentinelConfiguration(this.configuration); } /** @@ -839,7 +848,7 @@ public boolean isRedisSentinelAware() { * @since 2.1 */ private boolean isDomainSocketAware() { - return RedisConfiguration.isDomainSocketConfiguration(configuration); + return RedisConfiguration.isDomainSocketConfiguration(this.configuration); } /** @@ -847,7 +856,7 @@ private boolean isDomainSocketAware() { * @since 1.7 */ public boolean isClusterAware() { - return RedisConfiguration.isClusterConfiguration(configuration); + return RedisConfiguration.isClusterConfiguration(this.configuration); } @Override @@ -858,12 +867,15 @@ public void start() { if (isCreatedOrStopped(current)) { AbstractRedisClient client = createClient(); + this.client = client; + LettuceConnectionProvider connectionProvider = new ExceptionTranslatingConnectionProvider( - createConnectionProvider(this.client, CODEC)); + createConnectionProvider(client, CODEC)); + this.connectionProvider = connectionProvider; this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider( - createConnectionProvider(this.client, LettuceReactiveRedisConnection.CODEC)); + createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC)); if (isClusterAware()) { this.clusterCommandExecutor = createClusterCommandExecutor((RedisClusterClient) client, connectionProvider); @@ -903,7 +915,6 @@ public void stop() { reactiveConnectionProvider = null; if (client != null) { - try { Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod(); Duration timeout = clientConfiguration.getShutdownTimeout(); @@ -923,7 +934,7 @@ public void stop() { @Override public int getPhase() { - return phase; + return this.phase; } /** @@ -943,6 +954,7 @@ public boolean isRunning() { @Override public void afterPropertiesSet() { + if (isAutoStartup()) { start(); } @@ -990,8 +1002,8 @@ public RedisConnection getConnection() { return getClusterConnection(); } - LettuceConnection connection = doCreateLettuceConnection(getSharedConnection(), connectionProvider, getTimeout(), - getDatabase()); + LettuceConnection connection = + doCreateLettuceConnection(getSharedConnection(), this.connectionProvider, getTimeout(), getDatabase()); connection.setConvertPipelineAndTxResults(this.convertPipelineAndTxResults); @@ -1007,7 +1019,7 @@ public RedisClusterConnection getClusterConnection() { throw new InvalidDataAccessApiUsageException("Cluster is not configured"); } - RedisClusterClient clusterClient = (RedisClusterClient) client; + RedisClusterClient clusterClient = (RedisClusterClient) this.client; StatefulRedisClusterConnection sharedConnection = getSharedClusterConnection(); @@ -1017,6 +1029,14 @@ public RedisClusterConnection getClusterConnection() { getRequiredClusterCommandExecutor(), this.clientConfiguration.getCommandTimeout()); } + @Override + public RedisSentinelConnection getSentinelConnection() { + + assertStarted(); + + return new LettuceSentinelConnection(this.connectionProvider); + } + /** * Customization hook for {@link LettuceConnection} creation. * @@ -1118,14 +1138,14 @@ public void initConnection() { */ public void resetConnection() { - Optionals.toStream(Optional.ofNullable(connection), Optional.ofNullable(reactiveConnection)) - .forEach(SharedConnection::resetConnection); + doInLock(() -> { - synchronized (this.connectionMonitor) { + Optionals.toStream(Optional.ofNullable(this.connection), Optional.ofNullable(this.reactiveConnection)) + .forEach(SharedConnection::resetConnection); this.connection = null; this.reactiveConnection = null; - } + }); } /** @@ -1141,26 +1161,26 @@ public void validateConnection() { private SharedConnection getOrCreateSharedConnection() { - synchronized (this.connectionMonitor) { + return doInLock(() -> { if (this.connection == null) { - this.connection = new SharedConnection<>(connectionProvider); + this.connection = new SharedConnection<>(this.connectionProvider); } return this.connection; - } + }); } private SharedConnection getOrCreateSharedReactiveConnection() { - synchronized (this.connectionMonitor) { + return doInLock(() -> { if (this.reactiveConnection == null) { - this.reactiveConnection = new SharedConnection<>(reactiveConnectionProvider); + this.reactiveConnection = new SharedConnection<>(this.reactiveConnectionProvider); } return this.reactiveConnection; - } + }); } @Override @@ -1231,7 +1251,7 @@ protected LettuceConnectionProvider doCreateConnectionProvider(AbstractRedisClie return isStaticMasterReplicaAware() ? createStaticMasterReplicaConnectionProvider((RedisClient) client, codec) : isClusterAware() ? createClusterConnectionProvider((RedisClusterClient) client, codec) - : createStandaloneConnectionProvider((RedisClient) client, codec); + : createStandaloneConnectionProvider((RedisClient) client, codec); } @SuppressWarnings("all") @@ -1258,7 +1278,8 @@ protected AbstractRedisClient createClient() { return isStaticMasterReplicaAware() ? createStaticMasterReplicaClient() : isRedisSentinelAware() ? createSentinelClient() - : isClusterAware() ? createClusterClient() : createBasicClient(); + : isClusterAware() ? createClusterClient() + : createBasicClient(); } private RedisClient createStaticMasterReplicaClient() { @@ -1324,7 +1345,8 @@ private RedisClusterClient createClusterClient() { ClusterConfiguration clusterConfiguration = (ClusterConfiguration) this.configuration; clusterConfiguration.getClusterNodes().stream() - .map(node -> createRedisURIAndApplySettings(node.getHost(), node.getPort())).forEach(initialUris::add); + .map(node -> createRedisURIAndApplySettings(node.getHost(), node.getPort())) + .forEach(initialUris::add); RedisClusterClient clusterClient = this.clientConfiguration.getClientResources() .map(clientResources -> RedisClusterClient.create(clientResources, initialUris)) @@ -1368,7 +1390,7 @@ private RedisClient createBasicClient() { private void assertStarted() { - State current = state.get(); + State current = this.state.get(); if (State.STARTED.equals(current)) { return; @@ -1408,16 +1430,13 @@ private RedisURI createRedisURIAndApplySettings(String host, int port) { private RedisURI createRedisSocketURIAndApplySettings(String socketPath) { - RedisURI.Builder builder = RedisURI.Builder.socket(socketPath); - - applyAuthentication(builder); - builder.withDatabase(getDatabase()); - builder.withTimeout(clientConfiguration.getCommandTimeout()); - - return builder.build(); + return applyAuthentication(RedisURI.Builder.socket(socketPath)) + .withTimeout(this.clientConfiguration.getCommandTimeout()) + .withDatabase(getDatabase()) + .build(); } - private void applyAuthentication(RedisURI.Builder builder) { + private RedisURI.Builder applyAuthentication(RedisURI.Builder builder) { String username = getRedisUsername(); @@ -1428,16 +1447,10 @@ private void applyAuthentication(RedisURI.Builder builder) { getRedisPassword().toOptional().ifPresent(builder::withPassword); } - clientConfiguration.getRedisCredentialsProviderFactory() + this.clientConfiguration.getRedisCredentialsProviderFactory() .ifPresent(factory -> builder.withAuthentication(factory.createCredentialsProvider(this.configuration))); - } - @Override - public RedisSentinelConnection getSentinelConnection() { - - assertStarted(); - - return new LettuceSentinelConnection(connectionProvider); + return builder; } private MutableLettuceClientConfiguration getMutableConfiguration() { @@ -1453,22 +1466,36 @@ private long getClientTimeout() { return clientConfiguration.getCommandTimeout().toMillis(); } + private void doInLock(Runnable runnable) { + doInLock(() -> { runnable.run(); return null; }); + } + + private T doInLock(Supplier supplier) { + + this.lock.lock(); + + try { + return supplier.get(); + } + finally { + this.lock.unlock(); + } + } + /** - * Wrapper for shared connections. Keeps track of the connection lifecycleThe wrapper is thread-safe as it + * Wrapper for shared connections. Keeps track of the connection lifecycle. The wrapper is Thread-safe as it * synchronizes concurrent calls by blocking. * * @param connection encoding. * @author Mark Paluch * @author Christoph Strobl + * @author Jonn Blum * @since 2.1 */ class SharedConnection { private final LettuceConnectionProvider connectionProvider; - /** Synchronization monitor for the shared Connection */ - private final Object connectionMonitor = new Object(); - private @Nullable StatefulConnection connection; SharedConnection(LettuceConnectionProvider connectionProvider) { @@ -1476,16 +1503,16 @@ class SharedConnection { } /** - * Returns a valid Lettuce connection. Initializes and validates the connection if - * {@link #setValidateConnection(boolean) enabled}. + * Returns a valid Lettuce {@link StatefulConnection connection}. + *

+ * Initializes and validates the connection if {@link #setValidateConnection(boolean) enabled}. * - * @return the connection. + * @return the new Lettuce {@link StatefulConnection connection}. */ @Nullable StatefulConnection getConnection() { - lock.lock(); - try { + return doInLock(() -> { if (this.connection == null) { this.connection = getNativeConnection(); @@ -1496,74 +1523,88 @@ StatefulConnection getConnection() { } return this.connection; - } finally { - lock.unlock(); - } + }); } /** - * Obtain a connection from the associated {@link LettuceConnectionProvider}. + * Acquire a {@link StatefulConnection native connection} from the associated {@link LettuceConnectionProvider}. * - * @return the connection. + * @return a new {@link StatefulConnection native connection}. + * @see org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider#getConnection(Class) + * @see io.lettuce.core.api.StatefulConnection */ + @SuppressWarnings("unchecked") private StatefulConnection getNativeConnection() { - return connectionProvider.getConnection(StatefulConnection.class); + return this.connectionProvider.getConnection(StatefulConnection.class); } /** - * Validate the connection. Invalid connections will be closed and the connection state will be reset. + * Null-safe operation to evaluate whether the given {@link StatefulConnection connetion} + * is {@link StatefulConnection#isOpen() open}. + * + * @param connection {@link StatefulConnection} to evaluate. + * @return a boolean value indicating whether the given {@link StatefulConnection} is not {@literal null} + * and is {@link StatefulConnection#isOpen() open}. + * @see io.lettuce.core.api.StatefulConnection#isOpen() + */ + private boolean isOpen(@Nullable StatefulConnection connection) { + return connection != null && connection.isOpen(); + } + + /** + * Validate the {@link StatefulConnection connection}. + *

+ * {@link StatefulConnection Connections} are considered valid if they can send/receive ping packets. + * Invalid {@link StatefulConnection connections} will be closed and the connection state will be reset. */ void validateConnection() { - lock.lock(); - try { + doInLock(() -> { + StatefulConnection connection = this.connection; boolean valid = false; - if (connection != null && connection.isOpen()) { + if (isOpen(connection)) { try { - if (connection instanceof StatefulRedisConnection) { - ((StatefulRedisConnection) connection).sync().ping(); + if (connection instanceof StatefulRedisConnection statefulConnection) { + statefulConnection.sync().ping(); } - if (connection instanceof StatefulRedisClusterConnection) { - ((StatefulRedisClusterConnection) connection).sync().ping(); + if (connection instanceof StatefulRedisClusterConnection statefulClusterConnection) { + statefulClusterConnection.sync().ping(); } valid = true; + } catch (Exception cause) { log.debug("Validation failed", cause); } } if (!valid) { - log.info("Validation of shared connection failed; Creating a new connection."); resetConnection(); this.connection = getNativeConnection(); } - } finally { - lock.unlock(); - } + }); } /** - * Reset the underlying shared Connection, to be reinitialized on next access. + * Reset the underlying shared {@link StatefulConnection connection}, to be reinitialized on next access. */ void resetConnection() { - lock.lock(); - try { + doInLock(() -> { + + StatefulConnection connection = this.connection; - if (this.connection != null) { - this.connectionProvider.release(this.connection); + if (connection != null) { + this.connectionProvider.release(connection); } this.connection = null; - } finally { - lock.unlock(); - } + }); } } diff --git a/src/main/java/org/springframework/data/redis/core/script/DefaultRedisScript.java b/src/main/java/org/springframework/data/redis/core/script/DefaultRedisScript.java index c4f36bc510..cd69431f13 100644 --- a/src/main/java/org/springframework/data/redis/core/script/DefaultRedisScript.java +++ b/src/main/java/org/springframework/data/redis/core/script/DefaultRedisScript.java @@ -39,11 +39,13 @@ */ public class DefaultRedisScript implements RedisScript, InitializingBean { + private @Nullable Class resultType; + private final Lock lock = new ReentrantLock(); private @Nullable ScriptSource scriptSource; + private @Nullable String sha1; - private @Nullable Class resultType; /** * Creates a new {@link DefaultRedisScript} @@ -81,6 +83,7 @@ public void afterPropertiesSet() { public String getSha1() { lock.lock(); + try { if (sha1 == null || scriptSource.isModified()) { this.sha1 = DigestUtils.sha1DigestAsHex(getScriptAsString()); diff --git a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java index 7d33613913..11df8deefd 100644 --- a/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java +++ b/src/main/java/org/springframework/data/redis/listener/RedisMessageListenerContainer.java @@ -125,34 +125,29 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab /** Logger available to subclasses */ protected final Log logger = LogFactory.getLog(getClass()); - private @Nullable ErrorHandler errorHandler; - - private @Nullable Executor subscriptionExecutor; - - private @Nullable Executor taskExecutor; - - private @Nullable RedisConnectionFactory connectionFactory; - - private RedisSerializer serializer = RedisSerializer.string(); - - private long maxSubscriptionRegistrationWaitingTime = DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME; - - private BackOff backOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS); - private @Nullable String beanName; - // whether the container has been initialized via afterPropertiesSet private boolean afterPropertiesSet = false; // whether the TaskExecutor was created by the container private boolean manageExecutor = false; - private @Nullable Subscriber subscriber; + private long maxSubscriptionRegistrationWaitingTime = DEFAULT_SUBSCRIPTION_REGISTRATION_WAIT_TIME; private final AtomicBoolean started = new AtomicBoolean(); // whether the container is running (or not) private final AtomicReference state = new AtomicReference<>(State.notListening()); + private BackOff backOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS); + + private volatile CompletableFuture listenFuture = new CompletableFuture<>(); + private volatile CompletableFuture unsubscribeFuture = new CompletableFuture<>(); + + private @Nullable ErrorHandler errorHandler; + + private @Nullable Executor subscriptionExecutor; + private @Nullable Executor taskExecutor; + // Lookup maps; to avoid creation of hashes for each message, the maps use raw byte arrays (wrapped to respect // the equals/hashcode contract) @@ -163,9 +158,13 @@ public class RedisMessageListenerContainer implements InitializingBean, Disposab // lookup map between listeners and channels private final Map> listenerTopics = new ConcurrentHashMap<>(); - private volatile CompletableFuture listenFuture = new CompletableFuture<>(); + private @Nullable RedisConnectionFactory connectionFactory; - private volatile CompletableFuture unsubscribeFuture = new CompletableFuture<>(); + private RedisSerializer serializer = RedisSerializer.string(); + + private @Nullable String beanName; + + private @Nullable Subscriber subscriber; /** * Set an ErrorHandler to be invoked in case of any uncaught exceptions thrown while processing a Message. By default, @@ -323,8 +322,6 @@ protected TaskExecutor createDefaultTaskExecutor() { /** * Destroy the container and stop it. - * - * @throws Exception */ @Override public void destroy() throws Exception { @@ -397,7 +394,7 @@ private void lazyListen() { private CompletableFuture lazyListen(BackOffExecution backOffExecution) { if (!hasTopics()) { - logger.debug("Postpone listening for Redis messages until actual listeners are added"); + logDebug(() -> "Postpone listening for Redis messages until actual listeners are added"); return CompletableFuture.completedFuture(null); } @@ -430,13 +427,14 @@ private boolean doSubscribe(BackOffExecution backOffExecution) { CompletableFuture listenFuture = getRequiredSubscriber().initialize(backOffExecution, patternMapping.keySet().stream().map(ByteArrayWrapper::getArray).collect(Collectors.toList()), channelMapping.keySet().stream().map(ByteArrayWrapper::getArray).collect(Collectors.toList())); + listenFuture.whenComplete((unused, throwable) -> { if (throwable == null) { - logger.debug("RedisMessageListenerContainer listeners registered successfully"); + logDebug(() -> "RedisMessageListenerContainer listeners registered successfully"); this.state.set(State.listening()); } else { - logger.debug("Failed to start RedisMessageListenerContainer listeners", throwable); + logDebug(() -> "Failed to start RedisMessageListenerContainer listeners", throwable); this.state.set(State.notListening()); } @@ -448,7 +446,7 @@ private boolean doSubscribe(BackOffExecution backOffExecution) { } }); - logger.debug("Subscribing to topics for RedisMessageListenerContainer"); + logDebug(() -> "Subscribing to topics for RedisMessageListenerContainer"); return true; } @@ -484,18 +482,14 @@ public void stop() { public void stop(Runnable callback) { if (this.started.compareAndSet(true, false)) { - stopListening(); - - if (logger.isDebugEnabled()) { - logger.debug("Stopped RedisMessageListenerContainer"); - } - + logDebug(() -> "Stopped RedisMessageListenerContainer"); callback.run(); } } private void stopListening() { + while (!doUnsubscribe()) { // busy-loop, allow for synchronization against doSubscribe therefore we want to retry. } @@ -523,9 +517,7 @@ private boolean doUnsubscribe() { this.listenFuture = new CompletableFuture<>(); this.unsubscribeFuture = new CompletableFuture<>(); - if (logger.isDebugEnabled()) { - logger.debug("Stopped listening"); - } + logDebug(() -> "Stopped listening"); return true; } else { @@ -850,7 +842,7 @@ protected void handleListenerException(Throwable cause) { } else { // Rare case: listener thread failed after container shutdown. // Log at debug level, to avoid spamming the shutdown logger. - logger.debug("Listener exception after container shutdown", cause); + logDebug(() -> "Listener exception after container shutdown", cause); } } @@ -873,22 +865,23 @@ protected void invokeErrorHandler(Throwable cause) { * Handle subscription task exception. Will attempt to restart the subscription if the Exception is a connection * failure (for example, Redis was restarted). * - * @param ex Throwable exception + * @param cause Throwable exception */ protected void handleSubscriptionException(CompletableFuture future, BackOffExecution backOffExecution, - Throwable ex) { + Throwable cause) { getRequiredSubscriber().closeConnection(); - if (ex instanceof RedisConnectionFailureException && isRunning()) { + if (cause instanceof RedisConnectionFailureException && isRunning()) { BackOffExecution loggingBackOffExecution = () -> { long recoveryInterval = backOffExecution.nextBackOff(); if (recoveryInterval != BackOffExecution.STOP) { - logger.error(String.format("Connection failure occurred: %s; Restarting subscription task after %s ms", ex, - recoveryInterval), ex); + String message = String.format("Connection failure occurred: %s; Restarting subscription task after %s ms", + cause, recoveryInterval); + logger.error(message, cause); } return recoveryInterval; @@ -904,16 +897,16 @@ protected void handleSubscriptionException(CompletableFuture future, BackO return; } - logger.error("SubscriptionTask aborted with exception:", ex); - future.completeExceptionally(new IllegalStateException("Subscription attempts exceeded", ex)); + logger.error("SubscriptionTask aborted with exception:", cause); + future.completeExceptionally(new IllegalStateException("Subscription attempts exceeded", cause)); return; } if (isRunning()) { // log only if the container is still running to prevent close errors from logging - logger.error("SubscriptionTask aborted with exception:", ex); + logger.error("SubscriptionTask aborted with exception:", cause); } - future.completeExceptionally(ex); + future.completeExceptionally(cause); } /** @@ -928,6 +921,7 @@ private boolean potentiallyRecover(BackOffExecution backOffExecution, Runnable r } try { + if (subscriptionExecutor instanceof ScheduledExecutorService) { ((ScheduledExecutorService) subscriptionExecutor).schedule(retryRunnable, recoveryInterval, TimeUnit.MILLISECONDS); @@ -937,8 +931,9 @@ private boolean potentiallyRecover(BackOffExecution backOffExecution, Runnable r } return true; - } catch (InterruptedException interEx) { - logger.debug("Thread interrupted while sleeping the recovery interval"); + + } catch (InterruptedException ignore) { + logDebug(() -> "Thread interrupted while sleeping the recovery interval"); Thread.currentThread().interrupt(); return false; } @@ -1014,6 +1009,13 @@ private void logDebug(Supplier message) { } } + private void logDebug(Supplier message, Throwable cause) { + + if (this.logger.isDebugEnabled()) { + this.logger.debug(message.get(), cause); + } + } + private void logTrace(Supplier message) { if (this.logger.isTraceEnabled()) { @@ -1154,11 +1156,14 @@ public void onPatternUnsubscribed(byte[] pattern, long count) { */ class Subscriber { + private final DispatchMessageListener delegateListener = new DispatchMessageListener(); + + private final Lock lock = new ReentrantLock(); + private volatile @Nullable RedisConnection connection; private final RedisConnectionFactory connectionFactory; - private final Lock lock = new ReentrantLock(); - private final DispatchMessageListener delegateListener = new DispatchMessageListener(); + private final SynchronizingMessageListener synchronizingMessageListener = new SynchronizingMessageListener( delegateListener, delegateListener); @@ -1178,18 +1183,21 @@ class Subscriber { public CompletableFuture initialize(BackOffExecution backOffExecution, Collection patterns, Collection channels) { - lock.lock(); - try { + return doInLock(() -> { CompletableFuture initFuture = new CompletableFuture<>(); + try { - RedisConnection connection = connectionFactory.getConnection(); + + RedisConnection connection = this.connectionFactory.getConnection(); + this.connection = connection; if (connection.isSubscribed()) { initFuture.completeExceptionally( - new IllegalStateException("Retrieved connection is already subscribed; aborting listening")); + new IllegalStateException("Retrieved connection is already subscribed; aborting listening")); + return initFuture; } @@ -1198,14 +1206,12 @@ public CompletableFuture initialize(BackOffExecution backOffExecution, Col } catch (Throwable t) { handleSubscriptionException(initFuture, backOffExecution, t); } - } catch (RuntimeException e) { - initFuture.completeExceptionally(e); + } catch (RuntimeException cause) { + initFuture.completeExceptionally(cause); } return initFuture; - } finally { - lock.unlock(); - } + }); } /** @@ -1248,21 +1254,18 @@ void addSynchronization(SynchronizingMessageListener.SubscriptionSynchronization public void unsubscribeAll() { - lock.lock(); - try { + doInLock(() -> { RedisConnection connection = this.connection; - if (connection == null) { - return; - } - doUnsubscribe(connection); - } finally { - lock.unlock(); - } + if (connection != null) { + doUnsubscribe(connection); + } + }); } void doUnsubscribe(RedisConnection connection) { + closeSubscription(connection); closeConnection(); @@ -1274,18 +1277,14 @@ void doUnsubscribe(RedisConnection connection) { */ public void cancel() { - lock.lock(); - try { + doInLock(() -> { RedisConnection connection = this.connection; - if (connection == null) { - return; - } - doCancel(connection); - } finally { - lock.unlock(); - } + if (connection != null) { + doCancel(connection); + } + }); } void doCancel(RedisConnection connection) { @@ -1295,22 +1294,18 @@ void doCancel(RedisConnection connection) { void closeSubscription(RedisConnection connection) { - if (logger.isTraceEnabled()) { - logger.trace("Cancelling Redis subscription..."); - } + logTrace(() -> "Cancelling Redis subscription..."); - Subscription sub = connection.getSubscription(); + Subscription subscription = connection.getSubscription(); - if (sub != null) { + if (subscription != null) { - if (logger.isTraceEnabled()) { - logger.trace("Unsubscribing from all channels"); - } + logTrace(() -> "Unsubscribing from all channels"); try { - sub.close(); - } catch (Exception e) { - logger.warn("Unable to unsubscribe from subscriptions", e); + subscription.close(); + } catch (Exception cause) { + logger.warn("Unable to unsubscribe from subscriptions", cause); } } } @@ -1320,23 +1315,21 @@ void closeSubscription(RedisConnection connection) { */ public void closeConnection() { - lock.lock(); - try { + doInLock(() -> { RedisConnection connection = this.connection; + this.connection = null; if (connection != null) { - logger.trace("Closing connection"); + logTrace(() -> "Closing connection"); try { connection.close(); - } catch (Exception e) { - logger.warn("Error closing subscription connection", e); + } catch (Exception cause) { + logger.warn("Error closing subscription connection", cause); } } - } finally { - lock.unlock(); - } + }); } /** @@ -1381,20 +1374,31 @@ private void doWithSubscription(byte[][] data, BiConsumer { RedisConnection connection = this.connection; if (connection != null) { - Subscription sub = connection.getSubscription(); - if (sub != null) { - function.accept(sub, data); + Subscription subscription = connection.getSubscription(); + if (subscription != null) { + function.accept(subscription, data); } } + }); + } + + private void doInLock(Runnable runner) { + doInLock(() -> { runner.run(); return null; }); + } + + private T doInLock(Supplier supplier) { + + this.lock.lock(); + + try { + return supplier.get(); } finally { - lock.unlock(); + this.lock.unlock(); } } - } /** @@ -1443,7 +1447,7 @@ protected void eventuallyPerformSubscription(RedisConnection connection, BackOff addSynchronization(new SynchronizingMessageListener.SubscriptionSynchronization(patterns, channels, () -> subscriptionDone.complete(null))); - executor.execute(() -> { + this.executor.execute(() -> { try { doSubscribe(connection, patterns, initiallySubscribeToChannels);