diff --git a/.github/workflows/java.yml b/.github/workflows/java.yml index 3faffc3bc0..e6c417575f 100644 --- a/.github/workflows/java.yml +++ b/.github/workflows/java.yml @@ -41,12 +41,10 @@ jobs: distribution: "temurin" java-version: ${{ matrix.java }} - - name: Install and run protoc (protobuf) + - name: Install protoc (protobuf) run: | sudo apt update sudo apt install -y protobuf-compiler - mkdir -p java/client/src/main/java/org/babushka/javababushka/generated - protoc -Iprotobuf=babushka-core/src/protobuf/ --java_out=java/client/src/main/java/org/babushka/javababushka/generated babushka-core/src/protobuf/*.proto - name: Build rust part working-directory: java diff --git a/benchmarks/utilities/csv_exporter.py b/benchmarks/utilities/csv_exporter.py old mode 100644 new mode 100755 index 0700c3147e..753df2e5ab --- a/benchmarks/utilities/csv_exporter.py +++ b/benchmarks/utilities/csv_exporter.py @@ -1,3 +1,5 @@ +#!/bin/python3 + import csv import json import os diff --git a/java/Cargo.toml b/java/Cargo.toml index 01ecb23e4c..d9d0db8a4f 100644 --- a/java/Cargo.toml +++ b/java/Cargo.toml @@ -15,6 +15,8 @@ babushka = { path = "../babushka-core" } tokio = { version = "^1", features = ["rt", "macros", "rt-multi-thread", "time"] } logger_core = {path = "../logger_core"} tracing-subscriber = "0.3.16" +jni = "0.21.1" +log = "0.4.20" [profile.release] lto = true diff --git a/java/benchmarks/build.gradle b/java/benchmarks/build.gradle index 5e22fda5c7..6ade7532b0 100644 --- a/java/benchmarks/build.gradle +++ b/java/benchmarks/build.gradle @@ -1,6 +1,7 @@ plugins { // Apply the application plugin to add support for building a CLI application in Java. id 'application' + id 'io.freefair.lombok' } repositories { @@ -9,6 +10,8 @@ repositories { } dependencies { + implementation project(':client') + // Use JUnit test framework. testImplementation 'org.junit.jupiter:junit-jupiter:5.9.2' @@ -34,12 +37,14 @@ java { application { // Define the main class for the application. mainClass = 'javababushka.benchmarks.BenchmarkingApp' + applicationDefaultJvmArgs += "-Djava.library.path=${projectDir}/../target/release:${projectDir}/../target/debug" } -tasks.withType(Test) { - testLogging { - exceptionFormat "full" - events "started", "skipped", "passed", "failed" - showStandardStreams true - } +tasks.withType(Test) { + testLogging { + exceptionFormat "full" + events "started", "skipped", "passed", "failed" + showStandardStreams true + } + jvmArgs "-Djava.library.path=${projectDir}/../target/release:${projectDir}/../target/debug" } diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/AsyncClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/AsyncClient.java deleted file mode 100644 index 92d10ac4a0..0000000000 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/AsyncClient.java +++ /dev/null @@ -1,16 +0,0 @@ -package javababushka.benchmarks; - -import java.util.concurrent.Future; - -public interface AsyncClient extends Client { - - long DEFAULT_TIMEOUT = 1000; // Milliseconds - - Future asyncSet(String key, String value); - - Future asyncGet(String key); - - T waitForResult(Future future); - - T waitForResult(Future future, long timeout); -} diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java b/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java index 0647b0ae11..b401488584 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/BenchmarkingApp.java @@ -5,11 +5,12 @@ import java.util.Arrays; import java.util.Optional; import java.util.stream.Stream; -import javababushka.benchmarks.jedis.JedisClient; -import javababushka.benchmarks.jedis.JedisPseudoAsyncClient; -import javababushka.benchmarks.lettuce.LettuceAsyncClient; -import javababushka.benchmarks.lettuce.LettuceAsyncClusterClient; -import javababushka.benchmarks.lettuce.LettuceClient; +import javababushka.benchmarks.clients.babushka.JniNettyClient; +import javababushka.benchmarks.clients.jedis.JedisClient; +import javababushka.benchmarks.clients.jedis.JedisPseudoAsyncClient; +import javababushka.benchmarks.clients.lettuce.LettuceAsyncClient; +import javababushka.benchmarks.clients.lettuce.LettuceAsyncClusterClient; +import javababushka.benchmarks.clients.lettuce.LettuceClient; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.DefaultParser; @@ -48,9 +49,11 @@ public static void main(String[] args) { for (ClientName client : runConfiguration.clients) { switch (client) { case JEDIS: + // run testClientSetGet on JEDIS sync client testClientSetGet(JedisClient::new, runConfiguration, false); break; case JEDIS_ASYNC: + // run testClientSetGet on JEDIS pseudo-async client testClientSetGet(JedisPseudoAsyncClient::new, runConfiguration, true); break; case LETTUCE: @@ -63,8 +66,11 @@ public static void main(String[] args) { testClientSetGet(LettuceAsyncClient::new, runConfiguration, true); } break; + case BABUSHKA: + testClientSetGet(() -> new JniNettyClient(false), runConfiguration, false); + break; case BABUSHKA_ASYNC: - System.out.println("Babushka async not yet configured"); + testClientSetGet(() -> new JniNettyClient(true), runConfiguration, true); break; } } @@ -93,8 +99,8 @@ private static Options getOptions() { Option.builder("clients") .hasArg(true) .desc( - "one of: all|jedis|jedis_async|lettuce|lettuce_async" - + "|babushka_async|all_async|all_sync [all]") + "one of: all|jedis|jedis_async|lettuce|lettuce_async|" + + "babushka|babushka_async|all_async|all_sync") .build()); options.addOption(Option.builder("host").hasArg(true).desc("Hostname [localhost]").build()); options.addOption(Option.builder("port").hasArg(true).desc("Port number [6379]").build()); @@ -149,6 +155,7 @@ private static RunConfiguration verifyOptions(CommandLine line) throws ParseExce return Stream.of( ClientName.JEDIS, ClientName.JEDIS_ASYNC, + ClientName.BABUSHKA, ClientName.BABUSHKA_ASYNC, ClientName.LETTUCE, ClientName.LETTUCE_ASYNC); @@ -158,7 +165,7 @@ private static RunConfiguration verifyOptions(CommandLine line) throws ParseExce ClientName.BABUSHKA_ASYNC, ClientName.LETTUCE_ASYNC); case ALL_SYNC: - return Stream.of(ClientName.JEDIS, ClientName.LETTUCE); + return Stream.of(ClientName.JEDIS, ClientName.LETTUCE, ClientName.BABUSHKA); default: return Stream.of(e); } @@ -210,6 +217,7 @@ public enum ClientName { LETTUCE("Lettuce"), LETTUCE_ASYNC("Lettuce async"), BABUSHKA_ASYNC("Babushka async"), + BABUSHKA("Babushka"), ALL("All"), ALL_SYNC("All sync"), ALL_ASYNC("All async"); @@ -250,8 +258,9 @@ public RunConfiguration() { concurrentTasks = new int[] {100, 1000}; clients = new ClientName[] { - // ClientName.BABUSHKA_ASYNC, - ClientName.LETTUCE_ASYNC + // ClientName.LETTUCE, + // ClientName.LETTUCE_ASYNC, + ClientName.BABUSHKA_ASYNC, ClientName.BABUSHKA, }; host = "localhost"; port = 6379; diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/AsyncClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/AsyncClient.java new file mode 100644 index 0000000000..deae2ceee5 --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/AsyncClient.java @@ -0,0 +1,29 @@ +package javababushka.benchmarks.clients; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import javababushka.benchmarks.utils.ConnectionSettings; + +/** A Redis client with async capabilities */ +public interface AsyncClient extends Client { + + long DEFAULT_TIMEOUT_MILLISECOND = 1000; + + Future asyncConnectToRedis(ConnectionSettings connectionSettings); + + Future asyncSet(String key, String value); + + Future asyncGet(String key); + + default T waitForResult(Future future) { + return waitForResult(future, DEFAULT_TIMEOUT_MILLISECOND); + } + + default T waitForResult(Future future, long timeout) { + try { + return future.get(timeout, TimeUnit.MILLISECONDS); + } catch (Exception ignored) { + return null; + } + } +} diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/Client.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/Client.java similarity index 76% rename from java/benchmarks/src/main/java/javababushka/benchmarks/Client.java rename to java/benchmarks/src/main/java/javababushka/benchmarks/clients/Client.java index 16ab36847d..d95f31f25d 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/Client.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/Client.java @@ -1,7 +1,8 @@ -package javababushka.benchmarks; +package javababushka.benchmarks.clients; import javababushka.benchmarks.utils.ConnectionSettings; +/** A Redis client interface */ public interface Client { void connectToRedis(); diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/SyncClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/SyncClient.java similarity index 56% rename from java/benchmarks/src/main/java/javababushka/benchmarks/SyncClient.java rename to java/benchmarks/src/main/java/javababushka/benchmarks/clients/SyncClient.java index c99174af3d..603f91e936 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/SyncClient.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/SyncClient.java @@ -1,5 +1,6 @@ -package javababushka.benchmarks; +package javababushka.benchmarks.clients; +/** A Redis client with sync capabilities */ public interface SyncClient extends Client { void set(String key, String value); diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java new file mode 100644 index 0000000000..a4fc37f4d9 --- /dev/null +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/babushka/JniNettyClient.java @@ -0,0 +1,69 @@ +package javababushka.benchmarks.clients.babushka; + +import static response.ResponseOuterClass.Response; + +import java.util.concurrent.Future; +import javababushka.Client; +import javababushka.benchmarks.clients.AsyncClient; +import javababushka.benchmarks.clients.SyncClient; +import javababushka.benchmarks.utils.ConnectionSettings; + +public class JniNettyClient implements SyncClient, AsyncClient { + + private final Client testClient; + private String name = "JNI Netty"; + + public JniNettyClient(boolean async) { + name += async ? " async" : " sync"; + testClient = new Client(); + } + + @Override + public String getName() { + return name; + } + + @Override + public void closeConnection() { + testClient.closeConnection(); + } + + @Override + public void connectToRedis() { + connectToRedis(new ConnectionSettings("localhost", 6379, false, false)); + } + + @Override + public void connectToRedis(ConnectionSettings connectionSettings) { + waitForResult(asyncConnectToRedis(connectionSettings)); + } + + @Override + public Future asyncConnectToRedis(ConnectionSettings connectionSettings) { + return testClient.asyncConnectToRedis( + connectionSettings.host, + connectionSettings.port, + connectionSettings.useSsl, + connectionSettings.clusterMode); + } + + @Override + public Future asyncSet(String key, String value) { + return testClient.asyncSet(key, value); + } + + @Override + public Future asyncGet(String key) { + return testClient.asyncGet(key); + } + + @Override + public void set(String key, String value) { + testClient.set(key, value); + } + + @Override + public String get(String key) { + return testClient.get(key); + } +} diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/jedis/JedisClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/jedis/JedisClient.java similarity index 80% rename from java/benchmarks/src/main/java/javababushka/benchmarks/jedis/JedisClient.java rename to java/benchmarks/src/main/java/javababushka/benchmarks/clients/jedis/JedisClient.java index 214a6cae10..f088d5ac07 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/jedis/JedisClient.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/jedis/JedisClient.java @@ -1,13 +1,11 @@ -/* - * This Java source file was generated by the Gradle 'init' task. - */ -package javababushka.benchmarks.jedis; +package javababushka.benchmarks.clients.jedis; -import javababushka.benchmarks.SyncClient; +import javababushka.benchmarks.clients.SyncClient; import javababushka.benchmarks.utils.ConnectionSettings; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; +/** A Jedis client with sync capabilities. See: https://github.com/redis/jedis */ public class JedisClient implements SyncClient { public static final String DEFAULT_HOST = "localhost"; @@ -15,10 +13,6 @@ public class JedisClient implements SyncClient { protected Jedis jedisResource; - public boolean someLibraryMethod() { - return true; - } - @Override public void connectToRedis() { JedisPool pool = new JedisPool(DEFAULT_HOST, DEFAULT_PORT); @@ -43,6 +37,9 @@ public void connectToRedis(ConnectionSettings connectionSettings) { jedisResource = new Jedis(connectionSettings.host, connectionSettings.port, connectionSettings.useSsl); jedisResource.connect(); + if (!jedisResource.isConnected()) { + throw new RuntimeException("failed to connect to jedis"); + } } public String info() { diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/jedis/JedisPseudoAsyncClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/jedis/JedisPseudoAsyncClient.java similarity index 51% rename from java/benchmarks/src/main/java/javababushka/benchmarks/jedis/JedisPseudoAsyncClient.java rename to java/benchmarks/src/main/java/javababushka/benchmarks/clients/jedis/JedisPseudoAsyncClient.java index 598c455d1e..3970826a33 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/jedis/JedisPseudoAsyncClient.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/jedis/JedisPseudoAsyncClient.java @@ -1,13 +1,22 @@ -package javababushka.benchmarks.jedis; +package javababushka.benchmarks.clients.jedis; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import javababushka.benchmarks.AsyncClient; +import javababushka.benchmarks.clients.AsyncClient; +import javababushka.benchmarks.utils.ConnectionSettings; -// Jedis doesn't provide async API -// https://github.com/redis/jedis/issues/241 +/** + * A Jedis client with pseudo-async capabilities. Jedis doesn't provide async API + * https://github.com/redis/jedis/issues/241 + * + *

See: https://github.com/redis/jedis + */ public class JedisPseudoAsyncClient extends JedisClient implements AsyncClient { + @Override + public Future asyncConnectToRedis(ConnectionSettings connectionSettings) { + return CompletableFuture.runAsync(() -> super.connectToRedis(connectionSettings)); + } + @Override public Future asyncSet(String key, String value) { return CompletableFuture.runAsync(() -> super.set(key, value)); @@ -18,20 +27,6 @@ public Future asyncGet(String key) { return CompletableFuture.supplyAsync(() -> super.get(key)); } - @Override - public T waitForResult(Future future) { - return waitForResult(future, DEFAULT_TIMEOUT); - } - - @Override - public T waitForResult(Future future, long timeout) { - try { - return future.get(timeout, TimeUnit.MILLISECONDS); - } catch (Exception ignored) { - return null; - } - } - @Override public String getName() { return "Jedis pseudo-async"; diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/lettuce/LettuceAsyncClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceAsyncClient.java similarity index 55% rename from java/benchmarks/src/main/java/javababushka/benchmarks/lettuce/LettuceAsyncClient.java rename to java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceAsyncClient.java index 583f5e488f..ded154b3c5 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/lettuce/LettuceAsyncClient.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceAsyncClient.java @@ -1,18 +1,17 @@ -/* - * This Java source file was generated by the Gradle 'init' task. - */ -package javababushka.benchmarks.lettuce; +package javababushka.benchmarks.clients.lettuce; import io.lettuce.core.RedisClient; import io.lettuce.core.RedisFuture; +import io.lettuce.core.RedisURI; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.async.RedisAsyncCommands; +import io.lettuce.core.codec.StringCodec; import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import javababushka.benchmarks.AsyncClient; +import javababushka.benchmarks.clients.AsyncClient; import javababushka.benchmarks.utils.ConnectionSettings; -public class LettuceAsyncClient implements AsyncClient { +/** A Lettuce client with async capabilities see: https://lettuce.io/ */ +public class LettuceAsyncClient implements AsyncClient { private RedisClient client; private RedisAsyncCommands asyncCommands; @@ -20,7 +19,7 @@ public class LettuceAsyncClient implements AsyncClient { @Override public void connectToRedis() { - connectToRedis(new ConnectionSettings("localhost", 6379, false)); + connectToRedis(new ConnectionSettings("localhost", 6379, false, false)); } @Override @@ -37,27 +36,29 @@ public void connectToRedis(ConnectionSettings connectionSettings) { } @Override - public RedisFuture asyncSet(String key, String value) { - return asyncCommands.set(key, value); + public Future asyncConnectToRedis(ConnectionSettings connectionSettings) { + client = RedisClient.create(); + var asyncConnection = + client.connectAsync( + new StringCodec(), + RedisURI.create( + String.format( + "%s://%s:%d", + connectionSettings.useSsl ? "rediss" : "redis", + connectionSettings.host, + connectionSettings.port))); + asyncConnection.whenComplete((connection, exception) -> asyncCommands = connection.async()); + return asyncConnection.thenApply((connection) -> "OK"); } @Override - public RedisFuture asyncGet(String key) { - return asyncCommands.get(key); - } - - @Override - public Object waitForResult(Future future) { - return waitForResult(future, DEFAULT_TIMEOUT); + public RedisFuture asyncSet(String key, String value) { + return asyncCommands.set(key, value); } @Override - public Object waitForResult(Future future, long timeoutMS) { - try { - return future.get(timeoutMS, TimeUnit.MILLISECONDS); - } catch (Exception ignored) { - return null; - } + public RedisFuture asyncGet(String key) { + return asyncCommands.get(key); } @Override diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/lettuce/LettuceAsyncClusterClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceAsyncClusterClient.java similarity index 92% rename from java/benchmarks/src/main/java/javababushka/benchmarks/lettuce/LettuceAsyncClusterClient.java rename to java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceAsyncClusterClient.java index 84e48eb691..6e5ed09ce3 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/lettuce/LettuceAsyncClusterClient.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceAsyncClusterClient.java @@ -1,7 +1,7 @@ /* * This Java source file was generated by the Gradle 'init' task. */ -package javababushka.benchmarks.lettuce; +package javababushka.benchmarks.clients.lettuce; import io.lettuce.core.RedisFuture; import io.lettuce.core.RedisURI; @@ -18,7 +18,7 @@ public class LettuceAsyncClusterClient extends LettuceAsyncClient { @Override public void connectToRedis() { - connectToRedis(new ConnectionSettings("localhost", 6379, false)); + connectToRedis(new ConnectionSettings("localhost", 6379, false, true)); } @Override @@ -35,7 +35,7 @@ public void connectToRedis(ConnectionSettings connectionSettings) { } @Override - public RedisFuture asyncSet(String key, String value) { + public RedisFuture asyncSet(String key, String value) { return clusterAsyncCommands.set(key, value); } diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/lettuce/LettuceClient.java b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceClient.java similarity index 87% rename from java/benchmarks/src/main/java/javababushka/benchmarks/lettuce/LettuceClient.java rename to java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceClient.java index e4e1830bda..87d7bc9d2e 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/lettuce/LettuceClient.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/clients/lettuce/LettuceClient.java @@ -1,14 +1,12 @@ -/* - * This Java source file was generated by the Gradle 'init' task. - */ -package javababushka.benchmarks.lettuce; +package javababushka.benchmarks.clients.lettuce; import io.lettuce.core.RedisClient; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.api.sync.RedisStringCommands; -import javababushka.benchmarks.SyncClient; +import javababushka.benchmarks.clients.SyncClient; import javababushka.benchmarks.utils.ConnectionSettings; +/** A Lettuce client with sync capabilities see: https://lettuce.io/ */ public class LettuceClient implements SyncClient { RedisClient client; @@ -17,7 +15,7 @@ public class LettuceClient implements SyncClient { @Override public void connectToRedis() { - connectToRedis(new ConnectionSettings("localhost", 6379, false)); + connectToRedis(new ConnectionSettings("localhost", 6379, false, false)); } @Override diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java index 1d3d842b35..4d826c427b 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/Benchmarking.java @@ -11,20 +11,26 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Supplier; -import javababushka.benchmarks.AsyncClient; import javababushka.benchmarks.BenchmarkingApp; -import javababushka.benchmarks.Client; -import javababushka.benchmarks.SyncClient; +import javababushka.benchmarks.clients.AsyncClient; +import javababushka.benchmarks.clients.Client; +import javababushka.benchmarks.clients.SyncClient; import org.apache.commons.lang3.tuple.Pair; +/** Class to calculate latency on client-actions */ public class Benchmarking { static final double PROB_GET = 0.8; static final double PROB_GET_EXISTING_KEY = 0.8; static final int SIZE_GET_KEYSPACE = 3750000; static final int SIZE_SET_KEYSPACE = 3000000; static final int ASYNC_OPERATION_TIMEOUT_SEC = 1; - // measurements are done in nanoseconds, but it should be converted to seconds later - public static final double SECONDS_IN_NANO = 1e-9; + static final double LATENCY_NORMALIZATION = 1000000.0; + static final int LATENCY_MIN = 100000; + static final int LATENCY_MAX = 10000000; + static final int LATENCY_MULTIPLIER = 10000; + static final double TPS_NORMALIZATION = 1000000000.0; // nano to seconds + // measurements are done in nano-seconds, but it should be converted to seconds later + static final double SECONDS_IN_NANO = 1e-9; public static final double NANO_TO_SECONDS = 1e9; private static ChosenAction randomAction() { @@ -57,13 +63,14 @@ public static Pair measurePerformance(Map latencies, int percentile) { + private static Long percentile(List latencies, int percentile) { int N = latencies.size(); double n = (N - 1) * percentile / 100. + 1; if (n == 1d) return latencies.get(0); @@ -73,7 +80,7 @@ private static Long percentile(ArrayList latencies, int percentile) { return Math.round(latencies.get(k - 1) + d * (latencies.get(k) - latencies.get(k - 1))); } - private static double stdDeviation(ArrayList latencies, Double avgLatency) { + private static double stdDeviation(List latencies, Double avgLatency) { double stdDeviation = latencies.stream() .mapToDouble(Long::doubleValue) @@ -83,47 +90,54 @@ private static double stdDeviation(ArrayList latencies, Double avgLatency) // This has the side-effect of sorting each latencies ArrayList public static Map calculateResults( - Map> actionLatencies) { + Map> actionLatencies) { Map results = new HashMap<>(); - for (Map.Entry> entry : actionLatencies.entrySet()) { + for (Map.Entry> entry : actionLatencies.entrySet()) { ChosenAction action = entry.getKey(); - ArrayList latencies = entry.getValue(); + List latencies = entry.getValue(); - double avgLatency = - latencies.size() == 0 - ? 0 - : SECONDS_IN_NANO - * latencies.stream().mapToLong(Long::longValue).sum() - / latencies.size(); + if (latencies.size() == 0) { + results.put(action, new LatencyResults(0, 0, 0, 0, 0, 0)); + } else { + double avgLatency = + SECONDS_IN_NANO + * latencies.stream().mapToLong(Long::longValue).sum() + / latencies.size(); - Collections.sort(latencies); - results.put( - action, - new LatencyResults( - avgLatency, - SECONDS_IN_NANO * percentile(latencies, 50), - SECONDS_IN_NANO * percentile(latencies, 90), - SECONDS_IN_NANO * percentile(latencies, 99), - SECONDS_IN_NANO * stdDeviation(latencies, avgLatency), - latencies.size())); + Collections.sort(latencies); + results.put( + action, + new LatencyResults( + avgLatency, + SECONDS_IN_NANO * percentile(latencies, 50), + SECONDS_IN_NANO * percentile(latencies, 90), + SECONDS_IN_NANO * percentile(latencies, 99), + SECONDS_IN_NANO * stdDeviation(latencies, avgLatency), + latencies.size())); + } } return results; } - public static void printResults(Map resultsMap) { + public static void printResults( + Map resultsMap, double duration, int iterations) { + System.out.printf("Runtime s: %f%n", duration); + System.out.printf("Iterations: %d%n", iterations); + System.out.printf("TPS: %f%n", iterations / duration); int totalHits = 0; for (Map.Entry entry : resultsMap.entrySet()) { ChosenAction action = entry.getKey(); LatencyResults results = entry.getValue(); - System.out.println("Avg. time in ms per " + action + ": " + results.avgLatency / 1000000.0); - System.out.println(action + " p50 latency in ms: " + results.p50Latency / 1000000.0); - System.out.println(action + " p90 latency in ms: " + results.p90Latency / 1000000.0); - System.out.println(action + " p99 latency in ms: " + results.p99Latency / 1000000.0); - System.out.println(action + " std dev in ms: " + results.stdDeviation / 1000000.0); - System.out.println(action + " total hits: " + results.totalHits); + System.out.printf("===> %s <===%n", action); + System.out.printf("avg. time ms: %f%n", results.avgLatency); + System.out.printf("std dev ms: %f%n", results.stdDeviation); + System.out.printf("p50 latency ms: %f%n", results.p50Latency); + System.out.printf("p90 latency ms: %f%n", results.p90Latency); + System.out.printf("p99 latency ms: %f%n", results.p99Latency); + System.out.printf("Total hits: %d%n", results.totalHits); totalHits += results.totalHits; } System.out.println("Total hits: " + totalHits); @@ -135,19 +149,23 @@ public static void testClientSetGet( int iterations = Math.min(Math.max(100000, concurrentNum * 10000), 10000000); for (int clientCount : config.clientCount) { for (int dataSize : config.dataSize) { - System.out.printf( - "%n =====> %s <===== %d clients %d concurrent %n%n", - clientCreator.get().getName(), clientCount, concurrentNum); - AtomicInteger iterationCounter = new AtomicInteger(0); - // create clients List clients = new LinkedList<>(); for (int cc = 0; cc < clientCount; cc++) { Client newClient = clientCreator.get(); - newClient.connectToRedis(new ConnectionSettings(config.host, config.port, config.tls)); + newClient.connectToRedis( + new ConnectionSettings( + config.host, config.port, config.tls, config.clusterModeEnabled)); clients.add(newClient); } + var clientName = clients.get(0).getName(); + + System.out.printf( + "%n =====> %s <===== %d clients %d concurrent %n%n", + clientName, clientCount, concurrentNum); + AtomicInteger iterationCounter = new AtomicInteger(0); + long started = System.nanoTime(); List>>> asyncTasks = new ArrayList<>(); @@ -179,7 +197,9 @@ public static void testClientSetGet( var actions = getActionMap(clients.get(clientIndex), dataSize, async); // operate and calculate tik-tok Pair result = measurePerformance(actions); - taskActionResults.get(result.getLeft()).add(result.getRight()); + if (result != null) { + taskActionResults.get(result.getLeft()).add(result.getRight()); + } iterationIncrement = iterationCounter.getAndIncrement(); clientIndex = iterationIncrement % clients.size(); @@ -188,7 +208,7 @@ public static void testClientSetGet( })); } if (config.debugLogging) { - System.out.printf("%s client Benchmarking: %n", clientCreator.get().getName()); + System.out.printf("%s client Benchmarking: %n", clientName); System.out.printf( "===> concurrentNum = %d, clientNum = %d, tasks = %d%n", concurrentNum, clientCount, asyncTasks.size()); @@ -207,7 +227,7 @@ public static void testClientSetGet( long after = System.nanoTime(); // Map to save latency results separately for each action - Map> actionResults = + Map> actionResults = Map.of( ChosenAction.GET_EXISTING, new ArrayList<>(), ChosenAction.GET_NON_EXISTING, new ArrayList<>(), @@ -226,6 +246,8 @@ public static void testClientSetGet( }); var calculatedResults = calculateResults(actionResults); + clients.forEach(Client::closeConnection); + if (config.resultsFile.isPresent()) { double tps = iterationCounter.get() * NANO_TO_SECONDS / (after - started); JsonWriter.Write( @@ -233,12 +255,12 @@ public static void testClientSetGet( config.resultsFile.get(), config.clusterModeEnabled, dataSize, - clientCreator.get().getName(), + clientName, clientCount, concurrentNum, tps); } - printResults(calculatedResults); + printResults(calculatedResults, (after - started) / TPS_NORMALIZATION, iterations); } } } diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/ConnectionSettings.java b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/ConnectionSettings.java index 2989d8b3b3..91c11c76a8 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/ConnectionSettings.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/ConnectionSettings.java @@ -1,13 +1,12 @@ package javababushka.benchmarks.utils; -public class ConnectionSettings { - public String host; - public int port; - public boolean useSsl; +import lombok.AllArgsConstructor; - public ConnectionSettings(String host, int port, boolean useSsl) { - this.host = host; - this.port = port; - this.useSsl = useSsl; - } +/** Redis-client settings */ +@AllArgsConstructor +public class ConnectionSettings { + public final String host; + public final int port; + public final boolean useSsl; + public final boolean clusterMode; } diff --git a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/LatencyResults.java b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/LatencyResults.java index 7c9ee65232..5d25d72a67 100644 --- a/java/benchmarks/src/main/java/javababushka/benchmarks/utils/LatencyResults.java +++ b/java/benchmarks/src/main/java/javababushka/benchmarks/utils/LatencyResults.java @@ -1,6 +1,9 @@ package javababushka.benchmarks.utils; -// Raw timing results in nanoseconds +import lombok.AllArgsConstructor; + +/** Raw timing results in nanoseconds */ +@AllArgsConstructor public class LatencyResults { public final double avgLatency; public final double p50Latency; @@ -8,19 +11,4 @@ public class LatencyResults { public final double p99Latency; public final double stdDeviation; public final int totalHits; - - public LatencyResults( - double avgLatency, - double p50Latency, - double p90Latency, - double p99Latency, - double stdDeviation, - int totalHits) { - this.avgLatency = avgLatency; - this.p50Latency = p50Latency; - this.p90Latency = p90Latency; - this.p99Latency = p99Latency; - this.stdDeviation = stdDeviation; - this.totalHits = totalHits; - } } diff --git a/java/benchmarks/src/test/java/javababushka/benchmarks/jedis/JedisClientIT.java b/java/benchmarks/src/test/java/javababushka/benchmarks/jedis/JedisClientIT.java index 12f800260f..9a6dbbe93e 100644 --- a/java/benchmarks/src/test/java/javababushka/benchmarks/jedis/JedisClientIT.java +++ b/java/benchmarks/src/test/java/javababushka/benchmarks/jedis/JedisClientIT.java @@ -7,7 +7,9 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import javababushka.benchmarks.clients.jedis.JedisClient; import javababushka.benchmarks.utils.Benchmarking; import javababushka.benchmarks.utils.ChosenAction; import org.junit.jupiter.api.BeforeAll; @@ -23,12 +25,6 @@ static void initializeJedisClient() { jedisClient.connectToRedis(); } - @Test - public void someLibraryMethodReturnsTrue() { - JedisClient classUnderTest = new JedisClient(); - assertTrue(classUnderTest.someLibraryMethod(), "someLibraryMethod should return 'true'"); - } - @Test public void testResourceInfo() { String result = jedisClient.info(); @@ -56,7 +52,7 @@ public void testResourceSetGet() { ChosenAction.GET_NON_EXISTING, () -> jedisClient.get(Benchmarking.generateKeyGet())); actions.put(ChosenAction.SET, () -> jedisClient.set(Benchmarking.generateKeySet(), value)); - Map> latencies = + Map> latencies = Map.of( ChosenAction.GET_EXISTING, new ArrayList<>(), ChosenAction.GET_NON_EXISTING, new ArrayList<>(), @@ -66,6 +62,6 @@ public void testResourceSetGet() { latencies.get(latency.getKey()).add(latency.getValue()); } - Benchmarking.printResults(Benchmarking.calculateResults(latencies)); + Benchmarking.printResults(Benchmarking.calculateResults(latencies), 0, iterations); } } diff --git a/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceAsyncClientIT.java b/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceAsyncClientIT.java index 2c1f4eb93f..92f81df2da 100644 --- a/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceAsyncClientIT.java +++ b/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceAsyncClientIT.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.fail; import io.lettuce.core.RedisFuture; +import javababushka.benchmarks.clients.lettuce.LettuceAsyncClient; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; diff --git a/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceClientIT.java b/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceClientIT.java index fd325a3414..e9670e8dbb 100644 --- a/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceClientIT.java +++ b/java/benchmarks/src/test/java/javababushka/benchmarks/lettuce/LettuceClientIT.java @@ -4,6 +4,7 @@ package javababushka.benchmarks.lettuce; import java.util.HashMap; +import javababushka.benchmarks.clients.lettuce.LettuceClient; import javababushka.benchmarks.utils.Benchmarking; import javababushka.benchmarks.utils.ChosenAction; import org.junit.jupiter.api.AfterAll; diff --git a/java/client/build.gradle b/java/client/build.gradle index d99f81ba7a..8dd4e5b7c5 100644 --- a/java/client/build.gradle +++ b/java/client/build.gradle @@ -10,23 +10,37 @@ repositories { dependencies { implementation group: 'com.google.protobuf', name: 'protobuf-java', version: '3.24.3' + implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.13.0' + + implementation group: 'io.netty', name: 'netty-handler', version: '4.1.100.Final' + // https://github.com/netty/netty/wiki/Native-transports + // Windows is not supported, because babushka does not support windows, because tokio does not support windows, because ... 42 + implementation group: 'io.netty', name: 'netty-transport-native-epoll', version: '4.1.100.Final', classifier: 'linux-x86_64' + implementation group: 'io.netty', name: 'netty-transport-native-kqueue', version: '4.1.100.Final', classifier: 'osx-x86_64' + implementation group: 'io.netty', name: 'netty-transport-native-kqueue', version: '4.1.100.Final', classifier: 'osx-aarch_64' } tasks.register('protobuf', Exec) { doFirst { - project.mkdir(Paths.get(project.projectDir.path, 'src/main/java/org/babushka/javababushka/generated').toString()) + project.mkdir(Paths.get(project.projectDir.path, 'src/main/java/javababushka/generated').toString()) } commandLine 'protoc', '-Iprotobuf=babushka-core/src/protobuf/', - '--java_out=java/client/src/main/java/org/babushka/javababushka/generated', + '--java_out=java/client/src/main/java/javababushka/generated', 'babushka-core/src/protobuf/connection_request.proto', 'babushka-core/src/protobuf/redis_request.proto', 'babushka-core/src/protobuf/response.proto' - workingDir Paths.get(project.rootDir.path, '../..').toFile() + workingDir Paths.get(project.rootDir.path, '..').toFile() +} + +tasks.register('cleanProtobuf') { + doFirst { + project.delete(Paths.get(project.projectDir.path, 'src/main/java/javababushka/generated').toString()) + } } tasks.register('buildRust', Exec) { - commandLine 'cargo', 'build' + commandLine 'cargo', 'build', '--release' workingDir project.rootDir } @@ -44,3 +58,15 @@ tasks.register('buildAll') { dependsOn 'protobuf', 'buildRust' finalizedBy 'build' } + +compileJava.dependsOn('protobuf') +clean.dependsOn('cleanProtobuf') + +tasks.withType(Test) { + testLogging { + exceptionFormat "full" + events "started", "skipped", "passed", "failed" + showStandardStreams true + } + jvmArgs "-Djava.library.path=${projectDir}/../target/release:${projectDir}/../target/debug" +} diff --git a/java/client/src/main/java/javababushka/BabushkaCoreNativeDefinitions.java b/java/client/src/main/java/javababushka/BabushkaCoreNativeDefinitions.java new file mode 100644 index 0000000000..3f26ebef91 --- /dev/null +++ b/java/client/src/main/java/javababushka/BabushkaCoreNativeDefinitions.java @@ -0,0 +1,11 @@ +package javababushka; + +public class BabushkaCoreNativeDefinitions { + public static native String startSocketListenerExternal() throws Exception; + + public static native Object valueFromPointer(long pointer); + + static { + System.loadLibrary("javababushka"); + } +} diff --git a/java/client/src/main/java/javababushka/Client.java b/java/client/src/main/java/javababushka/Client.java new file mode 100644 index 0000000000..d6cb16e591 --- /dev/null +++ b/java/client/src/main/java/javababushka/Client.java @@ -0,0 +1,405 @@ +package javababushka; + +import static connection_request.ConnectionRequestOuterClass.AddressInfo; +import static connection_request.ConnectionRequestOuterClass.AuthenticationInfo; +import static connection_request.ConnectionRequestOuterClass.ConnectionRequest; +import static connection_request.ConnectionRequestOuterClass.ConnectionRetryStrategy; +import static connection_request.ConnectionRequestOuterClass.ReadFromReplicaStrategy; +import static connection_request.ConnectionRequestOuterClass.TlsMode; +import static redis_request.RedisRequestOuterClass.Command; +import static redis_request.RedisRequestOuterClass.Command.ArgsArray; +import static redis_request.RedisRequestOuterClass.RedisRequest; +import static redis_request.RedisRequestOuterClass.RequestType; +import static redis_request.RedisRequestOuterClass.Routes; +import static redis_request.RedisRequestOuterClass.SimpleRoutes; +import static response.ResponseOuterClass.Response; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.WriteBufferWaterMark; +import io.netty.channel.epoll.EpollDomainSocketChannel; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.kqueue.KQueue; +import io.netty.channel.kqueue.KQueueDomainSocketChannel; +import io.netty.channel.kqueue.KQueueEventLoopGroup; +import io.netty.channel.unix.DomainSocketAddress; +import io.netty.channel.unix.UnixChannel; +import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; +import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import io.netty.util.internal.logging.InternalLoggerFactory; +import io.netty.util.internal.logging.Slf4JLoggerFactory; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.commons.lang3.tuple.Pair; + +public class Client implements AutoCloseable { + + private static final int RESPONSE_TIMEOUT_MILLISECONDS = 250; + private static final int CLIENT_CREATION_TIMEOUT_MILLISECONDS = 250; + private static final int HIGH_WRITE_WATERMARK = 4096; + private static final int LOW_WRITE_WATERMARK = 1024; + private static final long DEFAULT_TIMEOUT_MILLISECONDS = 1000; + public static boolean ALWAYS_FLUSH_ON_WRITE = true; + + // https://netty.io/3.6/api/org/jboss/netty/handler/queue/BufferedWriteHandler.html + // Flush every N bytes if !ALWAYS_FLUSH_ON_WRITE + public static int AUTO_FLUSH_THRESHOLD_BYTES = 512; // 1024; + private final AtomicInteger nonFlushedBytesCounter = new AtomicInteger(0); + + // Flush every N writes if !ALWAYS_FLUSH_ON_WRITE + public static int AUTO_FLUSH_THRESHOLD_WRITES = 10; + private final AtomicInteger nonFlushedWritesCounter = new AtomicInteger(0); + + // If !ALWAYS_FLUSH_ON_WRITE and a command has no response in N millis, flush (probably it wasn't + // send) + public static int AUTO_FLUSH_RESPONSE_TIMEOUT_MILLIS = 100; + // If !ALWAYS_FLUSH_ON_WRITE flush on timer (like a cron) + public static int AUTO_FLUSH_TIMER_MILLIS = 200; + + public static int PENDING_RESPONSES_ON_CLOSE_TIMEOUT_MILLIS = 1000; + + // Futures to handle responses. Index is callback id, starting from 1 (0 index is for connection + // request always). + // Is it not a concurrent nor sync collection, but it is synced on adding. No removes. + private final List> responses = new ArrayList<>(); + // Unique offset for every client to avoid having multiple commands with the same id at a time. + // For debugging replace with: new Random().nextInt(1000) * 1000 + private final int callbackOffset = new Random().nextInt(); + + // TODO move to a [static] constructor. + private final String unixSocket = getSocket(); + + private static String getSocket() { + try { + return BabushkaCoreNativeDefinitions.startSocketListenerExternal(); + } catch (Exception | UnsatisfiedLinkError e) { + System.err.printf("Failed to get UDS from babushka and dedushka: %s%n%n", e); + throw new RuntimeException(e); + } + } + + private Channel channel = null; + private EventLoopGroup group = null; + + // We support MacOS and Linux only, because Babushka does not support Windows, because tokio does + // not support it. + // Probably we should use NIO (NioEventLoopGroup) for Windows. + private static final boolean isMacOs = isMacOs(); + + private static boolean isMacOs() { + try { + Class.forName("io.netty.channel.kqueue.KQueue"); + return KQueue.isAvailable(); + } catch (ClassNotFoundException e) { + return false; + } + } + + static { + // TODO fix: netty still doesn't use slf4j nor log4j + InternalLoggerFactory.setDefaultFactory(Slf4JLoggerFactory.INSTANCE); + } + + private void createChannel() { + // TODO maybe move to constructor or to static? + try { + channel = + new Bootstrap() + .option( + ChannelOption.WRITE_BUFFER_WATER_MARK, + new WriteBufferWaterMark(LOW_WRITE_WATERMARK, HIGH_WRITE_WATERMARK)) + .option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT) + .group(group = isMacOs ? new KQueueEventLoopGroup() : new EpollEventLoopGroup()) + .channel(isMacOs ? KQueueDomainSocketChannel.class : EpollDomainSocketChannel.class) + .handler( + new ChannelInitializer() { + @Override + public void initChannel(UnixChannel ch) throws Exception { + ch.pipeline() + .addLast("logger", new LoggingHandler(LogLevel.DEBUG)) + // https://netty.io/4.1/api/io/netty/handler/codec/protobuf/ProtobufEncoder.html + .addLast("protobufDecoder", new ProtobufVarint32FrameDecoder()) + .addLast("protobufEncoder", new ProtobufVarint32LengthFieldPrepender()) + .addLast( + new ChannelInboundHandlerAdapter() { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) + throws Exception { + // System.out.printf("=== channelRead %s %s %n", ctx, msg); + var buf = (ByteBuf) msg; + var bytes = new byte[buf.readableBytes()]; + buf.readBytes(bytes); + // TODO surround parsing with try-catch, set error to future if + // parsing failed. + var response = Response.parseFrom(bytes); + int callbackId = response.getCallbackIdx(); + if (callbackId != 0) { + // connection request has hardcoded callback id = 0 + // https://github.com/aws/babushka/issues/600 + callbackId -= callbackOffset; + } + // System.out.printf("== Received response with callback %d%n", + // response.getCallbackIdx()); + responses.get(callbackId).complete(response); + responses.set(callbackId, null); + super.channelRead(ctx, bytes); + } + + @Override + public void exceptionCaught( + ChannelHandlerContext ctx, Throwable cause) throws Exception { + System.out.printf("=== exceptionCaught %s %s %n", ctx, cause); + cause.printStackTrace(); + super.exceptionCaught(ctx, cause); + } + }) + .addLast( + new ChannelOutboundHandlerAdapter() { + @Override + public void write( + ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + // System.out.printf("=== write %s %s %s %n", ctx, msg, promise); + var bytes = (byte[]) msg; + + boolean needFlush = false; + if (!ALWAYS_FLUSH_ON_WRITE) { + synchronized (nonFlushedBytesCounter) { + if (nonFlushedBytesCounter.addAndGet(bytes.length) + >= AUTO_FLUSH_THRESHOLD_BYTES + || nonFlushedWritesCounter.incrementAndGet() + >= AUTO_FLUSH_THRESHOLD_WRITES) { + nonFlushedBytesCounter.set(0); + nonFlushedWritesCounter.set(0); + needFlush = true; + } + } + } + super.write(ctx, Unpooled.copiedBuffer(bytes), promise); + if (needFlush) { + // flush outside the sync block + flush(ctx); + // System.out.println("-- auto flush - buffer"); + } + } + }); + } + }) + .connect(new DomainSocketAddress(unixSocket)) + .sync() + .channel(); + + } catch (Exception e) { + System.err.printf( + "Failed to create a channel %s: %s%n", e.getClass().getSimpleName(), e.getMessage()); + e.printStackTrace(System.err); + } + + if (!ALWAYS_FLUSH_ON_WRITE) { + new Timer(true) + .scheduleAtFixedRate( + new TimerTask() { + @Override + public void run() { + channel.flush(); + nonFlushedBytesCounter.set(0); + nonFlushedWritesCounter.set(0); + } + }, + 0, + AUTO_FLUSH_TIMER_MILLIS); + } + } + + public void closeConnection() { + + // flush and close the channel + channel.flush(); + channel.close(); + // TODO: check that the channel is closed + + // shutdown the event loop group gracefully by waiting for the remaining response + // and then shutting down the connection + try { + long waitStarted = System.nanoTime(); + long waitUntil = + waitStarted + PENDING_RESPONSES_ON_CLOSE_TIMEOUT_MILLIS * 100_000; // in nanos + for (var responseFuture : responses) { + if (responseFuture == null || responseFuture.isDone()) { + continue; + } + try { + responseFuture.get(waitUntil - System.nanoTime(), TimeUnit.NANOSECONDS); + } catch (InterruptedException | ExecutionException ignored) { + // TODO: print warning + } catch (TimeoutException e) { + responseFuture.cancel(true); + // TODO: cancel the rest + break; + } + } + } finally { + var shuttingDown = group.shutdownGracefully(); + try { + shuttingDown.get(); + } catch (InterruptedException | ExecutionException e) { + e.printStackTrace(); + } + assert group.isShutdown() : "Redis connection did not shutdown gracefully"; + } + } + + public void set(String key, String value) { + waitForResult(asyncSet(key, value)); + // TODO parse response and rethrow an exception if there is an error + } + + public String get(String key) { + return waitForResult(asyncGet(key)); + // TODO support non-strings + } + + private synchronized Pair> getNextCallback() { + var future = new CompletableFuture(); + responses.add(future); + return Pair.of(responses.size() - 1, future); + } + + @Override + public void close() throws Exception { + closeConnection(); + } + + public Future asyncConnectToRedis( + String host, int port, boolean useSsl, boolean clusterMode) { + createChannel(); + + var request = + ConnectionRequest.newBuilder() + .addAddresses(AddressInfo.newBuilder().setHost(host).setPort(port).build()) + .setTlsMode( + useSsl // TODO: secure or insecure TLS? + ? TlsMode.SecureTls + : TlsMode.NoTls) + .setClusterModeEnabled(clusterMode) + .setResponseTimeout(RESPONSE_TIMEOUT_MILLISECONDS) + .setClientCreationTimeout(CLIENT_CREATION_TIMEOUT_MILLISECONDS) + .setReadFromReplicaStrategy(ReadFromReplicaStrategy.AlwaysFromPrimary) + .setConnectionRetryStrategy( + ConnectionRetryStrategy.newBuilder() + .setNumberOfRetries(1) + .setFactor(1) + .setExponentBase(1) + .build()) + .setAuthenticationInfo( + AuthenticationInfo.newBuilder().setPassword("").setUsername("default").build()) + .setDatabaseId(0) + .build(); + + var future = new CompletableFuture(); + responses.add(future); + channel.writeAndFlush(request.toByteArray()); + return future; + } + + private CompletableFuture submitNewCommand(RequestType command, List args) { + var commandId = getNextCallback(); + // System.out.printf("== %s(%s), callback %d%n", command, String.join(", ", args), commandId); + + return CompletableFuture.supplyAsync( + () -> { + var commandArgs = ArgsArray.newBuilder(); + for (var arg : args) { + commandArgs.addArgs(arg); + } + + RedisRequest request = + RedisRequest.newBuilder() + .setCallbackIdx(commandId.getKey() + callbackOffset) + .setSingleCommand( + Command.newBuilder() + .setRequestType(command) + .setArgsArray(commandArgs.build()) + .build()) + .setRoute(Routes.newBuilder().setSimpleRoutes(SimpleRoutes.AllNodes).build()) + .build(); + if (ALWAYS_FLUSH_ON_WRITE) { + channel.writeAndFlush(request.toByteArray()); + return commandId.getRight(); + } + channel.write(request.toByteArray()); + return autoFlushFutureWrapper(commandId.getRight()); + }) + .thenCompose(f -> f); + } + + private CompletableFuture autoFlushFutureWrapper(Future future) { + return CompletableFuture.supplyAsync( + () -> { + try { + return future.get(AUTO_FLUSH_RESPONSE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } catch (TimeoutException e) { + // System.out.println("-- auto flush - timeout"); + channel.flush(); + nonFlushedBytesCounter.set(0); + nonFlushedWritesCounter.set(0); + } + try { + return future.get(); + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + }); + } + + public Future asyncSet(String key, String value) { + // System.out.printf("== set(%s, %s), callback %d%n", key, value, callbackId); + return submitNewCommand(RequestType.SetString, List.of(key, value)); + } + + public Future asyncGet(String key) { + // System.out.printf("== get(%s), callback %d%n", key, callbackId); + return submitNewCommand(RequestType.GetString, List.of(key)) + .thenApply( + response -> + response.getRespPointer() != 0 + ? BabushkaCoreNativeDefinitions.valueFromPointer(response.getRespPointer()) + .toString() + : null); + } + + public T waitForResult(Future future) { + return waitForResult(future, DEFAULT_TIMEOUT_MILLISECONDS); + } + + public T waitForResult(Future future, long timeout) { + try { + return future.get(timeout, TimeUnit.MILLISECONDS); + } catch (Exception ignored) { + return null; + } + } +} diff --git a/java/src/lib.rs b/java/src/lib.rs index e69de29bb2..468d8797e7 100644 --- a/java/src/lib.rs +++ b/java/src/lib.rs @@ -0,0 +1,98 @@ +use babushka::start_socket_listener; + +use jni::objects::{JClass, JObject, JThrowable}; +use jni::JNIEnv; +use jni::sys::jlong; +use std::sync::mpsc; +use log::error; +use logger_core::Level; +use redis::Value; + +fn redis_value_to_java(mut env: JNIEnv, val: Value) -> JObject { + match val { + Value::Nil => JObject::null(), + Value::Status(str) => JObject::from(env.new_string(str).unwrap()), + Value::Okay => JObject::from(env.new_string("OK").unwrap()), + // TODO use primitive integer + Value::Int(num) => env.new_object("java/lang/Integer", "(I)V", &[num.into()]).unwrap(), + Value::Data(data) => match std::str::from_utf8(data.as_ref()) { + Ok(val) => JObject::from(env.new_string(val).unwrap()), + Err(_err) => { + let _ = env.throw("Error decoding Unicode data"); + JObject::null() + }, + }, + Value::Bulk(_bulk) => { + let _ = env.throw("Not implemented"); + JObject::null() + /* + let elements: &PyList = PyList::new( + py, + bulk.into_iter() + .map(|item| redis_value_to_py(py, item).unwrap()), + ); + Ok(elements.into_py(py)) + */ + } + } +} + +#[no_mangle] +pub extern "system" fn Java_javababushka_BabushkaCoreNativeDefinitions_valueFromPointer<'local>( + mut env: JNIEnv<'local>, + _class: JClass<'local>, + pointer: jlong +) -> JObject<'local> { + let value = unsafe { Box::from_raw(pointer as *mut Value) }; + redis_value_to_java(env, *value) +} + +#[no_mangle] +pub extern "system" fn Java_javababushka_BabushkaCoreNativeDefinitions_startSocketListenerExternal<'local>( + mut env: JNIEnv<'local>, + _class: JClass<'local> +) -> JObject<'local> { + let (tx, rx) = mpsc::channel::>(); + + //logger_core::init(Some(Level::Trace), None); + + start_socket_listener(move |socket_path : Result| { + // Signals that thread has started + let _ = tx.send(socket_path); + }); + + // Wait until the thread has started + let socket_path = rx.recv(); + + match socket_path { + Ok(Ok(path)) => { + env.new_string(path).unwrap().into() + }, + Ok(Err(error_message)) => { + throw_java_exception(env, error_message); + JObject::null() + }, + Err(error) => { + throw_java_exception(env, error.to_string()); + JObject::null() + } + } +} + +fn throw_java_exception(mut env: JNIEnv, message: String) { + let res = env.new_object( + "java/lang/Exception", + "(Ljava/lang/String;)V", + &[ + (&env.new_string(message.clone()).unwrap()).into(), + ]); + + match res { + Ok(res) => { + let _ = env.throw(JThrowable::from(res)); + }, + Err(err) => { + error!("Failed to create exception with string {}: {}", message, err.to_string()); + } + }; +}