From b5e766ef90ddf0e4218740c128461a15244c8b4e Mon Sep 17 00:00:00 2001 From: Steven Wheeler Date: Thu, 7 Mar 2024 13:12:15 -0600 Subject: [PATCH] Add MySQL implementation --- .../jdbc/state/DurableStateQueries.scala | 1 + .../jdbc/state/SequenceNextValUpdater.scala | 16 ++++++++++++++++ .../MySQLDurableStateStorePluginSpec.scala | 8 ++++++++ ...QLScalaJdbcDurableStateChangesByTagTest.scala | 12 ++++++++++++ 4 files changed, 37 insertions(+) create mode 100644 integration/src/test/scala/akka/persistence/jdbc/integration/MySQLDurableStateStorePluginSpec.scala create mode 100644 integration/src/test/scala/akka/persistence/jdbc/integration/MySQLScalaJdbcDurableStateChangesByTagTest.scala diff --git a/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala index c4bb6a034..7bb7e2a3a 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/DurableStateQueries.scala @@ -39,6 +39,7 @@ import akka.persistence.jdbc.config.DurableStateTableConfiguration lazy val sequenceNextValUpdater = slickProfileToSchemaType(profile) match { case "H2" => new H2SequenceNextValUpdater(profile, durableStateTableCfg) case "Postgres" => new PostgresSequenceNextValUpdater(profile, durableStateTableCfg) + case "MySQL" => new MySQLSequenceNextValUpdater(profile, durableStateTableCfg) case _ => ??? } diff --git a/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala b/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala index 65cbb1bed..62aa3cd94 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/SequenceNextValUpdater.scala @@ -51,3 +51,19 @@ import slick.sql.SqlStreamingAction def getSequenceNextValueExpr() = sql"""#$nextValFetcher""".as[String] } + +/** + * INTERNAL API + */ +@InternalApi private[jdbc] class MySQLSequenceNextValUpdater( + profile: JdbcProfile, + val durableStateTableCfg: DurableStateTableConfiguration) + extends SequenceNextValUpdater { + import profile.api._ + private val schema = durableStateTableCfg.schemaName.map(n => s"'$n'").getOrElse("DATABASE()") + // Note: for actual MySQL servers (i.e. not MariaDB) the variable information_schema_stats_expiry should be set to zero. + final val nextValFetcher = + s"""(SELECT AUTO_INCREMENT FROM information_schema.tables WHERE table_name = '${durableStateTableCfg.tableName}' AND table_schema = ${schema})""" + + def getSequenceNextValueExpr() = sql"""#$nextValFetcher""".as[String] +} diff --git a/integration/src/test/scala/akka/persistence/jdbc/integration/MySQLDurableStateStorePluginSpec.scala b/integration/src/test/scala/akka/persistence/jdbc/integration/MySQLDurableStateStorePluginSpec.scala new file mode 100644 index 000000000..c3ef795f4 --- /dev/null +++ b/integration/src/test/scala/akka/persistence/jdbc/integration/MySQLDurableStateStorePluginSpec.scala @@ -0,0 +1,8 @@ +package akka.persistence.jdbc.integration + +import com.typesafe.config.ConfigFactory +import slick.jdbc.MySQLProfile +import akka.persistence.jdbc.state.scaladsl.DurableStateStorePluginSpec + +class MySQLDurableStateStorePluginSpec + extends DurableStateStorePluginSpec(ConfigFactory.load("mysql-shared-db-application.conf"), MySQLProfile) {} diff --git a/integration/src/test/scala/akka/persistence/jdbc/integration/MySQLScalaJdbcDurableStateChangesByTagTest.scala b/integration/src/test/scala/akka/persistence/jdbc/integration/MySQLScalaJdbcDurableStateChangesByTagTest.scala new file mode 100644 index 000000000..2d43d7413 --- /dev/null +++ b/integration/src/test/scala/akka/persistence/jdbc/integration/MySQLScalaJdbcDurableStateChangesByTagTest.scala @@ -0,0 +1,12 @@ +package akka.persistence.jdbc.integration + +import com.typesafe.config.ConfigFactory +import akka.actor.ActorSystem +import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateSpec +import akka.persistence.jdbc.testkit.internal.Mysql + +class MySQLScalaJdbcDurableStateStoreQueryTest + extends JdbcDurableStateSpec(ConfigFactory.load("mysql-shared-db-application.conf"), MySQL) { + implicit lazy val system: ActorSystem = + ActorSystem("JdbcDurableStateSpec", config.withFallback(customSerializers)) +}