From fc9474f1a8dc88725306c70ae6796720f6a7da77 Mon Sep 17 00:00:00 2001 From: Alex Mihailov Date: Wed, 3 Apr 2024 11:00:30 +0300 Subject: [PATCH] Support RedisClientMasterSlaves --- doc/20-configuration.md | 70 +++++++++-- src/main/resources/reference.conf | 46 ++++++- .../redis/configuration/RedisInstance.scala | 40 ++++++ .../configuration/RedisInstanceProvider.scala | 22 ++++ .../cache/redis/connector/RedisCommands.scala | 60 ++++++++- src/test/resources/docker-compose.yml | 14 +++ .../RedisInstanceManagerSpec.scala | 35 ++++++ .../connector/RedisMasterSlavesSpec.scala | 118 ++++++++++++++++++ 8 files changed, 388 insertions(+), 17 deletions(-) create mode 100644 src/test/resources/docker-compose.yml create mode 100644 src/test/scala/play/api/cache/redis/connector/RedisMasterSlavesSpec.scala diff --git a/doc/20-configuration.md b/doc/20-configuration.md index 9b4311d..b3ebfeb 100644 --- a/doc/20-configuration.md +++ b/doc/20-configuration.md @@ -10,7 +10,7 @@ There are several features supported in the configuration, they are discussed be This implementation supports both standalone and cluster instances. By default, the standalone mode is enabled. It is configured like this: -``` +```hocon play.cache.redis { host: localhost # redis server: port @@ -26,7 +26,7 @@ This implementation supports both standalone and cluster instances. By default, To enable cluster mode instead, use `source` property. Valid values are `standalone` (default), `cluster`, `connection-string`, and `custom`. For more details, see below. Example of cluster settings: -``` +```hocon play.cache.redis { # enable cluster mode source: cluster @@ -52,7 +52,7 @@ play.cache.redis { Some platforms such as Amazon AWS use a single DNS record to define a whole cluster. Such a domain name resolves to multiple IP addresses, which are nodes of a cluster. -``` +```hocon play.cache.redis { instances { play { @@ -68,7 +68,7 @@ play.cache.redis { Use `source: sentinel` to enable sentinel mode. Required parameters are `master_group_name: ...` and `sentinels: []`. An example of sentinel settings: -``` +```hocon play.cache.redis { source: sentinel @@ -101,6 +101,50 @@ play.cache.redis { } ``` +## Master-Slaves + +Use `source: master-slaves` to enable master-slaves mode. +In this mode write only to the master node and read from one of slaves node. +Required parameters are `master: {...}` and `slaves: []`. An example of master-slaves settings: + +```hocon +play.cache.redis { + source: master-slaves + + # username to your redis hosts (optional) + username: some-username + # password to your redis hosts, use if not specified for a specific node (optional) + password: "my-password" + # number of your redis database, use if not specified for a specific node (optional) + database: 1 + + # master node + master: { + host: "localhost" + port: 6380 + # number of your redis database on master (optional) + database: 1 + # username on master host (optional) + username: some-username + # password on master host (optional) + password: something + } + # slave nodes + slaves: [ + { + host: "localhost" + port: 6381 + # number of your redis database on slave (optional) + database: 1 + # username on slave host (optional) + username: some-username + # password on slave host (optional) + password: something + } + ] +} +``` + ## Named caches Play framework supports [named caches](https://www.playframework.com/documentation/2.6.x/ScalaCache#Accessing-different-caches) through a qualifier. For a simplicity, the default cache is also exposed without a qualifier to ease the access. This feature can be disabled by `bind-default` property, which defaults to true. The name of the default cache is defined in `default-cache` property, which defaults to `play` to keep consistency with Play framework. @@ -307,7 +351,7 @@ each instance uses `lazy` policy. ## Running in different environments -This module can run in various environments, from the localhost through the Heroku to your own premise. Each of these has a possibly different configuration. For this purpose, there is a `source` property accepting 4 values: `standalone` (default), `cluster`, `connection-string`, and `custom`. +This module can run in various environments, from the localhost through the Heroku to your own premise. Each of these has a possibly different configuration. For this purpose, there is a `source` property accepting 4 values: `standalone` (default), `cluster`, `connection-string`, `master-slaves` and `custom`. The `standalone` and `cluster` options are already explained. The latter two simplify the use in environments, where the connection cannot be written into the configuration file up front. @@ -366,11 +410,11 @@ configuration, see the [official Pekko documentation](https://pekko.apache.org/d ### Instance-specific (can be locally overridden) -| Key | Type | Default | Description | -|----------------------------------------------------------|---------:|-------------------------------------:|-------------------------------------------------------------------------------------------------------------------------| -| [play.cache.redis.source](#standalone-vs-cluster) | String | `standalone` | Defines the source of the configuration. Accepted values are `standalone`, `cluster`, `connection-string`, and `custom` | -| [play.cache.redis.sync-timeout](#timeout) | Duration | `1s` | conversion timeout applied by `SyncAPI` to convert `Future[T]` to `T` | -| [play.cache.redis.redis-timeout](#timeout) | Duration | `null` | waiting for the response from redis server | -| [play.cache.redis.prefix](#namespace-prefix) | String | `null` | optional namespace, i.e., key prefix | -| play.cache.redis.dispatcher | String | `pekko.actor.default-dispatcher` | Pekko actor | -| [play.cache.redis.recovery](#recovery-policy) | String | `log-and-default` | Defines behavior when command execution fails. For accepted values and more see | +| Key | Type | Default | Description | +|----------------------------------------------------------|---------:|-------------------------------------:|-----------------------------------------------------------------------------------------------------------------------------------------| +| [play.cache.redis.source](#standalone-vs-cluster) | String | `standalone` | Defines the source of the configuration. Accepted values are `standalone`, `cluster`, `connection-string`, `master-slaves` and `custom` | +| [play.cache.redis.sync-timeout](#timeout) | Duration | `1s` | conversion timeout applied by `SyncAPI` to convert `Future[T]` to `T` | +| [play.cache.redis.redis-timeout](#timeout) | Duration | `null` | waiting for the response from redis server | +| [play.cache.redis.prefix](#namespace-prefix) | String | `null` | optional namespace, i.e., key prefix | +| play.cache.redis.dispatcher | String | `pekko.actor.default-dispatcher` | Pekko actor | +| [play.cache.redis.recovery](#recovery-policy) | String | `log-and-default` | Defines behavior when command execution fails. For accepted values and more see | diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 00c526f..2e06c82 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -147,7 +147,49 @@ play.cache.redis { # # note: To consider a 'connection-string' variable, set 'source' # # to 'connection-string'. # source: connection-string - # } + # + # ########################## + # # Master-Slaves mode + # ########################## + # + # # master node, required config + # master: { + # # required string, defining a host the master is running on + # host: localhost + # # required integer, defining a port the master is running on + # port: 6379 + # # number of your redis database on master (optional) + # database: 1 + # # optional string, defines a username to use with "redis" as a fallback + # username: null + # # password on master host (optional) + # password: something + # } + # + # # list of slaves nodes is either []. + # slaves: [ + # { + # # required string, defining a host the slave is running on + # host: localhost + # # required integer, defining a port the slave is running on + # port: 6379 + # # number of your redis database on slave (optional) + # database: 1 + # # optional string, defines a username to use with "redis" as a fallback + # username: null + # # password on slave host (optional) + # password: something + # } + # ] + # + # # number of your redis database, use if not specified for a node (optional) + # database: 1 + # # optional string, defines a username to use with "redis" as a fallback + # username: null + # # password to your redis hosts, use if not specified for a node (optional) + # password: something + # # to enable the master-slaves, set 'source' variable to 'master-slaves' + # source: master-slaves # } # configuration source. This library supports multiple types of @@ -163,6 +205,8 @@ play.cache.redis { # - 'sentinel' mode indicates use of 'sentinels' variable defining nodes # - 'connection-string' mode is usually used with PaaS as setup by the # environment. It consideres 'connection-string' property. + # - 'master-slaves' master-slave mode is used to write only to the master node + # and read from one of slaves node. It consideres 'master-slaves' property. # - 'custom' indicates that the user supplies his own RedisInstance configuration # # Default value is 'standalone' diff --git a/src/main/scala/play/api/cache/redis/configuration/RedisInstance.scala b/src/main/scala/play/api/cache/redis/configuration/RedisInstance.scala index 41cb917..dc35b90 100644 --- a/src/main/scala/play/api/cache/redis/configuration/RedisInstance.scala +++ b/src/main/scala/play/api/cache/redis/configuration/RedisInstance.scala @@ -143,3 +143,43 @@ object RedisSentinel { } } + +/** + * Type of Redis Instance - a master-slaves. It encapsulates common settings of + * the master and slaves nodes. + */ +sealed trait RedisMasterSlaves extends RedisInstance { + + def master: RedisHost + def slaves: List[RedisHost] + def username: Option[String] + def password: Option[String] + def database: Option[Int] + + override def equals(obj: scala.Any): Boolean = obj match { + case that: RedisMasterSlaves => equalsAsInstance(that) && this.master === master && this.slaves === that.slaves + case _ => false + } + + /** to string */ + override def toString: String = s"MasterSlaves[master=$master, slaves=${slaves mkString ","}]" +} + +object RedisMasterSlaves { + + def apply(name: String, master: RedisHost, slaves: List[RedisHost], settings: RedisSettings, username: Option[String] = None, password: Option[String] = None, database: Option[Int] = None): RedisMasterSlaves with RedisDelegatingSettings = + create(name, master, slaves, username, password, database, settings) + + @inline + private def create(_name: String, _master: RedisHost, _slaves: List[RedisHost], _username: Option[String], _password: Option[String], _database: Option[Int], _settings: RedisSettings) = + new RedisMasterSlaves with RedisDelegatingSettings { + override val name: String = _name + override val master: RedisHost = _master + override val slaves: List[RedisHost] = _slaves + override val username: Option[String] = _username + override val password: Option[String] = _password + override val database: Option[Int] = _database + override val settings: RedisSettings = _settings + } + +} diff --git a/src/main/scala/play/api/cache/redis/configuration/RedisInstanceProvider.scala b/src/main/scala/play/api/cache/redis/configuration/RedisInstanceProvider.scala index 7d30a77..7a65d88 100644 --- a/src/main/scala/play/api/cache/redis/configuration/RedisInstanceProvider.scala +++ b/src/main/scala/play/api/cache/redis/configuration/RedisInstanceProvider.scala @@ -58,6 +58,8 @@ private[configuration] object RedisInstanceProvider extends RedisConfigInstanceL case "aws-cluster" => RedisInstanceAwsCluster // required static configuration of the sentinel using application.conf case "sentinel" => RedisInstanceSentinel + // required static configuration of the master-slaves using application.conf + case "master-slaves" => RedisInstanceMasterSlaves // required possibly environmental configuration of the standalone instance case "connection-string" => RedisInstanceEnvironmental // supplied custom configuration @@ -161,6 +163,26 @@ private[configuration] object RedisInstanceSentinel extends RedisConfigInstanceL } +/** Statically configures redis master-slaves. */ +private[configuration] object RedisInstanceMasterSlaves extends RedisConfigInstanceLoader[RedisInstanceProvider] { + + import JavaCompatibilityBase._ + import RedisConfigLoader._ + + def load(config: Config, path: String, instanceName: String)(implicit defaults: RedisSettings) = new ResolvedRedisInstance( + RedisMasterSlaves.apply( + name = instanceName, + master = RedisHost.load(config.getConfig(path / "master")), + slaves = config.getConfigList(path / "slaves").asScala.map(config => RedisHost.load(config)).toList, + username = config.getOption(path / "username", _.getString), + password = config.getOption(path / "password", _.getString), + database = config.getOption(path / "database", _.getInt), + settings = RedisSettings.withFallback(defaults).load(config, path), + ), + ) + +} + /** * This binder indicates that the user provides his own configuration of this * named cache. diff --git a/src/main/scala/play/api/cache/redis/connector/RedisCommands.scala b/src/main/scala/play/api/cache/redis/connector/RedisCommands.scala index c90b33b..cff0033 100644 --- a/src/main/scala/play/api/cache/redis/connector/RedisCommands.scala +++ b/src/main/scala/play/api/cache/redis/connector/RedisCommands.scala @@ -17,9 +17,10 @@ import scala.concurrent.duration.FiniteDuration private[connector] class RedisCommandsProvider(instance: RedisInstance)(implicit system: ActorSystem, lifecycle: ApplicationLifecycle) extends Provider[RedisCommands] { lazy val get: RedisCommands = instance match { - case cluster: RedisCluster => new RedisCommandsCluster(cluster).get - case standalone: RedisStandalone => new RedisCommandsStandalone(standalone).get - case sentinel: RedisSentinel => new RedisCommandsSentinel(sentinel).get + case cluster: RedisCluster => new RedisCommandsCluster(cluster).get + case standalone: RedisStandalone => new RedisCommandsStandalone(standalone).get + case sentinel: RedisSentinel => new RedisCommandsSentinel(sentinel).get + case masterSlaves: RedisMasterSlaves => new RedisCommandsMasterSlaves(masterSlaves).get } } @@ -184,3 +185,56 @@ private[connector] class RedisCommandsSentinel(configuration: RedisSentinel)(imp // $COVERAGE-ON$ } + +/** + * Creates a connection to master and slaves nodes. + * + * @param lifecycle + * application lifecycle to trigger on stop hook + * @param configuration + * configures master-slaves + * @param system + * actor system + */ +private[connector] class RedisCommandsMasterSlaves(configuration: RedisMasterSlaves)(implicit system: ActorSystem, val lifecycle: ApplicationLifecycle) extends Provider[RedisCommands] with AbstractRedisCommands { + import HostnameResolver._ + + val client: RedisClientMasterSlaves with RedisRequestTimeout = new RedisClientMasterSlaves( + master = RedisServer( + host = configuration.master.host.resolvedIpAddress, + port = configuration.master.port, + username = if (configuration.master.username.isEmpty) configuration.username else configuration.master.username, + password = if (configuration.master.password.isEmpty) configuration.password else configuration.master.password, + db = if (configuration.master.database.isEmpty) configuration.database else configuration.master.database, + ), + slaves = configuration.slaves.map { case RedisHost(host, port, db, username, password) => + RedisServer( + host.resolvedIpAddress, + port, + if (username.isEmpty) configuration.username else username, + if (password.isEmpty) configuration.password else password, + if (db.isEmpty) configuration.database else db, + ) + }, + ) with RedisRequestTimeout { + + protected val timeout: Option[FiniteDuration] = configuration.timeout.redis + + implicit protected val scheduler: Scheduler = system.scheduler + + override def send[T](redisCommand: RedisCommand[? <: protocol.RedisReply, T]): Future[T] = super.send(redisCommand) + } + + // $COVERAGE-OFF$ + def start(): Unit = + log.info(s"Redis master-slaves cache actor started. It is connected to ${configuration.toString}") + + def stop(): Future[Unit] = Future successful { + log.info("Stopping the redis master-slaves cache actor ...") + client.masterClient.stop() + client.slavesClients.stop() + log.info("Redis master-slaves cache stopped.") + } + // $COVERAGE-ON$ + +} diff --git a/src/test/resources/docker-compose.yml b/src/test/resources/docker-compose.yml new file mode 100644 index 0000000..8840e8a --- /dev/null +++ b/src/test/resources/docker-compose.yml @@ -0,0 +1,14 @@ +version: '3.7' +services: + redis-master: + image: redis:latest + hostname: redis-master + ports: + - '6379:6379' + + redis-slave: + image: redis:latest + hostname: redis-slave + ports: + - '6479:6379' + command: redis-server --slaveof redis-master 6379 diff --git a/src/test/scala/play/api/cache/redis/configuration/RedisInstanceManagerSpec.scala b/src/test/scala/play/api/cache/redis/configuration/RedisInstanceManagerSpec.scala index 4d23dca..ad27122 100644 --- a/src/test/scala/play/api/cache/redis/configuration/RedisInstanceManagerSpec.scala +++ b/src/test/scala/play/api/cache/redis/configuration/RedisInstanceManagerSpec.scala @@ -216,6 +216,41 @@ class RedisInstanceManagerSpec extends UnitSpec with ImplicitOptionMaterializati } + "master-slaves mode" in new TestCase { + + override protected def hocon: String = + """ + |play.cache.redis { + | instances { + | play { + | master: { host: "localhost", port: 6380 } + | slaves: [ + | { host: "localhost", port: 6381 } + | { host: "localhost", port: 6382 } + | ] + | password: "my-password" + | database: 1 + | source: master-slaves + | } + | } + |} + """ + + private def node(port: Int) = RedisHost(localhost, port) + + manager mustEqual RedisInstanceManagerTest(defaultCacheName)( + RedisMasterSlaves( + name = defaultCacheName, + master = node(6380), + slaves = node(6381) :: node(6382) :: Nil, + settings = defaultsSettings.copy(source = "master-slaves"), + password = "my-password", + database = 1, + ), + ) + + } + "custom mode" in new TestCase { override protected def hocon: String = diff --git a/src/test/scala/play/api/cache/redis/connector/RedisMasterSlavesSpec.scala b/src/test/scala/play/api/cache/redis/connector/RedisMasterSlavesSpec.scala new file mode 100644 index 0000000..a518331 --- /dev/null +++ b/src/test/scala/play/api/cache/redis/connector/RedisMasterSlavesSpec.scala @@ -0,0 +1,118 @@ +package play.api.cache.redis.connector + +import com.dimafeng.testcontainers.{Container, DockerComposeContainer, ExposedService} +import org.apache.pekko.actor.ActorSystem +import org.scalatest.BeforeAndAfterAll +import org.testcontainers.containers.wait.strategy.Wait +import play.api.cache.redis.configuration.{RedisHost, RedisMasterSlaves, RedisSettings} +import play.api.cache.redis.impl.{LazyInvocation, RedisRuntime} +import play.api.cache.redis.test.{Helpers, IntegrationSpec, StoppableApplication} +import play.api.cache.redis.{LogAndFailPolicy, RedisConnector} + +import java.io.File +import scala.concurrent.duration.{DurationInt, FiniteDuration} +import scala.concurrent.{ExecutionContext, Future} + +class RedisMasterSlavesSpec extends IntegrationSpec with BeforeAndAfterAll { + + private val host = "localhost" + private val masterPort = 6379 + private val slavePort = 6479 + + private val containerDef = DockerComposeContainer.Def( + new File("src/test/resources/docker-compose.yml"), + tailChildContainers = true, + exposedServices = Seq( + ExposedService("redis-master", masterPort, Wait.forLogMessage(".*Ready to accept connections tcp.*\\n", 1)), + ExposedService("redis-slave", slavePort, Wait.forLogMessage(".*MASTER <-> REPLICA sync: Finished with success.*\\n", 1)), + ), + ) + + private def startContainer = containerDef.start() + + private lazy val container: Container = startContainer + + override protected def beforeAll(): Unit = container + + override protected def afterAll(): Unit = container.stop() + + override protected def testTimeout: FiniteDuration = 60.seconds + + test("pong on ping") { connector => + for { + _ <- connector.ping().assertingSuccess + } yield Passed + } + + test("miss on get") { connector => + for { + _ <- connector.get[String]("miss-on-get").assertingEqual(None) + } yield Passed + } + + test("hit after set") { connector => + for { + _ <- connector.set("hit-after-set", "value").assertingEqual(true) + _ <- connector.get[String]("hit-after-set").assertingEqual(Some("value")) + } yield Passed + } + + test("ignore set if not exists when already defined") { connector => + for { + _ <- connector.set("if-not-exists-when-exists", "previous").assertingEqual(true) + _ <- connector.set("if-not-exists-when-exists", "value", ifNotExists = true).assertingEqual(false) + _ <- connector.get[String]("if-not-exists-when-exists").assertingEqual(Some("previous")) + } yield Passed + } + + test("perform set if not exists when undefined") { connector => + for { + _ <- connector.get[String]("if-not-exists").assertingEqual(None) + _ <- connector.set("if-not-exists", "value", ifNotExists = true).assertingEqual(true) + _ <- connector.get[String]("if-not-exists").assertingEqual(Some("value")) + _ <- connector.set("if-not-exists", "other", ifNotExists = true).assertingEqual(false) + _ <- connector.get[String]("if-not-exists").assertingEqual(Some("value")) + } yield Passed + } + + test("perform set if not exists with expiration") { connector => + for { + _ <- connector.get[String]("if-not-exists-with-expiration").assertingEqual(None) + _ <- connector.set("if-not-exists-with-expiration", "value", 300.millis, ifNotExists = true).assertingEqual(true) + _ <- connector.get[String]("if-not-exists-with-expiration").assertingEqual(Some("value")) + // wait until the first duration expires + _ <- Future.after(700.millis, ()) + _ <- connector.get[String]("if-not-exists-with-expiration").assertingEqual(None) + } yield Passed + } + + def test(name: String)(f: RedisConnector => Future[Assertion]): Unit = + name in { + implicit val system: ActorSystem = ActorSystem("test", classLoader = Some(getClass.getClassLoader)) + implicit val runtime: RedisRuntime = RedisRuntime("master-slaves", syncTimeout = 5.seconds, ExecutionContext.global, new LogAndFailPolicy, LazyInvocation) + implicit val application: StoppableApplication = StoppableApplication(system) + val serializer = new PekkoSerializerImpl(system) + + lazy val masterSlavesInstance = RedisMasterSlaves( + name = "master-slaves", + master = RedisHost(host, masterPort), + slaves = List(RedisHost(host, slavePort)), + settings = RedisSettings.load( + config = Helpers.configuration.default.underlying, + path = "play.cache.redis", + ), + ) + + application.runAsyncInApplication { + val connector: RedisConnector = new RedisConnectorProvider(masterSlavesInstance, serializer).get + for { + // initialize the connector by flushing the database + keys <- connector.matching("*") + _ <- Future.sequence(keys.map(connector.remove(_))) + // run the test + _ <- f(connector) + } yield Passed + } + } + +}