Skip to content

Commit

Permalink
fixed query
Browse files Browse the repository at this point in the history
  • Loading branch information
octonato committed Aug 1, 2024
1 parent 39ef13d commit e1abbc6
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,28 @@

package akka.persistence.jdbc.journal.dao

import scala.collection.immutable
import scala.collection.immutable.Nil
import scala.collection.immutable.Seq
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.util.Try

import akka.NotUsed
import akka.dispatch.ExecutionContexts
import akka.persistence.jdbc.AkkaSerialization
import akka.persistence.jdbc.config.{ BaseDaoConfig, JournalConfig }
import akka.persistence.jdbc.config.BaseDaoConfig
import akka.persistence.jdbc.config.JournalConfig
import akka.persistence.jdbc.journal.dao.JournalTables.JournalAkkaSerializationRow
import akka.persistence.journal.Tagged
import akka.persistence.{ AtomicWrite, PersistentRepr }
import akka.persistence.AtomicWrite
import akka.persistence.PersistentRepr
import akka.serialization.Serialization
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import slick.jdbc.JdbcBackend.Database
import slick.jdbc.JdbcProfile

import scala.collection.immutable
import scala.collection.immutable.{ Nil, Seq }
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.Try

/**
* A [[JournalDao]] that uses Akka serialization to serialize the payload and store
* the manifest and serializer id used.
Expand All @@ -49,19 +53,22 @@ class DefaultJournalDao(
new JournalQueries(profile, journalConfig.eventJournalTableConfiguration, journalConfig.eventTagTableConfiguration)

override def delete(persistenceId: String, toSequenceNr: Long): Future[Unit] = {

// note: the passed toSequenceNr will be Long.MaxValue when doing a 'full' journal clean-up
// see JournalSpec's test: 'not reset highestSequenceNr after journal cleanup'
val actions: DBIOAction[Unit, NoStream, Effect.Write with Effect.Read] =
highestSequenceNrAction(persistenceId)
.flatMap { highestMarkedSequenceNr =>
// are we trying to delete the highest seqNr?
if (highestMarkedSequenceNr == toSequenceNr) {
// if so, we delete up to the before last and mark the last as logically deleted
.flatMap {
// are we trying to delete the highest or even higher seqNr ?
case highestSeqNr if highestSeqNr <= toSequenceNr =>
// if so, we delete up to the before last and
// mark the last as logically deleted preserving highestSeqNr
queries
.delete(persistenceId, toSequenceNr - 1)
.flatMap(_ => queries.markAsDeleted(persistenceId, toSequenceNr))
} else {
.delete(persistenceId, highestSeqNr - 1)
.flatMap(_ => queries.markAsDeleted(persistenceId, highestSeqNr))
case _ =>
// if not, we delete up to the requested seqNr
queries.delete(persistenceId, toSequenceNr)
}
}
.map(_ => ())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ class JournalQueries(
JournalTable.filter(_.persistenceId === persistenceId).filter(_.sequenceNumber <= toSequenceNr).delete
}

private[akka] def markAsDeleted(persistenceId: String, highestMarkedSequenceNr: Long) =
private[akka] def markAsDeleted(persistenceId: String, seqNr: Long) =
JournalTable
.filter(_.persistenceId === persistenceId)
.filter(_.sequenceNumber === highestMarkedSequenceNr)
.filter(_.sequenceNumber === seqNr)
.filter(_.deleted === false)
.map(_.deleted)
.update(true)
Expand Down

0 comments on commit e1abbc6

Please sign in to comment.