Skip to content

Commit

Permalink
Add test for ThreadPoolResourceAllocator
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Carbonetto <[email protected]>
  • Loading branch information
acarbonetto committed Jan 17, 2024
1 parent 0f5862c commit 2807e82
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 28 deletions.
3 changes: 2 additions & 1 deletion java/client/src/main/java/glide/api/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import glide.api.models.configuration.RedisClientConfiguration;
import glide.connectors.handlers.CallbackDispatcher;
import glide.connectors.handlers.ChannelHandler;
import glide.connectors.resources.Platform;
import glide.connectors.resources.ThreadPoolResource;
import glide.connectors.resources.ThreadPoolResourceAllocator;
import glide.managers.CommandManager;
Expand All @@ -28,7 +29,7 @@ public class RedisClient extends BaseClient implements BaseCommands {
public static CompletableFuture<RedisClient> CreateClient(RedisClientConfiguration config) {
ThreadPoolResource threadPoolResource = config.getThreadPoolResource();
if (threadPoolResource == null) {
threadPoolResource = ThreadPoolResourceAllocator.createOrGetThreadPoolResource();
threadPoolResource = ThreadPoolResourceAllocator.getOrCreate(Platform.getThreadPoolResourceSupplier());
}
ChannelHandler channelHandler = buildChannelHandler(threadPoolResource);
var connectionManager = buildConnectionManager(channelHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ public class EpollResource extends ThreadPoolResource {
private static final String EPOLL_EVENT_LOOP_IDENTIFIER = "glide-channel-epoll-elg";

public EpollResource() {
super(
this(
new EpollEventLoopGroup(
Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory(EPOLL_EVENT_LOOP_IDENTIFIER, true)),
EpollDomainSocketChannel.class);
new DefaultThreadFactory(EPOLL_EVENT_LOOP_IDENTIFIER, true))
);
}

public EpollResource(EpollEventLoopGroup epollEventLoopGroup) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ public class KQueuePoolResource extends ThreadPoolResource {
private static final String KQUEUE_EVENT_LOOP_IDENTIFIER = "glide-channel-kqueue-elg";

public KQueuePoolResource() {
super(
this(
new KQueueEventLoopGroup(
Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory(KQUEUE_EVENT_LOOP_IDENTIFIER, true)),
KQueueDomainSocketChannel.class);
new DefaultThreadFactory(KQUEUE_EVENT_LOOP_IDENTIFIER, true))
);
}

public KQueuePoolResource(KQueueEventLoopGroup eventLoopGroup) {
Expand Down
15 changes: 0 additions & 15 deletions java/client/src/main/java/glide/connectors/resources/Platform.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,19 +68,4 @@ public static Supplier<ThreadPoolResource> getThreadPoolResourceSupplier() {
// TODO support IO-Uring and NIO
throw new RuntimeException("Current platform supports no known thread pool resources");
}

/**
* Get a channel class required by Netty to open a client UDS channel.
*
* @return Return a class supported by the current platform.
*/
public static Class<? extends DomainSocketChannel> getClientUdsNettyChannelType() {
if (capabilities.isKQueueAvailable()) {
return KQueueDomainSocketChannel.class;
}
if (capabilities.isEPollAvailable()) {
return EpollDomainSocketChannel.class;
}
throw new RuntimeException("Current platform supports no known socket types");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@ public class ThreadPoolResourceAllocator {
private static final Object lock = new Object();
private static ThreadPoolResource defaultThreadPoolResource = null;

public static ThreadPoolResource createOrGetThreadPoolResource() {
return getOrCreate(Platform.getThreadPoolResourceSupplier());
}

private static ThreadPoolResource getOrCreate(Supplier<ThreadPoolResource> supplier) {
public static ThreadPoolResource getOrCreate(Supplier<ThreadPoolResource> supplier) {
// once the default is set, we want to avoid hitting the lock
if (defaultThreadPoolResource != null) {
return defaultThreadPoolResource;
}
Expand All @@ -30,11 +27,12 @@ private static ThreadPoolResource getOrCreate(Supplier<ThreadPoolResource> suppl
* resources. It is recommended to use a class instead of lambda to ensure that it is called.<br>
* See {@link Runtime#addShutdownHook}.
*/
private static class ShutdownHook implements Runnable {
protected static class ShutdownHook implements Runnable {
@Override
public void run() {
if (defaultThreadPoolResource != null) {
defaultThreadPoolResource.getEventLoopGroup().shutdownGracefully();
defaultThreadPoolResource = null;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package glide.connectors.resources;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.times;

import io.netty.channel.EventLoopGroup;
import java.util.function.Supplier;
import org.junit.jupiter.api.Test;

public class ThreadPoolResourceAllocatorTest {

ThreadPoolResourceAllocator service;

@Test
public void getOrCreateReturnsDefault() {
(new ThreadPoolResourceAllocator.ShutdownHook()).run();

ThreadPoolResource mockedThreadPool = mock(ThreadPoolResource.class);
Supplier<ThreadPoolResource> threadPoolSupplier = mock(Supplier.class);
when(threadPoolSupplier.get()).thenReturn(mockedThreadPool);

ThreadPoolResource theResource = service.getOrCreate(threadPoolSupplier);
assertEquals(mockedThreadPool, theResource);

ThreadPoolResource theSameResource = service.getOrCreate(threadPoolSupplier);
assertEquals(mockedThreadPool, theSameResource);

// and test that the supplier isn't used any longer once the default is set
Supplier<ThreadPoolResource> notUsedSupplier = mock(Supplier.class);
ThreadPoolResource firstResource = service.getOrCreate(notUsedSupplier);
verify(notUsedSupplier, times(0)).get();
assertEquals(mockedThreadPool, firstResource);

// teardown
// remove the mocked resource
EventLoopGroup mockedELG = mock(EventLoopGroup.class);
when(mockedThreadPool.getEventLoopGroup()).thenReturn(mockedELG);
(new ThreadPoolResourceAllocator.ShutdownHook()).run();
}
}

0 comments on commit 2807e82

Please sign in to comment.