Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scala 3 support #775

Merged
merged 17 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .github/workflows/checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,13 @@ jobs:
with:
jvm: temurin:1.11.0

- name: Compile all code with fatal warnings for Java 11, Scala 2.13
- name: Compile all code with fatal warnings for Java 11 and Scala 2.13
# Run locally with: sbt 'clean ; +Test/compile ; +It/compile'
run: sbt "; Test/compile"

- name: Compile all code with fatal warnings for Java 11 and Scala 3.3
run: sbt "++3.3; Test/compile"

check-docs:
name: Check Docs
runs-on: ubuntu-22.04
Expand Down
12 changes: 9 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,15 @@ lazy val core = project
name := "akka-persistence-jdbc",
libraryDependencies ++= Dependencies.Libraries,
mimaReportSignatureProblems := true,
mimaPreviousArtifacts := Set(
organization.value %% name.value % previousStableVersion.value.getOrElse(
throw new Error("Unable to determine previous version for MiMa"))))
mimaPreviousArtifacts := {
if (scalaVersion.value.startsWith("3")) {
Set.empty
} else {
Set(
organization.value %% name.value % previousStableVersion.value.getOrElse(
throw new Error("Unable to determine previous version for MiMa")))
}
})

lazy val integration = project
.in(file("integration"))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.apply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.unapply")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.tupled")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.curried")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.database")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.copy")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.copy$default$1")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.db.EagerSlickDatabase.this")

ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.db.LazySlickDatabase.database")
johanandren marked this conversation as resolved.
Show resolved Hide resolved

ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.db.SlickDatabase.forConfig")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.db.SlickDatabase.database")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.jdbc.db.SlickDatabase.database")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SlickDatabase already is internal, so this is fine


ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.JdbcAsyncWriteJournal.db")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.dao.DefaultJournalDao.db")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.journal.dao.DefaultJournalDao.this")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.dao.legacy.BaseByteArrayJournalDao.db")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.jdbc.journal.dao.legacy.BaseByteArrayJournalDao.db")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalDao.db")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalDao.this")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.query.dao.DefaultReadJournalDao.db")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.query.dao.DefaultReadJournalDao.this")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.query.dao.legacy.BaseByteArrayReadJournalDao.db")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.jdbc.query.dao.legacy.BaseByteArrayReadJournalDao.db")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.query.dao.legacy.ByteArrayReadJournalDao.db")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.query.dao.legacy.ByteArrayReadJournalDao.this")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.query.dao.legacy.OracleReadJournalDao.db")
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.persistence.jdbc.query.dao.legacy.OracleReadJournalDao.db")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.snapshot.JdbcSnapshotStore.db")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.snapshot.dao.DefaultSnapshotDao.this")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.snapshot.dao.legacy.ByteArraySnapshotDao.this")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.state.JdbcDurableStateStoreProvider.db")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore.this")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both fine to exclude:

Constructor isn't used by user code (instantiated by akka-persistence). A bit curious that it shows up, I can't see that it wasn't touched for any of the listed types though.

db field shouldn't ever be touched by user code, effectively protected.




Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ trait JournalTables {
eventSerManifest,
metaPayload,
metaSerId,
metaSerManifest) <> (JournalAkkaSerializationRow.tupled, JournalAkkaSerializationRow.unapply)
metaSerManifest) <> ((JournalAkkaSerializationRow.apply _).tupled, JournalAkkaSerializationRow.unapply)

val ordering: Rep[Long] = column[Long](journalTableCfg.columnNames.ordering, O.AutoInc)
val persistenceId: Rep[String] =
Expand All @@ -91,17 +91,17 @@ trait JournalTables {
lazy val JournalTable = new TableQuery(tag => new JournalEvents(tag))

class EventTags(_tableTag: Tag) extends Table[TagRow](_tableTag, tagTableCfg.schemaName, tagTableCfg.tableName) {
override def * = (eventId, persistenceId, sequenceNumber, tag) <> (TagRow.tupled, TagRow.unapply)
override def * = (eventId, persistenceId, sequenceNumber, tag) <> ((TagRow.apply _).tupled, TagRow.unapply)
// allow null value insert.
val eventId: Rep[Option[Long]] = column[Long](tagTableCfg.columnNames.eventId)
val persistenceId: Rep[Option[String]] = column[String](tagTableCfg.columnNames.persistenceId)
val sequenceNumber: Rep[Option[Long]] = column[Long](tagTableCfg.columnNames.sequenceNumber)
val eventId: Rep[Option[Long]] = column[Option[Long]](tagTableCfg.columnNames.eventId)
val persistenceId: Rep[Option[String]] = column[Option[String]](tagTableCfg.columnNames.persistenceId)
val sequenceNumber: Rep[Option[Long]] = column[Option[Long]](tagTableCfg.columnNames.sequenceNumber)
val tag: Rep[String] = column[String](tagTableCfg.columnNames.tag)

val pk = primaryKey(s"${tagTableCfg.tableName}_pk", (persistenceId, sequenceNumber, tag))
val journalEvent =
foreignKey(s"fk_${journalTableCfg.tableName}", (persistenceId, sequenceNumber), JournalTable)(e =>
(e.persistenceId, e.sequenceNumber))
(Rep.Some(e.persistenceId), Rep.Some(e.sequenceNumber)))
}

