Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support transactions #65

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion java/client/src/main/java/glide/api/RedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import glide.connectors.handlers.ChannelHandler;
import glide.managers.CommandManager;
import glide.managers.ConnectionManager;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
Expand All @@ -30,7 +32,7 @@ public static CompletableFuture<RedisClient> CreateClient(RedisClientConfigurati
// TODO: Support exception throwing, including interrupted exceptions
return connectionManager
.connectToRedis(config)
.thenApply(ignore -> new RedisClient(connectionManager, commandManager));
.thenApplyAsync(ignore -> new RedisClient(connectionManager, commandManager));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not run it in the same thread?

}

protected static ChannelHandler buildChannelHandler() {
Expand Down Expand Up @@ -62,4 +64,17 @@ public CompletableFuture<Object> customCommand(String[] args) {
Command.builder().requestType(Command.RequestType.CUSTOM_COMMAND).arguments(args).build();
return commandManager.submitNewCommand(command, BaseCommands::handleObjectResponse);
}

@Override
public CompletableFuture<Object[]> customTransaction(String[][] args) {
List<Command> commands = new ArrayList<>();
for (var command : args) {
commands.add(
Command.builder()
.requestType(Command.RequestType.CUSTOM_COMMAND)
.arguments(command)
.build());
}
return commandManager.submitNewTransaction(commands, BaseCommands::handleTransactionResponse);
}
}
23 changes: 14 additions & 9 deletions java/client/src/main/java/glide/api/commands/BaseCommands.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package glide.api.commands;

import glide.ffi.resolvers.RedisValueResolver;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import response.ResponseOuterClass.Response;

