Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: bump to akka 2.10.0-M1, align with changes from upstream #859

Merged
merged 3 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import akka.persistence.jdbc.config.{ ConfigKeys, SlickConfiguration }
import akka.persistence.jdbc.util.ConfigOps._
import com.typesafe.config.{ Config, ConfigObject }

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.util.{ Failure, Success }

object SlickExtension extends ExtensionId[SlickExtensionImpl] with ExtensionIdProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import scala.concurrent.Future
import scala.util.Try

import akka.NotUsed
import akka.dispatch.ExecutionContexts
import akka.persistence.jdbc.AkkaSerialization
import akka.persistence.jdbc.config.BaseDaoConfig
import akka.persistence.jdbc.config.JournalConfig
Expand Down Expand Up @@ -46,7 +45,7 @@ class DefaultJournalDao(
override def baseDaoConfig: BaseDaoConfig = journalConfig.daoConfig

override def writeJournalRows(xs: immutable.Seq[(JournalAkkaSerializationRow, Set[String])]): Future[Unit] = {
db.run(queries.writeJournalRows(xs).transactionally).map(_ => ())(ExecutionContexts.parasitic)
db.run(queries.writeJournalRows(xs).transactionally).map(_ => ())(ExecutionContext.parasitic)
}

val queries =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import akka.persistence.jdbc.config.SnapshotConfig
import akka.serialization.Serialization
import akka.stream.Materializer
import SnapshotTables._
import akka.dispatch.ExecutionContexts
import akka.persistence.jdbc.AkkaSerialization

import scala.concurrent.{ ExecutionContext, Future }
Expand Down Expand Up @@ -97,23 +96,23 @@ class DefaultSnapshotDao(

override def save(snapshotMetadata: SnapshotMetadata, snapshot: Any): Future[Unit] = {
val eventualSnapshotRow = Future.fromTry(serializeSnapshot(snapshotMetadata, snapshot))
eventualSnapshotRow.map(queries.insertOrUpdate).flatMap(db.run).map(_ => ())(ExecutionContexts.parasitic)
eventualSnapshotRow.map(queries.insertOrUpdate).flatMap(db.run).map(_ => ())(ExecutionContext.parasitic)
}

override def delete(persistenceId: String, sequenceNr: Long): Future[Unit] =
db.run(queries.selectByPersistenceIdAndSequenceNr((persistenceId, sequenceNr)).delete)
.map(_ => ())(ExecutionContexts.parasitic)
.map(_ => ())(ExecutionContext.parasitic)

override def deleteAllSnapshots(persistenceId: String): Future[Unit] =
db.run(queries.selectAll(persistenceId).delete).map(_ => ())((ExecutionContexts.parasitic))
db.run(queries.selectAll(persistenceId).delete).map(_ => ())((ExecutionContext.parasitic))

override def deleteUpToMaxSequenceNr(persistenceId: String, maxSequenceNr: Long): Future[Unit] =
db.run(queries.selectByPersistenceIdUpToMaxSequenceNr((persistenceId, maxSequenceNr)).delete)
.map(_ => ())((ExecutionContexts.parasitic))
.map(_ => ())((ExecutionContext.parasitic))

override def deleteUpToMaxTimestamp(persistenceId: String, maxTimestamp: Long): Future[Unit] =
db.run(queries.selectByPersistenceIdUpToMaxTimestamp((persistenceId, maxTimestamp)).delete)
.map(_ => ())((ExecutionContexts.parasitic))
.map(_ => ())((ExecutionContext.parasitic))

override def deleteUpToMaxSequenceNrAndMaxTimestamp(
persistenceId: String,
Expand All @@ -123,5 +122,5 @@ class DefaultSnapshotDao(
queries
.selectByPersistenceIdUpToMaxSequenceNrAndMaxTimestamp((persistenceId, maxSequenceNr, maxTimestamp))
.delete)
.map(_ => ())((ExecutionContexts.parasitic))
.map(_ => ())((ExecutionContext.parasitic))
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package akka.persistence.jdbc.state.javadsl

import java.util.Optional
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._
import scala.concurrent.ExecutionContext
import akka.annotation.ApiMayChange
import slick.jdbc.JdbcProfile
Expand Down Expand Up @@ -40,21 +40,21 @@ class JdbcDurableStateStore[A](
val queries = new DurableStateQueries(profile, durableStateConfig)

def getObject(persistenceId: String): CompletionStage[GetObjectResult[A]] =
toJava(
scalaStore
.getObject(persistenceId)
.map(x => GetObjectResult(Optional.ofNullable(x.value.getOrElse(null.asInstanceOf[A])), x.revision)))
scalaStore
.getObject(persistenceId)
.map(x => GetObjectResult(Optional.ofNullable(x.value.getOrElse(null.asInstanceOf[A])), x.revision))
.asJava

def upsertObject(persistenceId: String, revision: Long, value: A, tag: String): CompletionStage[Done] =
toJava(scalaStore.upsertObject(persistenceId, revision, value, tag))
scalaStore.upsertObject(persistenceId, revision, value, tag).asJava

@deprecated(message = "Use the deleteObject overload with revision instead.", since = "1.0.0")
override def deleteObject(persistenceId: String): CompletionStage[Done] =
deleteObject(persistenceId, revision = 0)

@nowarn("msg=deprecated")
override def deleteObject(persistenceId: String, revision: Long): CompletionStage[Done] =
toJava(scalaStore.deleteObject(persistenceId))
scalaStore.deleteObject(persistenceId).asJava

def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], NotUsed] =
scalaStore.currentChanges(tag, offset).asJava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package akka.persistence.jdbc.testkit.javadsl

import java.util.concurrent.CompletionStage

import scala.compat.java8.FutureConverters._
import scala.jdk.FutureConverters._

import akka.Done
import akka.actor.ClassicActorSystemProvider
Expand Down Expand Up @@ -53,7 +53,7 @@ object SchemaUtils {
*/
@ApiMayChange
def dropIfExists(configKey: String, actorSystem: ClassicActorSystemProvider): CompletionStage[Done] =
SchemaUtilsImpl.dropIfExists(configKey, logger)(actorSystem).toJava
SchemaUtilsImpl.dropIfExists(configKey, logger)(actorSystem).asJava

/**
* Creates the schema for both the journal and the snapshot table using the default schema definition.
Expand Down Expand Up @@ -89,7 +89,7 @@ object SchemaUtils {
*/
@ApiMayChange
def createIfNotExists(configKey: String, actorSystem: ClassicActorSystemProvider): CompletionStage[Done] =
SchemaUtilsImpl.createIfNotExists(configKey, logger)(actorSystem).toJava
SchemaUtilsImpl.createIfNotExists(configKey, logger)(actorSystem).asJava

/**
* This method can be used to load alternative DDL scripts.
Expand Down Expand Up @@ -125,5 +125,5 @@ object SchemaUtils {
separator: String,
configKey: String,
actorSystem: ClassicActorSystemProvider): CompletionStage[Done] =
SchemaUtilsImpl.applyScript(script, separator, configKey, logger)(actorSystem).toJava
SchemaUtilsImpl.applyScript(script, separator, configKey, logger)(actorSystem).asJava
}
11 changes: 5 additions & 6 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ object Dependencies {

val ScalaVersions = Seq(Scala213, Scala3)

val AkkaVersion = "2.9.3"
val AkkaVersion = "2.10.0-M1"
val AkkaBinaryVersion = AkkaVersion.take(3)

val SlickVersion = "3.5.1"
Expand All @@ -21,11 +21,10 @@ object Dependencies {

val Libraries: Seq[ModuleID] = Seq(
"com.typesafe.akka" %% "akka-persistence-query" % AkkaVersion,
// Slick 3.5 pulls in slf4j-api 2.2 which doesn't work with Akka
("com.typesafe.slick" %% "slick" % SlickVersion).exclude("org.slf4j", "slf4j-api"),
"org.slf4j" % "slf4j-api" % "1.7.36",
"com.typesafe.slick" %% "slick" % SlickVersion,
"org.slf4j" % "slf4j-api" % "2.0.16",
"com.typesafe.slick" %% "slick-hikaricp" % SlickVersion,
"ch.qos.logback" % "logback-classic" % "1.2.13" % Test,
"ch.qos.logback" % "logback-classic" % "1.5.7" % Test,
"com.typesafe.akka" %% "akka-slf4j" % AkkaVersion % Test,
"com.typesafe.akka" %% "akka-persistence-tck" % AkkaVersion % Test,
"com.typesafe.akka" %% "akka-stream-testkit" % AkkaVersion % Test,
Expand All @@ -34,7 +33,7 @@ object Dependencies {

val Migration: Seq[ModuleID] = Seq(
"com.typesafe" % "config" % "1.4.3",
"ch.qos.logback" % "logback-classic" % "1.2.13",
"ch.qos.logback" % "logback-classic" % "1.5.7",
"org.testcontainers" % "postgresql" % "1.20.1" % Test,
"org.scalatest" %% "scalatest" % ScalaTestVersion % Test) ++ JdbcDrivers.map(_ % Provided)
}
Loading