Skip to content

Commit

Permalink
Polishing.
Browse files Browse the repository at this point in the history
Move executor from ClusterConfiguration to connection factories as the executor is a Spring concept that isn't tied to endpoint details or the client config.

Reorder static factory methods after constructors and property accessors after static factory methods. Inline single-line single-use methods that aren't intended as extension hooks for easier readability.

Remove NonNull annotations as default non-nullability is defined on the package level.

Simplify tests to use integration tests to avoid excessive mocking.

See #2594
Original pull request: #2669
  • Loading branch information
mp911de committed Aug 17, 2023
1 parent 5edfbac commit 9af17ab
Show file tree
Hide file tree
Showing 9 changed files with 1,199 additions and 1,301 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@
import org.springframework.data.redis.TooManyClusterRedirectionsException;
import org.springframework.data.redis.connection.util.ByteArraySet;
import org.springframework.data.redis.connection.util.ByteArrayWrapper;
import org.springframework.lang.NonNull;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
import org.springframework.util.ObjectUtils;
Expand Down Expand Up @@ -78,7 +76,7 @@ public ClusterCommandExecutor(ClusterTopologyProvider topologyProvider, ClusterN
* @param topologyProvider must not be {@literal null}.
* @param resourceProvider must not be {@literal null}.
* @param exceptionTranslation must not be {@literal null}.
* @param executor can be {@literal null}. Defaulted to {@link ThreadPoolTaskExecutor}.
* @param executor the task executor to null, defaults to {@link SimpleAsyncTaskExecutor} if {@literal null}.
*/
public ClusterCommandExecutor(ClusterTopologyProvider topologyProvider, ClusterNodeResourceProvider resourceProvider,
ExceptionTranslationStrategy exceptionTranslation, @Nullable AsyncTaskExecutor executor) {
Expand All @@ -90,11 +88,7 @@ public ClusterCommandExecutor(ClusterTopologyProvider topologyProvider, ClusterN
this.topologyProvider = topologyProvider;
this.resourceProvider = resourceProvider;
this.exceptionTranslationStrategy = exceptionTranslation;
this.executor = resolveTaskExecutor(executor);
}

private @NonNull AsyncTaskExecutor resolveTaskExecutor(@Nullable AsyncTaskExecutor taskExecutor) {
return taskExecutor != null ? taskExecutor : DEFAULT_TASK_EXECUTOR;
this.executor = executor != null ? executor : DEFAULT_TASK_EXECUTOR;
}

/**
Expand Down Expand Up @@ -149,9 +143,8 @@ private <S, T> NodeResult<T> executeCommandOnSingleNode(ClusterCommandCallback<S
RuntimeException translatedException = convertToDataAccessException(cause);

if (translatedException instanceof ClusterRedirectException clusterRedirectException) {
return executeCommandOnSingleNode(cmd, topologyProvider.getTopology()
.lookup(clusterRedirectException.getTargetHost(), clusterRedirectException.getTargetPort()),
redirectCount + 1);
return executeCommandOnSingleNode(cmd, topologyProvider.getTopology().lookup(
clusterRedirectException.getTargetHost(), clusterRedirectException.getTargetPort()), redirectCount + 1);
} else {
throw translatedException != null ? translatedException : cause;
}
Expand Down Expand Up @@ -182,7 +175,7 @@ private RedisClusterNode lookupNode(RedisClusterNode node) {
* @param cmd must not be {@literal null}.
* @return never {@literal null}.
* @throws ClusterCommandExecutionFailureException if a failure occurs while executing the given
* {@link ClusterCommandCallback command} on any given {@link RedisClusterNode node}.
* {@link ClusterCommandCallback command} on any given {@link RedisClusterNode node}.
*/
public <S, T> MultiNodeResult<T> executeCommandOnAllNodes(final ClusterCommandCallback<S, T> cmd) {
return executeCommandAsyncOnNodes(cmd, getClusterTopology().getActiveMasterNodes());
Expand All @@ -193,7 +186,7 @@ public <S, T> MultiNodeResult<T> executeCommandOnAllNodes(final ClusterCommandCa
* @param nodes must not be {@literal null}.
* @return never {@literal null}.
* @throws ClusterCommandExecutionFailureException if a failure occurs while executing the given
* {@link ClusterCommandCallback command} on any given {@link RedisClusterNode node}.
* {@link ClusterCommandCallback command} on any given {@link RedisClusterNode node}.
* @throws IllegalArgumentException in case the node could not be resolved to a topology-known node
*/
public <S, T> MultiNodeResult<T> executeCommandAsyncOnNodes(ClusterCommandCallback<S, T> callback,
Expand Down Expand Up @@ -295,7 +288,7 @@ private <T> MultiNodeResult<T> collectResults(Map<NodeExecution, Future<NodeResu
* @param commandCallback must not be {@literal null}.
* @return never {@literal null}.
* @throws ClusterCommandExecutionFailureException if a failure occurs while executing the given
* {@link MultiKeyClusterCommandCallback command}.
* {@link MultiKeyClusterCommandCallback command}.
*/
public <S, T> MultiNodeResult<T> executeMultiKeyCommand(MultiKeyClusterCommandCallback<S, T> commandCallback,
Iterable<byte[]> keys) {
Expand All @@ -315,8 +308,8 @@ public <S, T> MultiNodeResult<T> executeMultiKeyCommand(MultiKeyClusterCommandCa

if (entry.getKey().isMaster()) {
for (PositionalKey key : entry.getValue()) {
futures.put(new NodeExecution(entry.getKey(), key), this.executor.submit(() ->
executeMultiKeyCommandOnSingleNode(commandCallback, entry.getKey(), key.getBytes())));
futures.put(new NodeExecution(entry.getKey(), key), this.executor
.submit(() -> executeMultiKeyCommandOnSingleNode(commandCallback, entry.getKey(), key.getBytes())));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package org.springframework.data.redis.connection;

import static org.springframework.util.StringUtils.commaDelimitedListToSet;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -26,7 +24,6 @@

import org.springframework.core.env.MapPropertySource;
import org.springframework.core.env.PropertySource;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.data.redis.connection.RedisConfiguration.ClusterConfiguration;
import org.springframework.data.redis.util.RedisAssertions;
import org.springframework.lang.Nullable;
Expand All @@ -36,8 +33,8 @@
import org.springframework.util.StringUtils;

/**
* Configuration class used to set up a {@link RedisConnection} via {@link RedisConnectionFactory} for connecting
* to <a href="https://redis.io/topics/cluster-spec">Redis Cluster</a>. Useful when setting up a highly available Redis
* Configuration class used to set up a {@link RedisConnection} via {@link RedisConnectionFactory} for connecting to
* <a href="https://redis.io/topics/cluster-spec">Redis Cluster</a>. Useful when setting up a highly available Redis
* environment.
*
* @author Christoph Strobl
Expand All @@ -52,8 +49,6 @@ public class RedisClusterConfiguration implements RedisConfiguration, ClusterCon

private @Nullable Integer maxRedirects;

private @Nullable AsyncTaskExecutor executor;

private RedisPassword password = RedisPassword.none();

private final Set<RedisNode> clusterNodes;
Expand Down Expand Up @@ -103,10 +98,12 @@ public RedisClusterConfiguration(PropertySource<?> propertySource) {
this.clusterNodes = new LinkedHashSet<>();

if (propertySource.containsProperty(REDIS_CLUSTER_NODES_CONFIG_PROPERTY)) {

Object redisClusterNodes = propertySource.getProperty(REDIS_CLUSTER_NODES_CONFIG_PROPERTY);
appendClusterNodes(commaDelimitedListToSet(String.valueOf(redisClusterNodes)));
appendClusterNodes(StringUtils.commaDelimitedListToSet(String.valueOf(redisClusterNodes)));
}
if (propertySource.containsProperty(REDIS_CLUSTER_MAX_REDIRECTS_CONFIG_PROPERTY)) {

Object clusterMaxRedirects = propertySource.getProperty(REDIS_CLUSTER_MAX_REDIRECTS_CONFIG_PROPERTY);
this.maxRedirects = NumberUtils.parseNumber(String.valueOf(clusterMaxRedirects), Integer.class);
}
Expand Down Expand Up @@ -204,16 +201,6 @@ public RedisPassword getPassword() {
return password;
}

@Override
public void setAsyncTaskExecutor(@Nullable AsyncTaskExecutor executor) {
this.executor = executor;
}

@Nullable @Override
public AsyncTaskExecutor getAsyncTaskExecutor() {
return this.executor;
}

@Override
public boolean equals(@Nullable Object obj) {

Expand All @@ -226,9 +213,9 @@ public boolean equals(@Nullable Object obj) {
}

return ObjectUtils.nullSafeEquals(this.clusterNodes, that.clusterNodes)
&& ObjectUtils.nullSafeEquals(this.maxRedirects, that.maxRedirects)
&& ObjectUtils.nullSafeEquals(this.username, that.username)
&& ObjectUtils.nullSafeEquals(this.password, that.password);
&& ObjectUtils.nullSafeEquals(this.maxRedirects, that.maxRedirects)
&& ObjectUtils.nullSafeEquals(this.username, that.username)
&& ObjectUtils.nullSafeEquals(this.password, that.password);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.function.IntSupplier;
import java.util.function.Supplier;

import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

Expand Down Expand Up @@ -346,20 +345,6 @@ interface WithDomainSocket {
*/
interface ClusterConfiguration extends WithPassword {

/**
* Configures the {@link AsyncTaskExecutor} used to execute commands asynchronously across the cluster.
*
* @param executor {@link AsyncTaskExecutor} used to execute commands asynchronously across the cluster.
*/
void setAsyncTaskExecutor(AsyncTaskExecutor executor);

/**
* Returns the configured {@link AsyncTaskExecutor} used to execute commands asynchronously across the cluster.
*
* @return the configured {@link AsyncTaskExecutor} used to execute commands asynchronously across the cluster.
*/
AsyncTaskExecutor getAsyncTaskExecutor();

/**
* Returns an {@link Collections#unmodifiableSet(Set) Set} of {@link RedisNode cluster nodes}.
*
Expand Down
Loading

0 comments on commit 9af17ab

Please sign in to comment.