Skip to content

Commit

Permalink
Polish for spring-projects#2690.
Browse files Browse the repository at this point in the history
Closes spring-projects#2690
Original pull request: spring-projects#2691
  • Loading branch information
jxblum committed Sep 13, 2023
1 parent ec4370f commit 75eed8f
Show file tree
Hide file tree
Showing 8 changed files with 301 additions and 238 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,15 @@ public class RedisCache extends AbstractValueAdaptingCache {
private final String name;

/**
* Create a new {@link RedisCache}.
* Create a new {@link RedisCache} with the given {@link String name}.
*
* @param name {@link String name} for this {@link Cache}; must not be {@literal null}.
* @param cacheWriter {@link RedisCacheWriter} used to perform {@link RedisCache} operations by executing the
* necessary Redis commands; must not be {@literal null}.
* @param cacheConfiguration {@link RedisCacheConfiguration} applied to this {@link RedisCache} on creation; must not
* be {@literal null}.
* @param cacheWriter {@link RedisCacheWriter} used to perform {@link RedisCache} operations by
* executing the necessary Redis commands; must not be {@literal null}.
* @param cacheConfiguration {@link RedisCacheConfiguration} applied to this {@link RedisCache} on creation;
* must not be {@literal null}.
* @throws IllegalArgumentException if either the given {@link RedisCacheWriter} or {@link RedisCacheConfiguration}
* are {@literal null} or the given {@link String} name for this {@link RedisCache} is {@literal null}.
* are {@literal null} or the given {@link String} name for this {@link RedisCache} is {@literal null}.
*/
protected RedisCache(String name, RedisCacheWriter cacheWriter, RedisCacheConfiguration cacheConfiguration) {

Expand Down Expand Up @@ -160,15 +160,14 @@ public <T> T get(Object key, Callable<T> valueLoader) {
private <T> T getSynchronized(Object key, Callable<T> valueLoader) {

lock.lock();

try {
ValueWrapper result = get(key);

return result != null ? (T) result.get() : loadCacheValue(key, valueLoader);
} finally {
lock.unlock();

}

}

/**
Expand Down Expand Up @@ -422,6 +421,7 @@ private String convertCollectionLikeOrMapKey(Object key, TypeDescriptor source)
target.append("}");

return target.toString();

} else if (source.isCollection() || source.isArray()) {

StringJoiner stringJoiner = new StringJoiner(",");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* The methods follow as much as possible the Redis names and conventions.
* <p>
* {@link RedisConnection Redis connections}, unlike perhaps their underlying native connection are not Thread-safe and
* should not be shared across multiple threads.
* should not be shared across multiple threads, concurrently or simultaneously.
*
* @author Costin Leau
* @author Christoph Strobl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -805,12 +805,14 @@ public void returnResourceForSpecificNode(RedisClusterNode node, Object client)
*/
public static class JedisClusterTopologyProvider implements ClusterTopologyProvider {

private final Object lock = new Object();
private final JedisCluster cluster;
private final long cacheTimeMs;
private long time = 0;

private final long cacheTimeMs;

private @Nullable ClusterTopology cached;

private final JedisCluster cluster;

/**
* Create new {@link JedisClusterTopologyProvider}. Uses a default cache timeout of 100 milliseconds.
*
Expand Down Expand Up @@ -847,32 +849,34 @@ public ClusterTopology getTopology() {
Map<String, Exception> errors = new LinkedHashMap<>();

List<Entry<String, ConnectionPool>> list = new ArrayList<>(cluster.getClusterNodes().entrySet());

Collections.shuffle(list);

for (Entry<String, ConnectionPool> entry : list) {

try (Connection connection = entry.getValue().getResource()) {

time = System.currentTimeMillis();

Set<RedisClusterNode> nodes = Converters.toSetOfRedisClusterNodes(new Jedis(connection).clusterNodes());

synchronized (lock) {
cached = new ClusterTopology(nodes);
}
cached = new ClusterTopology(nodes);

return cached;
} catch (Exception ex) {
errors.put(entry.getKey(), ex);

} catch (Exception cause) {
errors.put(entry.getKey(), cause);
}
}

StringBuilder sb = new StringBuilder();
StringBuilder stringBuilder = new StringBuilder();

for (Entry<String, Exception> entry : errors.entrySet()) {
sb.append(String.format("\r\n\t- %s failed: %s", entry.getKey(), entry.getValue().getMessage()));
stringBuilder.append(String.format("\r\n\t- %s failed: %s", entry.getKey(), entry.getValue().getMessage()));
}

throw new ClusterStateFailureException(
"Could not retrieve cluster information; CLUSTER NODES returned with error" + sb.toString());
"Could not retrieve cluster information; CLUSTER NODES returned with error" + stringBuilder);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,26 @@
import org.springframework.util.Assert;

/**
* Connection provider for Cluster connections.
* {@link LettuceConnectionProvider} and {@link RedisClientProvider} for Redis Cluster connections.
*
* @author Mark Paluch
* @author Christoph Strobl
* @author Bruce Cloud
* @author John Blum
* @since 2.0
*/
class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClientProvider {

private final RedisClusterClient client;
private final RedisCodec<?, ?> codec;
private final Optional<ReadFrom> readFrom;
private volatile boolean initialized;

private final Lock lock = new ReentrantLock();

private volatile boolean initialized;
@Nullable
private final ReadFrom readFrom;

private final RedisClusterClient client;

private final RedisCodec<?, ?> codec;

/**
* Create new {@link ClusterConnectionProvider}.
Expand All @@ -75,18 +79,22 @@ class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClien

this.client = client;
this.codec = codec;
this.readFrom = Optional.ofNullable(readFrom);
this.readFrom = readFrom;
}

private Optional<ReadFrom> getReadFrom() {
return Optional.ofNullable(this.readFrom);
}

@Override
public <T extends StatefulConnection<?, ?>> CompletableFuture<T> getConnectionAsync(Class<T> connectionType) {

if (!initialized) {

// partitions have to be initialized before asynchronous usage.
// Needs to happen only once. Initialize eagerly if
// blocking is not an options.
// Partitions have to be initialized before asynchronous usage.
// Needs to happen only once. Initialize eagerly if blocking is not an options.
lock.lock();

try {
if (!initialized) {
client.getPartitions();
Expand All @@ -100,27 +108,25 @@ class ClusterConnectionProvider implements LettuceConnectionProvider, RedisClien
if (connectionType.equals(StatefulRedisPubSubConnection.class)
|| connectionType.equals(StatefulRedisClusterPubSubConnection.class)) {

return client.connectPubSubAsync(codec) //
.thenApply(connectionType::cast);
return client.connectPubSubAsync(codec).thenApply(connectionType::cast);
}

if (StatefulRedisClusterConnection.class.isAssignableFrom(connectionType)
|| connectionType.equals(StatefulConnection.class)) {

return client.connectAsync(codec) //
.thenApply(connection -> {

readFrom.ifPresent(connection::setReadFrom);
return client.connectAsync(codec).thenApply(connection -> {
getReadFrom().ifPresent(connection::setReadFrom);
return connectionType.cast(connection);
});
}

return LettuceFutureUtils
.failed(new InvalidDataAccessApiUsageException("Connection type " + connectionType + " not supported"));
String message = String.format("Connection type %s not supported", connectionType);

return LettuceFutureUtils.failed(new InvalidDataAccessApiUsageException(message));
}

@Override
public RedisClusterClient getRedisClient() {
return client;
return this.client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,9 @@ protected interface LettuceMultiKeyClusterCommandCallback<T>
static class LettuceClusterNodeResourceProvider implements ClusterNodeResourceProvider, DisposableBean {

private final Lock lock = new ReentrantLock();

private final LettuceConnectionProvider connectionProvider;

private volatile @Nullable StatefulRedisClusterConnection<byte[], byte[]> connection;

LettuceClusterNodeResourceProvider(LettuceConnectionProvider connectionProvider) {
Expand All @@ -567,14 +569,16 @@ public RedisClusterCommands<byte[], byte[]> getResourceForSpecificNode(RedisClus

Assert.notNull(node, "Node must not be null");

if (connection == null) {
lock.lock();
if (this.connection == null) {

this.lock.lock();

try {
if (connection == null) {
this.connection = connectionProvider.getConnection(StatefulRedisClusterConnection.class);
if (this.connection == null) {
this.connection = this.connectionProvider.getConnection(StatefulRedisClusterConnection.class);
}
} finally {
lock.unlock();
this.lock.unlock();
}
}

Expand All @@ -586,6 +590,7 @@ public void returnResourceForSpecificNode(RedisClusterNode node, Object resource

@Override
public void destroy() throws Exception {

if (this.connection != null) {
this.connectionProvider.release(this.connection);
}
Expand Down
Loading

0 comments on commit 75eed8f

Please sign in to comment.