diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf
index 5a5bd1708..b52f8cc88 100644
--- a/src/main/resources/reference.conf
+++ b/src/main/resources/reference.conf
@@ -44,6 +44,8 @@ jdbc-journal {
   batchSize = 400
   # The maximum number of batch-inserts that may be running concurrently
   parallelism = 8
+  # If true, rows are only marked as deleted; if false, they are deleted physically
+  logicalDelete = true
 
   slick {
 
diff --git a/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala b/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala
index 235d3d8ff..893d23284 100644
--- a/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala
+++ b/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala
@@ -75,6 +75,7 @@ class BaseByteArrayJournalDaoConfig(config: Config) {
   val bufferSize: Int = config.asInt("bufferSize", 1000)
   val batchSize: Int = config.asInt("batchSize", 400)
   val parallelism: Int = config.asInt("parallelism", 8)
+  val logicalDelete: Boolean = config.asBoolean("logicalDelete", default = true)
   override def toString: String = s"BaseByteArrayJournalDaoConfig($bufferSize,$batchSize,$parallelism)"
 }
 
diff --git a/src/main/scala/akka/persistence/jdbc/journal/dao/ByteArrayJournalDao.scala b/src/main/scala/akka/persistence/jdbc/journal/dao/ByteArrayJournalDao.scala
index 66772b8cf..e78eff0a0 100644
--- a/src/main/scala/akka/persistence/jdbc/journal/dao/ByteArrayJournalDao.scala
+++ b/src/main/scala/akka/persistence/jdbc/journal/dao/ByteArrayJournalDao.scala
@@ -22,10 +22,10 @@ import akka.persistence.jdbc.config.JournalConfig
 import akka.persistence.jdbc.serialization.FlowPersistentReprSerializer
 import akka.persistence.{AtomicWrite, PersistentRepr}
 import akka.serialization.Serialization
-import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult}
 import akka.stream.scaladsl.{Keep, Sink, Source}
-import slick.jdbc.JdbcProfile
+import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult}
 import slick.jdbc.JdbcBackend._
+import slick.jdbc.JdbcProfile
 
 import scala.collection.immutable._
 import scala.concurrent.{ExecutionContext, Future, Promise}
@@ -44,8 +44,8 @@ trait BaseByteArrayJournalDao extends JournalDao {
   implicit val ec: ExecutionContext
   implicit val mat: Materializer
 
+  import journalConfig.daoConfig.{batchSize, bufferSize, logicalDelete, parallelism}
   import profile.api._
-  import journalConfig.daoConfig.{batchSize, bufferSize, parallelism}
 
   private val writeQueue = Source.queue[(Promise[Unit], Seq[JournalRow])](bufferSize, OverflowStrategy.dropNew)
     .batchWeighted[(Seq[Promise[Unit]], Seq[JournalRow])](batchSize, _._2.size, tup => Vector(tup._1) -> tup._2) {
@@ -92,9 +92,23 @@ trait BaseByteArrayJournalDao extends JournalDao {
     queueWriteJournalRows(rowsToWrite).map(_ => resultWhenWriteComplete)
   }
 
-  override def delete(persistenceId: String, maxSequenceNr: Long): Future[Unit] = for {
-    _ <- db.run(queries.markJournalMessagesAsDeleted(persistenceId, maxSequenceNr))
-  } yield ()
+  override def delete(persistenceId: String, maxSequenceNr: Long): Future[Unit] =
+    if (logicalDelete) {
+      db.run(queries.markJournalMessagesAsDeleted(persistenceId, maxSequenceNr)).map(_ => ())
+    } else {
+      // We must keep the journal row with the highest sequence number in order to remain
+      // compliant with [[akka.persistence.journal.JournalSpec]]
+      val actions = for {
+        _ <- queries.markJournalMessagesAsDeleted(persistenceId, maxSequenceNr)
+        highestMarkedSequenceNr <- highestMarkedSequenceNr(persistenceId)
+        _ <- queries.delete(persistenceId, highestMarkedSequenceNr.getOrElse(0L) - 1)
+      } yield ()
+
+      db.run(actions.transactionally)
+    }
+
+  private def highestMarkedSequenceNr(persistenceId: String) =
+    queries.highestMarkedSequenceNrForPersistenceId(persistenceId).result.headOption
 
   override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = for {
     maybeHighestSeqNo <- db.run(queries.highestSequenceNrForPersistenceId(persistenceId).result.headOption)
diff --git a/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala b/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala
index 9a41909b1..a65141bd2 100644
--- a/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala
+++ b/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala
@@ -32,6 +32,13 @@ class JournalQueries(val profile: JdbcProfile, override val journalTableCfg: Jou
   private def selectAllJournalForPersistenceId(persistenceId: Rep[String]) =
     JournalTable.filter(_.persistenceId === persistenceId).sortBy(_.sequenceNumber.desc)
 
+  def delete(persistenceId: String, toSequenceNr: Long) = {
+    JournalTable
+      .filter(_.persistenceId === persistenceId)
+      .filter(_.sequenceNumber <= toSequenceNr)
+      .delete
+  }
+
   def markJournalMessagesAsDeleted(persistenceId: String, maxSequenceNr: Long) =
     JournalTable
       .filter(_.persistenceId === persistenceId)
@@ -42,8 +49,13 @@ class JournalQueries(val profile: JdbcProfile, override val journalTableCfg: Jou
   private def _highestSequenceNrForPersistenceId(persistenceId: Rep[String]): Query[Rep[Long], Long, Seq] =
     selectAllJournalForPersistenceId(persistenceId).map(_.sequenceNumber).take(1)
 
+  private def _highestMarkedSequenceNrForPersistenceId(persistenceId: Rep[String]): Query[Rep[Long], Long, Seq] =
+    selectAllJournalForPersistenceId(persistenceId).filter(_.deleted === true).map(_.sequenceNumber)
+
   val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _)
 
+  val highestMarkedSequenceNrForPersistenceId = Compiled(_highestMarkedSequenceNrForPersistenceId _)
+
   private def _selectByPersistenceIdAndMaxSequenceNumber(persistenceId: Rep[String], maxSequenceNr: Rep[Long]) =
     selectAllJournalForPersistenceId(persistenceId).filter(_.sequenceNumber <= maxSequenceNr)
 
diff --git a/src/test/scala/akka/persistence/jdbc/configuration/AkkaPersistenceConfigTest.scala b/src/test/scala/akka/persistence/jdbc/configuration/AkkaPersistenceConfigTest.scala
index 41179ce35..032f7f860 100644
--- a/src/test/scala/akka/persistence/jdbc/configuration/AkkaPersistenceConfigTest.scala
+++ b/src/test/scala/akka/persistence/jdbc/configuration/AkkaPersistenceConfigTest.scala
@@ -47,6 +47,8 @@ class AkkaPersistenceConfigTest extends FlatSpec with Matchers {
       |
       | dao = "akka.persistence.jdbc.dao.bytea.journal.ByteArrayJournalDao"
       |
+      | logicalDelete = true
+      |
       | slick {
       |   profile = "slick.jdbc.PostgresProfile$"
       |   db {
@@ -232,6 +234,8 @@ class AkkaPersistenceConfigTest extends FlatSpec with Matchers {
     cfg.journalTableConfiguration.columnNames.persistenceId shouldBe "persistence_id"
     cfg.journalTableConfiguration.columnNames.sequenceNumber shouldBe "sequence_number"
     cfg.journalTableConfiguration.columnNames.tags shouldBe "tags"
+
+    cfg.daoConfig.logicalDelete shouldBe true
   }
 
   it should "parse SnapshotConfig" in {
diff --git a/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalPerfSpec.scala b/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalPerfSpec.scala
index 84e8f525b..73794faca 100644
--- a/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalPerfSpec.scala
+++ b/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalPerfSpec.scala
@@ -24,7 +24,7 @@ import akka.persistence.jdbc.util.{ClasspathResources, DropCreate, SlickDatabase
 import akka.persistence.journal.JournalPerfSpec
 import akka.persistence.journal.JournalPerfSpec.{BenchActor, Cmd, ResetCounter}
 import akka.testkit.TestProbe
-import com.typesafe.config.{Config, ConfigFactory}
+import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
@@ -123,6 +123,10 @@ class PostgresJournalPerfSpec extends JdbcJournalPerfSpec(ConfigFactory.load("po
   override def measurementIterations: Int = 1
 }
 
+class PostgresJournalPerfSpecPhysicalDelete extends PostgresJournalPerfSpec {
+  this.cfg.withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false))
+}
+
 class MySQLJournalPerfSpec extends JdbcJournalPerfSpec(ConfigFactory.load("mysql-application.conf"), MySQL()) {
 
   override implicit def pc: PatienceConfig = PatienceConfig(timeout = 10.minutes)
@@ -133,6 +137,10 @@ class MySQLJournalPerfSpec extends JdbcJournalPerfSpec(ConfigFactory.load("mysql
   override def measurementIterations: Int = 1
 }
 
+class MySQLJournalPerfSpecPhysicalDelete extends MySQLJournalPerfSpec {
+  this.cfg.withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false))
+}
+
 class OracleJournalPerfSpec extends JdbcJournalPerfSpec(ConfigFactory.load("oracle-application.conf"), Oracle()) {
 
   override implicit def pc: PatienceConfig = PatienceConfig(timeout = 10.minutes)
@@ -143,4 +151,12 @@ class OracleJournalPerfSpec extends JdbcJournalPerfSpec(ConfigFactory.load("orac
   override def measurementIterations: Int = 1
 }
 
-class H2JournalPerfSpec extends JdbcJournalPerfSpec(ConfigFactory.load("h2-application.conf"), H2())
\ No newline at end of file
+class OracleJournalPerfSpecPhysicalDelete extends OracleJournalPerfSpec {
+  this.cfg.withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false))
+}
+
+class H2JournalPerfSpec extends JdbcJournalPerfSpec(ConfigFactory.load("h2-application.conf"), H2())
+
+class H2JournalPerfSpecPhysicalDelete extends H2JournalPerfSpec {
+  this.cfg.withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false))
+}
\ No newline at end of file
diff --git a/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalSpec.scala b/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalSpec.scala
index 3c0b7c684..2f0c0cf0d 100644
--- a/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalSpec.scala
+++ b/src/test/scala/akka/persistence/jdbc/journal/JdbcJournalSpec.scala
@@ -21,7 +21,7 @@ import akka.persistence.jdbc.config._
 import akka.persistence.jdbc.util.Schema._
 import akka.persistence.jdbc.util.{ClasspathResources, DropCreate, SlickDatabase}
 import akka.persistence.journal.JournalSpec
-import com.typesafe.config.{Config, ConfigFactory}
+import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
 import org.scalatest.concurrent.ScalaFutures
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
@@ -62,13 +62,21 @@ abstract class JdbcJournalSpec(config: Config, schemaType: SchemaType) extends J
  * but the Slick Tables definition must not change, else it breaks the UPSERT feature...
  */
 class PostgresJournalSpec extends JdbcJournalSpec(ConfigFactory.load("postgres-application.conf"), Postgres())
+class PostgresJournalSpecPhysicalDelete extends JdbcJournalSpec(ConfigFactory.load("postgres-application.conf")
+  .withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false)), Postgres())
 
 /**
  * Does not (yet) work because Slick generates double quotes to escape field names
 * for some reason when creating the DDL
 */
 class MySQLJournalSpec extends JdbcJournalSpec(ConfigFactory.load("mysql-application.conf"), MySQL())
+class MySQLJournalSpecPhysicalDelete extends JdbcJournalSpec(ConfigFactory.load("mysql-application.conf")
+  .withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false)), MySQL())
 
 class OracleJournalSpec extends JdbcJournalSpec(ConfigFactory.load("oracle-application.conf"), Oracle())
+class OracleJournalSpecPhysicalDelete extends JdbcJournalSpec(ConfigFactory.load("oracle-application.conf")
+  .withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false)), Oracle())
 
 class H2JournalSpec extends JdbcJournalSpec(ConfigFactory.load("h2-application.conf"), H2())
+class H2JournalSpecPhysicalDelete extends JdbcJournalSpec(ConfigFactory.load("h2-application.conf")
+  .withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false)), H2())
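
Usage note: deletes stay logical by default (logicalDelete = true), which matches the previous
behavior of delete() delegating to markJournalMessagesAsDeleted. A physical delete intentionally
keeps the row with the highest marked sequence number, so highestSequenceNr still reports the
correct value afterwards, as akka.persistence.journal.JournalSpec requires. A minimal sketch of
opting into physical deletes, assuming the standard Typesafe Config setup:

    import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}

    // In application.conf this is simply:  jdbc-journal.logicalDelete = false
    // Programmatically, mirroring the *JournalSpecPhysicalDelete classes above:
    val config: Config = ConfigFactory.load()
      .withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false))

One caveat on the perf specs: Config is immutable, so this.cfg.withValue(...) returns a new
Config that the *JournalPerfSpecPhysicalDelete classes discard; as written they still run with
logicalDelete = true. Passing the modified config to the JdbcJournalPerfSpec constructor, as the
JdbcJournalSpec variants do, would actually apply the setting.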