diff --git a/CHANGELOG.md b/CHANGELOG.md index 51b21873..02238d5d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,11 @@ ## Changelog +### [:link: 5.0.0](https://github.com/KarelCemus/play-redis/tree/5.0.0) + +Switched underlying connector from Rediscala, which is no longer maintained, +to Lettuce. [#301](https://github.com/KarelCemus/play-redis/pull/301) + ### [:link: 4.1.0](https://github.com/KarelCemus/play-redis/tree/4.1.0) Provided support for MasterSlave configuration, which writes data to the master, diff --git a/build.sbt b/build.sbt index d18e0873..888a2800 100644 --- a/build.sbt +++ b/build.sbt @@ -11,24 +11,24 @@ description := "Redis cache plugin for the Play framework 2" organization := "com.github.karelcemus" -crossScalaVersions := Seq("2.13.12", "3.3.1") +crossScalaVersions := Seq("2.13.14", "3.3.3") scalaVersion := crossScalaVersions.value.head -playVersion := "3.0.1" +playVersion := "3.0.2" libraryDependencies ++= Seq( // play framework cache API - "org.playframework" %% "play-cache" % playVersion.value % Provided, + "org.playframework" %% "play-cache" % playVersion.value % Provided, // redis connector - "io.github.rediscala" %% "rediscala" % "1.14.0-pekko", + "io.lettuce" % "lettuce-core" % "6.3.2.RELEASE", // test framework with mockito extension - "org.scalatest" %% "scalatest" % "3.2.18" % Test, - "org.scalamock" %% "scalamock" % "6.0.0-M2" % Test, + "org.scalatest" %% "scalatest" % "3.2.18" % Test, + "org.scalamock" %% "scalamock" % "6.0.0" % Test, // test module for play framework - "org.playframework" %% "play-test" % playVersion.value % Test, + "org.playframework" %% "play-test" % playVersion.value % Test, // to run integration tests - "com.dimafeng" %% "testcontainers-scala-core" % "0.41.2" % Test, + "com.dimafeng" %% "testcontainers-scala-core" % "0.41.3" % Test, ) resolvers ++= Seq( @@ -43,16 +43,19 @@ scalacOptions ++= { if (scalaVersion.value.startsWith("2.")) Seq("-Ywarn-unused") else Seq.empty } -enablePlugins(CustomReleasePlugin) +ThisBuild / version := "4.0.2" + +//enablePlugins(CustomReleasePlugin) // exclude from tests coverage coverageExcludedFiles := ".*exceptions.*" +Test / fork := true Test / test := (Test / testOnly).toTask(" * -- -l \"org.scalatest.Ignore\"").value +Test / testOptions += Tests.Argument(TestFrameworks.ScalaTest, "-oF") -semanticdbEnabled := true -semanticdbVersion := scalafixSemanticdb.revision -ThisBuild / scalafixScalaBinaryVersion := CrossVersion.binaryScalaVersion(scalaVersion.value) +semanticdbEnabled := true +semanticdbVersion := scalafixSemanticdb.revision wartremoverWarnings ++= Warts.allBut( Wart.Any, diff --git a/project/CustomReleasePlugin.scala b/project/CustomReleasePlugin.scala index 110a3c3d..d0d3d7fb 100644 --- a/project/CustomReleasePlugin.scala +++ b/project/CustomReleasePlugin.scala @@ -60,7 +60,7 @@ object CustomReleasePlugin extends AutoPlugin { val nextVersion = st.extracted.runTask(releaseVersion, st)._2(currentV) val bump = Version.Bump.Minor - val suggestedReleaseV: String = Version(nextVersion).map(_.bump(bump).string).getOrElse(versionFormatError(currentV)) + val suggestedReleaseV: String = Version(nextVersion).map(_.bump(bump).unapply).getOrElse(versionFormatError(currentV)) st.log.info("Press enter to use the default value") diff --git a/project/plugins.sbt b/project/plugins.sbt index fbc63766..02adc71a 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -4,17 +4,17 @@ resolvers += Resolver.url("scoverage-bintray", url("https://dl.bintray.com/sksam addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.6.4") // code coverage and uploader of the coverage results into the coveralls.io -addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.9") +addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.12") addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.3.11") // library release addSbtPlugin("com.github.sbt" % "sbt-git" % "2.0.1") -addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.9.21") +addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "3.10.0") addSbtPlugin("com.github.sbt" % "sbt-pgp" % "2.2.1") -addSbtPlugin("com.github.sbt" % "sbt-release" % "1.1.0") +addSbtPlugin("com.github.sbt" % "sbt-release" % "1.4.0") // linters -addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.0") +addSbtPlugin("org.typelevel" % "sbt-tpolecat" % "0.5.1") addSbtPlugin("org.wartremover" % "sbt-wartremover" % "3.1.6") -addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.6") -addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.11.1") +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.2") +addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.12.1") diff --git a/src/main/scala/play/api/cache/redis/connector/RedisClientFactory.scala b/src/main/scala/play/api/cache/redis/connector/RedisClientFactory.scala new file mode 100644 index 00000000..674a9c39 --- /dev/null +++ b/src/main/scala/play/api/cache/redis/connector/RedisClientFactory.scala @@ -0,0 +1,164 @@ +package play.api.cache.redis.connector + +import io.lettuce.core._ +import io.lettuce.core.api.StatefulConnection +import io.lettuce.core.masterreplica.StatefulRedisMasterReplicaConnection +import io.lettuce.core.resource.{ClientResources, NettyCustomizer} +import io.netty.channel.{Channel, ChannelDuplexHandler, ChannelHandlerContext} +import io.netty.handler.timeout.{IdleStateEvent, IdleStateHandler} +import play.api.cache.redis.configuration.RedisHost + +import java.time.{Duration => JavaDuration} +import scala.concurrent.duration.FiniteDuration + +private object RedisClientFactory { + + implicit class RichRedisConnection[Connection <: StatefulConnection[String, String]]( + private val thiz: Connection, + ) extends AnyVal { + + def withTimeout(maybeTimeout: Option[FiniteDuration]): Connection = { + maybeTimeout.foreach { timeout => + thiz.setTimeout(JavaDuration.ofNanos(timeout.toNanos)) + } + thiz + } + + } + + implicit class RichRedisMasterReplicaConnection[Connection <: StatefulRedisMasterReplicaConnection[String, String]]( + private val thiz: Connection, + ) extends AnyVal { + + def withReadFrom(readFrom: ReadFrom): Connection = { + thiz.setReadFrom(readFrom) + thiz + } + + } + + implicit class RichRedisURIBuilder[Builder <: RedisURI.Builder]( + private val thiz: Builder, + ) extends AnyVal { + + def withDatabase(database: Option[Int]): Builder = { + thiz.withDatabase(database.getOrElse(0)) // mutable + thiz + } + + def withCredentials( + username: Option[String], + password: Option[String], + ): Builder = + (username, password) match { + case (None, None) => + thiz + case (Some(username), Some(password)) => + thiz.withAuthentication(username, password) // mutable + thiz + case (None, Some(password)) => + thiz.withPassword(password.toCharArray) // mutable + thiz + case (Some(username), None) => + throw new IllegalArgumentException(s"Username is set to $username but password is missing") + } + + def withSentinels(sentinels: Seq[RedisHost]): Builder = { + sentinels.foreach { + case RedisHost(host, port, _, _, None) => + thiz.withSentinel(host, port) // mutable + case RedisHost(host, port, _, _, Some(password)) => + thiz.withSentinel(host, port, password) // mutable + } + thiz + } + + } + + implicit class RichClientOptionsBuilder[T <: ClientOptions.Builder]( + private val thiz: T, + ) extends AnyVal { + + def withDefaults(): T = { + // mutable calls + thiz.autoReconnect(true) // Auto-Reconnect + thiz.pingBeforeActivateConnection(true) // PING before activating connection + thiz + } + + def withTimeout(maybeTimeout: Option[FiniteDuration]): T = { + val options = maybeTimeout match { + case Some(timeout) => + TimeoutOptions.builder() + .timeoutCommands(true) + .fixedTimeout(JavaDuration.ofNanos(timeout.toNanos)) + .build() + case None => + TimeoutOptions.builder().build() + } + + thiz.timeoutOptions(options) // mutable call + thiz + } + + } + + implicit class RichRedisClient[Client <: AbstractRedisClient]( + private val thiz: Client, + ) extends AnyVal { + + def withOptions[Options <: ClientOptions]( + f: Client => Options => Unit, + )( + options: Options, + ): Client = { + f(thiz)(options) + thiz + } + + } + + def newClientResources( + ioThreadPoolSize: Int = 8, + computationThreadPoolSize: Int = 8, + afterChannelTime: Int = 60 * 4, + ): ClientResources = + ClientResources.builder + // The number of threads in the I/O thread pools. + // The number defaults to the number of available processors that + // the runtime returns (which, as a well-known fact, sometimes does + // not represent the actual number of processors). Every thread + // represents an internal event loop where all I/O tasks are run. + // The number does not reflect the actual number of I/O threads because + // the client requires different thread pools for Network (NIO) and + // Unix Domain Socket (EPoll) connections. The minimum I/O threads are 3. + // A pool with fewer threads can cause undefined behavior. + .ioThreadPoolSize(ioThreadPoolSize) + // The number of threads in the computation thread pool. The number + // defaults to the number of available processors that the runtime returns + // (which, as a well-known fact, sometimes does not represent the actual + // number of processors). Every thread represents an internal event loop + // where all computation tasks are run. The minimum computation threads + // are 3. A pool with fewer threads can cause undefined behavior. + .computationThreadPoolSize(computationThreadPoolSize) + // Maintain connection to Redis every four minutes + .nettyCustomizer( + new NettyCustomizer() { + + @SuppressWarnings(Array("org.wartremover.warts.IsInstanceOf")) + override def afterChannelInitialized(channel: Channel): Unit = { + val _ = channel.pipeline.addLast(new IdleStateHandler(afterChannelTime, 0, 0)) + val _ = channel.pipeline.addLast(new ChannelDuplexHandler() { + @throws[Exception] + override def userEventTriggered(ctx: ChannelHandlerContext, evt: Object): Unit = + if (evt.isInstanceOf[IdleStateEvent]) { + val _ = ctx.disconnect().sync() + } + }) + } + + }, + ) + .build + +} diff --git a/src/main/scala/play/api/cache/redis/connector/RedisCommands.scala b/src/main/scala/play/api/cache/redis/connector/RedisCommands.scala index cff0033c..f615dcf1 100644 --- a/src/main/scala/play/api/cache/redis/connector/RedisCommands.scala +++ b/src/main/scala/play/api/cache/redis/connector/RedisCommands.scala @@ -1,22 +1,35 @@ package play.api.cache.redis.connector -import org.apache.pekko.actor.{ActorSystem, Scheduler} +import io.lettuce.core.cluster.ClusterTopologyRefreshOptions.RefreshTrigger +import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands +import io.lettuce.core.cluster.{ClusterClientOptions, ClusterTopologyRefreshOptions, RedisClusterClient} +import io.lettuce.core.codec.StringCodec +import io.lettuce.core.masterreplica.MasterReplica +import io.lettuce.core.resource.ClientResources +import io.lettuce.core.{AbstractRedisClient, ClientOptions, ReadFrom, RedisClient, RedisURI} import play.api.Logger import play.api.cache.redis.configuration._ import play.api.inject.ApplicationLifecycle -import redis.{RedisClient => RedisStandaloneClient, RedisCluster => RedisClusterClient, _} +import java.time.Duration +import java.util.concurrent.TimeUnit import javax.inject._ -import scala.concurrent.Future -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ExecutionContext, Future} +import scala.jdk.CollectionConverters.SeqHasAsJava +import scala.jdk.FutureConverters.CompletionStageOps /** * Dispatches a provider of the redis commands implementation. Use with Guice * or some other DI container. */ -private[connector] class RedisCommandsProvider(instance: RedisInstance)(implicit system: ActorSystem, lifecycle: ApplicationLifecycle) extends Provider[RedisCommands] { - - lazy val get: RedisCommands = instance match { +private[connector] class RedisCommandsProvider( + instance: RedisInstance, +)(implicit + lifecycle: ApplicationLifecycle, + executionContext: ExecutionContext, +) extends Provider[RedisClusterAsyncCommands[String, String]] { + + lazy val get: RedisClusterAsyncCommands[String, String] = instance match { case cluster: RedisCluster => new RedisCommandsCluster(cluster).get case standalone: RedisStandalone => new RedisCommandsStandalone(standalone).get case sentinel: RedisSentinel => new RedisCommandsSentinel(sentinel).get @@ -25,28 +38,59 @@ private[connector] class RedisCommandsProvider(instance: RedisInstance)(implicit } -private[connector] trait AbstractRedisCommands { +abstract private[connector] class AbstractRedisCommands( + protected val name: String, +)(implicit + executionContext: ExecutionContext, + lifecycle: ApplicationLifecycle, +) { /** logger instance */ protected def log: Logger = Logger("play.api.cache.redis") - def lifecycle: ApplicationLifecycle + protected def connectionString: String + + protected lazy val resources: ClientResources = RedisClientFactory.newClientResources() + + protected def client: AbstractRedisClient /** an implementation of the redis commands */ - def client: RedisCommands + protected def newConnection: RedisConnection - lazy val get: RedisCommands = client + private lazy val connection = newConnection - /** action invoked on the start of the actor */ - def start(): Unit + lazy val get: RedisClusterAsyncCommands[String, String] = { + // start the connector + start() + // listen on system stop + lifecycle.addStopHook(() => stop()) + // make the client + connection.api + } - /** stops the actor */ - def stop(): Future[Unit] + // $COVERAGE-OFF$ + /** action invoked on the start of the redis client */ + def start(): Unit = + log.info(s"Starting $name. It will connect to $connectionString") + + /** stops the client */ + final def stop(): Future[Unit] = + for { + _ <- Future.unit + _ = log.info(s"Stopping $name ...") + _ <- connection.close().recover { case ex => + log.warn("Error while closing the redis connection", ex) + } + _ <- client.shutdownAsync().asScala.map(_ => ()).recover { case ex => + log.warn("Error while shutting down the redis client", ex) + } + _ <- Future.apply(resources.shutdown().get()).map(_ => ()).recover { case ex => + log.warn("Error while shutting down client resources", ex) + } + _ = log.info(s"Stopped $name.") + } yield () + // $COVERAGE-ON$ - // start the connector - start() - // listen on system stop - lifecycle.addStopHook(() => stop()) } /** @@ -56,44 +100,41 @@ private[connector] trait AbstractRedisCommands { * application lifecycle to trigger on stop hook * @param configuration * configures clusters - * @param system - * actor system */ -private[connector] class RedisCommandsStandalone(configuration: RedisStandalone)(implicit system: ActorSystem, val lifecycle: ApplicationLifecycle) extends Provider[RedisCommands] with AbstractRedisCommands { - import configuration._ - - val client: RedisStandaloneClient = new RedisStandaloneClient( - host = host, - port = port, - db = database, - username = username, - password = password, - ) with FailEagerly with RedisRequestTimeout { - - protected val connectionTimeout: Option[FiniteDuration] = configuration.timeout.connection - - protected val timeout: Option[FiniteDuration] = configuration.timeout.redis - - implicit protected val scheduler: Scheduler = system.scheduler - - override def send[T](redisCommand: RedisCommand[? <: protocol.RedisReply, T]): Future[T] = super.send(redisCommand) - - override def onConnectStatus: Boolean => Unit = (status: Boolean) => connected = status - } - - // $COVERAGE-OFF$ - override def start(): Unit = database.fold { - log.info(s"Redis cache actor started. It is connected to $host:$port") - } { database => - log.info(s"Redis cache actor started. It is connected to $host:$port?database=$database") - } +private[connector] class RedisCommandsStandalone( + configuration: RedisStandalone, +)(implicit + executionContext: ExecutionContext, + lifecycle: ApplicationLifecycle, +) extends AbstractRedisCommands("standalone redis") + with Provider[RedisClusterAsyncCommands[String, String]] { + + import RedisClientFactory._ + + private val redisUri: RedisURI = + RedisURI.Builder + .redis(configuration.host) + .withPort(configuration.port) + .withDatabase(configuration.database) + .withCredentials(configuration.username, configuration.password) + .build() + + override protected def connectionString: String = + redisUri.toString + + override protected lazy val client: RedisClient = + RedisClient.create(resources, redisUri) + .withOptions(_.setOptions)( + ClientOptions.builder() + .withDefaults() + .withTimeout(configuration.timeout.redis) + .build(), + ) - override def stop(): Future[Unit] = Future successful { - log.info("Stopping the redis cache actor ...") - client.stop() - log.info("Redis cache stopped.") - } - // $COVERAGE-ON$ + override protected def newConnection: RedisConnection = + RedisConnection.fromStandalone( + client.connect().withTimeout(configuration.timeout.connection), + ) } @@ -104,42 +145,48 @@ private[connector] class RedisCommandsStandalone(configuration: RedisStandalone) * application lifecycle to trigger on stop hook * @param configuration * configures clusters - * @param system - * actor system */ -private[connector] class RedisCommandsCluster(configuration: RedisCluster)(implicit system: ActorSystem, val lifecycle: ApplicationLifecycle) extends Provider[RedisCommands] with AbstractRedisCommands { - - import HostnameResolver._ - - import configuration._ - - val client: RedisClusterClient = new RedisClusterClient( - nodes.map { case RedisHost(host, port, database, username, password) => - RedisServer(host.resolvedIpAddress, port, username, password, database) - }, - ) with RedisRequestTimeout { - - protected val timeout: Option[FiniteDuration] = configuration.timeout.redis - - implicit protected val scheduler: Scheduler = system.scheduler - } - - // $COVERAGE-OFF$ - override def start(): Unit = { - def servers: Seq[String] = nodes.map { - case RedisHost(host, port, Some(database), _, _) => s" $host:$port?database=$database" - case RedisHost(host, port, None, _, _) => s" $host:$port" +private[connector] class RedisCommandsCluster( + configuration: RedisCluster, +)(implicit + lifecycle: ApplicationLifecycle, + executionContext: ExecutionContext, +) extends AbstractRedisCommands("redis cluster") + with Provider[RedisClusterAsyncCommands[String, String]] { + + import RedisClientFactory._ + + private val redisUris: Seq[RedisURI] = + configuration.nodes.map { case RedisHost(host, port, database, password, username) => + RedisURI.Builder + .redis(host) + .withPort(port) + .withDatabase(database) + .withCredentials(username, password) + .build() } - log.info(s"Redis cluster cache actor started. It is connected to ${servers mkString ", "}") - } + override protected def connectionString: String = redisUris.map(_.toString).mkString(", ") + + override protected val client: RedisClusterClient = + RedisClusterClient.create(resources, redisUris.asJava) + .withOptions(_.setOptions)( + ClusterClientOptions.builder() + .withDefaults() + .withTimeout(configuration.timeout.redis) + .topologyRefreshOptions( + ClusterTopologyRefreshOptions.builder + .enableAdaptiveRefreshTrigger(RefreshTrigger.MOVED_REDIRECT, RefreshTrigger.PERSISTENT_RECONNECTS) + .adaptiveRefreshTriggersTimeout(Duration.ofNanos(TimeUnit.SECONDS.toNanos(30))) + .build, + ) + .build(), + ) - override def stop(): Future[Unit] = Future successful { - log.info("Stopping the redis cluster cache actor ...") - Option(client).foreach(_.stop()) - log.info("Redis cluster cache stopped.") - } - // $COVERAGE-ON$ + val newConnection: RedisConnection = + RedisConnection.fromCluster( + client.connect().withTimeout(configuration.timeout.connection), + ) } @@ -150,39 +197,43 @@ private[connector] class RedisCommandsCluster(configuration: RedisCluster)(impli * application lifecycle to trigger on stop hook * @param configuration * configures sentinels - * @param system - * actor system */ -private[connector] class RedisCommandsSentinel(configuration: RedisSentinel)(implicit system: ActorSystem, val lifecycle: ApplicationLifecycle) extends Provider[RedisCommands] with AbstractRedisCommands { - import HostnameResolver._ - - val client: SentinelMonitoredRedisClient with RedisRequestTimeout = new SentinelMonitoredRedisClient( - configuration.sentinels.map { case RedisHost(host, port, _, _, _) => - (host.resolvedIpAddress, port) - }, - master = configuration.masterGroup, - username = configuration.username, - password = configuration.password, - db = configuration.database, - ) with RedisRequestTimeout { - - protected val timeout: Option[FiniteDuration] = configuration.timeout.redis - - implicit protected val scheduler: Scheduler = system.scheduler - - override def send[T](redisCommand: RedisCommand[? <: protocol.RedisReply, T]): Future[T] = super.send(redisCommand) - } - - // $COVERAGE-OFF$ - override def start(): Unit = - log.info(s"Redis sentinel cache actor started. It is connected to ${configuration.toString}") +private[connector] class RedisCommandsSentinel( + configuration: RedisSentinel, +)(implicit + lifecycle: ApplicationLifecycle, + executionContext: ExecutionContext, +) extends AbstractRedisCommands("redis sentinel") + with Provider[RedisClusterAsyncCommands[String, String]] { + + import RedisClientFactory._ + + private val sentinel: RedisHost = + configuration.sentinels.head + + private val redisUri: RedisURI = + RedisURI.Builder + .sentinel(sentinel.host, sentinel.port) + .withDatabase(configuration.database) + .withCredentials(configuration.username, configuration.password) + .withSentinels(configuration.sentinels) + .build() + + override protected def connectionString: String = redisUri.toString + + override protected val client: RedisClient = + RedisClient.create(resources, redisUri) + .withOptions(_.setOptions)( + ClientOptions.builder() + .withDefaults() + .withTimeout(configuration.timeout.redis) + .build(), + ) - override def stop(): Future[Unit] = Future successful { - log.info("Stopping the redis sentinel cache actor ...") - client.stop() - log.info("Redis sentinel cache stopped.") - } - // $COVERAGE-ON$ + val newConnection: RedisConnection = + RedisConnection.fromStandalone( + client.connect().withTimeout(configuration.timeout.connection), + ) } @@ -193,48 +244,44 @@ private[connector] class RedisCommandsSentinel(configuration: RedisSentinel)(imp * application lifecycle to trigger on stop hook * @param configuration * configures master-slaves - * @param system - * actor system */ -private[connector] class RedisCommandsMasterSlaves(configuration: RedisMasterSlaves)(implicit system: ActorSystem, val lifecycle: ApplicationLifecycle) extends Provider[RedisCommands] with AbstractRedisCommands { - import HostnameResolver._ - - val client: RedisClientMasterSlaves with RedisRequestTimeout = new RedisClientMasterSlaves( - master = RedisServer( - host = configuration.master.host.resolvedIpAddress, - port = configuration.master.port, - username = if (configuration.master.username.isEmpty) configuration.username else configuration.master.username, - password = if (configuration.master.password.isEmpty) configuration.password else configuration.master.password, - db = if (configuration.master.database.isEmpty) configuration.database else configuration.master.database, - ), - slaves = configuration.slaves.map { case RedisHost(host, port, db, username, password) => - RedisServer( - host.resolvedIpAddress, - port, - if (username.isEmpty) configuration.username else username, - if (password.isEmpty) configuration.password else password, - if (db.isEmpty) configuration.database else db, +//noinspection DuplicatedCode +private[connector] class RedisCommandsMasterSlaves( + configuration: RedisMasterSlaves, +)(implicit + lifecycle: ApplicationLifecycle, + executionContext: ExecutionContext, +) extends AbstractRedisCommands("redis master-slaves") + with Provider[RedisClusterAsyncCommands[String, String]] { + + import RedisClientFactory._ + + private val redisUri: RedisURI = + RedisURI.Builder + .redis(configuration.master.host) + .withPort(configuration.master.port) + .withDatabase(configuration.master.database.orElse(configuration.database)) + .withCredentials( + configuration.master.username.orElse(configuration.username), + configuration.master.password.orElse(configuration.password), ) - }, - ) with RedisRequestTimeout { - - protected val timeout: Option[FiniteDuration] = configuration.timeout.redis - - implicit protected val scheduler: Scheduler = system.scheduler + .build() - override def send[T](redisCommand: RedisCommand[? <: protocol.RedisReply, T]): Future[T] = super.send(redisCommand) - } + override protected def connectionString: String = redisUri.toString - // $COVERAGE-OFF$ - def start(): Unit = - log.info(s"Redis master-slaves cache actor started. It is connected to ${configuration.toString}") + override protected val client: RedisClient = + RedisClient.create(resources) + .withOptions(_.setOptions)( + ClientOptions.builder() + .withDefaults() + .withTimeout(configuration.timeout.redis) + .build(), + ) - def stop(): Future[Unit] = Future successful { - log.info("Stopping the redis master-slaves cache actor ...") - client.masterClient.stop() - client.slavesClients.stop() - log.info("Redis master-slaves cache stopped.") - } - // $COVERAGE-ON$ + val newConnection: RedisConnection = + RedisConnection.fromStandalone( + MasterReplica.connect(client, StringCodec.UTF8, redisUri) + .withReadFrom(ReadFrom.MASTER_PREFERRED), + ) } diff --git a/src/main/scala/play/api/cache/redis/connector/RedisConnection.scala b/src/main/scala/play/api/cache/redis/connector/RedisConnection.scala new file mode 100644 index 00000000..5a23101a --- /dev/null +++ b/src/main/scala/play/api/cache/redis/connector/RedisConnection.scala @@ -0,0 +1,55 @@ +package play.api.cache.redis.connector + +import io.lettuce.core.api.StatefulRedisConnection +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection +import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands + +import scala.concurrent.{ExecutionContext, Future} +import scala.jdk.FutureConverters.CompletionStageOps + +sealed private trait RedisConnection extends Any { + + def close()(implicit ec: ExecutionContext): Future[Unit] + + def api: RedisClusterAsyncCommands[String, String] +} + +private object RedisConnection { + + final private class StandaloneConnection( + private val connection: StatefulRedisConnection[String, String], + ) extends AnyVal + with RedisConnection { + + override def close()(implicit ec: ExecutionContext): Future[Unit] = + connection.closeAsync().asScala.map(_ => ()) + + override def api: RedisClusterAsyncCommands[String, String] = + connection.async() + + } + + final private class ClusterConnection( + private val connection: StatefulRedisClusterConnection[String, String], + ) extends AnyVal + with RedisConnection { + + override def close()(implicit ec: ExecutionContext): Future[Unit] = + connection.closeAsync().asScala.map(_ => ()) + + override def api: RedisClusterAsyncCommands[String, String] = + connection.async() + + } + + def fromStandalone( + connection: StatefulRedisConnection[String, String], + ): RedisConnection = + new StandaloneConnection(connection) + + def fromCluster( + connection: StatefulRedisClusterConnection[String, String], + ): RedisConnection = + new ClusterConnection(connection) + +} diff --git a/src/main/scala/play/api/cache/redis/connector/RedisConnectorImpl.scala b/src/main/scala/play/api/cache/redis/connector/RedisConnectorImpl.scala index 0444d8de..dc039410 100644 --- a/src/main/scala/play/api/cache/redis/connector/RedisConnectorImpl.scala +++ b/src/main/scala/play/api/cache/redis/connector/RedisConnectorImpl.scala @@ -1,12 +1,16 @@ package play.api.cache.redis.connector +import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands +import io.lettuce.core.{KeyValue, RedisFuture, ScoredValue, SetArgs} import play.api.Logger import play.api.cache.redis._ -import redis._ import java.util.concurrent.TimeUnit -import scala.concurrent.Future import scala.concurrent.duration.Duration +import scala.concurrent.{ExecutionContext, Future} +import scala.jdk.CollectionConverters.{ListHasAsScala, MapHasAsJava, MapHasAsScala, SetHasAsScala} +import scala.jdk.FutureConverters.CompletionStageOps +import scala.jdk.OptionConverters._ import scala.reflect.ClassTag /** @@ -20,17 +24,22 @@ import scala.reflect.ClassTag * @param redis * implementation of the commands */ -private[connector] class RedisConnectorImpl(serializer: PekkoSerializer, redis: RedisCommands)(implicit runtime: RedisRuntime) extends RedisConnector { +private[connector] class RedisConnectorImpl( + serializer: PekkoSerializer, + redis: RedisClusterAsyncCommands[String, String], +)(implicit + runtime: RedisRuntime, +) extends RedisConnector { import ExpectedFuture._ - + import RedisConnectorImpl._ import runtime._ /** logger instance */ protected val log: Logger = Logger("play.api.cache.redis") override def get[T: ClassTag](key: String): Future[Option[T]] = - redis.get[String](key) executing "GET" withKey key expects { + redis.get(key).toScala[Option[String]] executing "GET" withKey key expects { case Some(response: String) => log.trace(s"Hit on key '$key'.") Some(decode[T](key, response)) @@ -40,17 +49,15 @@ private[connector] class RedisConnectorImpl(serializer: PekkoSerializer, redis: } override def mGet[T: ClassTag](keys: String*): Future[Seq[Option[T]]] = - redis.mget[String](keys: _*) executing "MGET" withKeys keys expects { - // list is always returned - case list => - keys.zip(list).map { - case (key, Some(response: String)) => - log.trace(s"Hit on key '$key'.") - Some(decode[T](key, response)) - case (key, None) => - log.debug(s"Miss on key '$key'.") - None - } + redis.mget(keys: _*).toScala[Seq[(String, Option[String])]] executing "MGET" withKeys keys expects { + _.map { + case (key, Some(response: String)) => + log.trace(s"Hit on key '$key'.") + Some(decode[T](key, response)) + case (key, None) => + log.debug(s"Miss on key '$key'.") + None + } } /** decodes the object, reports an exception if fails */ @@ -80,16 +87,23 @@ private[connector] class RedisConnectorImpl(serializer: PekkoSerializer, redis: * the storage */ private def doSet(key: String, value: String, expiration: Duration, ifNotExists: Boolean): Future[Boolean] = - redis.set[String]( + redis.set( key, value, - pxMilliseconds = if (expiration.isFinite) Some(expiration.toMillis) else None, - NX = ifNotExists, - ) executing "SET" withKey key andParameters s"$value${s" PX $expiration" when expiration.isFinite}${" NX" when ifNotExists}" logging { - case true if expiration.isFinite => log.debug(s"Set on key '$key' for ${expiration.toMillis} milliseconds.") - case true => log.debug(s"Set on key '$key' for infinite seconds.") - case false => log.debug(s"Set on key '$key' ignored. Condition was not met.") - } + new SetArgs() + .mapWhen(expiration.isFinite, _.px(expiration.toMillis)) + .mapWhen(ifNotExists, _.nx()), + ) + .toScala[Option[String]] + .map(_ contains "OK") + .executing("SET") + .withKey(key) + .andParameters(s"$value${s" PX $expiration" when expiration.isFinite}${" NX" when ifNotExists}") + .logging { + case true if expiration.isFinite => log.debug(s"Set on key '$key' for ${expiration.toMillis} milliseconds.") + case true => log.debug(s"Set on key '$key' for infinite seconds.") + case false => log.debug(s"Set on key '$key' ignored. Condition was not met.") + } override def mSet(keyValues: (String, Any)*): Future[Unit] = mSetUsing(mSetEternally, (), keyValues: _*) @@ -111,25 +125,25 @@ private[connector] class RedisConnectorImpl(serializer: PekkoSerializer, redis: /** eternally stores already encoded values into the storage */ private def mSetEternally(keyValues: (String, String)*): Future[Unit] = - redis.mset(keyValues.toMap) executing "MSET" withKeys keyValues.map(_._1) asCommand keyValues.map(_.asString).mkString(" ") logging { case _ => + redis.mset(keyValues.toMap.asJava).toScala[String] executing "MSET" withKeys keyValues.map(_._1) asCommand keyValues.map(_.asString).mkString(" ") logging { case _ => log.debug(s"Set on keys ${keyValues.map(_.key)} for infinite seconds.") } /** eternally stores already encoded values into the storage */ private def mSetEternallyIfNotExist(keyValues: (String, String)*): Future[Boolean] = - redis.msetnx(keyValues.toMap) executing "MSETNX" withKeys keyValues.map(_._1) asCommand keyValues.map(_.asString).mkString(" ") logging { + redis.msetnx(keyValues.toMap.asJava).toScala[Boolean] executing "MSETNX" withKeys keyValues.map(_._1) asCommand keyValues.map(_.asString).mkString(" ") logging { case true => log.debug(s"Set if not exists on keys ${keyValues.map(_.key) mkString " "} succeeded.") case false => log.debug(s"Set if not exists on keys ${keyValues.map(_.key) mkString " "} ignored. Some value already exists.") } override def expire(key: String, expiration: Duration): Future[Unit] = - redis.expire(key, expiration.toSeconds) executing "EXPIRE" withKey key andParameter s"$expiration" logging { + redis.expire(key, expiration.toSeconds).toScala[Boolean] executing "EXPIRE" withKey key andParameter s"$expiration" logging { case true => log.debug(s"Expiration set on key '$key'.") // expiration was set case false => log.debug(s"Expiration set on key '$key' failed. Key does not exist.") // Nothing was removed } override def expiresIn(key: String): Future[Option[Duration]] = - redis.pttl(key) executing "PTTL" withKey key expects { + redis.pttl(key).toScala[Long] executing "PTTL" withKey key expects { case -2 => log.debug(s"PTTL on key '$key' returns -2, it does not exist.") None @@ -142,7 +156,7 @@ private[connector] class RedisConnectorImpl(serializer: PekkoSerializer, redis: } override def matching(pattern: String): Future[Seq[String]] = - redis.keys(pattern) executing "KEYS" withKey pattern logging { case keys => + redis.keys(pattern).toScala[Seq[String]] executing "KEYS" withKey pattern logging { case keys => log.debug(s"KEYS on '$pattern' responded '${keys.mkString(", ")}'.") } @@ -151,20 +165,21 @@ private[connector] class RedisConnectorImpl(serializer: PekkoSerializer, redis: // the tests are in progress // $COVERAGE-OFF$ override def invalidate(): Future[Unit] = - redis.flushdb() executing "FLUSHDB" logging { case _ => + redis.flushdb().toScala[String] executing "FLUSHDB" logging { case _ => log.info("Invalidated.") // cache was invalidated } // $COVERAGE-ON$ override def exists(key: String): Future[Boolean] = - redis.exists(key) executing "EXISTS" withKey key logging { - case true => log.debug(s"Key '$key' exists.") - case false => log.debug(s"Key '$key' doesn't exist.") - } + redis.exists(key).toScala[Long] executing "EXISTS" withKey key logging { + case 0 => log.debug(s"Key '$key' doesn't exist.") + case 1 => log.debug(s"Key '$key' exists.") + case other => throw new IllegalStateException(s"Unexpected response from EXISTS $key got $other, expected is at most one occurrence.") + } map (_ > 0) override def remove(keys: String*): Future[Unit] = if (keys.nonEmpty) { // if any key to remove do it - redis.del(keys: _*) executing "DEL" withKeys keys logging { + redis.del(keys: _*).toScala[Long] executing "DEL" withKeys keys logging { // Nothing was removed case 0L => log.debug(s"Remove on keys ${keys.mkString("'", ",", "'")} succeeded but nothing was removed.") // Some entries were removed @@ -175,22 +190,22 @@ private[connector] class RedisConnectorImpl(serializer: PekkoSerializer, redis: } override def ping(): Future[Unit] = - redis.ping() executing "PING" logging { case "PONG" => + redis.ping().toScala[String] executing "PING" logging { case "PONG" => () } override def increment(key: String, by: Long): Future[Long] = - redis.incrby(key, by) executing "INCRBY" withKey key andParameter s"$by" logging { case value => + redis.incrby(key, by).toScala[Long] executing "INCRBY" withKey key andParameter s"$by" logging { case value => log.debug(s"The value at key '$key' was incremented by $by to $value.") } override def append(key: String, value: String): Future[Long] = - redis.append(key, value) executing "APPEND" withKey key andParameter value logging { case _ => + redis.append(key, value).toScala[Long] executing "APPEND" withKey key andParameter value logging { case _ => log.debug(s"The value was appended to key '$key'.") } override def listPrepend(key: String, values: Any*): Future[Long] = - Future.sequence(values.map(encode(key, _))).flatMap(redis.lpush(key, _: _*)) executing "LPUSH" withKey key andParameters values logging { case length => + Future.sequence(values.map(encode(key, _))).flatMap(redis.lpush(key, _: _*).toScala[Long]) executing "LPUSH" withKey key andParameters values logging { case length => log.debug(s"The $length values was prepended to key '$key'.") } recover { case ExecutionFailedException(_, _, _, ex) if ex.getMessage startsWith "WRONGTYPE" => @@ -199,7 +214,7 @@ private[connector] class RedisConnectorImpl(serializer: PekkoSerializer, redis: } override def listAppend(key: String, values: Any*): Future[Long] = - Future.sequence(values.map(encode(key, _))).flatMap(redis.rpush(key, _: _*)) executing "RPUSH" withKey key andParameters values logging { case length => + Future.sequence(values.map(encode(key, _))).flatMap(redis.rpush(key, _: _*).toScala[Long]) executing "RPUSH" withKey key andParameters values logging { case length => log.debug(s"The $length values was appended to key '$key'.") } recover { case ExecutionFailedException(_, _, _, ex) if ex.getMessage startsWith "WRONGTYPE" => @@ -208,20 +223,21 @@ private[connector] class RedisConnectorImpl(serializer: PekkoSerializer, redis: } override def listSize(key: String): Future[Long] = - redis.llen(key) executing "LLEN" withKey key logging { case length => + redis.llen(key).toScala[Long] executing "LLEN" withKey key logging { case length => log.debug(s"The collection at '$key' has $length items.") } override def listSetAt(key: String, position: Long, value: Any): Future[Unit] = - encode(key, value).flatMap(redis.lset(key, position, _)) executing "LSET" withKey key andParameter value logging { case _ => + encode(key, value).flatMap(redis.lset(key, position, _).toScala[String]) executing "LSET" withKey key andParameter value logging { case _ => log.debug(s"Updated value at $position in '$key' to $value.") - } map (_ => ()) recover { case ExecutionFailedException(_, _, _, actors.ReplyErrorException("ERR index out of range")) => + } map (_ => ()) recover { case ExecutionFailedException(_, _, _, _) => + // todo fix exception detection here log.debug(s"Update of the value at $position in '$key' failed due to index out of range.") throw new IndexOutOfBoundsException("Index out of range") } override def listHeadPop[T: ClassTag](key: String): Future[Option[T]] = - redis.lpop[String](key) executing "LPOP" withKey key expects { + redis.lpop(key).toScala[Option[String]] executing "LPOP" withKey key expects { case Some(encoded) => log.trace(s"Hit on head in key '$key'.") Some(decode[T](key, encoded)) @@ -231,25 +247,25 @@ private[connector] class RedisConnectorImpl(serializer: PekkoSerializer, redis: } override def listSlice[T: ClassTag](key: String, start: Long, end: Long): Future[Seq[T]] = - redis.lrange[String](key, start, end) executing "LRANGE" withKey key andParameters s"$start $end" expects { case values => + redis.lrange(key, start, end).toScala[Seq[String]] executing "LRANGE" withKey key andParameters s"$start $end" expects { case values => log.debug(s"The range on '$key' from $start to $end included returned ${values.size} values.") values.map(decode[T](key, _)) } override def listRemove(key: String, value: Any, count: Long): Future[Long] = - encode(key, value).flatMap(redis.lrem(key, count, _)) executing "LREM" withKey key andParameters s"$value $count" logging { case removed => + encode(key, value).flatMap(redis.lrem(key, count, _).toScala[Long]) executing "LREM" withKey key andParameters s"$value $count" logging { case removed => log.debug(s"Removed $removed occurrences of $value in '$key'.") } override def listTrim(key: String, start: Long, end: Long): Future[Unit] = - redis.ltrim(key, start, end) executing "LTRIM" withKey key andParameter s"$start $end" logging { case _ => + redis.ltrim(key, start, end).toScala[String] executing "LTRIM" withKey key andParameter s"$start $end" logging { case _ => log.debug(s"Trimmed collection at '$key' to $start:$end ") } override def listInsert(key: String, pivot: Any, value: Any): Future[Option[Long]] = for { pivot <- encode(key, pivot) value <- encode(key, value) - result <- redis.linsert(key, api.BEFORE, pivot, value) executing "LINSERT" withKey key andParameter s"$pivot $value" expects { + result <- redis.linsert(key, true, pivot, value).toScala[Long] executing "LINSERT" withKey key andParameter s"$pivot $value" expects { case -1L | 0L => log.debug(s"Insert into the list at '$key' failed. Pivot not found.") None @@ -266,7 +282,7 @@ private[connector] class RedisConnectorImpl(serializer: PekkoSerializer, redis: override def setAdd(key: String, values: Any*): Future[Long] = { // encodes the value def toEncoded(value: Any) = encode(key, value) - Future.sequence(values map toEncoded).flatMap(redis.sadd(key, _: _*)) executing "SADD" withKey key andParameters values expects { case inserted => + Future.sequence(values map toEncoded).flatMap(redis.sadd(key, _: _*).toScala[Long]) executing "SADD" withKey key andParameters values expects { case inserted => log.debug(s"Inserted $inserted elements into the set at '$key'.") inserted } recover { @@ -277,18 +293,18 @@ private[connector] class RedisConnectorImpl(serializer: PekkoSerializer, redis: } override def setSize(key: String): Future[Long] = - redis.scard(key) executing "SCARD" withKey key logging { case length => + redis.scard(key).toScala[Long] executing "SCARD" withKey key logging { case length => log.debug(s"The collection at '$key' has $length items.") } override def setMembers[T: ClassTag](key: String): Future[Set[T]] = - redis.smembers[String](key) executing "SMEMBERS" withKey key expects { case items => + redis.smembers(key).toScala[Set[String]] executing "SMEMBERS" withKey key expects { case items => log.debug(s"Returned ${items.size} items from the collection at '$key'.") - items.map(decode[T](key, _)).toSet + items.map(decode[T](key, _)) } override def setIsMember(key: String, value: Any): Future[Boolean] = - encode(key, value).flatMap(redis.sismember(key, _)) executing "SISMEMBER" withKey key andParameter value logging { + encode(key, value).flatMap(redis.sismember(key, _).toScala[Boolean]) executing "SISMEMBER" withKey key andParameter value logging { case true => log.debug(s"Item $value exists in the collection at '$key'.") case false => log.debug(s"Item $value does not exist in the collection at '$key'") } @@ -297,16 +313,17 @@ private[connector] class RedisConnectorImpl(serializer: PekkoSerializer, redis: // encodes the value def toEncoded(value: Any): Future[String] = encode(key, value) - Future.sequence(values map toEncoded).flatMap(redis.srem(key, _: _*)) executing "SREM" withKey key andParameters values logging { case removed => + Future.sequence(values map toEncoded).flatMap(redis.srem(key, _: _*).toScala[Long]) executing "SREM" withKey key andParameters values logging { case removed => log.debug(s"Removed $removed elements from the collection at '$key'.") } } override def sortedSetAdd(key: String, scoreValues: (Double, Any)*): Future[Long] = { // encodes the value - def toEncoded(scoreValue: (Double, Any)) = encode(key, scoreValue._2).map((scoreValue._1, _)) + def toEncoded(scoreValue: (Double, Any)): Future[ScoredValue[String]] = + encode(key, scoreValue._2).map(encoded => ScoredValue.just(scoreValue._1, encoded)) - Future.sequence(scoreValues.map(toEncoded)).flatMap(redis.zadd(key, _: _*)) executing "ZADD" withKey key andParameters scoreValues expects { case inserted => + Future.sequence(scoreValues.map(toEncoded)).flatMap(redis.zadd(key, _: _*).toScala[Long]) executing "ZADD" withKey key andParameters scoreValues expects { case inserted => log.debug(s"Inserted $inserted elements into the zset at '$key'.") inserted } recover { @@ -317,12 +334,12 @@ private[connector] class RedisConnectorImpl(serializer: PekkoSerializer, redis: } override def sortedSetSize(key: String): Future[Long] = - redis.zcard(key) executing "ZCARD" withKey key logging { case length => + redis.zcard(key).toScala[Long] executing "ZCARD" withKey key logging { case length => log.debug(s"The zset at '$key' has $length items.") } override def sortedSetScore(key: String, value: Any): Future[Option[Double]] = - encode(key, value) flatMap (redis.zscore(key, _)) executing "ZSCORE" withKey key andParameter value logging { + encode(key, value) flatMap (redis.zscore(key, _).toScala[Option[Double]]) executing "ZSCORE" withKey key andParameter value logging { case Some(score) => log.debug(s"The score of item: $value is $score in the collection at '$key'.") case None => log.debug(s"Item $value does not exist in the collection at '$key'") } @@ -331,41 +348,41 @@ private[connector] class RedisConnectorImpl(serializer: PekkoSerializer, redis: // encodes the value def toEncoded(value: Any) = encode(key, value) - Future.sequence(values map toEncoded).flatMap(redis.zrem(key, _: _*)) executing "ZREM" withKey key andParameters values logging { case removed => + Future.sequence(values map toEncoded).flatMap(redis.zrem(key, _: _*).toScala[Long]) executing "ZREM" withKey key andParameters values logging { case removed => log.debug(s"Removed $removed elements from the zset at '$key'.") } } override def sortedSetRange[T: ClassTag](key: String, start: Long, stop: Long): Future[Seq[T]] = - redis.zrange[String](key, start, stop) executing "ZRANGE" withKey key andParameter s"$start $stop" expects { case encodedSeq => + redis.zrange(key, start, stop).toScala[Seq[String]] executing "ZRANGE" withKey key andParameter s"$start $stop" expects { case encodedSeq => log.debug(s"Got range from $start to $stop in the zset at '$key'.") encodedSeq.map(encoded => decode[T](key, encoded)) } override def sortedSetReverseRange[T: ClassTag](key: String, start: Long, stop: Long): Future[Seq[T]] = - redis.zrevrange[String](key, start, stop) executing "ZREVRANGE" withKey key andParameter s"$start $stop" expects { case encodedSeq => + redis.zrevrange(key, start, stop).toScala[Seq[String]] executing "ZREVRANGE" withKey key andParameter s"$start $stop" expects { case encodedSeq => log.debug(s"Got reverse range from $start to $stop in the zset at '$key'.") encodedSeq.map(encoded => decode[T](key, encoded)) } override def hashRemove(key: String, fields: String*): Future[Long] = - redis.hdel(key, fields: _*) executing "HDEL" withKey key andParameters fields logging { case removed => + redis.hdel(key, fields: _*).toScala[Long] executing "HDEL" withKey key andParameters fields logging { case removed => log.debug(s"Removed $removed elements from the collection at '$key'.") } override def hashIncrement(key: String, field: String, incrementBy: Long): Future[Long] = - redis.hincrby(key, field, incrementBy) executing "HINCRBY" withKey key andParameters s"$field $incrementBy" logging { case value => + redis.hincrby(key, field, incrementBy).toScala[Long] executing "HINCRBY" withKey key andParameters s"$field $incrementBy" logging { case value => log.debug(s"Field '$field' in '$key' was incremented to $value.") } override def hashExists(key: String, field: String): Future[Boolean] = - redis.hexists(key, field) executing "HEXISTS" withKey key andParameter field logging { + redis.hexists(key, field).toScala[Boolean] executing "HEXISTS" withKey key andParameter field logging { case true => log.debug(s"Item $field exists in the collection at '$key'.") case false => log.debug(s"Item $field does not exist in the collection at '$key'") } override def hashGet[T: ClassTag](key: String, field: String): Future[Option[T]] = - redis.hget[String](key, field) executing "HGET" withKey key andParameter field expects { + redis.hget(key, field).toScala[Option[String]] executing "HGET" withKey key andParameter field expects { case Some(encoded) => log.debug(s"Item $field exists in the collection at '$key'.") Some(decode[T](key, encoded)) @@ -375,13 +392,13 @@ private[connector] class RedisConnectorImpl(serializer: PekkoSerializer, redis: } override def hashGet[T: ClassTag](key: String, fields: Seq[String]): Future[Seq[Option[T]]] = - redis.hmget[String](key, fields: _*) executing "HMGET" withKey key andParameters fields expects { case encoded => + redis.hmget(key, fields: _*).toScala[Seq[(String, Option[String])]] executing "HMGET" withKey key andParameters fields expects { case encoded => log.debug(s"Collection at '$key' with fields '$fields' has returned ${encoded.size} items.") - encoded.map(_.map(decode[T](key, _))) + encoded.map(_._2.map(decode[T](key, _))) } override def hashGetAll[T: ClassTag](key: String): Future[Map[String, T]] = - redis.hgetall[String](key) executing "HGETALL" withKey key expects { + redis.hgetall(key).toScala[Map[String, String]] executing "HGETALL" withKey key expects { case empty if empty.isEmpty => log.debug(s"Collection at '$key' is empty.") Map.empty[String, T] @@ -391,18 +408,18 @@ private[connector] class RedisConnectorImpl(serializer: PekkoSerializer, redis: } override def hashSize(key: String): Future[Long] = - redis.hlen(key) executing "HLEN" withKey key logging { case length => + redis.hlen(key).toScala[Long] executing "HLEN" withKey key logging { case length => log.debug(s"The collection at '$key' has $length items.") } override def hashKeys(key: String): Future[Set[String]] = - redis.hkeys(key) executing "HKEYS" withKey key expects { case keys => + redis.hkeys(key).toScala[Seq[String]] executing "HKEYS" withKey key expects { case keys => log.debug(s"The collection at '$key' defines: ${keys mkString " "}.") keys.toSet } override def hashSet(key: String, field: String, value: Any): Future[Boolean] = - encode(key, value).flatMap(redis.hset(key, field, _)) executing "HSET" withKey key andParameters s"$field $value" logging { + encode(key, value).flatMap(redis.hset(key, field, _).toScala[Boolean]) executing "HSET" withKey key andParameters s"$field $value" logging { case true => log.debug(s"Item $field in the collection at '$key' was inserted.") case false => log.debug(s"Item $field in the collection at '$key' was updated.") } recover { @@ -412,7 +429,7 @@ private[connector] class RedisConnectorImpl(serializer: PekkoSerializer, redis: } override def hashValues[T: ClassTag](key: String): Future[Set[T]] = - redis.hvals[String](key) executing "HVALS" withKey key expects { case values => + redis.hvals(key).toScala[Seq[String]] executing "HVALS" withKey key expects { case values => log.debug(s"The collection at '$key' contains ${values.size} values.") values.map(decode[T](key, _)).toSet } @@ -421,3 +438,57 @@ private[connector] class RedisConnectorImpl(serializer: PekkoSerializer, redis: override def toString: String = s"RedisConnector(name=$name)" // $COVERAGE-ON$ } + +private[connector] object RedisConnectorImpl { + + private trait JavaConvertible[From, To] { + def convert(from: From): To + } + + private object JavaConvertible { + + private def instance[From, To](f: From => To): JavaConvertible[From, To] = + (from: From) => f(from) + + implicit def identity[T]: JavaConvertible[T, T] = + instance(x => x) + + implicit val boolean: JavaConvertible[java.lang.Boolean, Boolean] = + instance(_.booleanValue()) + + implicit val long: JavaConvertible[java.lang.Long, Long] = + instance(_.longValue()) + + implicit val double: JavaConvertible[java.lang.Double, Double] = + instance(_.doubleValue()) + + implicit def option[T, U](implicit ev: JavaConvertible[T, U]): JavaConvertible[T, Option[U]] = + instance(Option(_).map(ev.convert)) + + implicit def seq[T, U](implicit ev: JavaConvertible[T, U]): JavaConvertible[java.util.List[T], Seq[U]] = + instance(_.asScala.map(ev.convert).toSeq) + + implicit def set[T, U](implicit ev: JavaConvertible[T, U]): JavaConvertible[java.util.Set[T], Set[U]] = + instance(_.asScala.map(ev.convert).toSet) + + implicit def map[K, V, KK, VV](implicit evK: JavaConvertible[K, KK], evV: JavaConvertible[V, VV]): JavaConvertible[java.util.Map[K, V], Map[KK, VV]] = + instance(_.asScala.map { case (k, v) => evK.convert(k) -> evV.convert(v) }.toMap) + + implicit def pair[K, V]: JavaConvertible[KeyValue[K, V], (K, Option[V])] = + instance(pair => pair.getKey -> pair.optional().toScala) + + } + + implicit private class RichRedisFuture[T](private val thiz: RedisFuture[T]) extends AnyVal { + def toScala[U](implicit ev: JavaConvertible[T, U], ec: ExecutionContext): Future[U] = thiz.asScala.map(ev.convert) + } + + implicit private class ConditionalCall[T](private val thiz: T) extends AnyVal { + + def mapWhen(condition: Boolean, f: T => T): T = + if (condition) f(thiz) // mutable side effect + else thiz + + } + +} diff --git a/src/main/scala/play/api/cache/redis/connector/RedisConnectorProvider.scala b/src/main/scala/play/api/cache/redis/connector/RedisConnectorProvider.scala index c548544a..a8ee8242 100644 --- a/src/main/scala/play/api/cache/redis/connector/RedisConnectorProvider.scala +++ b/src/main/scala/play/api/cache/redis/connector/RedisConnectorProvider.scala @@ -1,15 +1,20 @@ package play.api.cache.redis.connector -import org.apache.pekko.actor.ActorSystem import play.api.cache.redis._ import play.api.inject.ApplicationLifecycle import javax.inject.Provider /** Provides an instance of named redis connector */ -private[redis] class RedisConnectorProvider(instance: RedisInstance, serializer: PekkoSerializer)(implicit system: ActorSystem, lifecycle: ApplicationLifecycle, runtime: RedisRuntime) extends Provider[RedisConnector] { +private[redis] class RedisConnectorProvider( + instance: RedisInstance, + serializer: PekkoSerializer, +)(implicit + lifecycle: ApplicationLifecycle, + runtime: RedisRuntime, +) extends Provider[RedisConnector] { - private[connector] lazy val commands = new RedisCommandsProvider(instance).get + private[connector] lazy val commands = new RedisCommandsProvider(instance)(lifecycle, runtime.context).get lazy val get = new RedisConnectorImpl(serializer, commands) } diff --git a/src/main/scala/play/api/cache/redis/connector/RequestTimeout.scala b/src/main/scala/play/api/cache/redis/connector/RequestTimeout.scala deleted file mode 100644 index 85aaf2c1..00000000 --- a/src/main/scala/play/api/cache/redis/connector/RequestTimeout.scala +++ /dev/null @@ -1,90 +0,0 @@ -package play.api.cache.redis.connector - -import org.apache.pekko.actor.Scheduler -import org.apache.pekko.pattern.after -import redis._ - -import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future} - -/** - * Helper for manipulation with the request to the redis. It defines the common - * variables and methods to avoid code duplication - */ -trait RequestTimeout extends Request { - - implicit protected val scheduler: Scheduler -} - -object RequestTimeout { - - // fails - @inline - def fail(failAfter: FiniteDuration)(implicit scheduler: Scheduler, context: ExecutionContext): Future[Nothing] = - after(failAfter, scheduler)(Future.failed(redis.actors.NoConnectionException)) - - // first completed - @inline - def invokeOrFail[T](continue: => Future[T], failAfter: FiniteDuration)(implicit scheduler: Scheduler, context: ExecutionContext): Future[T] = - Future.firstCompletedOf(Seq(continue, fail(failAfter))) - -} - -/** - * Actor extension maintaining current connected status. The operations are not - * invoked when the connection is not established, the failed future is - * returned instead. - */ -trait FailEagerly extends RequestTimeout { - import RequestTimeout._ - - protected var connected = false - - /** max timeout of the future when the redis is disconnected */ - @inline - protected def connectionTimeout: Option[FiniteDuration] - - abstract override def send[T](redisCommand: RedisCommand[? <: protocol.RedisReply, T]): Future[T] = { - // proceed with the command - @inline def continue: Future[T] = super.send(redisCommand) - // based on connection status - if (connected) continue else connectionTimeout.fold(continue)(invokeOrFail(continue, _)) - } - -} - -/** - * Actor extension implementing a request timeout, if enabled. This is due to - * no internal timeout provided by the redis-scala to avoid never-completed - * futures. - */ -trait RedisRequestTimeout extends RequestTimeout { - import RequestTimeout._ - - private var initialized = false - - /** indicates the timeout on the redis request */ - protected def timeout: Option[FiniteDuration] - - abstract override def send[T](redisCommand: RedisCommand[? <: protocol.RedisReply, T]): Future[T] = { - // proceed with the command - @inline def continue: Future[T] = super.send(redisCommand) - // based on connection status - if (initialized) timeout.fold(continue)(invokeOrFail(continue, _)) else continue - } - - // Note: Cannot RedisCluster invokes the `send` method during - // the class initialization. This call uses both timeout and scheduler - // properties although they are not initialized yet. Unfortunately, - // it seems there is no - // way to provide a `val timeout = configuration.timeout.redis`, - // which would be resolved before the use of the timeout property. - // - // As a workaround, the introduced boolean property initialized to false - // by JVM to efficintly disable the timeout mechanism while the trait - // initialization is not completed. Then the flag is set to true. - // - // This avoids the issue with the order of traits initialization. - // - initialized = true -} diff --git a/src/test/java/play/api/cache/redis/connector/AbstractRedisCommandsMock.java b/src/test/java/play/api/cache/redis/connector/AbstractRedisCommandsMock.java new file mode 100644 index 00000000..4f2a7134 --- /dev/null +++ b/src/test/java/play/api/cache/redis/connector/AbstractRedisCommandsMock.java @@ -0,0 +1,115 @@ +package play.api.cache.redis.connector; + +import io.lettuce.core.*; +import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands; +import io.lettuce.core.protocol.CommandType; + +import java.util.List; + +public abstract class AbstractRedisCommandsMock implements RedisClusterAsyncCommands { + + @Override + final public RedisFuture set(String key, String value, SetArgs setArgs) { + return setWithArgs(key, value, setArgs); + } + + public abstract RedisFuture setWithArgs(String key, String value, SetArgs setArgs); + + @Override + final public RedisFuture expire(String key, long seconds) { + return expireSeconds(key, seconds); + } + + public abstract RedisFuture expireSeconds(String key, long seconds); + + @Override + final public RedisFuture> lrange(String key, long start, long stop) { + return lrangeLong(key, start, stop); + } + + public abstract RedisFuture> lrangeLong(String key, long start, long stop); + + @Override + final public RedisFuture hset(String key, String field, String value) { + return hsetSimple(key, field, value); + } + + public abstract RedisFuture hsetSimple(String key, String field, String value); + + @Override + final public RedisFuture geoadd(String key, Object... lngLatMember) { + return null; + } + + @SafeVarargs + @Override + final public RedisFuture geoadd(String key, GeoValue... values) { + return null; + } + + @Override + final public RedisFuture geoadd(String key, GeoAddArgs args, Object... lngLatMember) { + return null; + } + + @SafeVarargs + @Override + final public RedisFuture geoadd(String key, GeoAddArgs args, GeoValue... values) { + return null; + } + + @Override + final public RedisFuture> commandInfo(String... commands) { + return null; + } + + @Override + final public RedisFuture> commandInfo(CommandType... commands) { + return null; + } + + @Override + final public RedisFuture zadd(String key, Object... scoresAndValues) { + return null; + } + + @SafeVarargs + @Override + final public RedisFuture zadd(String key, ScoredValue... scoredValues) { + return zaddMock(key, scoredValues); + } + + public abstract RedisFuture zaddMock(String key, ScoredValue[] scoredValues); + + @Override + final public RedisFuture zadd(String key, ZAddArgs zAddArgs, Object... scoresAndValues) { + return null; + } + + @SafeVarargs + @Override + final public RedisFuture zadd(String key, ZAddArgs zAddArgs, ScoredValue... scoredValues) { + return null; + } + + @Override + final public RedisFuture zrem(String key, String... members) { + return zremMock(key, members); + } + + public abstract RedisFuture zremMock(String key, String[] members); + + @Override + final public RedisFuture> zrange(String key, long start, long stop) { + return zrangeMock(key, start, stop); + } + + public abstract RedisFuture> zrangeMock(String key, long start, long stop); + + @Override + final public RedisFuture> zrevrange(String key, long start, long stop) { + return zrevrangeMock(key, start, stop); + } + + public abstract RedisFuture> zrevrangeMock(String key, long start, long stop); +} diff --git a/src/test/scala/play/api/cache/redis/RecoveryPolicySpec.scala b/src/test/scala/play/api/cache/redis/RecoveryPolicySpec.scala index 0ae7c979..0b3ffdcf 100644 --- a/src/test/scala/play/api/cache/redis/RecoveryPolicySpec.scala +++ b/src/test/scala/play/api/cache/redis/RecoveryPolicySpec.scala @@ -4,6 +4,7 @@ import play.api.Logger import play.api.cache.redis.test._ import scala.concurrent.Future +import scala.util.control.NoStackTrace class RecoveryPolicySpec extends AsyncUnitSpec { @@ -11,7 +12,7 @@ class RecoveryPolicySpec extends AsyncUnitSpec { private val default = Future.successful(0) private object ex { - private val internal = new IllegalArgumentException("Simulated Internal cause") + private val internal = new IllegalArgumentException("Simulated Internal cause") with NoStackTrace val unexpectedAny: UnexpectedResponseException = UnexpectedResponseException(None, "TEST-CMD") val unexpectedKey: UnexpectedResponseException = UnexpectedResponseException(Some("some key"), "TEST-CMD") val failedAny: ExecutionFailedException = ExecutionFailedException(None, "TEST-CMD", "TEST-CMD", internal) diff --git a/src/test/scala/play/api/cache/redis/RedisCacheComponentsSpec.scala b/src/test/scala/play/api/cache/redis/RedisCacheComponentsSpec.scala index 96b4b373..bcdae475 100644 --- a/src/test/scala/play/api/cache/redis/RedisCacheComponentsSpec.scala +++ b/src/test/scala/play/api/cache/redis/RedisCacheComponentsSpec.scala @@ -40,9 +40,10 @@ class RedisCacheComponentsSpec extends IntegrationSpec with RedisStandaloneConta override lazy val applicationLifecycle: ApplicationLifecycle = injector.instanceOf[ApplicationLifecycle] override lazy val environment: Environment = injector.instanceOf[Environment] override lazy val syncRedis: CacheApi = cacheApi("play").sync + } - components.runInApplication { + TestApplication.run(components.injector) { cache(components.syncRedis) } } diff --git a/src/test/scala/play/api/cache/redis/RedisCacheModuleSpec.scala b/src/test/scala/play/api/cache/redis/RedisCacheModuleSpec.scala index dc4fbd1a..f600d241 100644 --- a/src/test/scala/play/api/cache/redis/RedisCacheModuleSpec.scala +++ b/src/test/scala/play/api/cache/redis/RedisCacheModuleSpec.scala @@ -1,6 +1,5 @@ package play.api.cache.redis -import org.apache.pekko.actor.ActorSystem import play.api.cache.redis.configuration.{RedisHost, RedisSettings, RedisStandalone, RedisTimeouts} import play.api.cache.redis.test._ import play.api.inject._ @@ -47,7 +46,7 @@ class RedisCacheModuleSpec extends IntegrationSpec with RedisStandaloneContainer } test("bind named cache in simple mode") { - _.bindings(new RedisCacheModule) + _.bindings(new RedisCacheModule).configure("play.cache.redis.port" -> container.mappedPort(defaultPort)) } { injector => injector.checkNamedBinding[RedisConnector] injector.checkNamedBinding[CacheApi] @@ -165,11 +164,9 @@ class RedisCacheModuleSpec extends IntegrationSpec with RedisStandaloneContainer private def test(name: String)(createBuilder: GuiceApplicationBuilder => GuiceApplicationBuilder)(f: Injector => Assertion): Unit = s"should $name" in { - val builder = createBuilder(new GuiceApplicationBuilder) + val builder = createBuilder(new GuiceApplicationBuilder()) val injector = builder.injector() - val application = StoppableApplication(injector.instanceOf[ActorSystem]) - - application.runInApplication { + TestApplication.run(injector) { f(injector) } } diff --git a/src/test/scala/play/api/cache/redis/configuration/HostnameResolverSpec.scala b/src/test/scala/play/api/cache/redis/configuration/HostnameResolverSpec.scala index 1a1b764e..c70ddc9e 100644 --- a/src/test/scala/play/api/cache/redis/configuration/HostnameResolverSpec.scala +++ b/src/test/scala/play/api/cache/redis/configuration/HostnameResolverSpec.scala @@ -1,6 +1,6 @@ package play.api.cache.redis.configuration -import play.api.cache.redis.test.UnitSpec +import play.api.cache.redis.test._ class HostnameResolverSpec extends UnitSpec { import HostnameResolver._ diff --git a/src/test/scala/play/api/cache/redis/configuration/RedisInstanceProviderSpec.scala b/src/test/scala/play/api/cache/redis/configuration/RedisInstanceProviderSpec.scala index f02dafb4..d41a129c 100644 --- a/src/test/scala/play/api/cache/redis/configuration/RedisInstanceProviderSpec.scala +++ b/src/test/scala/play/api/cache/redis/configuration/RedisInstanceProviderSpec.scala @@ -1,6 +1,6 @@ package play.api.cache.redis.configuration -import play.api.cache.redis.test.UnitSpec +import play.api.cache.redis.test._ class RedisInstanceProviderSpec extends UnitSpec { diff --git a/src/test/scala/play/api/cache/redis/configuration/RedisTimeoutsSpec.scala b/src/test/scala/play/api/cache/redis/configuration/RedisTimeoutsSpec.scala index 8c3ced8d..1e329a82 100644 --- a/src/test/scala/play/api/cache/redis/configuration/RedisTimeoutsSpec.scala +++ b/src/test/scala/play/api/cache/redis/configuration/RedisTimeoutsSpec.scala @@ -1,6 +1,6 @@ package play.api.cache.redis.configuration -import play.api.cache.redis.test.{Helpers, ImplicitOptionMaterialization, UnitSpec} +import play.api.cache.redis.test._ import scala.concurrent.duration._ diff --git a/src/test/scala/play/api/cache/redis/connector/ExpectedFutureSpec.scala b/src/test/scala/play/api/cache/redis/connector/ExpectedFutureSpec.scala index e3869e6f..7b83e955 100644 --- a/src/test/scala/play/api/cache/redis/connector/ExpectedFutureSpec.scala +++ b/src/test/scala/play/api/cache/redis/connector/ExpectedFutureSpec.scala @@ -1,7 +1,7 @@ package play.api.cache.redis.connector import play.api.cache.redis._ -import play.api.cache.redis.test.{AsyncUnitSpec, SimulatedException} +import play.api.cache.redis.test._ import scala.concurrent.{ExecutionContext, Future} diff --git a/src/test/scala/play/api/cache/redis/connector/FailEagerlySpec.scala b/src/test/scala/play/api/cache/redis/connector/FailEagerlySpec.scala index fe68c2c5..d2f762aa 100644 --- a/src/test/scala/play/api/cache/redis/connector/FailEagerlySpec.scala +++ b/src/test/scala/play/api/cache/redis/connector/FailEagerlySpec.scala @@ -1,78 +1,78 @@ -package play.api.cache.redis.connector - -import org.apache.pekko.actor.{ActorSystem, Scheduler} -import play.api.cache.redis.test._ - -import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future, Promise} - -class FailEagerlySpec extends AsyncUnitSpec with ImplicitFutureMaterialization { - import FailEagerlySpec._ - - test("not fail regular requests when disconnected") { failEagerly => - val cmd = mock[RedisCommandTest[String]] - (() => cmd.returning).expects().returns("response") - // run the test - failEagerly.isConnected mustEqual false - failEagerly.send(cmd).assertingEqual("response") - } - - test("do fail long running requests when disconnected") { failEagerly => - val cmd = mock[RedisCommandTest[String]] - (() => cmd.returning).expects().returns(Promise[String]().future) - // run the test - failEagerly.isConnected mustEqual false - failEagerly.send(cmd).assertingFailure[redis.actors.NoConnectionException.type] - } - - test("not fail long running requests when connected ") { failEagerly => - val cmd = mock[RedisCommandTest[String]] - (() => cmd.returning).expects().returns(Promise[String]().future) - failEagerly.markConnected() - // run the test - failEagerly.isConnected mustEqual true - failEagerly.send(cmd).assertTimeout(200.millis) - } - - def test(name: String)(f: FailEagerlyImpl => Future[Assertion]): Unit = - name in { - val system = ActorSystem("test", classLoader = Some(getClass.getClassLoader)) - val application = StoppableApplication(system) - application.runAsyncInApplication { - val impl = new FailEagerlyImpl()(system) - f(impl) - } - } - -} - -object FailEagerlySpec { - - import redis.RedisCommand - import redis.protocol.RedisReply - - trait RedisCommandTest[T] extends RedisCommand[RedisReply, T] { - def returning: Future[T] - } - - class FailEagerlyBase(implicit system: ActorSystem) extends RequestTimeout { - implicit protected val scheduler: Scheduler = system.scheduler - implicit val executionContext: ExecutionContext = system.dispatcher - - def send[T](redisCommand: RedisCommand[? <: RedisReply, T]): Future[T] = - redisCommand.asInstanceOf[RedisCommandTest[T]].returning - - } - - final class FailEagerlyImpl(implicit system: ActorSystem) extends FailEagerlyBase with FailEagerly { - - def connectionTimeout: Option[FiniteDuration] = Some(100.millis) - - def isConnected: Boolean = connected - - def markConnected(): Unit = connected = true - - def markDisconnected(): Unit = connected = false - } - -} +//package play.api.cache.redis.connector +// +//import org.apache.pekko.actor.{ActorSystem, Scheduler} +//import play.api.cache.redis.test._ +// +//import scala.concurrent.duration._ +//import scala.concurrent.{ExecutionContext, Future, Promise} +// +//class FailEagerlySpec extends AsyncUnitSpec with ImplicitFutureMaterialization { +// import FailEagerlySpec._ +// +// test("not fail regular requests when disconnected") { failEagerly => +// val cmd = mock[RedisCommandTest[String]] +// (() => cmd.returning).expects().returns("response") +// // run the test +// failEagerly.isConnected mustEqual false +// failEagerly.send(cmd).assertingEqual("response") +// } +// +// test("do fail long running requests when disconnected") { failEagerly => +// val cmd = mock[RedisCommandTest[String]] +// (() => cmd.returning).expects().returns(Promise[String]().future) +// // run the test +// failEagerly.isConnected mustEqual false +// failEagerly.send(cmd).assertingFailure[redis.actors.NoConnectionException.type] +// } +// +// test("not fail long running requests when connected ") { failEagerly => +// val cmd = mock[RedisCommandTest[String]] +// (() => cmd.returning).expects().returns(Promise[String]().future) +// failEagerly.markConnected() +// // run the test +// failEagerly.isConnected mustEqual true +// failEagerly.send(cmd).assertTimeout(200.millis) +// } +// +// def test(name: String)(f: FailEagerlyImpl => Future[Assertion]): Unit = +// name in { +// val system = ActorSystem("test", classLoader = Some(getClass.getClassLoader)) +// val application = StoppableApplication(system) +// application.runAsyncInApplication { +// val impl = new FailEagerlyImpl()(system) +// f(impl) +// } +// } +// +//} +// +//object FailEagerlySpec { +// +// import redis.RedisCommand +// import redis.protocol.RedisReply +// +// trait RedisCommandTest[T] extends RedisCommand[RedisReply, T] { +// def returning: Future[T] +// } +// +// class FailEagerlyBase(implicit system: ActorSystem) extends RequestTimeout { +// implicit protected val scheduler: Scheduler = system.scheduler +// implicit val executionContext: ExecutionContext = system.dispatcher +// +// def send[T](redisCommand: RedisCommand[? <: RedisReply, T]): Future[T] = +// redisCommand.asInstanceOf[RedisCommandTest[T]].returning +// +// } +// +// final class FailEagerlyImpl(implicit system: ActorSystem) extends FailEagerlyBase with FailEagerly { +// +// def connectionTimeout: Option[FiniteDuration] = Some(100.millis) +// +// def isConnected: Boolean = connected +// +// def markConnected(): Unit = connected = true +// +// def markDisconnected(): Unit = connected = false +// } +// +//} diff --git a/src/test/scala/play/api/cache/redis/connector/RedisClusterSpec.scala b/src/test/scala/play/api/cache/redis/connector/RedisClusterSpec.scala index 0dcf1212..81ec27f2 100644 --- a/src/test/scala/play/api/cache/redis/connector/RedisClusterSpec.scala +++ b/src/test/scala/play/api/cache/redis/connector/RedisClusterSpec.scala @@ -5,11 +5,12 @@ import play.api.cache.redis._ import play.api.cache.redis.configuration._ import play.api.cache.redis.impl._ import play.api.cache.redis.test._ +import play.api.inject.ApplicationLifecycle import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} -class RedisClusterSpec extends IntegrationSpec with RedisClusterContainer { +class RedisClusterSpec extends IntegrationSpec with RedisClusterContainer with DefaultInjector { override protected def testTimeout: FiniteDuration = 60.seconds @@ -75,12 +76,13 @@ class RedisClusterSpec extends IntegrationSpec with RedisClusterContainer { ) def runTest: Future[Assertion] = { - implicit val system: ActorSystem = ActorSystem("test", classLoader = Some(getClass.getClassLoader)) + val injector = newInjector.build() + implicit val system: ActorSystem = injector.instanceOf[ActorSystem] + implicit val lifecycle: ApplicationLifecycle = injector.instanceOf[ApplicationLifecycle] implicit val runtime: RedisRuntime = RedisRuntime("cluster", syncTimeout = 5.seconds, ExecutionContext.global, new LogAndFailPolicy, LazyInvocation) - implicit val application: StoppableApplication = StoppableApplication(system) val serializer = new PekkoSerializerImpl(system) - application.runAsyncInApplication { + TestApplication.runAsync(injector) { for { connector <- Future(new RedisConnectorProvider(clusterInstance, serializer).get) // initialize the connector by flushing the database diff --git a/src/test/scala/play/api/cache/redis/connector/RedisCommandsMock.scala b/src/test/scala/play/api/cache/redis/connector/RedisCommandsMock.scala index d83e2011..3990eb4c 100644 --- a/src/test/scala/play/api/cache/redis/connector/RedisCommandsMock.scala +++ b/src/test/scala/play/api/cache/redis/connector/RedisCommandsMock.scala @@ -1,28 +1,3 @@ package play.api.cache.redis.connector -import org.scalamock.scalatest.AsyncMockFactory -import redis.{ByteStringSerializer, RedisCommands} - -import scala.concurrent.Future - -private trait RedisCommandsMock extends RedisCommands { - - final override def zadd[V: ByteStringSerializer](key: String, scoreMembers: (Double, V)*): Future[Long] = - zaddMock(key, scoreMembers) - - def zaddMock[V: ByteStringSerializer](key: String, scoreMembers: Seq[(Double, V)]): Future[Long] - - final override def zrem[V: ByteStringSerializer](key: String, members: V*): Future[Long] = - zremMock(key, members) - - def zremMock[V: ByteStringSerializer](key: String, members: Seq[V]): Future[Long] -} - -private object RedisCommandsMock { - - def mock(factory: AsyncMockFactory): (RedisCommands, RedisCommandsMock) = { - val mock = factory.mock[RedisCommandsMock](factory) - (mock, mock) - } - -} +abstract class RedisCommandsMock extends AbstractRedisCommandsMock diff --git a/src/test/scala/play/api/cache/redis/connector/RedisConnectorFailureSpec.scala b/src/test/scala/play/api/cache/redis/connector/RedisConnectorFailureSpec.scala index d752db92..7840599b 100644 --- a/src/test/scala/play/api/cache/redis/connector/RedisConnectorFailureSpec.scala +++ b/src/test/scala/play/api/cache/redis/connector/RedisConnectorFailureSpec.scala @@ -1,12 +1,17 @@ package play.api.cache.redis.connector +import io.lettuce.core.codec.StringCodec +import io.lettuce.core.protocol.CommandArgs +import io.lettuce.core.{RedisFuture, ScoredValue, SetArgs} +import org.scalamock.handlers.CallHandler +import org.scalamock.matchers.MatcherBase import play.api.cache.redis._ import play.api.cache.redis.test._ -import redis._ -import redis.api.{BEFORE, ListPivot} +import java.util.concurrent.CompletableFuture import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} +import scala.jdk.CollectionConverters.MapHasAsJava import scala.reflect.ClassTag import scala.util.{Failure, Success} @@ -14,7 +19,7 @@ class RedisConnectorFailureSpec extends AsyncUnitSpec with ImplicitFutureMateria private val score = 1d private val encodedValue = "encoded" - private val disconnected = Future.failed(SimulatedException) + private val disconnected = SimulatedException "Serializer fail" when { @@ -28,7 +33,7 @@ class RedisConnectorFailureSpec extends AsyncUnitSpec with ImplicitFutureMateria test("decoder fails") { (serializer, commands, connector) => for { _ <- serializer.failOnDecode(cacheValue) - _ = (commands.get[String](_: String)(_: ByteStringDeserializer[String])).expects(cacheKey, *).returns(Some(cacheValue)) + _ = (commands.get(_: String)).expects(cacheKey).returns(RedisFutureInTest(cacheValue)) _ <- connector.get[String](cacheKey).assertingFailure[SerializationException] } yield Passed } @@ -39,17 +44,18 @@ class RedisConnectorFailureSpec extends AsyncUnitSpec with ImplicitFutureMateria test("SET returning false") { (serializer, commands, connector) => for { _ <- serializer.encode(cacheValue, encodedValue) - _ = (commands - .set[String](_: String, _: String, _: Option[Long], _: Option[Long], _: Boolean, _: Boolean)(_: ByteStringSerializer[String])) - .expects(cacheKey, encodedValue, None, None, false, false, *) - .returns(false) + _ = (commands.setWithArgs(_: String, _: String, _: SetArgs)) + .expects(cacheKey, encodedValue, *) + .returnsFuture("Failure") _ <- connector.set(cacheKey, cacheValue).assertingEqual(false) } yield Passed } test("EXPIRE returning false") { (_, commands, connector) => for { - _ <- (commands.expire _).expects(cacheKey, 1.minute.toSeconds).returns(false) + _ <- (commands.expireSeconds(_: String, _: Long)) + .expects(cacheKey, 1.minute.toSeconds) + .returnsFuture(false) _ <- connector.expire(cacheKey, 1.minute).assertingSuccess } yield Passed } @@ -61,22 +67,19 @@ class RedisConnectorFailureSpec extends AsyncUnitSpec with ImplicitFutureMateria test("failed SET") { (serializer, commands, connector) => for { _ <- serializer.encode(cacheValue, encodedValue) - _ = (commands - .set[String](_: String, _: String, _: Option[Long], _: Option[Long], _: Boolean, _: Boolean)(_: ByteStringSerializer[String])) - .expects(cacheKey, encodedValue, None, None, false, false, *) - .returns(disconnected) + _ = (commands.setWithArgs(_: String, _: String, _: SetArgs)) + .expects(cacheKey, encodedValue, Matcher.setArgs(new SetArgs())) + .fails(disconnected) _ <- serializer.encode(cacheValue, encodedValue) - _ = (commands - .set[String](_: String, _: String, _: Option[Long], _: Option[Long], _: Boolean, _: Boolean)(_: ByteStringSerializer[String])) - .expects(cacheKey, encodedValue, None, Some(1.minute.toMillis), false, false, *) - .returns(disconnected) + _ = (commands.setWithArgs(_: String, _: String, _: SetArgs)) + .expects(cacheKey, encodedValue, Matcher.setArgs(new SetArgs().px(1.minute.toMillis))) + .fails(disconnected) _ <- serializer.encode(cacheValue, encodedValue) - _ = (commands - .set[String](_: String, _: String, _: Option[Long], _: Option[Long], _: Boolean, _: Boolean)(_: ByteStringSerializer[String])) - .expects(cacheKey, encodedValue, None, None, true, false, *) - .returns(disconnected) + _ = (commands.setWithArgs(_: String, _: String, _: SetArgs)) + .expects(cacheKey, encodedValue, Matcher.setArgs(new SetArgs().nx())) + .fails(disconnected) _ <- connector.set(cacheKey, cacheValue).assertingFailure[ExecutionFailedException, SimulatedException] _ <- connector.set(cacheKey, cacheValue, 1.minute).assertingFailure[ExecutionFailedException, SimulatedException] @@ -87,10 +90,7 @@ class RedisConnectorFailureSpec extends AsyncUnitSpec with ImplicitFutureMateria test("failed MSET") { (serializer, commands, connector) => for { _ <- serializer.encode(cacheValue, encodedValue) - _ = (commands - .mset[String](_: Map[String, String])(_: ByteStringSerializer[String])) - .expects(Map(cacheKey -> encodedValue), *) - .returns(disconnected) + _ = (commands.mset(_: java.util.Map[String, String])).expects(Map(cacheKey -> encodedValue).asJava).fails(disconnected) _ <- connector.mSet(cacheKey -> cacheValue).assertingFailure[ExecutionFailedException, SimulatedException] } yield Passed } @@ -98,34 +98,28 @@ class RedisConnectorFailureSpec extends AsyncUnitSpec with ImplicitFutureMateria test("failed MSETNX") { (serializer, commands, connector) => for { _ <- serializer.encode(cacheValue, encodedValue) - _ = (commands - .msetnx[String](_: Map[String, String])(_: ByteStringSerializer[String])) - .expects(Map(cacheKey -> encodedValue), *) - .returns(disconnected) + _ = (commands.msetnx(_: java.util.Map[String, String])).expects(Map(cacheKey -> encodedValue).asJava).fails(disconnected) _ <- connector.mSetIfNotExist(cacheKey -> cacheValue).assertingFailure[ExecutionFailedException, SimulatedException] } yield Passed } test("failed EXPIRE") { (_, commands, connector) => for { - _ <- (commands.expire(_: String, _: Long)).expects(cacheKey, 1.minute.toSeconds).returns(disconnected) + _ <- (commands.expireSeconds(_: String, _: Long)).expects(cacheKey, 1.minute.toSeconds).fails(disconnected) _ <- connector.expire(cacheKey, 1.minute).assertingFailure[ExecutionFailedException, SimulatedException] } yield Passed } test("failed INCRBY") { (_, commands, connector) => for { - _ <- (commands.incrby(_: String, _: Long)).expects(cacheKey, 1L).returns(disconnected) + _ <- (commands.incrby(_: String, _: Long)).expects(cacheKey, 1L).fails(disconnected) _ <- connector.increment(cacheKey, 1L).assertingFailure[ExecutionFailedException, SimulatedException] } yield Passed } test("failed LRANGE") { (_, commands, connector) => for { - _ <- (commands - .lrange[String](_: String, _: Long, _: Long)(_: ByteStringDeserializer[String])) - .expects(cacheKey, 0, -1, *) - .returns(disconnected) + _ <- (commands.lrangeLong(_: String, _: Long, _: Long)).expects(cacheKey, 0, -1).fails(disconnected) _ <- connector.listSlice[String](cacheKey, 0, -1).assertingFailure[ExecutionFailedException, SimulatedException] } yield Passed } @@ -133,17 +127,14 @@ class RedisConnectorFailureSpec extends AsyncUnitSpec with ImplicitFutureMateria test("failed LREM") { (serializer, commands, connector) => for { _ <- serializer.encode(cacheValue, encodedValue) - _ = (commands - .lrem(_: String, _: Long, _: String)(_: ByteStringSerializer[String])) - .expects(cacheKey, 2L, encodedValue, *) - .returns(disconnected) + _ = (commands.lrem(_: String, _: Long, _: String)).expects(cacheKey, 2L, encodedValue).fails(disconnected) _ <- connector.listRemove(cacheKey, cacheValue, 2).assertingFailure[ExecutionFailedException, SimulatedException] } yield Passed } test("failed LTRIM") { (_, commands, connector) => for { - _ <- (commands.ltrim(_: String, _: Long, _: Long)).expects(cacheKey, 1L, 5L).returns(disconnected) + _ <- (commands.ltrim(_: String, _: Long, _: Long)).expects(cacheKey, 1L, 5L).fails(disconnected) _ <- connector.listTrim(cacheKey, 1, 5).assertingFailure[ExecutionFailedException, SimulatedException] } yield Passed } @@ -152,10 +143,9 @@ class RedisConnectorFailureSpec extends AsyncUnitSpec with ImplicitFutureMateria for { _ <- serializer.encode("pivot", "encodedPivot") _ <- serializer.encode(cacheValue, encodedValue) - _ = (commands - .linsert[String](_: String, _: ListPivot, _: String, _: String)(_: ByteStringSerializer[String])) - .expects(cacheKey, BEFORE, "encodedPivot", encodedValue, *) - .returns(disconnected) + _ = (commands.linsert(_: String, _: Boolean, _: String, _: String)) + .expects(cacheKey, true, "encodedPivot", encodedValue) + .fails(disconnected) // run the test _ <- connector.listInsert(cacheKey, "pivot", cacheValue).assertingFailure[ExecutionFailedException, SimulatedException] } yield Passed @@ -163,7 +153,7 @@ class RedisConnectorFailureSpec extends AsyncUnitSpec with ImplicitFutureMateria test("failed HINCRBY") { (_, commands, connector) => for { - _ <- (commands.hincrby(_: String, _: String, _: Long)).expects(cacheKey, "field", 1L).returns(disconnected) + _ <- (commands.hincrby(_: String, _: String, _: Long)).expects(cacheKey, "field", 1L).fails(disconnected) // run the test _ <- connector.hashIncrement(cacheKey, "field", 1).assertingFailure[ExecutionFailedException, SimulatedException] } yield Passed @@ -173,9 +163,9 @@ class RedisConnectorFailureSpec extends AsyncUnitSpec with ImplicitFutureMateria for { _ <- serializer.encode(cacheValue, encodedValue) _ = (commands - .hset[String](_: String, _: String, _: String)(_: ByteStringSerializer[String])) - .expects(cacheKey, "field", encodedValue, *) - .returns(disconnected) + .hsetSimple(_: String, _: String, _: String)) + .expects(cacheKey, "field", encodedValue) + .fails(disconnected) _ <- connector.hashSet(cacheKey, "field", cacheValue).assertingFailure[ExecutionFailedException, SimulatedException] } yield Passed } @@ -183,17 +173,16 @@ class RedisConnectorFailureSpec extends AsyncUnitSpec with ImplicitFutureMateria test("failed ZADD") { (serializer, commands, connector) => for { _ <- serializer.encode(cacheValue, encodedValue) - _ = (commands - .zaddMock[String](_: String, _: Seq[(Double, String)])(_: ByteStringSerializer[String])) - .expects(cacheKey, Seq((score, encodedValue)), *) - .returns(disconnected) + _ = (commands.zaddMock(_: String, _: Array[ScoredValue[String]])) + .expects(cacheKey, *) + .fails(disconnected) _ <- connector.sortedSetAdd(cacheKey, (score, cacheValue)).assertingFailure[ExecutionFailedException, SimulatedException] } yield Passed } test("failed ZCARD") { (_, commands, connector) => for { - _ <- (commands.zcard(_: String)).expects(cacheKey).returns(disconnected) + _ <- (commands.zcard(_: String)).expects(cacheKey).fails(disconnected) _ <- connector.sortedSetSize(cacheKey).assertingFailure[ExecutionFailedException, SimulatedException] } yield Passed } @@ -202,9 +191,9 @@ class RedisConnectorFailureSpec extends AsyncUnitSpec with ImplicitFutureMateria for { _ <- serializer.encode(cacheValue, encodedValue) _ = (commands - .zscore[String](_: String, _: String)(_: ByteStringSerializer[String])) - .expects(cacheKey, encodedValue, *) - .returns(disconnected) + .zscore(_: String, _: String)) + .expects(cacheKey, encodedValue) + .fails(disconnected) _ <- connector.sortedSetScore(cacheKey, cacheValue).assertingFailure[ExecutionFailedException, SimulatedException] } yield Passed } @@ -212,30 +201,27 @@ class RedisConnectorFailureSpec extends AsyncUnitSpec with ImplicitFutureMateria test("failed ZREM") { (serializer, commands, connector) => for { _ <- serializer.encode(cacheValue, encodedValue) - _ = (commands - .zremMock(_: String, _: Seq[String])(_: ByteStringSerializer[String])) - .expects(cacheKey, Seq(encodedValue), *) - .returns(disconnected) + _ = (commands.zremMock(_: String, _: Array[String])) + .expects(cacheKey, Matcher.array(encodedValue)) + .fails(disconnected) _ <- connector.sortedSetRemove(cacheKey, cacheValue).assertingFailure[ExecutionFailedException, SimulatedException] } yield Passed } test("failed ZRANGE") { (_, commands, connector) => for { - _ <- (commands - .zrange[String](_: String, _: Long, _: Long)(_: ByteStringDeserializer[String])) - .expects(cacheKey, 1, 5, *) - .returns(disconnected) + _ <- (commands.zrangeMock(_: String, _: Long, _: Long)) + .expects(cacheKey, 1, 5) + .fails(disconnected) _ <- connector.sortedSetRange[String](cacheKey, 1, 5).assertingFailure[ExecutionFailedException, SimulatedException] } yield Passed } test("failed ZREVRANGE") { (_, commands, connector) => for { - _ <- (commands - .zrevrange[String](_: String, _: Long, _: Long)(_: ByteStringDeserializer[String])) - .expects(cacheKey, 1, 5, *) - .returns(disconnected) + _ <- (commands.zrevrangeMock(_: String, _: Long, _: Long)) + .expects(cacheKey, 1, 5) + .fails(disconnected) _ <- connector.sortedSetReverseRange[String](cacheKey, 1, 5).assertingFailure[ExecutionFailedException, SimulatedException] } yield Passed } @@ -245,8 +231,8 @@ class RedisConnectorFailureSpec extends AsyncUnitSpec with ImplicitFutureMateria name in { implicit val runtime: RedisRuntime = mock[RedisRuntime] val serializer: PekkoSerializer = mock[PekkoSerializer] - val (commands: RedisCommands, mockedCommands: RedisCommandsMock) = RedisCommandsMock.mock(this) - val connector: RedisConnector = new RedisConnectorImpl(serializer, commands) + val mockedCommands: RedisCommandsMock = mock[RedisCommandsMock] + val connector: RedisConnector = new RedisConnectorImpl(serializer, mockedCommands) (() => runtime.context).expects().returns(ExecutionContext.global).anyNumberOfTimes() @@ -272,4 +258,68 @@ class RedisConnectorFailureSpec extends AsyncUnitSpec with ImplicitFutureMateria } + private class RedisFutureInTest[T] extends CompletableFuture[T] with RedisFuture[T] { + + override def getError: String = null + + override def await(timeout: Long, unit: TimeUnit): Boolean = true + } + + private object RedisFutureInTest { + + def apply[T](value: T): RedisFuture[T] = { + val future = new RedisFutureInTest[T] + future.complete(value) + future + } + + def failed[T](failure: Throwable): RedisFuture[T] = { + val future = new RedisFutureInTest[T] + future.completeExceptionally(failure) + future + } + + } + + implicit private class RichCallHandler[R](private val thiz: CallHandler[RedisFuture[R]]) { + def returnsFuture(value: R): CallHandler[RedisFuture[R]] = thiz.returns(RedisFutureInTest(value)) + def fails(failure: Throwable): CallHandler[RedisFuture[R]] = thiz.returns(RedisFutureInTest.failed(failure)) + } + + override def convertToEqualizer[T](left: T): Equalizer[T] = super.convertToEqualizer(left) + + private object Matcher { + + def setArgs(expected: SetArgs): MatcherBase = + new MatcherBase { + override def canEqual(that: Any): Boolean = true + + override def equals(obj: Any): Boolean = + obj match { + case that: SetArgs => comparable(expected) === comparable(that) + case _ => false + } + + private def comparable(setArgs: SetArgs): String = { + val args = new CommandArgs[String, String](new StringCodec()) + setArgs.build(args) + args.toCommandString + } + + } + + def array[T](expected: T*): MatcherBase = + new MatcherBase { + override def canEqual(that: Any): Boolean = true + + @SuppressWarnings(Array("org.wartremover.warts.Equals")) + override def equals(obj: Any): Boolean = obj match { + case that: Array[?] => that.length == expected.length && expected.zip(that).forall { case (a, b) => a == b } + case _ => false + } + + } + + } + } diff --git a/src/test/scala/play/api/cache/redis/connector/RedisMasterSlavesSpec.scala b/src/test/scala/play/api/cache/redis/connector/RedisMasterSlavesSpec.scala index ba920781..76b2a29f 100644 --- a/src/test/scala/play/api/cache/redis/connector/RedisMasterSlavesSpec.scala +++ b/src/test/scala/play/api/cache/redis/connector/RedisMasterSlavesSpec.scala @@ -3,13 +3,14 @@ package play.api.cache.redis.connector import org.apache.pekko.actor.ActorSystem import play.api.cache.redis.configuration.{RedisHost, RedisMasterSlaves, RedisSettings} import play.api.cache.redis.impl.{LazyInvocation, RedisRuntime} -import play.api.cache.redis.test.{Helpers, IntegrationSpec, RedisMasterSlaveContainer, StoppableApplication} +import play.api.cache.redis.test._ import play.api.cache.redis.{LogAndFailPolicy, RedisConnector} +import play.api.inject.{ApplicationLifecycle, Injector} import scala.concurrent.duration.{DurationInt, FiniteDuration} import scala.concurrent.{ExecutionContext, Future} -class RedisMasterSlavesSpec extends IntegrationSpec with RedisMasterSlaveContainer { +class RedisMasterSlavesSpec extends IntegrationSpec with RedisMasterSlaveContainer with DefaultInjector { override protected def testTimeout: FiniteDuration = 60.seconds @@ -63,9 +64,10 @@ class RedisMasterSlavesSpec extends IntegrationSpec with RedisMasterSlaveContain def test(name: String)(f: RedisConnector => Future[Assertion]): Unit = name in { - implicit val system: ActorSystem = ActorSystem("test", classLoader = Some(getClass.getClassLoader)) + val injector: Injector = newInjector.build() + implicit val system: ActorSystem = injector.instanceOf[ActorSystem] + implicit val lifecycle: ApplicationLifecycle = injector.instanceOf[ApplicationLifecycle] implicit val runtime: RedisRuntime = RedisRuntime("master-slaves", syncTimeout = 5.seconds, ExecutionContext.global, new LogAndFailPolicy, LazyInvocation) - implicit val application: StoppableApplication = StoppableApplication(system) val serializer = new PekkoSerializerImpl(system) lazy val masterSlavesInstance = RedisMasterSlaves( @@ -78,7 +80,7 @@ class RedisMasterSlavesSpec extends IntegrationSpec with RedisMasterSlaveContain ), ) - application.runAsyncInApplication { + TestApplication.runAsync(injector) { val connector: RedisConnector = new RedisConnectorProvider(masterSlavesInstance, serializer).get for { // initialize the connector by flushing the database diff --git a/src/test/scala/play/api/cache/redis/connector/RedisRequestTimeoutSpec.scala b/src/test/scala/play/api/cache/redis/connector/RedisRequestTimeoutSpec.scala index 3e94fac7..83fa301c 100644 --- a/src/test/scala/play/api/cache/redis/connector/RedisRequestTimeoutSpec.scala +++ b/src/test/scala/play/api/cache/redis/connector/RedisRequestTimeoutSpec.scala @@ -1,48 +1,48 @@ -package play.api.cache.redis.connector - -import org.apache.pekko.actor.{ActorSystem, Scheduler} -import play.api.cache.redis.test.{AsyncUnitSpec, StoppableApplication} -import redis.RedisCommand -import redis.protocol.RedisReply - -import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContextExecutor, Future, Promise} - -class RedisRequestTimeoutSpec extends AsyncUnitSpec { - - override protected def testTimeout: FiniteDuration = 3.seconds - - "fail long running requests when connected but timeout defined" in { - implicit val system: ActorSystem = ActorSystem("test", classLoader = Some(getClass.getClassLoader)) - val application = StoppableApplication(system) - - application.runAsyncInApplication { - val redisCommandMock = mock[RedisCommandTest[String]] - (() => redisCommandMock.returning).expects().returns(Promise[String]().future) - val redisRequest = new RedisRequestTimeoutImpl(timeout = Some(1.second)) - // run the test - redisRequest.send[String](redisCommandMock).assertingFailure[redis.actors.NoConnectionException.type] - } - } - - private trait RedisCommandTest[T] extends RedisCommand[RedisReply, T] { - def returning: Future[T] - } - - private class RequestTimeoutBase(implicit system: ActorSystem) extends RequestTimeout { - implicit protected val scheduler: Scheduler = system.scheduler - implicit val executionContext: ExecutionContextExecutor = system.dispatcher - - def send[T](redisCommand: RedisCommand[? <: RedisReply, T]): Future[T] = - redisCommand.asInstanceOf[RedisCommandTest[T]].returning - - } - - private class RedisRequestTimeoutImpl( - override val timeout: Option[FiniteDuration], - )(implicit - system: ActorSystem, - ) extends RequestTimeoutBase - with RedisRequestTimeout - -} +//package play.api.cache.redis.connector +// +//import org.apache.pekko.actor.{ActorSystem, Scheduler} +//import play.api.cache.redis.test._ +//import redis.RedisCommand +//import redis.protocol.RedisReply +// +//import scala.concurrent.duration._ +//import scala.concurrent.{ExecutionContextExecutor, Future, Promise} +// +//class RedisRequestTimeoutSpec extends AsyncUnitSpec { +// +// override protected def testTimeout: FiniteDuration = 3.seconds +// +// "fail long running requests when connected but timeout defined" in { +// implicit val system: ActorSystem = ActorSystem("test", classLoader = Some(getClass.getClassLoader)) +// val application = StoppableApplication(system) +// +// application.runAsyncInApplication { +// val redisCommandMock = mock[RedisCommandTest[String]] +// (() => redisCommandMock.returning).expects().returns(Promise[String]().future) +// val redisRequest = new RedisRequestTimeoutImpl(timeout = Some(1.second)) +// // run the test +// redisRequest.send[String](redisCommandMock).assertingFailure[redis.actors.NoConnectionException.type] +// } +// } +// +// private trait RedisCommandTest[T] extends RedisCommand[RedisReply, T] { +// def returning: Future[T] +// } +// +// private class RequestTimeoutBase(implicit system: ActorSystem) extends RequestTimeout { +// implicit protected val scheduler: Scheduler = system.scheduler +// implicit val executionContext: ExecutionContextExecutor = system.dispatcher +// +// def send[T](redisCommand: RedisCommand[? <: RedisReply, T]): Future[T] = +// redisCommand.asInstanceOf[RedisCommandTest[T]].returning +// +// } +// +// private class RedisRequestTimeoutImpl( +// override val timeout: Option[FiniteDuration], +// )(implicit +// system: ActorSystem, +// ) extends RequestTimeoutBase +// with RedisRequestTimeout +// +//} diff --git a/src/test/scala/play/api/cache/redis/connector/RedisSentinelSpec.scala b/src/test/scala/play/api/cache/redis/connector/RedisSentinelSpec.scala index de89feea..6f221986 100644 --- a/src/test/scala/play/api/cache/redis/connector/RedisSentinelSpec.scala +++ b/src/test/scala/play/api/cache/redis/connector/RedisSentinelSpec.scala @@ -6,12 +6,13 @@ import play.api.cache.redis._ import play.api.cache.redis.configuration._ import play.api.cache.redis.impl._ import play.api.cache.redis.test._ +import play.api.inject.{ApplicationLifecycle, Injector} import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} @Ignore -class RedisSentinelSpec extends IntegrationSpec with RedisSentinelContainer { +class RedisSentinelSpec extends IntegrationSpec with RedisSentinelContainer with DefaultInjector { test("pong on ping") { connector => for { @@ -63,9 +64,10 @@ class RedisSentinelSpec extends IntegrationSpec with RedisSentinelContainer { def test(name: String)(f: RedisConnector => Future[Assertion]): Unit = name in { + val injector: Injector = newInjector.build() implicit val system: ActorSystem = ActorSystem("test", classLoader = Some(getClass.getClassLoader)) + implicit val lifecycle: ApplicationLifecycle = injector.instanceOf[ApplicationLifecycle] implicit val runtime: RedisRuntime = RedisRuntime("sentinel", syncTimeout = 5.seconds, ExecutionContext.global, new LogAndFailPolicy, LazyInvocation) - implicit val application: StoppableApplication = StoppableApplication(system) val serializer = new PekkoSerializerImpl(system) lazy val sentinelInstance = RedisSentinel( @@ -83,7 +85,7 @@ class RedisSentinelSpec extends IntegrationSpec with RedisSentinelContainer { ), ) - application.runAsyncInApplication { + TestApplication.runAsync(injector) { val connector: RedisConnector = new RedisConnectorProvider(sentinelInstance, serializer).get for { // initialize the connector by flushing the database diff --git a/src/test/scala/play/api/cache/redis/connector/RedisStandaloneSpec.scala b/src/test/scala/play/api/cache/redis/connector/RedisStandaloneSpec.scala index 690fb0cd..94dd4455 100644 --- a/src/test/scala/play/api/cache/redis/connector/RedisStandaloneSpec.scala +++ b/src/test/scala/play/api/cache/redis/connector/RedisStandaloneSpec.scala @@ -5,11 +5,12 @@ import play.api.cache.redis._ import play.api.cache.redis.configuration._ import play.api.cache.redis.impl._ import play.api.cache.redis.test._ +import play.api.inject.{ApplicationLifecycle, Injector} import scala.concurrent.duration._ import scala.concurrent.{ExecutionContext, Future} -class RedisStandaloneSpec extends IntegrationSpec with RedisStandaloneContainer { +class RedisStandaloneSpec extends IntegrationSpec with RedisStandaloneContainer with DefaultInjector { test("pong on ping") { (_, connector) => for { @@ -584,9 +585,10 @@ class RedisStandaloneSpec extends IntegrationSpec with RedisStandaloneContainer def test(name: String)(f: (String, RedisConnector) => Future[Assertion]): Unit = name in { - implicit val system: ActorSystem = ActorSystem("test", classLoader = Some(getClass.getClassLoader)) + val injector: Injector = newInjector.build() + implicit val system: ActorSystem = injector.instanceOf[ActorSystem] + implicit val lifecycle: ApplicationLifecycle = injector.instanceOf[ApplicationLifecycle] implicit val runtime: RedisRuntime = RedisRuntime("standalone", syncTimeout = 5.seconds, ExecutionContext.global, new LogAndFailPolicy, LazyInvocation) - implicit val application: StoppableApplication = StoppableApplication(system) val serializer = new PekkoSerializerImpl(system) lazy val instance = RedisStandalone( @@ -600,7 +602,7 @@ class RedisStandaloneSpec extends IntegrationSpec with RedisStandaloneContainer val cacheKey = name.toLowerCase().replace(" ", "-") - application.runAsyncInApplication { + TestApplication.runAsync(injector) { for { connector <- Future(new RedisConnectorProvider(instance, serializer).get) // initialize the connector by flushing the database diff --git a/src/test/scala/play/api/cache/redis/impl/InvocationPolicySpec.scala b/src/test/scala/play/api/cache/redis/impl/InvocationPolicySpec.scala index aecf706f..2916c619 100644 --- a/src/test/scala/play/api/cache/redis/impl/InvocationPolicySpec.scala +++ b/src/test/scala/play/api/cache/redis/impl/InvocationPolicySpec.scala @@ -1,7 +1,7 @@ package play.api.cache.redis.impl import play.api.cache.redis._ -import play.api.cache.redis.test.UnitSpec +import play.api.cache.redis.test._ import scala.concurrent._ import scala.util.Success diff --git a/src/test/scala/play/api/cache/redis/impl/MockedAsyncRedis.scala b/src/test/scala/play/api/cache/redis/impl/MockedAsyncRedis.scala index e09ccba9..026f43f4 100644 --- a/src/test/scala/play/api/cache/redis/impl/MockedAsyncRedis.scala +++ b/src/test/scala/play/api/cache/redis/impl/MockedAsyncRedis.scala @@ -2,7 +2,7 @@ package play.api.cache.redis.impl import org.scalamock.scalatest.AsyncMockFactoryBase import play.api.cache.redis._ -import play.api.cache.redis.test.ImplicitOptionMaterialization +import play.api.cache.redis.test._ import scala.concurrent.Future import scala.concurrent.duration.Duration diff --git a/src/test/scala/play/api/cache/redis/impl/RedisConnectorMock.scala b/src/test/scala/play/api/cache/redis/impl/RedisConnectorMock.scala index c0dd4858..a52cad44 100644 --- a/src/test/scala/play/api/cache/redis/impl/RedisConnectorMock.scala +++ b/src/test/scala/play/api/cache/redis/impl/RedisConnectorMock.scala @@ -2,7 +2,7 @@ package play.api.cache.redis.impl import org.scalamock.scalatest.AsyncMockFactoryBase import play.api.cache.redis._ -import play.api.cache.redis.test.ImplicitOptionMaterialization +import play.api.cache.redis.test._ import scala.concurrent.Future import scala.concurrent.duration.Duration diff --git a/src/test/scala/play/api/cache/redis/impl/RedisRuntimeSpec.scala b/src/test/scala/play/api/cache/redis/impl/RedisRuntimeSpec.scala index abcfc9ff..f93a806a 100644 --- a/src/test/scala/play/api/cache/redis/impl/RedisRuntimeSpec.scala +++ b/src/test/scala/play/api/cache/redis/impl/RedisRuntimeSpec.scala @@ -4,7 +4,7 @@ import org.apache.pekko.actor.ActorSystem import org.apache.pekko.util.Timeout import play.api.cache.redis._ import play.api.cache.redis.configuration.{RedisHost, RedisStandalone} -import play.api.cache.redis.test.UnitSpec +import play.api.cache.redis.test._ class RedisRuntimeSpec extends UnitSpec { import RedisRuntime._ diff --git a/src/test/scala/play/api/cache/redis/test/DefaultInjector.scala b/src/test/scala/play/api/cache/redis/test/DefaultInjector.scala new file mode 100644 index 00000000..2db15e62 --- /dev/null +++ b/src/test/scala/play/api/cache/redis/test/DefaultInjector.scala @@ -0,0 +1,29 @@ +package play.api.cache.redis.test + +import com.typesafe.config.Config +import org.apache.pekko.actor.{ActorSystem, CoordinatedShutdown} +import play.api.inject._ +import play.api.inject.guice.GuiceInjectorBuilder +import play.api.libs.concurrent.{ActorSystemProvider, CoordinatedShutdownProvider} +import play.api.{Configuration, Environment} + +import javax.inject.Singleton + +trait DefaultInjector { + + def env: Environment = Environment.simple() + + def newInjector: GuiceInjectorBuilder = + new GuiceInjectorBuilder() + .bindings( + bind[Environment] to env, + bind[play.Environment].toProvider[EnvironmentProvider].in(classOf[Singleton]), + bind[ConfigurationProvider].to(new ConfigurationProvider(Configuration.load(env))), + bind[Configuration].toProvider[ConfigurationProvider], + bind[Config].toProvider[ConfigProvider], + bind[ApplicationLifecycle].to(bind[DefaultApplicationLifecycle]), + bind[ActorSystem].toProvider[ActorSystemProvider], + bind[CoordinatedShutdown].toProvider[CoordinatedShutdownProvider], + ) + +} diff --git a/src/test/scala/play/api/cache/redis/test/FakeApplication.scala b/src/test/scala/play/api/cache/redis/test/FakeApplication.scala index d00c6be9..d1bfd367 100644 --- a/src/test/scala/play/api/cache/redis/test/FakeApplication.scala +++ b/src/test/scala/play/api/cache/redis/test/FakeApplication.scala @@ -3,14 +3,14 @@ package play.api.cache.redis.test import org.apache.pekko.actor.ActorSystem import play.api.inject.Injector -trait FakeApplication extends StoppableApplication { +trait FakeApplication { import play.api.Application import play.api.inject.guice.GuiceApplicationBuilder private lazy val theBuilder: GuiceApplicationBuilder = builder - protected lazy val injector: Injector = theBuilder.injector() + lazy val injector: Injector = theBuilder.injector() protected lazy val application: Application = injector.instanceOf[Application] diff --git a/src/test/scala/play/api/cache/redis/test/RedisClusterContainer.scala b/src/test/scala/play/api/cache/redis/test/RedisClusterContainer.scala index 6b6276ff..bd988711 100644 --- a/src/test/scala/play/api/cache/redis/test/RedisClusterContainer.scala +++ b/src/test/scala/play/api/cache/redis/test/RedisClusterContainer.scala @@ -15,7 +15,7 @@ trait RedisClusterContainer extends RedisContainer { this: Suite => final protected def initialPort = 7000 - private val waitForStart = 6.seconds + private val waitForStart = 10.seconds override protected lazy val redisConfig: RedisContainerConfig = RedisContainerConfig( diff --git a/src/test/scala/play/api/cache/redis/test/StoppableApplication.scala b/src/test/scala/play/api/cache/redis/test/StoppableApplication.scala deleted file mode 100644 index 827ad810..00000000 --- a/src/test/scala/play/api/cache/redis/test/StoppableApplication.scala +++ /dev/null @@ -1,46 +0,0 @@ -package play.api.cache.redis.test - -import org.apache.pekko.Done -import org.apache.pekko.actor.{ActorSystem, CoordinatedShutdown} -import org.scalatest.Assertion -import play.api.inject.ApplicationLifecycle - -import scala.concurrent.{ExecutionContext, Future} -import scala.util.{Failure, Success} - -trait StoppableApplication extends ApplicationLifecycle { - - private var hooks: List[() => Future[?]] = Nil - - protected def system: ActorSystem - - def shutdownAsync(): Future[Done] = - CoordinatedShutdown(system).run(CoordinatedShutdown.UnknownReason) - - def runAsyncInApplication(block: => Future[Assertion])(implicit ec: ExecutionContext): Future[Assertion] = - block - .map(Success(_)) - .recover(Failure(_)) - .flatMap(result => Future.sequence(hooks.map(_.apply())).map(_ => result)) - .flatMap(result => shutdownAsync().map(_ => result)) - .flatMap(result => system.terminate().map(_ => result)) - .flatMap(Future.fromTry) - - final def runInApplication(block: => Assertion)(implicit ec: ExecutionContext): Future[Assertion] = - runAsyncInApplication(Future(block)) - - final override def addStopHook(hook: () => Future[?]): Unit = - hooks = hook :: hooks - - final override def stop(): Future[?] = Future.unit - -} - -object StoppableApplication { - - def apply(actorSystem: ActorSystem): StoppableApplication = - new StoppableApplication { - override protected def system: ActorSystem = actorSystem - } - -} diff --git a/src/test/scala/play/api/cache/redis/test/TestApplication.scala b/src/test/scala/play/api/cache/redis/test/TestApplication.scala new file mode 100644 index 00000000..db304cea --- /dev/null +++ b/src/test/scala/play/api/cache/redis/test/TestApplication.scala @@ -0,0 +1,23 @@ +package play.api.cache.redis.test + +import org.apache.pekko.actor.CoordinatedShutdown +import org.scalatest.Assertion +import play.api.inject.Injector + +import scala.concurrent.{ExecutionContext, Future} +import scala.util.{Failure, Success} + +object TestApplication { + + def run(injector: Injector)(block: => Assertion)(implicit ec: ExecutionContext): Future[Assertion] = + runAsync(injector)(Future(block)) + + def runAsync(injector: Injector)(block: => Future[Assertion])(implicit ec: ExecutionContext): Future[Assertion] = + block + .map(Success(_)) + .recover(Failure(_)) + .flatMap(result => injector.instanceOf[CoordinatedShutdown].run(ApplicationStop).map(_ => result)) + .flatMap(Future.fromTry) + + case object ApplicationStop extends CoordinatedShutdown.Reason +}