From 8ebf989400006486e5db0af4eeafbfedcaf43cbe Mon Sep 17 00:00:00 2001 From: Sebastian Date: Thu, 19 Sep 2024 21:52:50 +0200 Subject: [PATCH 1/3] chore: bump to akka 2.10.0-M1, align with changes from upstream --- .../akka/persistence/jdbc/db/SlickExtension.scala | 2 +- .../jdbc/journal/dao/DefaultJournalDao.scala | 3 +-- .../jdbc/snapshot/dao/DefaultSnapshotDao.scala | 13 ++++++------- .../jdbc/state/javadsl/JdbcDurableStateStore.scala | 9 ++++----- .../jdbc/testkit/javadsl/SchemaUtils.scala | 8 ++++---- project/Dependencies.scala | 11 +++++------ 6 files changed, 21 insertions(+), 25 deletions(-) diff --git a/core/src/main/scala/akka/persistence/jdbc/db/SlickExtension.scala b/core/src/main/scala/akka/persistence/jdbc/db/SlickExtension.scala index b93f1482d..ccdd4a5ee 100644 --- a/core/src/main/scala/akka/persistence/jdbc/db/SlickExtension.scala +++ b/core/src/main/scala/akka/persistence/jdbc/db/SlickExtension.scala @@ -10,7 +10,7 @@ import akka.persistence.jdbc.config.{ ConfigKeys, SlickConfiguration } import akka.persistence.jdbc.util.ConfigOps._ import com.typesafe.config.{ Config, ConfigObject } -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ import scala.util.{ Failure, Success } object SlickExtension extends ExtensionId[SlickExtensionImpl] with ExtensionIdProvider { diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala index 16090901d..302e021dc 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala @@ -13,7 +13,6 @@ import scala.concurrent.Future import scala.util.Try import akka.NotUsed -import akka.dispatch.ExecutionContexts import akka.persistence.jdbc.AkkaSerialization import akka.persistence.jdbc.config.BaseDaoConfig import akka.persistence.jdbc.config.JournalConfig @@ -46,7 +45,7 @@ class DefaultJournalDao( override def baseDaoConfig: BaseDaoConfig = journalConfig.daoConfig override def writeJournalRows(xs: immutable.Seq[(JournalAkkaSerializationRow, Set[String])]): Future[Unit] = { - db.run(queries.writeJournalRows(xs).transactionally).map(_ => ())(ExecutionContexts.parasitic) + db.run(queries.writeJournalRows(xs).transactionally).map(_ => ())(ExecutionContext.parasitic) } val queries = diff --git a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/DefaultSnapshotDao.scala b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/DefaultSnapshotDao.scala index 8231d5c4d..fe52cac73 100644 --- a/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/DefaultSnapshotDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/snapshot/dao/DefaultSnapshotDao.scala @@ -11,7 +11,6 @@ import akka.persistence.jdbc.config.SnapshotConfig import akka.serialization.Serialization import akka.stream.Materializer import SnapshotTables._ -import akka.dispatch.ExecutionContexts import akka.persistence.jdbc.AkkaSerialization import scala.concurrent.{ ExecutionContext, Future } @@ -97,23 +96,23 @@ class DefaultSnapshotDao( override def save(snapshotMetadata: SnapshotMetadata, snapshot: Any): Future[Unit] = { val eventualSnapshotRow = Future.fromTry(serializeSnapshot(snapshotMetadata, snapshot)) - eventualSnapshotRow.map(queries.insertOrUpdate).flatMap(db.run).map(_ => ())(ExecutionContexts.parasitic) + eventualSnapshotRow.map(queries.insertOrUpdate).flatMap(db.run).map(_ => ())(ExecutionContext.parasitic) } override def delete(persistenceId: String, sequenceNr: Long): Future[Unit] = db.run(queries.selectByPersistenceIdAndSequenceNr((persistenceId, sequenceNr)).delete) - .map(_ => ())(ExecutionContexts.parasitic) + .map(_ => ())(ExecutionContext.parasitic) override def deleteAllSnapshots(persistenceId: String): Future[Unit] = - db.run(queries.selectAll(persistenceId).delete).map(_ => ())((ExecutionContexts.parasitic)) + db.run(queries.selectAll(persistenceId).delete).map(_ => ())((ExecutionContext.parasitic)) override def deleteUpToMaxSequenceNr(persistenceId: String, maxSequenceNr: Long): Future[Unit] = db.run(queries.selectByPersistenceIdUpToMaxSequenceNr((persistenceId, maxSequenceNr)).delete) - .map(_ => ())((ExecutionContexts.parasitic)) + .map(_ => ())((ExecutionContext.parasitic)) override def deleteUpToMaxTimestamp(persistenceId: String, maxTimestamp: Long): Future[Unit] = db.run(queries.selectByPersistenceIdUpToMaxTimestamp((persistenceId, maxTimestamp)).delete) - .map(_ => ())((ExecutionContexts.parasitic)) + .map(_ => ())((ExecutionContext.parasitic)) override def deleteUpToMaxSequenceNrAndMaxTimestamp( persistenceId: String, @@ -123,5 +122,5 @@ class DefaultSnapshotDao( queries .selectByPersistenceIdUpToMaxSequenceNrAndMaxTimestamp((persistenceId, maxSequenceNr, maxTimestamp)) .delete) - .map(_ => ())((ExecutionContexts.parasitic)) + .map(_ => ())((ExecutionContext.parasitic)) } diff --git a/core/src/main/scala/akka/persistence/jdbc/state/javadsl/JdbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/jdbc/state/javadsl/JdbcDurableStateStore.scala index 360e99959..c2f67148b 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/javadsl/JdbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/javadsl/JdbcDurableStateStore.scala @@ -7,7 +7,7 @@ package akka.persistence.jdbc.state.javadsl import java.util.Optional import java.util.concurrent.CompletionStage -import scala.compat.java8.FutureConverters._ +import scala.jdk.FutureConverters._ import scala.concurrent.ExecutionContext import akka.annotation.ApiMayChange import slick.jdbc.JdbcProfile @@ -40,13 +40,12 @@ class JdbcDurableStateStore[A]( val queries = new DurableStateQueries(profile, durableStateConfig) def getObject(persistenceId: String): CompletionStage[GetObjectResult[A]] = - toJava( scalaStore .getObject(persistenceId) - .map(x => GetObjectResult(Optional.ofNullable(x.value.getOrElse(null.asInstanceOf[A])), x.revision))) + .map(x => GetObjectResult(Optional.ofNullable(x.value.getOrElse(null.asInstanceOf[A])), x.revision)).asJava def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): CompletionStage[Done] = - toJava(scalaStore.upsertObject(persistenceId, revision, value, tag)) + scalaStore.upsertObject(persistenceId, revision, value, tag).asJava @deprecated(message = "Use the deleteObject overload with revision instead.", since = "1.0.0") override def deleteObject(persistenceId: String): CompletionStage[Done] = @@ -54,7 +53,7 @@ class JdbcDurableStateStore[A]( @nowarn("msg=deprecated") override def deleteObject(persistenceId: String, revision: Long): CompletionStage[Done] = - toJava(scalaStore.deleteObject(persistenceId)) + scalaStore.deleteObject(persistenceId).asJava def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed] = scalaStore.currentChanges(tag, offset).asJava diff --git a/core/src/main/scala/akka/persistence/jdbc/testkit/javadsl/SchemaUtils.scala b/core/src/main/scala/akka/persistence/jdbc/testkit/javadsl/SchemaUtils.scala index 1cda7c878..5c594e305 100644 --- a/core/src/main/scala/akka/persistence/jdbc/testkit/javadsl/SchemaUtils.scala +++ b/core/src/main/scala/akka/persistence/jdbc/testkit/javadsl/SchemaUtils.scala @@ -7,7 +7,7 @@ package akka.persistence.jdbc.testkit.javadsl import java.util.concurrent.CompletionStage -import scala.compat.java8.FutureConverters._ +import scala.jdk.FutureConverters._ import akka.Done import akka.actor.ClassicActorSystemProvider @@ -53,7 +53,7 @@ object SchemaUtils { */ @ApiMayChange def dropIfExists(configKey: String, actorSystem: ClassicActorSystemProvider): CompletionStage[Done] = - SchemaUtilsImpl.dropIfExists(configKey, logger)(actorSystem).toJava + SchemaUtilsImpl.dropIfExists(configKey, logger)(actorSystem).asJava /** * Creates the schema for both the journal and the snapshot table using the default schema definition. @@ -89,7 +89,7 @@ object SchemaUtils { */ @ApiMayChange def createIfNotExists(configKey: String, actorSystem: ClassicActorSystemProvider): CompletionStage[Done] = - SchemaUtilsImpl.createIfNotExists(configKey, logger)(actorSystem).toJava + SchemaUtilsImpl.createIfNotExists(configKey, logger)(actorSystem).asJava /** * This method can be used to load alternative DDL scripts. @@ -125,5 +125,5 @@ object SchemaUtils { separator: String, configKey: String, actorSystem: ClassicActorSystemProvider): CompletionStage[Done] = - SchemaUtilsImpl.applyScript(script, separator, configKey, logger)(actorSystem).toJava + SchemaUtilsImpl.applyScript(script, separator, configKey, logger)(actorSystem).asJava } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index ba4ac5374..7e4792f57 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -7,7 +7,7 @@ object Dependencies { val ScalaVersions = Seq(Scala213, Scala3) - val AkkaVersion = "2.9.3" + val AkkaVersion = "2.10.0-M1" val AkkaBinaryVersion = AkkaVersion.take(3) val SlickVersion = "3.5.1" @@ -21,11 +21,10 @@ object Dependencies { val Libraries: Seq[ModuleID] = Seq( "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion, - // Slick 3.5 pulls in slf4j-api 2.2 which doesn't work with Akka - ("com.typesafe.slick" %% "slick" % SlickVersion).exclude("org.slf4j", "slf4j-api"), - "org.slf4j" % "slf4j-api" % "1.7.36", + ("com.typesafe.slick" %% "slick" % SlickVersion), + "org.slf4j" % "slf4j-api" % "2.0.16", "com.typesafe.slick" %% "slick-hikaricp" % SlickVersion, - "ch.qos.logback" % "logback-classic" % "1.2.13" % Test, + "ch.qos.logback" % "logback-classic" % "1.5.7" % Test, "com.typesafe.akka" %% "akka-slf4j" % AkkaVersion % Test, "com.typesafe.akka" %% "akka-persistence-tck" % AkkaVersion % Test, "com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test, @@ -34,7 +33,7 @@ object Dependencies { val Migration: Seq[ModuleID] = Seq( "com.typesafe" % "config" % "1.4.3", - "ch.qos.logback" % "logback-classic" % "1.2.13", + "ch.qos.logback" % "logback-classic" % "1.5.7", "org.testcontainers" % "postgresql" % "1.20.1" % Test, "org.scalatest" %% "scalatest" % ScalaTestVersion % Test) ++ JdbcDrivers.map(_ % Provided) } From 401072d85d8271b35d19ed0fe55f7f3859cbc2bb Mon Sep 17 00:00:00 2001 From: Sebastian Date: Thu, 19 Sep 2024 21:59:06 +0200 Subject: [PATCH 2/3] fmt --- .../jdbc/state/javadsl/JdbcDurableStateStore.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/akka/persistence/jdbc/state/javadsl/JdbcDurableStateStore.scala b/core/src/main/scala/akka/persistence/jdbc/state/javadsl/JdbcDurableStateStore.scala index c2f67148b..7ea3cc26e 100644 --- a/core/src/main/scala/akka/persistence/jdbc/state/javadsl/JdbcDurableStateStore.scala +++ b/core/src/main/scala/akka/persistence/jdbc/state/javadsl/JdbcDurableStateStore.scala @@ -40,9 +40,10 @@ class JdbcDurableStateStore[A]( val queries = new DurableStateQueries(profile, durableStateConfig) def getObject(persistenceId: String): CompletionStage[GetObjectResult[A]] = - scalaStore - .getObject(persistenceId) - .map(x => GetObjectResult(Optional.ofNullable(x.value.getOrElse(null.asInstanceOf[A])), x.revision)).asJava + scalaStore + .getObject(persistenceId) + .map(x => GetObjectResult(Optional.ofNullable(x.value.getOrElse(null.asInstanceOf[A])), x.revision)) + .asJava def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): CompletionStage[Done] = scalaStore.upsertObject(persistenceId, revision, value, tag).asJava From 6c84bcaa84dd4b06b94ddcab5f91edfa12a718f1 Mon Sep 17 00:00:00 2001 From: Sebastian Date: Fri, 20 Sep 2024 07:06:10 +0200 Subject: [PATCH 3/3] remove brackets from slick dependency --- project/Dependencies.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7e4792f57..df8fa3ade 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -21,7 +21,7 @@ object Dependencies { val Libraries: Seq[ModuleID] = Seq( "com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion, - ("com.typesafe.slick" %% "slick" % SlickVersion), + "com.typesafe.slick" %% "slick" % SlickVersion, "org.slf4j" % "slf4j-api" % "2.0.16", "com.typesafe.slick" %% "slick-hikaricp" % SlickVersion, "ch.qos.logback" % "logback-classic" % "1.5.7" % Test,