diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/JdbcAsyncWriteJournal.scala b/core/src/main/scala/akka/persistence/jdbc/journal/JdbcAsyncWriteJournal.scala index 1fe61dcd3..08a6886a7 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/JdbcAsyncWriteJournal.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/JdbcAsyncWriteJournal.scala @@ -118,9 +118,10 @@ class JdbcAsyncWriteJournal(config: Config) extends AsyncWriteJournal { journalDao .messagesWithBatch(persistenceId, fromSequenceNr, toSequenceNr, journalConfig.daoConfig.replayBatchSize, None) .take(max) - .mapAsync(1)(reprAndOrdNr => Future.fromTry(reprAndOrdNr)) - .runForeach { case (repr, _) => - recoveryCallback(repr) + .runForeach { + case Success((repr, _)) => + recoveryCallback(repr) + case Failure(ex) => throw ex } .map(_ => ()) diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala index f1174d64b..65627a6ed 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala @@ -12,7 +12,6 @@ import akka.persistence.jdbc.journal.dao.FlowControl.{ Continue, ContinueDelayed import akka.stream.Materializer import akka.stream.scaladsl.{ Sink, Source } -import scala.collection.immutable.Seq import scala.concurrent.{ ExecutionContext, Future } import scala.concurrent.duration.FiniteDuration import scala.util.{ Failure, Success, Try } @@ -67,7 +66,7 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages { akka.pattern.after(delay, scheduler)(retrieveNextBatch()) } } - .mapConcat(identity(_)) + .mapConcat(identity) } }