Skip to content

Commit

Permalink
feat: Scala 3 support (#775)
Browse files Browse the repository at this point in the history
Includes 
* bump to Slick 3.5.0
* mark EagerSlickDatabase and LazySlickDatabase as internal
---------

Co-authored-by: Johan Andrén <[email protected]>
  • Loading branch information
jtjeferreira and johanandren authored Mar 8, 2024
1 parent 20d20a6 commit df901fa
Show file tree
Hide file tree
Showing 37 changed files with 141 additions and 68 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,13 @@ jobs:
with:
jvm: temurin:1.11.0

- name: Compile all code with fatal warnings for Java 11, Scala 2.13
- name: Compile all code with fatal warnings for Java 11 and Scala 2.13
# Run locally with: sbt 'clean ; +Test/compile ; +It/compile'
run: sbt "; Test/compile"

- name: Compile all code with fatal warnings for Java 11 and Scala 3.3
run: sbt "++3.3; Test/compile"

check-docs:
name: Check Docs
runs-on: ubuntu-22.04
Expand Down
12 changes: 9 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@ lazy val core = project
name := "akka-persistence-jdbc",
libraryDependencies ++= Dependencies.Libraries,
mimaReportSignatureProblems := true,
mimaPreviousArtifacts := Set(
organization.value %% name.value % previousStableVersion.value.getOrElse(
throw new Error("Unable to determine previous version for MiMa"))))
mimaPreviousArtifacts := {
if (scalaVersion.value.startsWith("3")) {
Set.empty
} else {
Set(
organization.value %% name.value % previousStableVersion.value.getOrElse(
throw new Error("Unable to determine previous version for MiMa")))
}
})

lazy val integration = project
.in(file("integration"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.unapply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.tupled")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.curried")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.database")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.copy")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.copy$default$1")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.this")

ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.db.LazySlickDatabase.database")

ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.db.SlickDatabase.forConfig")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.db.SlickDatabase.database")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.jdbc.db.SlickDatabase.database")

ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.JdbcAsyncWriteJournal.db")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.dao.DefaultJournalDao.db")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.journal.dao.DefaultJournalDao.this")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.dao.legacy.BaseByteArrayJournalDao.db")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.jdbc.journal.dao.legacy.BaseByteArrayJournalDao.db")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalDao.db")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalDao.this")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.query.dao.DefaultReadJournalDao.db")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.query.dao.DefaultReadJournalDao.this")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.query.dao.legacy.BaseByteArrayReadJournalDao.db")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.jdbc.query.dao.legacy.BaseByteArrayReadJournalDao.db")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.query.dao.legacy.ByteArrayReadJournalDao.db")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.query.dao.legacy.ByteArrayReadJournalDao.this")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.query.dao.legacy.OracleReadJournalDao.db")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.jdbc.query.dao.legacy.OracleReadJournalDao.db")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.snapshot.JdbcSnapshotStore.db")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.snapshot.dao.DefaultSnapshotDao.this")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.snapshot.dao.legacy.ByteArraySnapshotDao.this")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.state.JdbcDurableStateStoreProvider.db")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore.this")

ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.query.JournalSequenceActor.receive")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.query.JournalSequenceActor.receive$default$4")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.query.JournalSequenceActor.findGaps")




Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package akka.persistence.jdbc.db

import akka.actor.ActorSystem
import akka.annotation.InternalApi

import javax.naming.InitialContext
import akka.persistence.jdbc.config.SlickConfiguration
import com.typesafe.config.Config
Expand Down Expand Up @@ -82,6 +84,7 @@ trait SlickDatabase {
def allowShutdown: Boolean
}

@InternalApi
case class EagerSlickDatabase(database: Database, profile: JdbcProfile) extends SlickDatabase {
override def allowShutdown: Boolean = true
}
Expand All @@ -90,6 +93,7 @@ case class EagerSlickDatabase(database: Database, profile: JdbcProfile) extends
* A LazySlickDatabase lazily initializes a database, it also manages the shutdown of the database
* @param config The configuration used to create the database
*/
@InternalApi
class LazySlickDatabase(config: Config, system: ActorSystem) extends SlickDatabase {
val profile: JdbcProfile = SlickDatabase.profile(config, path = "")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ trait JournalTables {
eventSerManifest,
metaPayload,
metaSerId,
metaSerManifest) <> (JournalAkkaSerializationRow.tupled, JournalAkkaSerializationRow.unapply)
metaSerManifest) <> ((JournalAkkaSerializationRow.apply _).tupled, JournalAkkaSerializationRow.unapply)

