From 87a2e5f9d82447efc46342fc2f63b24b16c30782 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Thu, 2 Nov 2023 09:16:25 +0800 Subject: [PATCH] fix: avoid persistence run fast --- .../jdbc/query/CurrentEventsByTagTest.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala index f6f14b66..dc11862b 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala @@ -172,18 +172,16 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf val latch = new CountDownLatch(1) val largeNumberOfMessage = 2000 + val smallNumberOfMessage = 200 + val diff = largeNumberOfMessage - smallNumberOfMessage val journalOps = new JavaDslJdbcReadJournalOperations(system) import system.dispatcher withTestActors(replyToMessages = true) { (actor1, actor2, actor3) => def sendMessagesWithTag(tag: String, numberOfMessagesPerActor: Int): Future[Done] = { val futures = for (actor <- Seq(actor1, actor2, actor3); i <- 1 to numberOfMessagesPerActor) yield { - // block the execution - if (i == largeNumberOfMessage) { - Future { - latch.await() - actor ? TaggedAsyncEvent(Event(i.toString), tag) - } - } else { + Future { + // block the remaining small batch events from being fired + if (i == diff) latch.await() actor ? TaggedAsyncEvent(Event(i.toString), tag) } } @@ -192,7 +190,7 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf val tag = "someTag" // send a batch of 3 * 200 - val batch1 = sendMessagesWithTag(tag, 200) + val batch1 = sendMessagesWithTag(tag, smallNumberOfMessage) // Try to persist a large batch of events per actor. Some of these may be returned, but not all! val batch2 = sendMessagesWithTag(tag, largeNumberOfMessage)