Skip to content

Commit

Permalink
deps: Upgraded to Spring Batch Redis 3.7.4
Browse files Browse the repository at this point in the history
  • Loading branch information
jruaux committed Oct 15, 2023
1 parent a9bc1e9 commit 1574838
Show file tree
Hide file tree
Showing 15 changed files with 564 additions and 526 deletions.
2 changes: 1 addition & 1 deletion core/redis-kafka-connect/redis-kafka-connect.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ dependencies {
implementation group: 'com.redis', name: 'spring-batch-redis', version: springBatchRedisVersion
implementation 'org.apache.commons:commons-pool2'
implementation 'org.slf4j:slf4j-api'
runtimeOnly 'org.slf4j:slf4j-log4j12'
compileOnly 'org.apache.kafka:connect-api'
compileOnly 'org.apache.kafka:connect-json'
testImplementation 'org.apache.kafka:connect-api'
Expand All @@ -32,6 +31,7 @@ dependencies {
testImplementation 'org.junit.jupiter:junit-jupiter-engine'
testImplementation 'org.junit.platform:junit-platform-launcher'
testImplementation 'org.slf4j:slf4j-simple'
testImplementation group: 'com.redis', name: 'spring-batch-redis', version: springBatchRedisVersion, classifier: 'tests'
testImplementation group: 'org.testcontainers', name: 'junit-jupiter', version: testcontainersVersion
testImplementation(group: 'com.redis.testcontainers', name: 'testcontainers-redis', version: testcontainersRedisVersion) {
exclude group: 'com.redis', module: 'lettucemod'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,75 +21,104 @@

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.types.Password;
import org.springframework.util.StringUtils;

import com.redis.lettucemod.util.ClientBuilder;
import com.redis.lettucemod.util.RedisURIBuilder;
import com.redis.spring.batch.common.PoolOptions;
import com.redis.lettucemod.RedisModulesClient;
import com.redis.lettucemod.cluster.RedisModulesClusterClient;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.RedisURI;
import io.lettuce.core.SslVerifyMode;
import io.netty.util.internal.StringUtil;
import io.lettuce.core.SslOptions;
import io.lettuce.core.SslOptions.Builder;
import io.lettuce.core.cluster.ClusterClientOptions;

public abstract class RedisConfig extends AbstractConfig {

protected RedisConfig(RedisConfigDef config, Map<?, ?> originals) {
super(config, originals);
}
private static final char[] EMPTY_PASSWORD = new char[0];

public RedisURI uri() {
RedisURIBuilder builder = RedisURIBuilder.create();
String uri = getString(RedisConfigDef.URI_CONFIG);
if (StringUtil.isNullOrEmpty(uri)) {
builder.host(getString(RedisConfigDef.HOST_CONFIG));
builder.port(getInt(RedisConfigDef.PORT_CONFIG));
} else {
builder.uri(uri);
}
if (Boolean.TRUE.equals(getBoolean(RedisConfigDef.INSECURE_CONFIG))) {
builder.sslVerifyMode(SslVerifyMode.NONE);
}
builder.ssl(getBoolean(RedisConfigDef.TLS_CONFIG));
String username = getString(RedisConfigDef.USERNAME_CONFIG);
if (!StringUtil.isNullOrEmpty(username)) {
builder.username(username);
}
Password password = getPassword(RedisConfigDef.PASSWORD_CONFIG);
if (password != null && !StringUtil.isNullOrEmpty(password.value())) {
builder.password(password.value().toCharArray());
}
Long timeout = getLong(RedisConfigDef.TIMEOUT_CONFIG);
if (timeout != null) {
builder.timeout(Duration.ofSeconds(timeout));
}
return builder.build();
}
protected RedisConfig(RedisConfigDef config, Map<?, ?> originals) {
super(config, originals);
}

private AbstractRedisClient client(RedisURI uri) {
ClientBuilder builder = ClientBuilder.create(uri);
builder.cluster(getBoolean(RedisConfigDef.CLUSTER_CONFIG));
String keyFile = getString(RedisConfigDef.KEY_CONFIG);
if (!StringUtil.isNullOrEmpty(keyFile)) {
builder.key(new File(keyFile));
builder.keyCert(new File(getString(RedisConfigDef.KEY_CERT_CONFIG)));
Password password = getPassword(RedisConfigDef.KEY_PASSWORD_CONFIG);
if (password != null && !StringUtil.isNullOrEmpty(password.value())) {
builder.keyPassword(password.value().toCharArray());
}
}
String cacert = getString(RedisConfigDef.CACERT_CONFIG);
if (!StringUtil.isNullOrEmpty(cacert)) {
builder.trustManager(new File(cacert));
}
return builder.build();
}
public RedisURI uri() {
RedisURI.Builder builder = redisURIBuilder();
Boolean ssl = getBoolean(RedisConfigDef.TLS_CONFIG);
if (Boolean.TRUE.equals(ssl)) {
builder.withSsl(ssl);
Boolean insecure = getBoolean(RedisConfigDef.INSECURE_CONFIG);
if (Boolean.TRUE.equals(insecure)) {
builder.withVerifyPeer(false);
}
}
Password password = getPassword(RedisConfigDef.PASSWORD_CONFIG);
if (password != null) {
String passwordString = password.value();
if (StringUtils.hasLength(passwordString)) {
String username = getString(RedisConfigDef.USERNAME_CONFIG);
if (StringUtils.hasLength(username)) {
builder.withAuthentication(username, passwordString);
} else {
builder.withPassword((CharSequence) passwordString);
}
}
}
Long timeout = getLong(RedisConfigDef.TIMEOUT_CONFIG);
if (timeout != null) {
builder.withTimeout(Duration.ofSeconds(timeout));
}
return builder.build();
}

public AbstractRedisClient client() {
return client(uri());
}
private RedisURI.Builder redisURIBuilder() {
String uri = getString(RedisConfigDef.URI_CONFIG);
if (StringUtils.hasLength(uri)) {
return RedisURI.builder(RedisURI.create(uri));
}
String host = getString(RedisConfigDef.HOST_CONFIG);
int port = getInt(RedisConfigDef.PORT_CONFIG);
return RedisURI.Builder.redis(host, port);
}

public PoolOptions poolOptions() {
return PoolOptions.builder().maxTotal(getInt(RedisConfigDef.POOL_MAX_CONFIG)).build();
}
private AbstractRedisClient client(RedisURI uri) {
Boolean cluster = getBoolean(RedisConfigDef.CLUSTER_CONFIG);
if (Boolean.TRUE.equals(cluster)) {
RedisModulesClusterClient client = RedisModulesClusterClient.create(uri);
client.setOptions(clientOptions(ClusterClientOptions.builder()).build());
return client;
}
RedisModulesClient client = RedisModulesClient.create(uri);
client.setOptions(clientOptions(ClientOptions.builder()).build());
return client;
}

private <B extends ClientOptions.Builder> B clientOptions(B builder) {
builder.sslOptions(sslOptions());
return builder;
}

private SslOptions sslOptions() {
Builder options = SslOptions.builder();
String key = getString(RedisConfigDef.KEY_CONFIG);
if (StringUtils.hasLength(key)) {
String cert = getString(RedisConfigDef.KEY_CERT_CONFIG);
Password password = getPassword(RedisConfigDef.KEY_PASSWORD_CONFIG);
char[] passwordCharArray = password == null ? EMPTY_PASSWORD : password.value().toCharArray();
options.keyManager(new File(cert), new File(key), passwordCharArray);
}
String cacert = getString(RedisConfigDef.CACERT_CONFIG);
if (StringUtils.hasLength(cacert)) {
options.trustManager(new File(cacert));
}
return options.build();
}

public AbstractRedisClient client() {
return client(uri());
}

public int getPoolSize() {
return getInt(RedisConfigDef.POOL_MAX_CONFIG);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,92 +2,120 @@

import org.apache.kafka.common.config.ConfigDef;

import com.redis.spring.batch.common.PoolOptions;
import com.redis.spring.batch.common.AbstractOperationExecutor;

import io.lettuce.core.RedisURI;

public abstract class RedisConfigDef extends ConfigDef {

public static final String CLUSTER_CONFIG = "redis.cluster";
private static final boolean CLUSTER_DEFAULT = false;
private static final String CLUSTER_DOC = "Connect to a Redis Cluster database";

public static final String HOST_CONFIG = "redis.host";
private static final String HOST_DEFAULT = "localhost";
private static final String HOST_DOC = "The Redis host to connect to";

public static final String PORT_CONFIG = "redis.port";
private static final int PORT_DEFAULT = RedisURI.DEFAULT_REDIS_PORT;
private static final String PORT_DOC = "The Redis port to connect to";

public static final String URI_CONFIG = "redis.uri";
private static final String URI_DEFAULT = "";
private static final String URI_DOC = "URI of the Redis database to connect to, e.g. redis://redis-12000.redis.com:12000. For secure connections use rediss URI scheme, e.g. rediss://...";

public static final String USERNAME_CONFIG = "redis.username";
private static final String USERNAME_DEFAULT = "";
private static final String USERNAME_DOC = "Username to use to connect to Redis";

public static final String PASSWORD_CONFIG = "redis.password";
private static final String PASSWORD_DEFAULT = "";
private static final String PASSWORD_DOC = "Password to use to connect to Redis";

public static final String TIMEOUT_CONFIG = "redis.timeout";
private static final long TIMEOUT_DEFAULT = RedisURI.DEFAULT_TIMEOUT;
private static final String TIMEOUT_DOC = "Redis command timeout in seconds";

public static final String POOL_MAX_CONFIG = "redis.pool";
private static final int POOL_MAX_DEFAULT = PoolOptions.DEFAULT_MAX_TOTAL;
private static final String POOL_MAX_DOC = "Max pool connections";

public static final String TLS_CONFIG = "redis.tls";
private static final boolean TLS_DEFAULT = false;
private static final String TLS_DOC = "Establish a secure TLS connection";

public static final String INSECURE_CONFIG = "redis.insecure";
private static final boolean INSECURE_DEFAULT = false;
private static final String INSECURE_DOC = "Allow insecure connections (e.g. invalid certificates) to Redis when using SSL";

public static final String KEY_CONFIG = "redis.key.file";
public static final String KEY_DEFAULT = "";
private static final String KEY_DOC = "PKCS#8 private key file to authenticate with (PEM format)";

public static final String KEY_CERT_CONFIG = "redis.key.cert";
public static final String KEY_CERT_DEFAULT = "";
private static final String KEY_CERT_DOC = "X.509 certificate chain file to authenticate with (PEM format)";

public static final String KEY_PASSWORD_CONFIG = "redis.key.password";
private static final String KEY_PASSWORD_DEFAULT = "";
private static final String KEY_PASSWORD_DOC = "Password of the private key file. Leave empty if key file is not password-protected";

public static final String CACERT_CONFIG = "redis.cacert";
public static final String CACERT_DEFAULT = "";
private static final String CACERT_DOC = "X.509 CA certificate file to verify with";

protected RedisConfigDef() {
defineConfigs();
}

protected RedisConfigDef(ConfigDef base) {
super(base);
defineConfigs();
}

private void defineConfigs() {
define(CLUSTER_CONFIG, Type.BOOLEAN, CLUSTER_DEFAULT, Importance.MEDIUM, CLUSTER_DOC);
define(HOST_CONFIG, Type.STRING, HOST_DEFAULT, Importance.HIGH, HOST_DOC);
define(PORT_CONFIG, Type.INT, PORT_DEFAULT, Importance.HIGH, PORT_DOC);
define(URI_CONFIG, Type.STRING, URI_DEFAULT, Importance.MEDIUM, URI_DOC);
define(TLS_CONFIG, Type.BOOLEAN, TLS_DEFAULT, Importance.MEDIUM, TLS_DOC);
define(INSECURE_CONFIG, Type.BOOLEAN, INSECURE_DEFAULT, Importance.MEDIUM, INSECURE_DOC);
define(PASSWORD_CONFIG, Type.PASSWORD, PASSWORD_DEFAULT, Importance.MEDIUM, PASSWORD_DOC);
define(USERNAME_CONFIG, Type.STRING, USERNAME_DEFAULT, Importance.MEDIUM, USERNAME_DOC);
define(TIMEOUT_CONFIG, Type.LONG, TIMEOUT_DEFAULT, Importance.MEDIUM, TIMEOUT_DOC);
define(POOL_MAX_CONFIG, Type.INT, POOL_MAX_DEFAULT, Importance.MEDIUM, POOL_MAX_DOC);
define(KEY_CONFIG, Type.STRING, KEY_DEFAULT, Importance.MEDIUM, KEY_DOC);
define(KEY_CERT_CONFIG, Type.STRING, KEY_CERT_DEFAULT, Importance.MEDIUM, KEY_CERT_DOC);
define(KEY_PASSWORD_CONFIG, Type.PASSWORD, KEY_PASSWORD_DEFAULT, Importance.MEDIUM, KEY_PASSWORD_DOC);
define(CACERT_CONFIG, Type.STRING, CACERT_DEFAULT, Importance.MEDIUM, CACERT_DOC);
}

}
public static final String CLUSTER_CONFIG = "redis.cluster";

private static final boolean CLUSTER_DEFAULT = false;

private static final String CLUSTER_DOC = "Connect to a Redis Cluster database";

public static final String HOST_CONFIG = "redis.host";

private static final String HOST_DEFAULT = "localhost";

private static final String HOST_DOC = "The Redis host to connect to";

public static final String PORT_CONFIG = "redis.port";

private static final int PORT_DEFAULT = RedisURI.DEFAULT_REDIS_PORT;

private static final String PORT_DOC = "The Redis port to connect to";

public static final String URI_CONFIG = "redis.uri";

private static final String URI_DEFAULT = "";

private static final String URI_DOC = "URI of the Redis database to connect to, e.g. redis://redis-12000.redis.com:12000. For secure connections use rediss URI scheme, e.g. rediss://...";

public static final String USERNAME_CONFIG = "redis.username";

private static final String USERNAME_DEFAULT = "";

private static final String USERNAME_DOC = "Username to use to connect to Redis";

public static final String PASSWORD_CONFIG = "redis.password";

private static final String PASSWORD_DEFAULT = "";

private static final String PASSWORD_DOC = "Password to use to connect to Redis";

public static final String TIMEOUT_CONFIG = "redis.timeout";

private static final long TIMEOUT_DEFAULT = RedisURI.DEFAULT_TIMEOUT;

private static final String TIMEOUT_DOC = "Redis command timeout in seconds";

public static final String POOL_MAX_CONFIG = "redis.pool";

private static final int POOL_MAX_DEFAULT = AbstractOperationExecutor.DEFAULT_POOL_SIZE;

private static final String POOL_MAX_DOC = "Max pool connections";

public static final String TLS_CONFIG = "redis.tls";

private static final boolean TLS_DEFAULT = false;

private static final String TLS_DOC = "Establish a secure TLS connection";

public static final String INSECURE_CONFIG = "redis.insecure";

private static final boolean INSECURE_DEFAULT = false;

private static final String INSECURE_DOC = "Allow insecure connections (e.g. invalid certificates) to Redis when using SSL";

public static final String KEY_CONFIG = "redis.key.file";

public static final String KEY_DEFAULT = "";

private static final String KEY_DOC = "PKCS#8 private key file to authenticate with (PEM format)";

public static final String KEY_CERT_CONFIG = "redis.key.cert";

public static final String KEY_CERT_DEFAULT = "";

private static final String KEY_CERT_DOC = "X.509 certificate chain file to authenticate with (PEM format)";

public static final String KEY_PASSWORD_CONFIG = "redis.key.password";

private static final String KEY_PASSWORD_DEFAULT = "";

private static final String KEY_PASSWORD_DOC = "Password of the private key file. Leave empty if key file is not password-protected";

public static final String CACERT_CONFIG = "redis.cacert";

public static final String CACERT_DEFAULT = "";

private static final String CACERT_DOC = "X.509 CA certificate file to verify with";

protected RedisConfigDef() {
defineConfigs();
}

protected RedisConfigDef(ConfigDef base) {
super(base);
defineConfigs();
}

private void defineConfigs() {
define(CLUSTER_CONFIG, Type.BOOLEAN, CLUSTER_DEFAULT, Importance.MEDIUM, CLUSTER_DOC);
define(HOST_CONFIG, Type.STRING, HOST_DEFAULT, Importance.HIGH, HOST_DOC);
define(PORT_CONFIG, Type.INT, PORT_DEFAULT, Importance.HIGH, PORT_DOC);
define(URI_CONFIG, Type.STRING, URI_DEFAULT, Importance.MEDIUM, URI_DOC);
define(TLS_CONFIG, Type.BOOLEAN, TLS_DEFAULT, Importance.MEDIUM, TLS_DOC);
define(INSECURE_CONFIG, Type.BOOLEAN, INSECURE_DEFAULT, Importance.MEDIUM, INSECURE_DOC);
define(PASSWORD_CONFIG, Type.PASSWORD, PASSWORD_DEFAULT, Importance.MEDIUM, PASSWORD_DOC);
define(USERNAME_CONFIG, Type.STRING, USERNAME_DEFAULT, Importance.MEDIUM, USERNAME_DOC);
define(TIMEOUT_CONFIG, Type.LONG, TIMEOUT_DEFAULT, Importance.MEDIUM, TIMEOUT_DOC);
define(POOL_MAX_CONFIG, Type.INT, POOL_MAX_DEFAULT, Importance.MEDIUM, POOL_MAX_DOC);
define(KEY_CONFIG, Type.STRING, KEY_DEFAULT, Importance.MEDIUM, KEY_DOC);
define(KEY_CERT_CONFIG, Type.STRING, KEY_CERT_DEFAULT, Importance.MEDIUM, KEY_CERT_DOC);
define(KEY_PASSWORD_CONFIG, Type.PASSWORD, KEY_PASSWORD_DEFAULT, Importance.MEDIUM, KEY_PASSWORD_DOC);
define(CACERT_CONFIG, Type.STRING, CACERT_DEFAULT, Importance.MEDIUM, CACERT_DOC);
}

}
Loading

0 comments on commit 1574838

Please sign in to comment.