Skip to content

Commit

Permalink
Implement optional physical deletion of marked journal messages (#160)
Browse files Browse the repository at this point in the history
Code improvements
  • Loading branch information
dmi3zkm authored and WellingR committed Jan 12, 2018
1 parent fa4f84d commit 6af37ab
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 9 deletions.
2 changes: 2 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ jdbc-journal {
batchSize = 400
# The maximum number of batch-inserts that may be running concurrently
parallelism = 8
# If true (the default), deletion only marks journal messages as deleted;
# if false, marked messages are physically removed from the journal table.
logicalDelete = true

slick {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ class BaseByteArrayJournalDaoConfig(config: Config) {
val bufferSize: Int = config.asInt("bufferSize", 1000)
val batchSize: Int = config.asInt("batchSize", 400)
val parallelism: Int = config.asInt("parallelism", 8)
val logicalDelete: Boolean = config.asBoolean("logicalDelete", default = true)
override def toString: String = s"BaseByteArrayJournalDaoConfig($bufferSize,$batchSize,$parallelism)"
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ import akka.persistence.jdbc.config.JournalConfig
import akka.persistence.jdbc.serialization.FlowPersistentReprSerializer
import akka.persistence.{AtomicWrite, PersistentRepr}
import akka.serialization.Serialization
import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Keep, Sink, Source}
import slick.jdbc.JdbcProfile
import akka.stream.{Materializer, OverflowStrategy, QueueOfferResult}
import slick.jdbc.JdbcBackend._
import slick.jdbc.JdbcProfile

import scala.collection.immutable._
import scala.concurrent.{ExecutionContext, Future, Promise}
Expand All @@ -44,8 +44,8 @@ trait BaseByteArrayJournalDao extends JournalDao {
implicit val ec: ExecutionContext
implicit val mat: Materializer

import journalConfig.daoConfig.{batchSize, bufferSize, logicalDelete, parallelism}
import profile.api._
import journalConfig.daoConfig.{batchSize, bufferSize, parallelism}

private val writeQueue = Source.queue[(Promise[Unit], Seq[JournalRow])](bufferSize, OverflowStrategy.dropNew)
.batchWeighted[(Seq[Promise[Unit]], Seq[JournalRow])](batchSize, _._2.size, tup => Vector(tup._1) -> tup._2) {
Expand Down Expand Up @@ -92,9 +92,23 @@ trait BaseByteArrayJournalDao extends JournalDao {
queueWriteJournalRows(rowsToWrite).map(_ => resultWhenWriteComplete)
}

override def delete(persistenceId: String, maxSequenceNr: Long): Future[Unit] = for {
_ <- db.run(queries.markJournalMessagesAsDeleted(persistenceId, maxSequenceNr))
} yield ()
override def delete(persistenceId: String, maxSequenceNr: Long): Future[Unit] =
if (logicalDelete) {
db.run(queries.markJournalMessagesAsDeleted(persistenceId, maxSequenceNr)).map(_ => ())
} else {
// We should keep journal record with highest sequence number in order to be compliant
// with @see [[akka.persistence.journal.JournalSpec]]
val actions = for {
_ <- queries.markJournalMessagesAsDeleted(persistenceId, maxSequenceNr)
highestMarkedSequenceNr <- highestMarkedSequenceNr(persistenceId)
_ <- queries.delete(persistenceId, highestMarkedSequenceNr.getOrElse(0L) - 1)
} yield ()

db.run(actions.transactionally)
}

  // DBIO yielding the greatest sequence number currently marked as deleted
  // for `persistenceId`, or None when no row is marked.
  private def highestMarkedSequenceNr(persistenceId: String) =
    queries.highestMarkedSequenceNrForPersistenceId(persistenceId).result.headOption

override def highestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = for {
maybeHighestSeqNo <- db.run(queries.highestSequenceNrForPersistenceId(persistenceId).result.headOption)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ class JournalQueries(val profile: JdbcProfile, override val journalTableCfg: Jou
private def selectAllJournalForPersistenceId(persistenceId: Rep[String]) =
JournalTable.filter(_.persistenceId === persistenceId).sortBy(_.sequenceNumber.desc)

def delete(persistenceId: String, toSequenceNr: Long) = {
JournalTable
.filter(_.persistenceId === persistenceId)
.filter(_.sequenceNumber <= toSequenceNr)
.delete
}

def markJournalMessagesAsDeleted(persistenceId: String, maxSequenceNr: Long) =
JournalTable
.filter(_.persistenceId === persistenceId)
Expand All @@ -42,8 +49,13 @@ class JournalQueries(val profile: JdbcProfile, override val journalTableCfg: Jou
private def _highestSequenceNrForPersistenceId(persistenceId: Rep[String]): Query[Rep[Long], Long, Seq] =
selectAllJournalForPersistenceId(persistenceId).map(_.sequenceNumber).take(1)

private def _highestMarkedSequenceNrForPersistenceId(persistenceId: Rep[String]): Query[Rep[Long], Long, Seq] =
selectAllJournalForPersistenceId(persistenceId).filter(_.deleted === true).map(_.sequenceNumber)

  // Precompiled form of _highestSequenceNrForPersistenceId (compiled once, reused per call).
  val highestSequenceNrForPersistenceId = Compiled(_highestSequenceNrForPersistenceId _)

  // Precompiled form of _highestMarkedSequenceNrForPersistenceId.
  val highestMarkedSequenceNrForPersistenceId = Compiled(_highestMarkedSequenceNrForPersistenceId _)

private def _selectByPersistenceIdAndMaxSequenceNumber(persistenceId: Rep[String], maxSequenceNr: Rep[Long]) =
selectAllJournalForPersistenceId(persistenceId).filter(_.sequenceNumber <= maxSequenceNr)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class AkkaPersistenceConfigTest extends FlatSpec with Matchers {
|
| dao = "akka.persistence.jdbc.dao.bytea.journal.ByteArrayJournalDao"
|
| logicalDelete = true
|
| slick {
| profile = "slick.jdbc.PostgresProfile$"
| db {
Expand Down Expand Up @@ -232,6 +234,8 @@ class AkkaPersistenceConfigTest extends FlatSpec with Matchers {
cfg.journalTableConfiguration.columnNames.persistenceId shouldBe "persistence_id"
cfg.journalTableConfiguration.columnNames.sequenceNumber shouldBe "sequence_number"
cfg.journalTableConfiguration.columnNames.tags shouldBe "tags"

cfg.daoConfig.logicalDelete shouldBe true
}

it should "parse SnapshotConfig" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import akka.persistence.jdbc.util.{ClasspathResources, DropCreate, SlickDatabase
import akka.persistence.journal.JournalPerfSpec
import akka.persistence.journal.JournalPerfSpec.{BenchActor, Cmd, ResetCounter}
import akka.testkit.TestProbe
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

Expand Down Expand Up @@ -123,6 +123,10 @@ class PostgresJournalPerfSpec extends JdbcJournalPerfSpec(ConfigFactory.load("po
override def measurementIterations: Int = 1
}

class PostgresJournalPerfSpecPhysicalDelete extends PostgresJournalPerfSpec {
  // NOTE(review): Typesafe Config is immutable — withValue returns a NEW Config that is
  // discarded here, so this spec still runs with logicalDelete = true. The modified
  // config must be passed to the JdbcJournalPerfSpec constructor instead (as the
  // *JournalSpecPhysicalDelete classes do).
  this.cfg.withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false))
}

class MySQLJournalPerfSpec extends JdbcJournalPerfSpec(ConfigFactory.load("mysql-application.conf"), MySQL()) {
override implicit def pc: PatienceConfig = PatienceConfig(timeout = 10.minutes)

Expand All @@ -133,6 +137,10 @@ class MySQLJournalPerfSpec extends JdbcJournalPerfSpec(ConfigFactory.load("mysql
override def measurementIterations: Int = 1
}

class MySQLJournalPerfSpecPhysicalDelete extends MySQLJournalPerfSpec {
  // NOTE(review): Config.withValue returns a new immutable Config; the result is
  // discarded, so logicalDelete is never actually set to false for this spec.
  // Pass the modified config to the JdbcJournalPerfSpec constructor instead.
  this.cfg.withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false))
}

class OracleJournalPerfSpec extends JdbcJournalPerfSpec(ConfigFactory.load("oracle-application.conf"), Oracle()) {
override implicit def pc: PatienceConfig = PatienceConfig(timeout = 10.minutes)

Expand All @@ -143,4 +151,12 @@ class OracleJournalPerfSpec extends JdbcJournalPerfSpec(ConfigFactory.load("orac
override def measurementIterations: Int = 1
}

class H2JournalPerfSpec extends JdbcJournalPerfSpec(ConfigFactory.load("h2-application.conf"), H2())
class OracleJournalPerfSpecPhysicalDelete extends OracleJournalPerfSpec {
  // NOTE(review): Config.withValue returns a new immutable Config; the result is
  // discarded, so logicalDelete is never actually set to false for this spec.
  // Pass the modified config to the JdbcJournalPerfSpec constructor instead.
  this.cfg.withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false))
}

// Journal performance suite against H2, configured via h2-application.conf.
class H2JournalPerfSpec extends JdbcJournalPerfSpec(ConfigFactory.load("h2-application.conf"), H2())

// Journal performance suite against H2 with physical deletion enabled.
// Fix: the previous body called cfg.withValue inside the class body, but Config is
// immutable and the returned copy was discarded, so logicalDelete = false never took
// effect. The modified config is now passed to the JdbcJournalPerfSpec constructor
// (H2JournalPerfSpec adds no overrides, so this is otherwise equivalent).
class H2JournalPerfSpecPhysicalDelete extends JdbcJournalPerfSpec(
  ConfigFactory.load("h2-application.conf")
    .withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false)), H2())
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import akka.persistence.jdbc.config._
import akka.persistence.jdbc.util.Schema._
import akka.persistence.jdbc.util.{ClasspathResources, DropCreate, SlickDatabase}
import akka.persistence.journal.JournalSpec
import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}

Expand Down Expand Up @@ -62,13 +62,21 @@ abstract class JdbcJournalSpec(config: Config, schemaType: SchemaType) extends J
* but the Slick Tables definition must not change, else it breaks the UPSERT feature...
*/
class PostgresJournalSpec extends JdbcJournalSpec(ConfigFactory.load("postgres-application.conf"), Postgres())
// Same as PostgresJournalSpec but with physical (rather than logical) deletion of journal rows.
class PostgresJournalSpecPhysicalDelete extends JdbcJournalSpec(ConfigFactory.load("postgres-application.conf")
  .withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false)), Postgres())

/**
* Does not (yet) work because Slick generates double quotes to escape field names
* for some reason when creating the DDL
*/
class MySQLJournalSpec extends JdbcJournalSpec(ConfigFactory.load("mysql-application.conf"), MySQL())
// Same as MySQLJournalSpec but with physical (rather than logical) deletion of journal rows.
class MySQLJournalSpecPhysicalDelete extends JdbcJournalSpec(ConfigFactory.load("mysql-application.conf")
  .withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false)), MySQL())

class OracleJournalSpec extends JdbcJournalSpec(ConfigFactory.load("oracle-application.conf"), Oracle())
// Same as OracleJournalSpec but with physical (rather than logical) deletion of journal rows.
class OracleJournalSpecPhysicalDelete extends JdbcJournalSpec(ConfigFactory.load("oracle-application.conf")
  .withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false)), Oracle())

class H2JournalSpec extends JdbcJournalSpec(ConfigFactory.load("h2-application.conf"), H2())
// Same as H2JournalSpec but with physical (rather than logical) deletion of journal rows.
class H2JournalSpecPhysicalDelete extends JdbcJournalSpec(ConfigFactory.load("h2-application.conf")
  .withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false)), H2())

0 comments on commit 6af37ab

Please sign in to comment.