val ordering: Rep[Long] = column[Long](journalTableCfg.columnNames.ordering, O.AutoInc)
val persistenceId: Rep[String] =
Expand All @@ -91,17 +91,17 @@ trait JournalTables {
lazy val JournalTable = new TableQuery(tag => new JournalEvents(tag))

class EventTags(_tableTag: Tag) extends Table[TagRow](_tableTag, tagTableCfg.schemaName, tagTableCfg.tableName) {
override def * = (eventId, persistenceId, sequenceNumber, tag) <> (TagRow.tupled, TagRow.unapply)
override def * = (eventId, persistenceId, sequenceNumber, tag) <> ((TagRow.apply _).tupled, TagRow.unapply)
// allow null value insert.
val eventId: Rep[Option[Long]] = column[Long](tagTableCfg.columnNames.eventId)
val persistenceId: Rep[Option[String]] = column[String](tagTableCfg.columnNames.persistenceId)
val sequenceNumber: Rep[Option[Long]] = column[Long](tagTableCfg.columnNames.sequenceNumber)
val eventId: Rep[Option[Long]] = column[Option[Long]](tagTableCfg.columnNames.eventId)
val persistenceId: Rep[Option[String]] = column[Option[String]](tagTableCfg.columnNames.persistenceId)
val sequenceNumber: Rep[Option[Long]] = column[Option[Long]](tagTableCfg.columnNames.sequenceNumber)
val tag: Rep[String] = column[String](tagTableCfg.columnNames.tag)

val pk = primaryKey(s"${tagTableCfg.tableName}_pk", (persistenceId, sequenceNumber, tag))
val journalEvent =
foreignKey(s"fk_${journalTableCfg.tableName}", (persistenceId, sequenceNumber), JournalTable)(e =>
(e.persistenceId, e.sequenceNumber))
(Rep.Some(e.persistenceId), Rep.Some(e.sequenceNumber)))
}

