Skip to content

Commit

Permalink
formatting
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Mar 21, 2024
1 parent 62aeff3 commit 75212cf
Show file tree
Hide file tree
Showing 8 changed files with 47 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ class JdbcAsyncWriteJournal(config: Config) extends AsyncWriteJournal {
override def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = {
def fetchHighestSeqNr() = journalDao.highestSequenceNr(persistenceId, fromSequenceNr)
writeInProgress.get(persistenceId) match {
case null => fetchHighestSeqNr()
case f: Future[Any @unchecked] =>
case null => fetchHighestSeqNr()
case f: Future[Any @unchecked] =>
// we must fetch the highest sequence number after the previous write has completed
// If the previous write failed then we can ignore this
f.recover { case _ => () }.flatMap(_ => fetchHighestSeqNr())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,22 @@

package akka.persistence.jdbc.journal.dao.legacy

import akka.persistence.jdbc.config.{BaseDaoConfig, JournalConfig}
import akka.persistence.jdbc.journal.dao.{BaseDao, BaseJournalDaoWithReadMessages, H2Compat, JournalDaoWithUpdates}
import akka.persistence.jdbc.config.{ BaseDaoConfig, JournalConfig }
import akka.persistence.jdbc.journal.dao.{ BaseDao, BaseJournalDaoWithReadMessages, H2Compat, JournalDaoWithUpdates }
import akka.persistence.jdbc.serialization.FlowPersistentReprSerializer
import akka.persistence.{AtomicWrite, PersistentRepr}
import akka.persistence.{ AtomicWrite, PersistentRepr }
import akka.serialization.Serialization
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import akka.{Done, NotUsed}
import akka.{ Done, NotUsed }
import org.slf4j.LoggerFactory
import slick.jdbc.JdbcBackend.Database
import slick.jdbc.JdbcProfile

import scala.annotation.nowarn
import scala.collection.immutable.{Nil, Seq}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import scala.collection.immutable.{ Nil, Seq }
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }

class ByteArrayJournalDao(
val db: Database,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,8 @@ trait JournalTables {
_tableTag,
_schemaName = journalTableCfg.schemaName,
_tableName = journalTableCfg.tableName) {
def * = (
ordering,
deleted,
persistenceId,
sequenceNumber,
message,
tags).<>((JournalRow.apply _).tupled, JournalRow.unapply)
def * = (ordering, deleted, persistenceId, sequenceNumber, message, tags)
.<>((JournalRow.apply _).tupled, JournalRow.unapply)

val ordering: Rep[Long] = column[Long](journalTableCfg.columnNames.ordering, O.AutoInc)
val persistenceId: Rep[String] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ package object legacy {

package legacy {
final case class JournalRow(
ordering: Long,
deleted: Boolean,
persistenceId: String,
sequenceNumber: Long,
message: Array[Byte],
tags: Option[String] = None)
}
ordering: Long,
deleted: Boolean,
persistenceId: String,
sequenceNumber: Long,
message: Array[Byte],
tags: Option[String] = None)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@ package akka.persistence.jdbc.query.dao.legacy
import akka.NotUsed
import akka.persistence.PersistentRepr
import akka.persistence.jdbc.config.ReadJournalConfig
import akka.persistence.jdbc.journal.dao.{BaseJournalDaoWithReadMessages, H2Compat}
import akka.persistence.jdbc.journal.dao.legacy.{ByteArrayJournalSerializer, JournalRow}
import akka.persistence.jdbc.journal.dao.{ BaseJournalDaoWithReadMessages, H2Compat }
import akka.persistence.jdbc.journal.dao.legacy.{ ByteArrayJournalSerializer, JournalRow }
import akka.persistence.jdbc.query.dao.ReadJournalDao
import akka.persistence.jdbc.query.dao.legacy.TagFilterFlow.perfectlyMatchTag
import akka.persistence.jdbc.serialization.FlowPersistentReprSerializer
import akka.serialization.Serialization
import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.scaladsl.{ Flow, Source }
import slick.jdbc.JdbcBackend.*
import slick.jdbc.{GetResult, JdbcProfile}
import slick.jdbc.{ GetResult, JdbcProfile }

import scala.annotation.nowarn
import scala.collection.immutable.*
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }

trait BaseByteArrayReadJournalDao extends ReadJournalDao with BaseJournalDaoWithReadMessages with H2Compat {
def db: Database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ import scala.compat.java8.FutureConverters.*
import scala.concurrent.ExecutionContext
import akka.annotation.ApiMayChange
import slick.jdbc.JdbcProfile
import akka.{Done, NotUsed}
import akka.persistence.state.javadsl.{DurableStateUpdateStore, GetObjectResult}
import akka.{ Done, NotUsed }
import akka.persistence.state.javadsl.{ DurableStateUpdateStore, GetObjectResult }
import akka.persistence.jdbc.state.DurableStateQueries
import akka.persistence.jdbc.config.DurableStateTableConfiguration
import akka.persistence.jdbc.state.scaladsl.JdbcDurableStateStore as ScalaJdbcDurableStateStore
import akka.persistence.query.{DurableStateChange, Offset}
import akka.persistence.jdbc.state.scaladsl.{ JdbcDurableStateStore => ScalaJdbcDurableStateStore }
import akka.persistence.query.{ DurableStateChange, Offset }
import akka.persistence.query.javadsl.DurableStateStoreQuery
import akka.stream.javadsl.Source

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package akka.persistence.jdbc.util

import scala.annotation.nowarn
import scala.collection.immutable.*
import scala.util.{Failure, Success, Try}
import scala.util.{ Failure, Success, Try }

object TrySeq {
def sequence[A](seq: Seq[Try[A]]): Try[Seq[A]] = {
Expand Down
33 changes: 18 additions & 15 deletions core/src/test/scala/akka/persistence/jdbc/query/QueryTestSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,25 @@

package akka.persistence.jdbc.query

import akka.actor.{ActorRef, ActorSystem, Props, Stash, Status}
import akka.actor.{ ActorRef, ActorSystem, Props, Stash, Status }
import akka.pattern.ask
import akka.event.LoggingReceive
import akka.persistence.{DeleteMessagesFailure, DeleteMessagesSuccess, PersistentActor}
import akka.persistence.{ DeleteMessagesFailure, DeleteMessagesSuccess, PersistentActor }
import akka.persistence.jdbc.SingleActorSystemPerTestSpec
import akka.persistence.jdbc.query.EventAdapterTest.{Event, TaggedAsyncEvent, TaggedEvent}
import akka.persistence.jdbc.query.javadsl.JdbcReadJournal as JavaJdbcReadJournal
import akka.persistence.jdbc.query.EventAdapterTest.{ Event, TaggedAsyncEvent, TaggedEvent }
import akka.persistence.jdbc.query.javadsl.{ JdbcReadJournal => JavaJdbcReadJournal }
import akka.persistence.jdbc.query.scaladsl.JdbcReadJournal
import akka.persistence.journal.Tagged
import akka.persistence.query.{EventEnvelope, Offset, PersistenceQuery}
import akka.persistence.query.{ EventEnvelope, Offset, PersistenceQuery }
import akka.stream.scaladsl.Sink
import akka.stream.testkit.TestSubscriber
import akka.stream.testkit.javadsl.TestSink as JavaSink
import akka.stream.testkit.javadsl.{ TestSink => JavaSink }
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{Materializer, SystemMaterializer}
import akka.stream.{ Materializer, SystemMaterializer }
import com.typesafe.config.ConfigValue

import scala.concurrent.Future
import scala.concurrent.duration.{FiniteDuration, *}
import scala.concurrent.duration.{ *, FiniteDuration }
import akka.persistence.jdbc.testkit.internal.H2
import akka.persistence.jdbc.testkit.internal.MySQL
import akka.persistence.jdbc.testkit.internal.Oracle
Expand Down Expand Up @@ -81,9 +81,8 @@ class ScalaJdbcReadJournalOperations(readJournal: JdbcReadJournal)(implicit syst
def withEventsByPersistenceId(
within: FiniteDuration)(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long)(
f: TestSubscriber.Probe[EventEnvelope] => Unit): Unit = {
val tp = readJournal
.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr)
.runWith(TestSink[EventEnvelope]())
val tp =
readJournal.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr).runWith(TestSink[EventEnvelope]())
tp.within(within)(f(tp))
}

Expand Down Expand Up @@ -137,29 +136,33 @@ class JavaDslJdbcReadJournalOperations(readJournal: javadsl.JdbcReadJournal)(
def withCurrentEventsByPersistenceId(
within: FiniteDuration)(persistenceId: String, fromSequenceNr: Long = 0, toSequenceNr: Long = Long.MaxValue)(
f: TestSubscriber.Probe[EventEnvelope] => Unit): Unit = {
val sink: akka.stream.javadsl.Sink[EventEnvelope, TestSubscriber.Probe[EventEnvelope]] = JavaSink.create[EventEnvelope](system)
val sink: akka.stream.javadsl.Sink[EventEnvelope, TestSubscriber.Probe[EventEnvelope]] =
JavaSink.create[EventEnvelope](system)
val tp = readJournal.currentEventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr).runWith(sink, mat)
tp.within(within)(f(tp))
}

def withEventsByPersistenceId(
within: FiniteDuration)(persistenceId: String, fromSequenceNr: Long, toSequenceNr: Long)(
f: TestSubscriber.Probe[EventEnvelope] => Unit): Unit = {
val sink: akka.stream.javadsl.Sink[EventEnvelope, TestSubscriber.Probe[EventEnvelope]] = JavaSink.create[EventEnvelope](system)
val sink: akka.stream.javadsl.Sink[EventEnvelope, TestSubscriber.Probe[EventEnvelope]] =
JavaSink.create[EventEnvelope](system)
val tp = readJournal.eventsByPersistenceId(persistenceId, fromSequenceNr, toSequenceNr).runWith(sink, mat)
tp.within(within)(f(tp))
}

def withCurrentEventsByTag(within: FiniteDuration)(tag: String, offset: Offset)(
f: TestSubscriber.Probe[EventEnvelope] => Unit): Unit = {
val sink: akka.stream.javadsl.Sink[EventEnvelope, TestSubscriber.Probe[EventEnvelope]] = JavaSink.create[EventEnvelope](system)
val sink: akka.stream.javadsl.Sink[EventEnvelope, TestSubscriber.Probe[EventEnvelope]] =
JavaSink.create[EventEnvelope](system)
val tp = readJournal.currentEventsByTag(tag, offset).runWith(sink, mat)
tp.within(within)(f(tp))
}

def withEventsByTag(within: FiniteDuration)(tag: String, offset: Offset)(
f: TestSubscriber.Probe[EventEnvelope] => Unit): Unit = {
val sink: akka.stream.javadsl.Sink[EventEnvelope, TestSubscriber.Probe[EventEnvelope]] = JavaSink.create[EventEnvelope](system)
val sink: akka.stream.javadsl.Sink[EventEnvelope, TestSubscriber.Probe[EventEnvelope]] =
JavaSink.create[EventEnvelope](system)
val tp = readJournal.eventsByTag(tag, offset).runWith(sink, mat)
tp.within(within)(f(tp))
}
Expand Down

0 comments on commit 75212cf

Please sign in to comment.