Skip to content

Commit

Permalink
Spotless.
Browse files Browse the repository at this point in the history
  • Loading branch information
SanHalacogluImproving committed Jan 24, 2024
1 parent fe83851 commit 26fc46d
Show file tree
Hide file tree
Showing 10 changed files with 169 additions and 167 deletions.
48 changes: 25 additions & 23 deletions java/client/src/main/java/glide/api/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,33 @@
*/
public class RedisClient extends BaseClient implements BaseCommands {

/**
* Request an async (non-blocking) Redis client in Standalone mode.
*
* @param config - Redis Client Configuration
* @return a Future to connect and return a RedisClient
*/
public static CompletableFuture<RedisClient> CreateClient(RedisClientConfiguration config) {
ThreadPoolResource threadPoolResource = config.getThreadPoolResource();
if (threadPoolResource == null) {
threadPoolResource =
ThreadPoolResourceAllocator.getOrCreate(Platform.getThreadPoolResourceSupplier());
/**
* Request an async (non-blocking) Redis client in Standalone mode.
*
* @param config - Redis Client Configuration
* @return a Future to connect and return a RedisClient
*/
public static CompletableFuture<RedisClient> CreateClient(RedisClientConfiguration config)
throws InterruptedException {
ThreadPoolResource threadPoolResource = config.getThreadPoolResource();
if (threadPoolResource == null) {
threadPoolResource =
ThreadPoolResourceAllocator.getOrCreate(Platform.getThreadPoolResourceSupplier());
}
ChannelHandler channelHandler = buildChannelHandler(threadPoolResource);
var connectionManager = buildConnectionManager(channelHandler);
var commandManager = buildCommandManager(channelHandler);
// TODO: Support exception throwing, including interrupted exceptions
return connectionManager
.connectToRedis(config)
.thenApply(ignore -> new RedisClient(connectionManager, commandManager));
}
ChannelHandler channelHandler = buildChannelHandler(threadPoolResource);
var connectionManager = buildConnectionManager(channelHandler);
var commandManager = buildCommandManager(channelHandler);
// TODO: Support exception throwing, including interrupted exceptions
return connectionManager
.connectToRedis(config)
.thenApply(ignore -> new RedisClient(connectionManager, commandManager));
}

protected static ChannelHandler buildChannelHandler(ThreadPoolResource threadPoolResource) {
CallbackDispatcher callbackDispatcher = new CallbackDispatcher();
return new ChannelHandler(callbackDispatcher, getSocket(), threadPoolResource);
}
protected static ChannelHandler buildChannelHandler(ThreadPoolResource threadPoolResource)
throws InterruptedException {
CallbackDispatcher callbackDispatcher = new CallbackDispatcher();
return new ChannelHandler(callbackDispatcher, getSocket(), threadPoolResource);
}

protected static ConnectionManager buildConnectionManager(ChannelHandler channelHandler) {
return new ConnectionManager(channelHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import glide.connectors.resources.ThreadPoolResource;
import java.util.List;

import glide.connectors.resources.ThreadPoolResource;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
Expand Down Expand Up @@ -46,18 +44,18 @@ public abstract class BaseClientConfiguration {
*/
private final RedisCredentials credentials;

/**
* The duration in milliseconds that the client should wait for a request to complete. This
* duration encompasses sending the request, awaiting for a response from the server, and any
* required reconnections or retries. If the specified timeout is exceeded for a pending request,
* it will result in a timeout error. If not set, a default value will be used.
*/
private final Integer requestTimeout;
/**
* The duration in milliseconds that the client should wait for a request to complete. This
* duration encompasses sending the request, awaiting for a response from the server, and any
* required reconnections or retries. If the specified timeout is exceeded for a pending request,
* it will result in a timeout error. If not set, a default value will be used.
*/
private final Integer requestTimeout;

/**
* Advanced users can pass an extended {@link glide.connectors.resources.ThreadPoolResource} to
* pass a user-defined event loop group. Users are responsible for shutting the resource down when
* no longer in use.
*/
private final ThreadPoolResource threadPoolResource;
/**
* Advanced users can pass an extended {@link glide.connectors.resources.ThreadPoolResource} to
* pass a user-defined event loop group. Users are responsible for shutting the resource down when
* no longer in use.
*/
private final ThreadPoolResource threadPoolResource;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,27 @@ public class ChannelHandler {

/**
* Open a new channel for a new client and running it on the provided EventLoopGroup.
*
* @param callbackDispatcher - dispatcher to handle callbacks
* @param socketPath - address to connect
* @param threadPoolResource - resource to choose ELG and domainSocketChannelClass
*/
public ChannelHandler(
CallbackDispatcher callbackDispatcher,
String socketPath,
ThreadPoolResource threadPoolResource) throws InterruptedException {
public ChannelHandler(
CallbackDispatcher callbackDispatcher,
String socketPath,
ThreadPoolResource threadPoolResource)
throws InterruptedException {

channel =
new Bootstrap()
.group(threadPoolResource.getEventLoopGroup())
.channel(threadPoolResource.getDomainSocketChannelClass())
.handler(new ProtobufSocketChannelInitializer(callbackDispatcher))
.connect(new DomainSocketAddress(socketPath))
.sync()
.channel();
this.callbackDispatcher = callbackDispatcher;
}
channel =
new Bootstrap()
.group(threadPoolResource.getEventLoopGroup())
.channel(threadPoolResource.getDomainSocketChannelClass())
.handler(new ProtobufSocketChannelInitializer(callbackDispatcher))
.connect(new DomainSocketAddress(socketPath))
.sync()
.channel();
this.callbackDispatcher = callbackDispatcher;
}

/**
* Complete a protobuf message and write it to the channel (to UDS).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@
* configurations.
*/
public class EpollResource extends ThreadPoolResource {
private static final String EPOLL_EVENT_LOOP_IDENTIFIER = "glide-channel-epoll-elg";
private static final String EPOLL_EVENT_LOOP_IDENTIFIER = "glide-channel-epoll-elg";

public EpollResource() {
this(
new EpollEventLoopGroup(
Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory(EPOLL_EVENT_LOOP_IDENTIFIER, true)));
}
public EpollResource() {
this(
new EpollEventLoopGroup(
Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory(EPOLL_EVENT_LOOP_IDENTIFIER, true)));
}

public EpollResource(EpollEventLoopGroup epollEventLoopGroup) {
super(epollEventLoopGroup, EpollDomainSocketChannel.class);
}
public EpollResource(EpollEventLoopGroup epollEventLoopGroup) {
super(epollEventLoopGroup, EpollDomainSocketChannel.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@
* configurations.
*/
public class KQueuePoolResource extends ThreadPoolResource {
private static final String KQUEUE_EVENT_LOOP_IDENTIFIER = "glide-channel-kqueue-elg";
private static final String KQUEUE_EVENT_LOOP_IDENTIFIER = "glide-channel-kqueue-elg";

public KQueuePoolResource() {
this(
new KQueueEventLoopGroup(
Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory(KQUEUE_EVENT_LOOP_IDENTIFIER, true)));
}
public KQueuePoolResource() {
this(
new KQueueEventLoopGroup(
Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory(KQUEUE_EVENT_LOOP_IDENTIFIER, true)));
}

public KQueuePoolResource(KQueueEventLoopGroup eventLoopGroup) {
super(eventLoopGroup, KQueueDomainSocketChannel.class);
}
public KQueuePoolResource(KQueueEventLoopGroup eventLoopGroup) {
super(eventLoopGroup, KQueueDomainSocketChannel.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ private static boolean isEPollAvailable() {
}
}

public static Supplier<ThreadPoolResource> getThreadPoolResourceSupplier() {
if (Platform.getCapabilities().isKQueueAvailable()) {
return KQueuePoolResource::new;
}
public static Supplier<ThreadPoolResource> getThreadPoolResourceSupplier() {
if (Platform.getCapabilities().isKQueueAvailable()) {
return KQueuePoolResource::new;
}

if (Platform.getCapabilities().isEPollAvailable()) {
return EpollResource::new;
if (Platform.getCapabilities().isEPollAvailable()) {
return EpollResource::new;
}
// TODO support IO-Uring and NIO
throw new RuntimeException("Current platform supports no known thread pool resources");
}
// TODO support IO-Uring and NIO
throw new RuntimeException("Current platform supports no known thread pool resources");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
*/
@Getter
public abstract class ThreadPoolResource {
private EventLoopGroup eventLoopGroup;
private Class<? extends DomainSocketChannel> domainSocketChannelClass;
private EventLoopGroup eventLoopGroup;
private Class<? extends DomainSocketChannel> domainSocketChannelClass;

public ThreadPoolResource(
@NonNull EventLoopGroup eventLoopGroup,
@NonNull Class<? extends DomainSocketChannel> domainSocketChannelClass) {
this.eventLoopGroup = eventLoopGroup;
this.domainSocketChannelClass = domainSocketChannelClass;
}
public ThreadPoolResource(
@NonNull EventLoopGroup eventLoopGroup,
@NonNull Class<? extends DomainSocketChannel> domainSocketChannelClass) {
this.eventLoopGroup = eventLoopGroup;
this.domainSocketChannelClass = domainSocketChannelClass;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,40 @@

/** A class responsible to allocating and deallocating the default Thread Pool Resource. */
public class ThreadPoolResourceAllocator {
private static final Object lock = new Object();
private static ThreadPoolResource defaultThreadPoolResource = null;
private static final Object lock = new Object();
private static ThreadPoolResource defaultThreadPoolResource = null;

public static ThreadPoolResource getOrCreate(Supplier<ThreadPoolResource> supplier) {
// once the default is set, we want to avoid hitting the lock
if (defaultThreadPoolResource != null) {
return defaultThreadPoolResource;
}
public static ThreadPoolResource getOrCreate(Supplier<ThreadPoolResource> supplier) {
// once the default is set, we want to avoid hitting the lock
if (defaultThreadPoolResource != null) {
return defaultThreadPoolResource;
}

synchronized (lock) {
if (defaultThreadPoolResource == null) {
defaultThreadPoolResource = supplier.get();
}
}
synchronized (lock) {
if (defaultThreadPoolResource == null) {
defaultThreadPoolResource = supplier.get();
}
}

return defaultThreadPoolResource;
}
return defaultThreadPoolResource;
}

/**
* A JVM shutdown hook to be registered. It is responsible for closing connection and freeing
* resources. It is recommended to use a class instead of lambda to ensure that it is called.<br>
* See {@link Runtime#addShutdownHook}.
*/
protected static class ShutdownHook implements Runnable {
@Override
public void run() {
if (defaultThreadPoolResource != null) {
defaultThreadPoolResource.getEventLoopGroup().shutdownGracefully();
defaultThreadPoolResource = null;
}
/**
* A JVM shutdown hook to be registered. It is responsible for closing connection and freeing
* resources. It is recommended to use a class instead of lambda to ensure that it is called.<br>
* See {@link Runtime#addShutdownHook}.
*/
protected static class ShutdownHook implements Runnable {
@Override
public void run() {
if (defaultThreadPoolResource != null) {
defaultThreadPoolResource.getEventLoopGroup().shutdownGracefully();
defaultThreadPoolResource = null;
}
}
}
}

static {
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "Glide-shutdown-hook"));
}
static {
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "Glide-shutdown-hook"));
}
}
40 changes: 20 additions & 20 deletions java/client/src/test/java/glide/api/RedisClientCreateTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,25 @@

public class RedisClientCreateTest {

private MockedStatic<RedisClient> mockedClient;
private ChannelHandler channelHandler;
private ConnectionManager connectionManager;
private CommandManager commandManager;
private ThreadPoolResource threadPoolResource;
private MockedStatic<RedisClient> mockedClient;
private ChannelHandler channelHandler;
private ConnectionManager connectionManager;
private CommandManager commandManager;
private ThreadPoolResource threadPoolResource;

@BeforeEach
public void init() {
mockedClient = Mockito.mockStatic(RedisClient.class);

channelHandler = mock(ChannelHandler.class);
commandManager = mock(CommandManager.class);
connectionManager = mock(ConnectionManager.class);
threadPoolResource = mock(ThreadPoolResource.class);
channelHandler = mock(ChannelHandler.class);
commandManager = mock(CommandManager.class);
connectionManager = mock(ConnectionManager.class);
threadPoolResource = mock(ThreadPoolResource.class);

mockedClient.when(() -> buildChannelHandler(any())).thenReturn(channelHandler);
mockedClient.when(() -> buildConnectionManager(channelHandler)).thenReturn(connectionManager);
mockedClient.when(() -> buildCommandManager(channelHandler)).thenReturn(commandManager);
}
mockedClient.when(() -> buildChannelHandler(any())).thenReturn(channelHandler);
mockedClient.when(() -> buildConnectionManager(channelHandler)).thenReturn(connectionManager);
mockedClient.when(() -> buildCommandManager(channelHandler)).thenReturn(commandManager);
}

@AfterEach
public void teardown() {
Expand All @@ -57,11 +57,11 @@ public void teardown() {
@SneakyThrows
public void createClient_withConfig_successfullyReturnsRedisClient() {

// setup
CompletableFuture<Void> connectToRedisFuture = new CompletableFuture<>();
connectToRedisFuture.complete(null);
RedisClientConfiguration config =
RedisClientConfiguration.builder().threadPoolResource(threadPoolResource).build();
// setup
CompletableFuture<Void> connectToRedisFuture = new CompletableFuture<>();
connectToRedisFuture.complete(null);
RedisClientConfiguration config =
RedisClientConfiguration.builder().threadPoolResource(threadPoolResource).build();

when(connectionManager.connectToRedis(eq(config))).thenReturn(connectToRedisFuture);
mockedClient.when(() -> CreateClient(config)).thenCallRealMethod();
Expand All @@ -84,8 +84,8 @@ public void createClient_errorOnConnectionThrowsExecutionException() {
connectToRedisFuture.completeExceptionally(exception);
RedisClientConfiguration config = RedisClientConfiguration.builder().build();

when(connectionManager.connectToRedis(any())).thenReturn(connectToRedisFuture);
mockedClient.when(() -> CreateClient(any())).thenCallRealMethod();
when(connectionManager.connectToRedis(any())).thenReturn(connectToRedisFuture);
mockedClient.when(() -> CreateClient(any())).thenCallRealMethod();

// exercise
CompletableFuture<RedisClient> result = CreateClient(config);
Expand Down
Loading

0 comments on commit 26fc46d

Please sign in to comment.