lazy val TagTable = new TableQuery(tag => new EventTags(tag))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class ByteArrayJournalDao(
serialization: Serialization)(implicit val ec: ExecutionContext, val mat: Materializer)
extends BaseByteArrayJournalDao {
val queries = new JournalQueries(profile, journalConfig.journalTableConfiguration)
val serializer = new ByteArrayJournalSerializer(serialization, journalConfig.pluginConfig.tagSeparator)
val serializer: ByteArrayJournalSerializer =
new ByteArrayJournalSerializer(serialization, journalConfig.pluginConfig.tagSeparator)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@ trait JournalTables {
_tableTag,
_schemaName = journalTableCfg.schemaName,
_tableName = journalTableCfg.tableName) {
def * = (ordering, deleted, persistenceId, sequenceNumber, message, tags) <> (JournalRow.tupled, JournalRow.unapply)
def * = (
ordering,
deleted,
persistenceId,
sequenceNumber,
message,
tags) <> ((JournalRow.apply _).tupled, JournalRow.unapply)

val ordering: Rep[Long] = column[Long](journalTableCfg.columnNames.ordering, O.AutoInc)
val persistenceId: Rep[String] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import com.typesafe.config.Config

class JdbcReadJournalProvider(system: ExtendedActorSystem, config: Config, configPath: String)
extends ReadJournalProvider {
override val scaladslReadJournal = new scaladsl.JdbcReadJournal(config, configPath)(system)
override def scaladslReadJournal(): scaladsl.JdbcReadJournal =
new scaladsl.JdbcReadJournal(config, configPath)(system)

override val javadslReadJournal = new javadsl.JdbcReadJournal(scaladslReadJournal)
override def javadslReadJournal(): javadsl.JdbcReadJournal = new javadsl.JdbcReadJournal(scaladslReadJournal())
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class JournalSequenceActor(readJournalDao: ReadJournalDao, config: JournalSequen
* @param moduloCounter A counter which is incremented every time a new query have been executed, modulo `maxTries`
* @param previousDelay The last used delay (may change in case failures occur)
*/
def receive(
private def receive(
currentMaxOrdering: OrderingId,
missingByCounter: Map[Int, MissingElements],
moduloCounter: Int,
Expand Down Expand Up @@ -129,7 +129,7 @@ class JournalSequenceActor(readJournalDao: ReadJournalDao, config: JournalSequen
/**
* This method that implements the "find gaps" algo. It's the meat and main purpose of this actor.
*/
def findGaps(
private def findGaps(
elements: Seq[OrderingId],
currentMaxOrdering: OrderingId,
missingByCounter: Map[Int, MissingElements],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ trait OracleReadJournalDao extends ReadJournalDao {
}
}

implicit val getJournalRow = GetResult(r => JournalRow(r.<<, r.<<, r.<<, r.<<, r.nextBytes(), r.<<))
implicit val getJournalRow: GetResult[JournalRow] =
GetResult(r => JournalRow(r.<<, r.<<, r.<<, r.<<, r.nextBytes(), r.<<))

abstract override def eventsByTag(
tag: String,
Expand Down Expand Up @@ -155,5 +156,6 @@ class ByteArrayReadJournalDao(
extends BaseByteArrayReadJournalDao
with OracleReadJournalDao {
val queries = new ReadJournalQueries(profile, readJournalConfig)
val serializer = new ByteArrayJournalSerializer(serialization, readJournalConfig.pluginConfig.tagSeparator)
val serializer: ByteArrayJournalSerializer =
new ByteArrayJournalSerializer(serialization, readJournalConfig.pluginConfig.tagSeparator)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ object SnapshotTables {
metaSerId: Option[Int],
metaSerManifest: Option[String],
metaPayload: Option[Array[Byte]])

}

trait SnapshotTables {
Expand All @@ -43,7 +44,7 @@ trait SnapshotTables {
snapshotPayload,
metaSerId,
metaSerManifest,
metaPayload) <> (SnapshotRow.tupled, SnapshotRow.unapply)
metaPayload) <> ((SnapshotRow.apply _).tupled, SnapshotRow.unapply)

val persistenceId: Rep[String] =
column[String](snapshotTableCfg.columnNames.persistenceId, O.Length(255, varying = true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ trait SnapshotTables {
_tableTag,
_schemaName = snapshotTableCfg.schemaName,
_tableName = snapshotTableCfg.tableName) {
def * = (persistenceId, sequenceNumber, created, snapshot) <> (SnapshotRow.tupled, SnapshotRow.unapply)
def * = (persistenceId, sequenceNumber, created, snapshot) <> ((SnapshotRow.apply _).tupled, SnapshotRow.unapply)

val persistenceId: Rep[String] =
column[String](snapshotTableCfg.columnNames.persistenceId, O.Length(255, varying = true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import akka.persistence.jdbc.config.DurableStateTableConfiguration
case _ => ???
}

implicit val uuidSetter = SetParameter[Array[Byte]] { case (bytes, params) =>
implicit val uuidSetter: SetParameter[Array[Byte]] = SetParameter[Array[Byte]] { case (bytes, params) =>
params.setBytes(bytes)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import akka.persistence.jdbc.config.DurableStateTableConfiguration

def * =
(globalOffset, persistenceId, revision, statePayload, tag, stateSerId, stateSerManifest, stateTimestamp)
.<>(DurableStateRow.tupled, DurableStateRow.unapply)
.<>((DurableStateRow.apply _).tupled, DurableStateRow.unapply)

val globalOffset: Rep[Long] = column[Long](durableStateTableCfg.columnNames.globalOffset, O.AutoInc)
val persistenceId: Rep[String] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ class JdbcDurableStateStoreProvider[A](system: ExtendedActorSystem) extends Dura
lazy val serialization = SerializationExtension(system)
val profile: JdbcProfile = slickDb.profile

override val scaladslDurableStateStore: DurableStateStore[Any] =
override def scaladslDurableStateStore(): DurableStateStore[Any] =
new scaladsl.JdbcDurableStateStore[Any](db, profile, durableStateConfig, serialization)(system)

override val javadslDurableStateStore: JDurableStateStore[AnyRef] =
override def javadslDurableStateStore(): JDurableStateStore[AnyRef] =
new javadsl.JdbcDurableStateStore[AnyRef](
profile,
durableStateConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import akka.annotation.InternalApi
* Efficient representation of missing elements using NumericRanges.
* It can be seen as a collection of GlobalOffset
*/
private case class MissingElements(elements: Seq[NumericRange[GlobalOffset]]) {
case class MissingElements(elements: Seq[NumericRange[GlobalOffset]]) {
def addRange(from: GlobalOffset, until: GlobalOffset): MissingElements = {
val newRange = from.until(until)
MissingElements(elements :+ newRange)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import akka.serialization.Serialization
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.{ Materializer, SystemMaterializer }
import akka.util.Timeout
import DurableStateSequenceActor._
import OffsetSyntax._
import akka.annotation.ApiMayChange
import akka.persistence.query.UpdatedDurableState
Expand All @@ -45,6 +44,7 @@ class JdbcDurableStateStore[A](
serialization: Serialization)(implicit val system: ExtendedActorSystem)
extends DurableStateUpdateStore[A]
with DurableStateStoreQuery[A] {
import DurableStateSequenceActor._
import FlowControl._
import profile.api._

Expand Down Expand Up @@ -211,20 +211,16 @@ class JdbcDurableStateStore[A](
}

private def updateDurableState(row: DurableStateTables.DurableStateRow) = {
import queries._

for {
s <- getSequenceNextValueExpr()
u <- updateDbWithDurableState(row, s.head)
s <- queries.getSequenceNextValueExpr()
u <- queries.updateDbWithDurableState(row, s.head)
} yield u
}

private def insertDurableState(row: DurableStateTables.DurableStateRow) = {
import queries._

for {
s <- getSequenceNextValueExpr()
u <- insertDbWithDurableState(row, s.head)
s <- queries.getSequenceNextValueExpr()
u <- queries.insertDbWithDurableState(row, s.head)
} yield u
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ abstract class SharedActorSystemTestSpec(val config: Config) extends SimpleSpec

implicit lazy val ec: ExecutionContext = system.dispatcher
implicit val pc: PatienceConfig = PatienceConfig(timeout = 1.minute)
implicit val timeout = Timeout(1.minute)
implicit val timeout: Timeout = Timeout(1.minute)

lazy val serialization = SerializationExtension(system)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ abstract class SingleActorSystemPerTestSpec(val config: Config)
conf.withValue(path, configValue)
})

implicit val pc: PatienceConfig = PatienceConfig(timeout = 1.minute)
override implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = 1.minute)
implicit val timeout: Timeout = Timeout(1.minute)

val cfg = config.getConfig("jdbc-journal")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import org.scalatest.concurrent.ScalaFutures

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

abstract class JdbcJournalPerfSpec(config: Config, schemaType: SchemaType)
Expand All @@ -29,7 +30,7 @@ abstract class JdbcJournalPerfSpec(config: Config, schemaType: SchemaType)
with DropCreate {
override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = true

implicit lazy val ec = system.dispatcher
implicit lazy val ec: ExecutionContext = system.dispatcher

implicit def pc: PatienceConfig = PatienceConfig(timeout = 10.minutes)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import org.scalatest.concurrent.ScalaFutures

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

abstract class JdbcJournalSpec(config: Config, schemaType: SchemaType)
Expand All @@ -28,7 +29,7 @@ abstract class JdbcJournalSpec(config: Config, schemaType: SchemaType)

implicit val pc: PatienceConfig = PatienceConfig(timeout = 10.seconds)

implicit lazy val ec = system.dispatcher
implicit lazy val ec: ExecutionContext = system.dispatcher

lazy val cfg = system.settings.config.getConfig("jdbc-journal")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf
journalOps.withCurrentEventsByTag()(tag, NoOffset) { tp =>
// The stream must complete within the given amount of time
// This make take a while in case the journal sequence actor detects gaps
val allEvents = tp.toStrict(atMost = 20.seconds)
val allEvents = tp.toStrict(atMost = 40.seconds)
allEvents.size should be >= 600
val expectedOffsets = 1L.to(allEvents.size).map(Sequence.apply)
allEvents.map(_.offset) shouldBe expectedOffsets
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object EventsByTagMigrationTest {
"jdbc-read-journal.refresh-interval" -> ConfigValueFactory.fromAnyRef(refreshInterval.toString))
}

abstract class EventsByTagMigrationTest(config: String) extends QueryTestSpec(config, migrationConfigOverride) {
abstract class EventsByTagMigrationTest(configS: String) extends QueryTestSpec(configS, migrationConfigOverride) {
final val NoMsgTime: FiniteDuration = 100.millis

val tagTableCfg = journalConfig.eventTagTableConfiguration
Expand Down Expand Up @@ -158,7 +158,7 @@ abstract class EventsByTagMigrationTest(config: String) extends QueryTestSpec(co

// override this, so we can reset the value.
def withRollingUpdateActorSystem(f: ActorSystem => Unit): Unit = {
val legacyTagKeyConfig = legacyTagKeyConfigOverride.foldLeft(ConfigFactory.load(config)) {
val legacyTagKeyConfig = legacyTagKeyConfigOverride.foldLeft(ConfigFactory.load(configS)) {
case (conf, (path, configValue)) =>
conf.withValue(path, configValue)
}
Expand Down
Loading

0 comments on commit df901fa

Please sign in to comment.