Skip to content

Commit

Permalink
Java client: Fix how UDS connection established (valkey-io#838)
Browse files Browse the repository at this point in the history
* Fix
* `sync` instead of `syncUninterruptibly`.

Signed-off-by: Yury-Fridlyand <[email protected]>
  • Loading branch information
Yury-Fridlyand authored Jan 23, 2024
1 parent d340cef commit 9cdd788
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 11 deletions.
23 changes: 15 additions & 8 deletions java/client/src/main/java/glide/api/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,23 @@ public class RedisClient extends BaseClient {
* @return a Future to connect and return a RedisClient
*/
public static CompletableFuture<RedisClient> CreateClient(RedisClientConfiguration config) {
ChannelHandler channelHandler = buildChannelHandler();
ConnectionManager connectionManager = buildConnectionManager(channelHandler);
CommandManager commandManager = buildCommandManager(channelHandler);
// TODO: Support exception throwing, including interrupted exceptions
return connectionManager
.connectToRedis(config)
.thenApply(ignore -> new RedisClient(connectionManager, commandManager));
try {
ChannelHandler channelHandler = buildChannelHandler();
ConnectionManager connectionManager = buildConnectionManager(channelHandler);
CommandManager commandManager = buildCommandManager(channelHandler);
// TODO: Support exception throwing, including interrupted exceptions
return connectionManager
.connectToRedis(config)
.thenApply(ignore -> new RedisClient(connectionManager, commandManager));
} catch (InterruptedException e) {
// Something bad happened while we were establishing netty connection to UDS
var future = new CompletableFuture<RedisClient>();
future.completeExceptionally(e);
return future;
}
}

protected static ChannelHandler buildChannelHandler() {
protected static ChannelHandler buildChannelHandler() throws InterruptedException {
CallbackDispatcher callbackDispatcher = new CallbackDispatcher();
return new ChannelHandler(callbackDispatcher, getSocket());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public class ChannelHandler {
private final CallbackDispatcher callbackDispatcher;

/** Open a new channel for a new client. */
public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath) {
public ChannelHandler(CallbackDispatcher callbackDispatcher, String socketPath)
throws InterruptedException {
this(
ThreadPoolAllocator.createOrGetNettyThreadPool(THREAD_POOL_NAME, Optional.empty()),
Platform.getClientUdsNettyChannelType(),
Expand All @@ -51,14 +52,15 @@ public ChannelHandler(
Class<? extends DomainSocketChannel> domainSocketChannelClass,
ChannelInitializer<UnixChannel> channelInitializer,
DomainSocketAddress domainSocketAddress,
CallbackDispatcher callbackDispatcher) {
CallbackDispatcher callbackDispatcher)
throws InterruptedException {
channel =
new Bootstrap()
.group(eventLoopGroup)
.channel(domainSocketChannelClass)
.handler(channelInitializer)
.connect(domainSocketAddress)
// TODO call here .sync() if needed or remove this comment
.sync()
.channel();
this.callbackDispatcher = callbackDispatcher;
}
Expand Down

0 comments on commit 9cdd788

Please sign in to comment.