Skip to content

Commit

Permalink
chore: typo fix akka#710
Browse files Browse the repository at this point in the history
  • Loading branch information
Roiocam authored and AndyChen committed Oct 8, 2023
1 parent f32e511 commit 7cc97d2
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 30 deletions.
23 changes: 11 additions & 12 deletions core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ akka-persistence-jdbc {
# Slick will use an async executor with a fixed size queue of 10.000 objects
# The async executor is a connection pool for asynchronous execution of blocking I/O actions.
# This is used for the asynchronous query execution API on top of blocking back-ends like JDBC.
queueSize = 10000 // number of objects that can be queued by the async exector
queueSize = 10000 // number of objects that can be queued by the async executor

# This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection
# from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown.
Expand Down Expand Up @@ -146,9 +146,8 @@ jdbc-journal {
}

# For rolling updates the event_tag table migration.
# switch those to enabled new foregion key wirte and read.
redundant-write = false
redundant-read = false
# switch those to enable new region key write and read.
legacy-tag-key = true
}

# Otherwise it would be a pinned dispatcher, see https://github.com/akka/akka/issues/31058
Expand All @@ -167,7 +166,7 @@ jdbc-journal {

# The size of the buffer used when queueing up events for batch writing. This number must be bigger then the number
# of events that may be written concurrently. In other words this number must be bigger than the number of persistent
# actors that are actively peristing at the same time.
# actors that are actively persisting at the same time.
bufferSize = 1000
# The maximum size of the batches in which journal rows will be inserted
batchSize = 400
Expand Down Expand Up @@ -221,7 +220,7 @@ jdbc-journal {
# Slick will use an async executor with a fixed size queue of 10.000 objects
# The async executor is a connection pool for asynchronous execution of blocking I/O actions.
# This is used for the asynchronous query execution API on top of blocking back-ends like JDBC.
queueSize = 10000 // number of objects that can be queued by the async exector
queueSize = 10000 // number of objects that can be queued by the async executor

# This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection
# from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown.
Expand Down Expand Up @@ -347,7 +346,7 @@ jdbc-snapshot-store {
# Slick will use an async executor with a fixed size queue of 10.000 objects
# The async executor is a connection pool for asynchronous execution of blocking I/O actions.
# This is used for the asynchronous query execution API on top of blocking back-ends like JDBC.
queueSize = 10000 // number of objects that can be queued by the async exector
queueSize = 10000 // number of objects that can be queued by the async executor

# This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection
# from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown.
Expand Down Expand Up @@ -441,7 +440,7 @@ jdbc-read-journal {
journal-sequence-retrieval {
# The maximum number of ids that will be retrieved in each batch
batch-size = 10000
# In case a number in the sequence is missing, this is the ammount of retries that will be done to see
# In case a number in the sequence is missing, this is the amount of retries that will be done to see
# if the number is still found. Note that the time after which a number in the sequence is assumed missing is
# equal to maxTries * queryDelay
# (maxTries may not be zero)
Expand Down Expand Up @@ -501,7 +500,7 @@ jdbc-read-journal {
# Slick will use an async executor with a fixed size queue of 10.000 objects
# The async executor is a connection pool for asynchronous execution of blocking I/O actions.
# This is used for the asynchronous query execution API on top of blocking back-ends like JDBC.
queueSize = 10000 // number of objects that can be queued by the async exector
queueSize = 10000 // number of objects that can be queued by the async executor

# This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection
# from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown.
Expand Down Expand Up @@ -571,11 +570,11 @@ jdbc-durable-state-store {
}
}

# Settings for determining if gloabal_offset column in the durable-state are out of sequence.
# Settings for determining if global_offset column in the durable-state are out of sequence.
durable-state-sequence-retrieval {
# The maximum number of ids that will be retrieved in each batch
batch-size = 10000
# In case a number in the sequence is missing, this is the ammount of retries that will be done to see
# In case a number in the sequence is missing, this is the amount of retries that will be done to see
# if the number is still found. Note that the time after which a number in the sequence is assumed missing is
# equal to maxTries * queryDelay
# (maxTries may not be zero)
Expand Down Expand Up @@ -626,7 +625,7 @@ jdbc-durable-state-store {
# Slick will use an async executor with a fixed size queue of 10.000 objects
# The async executor is a connection pool for asynchronous execution of blocking I/O actions.
# This is used for the asynchronous query execution API on top of blocking back-ends like JDBC.
queueSize = 10000 // number of objects that can be queued by the async exector
queueSize = 10000 // number of objects that can be queued by the async executor

# This property controls the maximum number of milliseconds that a client (that's you) will wait for a connection
# from the pool. If this time is exceeded without a connection becoming available, a SQLException will be thrown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,7 @@ class EventJournalTableConfiguration(config: Config) {
}
class EventTagTableConfiguration(config: Config) {
private val cfg = config.getConfig("tables.event_tag")
val writeRedundant: Boolean = cfg.getBoolean("redundant-write")
val readRedundant: Boolean = cfg.getBoolean("redundant-read")
val legacyTagKey: Boolean = cfg.getBoolean("legacy-tag-key")
val tableName: String = cfg.getString("tableName")
val schemaName: Option[String] = cfg.asStringOption("schemaName")
val columnNames: EventTagTableColumnNames = new EventTagTableColumnNames(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class JournalQueries(

def writeJournalRows(xs: Seq[(JournalAkkaSerializationRow, Set[String])])(
implicit ec: ExecutionContext): DBIOAction[Any, NoStream, Effect.Write] = {
val sorted = xs.sortBy((event => event._1.sequenceNumber))
val sorted = xs.sortBy(event => event._1.sequenceNumber)
if (sorted.exists(_._2.nonEmpty)) {
// only if there are any tags
writeEventsAndTags(sorted)
Expand All @@ -39,7 +39,7 @@ class JournalQueries(
private def writeEventsAndTags(sorted: Seq[(JournalAkkaSerializationRow, Set[String])])(
implicit ec: ExecutionContext): DBIOAction[Any, NoStream, Effect.Write] = {
val (events, _) = sorted.unzip
if (tagTableCfg.writeRedundant) {
if (tagTableCfg.legacyTagKey) {
for {
ids <- insertAndReturn ++= events
tagInserts = ids.zip(sorted).flatMap { case (id, (e, tags)) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class ReadJournalQueries(val profile: JdbcProfile, val readJournalConfig: ReadJo
JournalTable.filter(_.deleted === false)

private def baseTableWithTagsQuery() = {
if (tagTableCfg.readRedundant) {
if (tagTableCfg.legacyTagKey) {
baseTableQuery().join(TagTable).on(_.ordering === _.eventId)
} else {
baseTableQuery()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package akka.persistence.jdbc.query

import akka.actor.ActorSystem
import akka.pattern.ask
import akka.persistence.jdbc.query.EventsByTagMigrationTest.{ migrationConfigOverride, redundantConfigOverride }
import akka.persistence.jdbc.query.EventsByTagMigrationTest.{ legacyTagKeyConfigOverride, migrationConfigOverride }
import akka.persistence.query.{ EventEnvelope, NoOffset, Sequence }
import com.typesafe.config.{ ConfigFactory, ConfigValue, ConfigValueFactory }

Expand All @@ -16,14 +16,12 @@ import scala.concurrent.duration._
object EventsByTagMigrationTest {
val maxBufferSize = 20
val refreshInterval = 500.milliseconds
val redundantWrite = true
val redundantRead = true
val legacyTagKey = true

val redundantConfigOverride: Map[String, ConfigValue] = Map(
val legacyTagKeyConfigOverride: Map[String, ConfigValue] = Map(
"jdbc-read-journal.max-buffer-size" -> ConfigValueFactory.fromAnyRef(maxBufferSize.toString),
"jdbc-read-journal.refresh-interval" -> ConfigValueFactory.fromAnyRef(refreshInterval.toString),
"jdbc-journal.tables.event_tag.redundant-write" -> ConfigValueFactory.fromAnyRef(redundantWrite),
"jdbc-read-journal.tables.event_tag.redundant-read" -> ConfigValueFactory.fromAnyRef(redundantRead))
"jdbc-journal.tables.event_tag.legacy-tag-key" -> ConfigValueFactory.fromAnyRef(legacyTagKey))

val migrationConfigOverride: Map[String, ConfigValue] = Map(
"jdbc-read-journal.max-buffer-size" -> ConfigValueFactory.fromAnyRef(maxBufferSize.toString),
Expand Down Expand Up @@ -108,12 +106,12 @@ abstract class EventsByTagMigrationTest(config: String) extends QueryTestSpec(co

// override this, so we can reset the value.
def withRollingUpdateActorSystem(f: ActorSystem => Unit): Unit = {
val redundantConfig = redundantConfigOverride.foldLeft(ConfigFactory.load(config)) {
val legacyTagKeyConfig = legacyTagKeyConfigOverride.foldLeft(ConfigFactory.load(config)) {
case (conf, (path, configValue)) =>
conf.withValue(path, configValue)
}

implicit val system: ActorSystem = ActorSystem("migrator-test", redundantConfig)
implicit val system: ActorSystem = ActorSystem("migrator-test", legacyTagKeyConfig)
f(system)
system.terminate().futureValue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,7 @@ final case class JournalMigrator(profile: JdbcProfile)(implicit system: ActorSys
val tagInserts =
newJournalQueries.TagTable ++= tags
.map(tag =>
TagRow(
Some(journalSerializedRow.ordering),
Some(journalSerializedRow.persistenceId),
Some(journalSerializedRow.sequenceNumber),
tag))
TagRow(None, Some(journalSerializedRow.persistenceId), Some(journalSerializedRow.sequenceNumber), tag))
.toSeq

journalInsert.flatMap(_ => tagInserts.asInstanceOf[DBIO[Unit]])
Expand Down

0 comments on commit 7cc97d2

Please sign in to comment.