Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for redis clustered mode #735

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion src/main/groovy/io/seqera/wave/redis/RedisFactory.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,16 @@ import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
import io.micronaut.core.annotation.Nullable
import jakarta.inject.Singleton
import redis.clients.jedis.DefaultJedisClientConfig
import redis.clients.jedis.HostAndPort
import redis.clients.jedis.JedisCluster
import redis.clients.jedis.JedisPool
import redis.clients.jedis.JedisPoolConfig
import redis.clients.jedis.exceptions.InvalidURIException
import redis.clients.jedis.util.JedisURIHelper

/**
* Redis connection pool factory
*
Expand All @@ -38,18 +45,59 @@ import redis.clients.jedis.JedisPoolConfig
class RedisFactory {

@Singleton
@Requires(property = "redis.mode", value = "standalone")
JedisPool createRedisPool(
@Value('${redis.uri}') String uri,
@Value('${redis.pool.minIdle:0}') int minIdle,
@Value('${redis.pool.maxIdle:10}') int maxIdle,
@Value('${redis.pool.maxTotal:50}') int maxTotal
) {
log.info "Using redis $uri as storage for rate limit - pool minIdle: ${minIdle}; maxIdle: ${maxIdle}; maxTotal: ${maxTotal}"
log.info "Using redis $uri as storage and cache - pool minIdle: ${minIdle}; maxIdle: ${maxIdle}; maxTotal: ${maxTotal}"
final config = new JedisPoolConfig()
config.setMinIdle(minIdle)
config.setMaxIdle(maxIdle)
config.setMaxTotal(maxTotal)
return new JedisPool(config, URI.create(uri))
}

@Singleton
@Requires(property = "redis.mode", value = "cluster")
JedisCluster createRedisCluster(
@Value('${redis.uris}') List<String> uris,
@Value('${redis.client.timeout:5000}') int timeout,
@Nullable @Value('${redis.user}') String user,
@Nullable @Value('${redis.password}') String password,
@Nullable @Value('${redis.db}') String db,
@Nullable @Value('${redis.ssl}') boolean ssl
) {
log.info "Using redis cluster $uris as storage and cache - timeout: ${timeout}ms"

final jedisClusterNodes = new HashSet<HostAndPort>()
for (def uri : uris){
if (!JedisURIHelper.isValid(URI.create(uri))) {
throw new InvalidURIException(String.format(
"Cannot open Redis connection due invalid URI. %s", uri.toString()))
}
jedisClusterNodes.add(HostAndPort.from(uri))
}

def clientConfig = DefaultJedisClientConfig.builder().connectionTimeoutMillis(timeout)
.socketTimeoutMillis(timeout)
.blockingSocketTimeoutMillis(timeout)
if ( user ){
clientConfig.user(user)
}
if ( password ){
clientConfig.password(password)
}
if ( db ){
clientConfig.database(db as int)
}
if ( ssl ){
clientConfig.ssl(true)
}

return new JedisCluster(jedisClusterNodes, clientConfig.build())
}

}
76 changes: 76 additions & 0 deletions src/main/groovy/io/seqera/wave/redis/RedisService.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Wave, containers provisioning service
* Copyright (c) 2023-2024, Seqera Labs
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

package io.seqera.wave.redis

import java.util.concurrent.TimeoutException

import redis.clients.jedis.Connection
import redis.clients.jedis.StreamEntryID
import redis.clients.jedis.Transaction
import redis.clients.jedis.params.ScanParams
import redis.clients.jedis.params.SetParams
import redis.clients.jedis.params.XAutoClaimParams
import redis.clients.jedis.params.XReadGroupParams
import redis.clients.jedis.resps.ScanResult
import redis.clients.jedis.resps.StreamEntry
import redis.clients.jedis.resps.Tuple
/**
* Implements RedisService
*
* @author Munish Chouhan <[email protected]>
*/
interface RedisService {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure to understand the need for this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JedisCluster and JedisPool doesnot have common Interface, so not possible to inject
Thats why I have created this service which have separate implementation for cluster and standalone and based on redis.mode it will be initialized


String get(final String key)

long hincrBy(final String key, final String field, final long value)

Long hget(final String key, final String field)

ScanResult<Map.Entry<String, String>> hscan(String key, String cursor, ScanParams params)

String set(final String key, final String value, final SetParams params)

Transaction multi() throws TimeoutException

long lpush(final String target, final String message)

String rpop(final String target)

String brpop( final double timeout, final String target)

String xgroupCreate(final String key, final String groupName, final StreamEntryID id, final boolean makeStream)

StreamEntryID xadd(final String key, final StreamEntryID id, final Map<String, String> hash)

Map.Entry<StreamEntryID, List<StreamEntry>> xautoclaim(String key, String group, String consumerName, long minIdleTime, StreamEntryID start, XAutoClaimParams params)

List<Map.Entry<String, List<StreamEntry>>> xreadGroup(final String groupName, final String consumer, final XReadGroupParams xReadGroupParams, final Map<String, StreamEntryID> streams)

long zadd(final String key, final double score, final String member)

Object eval(final String script, final int keyCount, final String... params)

List<Tuple> zrangeByScoreWithScores(final String key, final double min, final double max, final int offset, final int count)

long del(final String key)

String flushAll()

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package io.seqera.wave.redis.impl

import java.util.concurrent.TimeoutException

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import io.micronaut.context.annotation.Requires
import io.seqera.wave.redis.RedisService
import jakarta.inject.Inject
import jakarta.inject.Singleton
import redis.clients.jedis.JedisCluster
import redis.clients.jedis.StreamEntryID
import redis.clients.jedis.Transaction
import redis.clients.jedis.params.ScanParams
import redis.clients.jedis.params.SetParams
import redis.clients.jedis.params.XAutoClaimParams
import redis.clients.jedis.params.XReadGroupParams
import redis.clients.jedis.resps.ScanResult
import redis.clients.jedis.resps.StreamEntry
import redis.clients.jedis.resps.Tuple
/**
* Implements RedisService for redis cluster
*
* @author Munish Chouhan <[email protected]>
*/
@Slf4j
@Singleton
@Requires(property = 'redis.uri')
@Requires(property = "redis.mode", value = "cluster")
@CompileStatic
class RedisServiceClusterImpl implements RedisService {

@Inject
private JedisCluster cluster

@Override
String get(String key) {
return cluster.get(key)
}

@Override
long hincrBy(String key, String field, long value) {
return cluster.hincrBy(key, field, value)
}

@Override
Long hget(String key, String field) {
return cluster.hget(key, field) ? cluster.hget(key, field).toLong() : null
}

@Override
ScanResult<Map.Entry<String, String>> hscan(String key, String cursor, ScanParams params) {
return cluster.hscan(key, cursor, params)
}

@Override
String set(String key, String value, SetParams params) {
return cluster.set(key, value, params)
}

@Override
Transaction multi() throws TimeoutException {
return cluster.multi()
}

@Override
long lpush(String target, String message) {
return cluster.lpush(target, message)
}

@Override
String rpop(String target) {
return cluster.rpop(target)
}

@Override
String brpop(double timeout, String target) {
return cluster.brpop(timeout, target)
}

@Override
String xgroupCreate(String key, String groupName, StreamEntryID id, boolean makeStream) {
return cluster.xgroupCreate(key, groupName, id, makeStream)
}

@Override
StreamEntryID xadd(String key, StreamEntryID id, Map<String, String> hash) {
return cluster.xadd(key, id, hash)
}

@Override
Map.Entry<StreamEntryID, List<StreamEntry>> xautoclaim(String key, String group, String consumerName, long minIdleTime, StreamEntryID start, XAutoClaimParams params) {
return cluster.xautoclaim(key, group, consumerName, minIdleTime, start, params)
}

@Override
List<Map.Entry<String, List<StreamEntry>>> xreadGroup(String groupName, String consumer, XReadGroupParams xReadGroupParams, Map<String, StreamEntryID> streams) {
return cluster.xreadGroup(groupName, consumer, xReadGroupParams, streams)
}

@Override
long zadd(String key, double score, String member) {
return cluster.zadd(key, score, member)
}

@Override
Object eval(String script, int keyCount, String... params) {
return cluster.eval(script, keyCount, params)
}

@Override
List<Tuple> zrangeByScoreWithScores(String key, double min, double max, int offset, int count) {
return cluster.zrangeByScoreWithScores(key, min, max, offset, count)
}

@Override
long del(String key) {
return cluster.del(key)
}

@Override
String flushAll() {
return cluster.flushAll()
}
}
Loading
Loading