Removes logical deletion (#569)
* Disables logical deletion by default
* Removes logical delete feature altogether (migration sketch below)
* Adds MiMa excludes
nvollmar authored Sep 5, 2022
1 parent 1b23058 commit 933ece9
Showing 22 changed files with 237 additions and 580 deletions.
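
For readers upgrading past this commit, a minimal, hypothetical migration sketch (not part of the diff): applications that previously forced hard deletes through the now-removed switch can simply drop the override, since physical deletion is the only behaviour left. The config key shown is the one removed further down in reference.conf.

```scala
import com.typesafe.config.ConfigFactory

// Pre-#569 override that some applications used to opt into hard deletes.
// After this commit the key no longer exists in reference.conf and is simply unused,
// so the override can be deleted from application.conf.
val legacyOverride = ConfigFactory.parseString(
  "akka-persistence-jdbc.logicalDeletion.enable = false")

// Post-#569 there is nothing to configure: journal deletes are always physical.
```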
@@ -8,16 +8,10 @@ import akka.persistence.jdbc.query.{
SqlServerCleaner
}

class PostgresHardDeleteQueryTest
extends HardDeleteQueryTest("postgres-application-with-hard-delete.conf")
with PostgresCleaner
class PostgresHardDeleteQueryTest extends HardDeleteQueryTest("postgres-application.conf") with PostgresCleaner

class MySQLHardDeleteQueryTest extends HardDeleteQueryTest("mysql-application-with-hard-delete.conf") with MysqlCleaner
class MySQLHardDeleteQueryTest extends HardDeleteQueryTest("mysql-application.conf") with MysqlCleaner

class OracleHardDeleteQueryTest
extends HardDeleteQueryTest("oracle-application-with-hard-delete.conf")
with OracleCleaner
class OracleHardDeleteQueryTest extends HardDeleteQueryTest("oracle-application.conf") with OracleCleaner

class SqlServerHardDeleteQueryTest
extends HardDeleteQueryTest("sqlserver-application-with-hard-delete.conf")
with SqlServerCleaner
class SqlServerHardDeleteQueryTest extends HardDeleteQueryTest("sqlserver-application.conf") with SqlServerCleaner
@@ -17,10 +17,6 @@ class PostgresJournalPerfSpecSharedDb
override def eventsCount: Int = 100
}

class PostgresJournalPerfSpecPhysicalDelete extends PostgresJournalPerfSpec {
this.cfg.withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false))
}

class MySQLJournalPerfSpec extends JdbcJournalPerfSpec(ConfigFactory.load("mysql-application.conf"), MySQL) {
override def eventsCount: Int = 100
}
@@ -30,10 +26,6 @@ class MySQLJournalPerfSpecSharedDb
override def eventsCount: Int = 100
}

class MySQLJournalPerfSpecPhysicalDelete extends MySQLJournalPerfSpec {
this.cfg.withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false))
}

class OracleJournalPerfSpec extends JdbcJournalPerfSpec(ConfigFactory.load("oracle-application.conf"), Oracle) {
override def eventsCount: Int = 100
}
@@ -43,10 +35,6 @@ class OracleJournalPerfSpecSharedDb
override def eventsCount: Int = 100
}

class OracleJournalPerfSpecPhysicalDelete extends OracleJournalPerfSpec {
this.cfg.withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false))
}

class SqlServerJournalPerfSpec
extends JdbcJournalPerfSpec(ConfigFactory.load("sqlserver-application.conf"), SqlServer) {
override def eventsCount: Int = 100
@@ -56,7 +44,3 @@ class SqlServerJournalPerfSpecSharedDb
extends JdbcJournalPerfSpec(ConfigFactory.load("sqlserver-shared-db-application.conf"), SqlServer) {
override def eventsCount: Int = 100
}

class SqlServerJournalPerfSpecPhysicalDelete extends SqlServerJournalPerfSpec {
this.cfg.withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false))
}
@@ -1,46 +1,19 @@
package akka.persistence.jdbc.integration

import akka.persistence.jdbc.journal.JdbcJournalSpec
import akka.persistence.jdbc.testkit.internal.MySQL
import akka.persistence.jdbc.testkit.internal.Oracle
import akka.persistence.jdbc.testkit.internal.Postgres
import akka.persistence.jdbc.testkit.internal.SqlServer
import com.typesafe.config.{ ConfigFactory, ConfigValueFactory }
import akka.persistence.jdbc.testkit.internal.{ MySQL, Oracle, Postgres, SqlServer }
import com.typesafe.config.ConfigFactory

class PostgresJournalSpec extends JdbcJournalSpec(ConfigFactory.load("postgres-application.conf"), Postgres)
class PostgresJournalSpecSharedDb
extends JdbcJournalSpec(ConfigFactory.load("postgres-shared-db-application.conf"), Postgres)
class PostgresJournalSpecPhysicalDelete
extends JdbcJournalSpec(
ConfigFactory
.load("postgres-application.conf")
.withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false)),
Postgres)

class MySQLJournalSpec extends JdbcJournalSpec(ConfigFactory.load("mysql-application.conf"), MySQL)
class MySQLJournalSpecSharedDb extends JdbcJournalSpec(ConfigFactory.load("mysql-shared-db-application.conf"), MySQL)
class MySQLJournalSpecPhysicalDelete
extends JdbcJournalSpec(
ConfigFactory
.load("mysql-application.conf")
.withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false)),
MySQL)

class OracleJournalSpec extends JdbcJournalSpec(ConfigFactory.load("oracle-application.conf"), Oracle)
class OracleJournalSpecSharedDb extends JdbcJournalSpec(ConfigFactory.load("oracle-shared-db-application.conf"), Oracle)
class OracleJournalSpecPhysicalDelete
extends JdbcJournalSpec(
ConfigFactory
.load("oracle-application.conf")
.withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false)),
Oracle)

class SqlServerJournalSpec extends JdbcJournalSpec(ConfigFactory.load("sqlserver-application.conf"), SqlServer)
class SqlServerJournalSpecSharedDb
extends JdbcJournalSpec(ConfigFactory.load("sqlserver-shared-db-application.conf"), SqlServer)
class SqlServerJournalSpecPhysicalDelete
extends JdbcJournalSpec(
ConfigFactory
.load("sqlserver-application.conf")
.withValue("jdbc-journal.logicalDelete", ConfigValueFactory.fromAnyRef(false)),
SqlServer)

This file was deleted.

@@ -0,0 +1,4 @@
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.config.BaseDaoConfig.logicalDelete")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.config.ReadJournalConfig.includeDeleted")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.journal.dao.legacy.BaseByteArrayJournalDao.logWarnAboutLogicalDeletionDeprecation")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.journal.dao.legacy.ByteArrayJournalDao.logWarnAboutLogicalDeletionDeprecation")
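
The four filters above are the MiMa excludes mentioned in the commit message; they acknowledge the removed members as intentional binary-incompatible changes. As a hedged aside (not part of this commit), the same exclusions could also be declared inline in the sbt build with sbt-mima-plugin:

```scala
// build.sbt sketch, assuming sbt-mima-plugin is already enabled for the module
import com.typesafe.tools.mima.core._

mimaBinaryIssueFilters ++= Seq(
  ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.config.BaseDaoConfig.logicalDelete"),
  ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.config.ReadJournalConfig.includeDeleted"))
```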
22 changes: 0 additions & 22 deletions core/src/main/resources/reference.conf
@@ -14,18 +14,6 @@

akka-persistence-jdbc {

# If set to true, event deletion is performed as a soft, logical delete. Events are kept in the journal and are still
# delivered in queries. Otherwise, a hard delete will be performed.
#
# Note that even when configured for hard deletes, the last event is kept as a logical delete.
# This is necessary because we must keep track of the highest sequence number that was ever used on a
# given persistent actor. However, this last event will be 'invisible': it won't ever be replayed nor delivered on queries.
#
# This property affects jdbc-journal.logicalDelete and jdbc-read-journal.includeLogicallyDeleted.
#
logicalDeletion.enable = true


# The tag separator to use when tagging events with more than one tag.
# This property affects jdbc-journal.tagSeparator and jdbc-read-journal.tagSeparator.
tagSeparator = ","
@@ -191,11 +179,6 @@ jdbc-journal {
# The maximum number of batch-inserts that may be running concurrently
parallelism = 8

# Only mark as deleted. If false, delete physically
# should not be configured directly, but through property akka-persistence-jdbc.logicalDeletion.enable
# in order to keep consistent behavior over write/read sides
logicalDelete = ${akka-persistence-jdbc.logicalDeletion.enable}

# This setting can be used to configure usage of a shared database.
# To disable usage of a shared database, set to null or an empty string.
# When set to a non empty string, this setting does two things:
@@ -438,11 +421,6 @@ jdbc-read-journal {

dao = "akka.persistence.jdbc.query.dao.DefaultReadJournalDao"

# if true, queries will include logically deleted events
# should not be configured directly, but through property akka-persistence-jdbc.logicalDeletion.enable
# in order to keep consistent behavior over write/read sides
includeLogicallyDeleted = ${akka-persistence-jdbc.logicalDeletion.enable}

# Settings for determining if ids (ordering column) in the journal are out of sequence.
journal-sequence-retrieval {
# The maximum number of ids that will be retrieved in each batch
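
A quick, hedged way to confirm the configuration surface after this change (assuming a classpath containing this version of the library): the three removed keys should no longer resolve in the merged reference configuration.

```scala
import com.typesafe.config.ConfigFactory

// Sketch: defaultReference() merges every reference.conf on the classpath,
// so none of the removed logical-deletion keys should be present any more.
val reference = ConfigFactory.defaultReference()

assert(!reference.hasPath("akka-persistence-jdbc.logicalDeletion.enable"))
assert(!reference.hasPath("jdbc-journal.logicalDelete"))
assert(!reference.hasPath("jdbc-read-journal.includeLogicallyDeleted"))
```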
@@ -129,8 +129,7 @@ class BaseDaoConfig(config: Config) {
val batchSize: Int = config.getInt("batchSize")
val replayBatchSize: Int = config.getInt("replayBatchSize")
val parallelism: Int = config.getInt("parallelism")
val logicalDelete: Boolean = config.getBoolean("logicalDelete")
override def toString: String = s"BaseDaoConfig($bufferSize,$batchSize,$parallelism,$logicalDelete)"
override def toString: String = s"BaseDaoConfig($bufferSize,$batchSize,$parallelism)"
}

class ReadJournalPluginConfig(config: Config) {
@@ -189,10 +188,9 @@ class ReadJournalConfig(config: Config) {
val refreshInterval: FiniteDuration = config.asFiniteDuration("refresh-interval")
val maxBufferSize: Int = config.getInt("max-buffer-size")
val addShutdownHook: Boolean = config.getBoolean("add-shutdown-hook")
val includeDeleted: Boolean = config.getBoolean("includeLogicallyDeleted")

override def toString: String =
s"ReadJournalConfig($journalTableConfiguration,$pluginConfig,$refreshInterval,$maxBufferSize,$addShutdownHook,$includeDeleted)"
s"ReadJournalConfig($journalTableConfiguration,$pluginConfig,$refreshInterval,$maxBufferSize,$addShutdownHook)"
}

class DurableStateTableColumnNames(config: Config) {
@@ -5,8 +5,8 @@

package akka.persistence.jdbc.journal.dao.legacy

import akka.persistence.jdbc.journal.dao.{ BaseDao, BaseJournalDaoWithReadMessages, H2Compat, JournalDaoWithUpdates }
import akka.persistence.jdbc.config.{ BaseDaoConfig, JournalConfig }
import akka.persistence.jdbc.journal.dao.{ BaseDao, BaseJournalDaoWithReadMessages, H2Compat, JournalDaoWithUpdates }
import akka.persistence.jdbc.serialization.FlowPersistentReprSerializer
import akka.persistence.{ AtomicWrite, PersistentRepr }
import akka.serialization.Serialization
@@ -48,21 +48,10 @@ trait BaseByteArrayJournalDao
implicit val ec: ExecutionContext
implicit val mat: Materializer

import journalConfig.daoConfig.logicalDelete
import profile.api._

val logger = LoggerFactory.getLogger(this.getClass)

// This logging may block since we don't control how the user will configure logback
// We can't use Akka logging either, because we don't have an ActorSystem in scope and
// we should not introduce another dependency here.
// Therefore, we make sure we only log a warning for logical deletes once
lazy val logWarnAboutLogicalDeletionDeprecation = {
logger.warn(
"Logical deletion of events is deprecated and will be removed in akka-persistence-jdbc in a later version " +
"To disable it in this current version you must set the property 'akka-persistence-jdbc.logicalDeletion.enable' to false.")
}

def writeJournalRows(xs: Seq[JournalRow]): Future[Unit] = { // Write atomically without auto-commit
db.run(queries.writeJournalRows(xs).transactionally).map(_ => ())
}
@@ -85,26 +74,17 @@
queueWriteJournalRows(rowsToWrite).map(_ => resultWhenWriteComplete)
}

override def delete(persistenceId: String, maxSequenceNr: Long): Future[Unit] =
if (logicalDelete) {
// We only log a warning when user effectively deletes an event.
// The rationale here is that this feature is not so broadly used and the default
// is to have logical delete enabled.
// We don't want to log warnings for users that are not using this,
// so we make it happen only when effectively used.
logWarnAboutLogicalDeletionDeprecation
db.run(queries.markJournalMessagesAsDeleted(persistenceId, maxSequenceNr)).map(_ => ())
} else {
// We should keep journal record with highest sequence number in order to be compliant
// with @see [[akka.persistence.journal.JournalSpec]]
val actions: DBIOAction[Unit, NoStream, Effect.Write with Effect.Read] = for {
_ <- queries.markJournalMessagesAsDeleted(persistenceId, maxSequenceNr)
highestMarkedSequenceNr <- highestMarkedSequenceNr(persistenceId)
_ <- queries.delete(persistenceId, highestMarkedSequenceNr.getOrElse(0L) - 1)
} yield ()

db.run(actions.transactionally)
}
override def delete(persistenceId: String, maxSequenceNr: Long): Future[Unit] = {
// We should keep journal record with highest sequence number in order to be compliant
// with @see [[akka.persistence.journal.JournalSpec]]
val actions: DBIOAction[Unit, NoStream, Effect.Write with Effect.Read] = for {
_ <- queries.markJournalMessagesAsDeleted(persistenceId, maxSequenceNr)
highestMarkedSequenceNr <- highestMarkedSequenceNr(persistenceId)
_ <- queries.delete(persistenceId, highestMarkedSequenceNr.getOrElse(0L) - 1)
} yield ()

db.run(actions.transactionally)
}

def update(persistenceId: String, sequenceNr: Long, payload: AnyRef): Future[Done] = {
val write = PersistentRepr(payload, sequenceNr, persistenceId)
@@ -140,5 +120,4 @@ trait BaseByteArrayJournalDao
case Success((repr, _, ordering)) => Success(repr -> ordering)
case Failure(e) => Failure(e)
}

}
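
For callers nothing changes API-wise; only the effect of a delete does. A hedged caller-side sketch (the actor and persistence id below are hypothetical, not from this repository): a classic PersistentActor issuing deleteMessages now always triggers a physical delete, with only the highest marked sequence number row retained as described in the comment above.

```scala
import akka.persistence.{ DeleteMessagesFailure, DeleteMessagesSuccess, PersistentActor }

// Hypothetical persistent actor demonstrating the caller side of journal deletion.
class CleanupExample extends PersistentActor {
  override def persistenceId: String = "cleanup-example"

  override def receiveCommand: Receive = {
    case "cleanup" =>
      // Rows up to lastSequenceNr are removed physically; the highest marked row stays.
      deleteMessages(toSequenceNr = lastSequenceNr)
    case DeleteMessagesSuccess(toSequenceNr) =>
      // Deletion committed by the journal DAO shown above.
      ()
    case DeleteMessagesFailure(cause, _) =>
      // The transactional DBIO failed; events are still present.
      ()
  }

  override def receiveRecover: Receive = { case _ => () }
}
```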
@@ -25,8 +25,7 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo
baseTableQuery().map(_.persistenceId).distinct.take(max)

private def baseTableQuery() =
if (readJournalConfig.includeDeleted) JournalTable
else JournalTable.filter(_.deleted === false)
JournalTable.filter(_.deleted === false)

private def baseTableWithTagsQuery() = {
baseTableQuery().join(TagTable).on(_.ordering === _.eventId)
@@ -43,6 +42,7 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo
.filter(_.persistenceId === persistenceId)
.filter(_.sequenceNumber >= fromSequenceNr)
.filter(_.sequenceNumber <= toSequenceNr)
.filter(!_.deleted)
.sortBy(_.sequenceNumber.asc)
.take(max)

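
On the read side the deleted-rows filter above is now unconditional. A hedged usage sketch (the ActorSystem name and persistence id are illustrative): queries from the JDBC read journal can no longer surface logically deleted rows, because there is no includeLogicallyDeleted switch left to flip.

```scala
import akka.actor.ActorSystem
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
import akka.persistence.query.PersistenceQuery

implicit val system: ActorSystem = ActorSystem("read-side-example")

// The deleted = false filter in the queries above applies to every stream obtained here.
val readJournal = PersistenceQuery(system).readJournalFor[JdbcReadJournal](JdbcReadJournal.Identifier)

readJournal
  .currentEventsByPersistenceId("sample-id", 0L, Long.MaxValue)
  .runForeach(env => println(s"${env.sequenceNr}: ${env.event}"))
```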
@@ -124,19 +124,7 @@ trait OracleReadJournalDao extends ReadJournalDao {
val theTag = s"%$tag%"

val selectStatement =
if (readJournalConfig.includeDeleted)
sql"""
SELECT "#$ordering", "#$deleted", "#$persistenceId", "#$sequenceNumber", "#$message", "#$tags"
FROM (
SELECT * FROM #$theTableName
WHERE "#$tags" LIKE $theTag
AND "#$ordering" > $theOffset
AND "#$ordering" <= $maxOffset
ORDER BY "#$ordering"
)
WHERE rownum <= $max""".as[JournalRow]
else
sql"""
sql"""
SELECT "#$ordering", "#$deleted", "#$persistenceId", "#$sequenceNumber", "#$message", "#$tags"
FROM (
SELECT * FROM #$theTableName
@@ -24,8 +24,7 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo
baseTableQuery().map(_.persistenceId).distinct.take(max)

private def baseTableQuery() =
if (readJournalConfig.includeDeleted) JournalTable
else JournalTable.filter(_.deleted === false)
JournalTable.filter(_.deleted === false)

val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct _)

2 changes: 0 additions & 2 deletions core/src/test/resources/general.conf
@@ -43,7 +43,6 @@ docker {
}

jdbc-journal {
logicalDelete = ${akka-persistence-jdbc.logicalDeletion.enable}
event-adapters {
test-write-event-adapter = "akka.persistence.jdbc.query.EventAdapterTest$TestWriteEventAdapter"
test-read-event-adapter = "akka.persistence.jdbc.query.EventAdapterTest$TestReadEventAdapter"
@@ -59,7 +58,6 @@


jdbc-read-journal {
includeLogicallyDeleted = ${akka-persistence-jdbc.logicalDeletion.enable}
refresh-interval = "10ms"
max-buffer-size = "500"
}
19 changes: 0 additions & 19 deletions core/src/test/resources/h2-application-with-hard-delete.conf

This file was deleted.

19 changes: 0 additions & 19 deletions core/src/test/resources/mysql-application-with-hard-delete.conf

This file was deleted.

19 changes: 0 additions & 19 deletions core/src/test/resources/oracle-application-with-hard-delete.conf

This file was deleted.

(remaining changed files not shown)
