diff --git a/core/src/it/scala/akka/persistence/jdbc/integration/EventsByTagMigrationTest.scala b/core/src/it/scala/akka/persistence/jdbc/integration/EventsByTagMigrationTest.scala
new file mode 100644
index 000000000..3764e9681
--- /dev/null
+++ b/core/src/it/scala/akka/persistence/jdbc/integration/EventsByTagMigrationTest.scala
@@ -0,0 +1,82 @@
+/*
+ * Copyright (C) 2014 - 2019 Dennis Vriend
+ * Copyright (C) 2019 - 2023 Lightbend Inc.
+ */
+
+package akka.persistence.jdbc.integration
+
+import akka.persistence.jdbc.query.{
+  EventsByTagMigrationTest,
+  MysqlCleaner,
+  OracleCleaner,
+  PostgresCleaner,
+  SqlServerCleaner
+}
+
+class PostgresScalaEventsByTagMigrationTest
+    extends EventsByTagMigrationTest("postgres-application.conf")
+    with PostgresCleaner {}
+
+class MySQLScalaEventByTagMigrationTest extends EventsByTagMigrationTest("mysql-application.conf") with MysqlCleaner {
+
+  override def dropLegacyFKConstraint(): Unit =
+    dropConstraint(constraintType = "FOREIGN KEY", constraintDialect = "FOREIGN KEY")
+
+  override def dropLegacyPKConstraint(): Unit =
+    dropConstraint(constraintType = "PRIMARY KEY", constraintDialect = "", constraintNameDialect = "KEY")
+
+  override def addNewPKConstraint(): Unit =
+    addPKConstraint(constraintNameDialect = "")
+
+  override def addNewFKConstraint(): Unit =
+    addFKConstraint()
+
+  override def migrateLegacyRows(): Unit =
+    fillNewColumn(
+      joinDialect = joinSQL,
+      pidSetDialect =
+        s"${tagTableCfg.tableName}.${tagTableCfg.columnNames.persistenceId} = ${journalTableName}.${journalTableCfg.columnNames.persistenceId}",
+      seqNrSetDialect =
+        s"${tagTableCfg.tableName}.${tagTableCfg.columnNames.sequenceNumber} = ${journalTableName}.${journalTableCfg.columnNames.sequenceNumber}")
+}
+
+class OracleScalaEventByTagMigrationTest
+    extends EventsByTagMigrationTest("oracle-application.conf")
+    with OracleCleaner {
+
+  override def addNewColumn(): Unit = {
+    // temporarily mark event_id NOT NULL, so the migration can change it back to nullable later
+    alterColumn(alterDialect = "MODIFY", changeToDialect = "NOT NULL")
+  }
+
+  override def dropLegacyFKConstraint(): Unit =
+    dropConstraint(constraintTableName = "USER_CONSTRAINTS", constraintType = "R")
+
+  override def dropLegacyPKConstraint(): Unit =
+    dropConstraint(constraintTableName = "USER_CONSTRAINTS", constraintType = "P")
+
+  override def migrateLegacyRows(): Unit =
+    withStatement { stmt =>
+      stmt.execute(s"""UPDATE ${tagTableCfg.tableName}
+        |SET (${tagTableCfg.columnNames.persistenceId}, ${tagTableCfg.columnNames.sequenceNumber}) = (
+        |  SELECT ${journalTableCfg.columnNames.persistenceId}, ${journalTableCfg.columnNames.sequenceNumber}
+        |  ${fromSQL}
+        |)
+        |WHERE EXISTS (
+        |  SELECT 1
+        |  ${fromSQL}
+        |)""".stripMargin)
+    }
+}
+
+class SqlServerScalaEventByTagMigrationTest
+    extends EventsByTagMigrationTest("sqlserver-application.conf")
+    with SqlServerCleaner {
+
+  override def addNewPKConstraint(): Unit = {
+    // make the new columns NOT NULL so they can join the primary key
+    alterColumn(columnName = tagTableCfg.columnNames.persistenceId, changeToDialect = "NVARCHAR(255) NOT NULL")
+    alterColumn(columnName = tagTableCfg.columnNames.sequenceNumber, changeToDialect = "NUMERIC(10,0) NOT NULL")
+    super.addNewPKConstraint()
+  }
+}
diff --git a/core/src/it/scala/akka/persistence/jdbc/integration/JdbcJournalPerfSpec.scala b/core/src/it/scala/akka/persistence/jdbc/integration/JdbcJournalPerfSpec.scala
index e1e9d5faf..497694cc6 100644
--- a/core/src/it/scala/akka/persistence/jdbc/integration/JdbcJournalPerfSpec.scala
+++ 
b/core/src/it/scala/akka/persistence/jdbc/integration/JdbcJournalPerfSpec.scala
@@ -6,7 +6,6 @@ import akka.persistence.jdbc.testkit.internal.Oracle
 import akka.persistence.jdbc.testkit.internal.Postgres
 import akka.persistence.jdbc.testkit.internal.SqlServer
 import com.typesafe.config.ConfigFactory
-import com.typesafe.config.ConfigValueFactory
 
 class PostgresJournalPerfSpec extends JdbcJournalPerfSpec(ConfigFactory.load("postgres-application.conf"), Postgres) {
   override def eventsCount: Int = 100
diff --git a/core/src/main/mima-filters/5.4.0.backwards.excludes/issue-710-tag-fk.excludes b/core/src/main/mima-filters/5.4.0.backwards.excludes/issue-710-tag-fk.excludes
new file mode 100644
index 000000000..03556d3e7
--- /dev/null
+++ b/core/src/main/mima-filters/5.4.0.backwards.excludes/issue-710-tag-fk.excludes
@@ -0,0 +1,9 @@
+ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.jdbc.journal.dao.JournalTables#EventTags.eventId")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.dao.JournalTables#TagRow.eventId")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.journal.dao.JournalTables#TagRow.copy")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.dao.JournalTables#TagRow.copy$default$1")
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.persistence.jdbc.journal.dao.JournalTables#TagRow.copy$default$2")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.journal.dao.JournalTables#TagRow.this")
+ProblemFilters.exclude[MissingTypesProblem]("akka.persistence.jdbc.journal.dao.JournalTables$TagRow$")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.persistence.jdbc.journal.dao.JournalTables#TagRow.apply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.persistence.jdbc.journal.dao.JournalTables#TagRow.unapply")
\ No newline at end of file
diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf
index c9365b4bd..965312058 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -59,7 +59,7 @@ akka-persistence-jdbc {
       # Slick will use an async executor with a fixed size queue of 10.000 objects
       # The async executor is a connection pool for asynchronous execution of blocking I/O actions.
       # This is used for the asynchronous query execution API on top of blocking back-ends like JDBC.
-      queueSize = 10000 // number of objects that can be queued by the async exector
+      queueSize = 10000 // number of objects that can be queued by the async executor
 
       # This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection
       # from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown.
@@ -149,9 +149,16 @@ jdbc-journal {
     schemaName = ""
 
     columnNames {
+      # Used by the legacy foreign key (event_id -> event_journal.ordering).
       eventId = "event_id"
+      persistenceId = "persistence_id"
+      sequenceNumber = "sequence_number"
       tag = "tag"
     }
+
+    # For rolling updates of the event_tag table migration:
+    # set to false to write and read tags with the new (persistence_id, sequence_number) key.
+    legacy-tag-key = true
   }
 
   # Otherwise it would be a pinned dispatcher, see https://github.com/akka/akka/issues/31058
@@ -170,7 +177,7 @@
 
   # The size of the buffer used when queueing up events for batch writing. This number must be bigger then the number
  # of events that may be written concurrently.
In other words this number must be bigger than the number of persistent - # actors that are actively peristing at the same time. + # actors that are actively persisting at the same time. bufferSize = 1000 # The maximum size of the batches in which journal rows will be inserted batchSize = 400 @@ -224,7 +231,7 @@ jdbc-journal { # Slick will use an async executor with a fixed size queue of 10.000 objects # The async executor is a connection pool for asynchronous execution of blocking I/O actions. # This is used for the asynchronous query execution API on top of blocking back-ends like JDBC. - queueSize = 10000 // number of objects that can be queued by the async exector + queueSize = 10000 // number of objects that can be queued by the async executor # This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection # from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown. @@ -350,7 +357,7 @@ jdbc-snapshot-store { # Slick will use an async executor with a fixed size queue of 10.000 objects # The async executor is a connection pool for asynchronous execution of blocking I/O actions. # This is used for the asynchronous query execution API on top of blocking back-ends like JDBC. - queueSize = 10000 // number of objects that can be queued by the async exector + queueSize = 10000 // number of objects that can be queued by the async executor # This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection # from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown. @@ -425,7 +432,7 @@ jdbc-read-journal { journal-sequence-retrieval { # The maximum number of ids that will be retrieved in each batch batch-size = 10000 - # In case a number in the sequence is missing, this is the ammount of retries that will be done to see + # In case a number in the sequence is missing, this is the amount of retries that will be done to see # if the number is still found. Note that the time after which a number in the sequence is assumed missing is # equal to maxTries * queryDelay # (maxTries may not be zero) @@ -485,7 +492,7 @@ jdbc-read-journal { # Slick will use an async executor with a fixed size queue of 10.000 objects # The async executor is a connection pool for asynchronous execution of blocking I/O actions. # This is used for the asynchronous query execution API on top of blocking back-ends like JDBC. - queueSize = 10000 // number of objects that can be queued by the async exector + queueSize = 10000 // number of objects that can be queued by the async executor # This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection # from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown. @@ -555,11 +562,11 @@ jdbc-durable-state-store { } } - # Settings for determining if gloabal_offset column in the durable-state are out of sequence. + # Settings for determining if global_offset column in the durable-state are out of sequence. durable-state-sequence-retrieval { # The maximum number of ids that will be retrieved in each batch batch-size = 10000 - # In case a number in the sequence is missing, this is the ammount of retries that will be done to see + # In case a number in the sequence is missing, this is the amount of retries that will be done to see # if the number is still found. 
Note that the time after which a number in the sequence is assumed missing is # equal to maxTries * queryDelay # (maxTries may not be zero) diff --git a/core/src/main/resources/schema/h2/h2-create-schema.sql b/core/src/main/resources/schema/h2/h2-create-schema.sql index ca44e8762..53af94e9e 100644 --- a/core/src/main/resources/schema/h2/h2-create-schema.sql +++ b/core/src/main/resources/schema/h2/h2-create-schema.sql @@ -18,12 +18,14 @@ CREATE TABLE IF NOT EXISTS "event_journal" ( CREATE UNIQUE INDEX "event_journal_ordering_idx" on "event_journal" ("ordering"); CREATE TABLE IF NOT EXISTS "event_tag" ( - "event_id" BIGINT NOT NULL, + "event_id" BIGINT, + "persistence_id" VARCHAR(255), + "sequence_number" BIGINT, "tag" VARCHAR NOT NULL, - PRIMARY KEY("event_id", "tag"), + PRIMARY KEY("persistence_id", "sequence_number", "tag"), CONSTRAINT fk_event_journal - FOREIGN KEY("event_id") - REFERENCES "event_journal"("ordering") + FOREIGN KEY("persistence_id", "sequence_number") + REFERENCES "event_journal"("persistence_id", "sequence_number") ON DELETE CASCADE ); diff --git a/core/src/main/resources/schema/mysql/mysql-create-schema.sql b/core/src/main/resources/schema/mysql/mysql-create-schema.sql index 5c57be277..b8d07fe75 100644 --- a/core/src/main/resources/schema/mysql/mysql-create-schema.sql +++ b/core/src/main/resources/schema/mysql/mysql-create-schema.sql @@ -1,4 +1,4 @@ -CREATE TABLE IF NOT EXISTS event_journal( +CREATE TABLE IF NOT EXISTS event_journal ( ordering SERIAL, deleted BOOLEAN DEFAULT false NOT NULL, persistence_id VARCHAR(255) NOT NULL, @@ -17,11 +17,13 @@ CREATE TABLE IF NOT EXISTS event_journal( CREATE UNIQUE INDEX event_journal_ordering_idx ON event_journal(ordering); CREATE TABLE IF NOT EXISTS event_tag ( - event_id BIGINT UNSIGNED NOT NULL, + event_id BIGINT UNSIGNED, + persistence_id VARCHAR(255), + sequence_number BIGINT, tag VARCHAR(255) NOT NULL, - PRIMARY KEY(event_id, tag), - FOREIGN KEY (event_id) - REFERENCES event_journal(ordering) + PRIMARY KEY(persistence_id, sequence_number, tag), + FOREIGN KEY (persistence_id, sequence_number) + REFERENCES event_journal(persistence_id, sequence_number) ON DELETE CASCADE ); diff --git a/core/src/main/resources/schema/mysql/mysql-event-tag-migration.sql b/core/src/main/resources/schema/mysql/mysql-event-tag-migration.sql new file mode 100644 index 000000000..82323846a --- /dev/null +++ b/core/src/main/resources/schema/mysql/mysql-event-tag-migration.sql @@ -0,0 +1,36 @@ +-- **************** first step **************** +-- add new column +ALTER TABLE event_tag + ADD persistence_id VARCHAR(255), + ADD sequence_number BIGINT; +-- **************** second step **************** +-- migrate rows +UPDATE event_tag +INNER JOIN event_journal ON event_tag.event_id = event_journal.ordering +SET event_tag.persistence_id = event_journal.persistence_id, + event_tag.sequence_number = event_journal.sequence_number; +-- drop old FK constraint +SELECT CONSTRAINT_NAME +INTO @fk_constraint_name +FROM INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS +WHERE TABLE_NAME = 'event_tag'; +SET @alter_query = CONCAT('ALTER TABLE event_tag DROP FOREIGN KEY ', @fk_constraint_name); +PREPARE stmt FROM @alter_query; +EXECUTE stmt; +DEALLOCATE PREPARE stmt; +-- drop old PK constraint +ALTER TABLE event_tag +DROP PRIMARY KEY; +-- create new PK constraint for PK column. +ALTER TABLE event_tag + ADD CONSTRAINT + PRIMARY KEY (persistence_id, sequence_number, tag); +-- create new FK constraint for PK column. 
+ALTER TABLE event_tag
+    ADD CONSTRAINT fk_event_journal_on_pk
+        FOREIGN KEY (persistence_id, sequence_number)
+            REFERENCES event_journal (persistence_id, sequence_number)
+            ON DELETE CASCADE;
+-- alter the event_id to nullable, so we can skip the InsertAndReturn.
+ALTER TABLE event_tag
+    MODIFY COLUMN event_id BIGINT UNSIGNED NULL;
\ No newline at end of file
diff --git a/core/src/main/resources/schema/oracle/oracle-create-schema.sql b/core/src/main/resources/schema/oracle/oracle-create-schema.sql
index dde92755f..4fde76632 100644
--- a/core/src/main/resources/schema/oracle/oracle-create-schema.sql
+++ b/core/src/main/resources/schema/oracle/oracle-create-schema.sql
@@ -23,10 +23,12 @@ CREATE OR REPLACE TRIGGER EVENT_JOURNAL__ORDERING_TRG before insert on EVENT_JOU
 /
 
 CREATE TABLE EVENT_TAG (
-    EVENT_ID NUMERIC NOT NULL,
+    EVENT_ID NUMERIC,
+    PERSISTENCE_ID VARCHAR(255),
+    SEQUENCE_NUMBER NUMERIC,
     TAG VARCHAR(255) NOT NULL,
-    PRIMARY KEY(EVENT_ID, TAG),
-    FOREIGN KEY(EVENT_ID) REFERENCES EVENT_JOURNAL(ORDERING)
+    PRIMARY KEY(PERSISTENCE_ID, SEQUENCE_NUMBER, TAG),
+    FOREIGN KEY(PERSISTENCE_ID, SEQUENCE_NUMBER) REFERENCES EVENT_JOURNAL(PERSISTENCE_ID, SEQUENCE_NUMBER)
     ON DELETE CASCADE
 )
 /
diff --git a/core/src/main/resources/schema/oracle/oracle-event-tag-migration.sql b/core/src/main/resources/schema/oracle/oracle-event-tag-migration.sql
new file mode 100644
index 000000000..1976dc532
--- /dev/null
+++ b/core/src/main/resources/schema/oracle/oracle-event-tag-migration.sql
@@ -0,0 +1,52 @@
+-- **************** first step ****************
+-- add new column
+ALTER TABLE EVENT_TAG
+    ADD (PERSISTENCE_ID VARCHAR2(255),
+        SEQUENCE_NUMBER NUMERIC);
+-- **************** second step ****************
+-- migrate rows
+UPDATE EVENT_TAG
+SET PERSISTENCE_ID = (SELECT PERSISTENCE_ID
+                      FROM EVENT_JOURNAL
+                      WHERE EVENT_TAG.EVENT_ID = EVENT_JOURNAL.ORDERING),
+    SEQUENCE_NUMBER = (SELECT SEQUENCE_NUMBER
+                       FROM EVENT_JOURNAL
+                       WHERE EVENT_TAG.EVENT_ID = EVENT_JOURNAL.ORDERING);
+-- drop old FK constraint
+DECLARE
+v_constraint_name VARCHAR2(255);
+BEGIN
+SELECT CONSTRAINT_NAME
+INTO v_constraint_name
+FROM USER_CONSTRAINTS
+WHERE TABLE_NAME = 'EVENT_TAG'
+  AND CONSTRAINT_TYPE = 'R';
+
+IF v_constraint_name IS NOT NULL THEN
+    EXECUTE IMMEDIATE 'ALTER TABLE EVENT_TAG DROP CONSTRAINT ' || v_constraint_name;
+END IF;
+
+COMMIT;
+EXCEPTION
+    WHEN OTHERS THEN
+        ROLLBACK;
+        RAISE;
+END;
+/
+
+-- drop old PK constraint
+ALTER TABLE EVENT_TAG
+DROP PRIMARY KEY;
+-- create new PK constraint for PK column.
+ALTER TABLE EVENT_TAG
+    ADD CONSTRAINT "pk_event_tag"
+        PRIMARY KEY (PERSISTENCE_ID, SEQUENCE_NUMBER, TAG);
+-- create new FK constraint for PK column.
+ALTER TABLE EVENT_TAG
+    ADD CONSTRAINT fk_EVENT_JOURNAL_on_pk
+        FOREIGN KEY (PERSISTENCE_ID, SEQUENCE_NUMBER)
+            REFERENCES EVENT_JOURNAL (PERSISTENCE_ID, SEQUENCE_NUMBER)
+            ON DELETE CASCADE;
+-- alter the EVENT_ID to nullable, so we can skip the InsertAndReturn.
+ALTER TABLE EVENT_TAG + MODIFY EVENT_ID NULL; \ No newline at end of file diff --git a/core/src/main/resources/schema/postgres/postgres-create-schema.sql b/core/src/main/resources/schema/postgres/postgres-create-schema.sql index 7ae7e0999..8129b7d7d 100644 --- a/core/src/main/resources/schema/postgres/postgres-create-schema.sql +++ b/core/src/main/resources/schema/postgres/postgres-create-schema.sql @@ -1,4 +1,4 @@ -CREATE TABLE IF NOT EXISTS public.event_journal( +CREATE TABLE IF NOT EXISTS public.event_journal ( ordering BIGSERIAL, persistence_id VARCHAR(255) NOT NULL, sequence_number BIGINT NOT NULL, @@ -23,11 +23,13 @@ CREATE UNIQUE INDEX event_journal_ordering_idx ON public.event_journal(ordering) CREATE TABLE IF NOT EXISTS public.event_tag( event_id BIGINT, + persistence_id VARCHAR(255), + sequence_number BIGINT, tag VARCHAR(256), - PRIMARY KEY(event_id, tag), + PRIMARY KEY(persistence_id, sequence_number, tag), CONSTRAINT fk_event_journal - FOREIGN KEY(event_id) - REFERENCES event_journal(ordering) + FOREIGN KEY(persistence_id, sequence_number) + REFERENCES event_journal(persistence_id, sequence_number) ON DELETE CASCADE ); diff --git a/core/src/main/resources/schema/postgres/postgres-event-tag-migration.sql b/core/src/main/resources/schema/postgres/postgres-event-tag-migration.sql new file mode 100644 index 000000000..5538cf2cf --- /dev/null +++ b/core/src/main/resources/schema/postgres/postgres-event-tag-migration.sql @@ -0,0 +1,31 @@ +-- **************** first step **************** +-- add new column +ALTER TABLE public.event_tag + ADD persistence_id VARCHAR(255), + ADD sequence_number BIGINT; +-- **************** second step **************** +-- migrate rows +UPDATE public.event_tag +SET persistence_id = public.event_journal.persistence_id, + sequence_number = public.event_journal.sequence_number +FROM event_journal +WHERE public.event_tag.event_id = public.event_journal.ordering; +-- drop old FK constraint +ALTER TABLE public.event_tag +DROP CONSTRAINT "fk_event_journal"; +-- drop old PK constraint +ALTER TABLE public.event_tag +DROP CONSTRAINT "event_tag_pkey"; +-- create new PK constraint for PK column. +ALTER TABLE public.event_tag + ADD CONSTRAINT "pk_event_tag" + PRIMARY KEY (persistence_id, sequence_number, tag); +-- create new FK constraint for PK column. +ALTER TABLE public.event_tag + ADD CONSTRAINT "fk_event_journal_on_pk" + FOREIGN KEY (persistence_id, sequence_number) + REFERENCES public.event_journal (persistence_id, sequence_number) + ON DELETE CASCADE; +-- alter the event_id to nullable, so we can skip the InsertAndReturn. 
+ALTER TABLE public.event_tag
+    ALTER COLUMN event_id DROP NOT NULL;
\ No newline at end of file
diff --git a/core/src/main/resources/schema/sqlserver/sqlserver-create-schema.sql b/core/src/main/resources/schema/sqlserver/sqlserver-create-schema.sql
index f4cf59f18..6c3f41fc0 100644
--- a/core/src/main/resources/schema/sqlserver/sqlserver-create-schema.sql
+++ b/core/src/main/resources/schema/sqlserver/sqlserver-create-schema.sql
@@ -1,41 +1,43 @@
-CREATE TABLE event_journal(
+CREATE TABLE event_journal (
     "ordering" BIGINT IDENTITY(1,1) NOT NULL,
     "deleted" BIT DEFAULT 0 NOT NULL,
-    "persistence_id" VARCHAR(255) NOT NULL,
+    "persistence_id" NVARCHAR(255) NOT NULL,
     "sequence_number" NUMERIC(10,0) NOT NULL,
-    "writer" VARCHAR(255) NOT NULL,
+    "writer" NVARCHAR(255) NOT NULL,
     "write_timestamp" BIGINT NOT NULL,
-    "adapter_manifest" VARCHAR(MAX) NOT NULL,
+    "adapter_manifest" NVARCHAR(MAX) NOT NULL,
     "event_payload" VARBINARY(MAX) NOT NULL,
     "event_ser_id" INTEGER NOT NULL,
-    "event_ser_manifest" VARCHAR(MAX) NOT NULL,
+    "event_ser_manifest" NVARCHAR(MAX) NOT NULL,
     "meta_payload" VARBINARY(MAX),
     "meta_ser_id" INTEGER,
-    "meta_ser_manifest" VARCHAR(MAX)
+    "meta_ser_manifest" NVARCHAR(MAX)
     PRIMARY KEY ("persistence_id", "sequence_number")
   );
 
 CREATE UNIQUE INDEX event_journal_ordering_idx ON event_journal(ordering);
 
 CREATE TABLE event_tag (
-    "event_id" BIGINT NOT NULL,
-    "tag" VARCHAR(255) NOT NULL
-    PRIMARY KEY ("event_id","tag")
+    "event_id" BIGINT,
+    "persistence_id" NVARCHAR(255),
+    "sequence_number" NUMERIC(10,0),
+    "tag" NVARCHAR(255) NOT NULL
+    PRIMARY KEY ("persistence_id", "sequence_number","tag"),
     constraint "fk_event_journal"
-        foreign key("event_id")
-        references "dbo"."event_journal"("ordering")
+        foreign key("persistence_id", "sequence_number")
+        references "dbo"."event_journal"("persistence_id", "sequence_number")
         on delete CASCADE
   );
 
 CREATE TABLE "snapshot" (
-    "persistence_id" VARCHAR(255) NOT NULL,
+    "persistence_id" NVARCHAR(255) NOT NULL,
     "sequence_number" NUMERIC(10,0) NOT NULL,
     "created" BIGINT NOT NULL,
     "snapshot_ser_id" INTEGER NOT NULL,
-    "snapshot_ser_manifest" VARCHAR(255) NOT NULL,
+    "snapshot_ser_manifest" NVARCHAR(255) NOT NULL,
     "snapshot_payload" VARBINARY(MAX) NOT NULL,
     "meta_ser_id" INTEGER,
-    "meta_ser_manifest" VARCHAR(255),
+    "meta_ser_manifest" NVARCHAR(255),
     "meta_payload" VARBINARY(MAX),
     PRIMARY KEY ("persistence_id", "sequence_number")
   )
diff --git a/core/src/main/resources/schema/sqlserver/sqlserver-event-tag-migration.sql b/core/src/main/resources/schema/sqlserver/sqlserver-event-tag-migration.sql
new file mode 100644
index 000000000..4c5a337d6
--- /dev/null
+++ b/core/src/main/resources/schema/sqlserver/sqlserver-event-tag-migration.sql
@@ -0,0 +1,57 @@
+-- **************** first step ****************
+-- add new column (T-SQL takes a single ADD with a column list, and the types match the create schema)
+ALTER TABLE event_tag
+    ADD persistence_id NVARCHAR(255),
+        sequence_number NUMERIC(10,0);
+-- **************** second step ****************
+-- migrate rows
+UPDATE event_tag
+SET persistence_id = event_journal.persistence_id,
+    sequence_number = event_journal.sequence_number
+FROM event_journal
+WHERE event_tag.event_id = event_journal.ordering;
+-- drop old FK constraint
+DECLARE @fkConstraintName NVARCHAR(MAX);
+DECLARE @dropFKConstraintQuery NVARCHAR(MAX);
+
+SELECT @fkConstraintName = CONSTRAINT_NAME
+FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS
+WHERE TABLE_NAME = 'event_tag'
+  AND CONSTRAINT_TYPE = 'FOREIGN KEY';
+
+IF @fkConstraintName IS NOT NULL
+BEGIN
+    SET @dropFKConstraintQuery = 'ALTER TABLE event_tag DROP CONSTRAINT ' +
QUOTENAME(@fkConstraintName); +EXEC sp_executesql @dropFKConstraintQuery; +END +-- drop old PK constraint +DECLARE @constraintName NVARCHAR(MAX); +DECLARE @dropConstraintQuery NVARCHAR(MAX); + +SELECT @constraintName = CONSTRAINT_NAME +FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS +WHERE TABLE_NAME = 'event_tag' + AND CONSTRAINT_TYPE = 'PRIMARY KEY'; + +IF @constraintName IS NOT NULL +BEGIN + SET @dropConstraintQuery = 'ALTER TABLE event_tag DROP CONSTRAINT ' + QUOTENAME(@constraintName); +EXEC sp_executesql @dropConstraintQuery; +END +-- create new PK constraint for PK column. +ALTER TABLE event_tag +ALTER COLUMN persistence_id NVARCHAR(255) NOT NULL +ALTER TABLE event_tag +ALTER COLUMN sequence_number NUMERIC(10, 0) NOT NULL +ALTER TABLE event_tag + ADD CONSTRAINT "pk_event_tag" + PRIMARY KEY (persistence_id, sequence_number, TAG) +-- create new FK constraint for PK column. +ALTER TABLE event_tag + ADD CONSTRAINT "fk_event_journal_on_pk" + FOREIGN KEY (persistence_id, sequence_number) + REFERENCES event_journal (persistence_id, sequence_number) + ON DELETE CASCADE +-- alter the event_id to nullable, so we can skip the InsertAndReturn. +ALTER TABLE event_tag +ALTER COLUMN event_id BIGINT NULL \ No newline at end of file diff --git a/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala b/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala index 4613ae33c..b246b5a67 100644 --- a/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala +++ b/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala @@ -53,7 +53,9 @@ class EventJournalTableColumnNames(config: Config) { class EventTagTableColumnNames(config: Config) { private val cfg = config.getConfig("tables.event_tag.columnNames") - val eventId: String = cfg.getString("eventId") + val eventId: String = cfg.getString("eventId") // for compatibility + val persistenceId: String = cfg.getString("persistenceId") + val sequenceNumber: String = cfg.getString("sequenceNumber") val tag: String = cfg.getString("tag") } @@ -74,6 +76,7 @@ class EventJournalTableConfiguration(config: Config) { } class EventTagTableConfiguration(config: Config) { private val cfg = config.getConfig("tables.event_tag") + val legacyTagKey: Boolean = cfg.getBoolean("legacy-tag-key") val tableName: String = cfg.getString("tableName") val schemaName: Option[String] = cfg.asStringOption("schemaName") val columnNames: EventTagTableColumnNames = new EventTagTableColumnNames(config) diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala index 892fbd872..d0b5ffbad 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala @@ -23,20 +23,39 @@ class JournalQueries( private val insertAndReturn = JournalTable.returning(JournalTable.map(_.ordering)) private val TagTableC = Compiled(TagTable) - def writeJournalRows(xs: Seq[(JournalAkkaSerializationRow, Set[String])])(implicit ec: ExecutionContext) = { - val sorted = xs.sortBy((event => event._1.sequenceNumber)) + def writeJournalRows(xs: Seq[(JournalAkkaSerializationRow, Set[String])])( + implicit ec: ExecutionContext): DBIOAction[Any, NoStream, Effect.Write] = { + val sorted = xs.sortBy(event => event._1.sequenceNumber) if (sorted.exists(_._2.nonEmpty)) { // only if there are any tags - val (events, tags) = sorted.unzip + 
writeEventsAndTags(sorted) + } else { + // optimization avoid some work when not using tags + val events = sorted.map(_._1) + JournalTableC ++= events + } + } + + private def writeEventsAndTags(sorted: Seq[(JournalAkkaSerializationRow, Set[String])])( + implicit ec: ExecutionContext): DBIOAction[Any, NoStream, Effect.Write] = { + val (events, _) = sorted.unzip + if (tagTableCfg.legacyTagKey) { for { ids <- insertAndReturn ++= events - tagInserts = ids.zip(tags).flatMap { case (id, tags) => tags.map(tag => TagRow(id, tag)) } + tagInserts = ids.zip(sorted).flatMap { case (id, (e, tags)) => + tags.map(tag => TagRow(Some(id), Some(e.persistenceId), Some(e.sequenceNumber), tag)) + } _ <- TagTableC ++= tagInserts } yield () } else { - // optimization avoid some work when not using tags - val events = sorted.map(_._1) - JournalTableC ++= events + val tagInserts = sorted.map { case (e, tags) => + tags.map(t => TagRow(None, Some(e.persistenceId), Some(e.sequenceNumber), t)) + } + // optimization using batch insert + for { + _ <- JournalTableC ++= events + _ <- TagTableC ++= tagInserts.flatten + } yield () } } diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalTables.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalTables.scala index 5cf1a6ad0..9e9e998e2 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalTables.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalTables.scala @@ -29,7 +29,7 @@ object JournalTables { metaSerId: Option[Int], metaSerManifest: Option[String]) - case class TagRow(eventId: Long, tag: String) + case class TagRow(eventId: Option[Long], persistenceId: Option[String], sequenceNumber: Option[Long], tag: String) } /** @@ -91,13 +91,17 @@ trait JournalTables { lazy val JournalTable = new TableQuery(tag => new JournalEvents(tag)) class EventTags(_tableTag: Tag) extends Table[TagRow](_tableTag, tagTableCfg.schemaName, tagTableCfg.tableName) { - override def * = (eventId, tag) <> (TagRow.tupled, TagRow.unapply) - - val eventId: Rep[Long] = column[Long](tagTableCfg.columnNames.eventId) + override def * = (eventId, persistenceId, sequenceNumber, tag) <> (TagRow.tupled, TagRow.unapply) + // allow null value insert. 
+    val eventId: Rep[Option[Long]] = column[Option[Long]](tagTableCfg.columnNames.eventId)
+    val persistenceId: Rep[Option[String]] = column[Option[String]](tagTableCfg.columnNames.persistenceId)
+    val sequenceNumber: Rep[Option[Long]] = column[Option[Long]](tagTableCfg.columnNames.sequenceNumber)
     val tag: Rep[String] = column[String](tagTableCfg.columnNames.tag)
 
-    val pk = primaryKey(s"${tagTableCfg.tableName}_pk", (eventId, tag))
-    val journalEvent = foreignKey(s"fk_${journalTableCfg.tableName}", eventId, JournalTable)(_.ordering)
+    val pk = primaryKey(s"${tagTableCfg.tableName}_pk", (persistenceId, sequenceNumber, tag))
+    val journalEvent =
+      foreignKey(s"fk_${journalTableCfg.tableName}", (persistenceId, sequenceNumber), JournalTable)(e =>
+        (e.persistenceId.?, e.sequenceNumber.?))
   }
 
   lazy val TagTable = new TableQuery(tag => new EventTags(tag))
diff --git a/core/src/main/scala/akka/persistence/jdbc/query/dao/ReadJournalQueries.scala b/core/src/main/scala/akka/persistence/jdbc/query/dao/ReadJournalQueries.scala
index 3684eb9af..1b1abee13 100644
--- a/core/src/main/scala/akka/persistence/jdbc/query/dao/ReadJournalQueries.scala
+++ b/core/src/main/scala/akka/persistence/jdbc/query/dao/ReadJournalQueries.scala
@@ -28,7 +28,13 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo
     JournalTable.filter(_.deleted === false)
 
   private def baseTableWithTagsQuery() = {
-    baseTableQuery().join(TagTable).on(_.ordering === _.eventId)
+    if (tagTableCfg.legacyTagKey) {
+      baseTableQuery().join(TagTable).on(_.ordering === _.eventId)
+    } else {
+      baseTableQuery()
+        .join(TagTable)
+        .on((e, t) => e.persistenceId === t.persistenceId && e.sequenceNumber === t.sequenceNumber)
+    }
   }
 
   val allPersistenceIdsDistinct = Compiled(_allPersistenceIdsDistinct _)
diff --git a/core/src/test/resources/mysql-application.conf b/core/src/test/resources/mysql-application.conf
index 43643ab49..80f3ffffc 100644
--- a/core/src/test/resources/mysql-application.conf
+++ b/core/src/test/resources/mysql-application.conf
@@ -49,7 +49,7 @@ slick {
   db {
     host = ${docker.host}
     host = ${?DB_HOST}
-    url = "jdbc:mysql://"${slick.db.host}":3306/mysql?cachePrepStmts=true&cacheCallableStmts=true&cacheServerConfiguration=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&enableQueryTimeouts=false&connectionAttributes=none&verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&useLegacyDatetimeCode=false&serverTimezone=UTC&rewriteBatchedStatements=true"
+    url = "jdbc:mysql://"${slick.db.host}":3306/docker?cachePrepStmts=true&cacheCallableStmts=true&cacheServerConfiguration=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&enableQueryTimeouts=false&connectionAttributes=none&verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&useLegacyDatetimeCode=false&serverTimezone=UTC&rewriteBatchedStatements=true"
     user = "root"
     password = "root"
     driver = "com.mysql.cj.jdbc.Driver"
diff --git a/core/src/test/resources/mysql-shared-db-application.conf b/core/src/test/resources/mysql-shared-db-application.conf
index b8016ae86..4318f95e1 100644
--- a/core/src/test/resources/mysql-shared-db-application.conf
+++ b/core/src/test/resources/mysql-shared-db-application.conf
@@ -36,7 +36,7 @@ akka-persistence-jdbc {
       db {
         host = ${docker.host}
        host = ${?DB_HOST}
-        url =
"jdbc:mysql://"${akka-persistence-jdbc.shared-databases.slick.db.host}":3306/mysql?cachePrepStmts=true&cacheCallableStmts=true&cacheServerConfiguration=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&enableQueryTimeouts=false&connectionAttributes=none&verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&useLegacyDatetimeCode=false&serverTimezone=UTC&rewriteBatchedStatements=true" + url = "jdbc:mysql://"${akka-persistence-jdbc.shared-databases.slick.db.host}":3306/docker?cachePrepStmts=true&cacheCallableStmts=true&cacheServerConfiguration=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&enableQueryTimeouts=false&connectionAttributes=none&verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&useLegacyDatetimeCode=false&serverTimezone=UTC&rewriteBatchedStatements=true" user = "root" password = "root" driver = "com.mysql.cj.jdbc.Driver" diff --git a/core/src/test/resources/oracle-schema-overrides.conf b/core/src/test/resources/oracle-schema-overrides.conf index 44702b0dd..44d313e62 100644 --- a/core/src/test/resources/oracle-schema-overrides.conf +++ b/core/src/test/resources/oracle-schema-overrides.conf @@ -63,6 +63,8 @@ jdbc-journal { columnNames { eventId = "EVENT_ID" + persistenceId = "PERSISTENCE_ID" + sequenceNumber = "SEQUENCE_NUMBER" tag = "TAG" } } diff --git a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala index 2591349e9..2314293ac 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala @@ -185,9 +185,7 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf // send a batch of 3 * 200 val batch1 = sendMessagesWithTag(tag, 200) // Try to persist a large batch of events per actor. Some of these may be returned, but not all! - // Reduced for 5.0.0 as we can no longer do a batch insert due to the insert returning the ordering - // so trying to persist 1000s in a batch is slower - val batch2 = sendMessagesWithTag(tag, 2000) + val batch2 = sendMessagesWithTag(tag, 5000) // wait for acknowledgement of the first batch only batch1.futureValue diff --git a/core/src/test/scala/akka/persistence/jdbc/query/EventsByTagMigrationTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/EventsByTagMigrationTest.scala new file mode 100644 index 000000000..7447ed7f6 --- /dev/null +++ b/core/src/test/scala/akka/persistence/jdbc/query/EventsByTagMigrationTest.scala @@ -0,0 +1,266 @@ +/* + * Copyright (C) 2014 - 2019 Dennis Vriend + * Copyright (C) 2019 - 2023 Lightbend Inc. 
+ */ + +package akka.persistence.jdbc.query + +import akka.actor.ActorSystem +import akka.pattern.ask +import akka.persistence.jdbc.query.EventsByTagMigrationTest.{ legacyTagKeyConfigOverride, migrationConfigOverride } +import akka.persistence.query.{ EventEnvelope, Sequence } +import com.typesafe.config.{ ConfigFactory, ConfigValue, ConfigValueFactory } + +import scala.concurrent.duration._ + +object EventsByTagMigrationTest { + val maxBufferSize = 20 + val refreshInterval = 500.milliseconds + val legacyTagKey = true + + val legacyTagKeyConfigOverride: Map[String, ConfigValue] = Map( + "jdbc-read-journal.max-buffer-size" -> ConfigValueFactory.fromAnyRef(maxBufferSize.toString), + "jdbc-read-journal.refresh-interval" -> ConfigValueFactory.fromAnyRef(refreshInterval.toString), + "jdbc-journal.tables.event_tag.legacy-tag-key" -> ConfigValueFactory.fromAnyRef(legacyTagKey)) + + val migrationConfigOverride: Map[String, ConfigValue] = Map( + "jdbc-read-journal.max-buffer-size" -> ConfigValueFactory.fromAnyRef(maxBufferSize.toString), + "jdbc-read-journal.refresh-interval" -> ConfigValueFactory.fromAnyRef(refreshInterval.toString)) +} + +abstract class EventsByTagMigrationTest(config: String) extends QueryTestSpec(config, migrationConfigOverride) { + final val NoMsgTime: FiniteDuration = 100.millis + + val tagTableCfg = journalConfig.eventTagTableConfiguration + val journalTableCfg = journalConfig.eventJournalTableConfiguration + val joinSQL: String = + s"JOIN ${journalTableName} ON ${tagTableCfg.tableName}.${tagTableCfg.columnNames.eventId} = ${journalTableName}.${journalTableCfg.columnNames.ordering}" + val fromSQL: String = + s"FROM ${journalTableName} WHERE ${tagTableCfg.tableName}.${tagTableCfg.columnNames.eventId} = ${journalTableName}.${journalTableCfg.columnNames.ordering}" + + def dropConstraint( + tableName: String = tagTableCfg.tableName, + constraintTableName: String = "INFORMATION_SCHEMA.TABLE_CONSTRAINTS", + constraintType: String, + constraintDialect: String = "CONSTRAINT", + constraintNameDialect: String = ""): Unit = { + withStatement { stmt => + // SELECT AND DROP old CONSTRAINT + val constraintNameQuery = + s""" + |SELECT CONSTRAINT_NAME + |FROM $constraintTableName + |WHERE TABLE_NAME = '$tableName' AND CONSTRAINT_TYPE = '$constraintType' + """.stripMargin + val resultSet = stmt.executeQuery(constraintNameQuery) + if (resultSet.next()) { + val constraintName = resultSet.getString("CONSTRAINT_NAME") + stmt.execute(s"ALTER TABLE $tableName DROP $constraintDialect $constraintName $constraintNameDialect") + } + } + } + + def addPKConstraint( + tableName: String = tagTableCfg.tableName, + pidColumnName: String = tagTableCfg.columnNames.persistenceId, + seqNrColumnName: String = tagTableCfg.columnNames.sequenceNumber, + tagColumnName: String = tagTableCfg.columnNames.tag, + constraintNameDialect: String = "pk_event_tag"): Unit = { + withStatement { stmt => + stmt.execute(s""" + |ALTER TABLE $tableName + |ADD CONSTRAINT $constraintNameDialect + |PRIMARY KEY ($pidColumnName, $seqNrColumnName, $tagColumnName) + """.stripMargin) + } + } + + def addFKConstraint( + tableName: String = tagTableCfg.tableName, + pidColumnName: String = tagTableCfg.columnNames.persistenceId, + seqNrColumnName: String = tagTableCfg.columnNames.sequenceNumber, + journalTableName: String = journalTableCfg.tableName, + journalPidColumnName: String = tagTableCfg.columnNames.persistenceId, + journalSeqNrColumnName: String = tagTableCfg.columnNames.sequenceNumber, + constraintNameDialect: String = 
"fk_event_journal_on_pk"): Unit = { + withStatement { stmt => + stmt.execute(s""" + |ALTER TABLE $tableName + |ADD CONSTRAINT $constraintNameDialect + |FOREIGN KEY ($pidColumnName, $seqNrColumnName) + |REFERENCES $journalTableName ($journalPidColumnName, $journalSeqNrColumnName) + |ON DELETE CASCADE + """.stripMargin) + } + } + + def alterColumn( + tableName: String = tagTableCfg.tableName, + alterDialect: String = "ALTER COLUMN", + columnName: String = tagTableCfg.columnNames.eventId, + changeToDialect: String = "BIGINT NULL"): Unit = { + withStatement { stmt => + stmt.execute(s"ALTER TABLE $tableName $alterDialect $columnName $changeToDialect") + } + } + + def fillNewColumn( + joinDialect: String = "", + pidSetDialect: String = + s"${tagTableCfg.columnNames.persistenceId} = ${journalTableName}.${journalTableCfg.columnNames.persistenceId}", + seqNrSetDialect: String = + s"${tagTableCfg.columnNames.sequenceNumber} = ${journalTableName}.${journalTableCfg.columnNames.sequenceNumber}", + fromDialect: String = ""): Unit = { + withStatement { stmt => + stmt.execute(s""" + |UPDATE ${tagTableCfg.tableName} ${joinDialect} + |SET ${pidSetDialect}, + |${seqNrSetDialect} + |${fromDialect}""".stripMargin) + } + } + + /** + * add new column to event_tag table. + */ + def addNewColumn(): Unit = {} + + /** + * fill new column for exists rows. + */ + def migrateLegacyRows(): Unit = { + fillNewColumn(fromDialect = fromSQL); + } + + /** + * drop old FK constraint + */ + def dropLegacyFKConstraint(): Unit = + dropConstraint(constraintType = "FOREIGN KEY") + + /** + * drop old PK constraint + */ + def dropLegacyPKConstraint(): Unit = + dropConstraint(constraintType = "PRIMARY KEY") + + /** + * create new PK constraint for PK column. + */ + def addNewPKConstraint(): Unit = + addPKConstraint() + + /** + * create new FK constraint for PK column. + */ + def addNewFKConstraint(): Unit = + addFKConstraint() + + // override this, so we can reset the value. + def withRollingUpdateActorSystem(f: ActorSystem => Unit): Unit = { + val legacyTagKeyConfig = legacyTagKeyConfigOverride.foldLeft(ConfigFactory.load(config)) { + case (conf, (path, configValue)) => + conf.withValue(path, configValue) + } + + implicit val system: ActorSystem = ActorSystem("migrator-test", legacyTagKeyConfig) + f(system) + system.terminate().futureValue + } + + it should "migrate event tag to new way" in { + // 1. Mock legacy tag column on here, but actually using new tag write. + withRollingUpdateActorSystem { implicit system => + + val journalOps = new ScalaJdbcReadJournalOperations(system) + withTestActors(replyToMessages = true) { (actor1, actor2, actor3) => + (actor1 ? withTags(1, "number")).futureValue + (actor2 ? withTags(2, "number")).futureValue + (actor3 ? withTags(3, "number")).futureValue + + journalOps.withEventsByTag()("number", Sequence(Long.MinValue)) { tp => + tp.request(Int.MaxValue) + tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 2, timestamp = 0L)) + tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3, timestamp = 0L)) + tp.cancel() + } + }(system) + } + + // Assume that the user could alter table for the addition of the new column manually, then we don't need to maintain + // the legacy table schema creation. + if (newDao) { + addNewColumn(); + migrateLegacyRows(); + } + + // 2. 
+    withRollingUpdateActorSystem { implicit system =>
+      val journalOps = new ScalaJdbcReadJournalOperations(system)
+      withTestActors(replyToMessages = true) { (actor1, actor2, actor3) =>
+        (actor1 ? withTags(4, "number")).futureValue
+        (actor2 ? withTags(5, "number")).futureValue
+        (actor3 ? withTags(6, "number")).futureValue
+        // Delayed events that have not yet been projected can still be read.
+        journalOps.withEventsByTag()("number", Sequence(Long.MinValue)) { tp =>
+          tp.request(Int.MaxValue)
+          tp.expectNext(EventEnvelope(Sequence(1), "my-1", 1, 1, timestamp = 0L))
+          tp.expectNext(EventEnvelope(Sequence(2), "my-2", 1, 2, timestamp = 0L))
+          tp.expectNext(EventEnvelope(Sequence(3), "my-3", 1, 3, timestamp = 0L))
+          tp.expectNext(EventEnvelope(Sequence(4), "my-1", 2, 4, timestamp = 0L))
+          tp.expectNext(EventEnvelope(Sequence(5), "my-2", 2, 5, timestamp = 0L))
+          tp.expectNext(EventEnvelope(Sequence(6), "my-3", 2, 6, timestamp = 0L))
+          tp.cancel()
+        }
+      }(system)
+    }
+
+    // 3. Migrate the old constraints so that reads and writes can switch to the new PK.
+    if (newDao) {
+      dropLegacyFKConstraint()
+      dropLegacyPKConstraint()
+      addNewPKConstraint()
+      addNewFKConstraint()
+    }
+
+    // 4. Verify that the migration completed.
+    withActorSystem { implicit system =>
+
+      val journalOps = new ScalaJdbcReadJournalOperations(system)
+      withTestActors(replyToMessages = true) { (actor1, actor2, actor3) =>
+
+        (actor1 ? withTags(7, "number")).futureValue
+        (actor2 ? withTags(8, "number")).futureValue
+        (actor3 ? withTags(9, "number")).futureValue
+
+        journalOps.withEventsByTag()("number", Sequence(3)) { tp =>
+          tp.request(Int.MaxValue)
+          tp.expectNext(EventEnvelope(Sequence(4), "my-1", 2, 4, timestamp = 0L))
+          tp.expectNext(EventEnvelope(Sequence(5), "my-2", 2, 5, timestamp = 0L))
+          tp.expectNext(EventEnvelope(Sequence(6), "my-3", 2, 6, timestamp = 0L))
+          tp.expectNext(EventEnvelope(Sequence(7), "my-1", 3, 7, timestamp = 0L))
+          tp.expectNext(EventEnvelope(Sequence(8), "my-2", 3, 8, timestamp = 0L))
+          tp.expectNext(EventEnvelope(Sequence(9), "my-3", 3, 9, timestamp = 0L))
+          tp.cancel()
+        }
+
+      }(system)
+    }
+  }
+}
+
+class H2ScalaEventsByTagMigrationTest extends EventsByTagMigrationTest("h2-application.conf") with H2Cleaner {
+
+  override def migrateLegacyRows(): Unit = {
+    fillNewColumn(
+      pidSetDialect = s"""${tagTableCfg.columnNames.persistenceId} = (
+        |  SELECT ${journalTableCfg.columnNames.persistenceId}
+        |  ${fromSQL}
+        |)""".stripMargin,
+      seqNrSetDialect = s"""${tagTableCfg.columnNames.sequenceNumber} = (
+        |  SELECT ${journalTableCfg.columnNames.sequenceNumber}
+        |  ${fromSQL}
+        |)""".stripMargin)
+  }
+}
diff --git a/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala b/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala
index ed4fa8b9e..d238975f8 100644
--- a/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala
+++ b/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala
@@ -139,7 +139,14 @@ final case class JournalMigrator(profile: JdbcProfile)(implicit system: ActorSys
       .forceInsert(journalSerializedRow)
 
     val tagInserts =
-      newJournalQueries.TagTable ++= tags.map(tag => TagRow(journalSerializedRow.ordering, tag)).toSeq
+      newJournalQueries.TagTable ++= tags
+        .map(tag =>
+          TagRow(
+            Some(journalSerializedRow.ordering), // legacy tag key enabled by default.
+ Some(journalSerializedRow.persistenceId), + Some(journalSerializedRow.sequenceNumber), + tag)) + .toSeq journalInsert.flatMap(_ => tagInserts.asInstanceOf[DBIO[Unit]]) } diff --git a/migrator/src/test/resources/mysql-application.conf b/migrator/src/test/resources/mysql-application.conf index 6b9cf8b46..ce594eba5 100644 --- a/migrator/src/test/resources/mysql-application.conf +++ b/migrator/src/test/resources/mysql-application.conf @@ -37,7 +37,7 @@ slick { db { host = ${docker.host} host = ${?DB_HOST} - url = "jdbc:mysql://"${slick.db.host}":3306/mysql?cachePrepStmts=true&cacheCallableStmts=true&cacheServerConfiguration=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&enableQueryTimeouts=false&connectionAttributes=none&verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&useLegacyDatetimeCode=false&serverTimezone=UTC&rewriteBatchedStatements=true" + url = "jdbc:mysql://"${slick.db.host}":3306/docker?cachePrepStmts=true&cacheCallableStmts=true&cacheServerConfiguration=true&useLocalSessionState=true&elideSetAutoCommits=true&alwaysSendSetIsolation=false&enableQueryTimeouts=false&connectionAttributes=none&verifyServerCertificate=false&useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&useLegacyDatetimeCode=false&serverTimezone=UTC&rewriteBatchedStatements=true" user = "root" password = "root" driver = "com.mysql.cj.jdbc.Driver" diff --git a/migrator/src/test/resources/schema/h2/h2-create-schema.sql b/migrator/src/test/resources/schema/h2/h2-create-schema.sql index ca44e8762..53af94e9e 100644 --- a/migrator/src/test/resources/schema/h2/h2-create-schema.sql +++ b/migrator/src/test/resources/schema/h2/h2-create-schema.sql @@ -18,12 +18,14 @@ CREATE TABLE IF NOT EXISTS "event_journal" ( CREATE UNIQUE INDEX "event_journal_ordering_idx" on "event_journal" ("ordering"); CREATE TABLE IF NOT EXISTS "event_tag" ( - "event_id" BIGINT NOT NULL, + "event_id" BIGINT, + "persistence_id" VARCHAR(255), + "sequence_number" BIGINT, "tag" VARCHAR NOT NULL, - PRIMARY KEY("event_id", "tag"), + PRIMARY KEY("persistence_id", "sequence_number", "tag"), CONSTRAINT fk_event_journal - FOREIGN KEY("event_id") - REFERENCES "event_journal"("ordering") + FOREIGN KEY("persistence_id", "sequence_number") + REFERENCES "event_journal"("persistence_id", "sequence_number") ON DELETE CASCADE ); diff --git a/migrator/src/test/resources/schema/mysql/mysql-create-schema.sql b/migrator/src/test/resources/schema/mysql/mysql-create-schema.sql index 5c57be277..b8d07fe75 100644 --- a/migrator/src/test/resources/schema/mysql/mysql-create-schema.sql +++ b/migrator/src/test/resources/schema/mysql/mysql-create-schema.sql @@ -1,4 +1,4 @@ -CREATE TABLE IF NOT EXISTS event_journal( +CREATE TABLE IF NOT EXISTS event_journal ( ordering SERIAL, deleted BOOLEAN DEFAULT false NOT NULL, persistence_id VARCHAR(255) NOT NULL, @@ -17,11 +17,13 @@ CREATE TABLE IF NOT EXISTS event_journal( CREATE UNIQUE INDEX event_journal_ordering_idx ON event_journal(ordering); CREATE TABLE IF NOT EXISTS event_tag ( - event_id BIGINT UNSIGNED NOT NULL, + event_id BIGINT UNSIGNED, + persistence_id VARCHAR(255), + sequence_number BIGINT, tag VARCHAR(255) NOT NULL, - PRIMARY KEY(event_id, tag), - FOREIGN KEY (event_id) - REFERENCES event_journal(ordering) + PRIMARY KEY(persistence_id, sequence_number, tag), + FOREIGN KEY (persistence_id, sequence_number) + REFERENCES event_journal(persistence_id, sequence_number) ON DELETE CASCADE ); diff --git 
a/migrator/src/test/resources/schema/oracle/oracle-create-schema.sql b/migrator/src/test/resources/schema/oracle/oracle-create-schema.sql index dde92755f..4fde76632 100644 --- a/migrator/src/test/resources/schema/oracle/oracle-create-schema.sql +++ b/migrator/src/test/resources/schema/oracle/oracle-create-schema.sql @@ -23,10 +23,12 @@ CREATE OR REPLACE TRIGGER EVENT_JOURNAL__ORDERING_TRG before insert on EVENT_JOU / CREATE TABLE EVENT_TAG ( - EVENT_ID NUMERIC NOT NULL, + EVENT_ID NUMERIC, + PERSISTENCE_ID VARCHAR(255), + SEQUENCE_NUMBER NUMERIC, TAG VARCHAR(255) NOT NULL, - PRIMARY KEY(EVENT_ID, TAG), - FOREIGN KEY(EVENT_ID) REFERENCES EVENT_JOURNAL(ORDERING) + PRIMARY KEY(PERSISTENCE_ID, SEQUENCE_NUMBER, TAG), + FOREIGN KEY(PERSISTENCE_ID, SEQUENCE_NUMBER) REFERENCES EVENT_JOURNAL(PERSISTENCE_ID, SEQUENCE_NUMBER) ON DELETE CASCADE ) / diff --git a/migrator/src/test/resources/schema/postgres/postgres-create-schema.sql b/migrator/src/test/resources/schema/postgres/postgres-create-schema.sql index 7ae7e0999..190dc9668 100644 --- a/migrator/src/test/resources/schema/postgres/postgres-create-schema.sql +++ b/migrator/src/test/resources/schema/postgres/postgres-create-schema.sql @@ -23,11 +23,13 @@ CREATE UNIQUE INDEX event_journal_ordering_idx ON public.event_journal(ordering) CREATE TABLE IF NOT EXISTS public.event_tag( event_id BIGINT, + persistence_id VARCHAR(255), + sequence_number BIGINT, tag VARCHAR(256), - PRIMARY KEY(event_id, tag), + PRIMARY KEY(persistence_id, sequence_number, tag), CONSTRAINT fk_event_journal - FOREIGN KEY(event_id) - REFERENCES event_journal(ordering) + FOREIGN KEY(persistence_id, sequence_number) + REFERENCES event_journal(persistence_id, sequence_number) ON DELETE CASCADE ); diff --git a/migrator/src/test/resources/schema/sqlserver/sqlserver-create-schema.sql b/migrator/src/test/resources/schema/sqlserver/sqlserver-create-schema.sql index f4cf59f18..65f9f883c 100644 --- a/migrator/src/test/resources/schema/sqlserver/sqlserver-create-schema.sql +++ b/migrator/src/test/resources/schema/sqlserver/sqlserver-create-schema.sql @@ -18,9 +18,11 @@ CREATE TABLE event_journal( CREATE UNIQUE INDEX event_journal_ordering_idx ON event_journal(ordering); CREATE TABLE event_tag ( - "event_id" BIGINT NOT NULL, - "tag" VARCHAR(255) NOT NULL - PRIMARY KEY ("event_id","tag") + "event_id" BIGINT, + "persistence_id" VARCHAR(255), + "sequence_number" NUMERIC(10,0), + "tag" VARCHAR(255) NOT NULL, + PRIMARY KEY ("event_id", "tag"), constraint "fk_event_journal" foreign key("event_id") references "dbo"."event_journal"("ordering") diff --git a/scripts/docker-compose.yml b/scripts/docker-compose.yml index 5f5983f66..ec3f26eb2 100644 --- a/scripts/docker-compose.yml +++ b/scripts/docker-compose.yml @@ -14,6 +14,7 @@ mysql: environment: - "TZ=Europe/Amsterdam" - "MYSQL_ROOT_PASSWORD=root" + - "MYSQL_DATABASE=docker" ports: - "3306:3306" # credentials (root:root)
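
Note on operating the per-dialect migration scripts above: each runs in two steps (add and backfill the new columns, then swap the constraints), and jdbc-journal.tables.event_tag.legacy-tag-key defaults to true so that a rolling update keeps writing both keys in between. Before the constraint swap, and before flipping legacy-tag-key to false, it may be worth confirming the backfill left no gaps. A minimal sanity check, not part of this patch, assuming the default table and column names:

-- Tag rows whose new key columns were not filled; this must return zero
-- before the new PRIMARY KEY (persistence_id, sequence_number, tag)
-- and the new FOREIGN KEY can be created.
SELECT COUNT(*) AS unmigrated_rows
FROM event_tag
WHERE persistence_id IS NULL
   OR sequence_number IS NULL;

-- Legacy rows whose event_id no longer resolves to a journal row;
-- such orphans cannot be backfilled and would fail the new FK.
SELECT COUNT(*) AS orphaned_rows
FROM event_tag t
WHERE t.event_id IS NOT NULL
  AND NOT EXISTS (SELECT 1
                  FROM event_journal j
                  WHERE j.ordering = t.event_id);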