Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Configuration Tests and Configured Jacoco #56

Closed
12 changes: 12 additions & 0 deletions java/client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import java.nio.file.Paths

plugins {
id 'java-library'
id 'jacoco'
}

repositories {
Expand Down Expand Up @@ -103,4 +104,15 @@ tasks.withType(Test) {
showStandardStreams true
}
jvmArgs "-Djava.library.path=${projectDir}/../target/release:${projectDir}/../target/debug"
finalizedBy jacocoTestReport, jacocoTestCoverageVerification
}

jacocoTestReport {
afterEvaluate {
classDirectories.setFrom(files(classDirectories.files.collect {
fileTree(dir: it, exclude: ['**/connection_request/**', '**/response/**', '**/redis_request/**', '**/connectors/resources/**'])
}))
}
}


37 changes: 30 additions & 7 deletions java/client/src/main/java/glide/api/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,14 @@

import static glide.ffi.resolvers.SocketListenerResolver.getSocket;

import glide.api.commands.BaseCommands;
import glide.api.commands.Command;
import glide.api.models.configuration.RedisClientConfiguration;
import glide.connectors.handlers.CallbackDispatcher;
import glide.connectors.handlers.ChannelHandler;
import glide.connectors.handlers.DefaultThreadPoolResourceHandler;
import glide.connectors.resources.Platform;
import glide.connectors.resources.ThreadPoolResource;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import java.util.concurrent.CompletableFuture;
Expand All @@ -13,27 +18,32 @@
* Async (non-blocking) client for Redis in Standalone mode. Use {@link
* #CreateClient(RedisClientConfiguration)} to request a client to Redis.
*/
public class RedisClient extends BaseClient {
public class RedisClient extends BaseClient implements BaseCommands {

/**
* Request an async (non-blocking) Redis client in Standalone mode.
*
* @param config - Redis Client Configuration
* @return a Future to connect and return a RedisClient
* @return a future to connect and return a RedisClient
*/
public static CompletableFuture<RedisClient> CreateClient(RedisClientConfiguration config) {
ChannelHandler channelHandler = buildChannelHandler();
ConnectionManager connectionManager = buildConnectionManager(channelHandler);
CommandManager commandManager = buildCommandManager(channelHandler);
ThreadPoolResource threadPoolResource = config.getThreadPoolResource();
if (threadPoolResource == null) {
threadPoolResource =
DefaultThreadPoolResourceHandler.getOrCreate(Platform.getThreadPoolResourceSupplier());
}
ChannelHandler channelHandler = buildChannelHandler(threadPoolResource);
var connectionManager = buildConnectionManager(channelHandler);
var commandManager = buildCommandManager(channelHandler);
// TODO: Support exception throwing, including interrupted exceptions
return connectionManager
.connectToRedis(config)
.thenApply(ignore -> new RedisClient(connectionManager, commandManager));
}

protected static ChannelHandler buildChannelHandler() {
protected static ChannelHandler buildChannelHandler(ThreadPoolResource threadPoolResource) {
CallbackDispatcher callbackDispatcher = new CallbackDispatcher();
return new ChannelHandler(callbackDispatcher, getSocket());
return new ChannelHandler(callbackDispatcher, getSocket(), threadPoolResource);
}

protected static ConnectionManager buildConnectionManager(ChannelHandler channelHandler) {
Expand All @@ -47,4 +57,17 @@ protected static CommandManager buildCommandManager(ChannelHandler channelHandle
protected RedisClient(ConnectionManager connectionManager, CommandManager commandManager) {
super(connectionManager, commandManager);
}

/**
* Executes a single custom command, without checking inputs. Every part of the command, including
* subcommands, should be added as a separate value in args.
*
* @param args command and arguments for the custom command call
* @return CompletableFuture with the response
*/
public CompletableFuture<Object> customCommand(String[] args) {
Command command =
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
return commandManager.submitNewCommand(command, BaseCommands::handleObjectResponse);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package glide.api.commands;

import glide.api.models.exceptions.ClosingException;
import glide.api.models.exceptions.ConnectionException;
import glide.api.models.exceptions.ExecAbortException;
import glide.api.models.exceptions.RedisException;
import glide.api.models.exceptions.RequestException;
import glide.api.models.exceptions.TimeoutException;
import lombok.AllArgsConstructor;
import response.ResponseOuterClass.RequestError;
import response.ResponseOuterClass.Response;

/**
* Response resolver responsible for evaluating the Redis response object with a success or failure.
*/
@AllArgsConstructor
public class BaseCommandResponseResolver
implements RedisExceptionCheckedFunction<Response, Object> {

private RedisExceptionCheckedFunction<Long, Object> respPointerResolver;

/**
* Extracts value from the RESP pointer. <br>
* Throws errors when the response is unsuccessful.
*
* @return A generic Object with the Response | null if the response is empty
*/
public Object apply(Response response) throws RedisException {
// TODO: handle object if the object is small
// TODO: handle RESP2 object if configuration is set
if (response.hasRequestError()) {
RequestError error = response.getRequestError();
String msg = error.getMessage();
switch (error.getType()) {
case Unspecified:
// Unspecified error on Redis service-side
throw new RequestException(msg);
case ExecAbort:
// Transactional error on Redis service-side
throw new ExecAbortException(msg);
case Timeout:
// Timeout from Glide to Redis service
throw new TimeoutException(msg);
case Disconnect:
// Connection problem between Glide and Redis
throw new ConnectionException(msg);
default:
// Request or command error from Redis
throw new RequestException(msg);
}
}
if (response.hasClosingError()) {
// A closing error is thrown when Rust-core is not connected to Redis
// We want to close shop and throw a ClosingException
// TODO: close the channel on a closing error
// channel.close();
throw new ClosingException(response.getClosingError());
}
if (response.hasConstantResponse()) {
// Return "OK"
return response.getConstantResponse().toString();
}
if (response.hasRespPointer()) {
// Return the shared value - which may be a null value
return respPointerResolver.apply(response.getRespPointer());
}
// if no response payload is provided, assume null
return null;
}
}
49 changes: 49 additions & 0 deletions java/client/src/main/java/glide/api/commands/BaseCommands.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package glide.api.commands;

import glide.ffi.resolvers.RedisValueResolver;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import response.ResponseOuterClass.Response;

/** Base Commands interface to handle generic command and transaction requests. */
public interface BaseCommands {

/**
* default Object handler from response
*
* @return BaseCommandResponseResolver to deliver the response
*/
static BaseCommandResponseResolver applyBaseCommandResponseResolver() {
return new BaseCommandResponseResolver(RedisValueResolver::valueFromPointer);
}

/**
* Extracts the response from the Protobuf response and either throws an exception or returns the
* appropriate response has an Object
*
* @param response Redis protobuf message
* @return Response Object
*/
static Object handleObjectResponse(Response response) {
// return function to convert protobuf.Response into the response object by
// calling valueFromPointer
return BaseCommands.applyBaseCommandResponseResolver().apply(response);
}

public static List<Object> handleTransactionResponse(Response response) {
// return function to convert protobuf.Response into the response object by
// calling valueFromPointer

List<Object> transactionResponse =
(List<Object>) BaseCommands.applyBaseCommandResponseResolver().apply(response);
return transactionResponse;
}

/**
* Execute a @see{Command} by sending command via socket manager
*
* @param args arguments for the custom command
* @return a CompletableFuture with response result from Redis
*/
CompletableFuture<Object> customCommand(String[] args);
}
24 changes: 24 additions & 0 deletions java/client/src/main/java/glide/api/commands/Command.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package glide.api.commands;

import lombok.Builder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NonNull;

/** Base Command class to send a single request to Redis. */
@Builder
@Getter
@EqualsAndHashCode
public class Command {

/** Redis command request type */
@NonNull final RequestType requestType;

/** List of Arguments for the Redis command request */
@Builder.Default final String[] arguments = new String[] {};

public enum RequestType {
/** Call a custom command with list of string arguments */
CUSTOM_COMMAND,
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package glide.api.commands;

import glide.api.models.exceptions.RedisException;

@FunctionalInterface
public interface RedisExceptionCheckedFunction<R, T> {

/**
* Functional response handler that takes a protobuf Response object. <br>
* Returns a typed object on a successful Redis response. <br>
* Throws RedisException when receiving a Redis error response. <br>
*
* @param response - Redis Response
* @return T - response payload type
* @throws RedisException
*/
T apply(R response) throws RedisException;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package glide.api.models.configuration;

import glide.connectors.resources.ThreadPoolResource;
import java.util.List;
import lombok.Builder;
import lombok.Getter;
Expand Down Expand Up @@ -50,4 +51,13 @@ public abstract class BaseClientConfiguration {
* it will result in a timeout error. If not set, a default value will be used.
*/
private final Integer requestTimeout;

/**
* Field for customizing the Event Loop Group and Channel Configuration in Netty applications.
* Advanced users can utilize {@link glide.connectors.resources.KQueuePoolResource}/{@link
* glide.connectors.resources.EpollResource} to set their custom event loop group. If not
* explicitly set, the system selects a default based on available resources. Recommended for
* advanced users.
*/
private final ThreadPoolResource threadPoolResource;
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
package glide.connectors.handlers;

import connection_request.ConnectionRequestOuterClass.ConnectionRequest;
import glide.connectors.resources.Platform;
import glide.connectors.resources.ThreadPoolAllocator;
import glide.connectors.resources.ThreadPoolResource;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.unix.DomainSocketAddress;
import io.netty.channel.unix.DomainSocketChannel;
import io.netty.channel.unix.UnixChannel;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import redis_request.RedisRequestOuterClass.RedisRequest;
import response.ResponseOuterClass.Response;
Expand All @@ -28,36 +22,17 @@ public class ChannelHandler {
private final CallbackDispatcher callbackDispatcher;

/** Open a new channel for a new client. */
public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath) {
this(
ThreadPoolAllocator.createOrGetNettyThreadPool(THREAD_POOL_NAME, Optional.empty()),
Platform.getClientUdsNettyChannelType(),
new ProtobufSocketChannelInitializer(callbackDispatcher),
new DomainSocketAddress(socketPath),
callbackDispatcher);
}

/**
* Open a new channel for a new client and running it on the provided EventLoopGroup
*
* @param eventLoopGroup - ELG to run handler on
* @param domainSocketChannelClass - socket channel class for Handler
* @param channelInitializer - UnixChannel initializer
* @param domainSocketAddress - address to connect
* @param callbackDispatcher - dispatcher to handle callbacks
*/
public ChannelHandler(
EventLoopGroup eventLoopGroup,
Class<? extends DomainSocketChannel> domainSocketChannelClass,
ChannelInitializer<UnixChannel> channelInitializer,
DomainSocketAddress domainSocketAddress,
CallbackDispatcher callbackDispatcher) {
CallbackDispatcher callbackDispatcher,
String socketPath,
ThreadPoolResource threadPoolResource) {

channel =
new Bootstrap()
.group(eventLoopGroup)
.channel(domainSocketChannelClass)
.handler(channelInitializer)
.connect(domainSocketAddress)
.group(threadPoolResource.getEventLoopGroup())
.channel(threadPoolResource.getDomainSocketChannelClass())
.handler(new ProtobufSocketChannelInitializer(callbackDispatcher))
.connect(new DomainSocketAddress(socketPath))
// TODO call here .sync() if needed or remove this comment
.channel();
this.callbackDispatcher = callbackDispatcher;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package glide.connectors.handlers;

import glide.connectors.resources.ThreadPoolResource;
import java.util.function.Supplier;

/** A class responsible to allocating and deallocating the default Thread Pool Resource. */
public class DefaultThreadPoolResourceHandler {
private static final Object lock = new Object();
private static ThreadPoolResource defaultThreadPoolResource = null;

public static ThreadPoolResource getOrCreate(Supplier<ThreadPoolResource> supplier) {
// once the default is set, we want to avoid hitting the lock
if (defaultThreadPoolResource != null) {
return defaultThreadPoolResource;
}

synchronized (lock) {
if (defaultThreadPoolResource == null) {
defaultThreadPoolResource = supplier.get();
}
}

return defaultThreadPoolResource;
}

/**
* A JVM shutdown hook to be registered. It is responsible for closing connection and freeing
* resources. It is recommended to use a class instead of lambda to ensure that it is called.<br>
* See {@link Runtime#addShutdownHook}.
*/
protected static class ShutdownHook implements Runnable {
@Override
public void run() {
if (defaultThreadPoolResource != null) {
defaultThreadPoolResource.getEventLoopGroup().shutdownGracefully();
defaultThreadPoolResource = null;
}
}
}

static {
Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "Glide-shutdown-hook"));
}
}
Loading