From 2807e82c631fc1bd08cba3209cae978b37ba47f3 Mon Sep 17 00:00:00 2001 From: Andrew Carbonetto Date: Wed, 17 Jan 2024 14:49:37 -0800 Subject: [PATCH] Add test for ThreadPoolResourceAllocator Signed-off-by: Andrew Carbonetto --- .../src/main/java/glide/api/RedisClient.java | 3 +- .../connectors/resources/EpollResource.java | 6 +-- .../resources/KQueuePoolResource.java | 6 +-- .../glide/connectors/resources/Platform.java | 15 ------- .../ThreadPoolResourceAllocator.java | 10 ++--- .../ThreadPoolResourceAllocatorTest.java | 43 +++++++++++++++++++ 6 files changed, 55 insertions(+), 28 deletions(-) create mode 100644 java/client/src/test/java/glide/connectors/resources/ThreadPoolResourceAllocatorTest.java diff --git a/java/client/src/main/java/glide/api/RedisClient.java b/java/client/src/main/java/glide/api/RedisClient.java index 98698c63ff..f6e461f298 100644 --- a/java/client/src/main/java/glide/api/RedisClient.java +++ b/java/client/src/main/java/glide/api/RedisClient.java @@ -7,6 +7,7 @@ import glide.api.models.configuration.RedisClientConfiguration; import glide.connectors.handlers.CallbackDispatcher; import glide.connectors.handlers.ChannelHandler; +import glide.connectors.resources.Platform; import glide.connectors.resources.ThreadPoolResource; import glide.connectors.resources.ThreadPoolResourceAllocator; import glide.managers.CommandManager; @@ -28,7 +29,7 @@ public class RedisClient extends BaseClient implements BaseCommands { public static CompletableFuture CreateClient(RedisClientConfiguration config) { ThreadPoolResource threadPoolResource = config.getThreadPoolResource(); if (threadPoolResource == null) { - threadPoolResource = ThreadPoolResourceAllocator.createOrGetThreadPoolResource(); + threadPoolResource = ThreadPoolResourceAllocator.getOrCreate(Platform.getThreadPoolResourceSupplier()); } ChannelHandler channelHandler = buildChannelHandler(threadPoolResource); var connectionManager = buildConnectionManager(channelHandler); diff --git a/java/client/src/main/java/glide/connectors/resources/EpollResource.java b/java/client/src/main/java/glide/connectors/resources/EpollResource.java index ec2301a2e1..203a64b57a 100644 --- a/java/client/src/main/java/glide/connectors/resources/EpollResource.java +++ b/java/client/src/main/java/glide/connectors/resources/EpollResource.java @@ -12,11 +12,11 @@ public class EpollResource extends ThreadPoolResource { private static final String EPOLL_EVENT_LOOP_IDENTIFIER = "glide-channel-epoll-elg"; public EpollResource() { - super( + this( new EpollEventLoopGroup( Runtime.getRuntime().availableProcessors(), - new DefaultThreadFactory(EPOLL_EVENT_LOOP_IDENTIFIER, true)), - EpollDomainSocketChannel.class); + new DefaultThreadFactory(EPOLL_EVENT_LOOP_IDENTIFIER, true)) + ); } public EpollResource(EpollEventLoopGroup epollEventLoopGroup) { diff --git a/java/client/src/main/java/glide/connectors/resources/KQueuePoolResource.java b/java/client/src/main/java/glide/connectors/resources/KQueuePoolResource.java index 6130b430e4..2d1ce83c78 100644 --- a/java/client/src/main/java/glide/connectors/resources/KQueuePoolResource.java +++ b/java/client/src/main/java/glide/connectors/resources/KQueuePoolResource.java @@ -12,11 +12,11 @@ public class KQueuePoolResource extends ThreadPoolResource { private static final String KQUEUE_EVENT_LOOP_IDENTIFIER = "glide-channel-kqueue-elg"; public KQueuePoolResource() { - super( + this( new KQueueEventLoopGroup( Runtime.getRuntime().availableProcessors(), - new DefaultThreadFactory(KQUEUE_EVENT_LOOP_IDENTIFIER, true)), - KQueueDomainSocketChannel.class); + new DefaultThreadFactory(KQUEUE_EVENT_LOOP_IDENTIFIER, true)) + ); } public KQueuePoolResource(KQueueEventLoopGroup eventLoopGroup) { diff --git a/java/client/src/main/java/glide/connectors/resources/Platform.java b/java/client/src/main/java/glide/connectors/resources/Platform.java index bb48b9d0f0..a5d50426fe 100644 --- a/java/client/src/main/java/glide/connectors/resources/Platform.java +++ b/java/client/src/main/java/glide/connectors/resources/Platform.java @@ -68,19 +68,4 @@ public static Supplier getThreadPoolResourceSupplier() { // TODO support IO-Uring and NIO throw new RuntimeException("Current platform supports no known thread pool resources"); } - - /** - * Get a channel class required by Netty to open a client UDS channel. - * - * @return Return a class supported by the current platform. - */ - public static Class getClientUdsNettyChannelType() { - if (capabilities.isKQueueAvailable()) { - return KQueueDomainSocketChannel.class; - } - if (capabilities.isEPollAvailable()) { - return EpollDomainSocketChannel.class; - } - throw new RuntimeException("Current platform supports no known socket types"); - } } diff --git a/java/client/src/main/java/glide/connectors/resources/ThreadPoolResourceAllocator.java b/java/client/src/main/java/glide/connectors/resources/ThreadPoolResourceAllocator.java index a21ec86c11..7837d4e3d5 100644 --- a/java/client/src/main/java/glide/connectors/resources/ThreadPoolResourceAllocator.java +++ b/java/client/src/main/java/glide/connectors/resources/ThreadPoolResourceAllocator.java @@ -7,11 +7,8 @@ public class ThreadPoolResourceAllocator { private static final Object lock = new Object(); private static ThreadPoolResource defaultThreadPoolResource = null; - public static ThreadPoolResource createOrGetThreadPoolResource() { - return getOrCreate(Platform.getThreadPoolResourceSupplier()); - } - - private static ThreadPoolResource getOrCreate(Supplier supplier) { + public static ThreadPoolResource getOrCreate(Supplier supplier) { + // once the default is set, we want to avoid hitting the lock if (defaultThreadPoolResource != null) { return defaultThreadPoolResource; } @@ -30,11 +27,12 @@ private static ThreadPoolResource getOrCreate(Supplier suppl * resources. It is recommended to use a class instead of lambda to ensure that it is called.
* See {@link Runtime#addShutdownHook}. */ - private static class ShutdownHook implements Runnable { + protected static class ShutdownHook implements Runnable { @Override public void run() { if (defaultThreadPoolResource != null) { defaultThreadPoolResource.getEventLoopGroup().shutdownGracefully(); + defaultThreadPoolResource = null; } } } diff --git a/java/client/src/test/java/glide/connectors/resources/ThreadPoolResourceAllocatorTest.java b/java/client/src/test/java/glide/connectors/resources/ThreadPoolResourceAllocatorTest.java new file mode 100644 index 0000000000..a7a08b8ecd --- /dev/null +++ b/java/client/src/test/java/glide/connectors/resources/ThreadPoolResourceAllocatorTest.java @@ -0,0 +1,43 @@ +package glide.connectors.resources; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.times; + +import io.netty.channel.EventLoopGroup; +import java.util.function.Supplier; +import org.junit.jupiter.api.Test; + +public class ThreadPoolResourceAllocatorTest { + + ThreadPoolResourceAllocator service; + + @Test + public void getOrCreateReturnsDefault() { + (new ThreadPoolResourceAllocator.ShutdownHook()).run(); + + ThreadPoolResource mockedThreadPool = mock(ThreadPoolResource.class); + Supplier threadPoolSupplier = mock(Supplier.class); + when(threadPoolSupplier.get()).thenReturn(mockedThreadPool); + + ThreadPoolResource theResource = service.getOrCreate(threadPoolSupplier); + assertEquals(mockedThreadPool, theResource); + + ThreadPoolResource theSameResource = service.getOrCreate(threadPoolSupplier); + assertEquals(mockedThreadPool, theSameResource); + + // and test that the supplier isn't used any longer once the default is set + Supplier notUsedSupplier = mock(Supplier.class); + ThreadPoolResource firstResource = service.getOrCreate(notUsedSupplier); + verify(notUsedSupplier, times(0)).get(); + assertEquals(mockedThreadPool, firstResource); + + // teardown + // remove the mocked resource + EventLoopGroup mockedELG = mock(EventLoopGroup.class); + when(mockedThreadPool.getEventLoopGroup()).thenReturn(mockedELG); + (new ThreadPoolResourceAllocator.ShutdownHook()).run(); + } +}