From ed541ff6b8f9e2dbef8f2b72748a5f14688e5d79 Mon Sep 17 00:00:00 2001 From: AndyChen Date: Thu, 14 Sep 2023 22:47:44 +0800 Subject: [PATCH] perf: Reducing memory cost while replaying (#765) --- .../persistence/jdbc/journal/JdbcAsyncWriteJournal.scala | 7 ++++--- .../jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala | 3 +-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/JdbcAsyncWriteJournal.scala b/core/src/main/scala/akka/persistence/jdbc/journal/JdbcAsyncWriteJournal.scala index 1fe61dcd3..08a6886a7 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/JdbcAsyncWriteJournal.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/JdbcAsyncWriteJournal.scala @@ -118,9 +118,10 @@ class JdbcAsyncWriteJournal(config: Config) extends AsyncWriteJournal { journalDao .messagesWithBatch(persistenceId, fromSequenceNr, toSequenceNr, journalConfig.daoConfig.replayBatchSize, None) .take(max) - .mapAsync(1)(reprAndOrdNr => Future.fromTry(reprAndOrdNr)) - .runForeach { case (repr, _) => - recoveryCallback(repr) + .runForeach { + case Success((repr, _)) => + recoveryCallback(repr) + case Failure(ex) => throw ex } .map(_ => ()) diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala index f1174d64b..65627a6ed 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala @@ -12,7 +12,6 @@ import akka.persistence.jdbc.journal.dao.FlowControl.{ Continue, ContinueDelayed import akka.stream.Materializer import akka.stream.scaladsl.{ Sink, Source } -import scala.collection.immutable.Seq import scala.concurrent.{ ExecutionContext, Future } import scala.concurrent.duration.FiniteDuration import scala.util.{ Failure, Success, Try } @@ -67,7 +66,7 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages { akka.pattern.after(delay, scheduler)(retrieveNextBatch()) } } - .mapConcat(identity(_)) + .mapConcat(identity) } }