Skip to content

Commit

Permalink
fix: IntegrationTest and clean code #710
Browse files Browse the repository at this point in the history
  • Loading branch information
Roiocam committed Sep 5, 2023
1 parent 5552578 commit 6ed6098
Show file tree
Hide file tree
Showing 2 changed files with 119 additions and 138 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright (C) 2014 - 2019 Dennis Vriend <https://github.com/dnvriend>
* Copyright (C) 2019 - 2023 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.persistence.jdbc.integration

import akka.persistence.jdbc.query.{
Expand All @@ -12,63 +17,25 @@ class PostgresScalaEventsByTagMigrationTest
extends EventsByTagMigrationTest("postgres-application.conf")
with PostgresCleaner {

override def dropLegacyPKConstraint(): Unit = {
withStatement(stmt => stmt.execute(s"""ALTER TABLE ${tagTableCfg.tableName} DROP CONSTRAINT "event_tag_pkey""""))
}

override def alterEventIdToNullable(): Unit = {
withStatement(stmt =>
stmt.execute(
s"ALTER TABLE ${tagTableCfg.tableName} ALTER COLUMN ${tagTableCfg.columnNames.eventId} DROP NOT NULL"))
}
override def alterEventIdToNullable(): Unit =
alterColumn(changeToDialect = "DROP NOT NULL")
}

class MySQLScalaEventByTagMigrationTest extends EventsByTagMigrationTest("mysql-application.conf") with MysqlCleaner {
override def alterEventIdToNullable(): Unit = {
withStatement { stmt =>
stmt.execute(
s"ALTER TABLE ${tagTableCfg.tableName} MODIFY COLUMN ${tagTableCfg.columnNames.eventId} BIGINT UNSIGNED NULL")
}
}
override def alterEventIdToNullable(): Unit =
alterColumn(alterDialect = "MODIFY COLUMN", changeToDialect = "BIGINT UNSIGNED NULL")

override def dropLegacyFKConstraint(): Unit = {
withStatement { stmt =>
// SELECT AND DROP old FK CONSTRAINT
val constraintNameQuery =
s"""
|SELECT CONSTRAINT_NAME
|FROM INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS
|WHERE TABLE_NAME = "${tagTableCfg.tableName}" LIMIT 1
""".stripMargin
val resultSet = stmt.executeQuery(constraintNameQuery)
if (resultSet.next()) {
val constraintName = resultSet.getString("CONSTRAINT_NAME")
stmt.execute(s"ALTER TABLE ${tagTableCfg.tableName} DROP FOREIGN KEY $constraintName")
}
}
}
override def dropLegacyFKConstraint(): Unit =
dropConstraint(constraintType = "FOREIGN KEY", constraintDialect = "FOREIGN KEY")

override def addNewPKConstraint(): Unit = {
withStatement { stmt =>
stmt.execute(s"""
|ALTER TABLE ${tagTableCfg.tableName}
|ADD CONSTRAINT
|PRIMARY KEY (${tagTableCfg.columnNames.persistenceId}, ${tagTableCfg.columnNames.sequenceNumber}, ${tagTableCfg.columnNames.tag})
""".stripMargin)
}
}
override def dropLegacyPKConstraint(): Unit =
dropConstraint(constraintType = "PRIMARY KEY", constraintDialect = "", constraintNameDialect = "KEY")

override def addNewFKConstraint(): Unit = {
withStatement { stmt =>
stmt.execute(s"""
|ALTER TABLE ${tagTableCfg.tableName}
|ADD CONSTRAINT fk_event_journal_on_pk
|FOREIGN KEY (${tagTableCfg.columnNames.persistenceId}, ${tagTableCfg.columnNames.sequenceNumber})
|REFERENCES ${journalTableCfg.tableName} (${journalTableCfg.columnNames.persistenceId}, ${journalTableCfg.columnNames.sequenceNumber})
|ON DELETE CASCADE
""".stripMargin)
}
}
override def addNewPKConstraint(): Unit =
addPKConstraint(constraintNameDialect = "")

override def addNewFKConstraint(): Unit =
addFKConstraint()
}

class OracleScalaEventByTagMigrationTest
Expand All @@ -77,64 +44,28 @@ class OracleScalaEventByTagMigrationTest

override def addNewColumn(): Unit = {
// mock event_id not null, in order to change it to null later
withStatement { stmt =>
stmt.execute(s"ALTER TABLE ${tagTableCfg.tableName} MODIFY ${tagTableCfg.columnNames.eventId} NOT NULL")
}
alterColumn(alterDialect = "MODIFY", changeToDialect = "NOT NULL")
}

override def alterEventIdToNullable(): Unit = {
withStatement { stmt =>
stmt.execute(s"ALTER TABLE ${tagTableCfg.tableName} MODIFY ${tagTableCfg.columnNames.eventId} NULL")
}
}
override def alterEventIdToNullable(): Unit =
alterColumn(alterDialect = "MODIFY", changeToDialect = "NULL")

override def dropLegacyFKConstraint(): Unit =
dropConstraint(constraintTableName = "USER_CONSTRAINTS", constraintType = "R")

override def dropLegacyPKConstraint(): Unit =
dropConstraint(constraintTableName = "USER_CONSTRAINTS", constraintType = "P")

override def dropLegacyFKConstraint(): Unit = {
withStatement { stmt =>
// SELECT AND DROP old FK CONSTRAINT
val constraintNameQuery =
s"""
|SELECT CONSTRAINT_NAME
|FROM USER_CONSTRAINTS
|WHERE CONSTRAINT_TYPE = 'R' AND TABLE_NAME = '${tagTableCfg.tableName}'
""".stripMargin
val resultSet = stmt.executeQuery(constraintNameQuery)
if (resultSet.next()) {
val constraintName = resultSet.getString("CONSTRAINT_NAME")
stmt.execute(s"ALTER TABLE ${tagTableCfg.tableName} DROP CONSTRAINT $constraintName")
}
}
}
}

class SqlServerScalaEventByTagMigrationTest
extends EventsByTagMigrationTest("sqlserver-application.conf")
with SqlServerCleaner {

override def dropLegacyPKConstraint(): Unit = {
withStatement { stmt =>
// SELECT AND DROP old PK CONSTRAINT
val constraintNameQuery =
s"""
|SELECT *
|FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS
|WHERE TABLE_NAME = '${tagTableCfg.tableName}' AND CONSTRAINT_TYPE = 'PRIMARY KEY'
""".stripMargin
val resultSet = stmt.executeQuery(constraintNameQuery)
if (resultSet.next()) {
val constraintName = resultSet.getString("CONSTRAINT_NAME")
stmt.execute(s"ALTER TABLE ${tagTableCfg.tableName} DROP CONSTRAINT $constraintName")
}
}
}

override def addNewPKConstraint(): Unit = {
// Change new column not null
withStatement(stmt => {
stmt.execute(
s"ALTER TABLE ${tagTableCfg.tableName} ALTER COLUMN ${tagTableCfg.columnNames.persistenceId} NVARCHAR(255) NOT NULL")
stmt.execute(
s"ALTER TABLE ${tagTableCfg.tableName} ALTER COLUMN ${tagTableCfg.columnNames.sequenceNumber} NUMERIC(10,0) NOT NULL")
})
alterColumn(columnName = tagTableCfg.columnNames.persistenceId, changeToDialect = "NVARCHAR(255) NOT NULL")
alterColumn(columnName = tagTableCfg.columnNames.sequenceNumber, changeToDialect = "NUMERIC(10,0) NOT NULL")
super.addNewPKConstraint()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,72 @@ abstract class EventsByTagMigrationTest(config: String) extends QueryTestSpec(co
val tagTableCfg = journalConfig.eventTagTableConfiguration
val journalTableCfg = journalConfig.eventJournalTableConfiguration

def dropConstraint(
tableName: String = tagTableCfg.tableName,
constraintTableName: String = "INFORMATION_SCHEMA.TABLE_CONSTRAINTS",
constraintType: String,
constraintDialect: String = "CONSTRAINT",
constraintNameDialect: String = ""): Unit = {
withStatement { stmt =>
// SELECT AND DROP old CONSTRAINT
val constraintNameQuery =
s"""
|SELECT CONSTRAINT_NAME
|FROM $constraintTableName
|WHERE TABLE_NAME = '$tableName' AND CONSTRAINT_TYPE = '$constraintType'
""".stripMargin
val resultSet = stmt.executeQuery(constraintNameQuery)
if (resultSet.next()) {
val constraintName = resultSet.getString("CONSTRAINT_NAME")
stmt.execute(s"ALTER TABLE $tableName DROP $constraintDialect $constraintName $constraintNameDialect")
}
}
}

def addPKConstraint(
tableName: String = tagTableCfg.tableName,
pidColumnName: String = tagTableCfg.columnNames.persistenceId,
seqNrColumnName: String = tagTableCfg.columnNames.sequenceNumber,
tagColumnName: String = tagTableCfg.columnNames.tag,
constraintNameDialect: String = "pk_event_tag"): Unit = {
withStatement { stmt =>
stmt.execute(s"""
|ALTER TABLE $tableName
|ADD CONSTRAINT $constraintNameDialect
|PRIMARY KEY ($pidColumnName, $seqNrColumnName, $tagColumnName)
""".stripMargin)
}
}

def addFKConstraint(
tableName: String = tagTableCfg.tableName,
pidColumnName: String = tagTableCfg.columnNames.persistenceId,
seqNrColumnName: String = tagTableCfg.columnNames.sequenceNumber,
journalTableName: String = journalTableCfg.tableName,
journalPidColumnName: String = tagTableCfg.columnNames.persistenceId,
journalSeqNrColumnName: String = tagTableCfg.columnNames.sequenceNumber,
constraintNameDialect: String = "fk_event_journal_on_pk"): Unit = {
withStatement { stmt =>
stmt.execute(s"""
|ALTER TABLE $tableName
|ADD CONSTRAINT $constraintNameDialect
|FOREIGN KEY ($pidColumnName, $seqNrColumnName)
|REFERENCES $journalTableName ($journalPidColumnName, $journalSeqNrColumnName)
|ON DELETE CASCADE
""".stripMargin)
}
}

def alterColumn(
tableName: String = tagTableCfg.tableName,
alterDialect: String = "ALTER COLUMN",
columnName: String = tagTableCfg.columnNames.eventId,
changeToDialect: String = "BIGINT NULL"): Unit = {
withStatement { stmt =>
stmt.execute(s"ALTER TABLE $tableName $alterDialect $columnName $changeToDialect")
}
}

/**
* add new column to event_tag table.
*/
Expand All @@ -55,54 +121,32 @@ abstract class EventsByTagMigrationTest(config: String) extends QueryTestSpec(co
/**
* drop old FK constraint
*/
def dropLegacyFKConstraint(): Unit = {
withStatement(stmt => stmt.execute(s"""ALTER TABLE ${tagTableCfg.tableName} DROP CONSTRAINT "fk_event_journal""""))

}
def dropLegacyFKConstraint(): Unit =
dropConstraint(constraintType = "FOREIGN KEY")

/**
* drop old PK constraint
*/
def dropLegacyPKConstraint(): Unit = {
withStatement(stmt => stmt.execute(s"ALTER TABLE ${tagTableCfg.tableName} DROP PRIMARY KEY"))
}
def dropLegacyPKConstraint(): Unit =
dropConstraint(constraintType = "PRIMARY KEY")

/**
* create new PK constraint for PK column.
*/
def addNewPKConstraint(): Unit = {
withStatement { stmt =>
stmt.execute(s"""
|ALTER TABLE ${tagTableCfg.tableName}
|ADD CONSTRAINT "pk_event_tag"
|PRIMARY KEY (${tagTableCfg.columnNames.persistenceId}, ${tagTableCfg.columnNames.sequenceNumber}, ${tagTableCfg.columnNames.tag})
""".stripMargin)
}
}
def addNewPKConstraint(): Unit =
addPKConstraint()

/**
* create new FK constraint for PK column.
*/
def addNewFKConstraint(): Unit = {
withStatement { stmt =>
stmt.execute(s"""
|ALTER TABLE ${tagTableCfg.tableName}
|ADD CONSTRAINT "fk_event_journal_on_pk"
|FOREIGN KEY (${tagTableCfg.columnNames.persistenceId}, ${tagTableCfg.columnNames.sequenceNumber})
|REFERENCES ${journalTableCfg.tableName} (${journalTableCfg.columnNames.persistenceId}, ${journalTableCfg.columnNames.sequenceNumber})
|ON DELETE CASCADE
""".stripMargin)
}
}
def addNewFKConstraint(): Unit =
addFKConstraint()

/**
* alter the event_id to nullable, so we can skip the InsertAndReturn.
*/
def alterEventIdToNullable(): Unit = {
withStatement { stmt =>
stmt.execute(s"ALTER TABLE ${tagTableCfg.tableName} ALTER COLUMN ${tagTableCfg.columnNames.eventId} BIGINT NULL")
}
}
def alterEventIdToNullable(): Unit =
alterColumn()

// override this, so we can reset the value.
def withRollingUpdateActorSystem(f: ActorSystem => Unit): Unit = {
Expand All @@ -119,13 +163,15 @@ abstract class EventsByTagMigrationTest(config: String) extends QueryTestSpec(co
it should "migrate event tag to new way" in {
// 1. Mock legacy data on here, but actually using redundant write and read.
withRollingUpdateActorSystem { implicit system =>
pendingIfOracleWithLegacy()

val journalOps = new ScalaJdbcReadJournalOperations(system)
withTestActors(replyToMessages = true) { (actor1, actor2, actor3) =>
(actor1 ? withTags(1, "number")).futureValue
(actor2 ? withTags(2, "number")).futureValue
(actor3 ? withTags(3, "number")).futureValue

journalOps.withEventsByTag()("number", NoOffset) { tp =>
journalOps.withEventsByTag()("number", Sequence(Long.MinValue)) { tp =>
tp.request(Int.MaxValue)
tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1, timestamp = 0L))
tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 2, timestamp = 0L))
Expand All @@ -137,7 +183,9 @@ abstract class EventsByTagMigrationTest(config: String) extends QueryTestSpec(co

// Assume that the user has completed the addition of the new column, then we don't need to maintain
// the legacy table schema creation.
addNewColumn();
if (newDao) {
addNewColumn();
}

// 2. write and read redundancy
withRollingUpdateActorSystem { implicit system =>
Expand All @@ -161,12 +209,14 @@ abstract class EventsByTagMigrationTest(config: String) extends QueryTestSpec(co
}

// 3. Delete the rows inserted in the old way and alter the event_id to nullable so that we can migrate to the read and write from the new PK.
deleteLegacyRows();
dropLegacyFKConstraint();
dropLegacyPKConstraint()
addNewPKConstraint()
addNewFKConstraint()
alterEventIdToNullable();
if (newDao) {
deleteLegacyRows();
dropLegacyFKConstraint();
dropLegacyPKConstraint()
addNewPKConstraint()
addNewFKConstraint()
alterEventIdToNullable();
}

// 4. check the migration completed.
withActorSystem { implicit system =>
Expand Down

0 comments on commit 6ed6098

Please sign in to comment.