/** Base Commands interface to handle generic command and transaction requests. */
public interface BaseCommands {

/**
* default Object handler from response
* The default Object handler from response
*
* @return BaseCommandResponseResolver to deliver the response
*/
Expand All @@ -19,31 +18,37 @@ static BaseCommandResponseResolver applyBaseCommandResponseResolver() {

/**
* Extracts the response from the Protobuf response and either throws an exception or returns the
* appropriate response has an Object
* appropriate response as an Object
*
* @param response Redis protobuf message
* @return Response Object
*/
static Object handleObjectResponse(Response response) {
// return function to convert protobuf.Response into the response object by
// calling valueFromPointer
return BaseCommands.applyBaseCommandResponseResolver().apply(response);
return applyBaseCommandResponseResolver().apply(response);
}

public static List<Object> handleTransactionResponse(Response response) {
static Object[] handleTransactionResponse(Response response) {
// return function to convert protobuf.Response into the response object by
// calling valueFromPointer

List<Object> transactionResponse =
(List<Object>) BaseCommands.applyBaseCommandResponseResolver().apply(response);
return transactionResponse;
return (Object[]) applyBaseCommandResponseResolver().apply(response);
}

/**
* Execute a @see{Command} by sending command via socket manager
* Execute a custom {@link Command}.
*
* @param args arguments for the custom command
* @return a CompletableFuture with response result from Redis
*/
CompletableFuture<Object> customCommand(String[] args);

/**
* Execute a transaction of custom {@link Command}s.
*
* @param args arguments for the custom command
* @return a CompletableFuture with response result from Redis
*/
CompletableFuture<Object[]> customTransaction(String[][] args);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about exposing?

CompletableFuture<Object[]> customTransaction(Transaction transaction);

This is what the node client does: https://github.com/Bit-Quill/glide-for-redis/blob/46f831c5d2b31a667f7d7c53035b99e8b3851f26/node/src/RedisClient.ts#L98

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may have to create a Transactions class that implements BaseCommands that adds the transaction calls to the list.
Maybe also a ClusterTransactions class that implements BaseClusterCommands too...

reference: https://github.com/Bit-Quill/glide-for-redis/blob/main/node/src/Transaction.ts#L72

}
63 changes: 44 additions & 19 deletions java/client/src/main/java/glide/managers/CommandManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@
import glide.api.commands.Command;
import glide.api.commands.RedisExceptionCheckedFunction;
import glide.connectors.handlers.ChannelHandler;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.RequiredArgsConstructor;
import redis_request.RedisRequestOuterClass;
import redis_request.RedisRequestOuterClass.Command.ArgsArray;
import redis_request.RedisRequestOuterClass.RedisRequest;
import redis_request.RedisRequestOuterClass.RequestType;
import redis_request.RedisRequestOuterClass.Transaction;
import response.ResponseOuterClass.Response;

/**
Expand All @@ -21,8 +26,8 @@ public class CommandManager {
/**
* Build a command and send.
*
* @param command
* @param responseHandler - to handle the response object
* @param command The command to execute
* @param responseHandler The handler of the response object
* @return A result promise of type T
*/
public <T> CompletableFuture<T> submitNewCommand(
Expand All @@ -31,8 +36,35 @@ public <T> CompletableFuture<T> submitNewCommand(
// create protobuf message from command
// submit async call
return channel
.write(prepareRedisRequest(command.getRequestType(), command.getArguments()), true)
.thenApplyAsync(response -> responseHandler.apply(response));
.write(
RedisRequest.newBuilder()
.setSingleCommand(
prepareRedisCommand(command.getRequestType(), command.getArguments())),
true)
.thenApplyAsync(responseHandler::apply);
}

/**
* Build a transaction and send.
*
* @param transaction The command to execute
* @param responseHandler The handler of the response object
* @return A result promise of type T
*/
public <T> CompletableFuture<T> submitNewTransaction(
List<Command> transaction, RedisExceptionCheckedFunction<Response, T> responseHandler) {
// register callback
// create protobuf message from command
// submit async call
var transactionBuilder = Transaction.newBuilder();
for (var command : transaction) {
transactionBuilder.addCommands(
prepareRedisCommand(command.getRequestType(), command.getArguments()));

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does this work routes? Each command gets a route, right?
I think the API.Command object can be used to pass that in, and there will be a third argument in prepareRedisCommand that takes a command.getRouteOption(), right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to protobuf - route set to the entire request

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. That makes sense.

}

return channel
.write(RedisRequest.newBuilder().setTransaction(transactionBuilder.build()), true)
.thenApplyAsync(responseHandler::apply);
}

/**
Expand All @@ -42,30 +74,23 @@ public <T> CompletableFuture<T> submitNewCommand(
* @return An uncompleted request. CallbackDispatcher is responsible to complete it by adding a
* callback id.
*/
private RedisRequestOuterClass.RedisRequest.Builder prepareRedisRequest(
private RedisRequestOuterClass.Command prepareRedisCommand(
Command.RequestType command, String[] args) {
RedisRequestOuterClass.Command.ArgsArray.Builder commandArgs =
RedisRequestOuterClass.Command.ArgsArray.newBuilder();
ArgsArray.Builder commandArgs = ArgsArray.newBuilder();
for (var arg : args) {
commandArgs.addArgs(arg);
}

return RedisRequestOuterClass.RedisRequest.newBuilder()
.setSingleCommand(
RedisRequestOuterClass.Command.newBuilder()
.setRequestType(mapRequestTypes(command))
.setArgsArray(commandArgs.build())
.build())
.setRoute(
RedisRequestOuterClass.Routes.newBuilder()
.setSimpleRoutes(RedisRequestOuterClass.SimpleRoutes.AllNodes)
.build());
return RedisRequestOuterClass.Command.newBuilder()
.setRequestType(mapRequestTypes(command))
.setArgsArray(commandArgs.build())
.build();
}

private RedisRequestOuterClass.RequestType mapRequestTypes(Command.RequestType inType) {
private RequestType mapRequestTypes(Command.RequestType inType) {
switch (inType) {
case CUSTOM_COMMAND:
return RedisRequestOuterClass.RequestType.CustomCommand;
return RequestType.CustomCommand;
}
throw new RuntimeException("Unsupported request type");
}
Expand Down
77 changes: 63 additions & 14 deletions java/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
use glide_core::start_socket_listener;

use jni::objects::{JClass, JObject, JThrowable};
use jni::objects::{JClass, JObject, JObjectArray, JThrowable};
use jni::sys::jlong;
use jni::JNIEnv;
use log::error;
use redis::Value;
use std::sync::mpsc;

fn redis_value_to_java(mut env: JNIEnv, val: Value) -> JObject {
#[cfg(ffi_test)]
mod ffi_test;
#[cfg(ffi_test)]
pub use ffi_test::*;

// TODO: Consider caching method IDs here in a static variable (might need RwLock to mutate)
fn redis_value_to_java<'local>(env: &mut JNIEnv<'local>, val: Value) -> JObject<'local> {
match val {
Value::Nil => JObject::null(),
Value::SimpleString(str) => JObject::from(env.new_string(str).unwrap()),
Value::Okay => JObject::from(env.new_string("OK").unwrap()),
// TODO use primitive integer
Value::Int(num) => env
.new_object("java/lang/Integer", "(I)V", &[num.into()])
.new_object("java/lang/Long", "(J)V", &[num.into()])
.unwrap(),
Value::BulkString(data) => match std::str::from_utf8(data.as_ref()) {
Ok(val) => JObject::from(env.new_string(val).unwrap()),
Expand All @@ -23,16 +28,60 @@ fn redis_value_to_java(mut env: JNIEnv, val: Value) -> JObject {
JObject::null()
}
},
Value::Array(_array) => {
let _ = env.throw("Not implemented");
JObject::null()
Value::Array(array) => {
let items: JObjectArray = env
.new_object_array(array.len() as i32, "java/lang/Object", JObject::null())
.unwrap();

for (i, item) in array.into_iter().enumerate() {
let java_value = redis_value_to_java(env, item);
env.set_object_array_element(&items, i as i32, java_value)
.unwrap();
}

items.into()
}
Value::Map(map) => {
let hashmap = env.new_object("java/util/HashMap", "()V", &[]).unwrap();

for (key, value) in map {
let java_key = redis_value_to_java(env, key);
let java_value = redis_value_to_java(env, value);
env.call_method(
&hashmap,
"put",
"(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;",
&[(&java_key).into(), (&java_value).into()],
)
.unwrap();
}

hashmap
}
Value::Map(_map) => todo!(),
Value::Double(_float) => todo!(),
Value::Boolean(_bool) => todo!(),
Value::VerbatimString { format: _, text: _ } => todo!(),
Value::Double(float) => env
.new_object("java/lang/Double", "(D)V", &[float.into_inner().into()])
.unwrap(),
Value::Boolean(bool) => env
.new_object("java/lang/Boolean", "(Z)V", &[bool.into()])
.unwrap(),
Value::VerbatimString { format: _, text } => JObject::from(env.new_string(text).unwrap()),
Value::BigNumber(_num) => todo!(),
Value::Set(_array) => todo!(),
Value::Set(array) => {
let set = env.new_object("java/util/HashSet", "()V", &[]).unwrap();

for elem in array {
let java_value = redis_value_to_java(env, elem);
env.call_method(
&set,
"add",
"(Ljava/lang/Object;)Z",
&[(&java_value).into()],
)
.unwrap();
}

set
}
Value::Attribute {
data: _,
attributes: _,
Expand All @@ -43,12 +92,12 @@ fn redis_value_to_java(mut env: JNIEnv, val: Value) -> JObject {

#[no_mangle]
pub extern "system" fn Java_glide_ffi_resolvers_RedisValueResolver_valueFromPointer<'local>(
env: JNIEnv<'local>,
mut env: JNIEnv<'local>,
_class: JClass<'local>,
pointer: jlong,
) -> JObject<'local> {
let value = unsafe { Box::from_raw(pointer as *mut Value) };
redis_value_to_java(env, *value)
redis_value_to_java(&mut env, *value)
}

#[no_mangle]
Expand Down
Loading