diff --git a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala index f84e97c2..baa92094 100644 --- a/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala +++ b/core/src/test/scala/akka/persistence/jdbc/query/CurrentEventsByTagTest.scala @@ -16,6 +16,8 @@ import akka.persistence.jdbc.query.EventAdapterTest.{ Event, TaggedAsyncEvent } import scala.concurrent.Future import CurrentEventsByTagTest._ +import java.util.concurrent.CountDownLatch + object CurrentEventsByTagTest { val maxBufferSize = 20 val refreshInterval = 500.milliseconds @@ -168,11 +170,16 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf it should "complete without any gaps in case events are being persisted when the query is executed" in withActorSystem { implicit system => + val latch = new CountDownLatch(1) val journalOps = new JavaDslJdbcReadJournalOperations(system) import system.dispatcher withTestActors(replyToMessages = true) { (actor1, actor2, actor3) => def sendMessagesWithTag(tag: String, numberOfMessagesPerActor: Int): Future[Done] = { val futures = for (actor <- Seq(actor1, actor2, actor3); i <- 1 to numberOfMessagesPerActor) yield { + // block the execution + if (i == numberOfMessagesPerActor / 2) { + latch.await() + } actor ? TaggedAsyncEvent(Event(i.toString), tag) } Future.sequence(futures).map(_ => Done) @@ -182,7 +189,7 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf // send a batch of 3 * 200 val batch1 = sendMessagesWithTag(tag, 200) // Try to persist a large batch of events per actor. Some of these may be returned, but not all! - val batch2 = sendMessagesWithTag(tag, 3000) + val batch2 = sendMessagesWithTag(tag, 2000) // wait for acknowledgement of the first batch only batch1.futureValue @@ -191,6 +198,8 @@ abstract class CurrentEventsByTagTest(config: String) extends QueryTestSpec(conf // start the query before the last batch completes journalOps.withCurrentEventsByTag()(tag, NoOffset) { tp => + // when query begin running, unlock the barrier. + latch.countDown() // The stream must complete within the given amount of time // This make take a while in case the journal sequence actor detects gaps val allEvents = tp.toStrict(atMost = 20.seconds)