diff --git a/java/client/build.gradle b/java/client/build.gradle index 0fb95b43ec..e903540621 100644 --- a/java/client/build.gradle +++ b/java/client/build.gradle @@ -2,6 +2,7 @@ import java.nio.file.Paths plugins { id 'java-library' + id 'jacoco' } repositories { @@ -103,4 +104,15 @@ tasks.withType(Test) { showStandardStreams true } jvmArgs "-Djava.library.path=${projectDir}/../target/release:${projectDir}/../target/debug" + finalizedBy jacocoTestReport, jacocoTestCoverageVerification } + +jacocoTestReport { + afterEvaluate { + classDirectories.setFrom(files(classDirectories.files.collect { + fileTree(dir: it, exclude: ['**/connection_request/**', '**/response/**', '**/redis_request/**', '**/connectors/resources/**']) + })) + } +} + + diff --git a/java/client/src/main/java/glide/api/RedisClient.java b/java/client/src/main/java/glide/api/RedisClient.java index 642635dda3..f9fb3126da 100644 --- a/java/client/src/main/java/glide/api/RedisClient.java +++ b/java/client/src/main/java/glide/api/RedisClient.java @@ -2,9 +2,14 @@ import static glide.ffi.resolvers.SocketListenerResolver.getSocket; +import glide.api.commands.BaseCommands; +import glide.api.commands.Command; import glide.api.models.configuration.RedisClientConfiguration; import glide.connectors.handlers.CallbackDispatcher; import glide.connectors.handlers.ChannelHandler; +import glide.connectors.handlers.DefaultThreadPoolResourceHandler; +import glide.connectors.resources.Platform; +import glide.connectors.resources.ThreadPoolResource; import glide.managers.CommandManager; import glide.managers.ConnectionManager; import java.util.concurrent.CompletableFuture; @@ -13,27 +18,32 @@ * Async (non-blocking) client for Redis in Standalone mode. Use {@link * #CreateClient(RedisClientConfiguration)} to request a client to Redis. */ -public class RedisClient extends BaseClient { +public class RedisClient extends BaseClient implements BaseCommands { /** * Request an async (non-blocking) Redis client in Standalone mode. * * @param config - Redis Client Configuration - * @return a Future to connect and return a RedisClient + * @return a future to connect and return a RedisClient */ public static CompletableFuture CreateClient(RedisClientConfiguration config) { - ChannelHandler channelHandler = buildChannelHandler(); - ConnectionManager connectionManager = buildConnectionManager(channelHandler); - CommandManager commandManager = buildCommandManager(channelHandler); + ThreadPoolResource threadPoolResource = config.getThreadPoolResource(); + if (threadPoolResource == null) { + threadPoolResource = + DefaultThreadPoolResourceHandler.getOrCreate(Platform.getThreadPoolResourceSupplier()); + } + ChannelHandler channelHandler = buildChannelHandler(threadPoolResource); + var connectionManager = buildConnectionManager(channelHandler); + var commandManager = buildCommandManager(channelHandler); // TODO: Support exception throwing, including interrupted exceptions return connectionManager .connectToRedis(config) .thenApply(ignore -> new RedisClient(connectionManager, commandManager)); } - protected static ChannelHandler buildChannelHandler() { + protected static ChannelHandler buildChannelHandler(ThreadPoolResource threadPoolResource) { CallbackDispatcher callbackDispatcher = new CallbackDispatcher(); - return new ChannelHandler(callbackDispatcher, getSocket()); + return new ChannelHandler(callbackDispatcher, getSocket(), threadPoolResource); } protected static ConnectionManager buildConnectionManager(ChannelHandler channelHandler) { @@ -47,4 +57,17 @@ protected static CommandManager buildCommandManager(ChannelHandler channelHandle protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) { super(connectionManager, commandManager); } + + /** + * Executes a single custom command, without checking inputs. Every part of the command, including + * subcommands, should be added as a separate value in args. + * + * @param args command and arguments for the custom command call + * @return CompletableFuture with the response + */ + public CompletableFuture customCommand(String[] args) { + Command command = + Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build(); + return commandManager.submitNewCommand(command, BaseCommands::handleObjectResponse); + } } diff --git a/java/client/src/main/java/glide/api/commands/BaseCommandResponseResolver.java b/java/client/src/main/java/glide/api/commands/BaseCommandResponseResolver.java new file mode 100644 index 0000000000..eebdef03df --- /dev/null +++ b/java/client/src/main/java/glide/api/commands/BaseCommandResponseResolver.java @@ -0,0 +1,70 @@ +package glide.api.commands; + +import glide.api.models.exceptions.ClosingException; +import glide.api.models.exceptions.ConnectionException; +import glide.api.models.exceptions.ExecAbortException; +import glide.api.models.exceptions.RedisException; +import glide.api.models.exceptions.RequestException; +import glide.api.models.exceptions.TimeoutException; +import lombok.AllArgsConstructor; +import response.ResponseOuterClass.RequestError; +import response.ResponseOuterClass.Response; + +/** + * Response resolver responsible for evaluating the Redis response object with a success or failure. + */ +@AllArgsConstructor +public class BaseCommandResponseResolver + implements RedisExceptionCheckedFunction { + + private RedisExceptionCheckedFunction respPointerResolver; + + /** + * Extracts value from the RESP pointer.
+ * Throws errors when the response is unsuccessful. + * + * @return A generic Object with the Response | null if the response is empty + */ + public Object apply(Response response) throws RedisException { + // TODO: handle object if the object is small + // TODO: handle RESP2 object if configuration is set + if (response.hasRequestError()) { + RequestError error = response.getRequestError(); + String msg = error.getMessage(); + switch (error.getType()) { + case Unspecified: + // Unspecified error on Redis service-side + throw new RequestException(msg); + case ExecAbort: + // Transactional error on Redis service-side + throw new ExecAbortException(msg); + case Timeout: + // Timeout from Glide to Redis service + throw new TimeoutException(msg); + case Disconnect: + // Connection problem between Glide and Redis + throw new ConnectionException(msg); + default: + // Request or command error from Redis + throw new RequestException(msg); + } + } + if (response.hasClosingError()) { + // A closing error is thrown when Rust-core is not connected to Redis + // We want to close shop and throw a ClosingException + // TODO: close the channel on a closing error + // channel.close(); + throw new ClosingException(response.getClosingError()); + } + if (response.hasConstantResponse()) { + // Return "OK" + return response.getConstantResponse().toString(); + } + if (response.hasRespPointer()) { + // Return the shared value - which may be a null value + return respPointerResolver.apply(response.getRespPointer()); + } + // if no response payload is provided, assume null + return null; + } +} diff --git a/java/client/src/main/java/glide/api/commands/BaseCommands.java b/java/client/src/main/java/glide/api/commands/BaseCommands.java new file mode 100644 index 0000000000..06268d564b --- /dev/null +++ b/java/client/src/main/java/glide/api/commands/BaseCommands.java @@ -0,0 +1,49 @@ +package glide.api.commands; + +import glide.ffi.resolvers.RedisValueResolver; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import response.ResponseOuterClass.Response; + +/** Base Commands interface to handle generic command and transaction requests. */ +public interface BaseCommands { + + /** + * default Object handler from response + * + * @return BaseCommandResponseResolver to deliver the response + */ + static BaseCommandResponseResolver applyBaseCommandResponseResolver() { + return new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer); + } + + /** + * Extracts the response from the Protobuf response and either throws an exception or returns the + * appropriate response has an Object + * + * @param response Redis protobuf message + * @return Response Object + */ + static Object handleObjectResponse(Response response) { + // return function to convert protobuf.Response into the response object by + // calling valueFromPointer + return BaseCommands.applyBaseCommandResponseResolver().apply(response); + } + + public static List handleTransactionResponse(Response response) { + // return function to convert protobuf.Response into the response object by + // calling valueFromPointer + + List transactionResponse = + (List) BaseCommands.applyBaseCommandResponseResolver().apply(response); + return transactionResponse; + } + + /** + * Execute a @see{Command} by sending command via socket manager + * + * @param args arguments for the custom command + * @return a CompletableFuture with response result from Redis + */ + CompletableFuture customCommand(String[] args); +} diff --git a/java/client/src/main/java/glide/api/commands/Command.java b/java/client/src/main/java/glide/api/commands/Command.java new file mode 100644 index 0000000000..413ea63c30 --- /dev/null +++ b/java/client/src/main/java/glide/api/commands/Command.java @@ -0,0 +1,24 @@ +package glide.api.commands; + +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; + +/** Base Command class to send a single request to Redis. */ +@Builder +@Getter +@EqualsAndHashCode +public class Command { + + /** Redis command request type */ + @NonNull final RequestType requestType; + + /** List of Arguments for the Redis command request */ + @Builder.Default final String[] arguments = new String[] {}; + + public enum RequestType { + /** Call a custom command with list of string arguments */ + CUSTOM_COMMAND, + } +} diff --git a/java/client/src/main/java/glide/api/commands/RedisExceptionCheckedFunction.java b/java/client/src/main/java/glide/api/commands/RedisExceptionCheckedFunction.java new file mode 100644 index 0000000000..8e8267b011 --- /dev/null +++ b/java/client/src/main/java/glide/api/commands/RedisExceptionCheckedFunction.java @@ -0,0 +1,18 @@ +package glide.api.commands; + +import glide.api.models.exceptions.RedisException; + +@FunctionalInterface +public interface RedisExceptionCheckedFunction { + + /** + * Functional response handler that takes a protobuf Response object.
+ * Returns a typed object on a successful Redis response.
+ * Throws RedisException when receiving a Redis error response.
+ * + * @param response - Redis Response + * @return T - response payload type + * @throws RedisException + */ + T apply(R response) throws RedisException; +} diff --git a/java/client/src/main/java/glide/api/models/configuration/BaseClientConfiguration.java b/java/client/src/main/java/glide/api/models/configuration/BaseClientConfiguration.java index 7561aa0f3f..f0ac340bd6 100644 --- a/java/client/src/main/java/glide/api/models/configuration/BaseClientConfiguration.java +++ b/java/client/src/main/java/glide/api/models/configuration/BaseClientConfiguration.java @@ -1,5 +1,6 @@ package glide.api.models.configuration; +import glide.connectors.resources.ThreadPoolResource; import java.util.List; import lombok.Builder; import lombok.Getter; @@ -50,4 +51,13 @@ public abstract class BaseClientConfiguration { * it will result in a timeout error. If not set, a default value will be used. */ private final Integer requestTimeout; + + /** + * Field for customizing the Event Loop Group and Channel Configuration in Netty applications. + * Advanced users can utilize {@link glide.connectors.resources.KQueuePoolResource}/{@link + * glide.connectors.resources.EpollResource} to set their custom event loop group. If not + * explicitly set, the system selects a default based on available resources. Recommended for + * advanced users. + */ + private final ThreadPoolResource threadPoolResource; } diff --git a/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java b/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java index 6664c660a4..935f2b2b03 100644 --- a/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java +++ b/java/client/src/main/java/glide/connectors/handlers/ChannelHandler.java @@ -1,17 +1,11 @@ package glide.connectors.handlers; import connection_request.ConnectionRequestOuterClass.ConnectionRequest; -import glide.connectors.resources.Platform; -import glide.connectors.resources.ThreadPoolAllocator; +import glide.connectors.resources.ThreadPoolResource; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelInitializer; -import io.netty.channel.EventLoopGroup; import io.netty.channel.unix.DomainSocketAddress; -import io.netty.channel.unix.DomainSocketChannel; -import io.netty.channel.unix.UnixChannel; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import redis_request.RedisRequestOuterClass.RedisRequest; import response.ResponseOuterClass.Response; @@ -28,36 +22,17 @@ public class ChannelHandler { private final CallbackDispatcher callbackDispatcher; /** Open a new channel for a new client. */ - public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath) { - this( - ThreadPoolAllocator.createOrGetNettyThreadPool(THREAD_POOL_NAME, Optional.empty()), - Platform.getClientUdsNettyChannelType(), - new ProtobufSocketChannelInitializer(callbackDispatcher), - new DomainSocketAddress(socketPath), - callbackDispatcher); - } - - /** - * Open a new channel for a new client and running it on the provided EventLoopGroup - * - * @param eventLoopGroup - ELG to run handler on - * @param domainSocketChannelClass - socket channel class for Handler - * @param channelInitializer - UnixChannel initializer - * @param domainSocketAddress - address to connect - * @param callbackDispatcher - dispatcher to handle callbacks - */ public ChannelHandler( - EventLoopGroup eventLoopGroup, - Class domainSocketChannelClass, - ChannelInitializer channelInitializer, - DomainSocketAddress domainSocketAddress, - CallbackDispatcher callbackDispatcher) { + CallbackDispatcher callbackDispatcher, + String socketPath, + ThreadPoolResource threadPoolResource) { + channel = new Bootstrap() - .group(eventLoopGroup) - .channel(domainSocketChannelClass) - .handler(channelInitializer) - .connect(domainSocketAddress) + .group(threadPoolResource.getEventLoopGroup()) + .channel(threadPoolResource.getDomainSocketChannelClass()) + .handler(new ProtobufSocketChannelInitializer(callbackDispatcher)) + .connect(new DomainSocketAddress(socketPath)) // TODO call here .sync() if needed or remove this comment .channel(); this.callbackDispatcher = callbackDispatcher; diff --git a/java/client/src/main/java/glide/connectors/handlers/DefaultThreadPoolResourceHandler.java b/java/client/src/main/java/glide/connectors/handlers/DefaultThreadPoolResourceHandler.java new file mode 100644 index 0000000000..4e2dd28dcc --- /dev/null +++ b/java/client/src/main/java/glide/connectors/handlers/DefaultThreadPoolResourceHandler.java @@ -0,0 +1,44 @@ +package glide.connectors.handlers; + +import glide.connectors.resources.ThreadPoolResource; +import java.util.function.Supplier; + +/** A class responsible to allocating and deallocating the default Thread Pool Resource. */ +public class DefaultThreadPoolResourceHandler { + private static final Object lock = new Object(); + private static ThreadPoolResource defaultThreadPoolResource = null; + + public static ThreadPoolResource getOrCreate(Supplier supplier) { + // once the default is set, we want to avoid hitting the lock + if (defaultThreadPoolResource != null) { + return defaultThreadPoolResource; + } + + synchronized (lock) { + if (defaultThreadPoolResource == null) { + defaultThreadPoolResource = supplier.get(); + } + } + + return defaultThreadPoolResource; + } + + /** + * A JVM shutdown hook to be registered. It is responsible for closing connection and freeing + * resources. It is recommended to use a class instead of lambda to ensure that it is called.
+ * See {@link Runtime#addShutdownHook}. + */ + protected static class ShutdownHook implements Runnable { + @Override + public void run() { + if (defaultThreadPoolResource != null) { + defaultThreadPoolResource.getEventLoopGroup().shutdownGracefully(); + defaultThreadPoolResource = null; + } + } + } + + static { + Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "Glide-shutdown-hook")); + } +} diff --git a/java/client/src/main/java/glide/connectors/resources/EpollResource.java b/java/client/src/main/java/glide/connectors/resources/EpollResource.java new file mode 100644 index 0000000000..f3a77deebc --- /dev/null +++ b/java/client/src/main/java/glide/connectors/resources/EpollResource.java @@ -0,0 +1,24 @@ +package glide.connectors.resources; + +import io.netty.channel.epoll.EpollDomainSocketChannel; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.util.concurrent.DefaultThreadFactory; + +/** + * Implementation of ThreadPoolResource for Epoll-based systems. Enabling custom/default + * configurations. + */ +public class EpollResource extends ThreadPoolResource { + private static final String EPOLL_EVENT_LOOP_IDENTIFIER = "glide-channel-epoll-elg"; + + public EpollResource() { + this( + new EpollEventLoopGroup( + Runtime.getRuntime().availableProcessors(), + new DefaultThreadFactory(EPOLL_EVENT_LOOP_IDENTIFIER, true))); + } + + public EpollResource(EpollEventLoopGroup epollEventLoopGroup) { + super(epollEventLoopGroup, EpollDomainSocketChannel.class); + } +} diff --git a/java/client/src/main/java/glide/connectors/resources/KQueuePoolResource.java b/java/client/src/main/java/glide/connectors/resources/KQueuePoolResource.java new file mode 100644 index 0000000000..078e65a69e --- /dev/null +++ b/java/client/src/main/java/glide/connectors/resources/KQueuePoolResource.java @@ -0,0 +1,24 @@ +package glide.connectors.resources; + +import io.netty.channel.kqueue.KQueueDomainSocketChannel; +import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.util.concurrent.DefaultThreadFactory; + +/** + * Implementation of ThreadPoolResource for Kqueue-based systems. Enabling custom/default + * configurations. + */ +public class KQueuePoolResource extends ThreadPoolResource { + private static final String KQUEUE_EVENT_LOOP_IDENTIFIER = "glide-channel-kqueue-elg"; + + public KQueuePoolResource() { + this( + new KQueueEventLoopGroup( + Runtime.getRuntime().availableProcessors(), + new DefaultThreadFactory(KQUEUE_EVENT_LOOP_IDENTIFIER, true))); + } + + public KQueuePoolResource(KQueueEventLoopGroup eventLoopGroup) { + super(eventLoopGroup, KQueueDomainSocketChannel.class); + } +} diff --git a/java/client/src/main/java/glide/connectors/resources/Platform.java b/java/client/src/main/java/glide/connectors/resources/Platform.java index 332d0967ad..7ee62ebaf8 100644 --- a/java/client/src/main/java/glide/connectors/resources/Platform.java +++ b/java/client/src/main/java/glide/connectors/resources/Platform.java @@ -1,10 +1,8 @@ package glide.connectors.resources; import io.netty.channel.epoll.Epoll; -import io.netty.channel.epoll.EpollDomainSocketChannel; import io.netty.channel.kqueue.KQueue; -import io.netty.channel.kqueue.KQueueDomainSocketChannel; -import io.netty.channel.unix.DomainSocketChannel; +import java.util.function.Supplier; import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Getter; @@ -56,18 +54,15 @@ private static boolean isEPollAvailable() { } } - /** - * Get a channel class required by Netty to open a client UDS channel. - * - * @return Return a class supported by the current platform. - */ - public static Class getClientUdsNettyChannelType() { - if (capabilities.isKQueueAvailable()) { - return KQueueDomainSocketChannel.class; + public static Supplier getThreadPoolResourceSupplier() { + if (Platform.getCapabilities().isKQueueAvailable()) { + return KQueuePoolResource::new; } - if (capabilities.isEPollAvailable()) { - return EpollDomainSocketChannel.class; + + if (Platform.getCapabilities().isEPollAvailable()) { + return EpollResource::new; } - throw new RuntimeException("Current platform supports no known socket types"); + // TODO support IO-Uring and NIO + throw new RuntimeException("Current platform supports no known thread pool resources"); } } diff --git a/java/client/src/main/java/glide/connectors/resources/ThreadPoolAllocator.java b/java/client/src/main/java/glide/connectors/resources/ThreadPoolAllocator.java deleted file mode 100644 index 26a30537cf..0000000000 --- a/java/client/src/main/java/glide/connectors/resources/ThreadPoolAllocator.java +++ /dev/null @@ -1,73 +0,0 @@ -package glide.connectors.resources; - -import io.netty.channel.EventLoopGroup; -import io.netty.channel.epoll.EpollEventLoopGroup; -import io.netty.channel.kqueue.KQueueEventLoopGroup; -import io.netty.util.concurrent.DefaultThreadFactory; -import java.util.Map; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Supplier; - -/** A class responsible to allocating and deallocating shared thread pools. */ -public class ThreadPoolAllocator { - - /** - * Thread pools supplied to Netty to perform all async IO.
- * Map key is supposed to be pool name + thread count as a string concat product. - */ - private static final Map groups = new ConcurrentHashMap<>(); - - /** - * Allocate (create new or share existing) Netty thread pool required to manage connection. A - * thread pool could be shared across multiple connections. - * - * @return A new thread pool. - */ - public static EventLoopGroup createOrGetNettyThreadPool( - String prefix, Optional threadLimit) { - int threadCount = threadLimit.orElse(Runtime.getRuntime().availableProcessors()); - if (Platform.getCapabilities().isKQueueAvailable()) { - String name = prefix + "-kqueue-elg"; - return getOrCreate( - name + threadCount, - () -> new KQueueEventLoopGroup(threadCount, new DefaultThreadFactory(name, true))); - } else if (Platform.getCapabilities().isEPollAvailable()) { - String name = prefix + "-epoll-elg"; - return getOrCreate( - name + threadCount, - () -> new EpollEventLoopGroup(threadCount, new DefaultThreadFactory(name, true))); - } - // TODO support IO-Uring and NIO - - throw new RuntimeException("Current platform supports no known thread pool types"); - } - - /** - * Get a cached thread pool from {@link #groups} or create a new one by given lambda and cache. - */ - private static EventLoopGroup getOrCreate(String name, Supplier supplier) { - if (groups.containsKey(name)) { - return groups.get(name); - } - EventLoopGroup group = supplier.get(); - groups.put(name, group); - return group; - } - - /** - * A JVM shutdown hook to be registered. It is responsible for closing connection and freeing - * resources. It is recommended to use a class instead of lambda to ensure that it is called.
- * See {@link Runtime#addShutdownHook}. - */ - private static class ShutdownHook implements Runnable { - @Override - public void run() { - groups.values().forEach(EventLoopGroup::shutdownGracefully); - } - } - - static { - Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "Glide-shutdown-hook")); - } -} diff --git a/java/client/src/main/java/glide/connectors/resources/ThreadPoolResource.java b/java/client/src/main/java/glide/connectors/resources/ThreadPoolResource.java new file mode 100644 index 0000000000..040016c60e --- /dev/null +++ b/java/client/src/main/java/glide/connectors/resources/ThreadPoolResource.java @@ -0,0 +1,23 @@ +package glide.connectors.resources; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.unix.DomainSocketChannel; +import lombok.Getter; +import lombok.NonNull; + +/** + * Abstract base class that sets up the EventLoopGroup and channel configuration for Netty + * applications. + */ +@Getter +public abstract class ThreadPoolResource { + private EventLoopGroup eventLoopGroup; + private Class domainSocketChannelClass; + + public ThreadPoolResource( + @NonNull EventLoopGroup eventLoopGroup, + @NonNull Class domainSocketChannelClass) { + this.eventLoopGroup = eventLoopGroup; + this.domainSocketChannelClass = domainSocketChannelClass; + } +} diff --git a/java/client/src/main/java/glide/managers/CommandManager.java b/java/client/src/main/java/glide/managers/CommandManager.java index 4fcb3be574..da036595e0 100644 --- a/java/client/src/main/java/glide/managers/CommandManager.java +++ b/java/client/src/main/java/glide/managers/CommandManager.java @@ -1,19 +1,11 @@ package glide.managers; -import glide.api.models.exceptions.ClosingException; -import glide.api.models.exceptions.ConnectionException; -import glide.api.models.exceptions.ExecAbortException; -import glide.api.models.exceptions.RedisException; -import glide.api.models.exceptions.RequestException; -import glide.api.models.exceptions.TimeoutException; +import glide.api.commands.Command; +import glide.api.commands.RedisExceptionCheckedFunction; import glide.connectors.handlers.ChannelHandler; -import glide.ffi.resolvers.RedisValueResolver; -import glide.models.RequestBuilder; -import java.util.List; import java.util.concurrent.CompletableFuture; import lombok.RequiredArgsConstructor; -import redis_request.RedisRequestOuterClass.RequestType; -import response.ResponseOuterClass.RequestError; +import redis_request.RedisRequestOuterClass; import response.ResponseOuterClass.Response; /** @@ -27,82 +19,54 @@ public class CommandManager { private final ChannelHandler channel; /** - * Async (non-blocking) get.
- * See REDIS docs for GET. + * Build a command and send. * - * @param key The key name + * @param command + * @param responseHandler - to handle the response object + * @return A result promise of type T */ - public CompletableFuture get(String key) { - return submitNewRequest(RequestType.GetString, List.of(key)); + public CompletableFuture submitNewCommand( + Command command, RedisExceptionCheckedFunction responseHandler) { + // register callback + // create protobuf message from command + // submit async call + return channel + .write(prepareRedisRequest(command.getRequestType(), command.getArguments()), true) + .thenApplyAsync(response -> responseHandler.apply(response)); } /** - * Async (non-blocking) set.
- * See REDIS docs for SET. + * Build a protobuf command/transaction request object.
+ * Used by {@link CommandManager}. * - * @param key The key name - * @param value The value to set + * @return An uncompleted request. CallbackDispatcher is responsible to complete it by adding a + * callback id. */ - public CompletableFuture set(String key, String value) { - return submitNewRequest(RequestType.SetString, List.of(key, value)); - } + private RedisRequestOuterClass.RedisRequest.Builder prepareRedisRequest( + Command.RequestType command, String[] args) { + RedisRequestOuterClass.Command.ArgsArray.Builder commandArgs = + RedisRequestOuterClass.Command.ArgsArray.newBuilder(); + for (var arg : args) { + commandArgs.addArgs(arg); + } - /** - * Build a command and submit it Netty to send. - * - * @param command Command type - * @param args Command arguments - * @return A result promise - */ - private CompletableFuture submitNewRequest(RequestType command, List args) { - return channel - .write(RequestBuilder.prepareRedisRequest(command, args), true) - .thenApplyAsync(this::extractValueFromGlideRsResponse); + return RedisRequestOuterClass.RedisRequest.newBuilder() + .setSingleCommand( + RedisRequestOuterClass.Command.newBuilder() + .setRequestType(mapRequestTypes(command)) + .setArgsArray(commandArgs.build()) + .build()) + .setRoute( + RedisRequestOuterClass.Routes.newBuilder() + .setSimpleRoutes(RedisRequestOuterClass.SimpleRoutes.AllNodes) + .build()); } - /** - * Check response and extract data from it. - * - * @param response A response received from rust core lib - * @return A String from the Redis response, or Ok. Otherwise, returns null - */ - private String extractValueFromGlideRsResponse(Response response) { - if (response.hasRequestError()) { - RequestError error = response.getRequestError(); - String msg = error.getMessage(); - switch (error.getType()) { - case Unspecified: - // Unspecified error on Redis service-side - throw new RequestException(msg); - case ExecAbort: - // Transactional error on Redis service-side - throw new ExecAbortException(msg); - case Timeout: - // Timeout from Glide to Redis service - throw new TimeoutException(msg); - case Disconnect: - // Connection problem between Glide and Redis - throw new ConnectionException(msg); - default: - // Request or command error from Redis - throw new RedisException(msg); - } - } - if (response.hasClosingError()) { - // A closing error is thrown when Rust-core is not connected to Redis - // We want to close shop and throw a ClosingException - channel.close(); - throw new ClosingException(response.getClosingError()); - } - if (response.hasConstantResponse()) { - // Return "OK" - return response.getConstantResponse().toString(); - } - if (response.hasRespPointer()) { - // Return the shared value - which may be a null value - return RedisValueResolver.valueFromPointer(response.getRespPointer()).toString(); + private RedisRequestOuterClass.RequestType mapRequestTypes(Command.RequestType inType) { + switch (inType) { + case CUSTOM_COMMAND: + return RedisRequestOuterClass.RequestType.CustomCommand; } - // if no response payload is provided, assume null - return null; + throw new RuntimeException("Unsupported request type"); } } diff --git a/java/client/src/main/java/glide/models/RequestBuilder.java b/java/client/src/main/java/glide/models/RequestBuilder.java deleted file mode 100644 index cd8e2dc1b6..0000000000 --- a/java/client/src/main/java/glide/models/RequestBuilder.java +++ /dev/null @@ -1,54 +0,0 @@ -package glide.models; - -import connection_request.ConnectionRequestOuterClass.ConnectionRequest; -import connection_request.ConnectionRequestOuterClass.NodeAddress; -import connection_request.ConnectionRequestOuterClass.ReadFrom; -import connection_request.ConnectionRequestOuterClass.TlsMode; -import glide.connectors.handlers.CallbackDispatcher; -import glide.managers.CommandManager; -import glide.managers.ConnectionManager; -import java.util.List; -import redis_request.RedisRequestOuterClass.Command; -import redis_request.RedisRequestOuterClass.Command.ArgsArray; -import redis_request.RedisRequestOuterClass.RedisRequest; -import redis_request.RedisRequestOuterClass.RequestType; -import redis_request.RedisRequestOuterClass.Routes; -import redis_request.RedisRequestOuterClass.SimpleRoutes; - -public class RequestBuilder { - - /** - * Build a protobuf connection request.
- * Used by {@link ConnectionManager#connectToRedis}. - */ - // TODO support more parameters and/or configuration object - public static ConnectionRequest createConnectionRequest( - String host, int port, boolean useSsl, boolean clusterMode) { - return ConnectionRequest.newBuilder() - .addAddresses(NodeAddress.newBuilder().setHost(host).setPort(port).build()) - .setTlsMode(useSsl ? TlsMode.SecureTls : TlsMode.NoTls) - .setClusterModeEnabled(clusterMode) - .setReadFrom(ReadFrom.Primary) - .setDatabaseId(0) - .build(); - } - - /** - * Build a protobuf command/transaction request draft.
- * Used by {@link CommandManager}. - * - * @return An uncompleted request. {@link CallbackDispatcher} is responsible to complete it by - * adding a callback id. - */ - public static RedisRequest.Builder prepareRedisRequest(RequestType command, List args) { - var commandArgs = ArgsArray.newBuilder(); - for (var arg : args) { - commandArgs.addArgs(arg); - } - - return RedisRequest.newBuilder() - .setSingleCommand( - Command.newBuilder().setRequestType(command).setArgsArray(commandArgs.build()).build()) - .setRoute(Routes.newBuilder().setSimpleRoutes(SimpleRoutes.AllNodes).build()); - } -} diff --git a/java/client/src/main/java/lombok.config b/java/client/src/main/java/lombok.config new file mode 100644 index 0000000000..8f7e8aa1ac --- /dev/null +++ b/java/client/src/main/java/lombok.config @@ -0,0 +1 @@ +lombok.addLombokGeneratedAnnotation = true \ No newline at end of file diff --git a/java/client/src/test/java/glide/api/RedisClientConfigurationTest.java b/java/client/src/test/java/glide/api/RedisClientConfigurationTest.java new file mode 100644 index 0000000000..047df757cc --- /dev/null +++ b/java/client/src/test/java/glide/api/RedisClientConfigurationTest.java @@ -0,0 +1,138 @@ +package glide.api; + +import static glide.api.models.configuration.NodeAddress.DEFAULT_HOST; +import static glide.api.models.configuration.NodeAddress.DEFAULT_PORT; +import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertNull; + +import glide.api.models.configuration.*; +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; + +public class RedisClientConfigurationTest { + private static String HOST = "aws.com"; + private static int PORT = 9999; + + private static String USERNAME = "JohnDoe"; + private static String PASSWORD = "Password1"; + + private static int NUM_OF_RETRIES = 5; + private static int FACTOR = 10; + private static int EXPONENT_BASE = 50; + + private static int DATABASE_ID = 1; + + private static int REQUEST_TIMEOUT = 3; + + @Test + public void NodeAddress_DefaultConfig() { + NodeAddress nodeAddress = NodeAddress.builder().build(); + + assertEquals(DEFAULT_HOST, nodeAddress.getHost()); + assertEquals(DEFAULT_PORT, nodeAddress.getPort()); + } + + @Test + public void NodeAddress_CustomConfig() { + NodeAddress nodeAddress = NodeAddress.builder().host(HOST).port(PORT).build(); + + assertEquals(HOST, nodeAddress.getHost()); + assertEquals(PORT, nodeAddress.getPort()); + } + + @Test + public void BackoffStrategy_CustomConfig() { + BackoffStrategy backoffStrategy = + BackoffStrategy.builder() + .numOfRetries(NUM_OF_RETRIES) + .factor(FACTOR) + .exponentBase(EXPONENT_BASE) + .build(); + + assertEquals(NUM_OF_RETRIES, backoffStrategy.getNumOfRetries()); + assertEquals(FACTOR, backoffStrategy.getFactor()); + assertEquals(EXPONENT_BASE, backoffStrategy.getExponentBase()); + } + + @Test + public void RedisCredentials_CustomConfig() { + RedisCredentials redisCredentials = + RedisCredentials.builder().password(PASSWORD).username(USERNAME).build(); + + assertEquals(PASSWORD, redisCredentials.getPassword()); + assertEquals(USERNAME, redisCredentials.getUsername()); + } + + @Test + public void RedisClientConfiguration_DefaultConfig() { + RedisClientConfiguration redisClientConfiguration = RedisClientConfiguration.builder().build(); + + assertEquals(new ArrayList(), redisClientConfiguration.getAddresses()); + assertFalse(redisClientConfiguration.isUseTLS()); + assertEquals(ReadFrom.PRIMARY, redisClientConfiguration.getReadFrom()); + assertNull(redisClientConfiguration.getCredentials()); + assertNull(redisClientConfiguration.getRequestTimeout()); + assertNull(redisClientConfiguration.getDatabaseId()); + assertNull(redisClientConfiguration.getReconnectStrategy()); + } + + @Test + public void RedisClientConfiguration_CustomConfig() { + RedisClientConfiguration redisClientConfiguration = + RedisClientConfiguration.builder() + .address(NodeAddress.builder().host(HOST).port(PORT).build()) + .address(NodeAddress.builder().host(DEFAULT_HOST).port(DEFAULT_PORT).build()) + .useTLS(true) + .readFrom(ReadFrom.PREFER_REPLICA) + .credentials(RedisCredentials.builder().username(USERNAME).password(PASSWORD).build()) + .requestTimeout(REQUEST_TIMEOUT) + .reconnectStrategy( + BackoffStrategy.builder() + .numOfRetries(NUM_OF_RETRIES) + .exponentBase(EXPONENT_BASE) + .factor(FACTOR) + .build()) + .databaseId(DATABASE_ID) + .build(); + + List expectedAddresses = new ArrayList<>(); + NodeAddress address1 = NodeAddress.builder().host(HOST).port(PORT).build(); + NodeAddress address2 = NodeAddress.builder().host(DEFAULT_HOST).port(DEFAULT_PORT).build(); + expectedAddresses.add(address1); + expectedAddresses.add(address2); + + List actualAddresses = redisClientConfiguration.getAddresses(); + assertEquals( + expectedAddresses.size(), actualAddresses.size(), "Lists should be of the same size"); + for (int i = 0; i < actualAddresses.size(); i++) { + NodeAddress actualNodeAddress = actualAddresses.get(i); + NodeAddress expectedNodeAddress = expectedAddresses.get(i); + assertAll( + "Object fields should match", + () -> assertEquals(expectedNodeAddress.getHost(), actualNodeAddress.getHost()), + () -> assertEquals(expectedNodeAddress.getPort(), actualNodeAddress.getPort())); + } + assertTrue(redisClientConfiguration.isUseTLS()); + assertEquals(ReadFrom.PREFER_REPLICA, redisClientConfiguration.getReadFrom()); + assertEquals(PASSWORD, redisClientConfiguration.getCredentials().getPassword()); + assertEquals(USERNAME, redisClientConfiguration.getCredentials().getUsername()); + assertEquals(REQUEST_TIMEOUT, redisClientConfiguration.getRequestTimeout()); + assertEquals(NUM_OF_RETRIES, redisClientConfiguration.getReconnectStrategy().getNumOfRetries()); + assertEquals(FACTOR, redisClientConfiguration.getReconnectStrategy().getFactor()); + assertEquals(EXPONENT_BASE, redisClientConfiguration.getReconnectStrategy().getExponentBase()); + assertEquals(DATABASE_ID, redisClientConfiguration.getDatabaseId()); + } + + @Test + public void RedisClusterClientConfiguration_DefaultConfig() { + RedisClusterClientConfiguration redisClusterClientConfiguration = + RedisClusterClientConfiguration.builder().build(); + + assertEquals(new ArrayList(), redisClusterClientConfiguration.getAddresses()); + assertFalse(redisClusterClientConfiguration.isUseTLS()); + assertEquals(ReadFrom.PRIMARY, redisClusterClientConfiguration.getReadFrom()); + assertNull(redisClusterClientConfiguration.getCredentials()); + assertNull(redisClusterClientConfiguration.getRequestTimeout()); + } +} diff --git a/java/client/src/test/java/glide/api/RedisClientCreateTest.java b/java/client/src/test/java/glide/api/RedisClientCreateTest.java index d25c6b40ab..d2edf36301 100644 --- a/java/client/src/test/java/glide/api/RedisClientCreateTest.java +++ b/java/client/src/test/java/glide/api/RedisClientCreateTest.java @@ -1,19 +1,26 @@ package glide.api; import static glide.api.RedisClient.CreateClient; +import static glide.api.RedisClient.buildChannelHandler; import static glide.api.RedisClient.buildCommandManager; import static glide.api.RedisClient.buildConnectionManager; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.times; import glide.api.models.configuration.RedisClientConfiguration; import glide.api.models.exceptions.ClosingException; import glide.connectors.handlers.ChannelHandler; +import glide.connectors.resources.ThreadPoolResource; import glide.managers.CommandManager; import glide.managers.ConnectionManager; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import lombok.SneakyThrows; @@ -29,6 +36,7 @@ public class RedisClientCreateTest { private ChannelHandler channelHandler; private ConnectionManager connectionManager; private CommandManager commandManager; + private ThreadPoolResource threadPoolResource; @BeforeEach public void init() { @@ -37,8 +45,9 @@ public void init() { channelHandler = mock(ChannelHandler.class); commandManager = mock(CommandManager.class); connectionManager = mock(ConnectionManager.class); + threadPoolResource = mock(ThreadPoolResource.class); - mockedClient.when(RedisClient::buildChannelHandler).thenReturn(channelHandler); + mockedClient.when(() -> buildChannelHandler(any())).thenReturn(channelHandler); mockedClient.when(() -> buildConnectionManager(channelHandler)).thenReturn(connectionManager); mockedClient.when(() -> buildCommandManager(channelHandler)).thenReturn(commandManager); } @@ -55,18 +64,24 @@ public void createClient_withConfig_successfullyReturnsRedisClient() { // setup CompletableFuture connectToRedisFuture = new CompletableFuture<>(); connectToRedisFuture.complete(null); - RedisClientConfiguration config = RedisClientConfiguration.builder().build(); + RedisClientConfiguration config = + RedisClientConfiguration.builder().threadPoolResource(threadPoolResource).build(); when(connectionManager.connectToRedis(eq(config))).thenReturn(connectToRedisFuture); mockedClient.when(() -> CreateClient(config)).thenCallRealMethod(); + CompletableFuture closeRedisFuture = new CompletableFuture<>(); + closeRedisFuture.complete(null); + when(connectionManager.closeConnection()).thenReturn(closeRedisFuture); + // exercise - CompletableFuture result = CreateClient(config); - RedisClient client = result.get(); + try (RedisClient client = CreateClient(config).get()) { + // verify + assertEquals(connectionManager, client.connectionManager); + assertEquals(commandManager, client.commandManager); + } - // verify - assertEquals(connectionManager, client.connectionManager); - assertEquals(commandManager, client.commandManager); + verify(connectionManager, times(1)).closeConnection(); } @SneakyThrows @@ -78,8 +93,8 @@ public void createClient_errorOnConnectionThrowsExecutionException() { connectToRedisFuture.completeExceptionally(exception); RedisClientConfiguration config = RedisClientConfiguration.builder().build(); - when(connectionManager.connectToRedis(eq(config))).thenReturn(connectToRedisFuture); - mockedClient.when(() -> CreateClient(config)).thenCallRealMethod(); + when(connectionManager.connectToRedis(any())).thenReturn(connectToRedisFuture); + mockedClient.when(() -> CreateClient(any())).thenCallRealMethod(); // exercise CompletableFuture result = CreateClient(config); @@ -90,4 +105,65 @@ public void createClient_errorOnConnectionThrowsExecutionException() { // verify assertEquals(exception, executionException.getCause()); } + + @SneakyThrows + @Test + public void redisClientClose_throwsCancellationException() { + + // setup + CompletableFuture connectToRedisFuture = new CompletableFuture<>(); + connectToRedisFuture.complete(null); + RedisClientConfiguration config = + RedisClientConfiguration.builder().threadPoolResource(threadPoolResource).build(); + + when(connectionManager.connectToRedis(eq(config))).thenReturn(connectToRedisFuture); + mockedClient.when(() -> CreateClient(config)).thenCallRealMethod(); + + CompletableFuture closeRedisFuture = new CompletableFuture<>(); + InterruptedException interruptedException = new InterruptedException("Interrupted"); + closeRedisFuture.cancel(true); + when(connectionManager.closeConnection()).thenReturn(closeRedisFuture); + + // exercise + try (RedisClient client = CreateClient(config).get()) { + // verify + assertEquals(connectionManager, client.connectionManager); + assertEquals(commandManager, client.commandManager); + } catch (Exception cancellationException) { + assertTrue(cancellationException instanceof CancellationException); + } + + verify(connectionManager, times(1)).closeConnection(); + } + + @SneakyThrows + @Test + public void redisClientClose_throwsInterruptedException() { + + // setup + CompletableFuture connectToRedisFuture = new CompletableFuture<>(); + connectToRedisFuture.complete(null); + RedisClientConfiguration config = + RedisClientConfiguration.builder().threadPoolResource(threadPoolResource).build(); + + when(connectionManager.connectToRedis(eq(config))).thenReturn(connectToRedisFuture); + mockedClient.when(() -> CreateClient(config)).thenCallRealMethod(); + + CompletableFuture closeRedisFuture = mock(CompletableFuture.class); + InterruptedException interruptedException = new InterruptedException("Interrupted"); + when(closeRedisFuture.get()).thenThrow(interruptedException); + + when(connectionManager.closeConnection()).thenReturn(closeRedisFuture); + + // exercise + try (RedisClient client = CreateClient(config).get()) { + // verify + assertEquals(connectionManager, client.connectionManager); + assertEquals(commandManager, client.commandManager); + } catch (Exception exception) { + assertEquals(interruptedException, exception.getCause()); + } + + verify(connectionManager, times(1)).closeConnection(); + } } diff --git a/java/client/src/test/java/glide/api/RedisClientTest.java b/java/client/src/test/java/glide/api/RedisClientTest.java new file mode 100644 index 0000000000..ac7340de7b --- /dev/null +++ b/java/client/src/test/java/glide/api/RedisClientTest.java @@ -0,0 +1,75 @@ +package glide.api; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import glide.managers.CommandManager; +import glide.managers.ConnectionManager; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class RedisClientTest { + + RedisClient service; + + ConnectionManager connectionManager; + + CommandManager commandManager; + + @BeforeEach + public void setUp() { + connectionManager = mock(ConnectionManager.class); + commandManager = mock(CommandManager.class); + service = new RedisClient(connectionManager, commandManager); + } + + @Test + public void customCommand_success() throws ExecutionException, InterruptedException { + // setup + String key = "testKey"; + Object value = "testValue"; + String cmd = "GETSTRING"; + CompletableFuture testResponse = mock(CompletableFuture.class); + when(testResponse.get()).thenReturn(value); + when(commandManager.submitNewCommand(any(), any())).thenReturn(testResponse); + + // exercise + CompletableFuture response = service.customCommand(new String[] {cmd, key}); + String payload = (String) response.get(); + + // verify + assertEquals(testResponse, response); + assertEquals(value, payload); + + // teardown + } + + @Test + public void customCommand_interruptedException() throws ExecutionException, InterruptedException { + // setup + String key = "testKey"; + Object value = "testValue"; + String cmd = "GETSTRING"; + CompletableFuture testResponse = mock(CompletableFuture.class); + InterruptedException interruptedException = new InterruptedException(); + when(testResponse.get()).thenThrow(interruptedException); + when(commandManager.submitNewCommand(any(), any())).thenReturn(testResponse); + + // exercise + InterruptedException exception = + assertThrows( + InterruptedException.class, + () -> { + CompletableFuture response = service.customCommand(new String[] {cmd, key}); + response.get(); + }); + + // verify + assertEquals(interruptedException, exception); + } +} diff --git a/java/client/src/test/java/glide/connectors/handlers/DefaultThreadPoolResourceHandlerTest.java b/java/client/src/test/java/glide/connectors/handlers/DefaultThreadPoolResourceHandlerTest.java new file mode 100644 index 0000000000..35f51e49a8 --- /dev/null +++ b/java/client/src/test/java/glide/connectors/handlers/DefaultThreadPoolResourceHandlerTest.java @@ -0,0 +1,44 @@ +package glide.connectors.handlers; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.internal.verification.VerificationModeFactory.times; + +import glide.connectors.resources.ThreadPoolResource; +import io.netty.channel.EventLoopGroup; +import java.util.function.Supplier; +import org.junit.jupiter.api.Test; + +public class DefaultThreadPoolResourceHandlerTest { + + DefaultThreadPoolResourceHandler service; + + @Test + public void getOrCreateReturnsDefault() { + (new DefaultThreadPoolResourceHandler.ShutdownHook()).run(); + + ThreadPoolResource mockedThreadPool = mock(ThreadPoolResource.class); + Supplier threadPoolSupplier = mock(Supplier.class); + when(threadPoolSupplier.get()).thenReturn(mockedThreadPool); + + ThreadPoolResource theResource = service.getOrCreate(threadPoolSupplier); + assertEquals(mockedThreadPool, theResource); + + ThreadPoolResource theSameResource = service.getOrCreate(threadPoolSupplier); + assertEquals(mockedThreadPool, theSameResource); + + // and test that the supplier isn't used any longer once the default is set + Supplier notUsedSupplier = mock(Supplier.class); + ThreadPoolResource firstResource = service.getOrCreate(notUsedSupplier); + verify(notUsedSupplier, times(0)).get(); + assertEquals(mockedThreadPool, firstResource); + + // teardown + // remove the mocked resource + EventLoopGroup mockedELG = mock(EventLoopGroup.class); + when(mockedThreadPool.getEventLoopGroup()).thenReturn(mockedELG); + (new DefaultThreadPoolResourceHandler.ShutdownHook()).run(); + } +} diff --git a/java/client/src/test/java/glide/handlers/ReadHandlerTest.java b/java/client/src/test/java/glide/handlers/ReadHandlerTest.java new file mode 100644 index 0000000000..198f9e2d7d --- /dev/null +++ b/java/client/src/test/java/glide/handlers/ReadHandlerTest.java @@ -0,0 +1,72 @@ +package glide.handlers; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import glide.connectors.handlers.CallbackDispatcher; +import glide.connectors.handlers.ReadHandler; +import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import response.ResponseOuterClass; + +public class ReadHandlerTest { + + EmbeddedChannel embeddedChannel; + ReadHandler readHandler; + CallbackDispatcher dispatcher; + + @BeforeEach + public void init() { + dispatcher = mock(CallbackDispatcher.class); + readHandler = new ReadHandler(dispatcher); + embeddedChannel = new EmbeddedChannel(readHandler); + } + + @AfterEach + public void teardown() { + embeddedChannel.finishAndReleaseAll(); + } + + @Test + public void readHandlerRead_testInboundProtobufMessages() { + ResponseOuterClass.Response msg = + ResponseOuterClass.Response.newBuilder() + .setConstantResponse(ResponseOuterClass.ConstantResponse.OK) + .build(); + + assertTrue(embeddedChannel.writeInbound(msg, msg, msg)); + assertTrue(embeddedChannel.finish()); + + verify(dispatcher, times(3)).completeRequest(msg); + } + + @Test + public void readHandlerRead_testInboundProtobufMessages_invalidMessage() { + + String invalidMsg = "Invalid"; + + Exception e = + assertThrows( + Exception.class, + () -> embeddedChannel.writeInbound(invalidMsg, invalidMsg, invalidMsg)); + assertEquals("Unexpected message in socket", e.getMessage()); + + verify(dispatcher, times(0)).completeRequest(any()); + + ResponseOuterClass.Response msg = + ResponseOuterClass.Response.newBuilder() + .setConstantResponse(ResponseOuterClass.ConstantResponse.OK) + .build(); + assertTrue(embeddedChannel.writeInbound(msg)); + assertTrue(embeddedChannel.finish()); + + verify(dispatcher, times(1)).completeRequest(msg); + } +} diff --git a/java/client/src/test/java/glide/managers/CommandManagerTest.java b/java/client/src/test/java/glide/managers/CommandManagerTest.java new file mode 100644 index 0000000000..45cd168105 --- /dev/null +++ b/java/client/src/test/java/glide/managers/CommandManagerTest.java @@ -0,0 +1,271 @@ +package glide.managers; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import glide.api.commands.BaseCommandResponseResolver; +import glide.api.commands.Command; +import glide.api.models.exceptions.ClosingException; +import glide.api.models.exceptions.ConnectionException; +import glide.api.models.exceptions.ExecAbortException; +import glide.api.models.exceptions.RedisException; +import glide.api.models.exceptions.TimeoutException; +import glide.connectors.handlers.ChannelHandler; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import response.ResponseOuterClass.RequestError; +import response.ResponseOuterClass.Response; + +public class CommandManagerTest { + + ChannelHandler channelHandler; + + CommandManager service; + + Command command; + + @BeforeEach + void init() { + command = Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).build(); + + channelHandler = mock(ChannelHandler.class); + service = new CommandManager(channelHandler); + } + + @Test + public void submitNewCommand_returnObjectResult() + throws ExecutionException, InterruptedException { + + // setup + long pointer = -1; + Response respPointerResponse = Response.newBuilder().setRespPointer(pointer).build(); + Object respObject = mock(Object.class); + + CompletableFuture future = new CompletableFuture<>(); + future.complete(respPointerResponse); + when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + + // exercise + CompletableFuture result = + service.submitNewCommand( + command, new BaseCommandResponseResolver((ptr) -> ptr == pointer ? respObject : null)); + Object respPointer = result.get(); + + // verify + assertEquals(respObject, respPointer); + } + + @Test + public void submitNewCommand_returnNullResult() throws ExecutionException, InterruptedException { + // setup + Response respPointerResponse = Response.newBuilder().build(); + CompletableFuture future = new CompletableFuture<>(); + future.complete(respPointerResponse); + when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + + // exercise + CompletableFuture result = + service.submitNewCommand( + command, new BaseCommandResponseResolver((p) -> new RuntimeException(""))); + Object respPointer = result.get(); + + assertNull(respPointer); + } + + @Test + public void submitNewCommand_returnStringResult() + throws ExecutionException, InterruptedException { + + // setup + long pointer = 123; + String testString = "TEST STRING"; + + Response respPointerResponse = Response.newBuilder().setRespPointer(pointer).build(); + + CompletableFuture future = new CompletableFuture<>(); + future.complete(respPointerResponse); + when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + + // exercise + CompletableFuture result = + service.submitNewCommand( + command, new BaseCommandResponseResolver((p) -> p == pointer ? testString : null)); + Object respPointer = result.get(); + + // verify + assertTrue(respPointer instanceof String); + assertEquals(testString, respPointer); + } + + @Test + public void submitNewCommand_throwClosingException() { + + // setup + String errorMsg = "Closing"; + + Response closingErrorResponse = Response.newBuilder().setClosingError(errorMsg).build(); + + CompletableFuture future = new CompletableFuture<>(); + future.complete(closingErrorResponse); + when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + + // exercise + ExecutionException e = + assertThrows( + ExecutionException.class, + () -> { + CompletableFuture result = + service.submitNewCommand( + command, new BaseCommandResponseResolver((ptr) -> new Object())); + result.get(); + }); + + // verify + assertTrue(e.getCause() instanceof ClosingException); + assertEquals(errorMsg, e.getCause().getMessage()); + } + + @Test + public void submitNewCommand_throwConnectionException() { + + // setup + int disconnectedType = 3; + String errorMsg = "Disconnected"; + + Response respPointerResponse = + Response.newBuilder() + .setRequestError( + RequestError.newBuilder() + .setTypeValue(disconnectedType) + .setMessage(errorMsg) + .build()) + .build(); + + CompletableFuture future = new CompletableFuture<>(); + future.complete(respPointerResponse); + when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + + // exercise + ExecutionException e = + assertThrows( + ExecutionException.class, + () -> { + CompletableFuture result = + service.submitNewCommand( + command, new BaseCommandResponseResolver((ptr) -> new Object())); + result.get(); + }); + + // verify + assertTrue(e.getCause() instanceof ConnectionException); + assertEquals(errorMsg, e.getCause().getMessage()); + } + + @Test + public void submitNewCommand_throwTimeoutException() { + + // setup + int timeoutType = 2; + String errorMsg = "Timeout"; + + Response timeoutErrorResponse = + Response.newBuilder() + .setRequestError( + RequestError.newBuilder().setTypeValue(timeoutType).setMessage(errorMsg).build()) + .build(); + + CompletableFuture future = new CompletableFuture<>(); + future.complete(timeoutErrorResponse); + when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + + // exercise + ExecutionException e = + assertThrows( + ExecutionException.class, + () -> { + CompletableFuture result = + service.submitNewCommand( + command, new BaseCommandResponseResolver((ptr) -> new Object())); + result.get(); + }); + + // verify + assertTrue(e.getCause() instanceof TimeoutException); + assertEquals(errorMsg, e.getCause().getMessage()); + } + + @Test + public void submitNewCommand_throwExecAbortException() { + // setup + int execAbortType = 1; + String errorMsg = "ExecAbort"; + + Response execAbortErrorResponse = + Response.newBuilder() + .setRequestError( + RequestError.newBuilder().setTypeValue(execAbortType).setMessage(errorMsg).build()) + .build(); + + CompletableFuture future = new CompletableFuture<>(); + future.complete(execAbortErrorResponse); + when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + + // exercise + ExecutionException e = + assertThrows( + ExecutionException.class, + () -> { + CompletableFuture result = + service.submitNewCommand( + command, new BaseCommandResponseResolver((ptr) -> new Object())); + result.get(); + }); + + // verify + assertTrue(e.getCause() instanceof ExecAbortException); + assertEquals(errorMsg, e.getCause().getMessage()); + } + + @Test + public void submitNewCommand_handledUnspecifiedError() { + // setup + int unspecifiedType = 0; + String errorMsg = "Unspecified"; + + Response unspecifiedErrorResponse = + Response.newBuilder() + .setRequestError( + RequestError.newBuilder() + .setTypeValue(unspecifiedType) + .setMessage(errorMsg) + .build()) + .build(); + + CompletableFuture future = new CompletableFuture<>(); + future.complete(unspecifiedErrorResponse); + when(channelHandler.write(any(), anyBoolean())).thenReturn(future); + + // exercise + ExecutionException executionException = + assertThrows( + ExecutionException.class, + () -> { + CompletableFuture result = + service.submitNewCommand( + command, new BaseCommandResponseResolver((ptr) -> new Object())); + result.get(); + }); + + // verify + assertTrue(executionException.getCause() instanceof RedisException); + assertEquals(errorMsg, executionException.getCause().getMessage()); + } +} diff --git a/java/client/src/test/java/glide/managers/ConnectionManagerTest.java b/java/client/src/test/java/glide/managers/ConnectionManagerTest.java index 42419f9703..9ae2d94a4e 100644 --- a/java/client/src/test/java/glide/managers/ConnectionManagerTest.java +++ b/java/client/src/test/java/glide/managers/ConnectionManagerTest.java @@ -209,7 +209,8 @@ public void onConnection_emptyResponse_throwsClosingException() { assertTrue(executionException.getCause() instanceof ClosingException); assertEquals("Unexpected empty data in response", executionException.getCause().getMessage()); - verify(channel).close(); + // TODO: channel may not be completed since we don't wait for the close to complete + // verify(channel).close(); } @Test