From 7bf9a32279e269d8e9430aa7c6ea2f41166981c9 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Wed, 1 Nov 2023 23:51:11 +0800 Subject: [PATCH 01/11] fix: reducing the batch of persistent events --- .../akka/persistence/jdbc/query/CurrentEventsByTagTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala index c8337c3a..828a10a8 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala @@ -182,7 +182,7 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf // send a batch of 3 * 200 val batch1 = sendMessagesWithTag(tag, 200) // Try to persist a large batch of events per actor. Some of these may be returned, but not all! - val batch2 = sendMessagesWithTag(tag, 5000) + val batch2 = sendMessagesWithTag(tag, 3000) // wait for acknowledgement of the first batch only batch1.futureValue From 056614fddd5be6acf820e5d9975bd37d805ba207 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Thu, 2 Nov 2023 00:15:14 +0800 Subject: [PATCH 02/11] fix: use barrier to block the persisting --- .../jdbc/query/CurrentEventsByTagTest.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala index 828a10a8..898625a9 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala @@ -16,6 +16,8 @@ import akka.persistence.jdbc.query.EventAdapterTest.{ Event, TaggedAsyncEvent } import scala.concurrent.Future import CurrentEventsByTagTest._ +import java.util.concurrent.CountDownLatch + object CurrentEventsByTagTest { val maxBufferSize = 20 val refreshInterval = 500.milliseconds @@ -168,11 +170,16 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf it should "complete without any gaps in case events are being persisted when the query is executed" in withActorSystem { implicit system => + val latch = new CountDownLatch(1) val journalOps = new JavaDslJdbcReadJournalOperations(system) import system.dispatcher withTestActors(replyToMessages = true) { (actor1, actor2, actor3) => def sendMessagesWithTag(tag: String, numberOfMessagesPerActor: Int): Future[Done] = { val futures = for (actor <- Seq(actor1, actor2, actor3); i <- 1 to numberOfMessagesPerActor) yield { + // block the execution + if (i == numberOfMessagesPerActor / 2) { + latch.await() + } actor ? TaggedAsyncEvent(Event(i.toString), tag) } Future.sequence(futures).map(_ => Done) @@ -182,7 +189,7 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf // send a batch of 3 * 200 val batch1 = sendMessagesWithTag(tag, 200) // Try to persist a large batch of events per actor. Some of these may be returned, but not all! - val batch2 = sendMessagesWithTag(tag, 3000) + val batch2 = sendMessagesWithTag(tag, 2000) // wait for acknowledgement of the first batch only batch1.futureValue @@ -191,6 +198,8 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf // start the query before the last batch completes journalOps.withCurrentEventsByTag()(tag, NoOffset) { tp => + // when query begin running, unlock the barrier. + latch.countDown() // The stream must complete within the given amount of time // This make take a while in case the journal sequence actor detects gaps val allEvents = tp.toStrict(atMost = 40.seconds) From 5ca94175dd1303d40ce449480d5bcf6c3211eca6 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Thu, 2 Nov 2023 00:39:22 +0800 Subject: [PATCH 03/11] fix: use barrier to block the persisting --- .../persistence/jdbc/query/CurrentEventsByTagTest.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala index 898625a9..60f80e09 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala @@ -178,9 +178,13 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf val futures = for (actor <- Seq(actor1, actor2, actor3); i <- 1 to numberOfMessagesPerActor) yield { // block the execution if (i == numberOfMessagesPerActor / 2) { - latch.await() + Future { + latch.await() + actor ? TaggedAsyncEvent(Event(i.toString), tag) + } + } else { + actor ? TaggedAsyncEvent(Event(i.toString), tag) } - actor ? TaggedAsyncEvent(Event(i.toString), tag) } Future.sequence(futures).map(_ => Done) } From 3d6b8abb9870f86d0e3102bd3291496861058feb Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Thu, 2 Nov 2023 00:51:20 +0800 Subject: [PATCH 04/11] fix: only block large batch case --- .../akka/persistence/jdbc/query/CurrentEventsByTagTest.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala index 60f80e09..492dac52 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala @@ -171,13 +171,14 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf implicit system => val latch = new CountDownLatch(1) + val largeNumberOfMessage = 2000 val journalOps = new JavaDslJdbcReadJournalOperations(system) import system.dispatcher withTestActors(replyToMessages = true) { (actor1, actor2, actor3) => def sendMessagesWithTag(tag: String, numberOfMessagesPerActor: Int): Future[Done] = { val futures = for (actor <- Seq(actor1, actor2, actor3); i <- 1 to numberOfMessagesPerActor) yield { // block the execution - if (i == numberOfMessagesPerActor / 2) { + if (i == largeNumberOfMessage) { Future { latch.await() actor ? TaggedAsyncEvent(Event(i.toString), tag) @@ -193,7 +194,7 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf // send a batch of 3 * 200 val batch1 = sendMessagesWithTag(tag, 200) // Try to persist a large batch of events per actor. Some of these may be returned, but not all! - val batch2 = sendMessagesWithTag(tag, 2000) + val batch2 = sendMessagesWithTag(tag, largeNumberOfMessage) // wait for acknowledgement of the first batch only batch1.futureValue From ff812be9e27a77659f333975c271fdc86568ae23 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Thu, 2 Nov 2023 09:16:25 +0800 Subject: [PATCH 05/11] fix: avoid persistence run fast --- .../jdbc/query/CurrentEventsByTagTest.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala index 492dac52..0f63c158 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala @@ -172,18 +172,16 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf val latch = new CountDownLatch(1) val largeNumberOfMessage = 2000 + val smallNumberOfMessage = 200 + val diff = largeNumberOfMessage - smallNumberOfMessage val journalOps = new JavaDslJdbcReadJournalOperations(system) import system.dispatcher withTestActors(replyToMessages = true) { (actor1, actor2, actor3) => def sendMessagesWithTag(tag: String, numberOfMessagesPerActor: Int): Future[Done] = { val futures = for (actor <- Seq(actor1, actor2, actor3); i <- 1 to numberOfMessagesPerActor) yield { - // block the execution - if (i == largeNumberOfMessage) { - Future { - latch.await() - actor ? TaggedAsyncEvent(Event(i.toString), tag) - } - } else { + Future { + // block the remaining small batch events from being fired + if (i == diff) latch.await() actor ? TaggedAsyncEvent(Event(i.toString), tag) } } @@ -192,7 +190,7 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf val tag = "someTag" // send a batch of 3 * 200 - val batch1 = sendMessagesWithTag(tag, 200) + val batch1 = sendMessagesWithTag(tag, smallNumberOfMessage) // Try to persist a large batch of events per actor. Some of these may be returned, but not all! val batch2 = sendMessagesWithTag(tag, largeNumberOfMessage) From 75368d5d2569dc855eb69318fa1597792354e65a Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Thu, 2 Nov 2023 09:21:55 +0800 Subject: [PATCH 06/11] fix: replace future --- .../jdbc/query/CurrentEventsByTagTest.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala index 0f63c158..f5d03ded 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala @@ -179,9 +179,13 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf withTestActors(replyToMessages = true) { (actor1, actor2, actor3) => def sendMessagesWithTag(tag: String, numberOfMessagesPerActor: Int): Future[Done] = { val futures = for (actor <- Seq(actor1, actor2, actor3); i <- 1 to numberOfMessagesPerActor) yield { - Future { - // block the remaining small batch events from being fired - if (i == diff) latch.await() + // block the remaining small batch events from being fired + if (i == diff) { + Future { + latch.await() + actor ? TaggedAsyncEvent(Event(i.toString), tag) + } + } else { actor ? TaggedAsyncEvent(Event(i.toString), tag) } } From 258582e93b3d01eb8fbae30b8a31e3c034f65348 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Thu, 2 Nov 2023 10:39:36 +0800 Subject: [PATCH 07/11] fix: giving more gap of event --- .../akka/persistence/jdbc/query/CurrentEventsByTagTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala index f5d03ded..671a59c9 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala @@ -173,7 +173,7 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf val latch = new CountDownLatch(1) val largeNumberOfMessage = 2000 val smallNumberOfMessage = 200 - val diff = largeNumberOfMessage - smallNumberOfMessage + val diff = largeNumberOfMessage - (smallNumberOfMessage * 3) val journalOps = new JavaDslJdbcReadJournalOperations(system) import system.dispatcher withTestActors(replyToMessages = true) { (actor1, actor2, actor3) => From c541a7942277a93154d96d1a6e5015b7ed063215 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Fri, 10 Nov 2023 09:27:17 +0800 Subject: [PATCH 08/11] fix: delay persist --- .../akka/persistence/jdbc/query/CurrentEventsByTagTest.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala index 671a59c9..ef10dab1 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala @@ -173,14 +173,13 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf val latch = new CountDownLatch(1) val largeNumberOfMessage = 2000 val smallNumberOfMessage = 200 - val diff = largeNumberOfMessage - (smallNumberOfMessage * 3) val journalOps = new JavaDslJdbcReadJournalOperations(system) import system.dispatcher withTestActors(replyToMessages = true) { (actor1, actor2, actor3) => def sendMessagesWithTag(tag: String, numberOfMessagesPerActor: Int): Future[Done] = { val futures = for (actor <- Seq(actor1, actor2, actor3); i <- 1 to numberOfMessagesPerActor) yield { // block the remaining small batch events from being fired - if (i == diff) { + if (i > largeNumberOfMessage / 2) { Future { latch.await() actor ? TaggedAsyncEvent(Event(i.toString), tag) From 033f81dd58ef114f3afc19afb7ac7feedc92f17d Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Fri, 10 Nov 2023 09:38:27 +0800 Subject: [PATCH 09/11] fix: avoid ask timeout --- .../akka/persistence/jdbc/query/CurrentEventsByTagTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala index ef10dab1..c911f6e6 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala @@ -182,7 +182,7 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf if (i > largeNumberOfMessage / 2) { Future { latch.await() - actor ? TaggedAsyncEvent(Event(i.toString), tag) + (actor ? TaggedAsyncEvent(Event(i.toString), tag)) (20.seconds) } } else { actor ? TaggedAsyncEvent(Event(i.toString), tag) From 14e255845132d469c49f61ad362152ffca767b1b Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Fri, 10 Nov 2023 10:28:58 +0800 Subject: [PATCH 10/11] fix: just reorder --- .../jdbc/query/CurrentEventsByTagTest.scala | 21 ++++--------------- 1 file changed, 4 insertions(+), 17 deletions(-) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala index c911f6e6..81c86b61 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala @@ -170,42 +170,29 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf it should "complete without any gaps in case events are being persisted when the query is executed" in withActorSystem { implicit system => - val latch = new CountDownLatch(1) - val largeNumberOfMessage = 2000 - val smallNumberOfMessage = 200 val journalOps = new JavaDslJdbcReadJournalOperations(system) import system.dispatcher withTestActors(replyToMessages = true) { (actor1, actor2, actor3) => def sendMessagesWithTag(tag: String, numberOfMessagesPerActor: Int): Future[Done] = { val futures = for (actor <- Seq(actor1, actor2, actor3); i <- 1 to numberOfMessagesPerActor) yield { - // block the remaining small batch events from being fired - if (i > largeNumberOfMessage / 2) { - Future { - latch.await() - (actor ? TaggedAsyncEvent(Event(i.toString), tag)) (20.seconds) - } - } else { - actor ? TaggedAsyncEvent(Event(i.toString), tag) - } + actor ? TaggedAsyncEvent(Event(i.toString), tag) } Future.sequence(futures).map(_ => Done) } val tag = "someTag" // send a batch of 3 * 200 - val batch1 = sendMessagesWithTag(tag, smallNumberOfMessage) - // Try to persist a large batch of events per actor. Some of these may be returned, but not all! - val batch2 = sendMessagesWithTag(tag, largeNumberOfMessage) + val batch1 = sendMessagesWithTag(tag, 200) // wait for acknowledgement of the first batch only batch1.futureValue // Sanity check, all events in the first batch must be in the journal journalOps.countJournal.futureValue should be >= 600L + // Try to persist a large batch of events per actor. Some of these may be returned, but not all! + val batch2 = sendMessagesWithTag(tag, 200) // start the query before the last batch completes journalOps.withCurrentEventsByTag()(tag, NoOffset) { tp => - // when query begin running, unlock the barrier. - latch.countDown() // The stream must complete within the given amount of time // This make take a while in case the journal sequence actor detects gaps val allEvents = tp.toStrict(atMost = 40.seconds) From 9c8b5a55a2580bb0acbfccc510b4f581ffb2de96 Mon Sep 17 00:00:00 2001 From: JingZhang Chen Date: Fri, 10 Nov 2023 11:05:53 +0800 Subject: [PATCH 11/11] fix: Rollback the total number of persistent events. --- .../akka/persistence/jdbc/query/CurrentEventsByTagTest.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala index 81c86b61..95382867 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala @@ -16,8 +16,6 @@ import akka.persistence.jdbc.query.EventAdapterTest.{ Event, TaggedAsyncEvent } import scala.concurrent.Future import CurrentEventsByTagTest._ -import java.util.concurrent.CountDownLatch - object CurrentEventsByTagTest { val maxBufferSize = 20 val refreshInterval = 500.milliseconds @@ -190,7 +188,7 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf journalOps.countJournal.futureValue should be >= 600L // Try to persist a large batch of events per actor. Some of these may be returned, but not all! - val batch2 = sendMessagesWithTag(tag, 200) + val batch2 = sendMessagesWithTag(tag, 5000) // start the query before the last batch completes journalOps.withCurrentEventsByTag()(tag, NoOffset) { tp => // The stream must complete within the given amount of time