Skip to content

Commit

Permalink
Spotless.
Browse files Browse the repository at this point in the history
  • Loading branch information
SanHalacogluImproving committed Jan 24, 2024
1 parent 7f86228 commit 5cc435d
Show file tree
Hide file tree
Showing 13 changed files with 236 additions and 234 deletions.
48 changes: 25 additions & 23 deletions java/client/src/main/java/glide/api/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,31 +20,33 @@
*/
public class RedisClient extends BaseClient implements BaseCommands {

/**
* Request an async (non-blocking) Redis client in Standalone mode.
*
* @param config - Redis Client Configuration
* @return a Future to connect and return a RedisClient
*/
public static CompletableFuture<RedisClient> CreateClient(RedisClientConfiguration config) {
ThreadPoolResource threadPoolResource = config.getThreadPoolResource();
if (threadPoolResource == null) {
threadPoolResource =
ThreadPoolResourceAllocator.getOrCreate(Platform.getThreadPoolResourceSupplier());
/**
* Request an async (non-blocking) Redis client in Standalone mode.
*
* @param config - Redis Client Configuration
* @return a Future to connect and return a RedisClient
*/
public static CompletableFuture<RedisClient> CreateClient(RedisClientConfiguration config)
throws InterruptedException {
ThreadPoolResource threadPoolResource = config.getThreadPoolResource();
if (threadPoolResource == null) {
threadPoolResource =
ThreadPoolResourceAllocator.getOrCreate(Platform.getThreadPoolResourceSupplier());
}
ChannelHandler channelHandler = buildChannelHandler(threadPoolResource);
var connectionManager = buildConnectionManager(channelHandler);
var commandManager = buildCommandManager(channelHandler);
// TODO: Support exception throwing, including interrupted exceptions
return connectionManager
.connectToRedis(config)
.thenApply(ignore -> new RedisClient(connectionManager, commandManager));
}
ChannelHandler channelHandler = buildChannelHandler(threadPoolResource);
var connectionManager = buildConnectionManager(channelHandler);
var commandManager = buildCommandManager(channelHandler);
// TODO: Support exception throwing, including interrupted exceptions
return connectionManager
.connectToRedis(config)
.thenApply(ignore -> new RedisClient(connectionManager, commandManager));
}

protected static ChannelHandler buildChannelHandler(ThreadPoolResource threadPoolResource) {
CallbackDispatcher callbackDispatcher = new CallbackDispatcher();
return new ChannelHandler(callbackDispatcher, getSocket(), threadPoolResource);
}
protected static ChannelHandler buildChannelHandler(ThreadPoolResource threadPoolResource)
throws InterruptedException {
CallbackDispatcher callbackDispatcher = new CallbackDispatcher();
return new ChannelHandler(callbackDispatcher, getSocket(), threadPoolResource);
}

protected static ConnectionManager buildConnectionManager(ChannelHandler channelHandler) {
return new ConnectionManager(channelHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,56 +15,56 @@
*/
@AllArgsConstructor
public class BaseCommandResponseResolver
implements RedisExceptionCheckedFunction<Response, Object> {
implements RedisExceptionCheckedFunction<Response, Object> {

private RedisExceptionCheckedFunction<Long, Object> respPointerResolver;
private RedisExceptionCheckedFunction<Long, Object> respPointerResolver;

/**
* Extracts value from the RESP pointer. <br>
* Throws errors when the response is unsuccessful.
*
* @return A generic Object with the Response | null if the response is empty
*/
public Object apply(Response response) throws RedisException {
// TODO: handle object if the object is small
// TODO: handle RESP2 object if configuration is set
if (response.hasRequestError()) {
RequestError error = response.getRequestError();
String msg = error.getMessage();
switch (error.getType()) {
case Unspecified:
// Unspecified error on Redis service-side
throw new RequestException(msg);
case ExecAbort:
// Transactional error on Redis service-side
throw new ExecAbortException(msg);
case Timeout:
// Timeout from Glide to Redis service
throw new TimeoutException(msg);
case Disconnect:
// Connection problem between Glide and Redis
throw new ConnectionException(msg);
default:
// Request or command error from Redis
throw new RequestException(msg);
}
/**
* Extracts value from the RESP pointer. <br>
* Throws errors when the response is unsuccessful.
*
* @return A generic Object with the Response | null if the response is empty
*/
public Object apply(Response response) throws RedisException {
// TODO: handle object if the object is small
// TODO: handle RESP2 object if configuration is set
if (response.hasRequestError()) {
RequestError error = response.getRequestError();
String msg = error.getMessage();
switch (error.getType()) {
case Unspecified:
// Unspecified error on Redis service-side
throw new RequestException(msg);
case ExecAbort:
// Transactional error on Redis service-side
throw new ExecAbortException(msg);
case Timeout:
// Timeout from Glide to Redis service
throw new TimeoutException(msg);
case Disconnect:
// Connection problem between Glide and Redis
throw new ConnectionException(msg);
default:
// Request or command error from Redis
throw new RequestException(msg);
}
}
if (response.hasClosingError()) {
// A closing error is thrown when Rust-core is not connected to Redis
// We want to close shop and throw a ClosingException
// TODO: close the channel on a closing error
// channel.close();
throw new ClosingException(response.getClosingError());
}
if (response.hasConstantResponse()) {
// Return "OK"
return response.getConstantResponse().toString();
}
if (response.hasRespPointer()) {
// Return the shared value - which may be a null value
return respPointerResolver.apply(response.getRespPointer());
}
// if no response payload is provided, assume null
return null;
}
if (response.hasClosingError()) {
// A closing error is thrown when Rust-core is not connected to Redis
// We want to close shop and throw a ClosingException
// TODO: close the channel on a closing error
// channel.close();
throw new ClosingException(response.getClosingError());
}
if (response.hasConstantResponse()) {
// Return "OK"
return response.getConstantResponse().toString();
}
if (response.hasRespPointer()) {
// Return the shared value - which may be a null value
return respPointerResolver.apply(response.getRespPointer());
}
// if no response payload is provided, assume null
return null;
}
}
16 changes: 8 additions & 8 deletions java/client/src/main/java/glide/api/commands/Command.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
@EqualsAndHashCode
public class Command {

/** Redis command request type */
@NonNull final RequestType requestType;
/** Redis command request type */
@NonNull final RequestType requestType;

/** List of Arguments for the Redis command request */
@Builder.Default final String[] arguments = new String[] {};
/** List of Arguments for the Redis command request */
@Builder.Default final String[] arguments = new String[] {};

public enum RequestType {
/** Call a custom command with list of string arguments */
CUSTOM_COMMAND,
}
public enum RequestType {
/** Call a custom command with list of string arguments */
CUSTOM_COMMAND,
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
@FunctionalInterface
public interface RedisExceptionCheckedFunction<R, T> {

/**
* Functional response handler that takes a protobuf Response object. <br>
* Returns a typed object on a successful Redis response. <br>
* Throws RedisException when receiving a Redis error response. <br>
*
* @param response - Redis Response
* @return T - response payload type
* @throws RedisException
*/
T apply(R response) throws RedisException;
/**
* Functional response handler that takes a protobuf Response object. <br>
* Returns a typed object on a successful Redis response. <br>
* Throws RedisException when receiving a Redis error response. <br>
*
* @param response - Redis Response
* @return T - response payload type
* @throws RedisException
*/
T apply(R response) throws RedisException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import glide.connectors.resources.ThreadPoolResource;
import java.util.List;

import glide.connectors.resources.ThreadPoolResource;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
Expand Down Expand Up @@ -46,18 +44,18 @@ public abstract class BaseClientConfiguration {
*/
private final RedisCredentials credentials;

/**
* The duration in milliseconds that the client should wait for a request to complete. This
* duration encompasses sending the request, awaiting for a response from the server, and any
* required reconnections or retries. If the specified timeout is exceeded for a pending request,
* it will result in a timeout error. If not set, a default value will be used.
*/
private final Integer requestTimeout;
/**
* The duration in milliseconds that the client should wait for a request to complete. This
* duration encompasses sending the request, awaiting for a response from the server, and any
* required reconnections or retries. If the specified timeout is exceeded for a pending request,
* it will result in a timeout error. If not set, a default value will be used.
*/
private final Integer requestTimeout;

/**
* Advanced users can pass an extended {@link glide.connectors.resources.ThreadPoolResource} to
* pass a user-defined event loop group. Users are responsible for shutting the resource down when
* no longer in use.
*/
private final ThreadPoolResource threadPoolResource;
/**
* Advanced users can pass an extended {@link glide.connectors.resources.ThreadPoolResource} to
* pass a user-defined event loop group. Users are responsible for shutting the resource down when
* no longer in use.
*/
private final ThreadPoolResource threadPoolResource;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,25 +23,27 @@ public class ChannelHandler {

/**
* Open a new channel for a new client and running it on the provided EventLoopGroup.
*
* @param callbackDispatcher - dispatcher to handle callbacks
* @param socketPath - address to connect
* @param threadPoolResource - resource to choose ELG and domainSocketChannelClass
*/
public ChannelHandler(
CallbackDispatcher callbackDispatcher,
String socketPath,
ThreadPoolResource threadPoolResource) throws InterruptedException {
public ChannelHandler(
CallbackDispatcher callbackDispatcher,
String socketPath,
ThreadPoolResource threadPoolResource)
throws InterruptedException {

channel =
new Bootstrap()
.group(threadPoolResource.getEventLoopGroup())
.channel(threadPoolResource.getDomainSocketChannelClass())
.handler(new ProtobufSocketChannelInitializer(callbackDispatcher))
.connect(new DomainSocketAddress(socketPath))
.sync()
.channel();
this.callbackDispatcher = callbackDispatcher;
}
channel =
new Bootstrap()
.group(threadPoolResource.getEventLoopGroup())
.channel(threadPoolResource.getDomainSocketChannelClass())
.handler(new ProtobufSocketChannelInitializer(callbackDispatcher))
.connect(new DomainSocketAddress(socketPath))
.sync()
.channel();
this.callbackDispatcher = callbackDispatcher;
}

/**
* Complete a protobuf message and write it to the channel (to UDS).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@
* configurations.
*/
public class EpollResource extends ThreadPoolResource {
private static final String EPOLL_EVENT_LOOP_IDENTIFIER = "glide-channel-epoll-elg";
private static final String EPOLL_EVENT_LOOP_IDENTIFIER = "glide-channel-epoll-elg";

public EpollResource() {
this(
new EpollEventLoopGroup(
Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory(EPOLL_EVENT_LOOP_IDENTIFIER, true)));
}
public EpollResource() {
this(
new EpollEventLoopGroup(
Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory(EPOLL_EVENT_LOOP_IDENTIFIER, true)));
}

public EpollResource(EpollEventLoopGroup epollEventLoopGroup) {
super(epollEventLoopGroup, EpollDomainSocketChannel.class);
}
public EpollResource(EpollEventLoopGroup epollEventLoopGroup) {
super(epollEventLoopGroup, EpollDomainSocketChannel.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@
* configurations.
*/
public class KQueuePoolResource extends ThreadPoolResource {
private static final String KQUEUE_EVENT_LOOP_IDENTIFIER = "glide-channel-kqueue-elg";
private static final String KQUEUE_EVENT_LOOP_IDENTIFIER = "glide-channel-kqueue-elg";

public KQueuePoolResource() {
this(
new KQueueEventLoopGroup(
Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory(KQUEUE_EVENT_LOOP_IDENTIFIER, true)));
}
public KQueuePoolResource() {
this(
new KQueueEventLoopGroup(
Runtime.getRuntime().availableProcessors(),
new DefaultThreadFactory(KQUEUE_EVENT_LOOP_IDENTIFIER, true)));
}

public KQueuePoolResource(KQueueEventLoopGroup eventLoopGroup) {
super(eventLoopGroup, KQueueDomainSocketChannel.class);
}
public KQueuePoolResource(KQueueEventLoopGroup eventLoopGroup) {
super(eventLoopGroup, KQueueDomainSocketChannel.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ private static boolean isEPollAvailable() {
}
}

public static Supplier<ThreadPoolResource> getThreadPoolResourceSupplier() {
if (Platform.getCapabilities().isKQueueAvailable()) {
return KQueuePoolResource::new;
}
public static Supplier<ThreadPoolResource> getThreadPoolResourceSupplier() {
if (Platform.getCapabilities().isKQueueAvailable()) {
return KQueuePoolResource::new;
}

if (Platform.getCapabilities().isEPollAvailable()) {
return EpollResource::new;
if (Platform.getCapabilities().isEPollAvailable()) {
return EpollResource::new;
}
// TODO support IO-Uring and NIO
throw new RuntimeException("Current platform supports no known thread pool resources");
}
// TODO support IO-Uring and NIO
throw new RuntimeException("Current platform supports no known thread pool resources");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
*/
@Getter
public abstract class ThreadPoolResource {
private EventLoopGroup eventLoopGroup;
private Class<? extends DomainSocketChannel> domainSocketChannelClass;
private EventLoopGroup eventLoopGroup;
private Class<? extends DomainSocketChannel> domainSocketChannelClass;

public ThreadPoolResource(
@NonNull EventLoopGroup eventLoopGroup,
@NonNull Class<? extends DomainSocketChannel> domainSocketChannelClass) {
this.eventLoopGroup = eventLoopGroup;
this.domainSocketChannelClass = domainSocketChannelClass;
}
public ThreadPoolResource(
@NonNull EventLoopGroup eventLoopGroup,
@NonNull Class<? extends DomainSocketChannel> domainSocketChannelClass) {
this.eventLoopGroup = eventLoopGroup;
this.domainSocketChannelClass = domainSocketChannelClass;
}
}
Loading

0 comments on commit 5cc435d

Please sign in to comment.