lazy val TagTable = new TableQuery(tag => new EventTags(tag))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class ByteArrayJournalDao(
serialization: Serialization)(implicit val ec: ExecutionContext, val mat: Materializer)
extends BaseByteArrayJournalDao {
val queries = new JournalQueries(profile, journalConfig.journalTableConfiguration)
val serializer = new ByteArrayJournalSerializer(serialization, journalConfig.pluginConfig.tagSeparator)
val serializer: ByteArrayJournalSerializer =
new ByteArrayJournalSerializer(serialization, journalConfig.pluginConfig.tagSeparator)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,13 @@ trait JournalTables {
_tableTag,
_schemaName = journalTableCfg.schemaName,
_tableName = journalTableCfg.tableName) {
def * = (ordering, deleted, persistenceId, sequenceNumber, message, tags) <> (JournalRow.tupled, JournalRow.unapply)
def * = (
ordering,
deleted,
persistenceId,
sequenceNumber,
message,
tags) <> ((JournalRow.apply _).tupled, JournalRow.unapply)

val ordering: Rep[Long] = column[Long](journalTableCfg.columnNames.ordering, O.AutoInc)
val persistenceId: Rep[String] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import com.typesafe.config.Config

class JdbcReadJournalProvider(system: ExtendedActorSystem, config: Config, configPath: String)
extends ReadJournalProvider {
override val scaladslReadJournal = new scaladsl.JdbcReadJournal(config, configPath)(system)
override def scaladslReadJournal(): scaladsl.JdbcReadJournal =
new scaladsl.JdbcReadJournal(config, configPath)(system)

override val javadslReadJournal = new javadsl.JdbcReadJournal(scaladslReadJournal)
override def javadslReadJournal(): javadsl.JdbcReadJournal = new javadsl.JdbcReadJournal(scaladslReadJournal())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The returned instances are cached and shared by akka-persistence internals so this change is fine. 👍

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,15 @@ object JournalSequenceActor {
* Efficient representation of missing elements using NumericRanges.
* It can be seen as a collection of OrderingIds
*/
private case class MissingElements(elements: Seq[NumericRange[OrderingId]]) {
case class MissingElements(elements: Seq[NumericRange[OrderingId]]) {
def addRange(from: OrderingId, until: OrderingId): MissingElements = {
val newRange = from.until(until)
MissingElements(elements :+ newRange)
}
def contains(id: OrderingId): Boolean = elements.exists(_.containsTyped(id))
def isEmpty: Boolean = elements.forall(_.isEmpty)
}
private object MissingElements {
object MissingElements {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make the multi-param JournalSequenceActor.receive(currentMaxOrdering ...) private instead of changing these (it's an internal factory for Receive instances, not called from the outside)

def empty: MissingElements = MissingElements(Vector.empty)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ trait OracleReadJournalDao extends ReadJournalDao {
}
}

implicit val getJournalRow = GetResult(r => JournalRow(r.<<, r.<<, r.<<, r.<<, r.nextBytes(), r.<<))
implicit val getJournalRow: GetResult[JournalRow] =
GetResult(r => JournalRow(r.<<, r.<<, r.<<, r.<<, r.nextBytes(), r.<<))

abstract override def eventsByTag(
tag: String,
Expand Down Expand Up @@ -155,5 +156,6 @@ class ByteArrayReadJournalDao(
extends BaseByteArrayReadJournalDao
with OracleReadJournalDao {
val queries = new ReadJournalQueries(profile, readJournalConfig)
val serializer = new ByteArrayJournalSerializer(serialization, readJournalConfig.pluginConfig.tagSeparator)
val serializer: ByteArrayJournalSerializer =
new ByteArrayJournalSerializer(serialization, readJournalConfig.pluginConfig.tagSeparator)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ object SnapshotTables {
metaSerId: Option[Int],
metaSerManifest: Option[String],
metaPayload: Option[Array[Byte]])

}

trait SnapshotTables {
Expand All @@ -43,7 +44,7 @@ trait SnapshotTables {
snapshotPayload,
metaSerId,
metaSerManifest,
metaPayload) <> (SnapshotRow.tupled, SnapshotRow.unapply)
metaPayload) <> ((SnapshotRow.apply _).tupled, SnapshotRow.unapply)

val persistenceId: Rep[String] =
column[String](snapshotTableCfg.columnNames.persistenceId, O.Length(255, varying = true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ trait SnapshotTables {
_tableTag,
_schemaName = snapshotTableCfg.schemaName,
_tableName = snapshotTableCfg.tableName) {
def * = (persistenceId, sequenceNumber, created, snapshot) <> (SnapshotRow.tupled, SnapshotRow.unapply)
def * = (persistenceId, sequenceNumber, created, snapshot) <> ((SnapshotRow.apply _).tupled, SnapshotRow.unapply)

val persistenceId: Rep[String] =
column[String](snapshotTableCfg.columnNames.persistenceId, O.Length(255, varying = true))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import akka.persistence.jdbc.config.DurableStateTableConfiguration
case _ => ???
}

implicit val uuidSetter = SetParameter[Array[Byte]] { case (bytes, params) =>
implicit val uuidSetter: SetParameter[Array[Byte]] = SetParameter[Array[Byte]] { case (bytes, params) =>
params.setBytes(bytes)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import akka.persistence.jdbc.config.DurableStateTableConfiguration

def * =
(globalOffset, persistenceId, revision, statePayload, tag, stateSerId, stateSerManifest, stateTimestamp)
.<>(DurableStateRow.tupled, DurableStateRow.unapply)
.<>((DurableStateRow.apply _).tupled, DurableStateRow.unapply)

val globalOffset: Rep[Long] = column[Long](durableStateTableCfg.columnNames.globalOffset, O.AutoInc)
val persistenceId: Rep[String] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@ class JdbcDurableStateStoreProvider[A](system: ExtendedActorSystem) extends Dura
lazy val serialization = SerializationExtension(system)
val profile: JdbcProfile = slickDb.profile

override val scaladslDurableStateStore: DurableStateStore[Any] =
override def scaladslDurableStateStore(): DurableStateStore[Any] =
new scaladsl.JdbcDurableStateStore[Any](db, profile, durableStateConfig, serialization)(system)

override val javadslDurableStateStore: JDurableStateStore[AnyRef] =
override def javadslDurableStateStore(): JDurableStateStore[AnyRef] =
new javadsl.JdbcDurableStateStore[AnyRef](
profile,
durableStateConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import akka.annotation.InternalApi
* Efficient representation of missing elements using NumericRanges.
* It can be seen as a collection of GlobalOffset
*/
private case class MissingElements(elements: Seq[NumericRange[GlobalOffset]]) {
case class MissingElements(elements: Seq[NumericRange[GlobalOffset]]) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, make the method private instead

def addRange(from: GlobalOffset, until: GlobalOffset): MissingElements = {
val newRange = from.until(until)
MissingElements(elements :+ newRange)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,20 +211,16 @@ class JdbcDurableStateStore[A](
}

private def updateDurableState(row: DurableStateTables.DurableStateRow) = {
import queries._

for {
s <- getSequenceNextValueExpr()
u <- updateDbWithDurableState(row, s.head)
s <- queries.getSequenceNextValueExpr()
u <- queries.updateDbWithDurableState(row, s.head)
} yield u
}

private def insertDurableState(row: DurableStateTables.DurableStateRow) = {
import queries._

for {
s <- getSequenceNextValueExpr()
u <- insertDbWithDurableState(row, s.head)
s <- queries.getSequenceNextValueExpr()
u <- queries.insertDbWithDurableState(row, s.head)
} yield u
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ abstract class SharedActorSystemTestSpec(val config: Config) extends SimpleSpec

implicit lazy val ec: ExecutionContext = system.dispatcher
implicit val pc: PatienceConfig = PatienceConfig(timeout = 1.minute)
implicit val timeout = Timeout(1.minute)
implicit val timeout: Timeout = Timeout(1.minute)

lazy val serialization = SerializationExtension(system)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ abstract class SingleActorSystemPerTestSpec(val config: Config)
conf.withValue(path, configValue)
})

implicit val pc: PatienceConfig = PatienceConfig(timeout = 1.minute)
override implicit val patienceConfig: PatienceConfig = PatienceConfig(timeout = 1.minute)
implicit val timeout: Timeout = Timeout(1.minute)

val cfg = config.getConfig("jdbc-journal")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import org.scalatest.concurrent.ScalaFutures

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

abstract class JdbcJournalPerfSpec(config: Config, schemaType: SchemaType)
Expand All @@ -29,7 +30,7 @@ abstract class JdbcJournalPerfSpec(config: Config, schemaType: SchemaType)
with DropCreate {
override protected def supportsRejectingNonSerializableObjects: CapabilityFlag = true

implicit lazy val ec = system.dispatcher
implicit lazy val ec: ExecutionContext = system.dispatcher

implicit def pc: PatienceConfig = PatienceConfig(timeout = 10.minutes)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import org.scalatest.concurrent.ScalaFutures

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

abstract class JdbcJournalSpec(config: Config, schemaType: SchemaType)
Expand All @@ -28,7 +29,7 @@ abstract class JdbcJournalSpec(config: Config, schemaType: SchemaType)

implicit val pc: PatienceConfig = PatienceConfig(timeout = 10.seconds)

implicit lazy val ec = system.dispatcher
implicit lazy val ec: ExecutionContext = system.dispatcher

lazy val cfg = system.settings.config.getConfig("jdbc-journal")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import scala.concurrent.duration._
import org.scalatest.matchers.should.Matchers

abstract class HardDeleteQueryTest(config: String) extends QueryTestSpec(config) with Matchers {
implicit val askTimeout = 500.millis
implicit val askTimeout: FiniteDuration = 500.millis

it should "not return deleted events when using CurrentEventsByTag" in withActorSystem { implicit system =>
val journalOps = new ScalaJdbcReadJournalOperations(system)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ abstract class JournalDaoStreamMessagesMemoryTest(configFile: String)
val journalSequenceActorConfig = readJournalConfig.journalSequenceRetrievalConfiguration
val journalTableCfg = journalConfig.journalTableConfiguration

implicit val askTimeout = 50.millis
implicit val askTimeout: FiniteDuration = 50.millis

def generateId: Int = 0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ abstract class JournalSequenceActorTest(configFile: String, isOracle: Boolean)

import profile.api._

implicit val askTimeout = 50.millis
implicit val askTimeout: FiniteDuration = 50.millis

def generateId: Int = 0

Expand Down Expand Up @@ -75,11 +75,12 @@ abstract class JournalSequenceActorTest(configFile: String, isOracle: Boolean)

val startTime = System.currentTimeMillis()
withJournalSequenceActor(db, maxTries = 100) { actor =>
val patienceConfig = PatienceConfig(10.seconds, Span(200, org.scalatest.time.Millis))
implicit val patienceConfig: PatienceConfig =
PatienceConfig(10.seconds, Span(200, org.scalatest.time.Millis))
eventually {
val currentMax = actor.ask(GetMaxOrderingId).mapTo[MaxOrderingId].futureValue.maxOrdering
currentMax shouldBe elements
}(patienceConfig, implicitly, implicitly)
}
}
val timeTaken = System.currentTimeMillis() - startTime
log.info(s"Recovered all events in $timeTaken ms")
Expand Down Expand Up @@ -109,11 +110,12 @@ abstract class JournalSequenceActorTest(configFile: String, isOracle: Boolean)

withJournalSequenceActor(db, maxTries = 2) { actor =>
// Should normally recover after `maxTries` seconds
val patienceConfig = PatienceConfig(10.seconds, Span(200, org.scalatest.time.Millis))
implicit val patienceConfig: PatienceConfig =
PatienceConfig(10.seconds, Span(200, org.scalatest.time.Millis))
eventually {
val currentMax = actor.ask(GetMaxOrderingId).mapTo[MaxOrderingId].futureValue.maxOrdering
currentMax shouldBe lastElement
}(patienceConfig, implicitly, implicitly)
}
}
}
}
Expand Down Expand Up @@ -145,11 +147,11 @@ abstract class JournalSequenceActorTest(configFile: String, isOracle: Boolean)

withJournalSequenceActor(db, maxTries = 2) { actor =>
// The actor should assume the max after 2 seconds
val patienceConfig = PatienceConfig(3.seconds)
implicit val patienceConfig: PatienceConfig = PatienceConfig(3.seconds)
eventually {
val currentMax = actor.ask(GetMaxOrderingId).mapTo[MaxOrderingId].futureValue.maxOrdering
currentMax shouldBe highestValue
}(patienceConfig, implicitly, implicitly)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ object TestProbeReadJournalDao {
*/
class TestProbeReadJournalDao(val probe: TestProbe) extends ReadJournalDao {
// Since the testprobe is instrumented by the test, it should respond very fast
implicit val askTimeout = Timeout(100.millis)
implicit val askTimeout: Timeout = Timeout(100.millis)

/**
* Returns distinct stream of persistenceIds
Expand Down
Loading
Loading