From e1abbc61941eb41a995182c3372bd447bfac3665 Mon Sep 17 00:00:00 2001 From: Renato Cavalcanti Date: Thu, 1 Aug 2024 12:51:07 +0200 Subject: [PATCH] fixed query --- .../jdbc/journal/dao/DefaultJournalDao.scala | 37 +++++++++++-------- .../jdbc/journal/dao/JournalQueries.scala | 4 +- 2 files changed, 24 insertions(+), 17 deletions(-) diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala index a8163830..16090901 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/DefaultJournalDao.scala @@ -5,24 +5,28 @@ package akka.persistence.jdbc.journal.dao +import scala.collection.immutable +import scala.collection.immutable.Nil +import scala.collection.immutable.Seq +import scala.concurrent.ExecutionContext +import scala.concurrent.Future +import scala.util.Try + import akka.NotUsed import akka.dispatch.ExecutionContexts import akka.persistence.jdbc.AkkaSerialization -import akka.persistence.jdbc.config.{ BaseDaoConfig, JournalConfig } +import akka.persistence.jdbc.config.BaseDaoConfig +import akka.persistence.jdbc.config.JournalConfig import akka.persistence.jdbc.journal.dao.JournalTables.JournalAkkaSerializationRow import akka.persistence.journal.Tagged -import akka.persistence.{ AtomicWrite, PersistentRepr } +import akka.persistence.AtomicWrite +import akka.persistence.PersistentRepr import akka.serialization.Serialization import akka.stream.Materializer import akka.stream.scaladsl.Source import slick.jdbc.JdbcBackend.Database import slick.jdbc.JdbcProfile -import scala.collection.immutable -import scala.collection.immutable.{ Nil, Seq } -import scala.concurrent.{ ExecutionContext, Future } -import scala.util.Try - /** * A [[JournalDao]] that uses Akka serialization to serialize the payload and store * the manifest and serializer id used. @@ -49,19 +53,22 @@ class DefaultJournalDao( new JournalQueries(profile, journalConfig.eventJournalTableConfiguration, journalConfig.eventTagTableConfiguration) override def delete(persistenceId: String, toSequenceNr: Long): Future[Unit] = { + + // note: the passed toSequenceNr will be Long.MaxValue when doing a 'full' journal clean-up + // see JournalSpec's test: 'not reset highestSequenceNr after journal cleanup' val actions: DBIOAction[Unit, NoStream, Effect.Write with Effect.Read] = highestSequenceNrAction(persistenceId) - .flatMap { highestMarkedSequenceNr => - // are we trying to delete the highest seqNr? - if (highestMarkedSequenceNr == toSequenceNr) { - // if so, we delete up to the before last and mark the last as logically deleted + .flatMap { + // are we trying to delete the highest or even higher seqNr ? + case highestSeqNr if highestSeqNr <= toSequenceNr => + // if so, we delete up to the before last and + // mark the last as logically deleted preserving highestSeqNr queries - .delete(persistenceId, toSequenceNr - 1) - .flatMap(_ => queries.markAsDeleted(persistenceId, toSequenceNr)) - } else { + .delete(persistenceId, highestSeqNr - 1) + .flatMap(_ => queries.markAsDeleted(persistenceId, highestSeqNr)) + case _ => // if not, we delete up to the requested seqNr queries.delete(persistenceId, toSequenceNr) - } } .map(_ => ()) diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala index d7641af8..9ccf4914 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala @@ -69,10 +69,10 @@ class JournalQueries( JournalTable.filter(_.persistenceId === persistenceId).filter(_.sequenceNumber <= toSequenceNr).delete } - private[akka] def markAsDeleted(persistenceId: String, highestMarkedSequenceNr: Long) = + private[akka] def markAsDeleted(persistenceId: String, seqNr: Long) = JournalTable .filter(_.persistenceId === persistenceId) - .filter(_.sequenceNumber === highestMarkedSequenceNr) + .filter(_.sequenceNumber === seqNr) .filter(_.deleted === false) .map(_.deleted) .update(true)