From 7cc97d2e40955d3095f601dc41e10aa7a1f0ad93 Mon Sep 17 00:00:00 2001 From: AndyChen Date: Mon, 4 Sep 2023 21:23:34 +0800 Subject: [PATCH] chore: typo fix #710 --- core/src/main/resources/reference.conf | 23 +++++++++---------- .../jdbc/config/AkkaPersistenceConfig.scala | 3 +-- .../jdbc/journal/dao/JournalQueries.scala | 4 ++-- .../jdbc/query/dao/ReadJournalQueries.scala | 2 +- .../jdbc/query/EventsByTagMigrationTest.scala | 14 +++++------ .../jdbc/migrator/JournalMigrator.scala | 6 +---- 6 files changed, 22 insertions(+), 30 deletions(-) diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 92013d850..5e34be031 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -48,7 +48,7 @@ akka-persistence-jdbc { # Slick will use an async executor with a fixed size queue of 10.000 objects # The async executor is a connection pool for asynchronous execution of blocking I/O actions. # This is used for the asynchronous query execution API on top of blocking back-ends like JDBC. - queueSize = 10000 // number of objects that can be queued by the async exector + queueSize = 10000 // number of objects that can be queued by the async executor # This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection # from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown. @@ -146,9 +146,8 @@ jdbc-journal { } # For rolling updates the event_tag table migration. - # switch those to enabled new foregion key wirte and read. - redundant-write = false - redundant-read = false + # switch those to enable new region key write and read. + legacy-tag-key = true } # Otherwise it would be a pinned dispatcher, see https://github.com/akka/akka/issues/31058 @@ -167,7 +166,7 @@ jdbc-journal { # The size of the buffer used when queueing up events for batch writing. This number must be bigger then the number # of events that may be written concurrently. In other words this number must be bigger than the number of persistent - # actors that are actively peristing at the same time. + # actors that are actively persisting at the same time. bufferSize = 1000 # The maximum size of the batches in which journal rows will be inserted batchSize = 400 @@ -221,7 +220,7 @@ jdbc-journal { # Slick will use an async executor with a fixed size queue of 10.000 objects # The async executor is a connection pool for asynchronous execution of blocking I/O actions. # This is used for the asynchronous query execution API on top of blocking back-ends like JDBC. - queueSize = 10000 // number of objects that can be queued by the async exector + queueSize = 10000 // number of objects that can be queued by the async executor # This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection # from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown. @@ -347,7 +346,7 @@ jdbc-snapshot-store { # Slick will use an async executor with a fixed size queue of 10.000 objects # The async executor is a connection pool for asynchronous execution of blocking I/O actions. # This is used for the asynchronous query execution API on top of blocking back-ends like JDBC. 
- queueSize = 10000 // number of objects that can be queued by the async exector + queueSize = 10000 // number of objects that can be queued by the async executor # This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection # from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown. @@ -441,7 +440,7 @@ jdbc-read-journal { journal-sequence-retrieval { # The maximum number of ids that will be retrieved in each batch batch-size = 10000 - # In case a number in the sequence is missing, this is the ammount of retries that will be done to see + # In case a number in the sequence is missing, this is the amount of retries that will be done to see # if the number is still found. Note that the time after which a number in the sequence is assumed missing is # equal to maxTries * queryDelay # (maxTries may not be zero) @@ -501,7 +500,7 @@ jdbc-read-journal { # Slick will use an async executor with a fixed size queue of 10.000 objects # The async executor is a connection pool for asynchronous execution of blocking I/O actions. # This is used for the asynchronous query execution API on top of blocking back-ends like JDBC. - queueSize = 10000 // number of objects that can be queued by the async exector + queueSize = 10000 // number of objects that can be queued by the async executor # This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection # from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown. @@ -571,11 +570,11 @@ jdbc-durable-state-store { } } - # Settings for determining if gloabal_offset column in the durable-state are out of sequence. + # Settings for determining if global_offset column in the durable-state are out of sequence. durable-state-sequence-retrieval { # The maximum number of ids that will be retrieved in each batch batch-size = 10000 - # In case a number in the sequence is missing, this is the ammount of retries that will be done to see + # In case a number in the sequence is missing, this is the amount of retries that will be done to see # if the number is still found. Note that the time after which a number in the sequence is assumed missing is # equal to maxTries * queryDelay # (maxTries may not be zero) @@ -626,7 +625,7 @@ jdbc-durable-state-store { # Slick will use an async executor with a fixed size queue of 10.000 objects # The async executor is a connection pool for asynchronous execution of blocking I/O actions. # This is used for the asynchronous query execution API on top of blocking back-ends like JDBC. - queueSize = 10000 // number of objects that can be queued by the async exector + queueSize = 10000 // number of objects that can be queued by the async executor # This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection # from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown. 
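The reference.conf change above collapses the two rolling-update flags, `redundant-write` and `redundant-read`, into a single `legacy-tag-key` switch under `jdbc-journal.tables.event_tag`, defaulting to `true`. Below is a minimal sketch of an application-level override; only the config path itself is taken from this patch, and the reading that `false` selects the new persistence-id/sequence-number tag key (while the default `true` keeps the legacy event-id key alive for a rolling update) is inferred from the DAO changes further down.

import com.typesafe.config.ConfigFactory

// Sketch only: flip legacy-tag-key once every node runs the new schema and the
// event_tag rows have been migrated; until then the default `true` keeps the
// legacy event_id based write and read path working alongside the new one.
object LegacyTagKeyOverride extends App {
  val config = ConfigFactory
    .parseString("jdbc-journal.tables.event_tag.legacy-tag-key = false")
    .withFallback(ConfigFactory.load()) // reference.conf supplies the default `true`

  println(config.getBoolean("jdbc-journal.tables.event_tag.legacy-tag-key")) // false
}
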
diff --git a/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala b/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala index fac49ce76..546a16544 100644 --- a/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala +++ b/core/src/main/scala/akka/persistence/jdbc/config/AkkaPersistenceConfig.scala @@ -76,8 +76,7 @@ class EventJournalTableConfiguration(config: Config) { } class EventTagTableConfiguration(config: Config) { private val cfg = config.getConfig("tables.event_tag") - val writeRedundant: Boolean = cfg.getBoolean("redundant-write") - val readRedundant: Boolean = cfg.getBoolean("redundant-read") + val legacyTagKey: Boolean = cfg.getBoolean("legacy-tag-key") val tableName: String = cfg.getString("tableName") val schemaName: Option[String] = cfg.asStringOption("schemaName") val columnNames: EventTagTableColumnNames = new EventTagTableColumnNames(config) diff --git a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala index 3b6306203..5df745eeb 100644 --- a/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala +++ b/core/src/main/scala/akka/persistence/jdbc/journal/dao/JournalQueries.scala @@ -25,7 +25,7 @@ class JournalQueries( def writeJournalRows(xs: Seq[(JournalAkkaSerializationRow, Set[String])])( implicit ec: ExecutionContext): DBIOAction[Any, NoStream, Effect.Write] = { - val sorted = xs.sortBy((event => event._1.sequenceNumber)) + val sorted = xs.sortBy(event => event._1.sequenceNumber) if (sorted.exists(_._2.nonEmpty)) { // only if there are any tags writeEventsAndTags(sorted) @@ -39,7 +39,7 @@ class JournalQueries( private def writeEventsAndTags(sorted: Seq[(JournalAkkaSerializationRow, Set[String])])( implicit ec: ExecutionContext): DBIOAction[Any, NoStream, Effect.Write] = { val (events, _) = sorted.unzip - if (tagTableCfg.writeRedundant) { + if (tagTableCfg.legacyTagKey) { for { ids <- insertAndReturn ++= events tagInserts = ids.zip(sorted).flatMap { case (id, (e, tags)) => diff --git a/core/src/main/scala/akka/persistence/jdbc/query/dao/ReadJournalQueries.scala b/core/src/main/scala/akka/persistence/jdbc/query/dao/ReadJournalQueries.scala index 9b2eace53..d74bde7da 100644 --- a/core/src/main/scala/akka/persistence/jdbc/query/dao/ReadJournalQueries.scala +++ b/core/src/main/scala/akka/persistence/jdbc/query/dao/ReadJournalQueries.scala @@ -28,7 +28,7 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo JournalTable.filter(_.deleted === false) private def baseTableWithTagsQuery() = { - if (tagTableCfg.readRedundant) { + if (tagTableCfg.legacyTagKey) { baseTableQuery().join(TagTable).on(_.ordering === _.eventId) } else { baseTableQuery() diff --git a/core/src/test/scala/akka/persistence/jdbc/query/EventsByTagMigrationTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/EventsByTagMigrationTest.scala index 727db0a50..607e967ee 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/EventsByTagMigrationTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/EventsByTagMigrationTest.scala @@ -7,7 +7,7 @@ package akka.persistence.jdbc.query import akka.actor.ActorSystem import akka.pattern.ask -import akka.persistence.jdbc.query.EventsByTagMigrationTest.{ migrationConfigOverride, redundantConfigOverride } +import akka.persistence.jdbc.query.EventsByTagMigrationTest.{ legacyTagKeyConfigOverride, migrationConfigOverride } import 
akka.persistence.query.{ EventEnvelope, NoOffset, Sequence } import com.typesafe.config.{ ConfigFactory, ConfigValue, ConfigValueFactory } @@ -16,14 +16,12 @@ import scala.concurrent.duration._ object EventsByTagMigrationTest { val maxBufferSize = 20 val refreshInterval = 500.milliseconds - val redundantWrite = true - val redundantRead = true + val legacyTagKey = true - val redundantConfigOverride: Map[String, ConfigValue] = Map( + val legacyTagKeyConfigOverride: Map[String, ConfigValue] = Map( "jdbc-read-journal.max-buffer-size" -> ConfigValueFactory.fromAnyRef(maxBufferSize.toString), "jdbc-read-journal.refresh-interval" -> ConfigValueFactory.fromAnyRef(refreshInterval.toString), - "jdbc-journal.tables.event_tag.redundant-write" -> ConfigValueFactory.fromAnyRef(redundantWrite), - "jdbc-read-journal.tables.event_tag.redundant-read" -> ConfigValueFactory.fromAnyRef(redundantRead)) + "jdbc-journal.tables.event_tag.legacy-tag-key" -> ConfigValueFactory.fromAnyRef(legacyTagKey)) val migrationConfigOverride: Map[String, ConfigValue] = Map( "jdbc-read-journal.max-buffer-size" -> ConfigValueFactory.fromAnyRef(maxBufferSize.toString), @@ -108,12 +106,12 @@ abstract class EventsByTagMigrationTest(config: String) extends QueryTestSpec(co // override this, so we can reset the value. def withRollingUpdateActorSystem(f: ActorSystem => Unit): Unit = { - val redundantConfig = redundantConfigOverride.foldLeft(ConfigFactory.load(config)) { + val legacyTagKeyConfig = legacyTagKeyConfigOverride.foldLeft(ConfigFactory.load(config)) { case (conf, (path, configValue)) => conf.withValue(path, configValue) } - implicit val system: ActorSystem = ActorSystem("migrator-test", redundantConfig) + implicit val system: ActorSystem = ActorSystem("migrator-test", legacyTagKeyConfig) f(system) system.terminate().futureValue } diff --git a/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala b/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala index b5b7853ed..59243358a 100644 --- a/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala +++ b/migrator/src/main/scala/akka/persistence/jdbc/migrator/JournalMigrator.scala @@ -141,11 +141,7 @@ final case class JournalMigrator(profile: JdbcProfile)(implicit system: ActorSys val tagInserts = newJournalQueries.TagTable ++= tags .map(tag => - TagRow( - Some(journalSerializedRow.ordering), - Some(journalSerializedRow.persistenceId), - Some(journalSerializedRow.sequenceNumber), - tag)) + TagRow(None, Some(journalSerializedRow.persistenceId), Some(journalSerializedRow.sequenceNumber), tag)) .toSeq journalInsert.flatMap(_ => tagInserts.asInstanceOf[DBIO[Unit]])
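Taken together, the DAO and migrator changes suggest what `legacy-tag-key` toggles: with `true` the journal DAO still records the legacy `event_id` on each tag row and the read journal joins on `ordering === eventId`, whereas the migrator now fills that column with `None`, so migrated rows appear reachable only through the new `(persistence_id, sequence_number)` key. The following self-contained sketch (plain Scala collections, not the project's Slick queries; all names illustrative) contrasts the two join strategies:

object TagKeySketch extends App {
  // Row shapes modelled on the rows visible in this patch; not the real table mappings.
  final case class JournalRow(ordering: Long, persistenceId: String, sequenceNumber: Long)
  final case class TagRow(eventId: Option[Long], persistenceId: Option[String], sequenceNumber: Option[Long], tag: String)

  val journal = Seq(JournalRow(1L, "p-1", 1L), JournalRow(2L, "p-1", 2L))
  val tags = Seq(
    TagRow(Some(1L), Some("p-1"), Some(1L), "account"), // written by the legacy path, event_id populated
    TagRow(None, Some("p-1"), Some(2L), "account"))     // written by the migrator after this patch

  // legacy-tag-key = true: join on the old surrogate key (ordering == event_id)
  val legacyMatches = for {
    j <- journal; t <- tags
    if t.tag == "account" && t.eventId.contains(j.ordering)
  } yield j.sequenceNumber

  // legacy-tag-key = false (assumed): join on the new natural key (persistence_id, sequence_number)
  val newKeyMatches = for {
    j <- journal; t <- tags
    if t.tag == "account" && t.persistenceId.contains(j.persistenceId) && t.sequenceNumber.contains(j.sequenceNumber)
  } yield j.sequenceNumber

  println(s"legacy join finds sequence numbers:  $legacyMatches") // only 1 -- the migrated row is invisible to the legacy join
  println(s"new-key join finds sequence numbers: $newKeyMatches") // 1 and 2
}

This would also explain the last hunk dropping `Some(journalSerializedRow.ordering)` from the migrated `TagRow`: once readers no longer depend on the legacy join, the new key alone is presumably sufficient for migrated rows.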