Skip to content

Commit

Permalink
Migrate from Akka to Pekko (#2678)
Browse files Browse the repository at this point in the history
  • Loading branch information
StepanBrychta committed Jul 16, 2024
1 parent b14a046 commit 83e5918
Show file tree
Hide file tree
Showing 103 changed files with 397 additions and 397 deletions.
2 changes: 1 addition & 1 deletion bag_register/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ aws.metrics.namespace=${?metrics_namespace}
bags.tracker.host=${?bags_tracker_host}
operation.name=${?operation_name}

akka {
pekko {
http.server.request-timeout=60s
# The maximum serialised size of a bag is ~200000000
# At time of writing this digitised/b19974760 (Chemist & Druggist)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package weco.storage_service.bag_register

import akka.actor.ActorSystem
import org.apache.pekko.actor.ActorSystem
import com.typesafe.config.Config
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import weco.json.JsonUtil._
import weco.messaging.typesafe.{AlpakkaSqsWorkerConfigBuilder, SNSBuilder}
import weco.messaging.typesafe.{PekkoSQSWorkerConfigBuilder, SNSBuilder}
import weco.monitoring.cloudwatch.CloudWatchMetrics
import weco.monitoring.typesafe.CloudWatchBuilder
import weco.storage_service.bag_register.services.{
BagRegisterWorker,
Register,
S3StorageManifestService
}
import weco.storage_service.bag_tracker.client.AkkaBagTrackerClient
import weco.storage_service.bag_tracker.client.PekkoBagTrackerClient
import weco.storage_service.bagit.services.s3.S3BagReader
import weco.storage_service.config.builders.{
IngestUpdaterBuilder,
Expand Down Expand Up @@ -48,7 +48,7 @@ object Main extends WellcomeTypesafeApp {

val register = new Register(
bagReader = new S3BagReader(),
bagTrackerClient = new AkkaBagTrackerClient(
bagTrackerClient = new PekkoBagTrackerClient(
trackerHost = config.requireString("bags.tracker.host")
),
storageManifestService = storageManifestService
Expand All @@ -61,7 +61,7 @@ object Main extends WellcomeTypesafeApp {
)

new BagRegisterWorker(
config = AlpakkaSqsWorkerConfigBuilder.build(config),
config = PekkoSQSWorkerConfigBuilder.build(config),
ingestUpdater = ingestUpdater,
registrationNotifications = registrationNotifications,
register = register
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package weco.storage_service.bag_register.services

import akka.actor.ActorSystem
import org.apache.pekko.actor.ActorSystem
import io.circe.Decoder
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import weco.json.JsonUtil._
import weco.messaging.MessageSender
import weco.messaging.sqsworker.alpakka.AlpakkaSQSWorkerConfig
import weco.messaging.sqsworker.pekko.PekkoSQSWorkerConfig
import weco.messaging.worker.models.Result
import weco.monitoring.Metrics
import weco.storage_service.bag_register.models.RegistrationSummary
Expand All @@ -22,7 +22,7 @@ import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

class BagRegisterWorker[IngestDestination, NotificationDestination](
val config: AlpakkaSQSWorkerConfig,
val config: PekkoSQSWorkerConfig,
ingestUpdater: IngestUpdater[IngestDestination],
registrationNotifications: MessageSender[NotificationDestination],
register: Register
Expand Down Expand Up @@ -93,7 +93,7 @@ class BagRegisterWorker[IngestDestination, NotificationDestination](

// The IngestStepWorker trait expects a processMessage() method, which returns
// a Try[…]. That method then gets called to provide the process() method,
// which is in turn used by the AlpakkaSQSWorker.
// which is in turn used by the PekkoSQSWorker.
//
// Because the bag tracker client returns a Future[…] rather than a Try[…],
// we bypass this method and define our own process(). We still have to define
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package weco.storage_service.bag_register.fixtures

import org.scalatest.Assertion
import weco.akka.fixtures.Akka
import weco.pekko.fixtures.Pekko
import weco.fixtures.TestWith
import weco.json.JsonUtil._
import weco.messaging.fixtures.SQS.{Queue, QueuePair}
import weco.messaging.fixtures.worker.AlpakkaSQSWorkerFixtures
import weco.messaging.fixtures.worker.PekkoSQSWorkerFixtures
import weco.messaging.memory.MemoryMessageSender
import weco.monitoring.memory.MemoryMetrics
import weco.storage_service.bag_register.services.{
Expand All @@ -30,8 +30,8 @@ import scala.concurrent.ExecutionContext.Implicits.global

trait BagRegisterFixtures
extends StorageRandomGenerators
with Akka
with AlpakkaSQSWorkerFixtures
with Pekko
with PekkoSQSWorkerFixtures
with OperationFixtures
with StorageManifestDaoFixture
with IngestUpdateAssertions
Expand Down Expand Up @@ -70,7 +70,7 @@ trait BagRegisterFixtures
)

val service = new BagRegisterWorker(
config = createAlpakkaSQSWorkerConfig(queue),
config = createPekkoSQSWorkerConfig(queue),
ingestUpdater =
createIngestUpdaterWith(ingests, stepName = "register"),
registrationNotifications = registrationNotifications,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package weco.storage_service.bag_register.services

import akka.http.scaladsl.model.Uri
import org.apache.pekko.http.scaladsl.model.Uri
import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
import org.scalatest.funspec.AnyFunSpec
import org.scalatest.matchers.should.Matchers
import weco.storage_service.bag_register.fixtures.BagRegisterFixtures
import weco.storage_service.bag_register.models.RegistrationSummary
import weco.storage_service.bag_tracker.client.AkkaBagTrackerClient
import weco.storage_service.bag_tracker.client.PekkoBagTrackerClient
import weco.storage_service.bag_tracker.fixtures.BagTrackerFixtures
import weco.storage_service.bagit.models.BagId
import weco.storage_service.bagit.services.s3.S3BagReader
Expand Down Expand Up @@ -111,7 +111,7 @@ class RegisterTest
val ingestId = createIngestID

withActorSystem { implicit actorSystem =>
val bagTrackerClient = new AkkaBagTrackerClient(
val bagTrackerClient = new PekkoBagTrackerClient(
trackerHost = Uri("http://localhost:9000/doesnotexist")
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package weco.storage_service.bag_replicator

import akka.actor.ActorSystem
import org.apache.pekko.actor.ActorSystem
import com.azure.storage.blob.{BlobServiceClient, BlobServiceClientBuilder}
import com.typesafe.config.Config
import software.amazon.awssdk.services.s3.{S3AsyncClient, S3Client}
Expand All @@ -9,7 +9,7 @@ import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.transfer.s3.S3TransferManager
import weco.json.JsonUtil._
import weco.messaging.sns.SNSConfig
import weco.messaging.typesafe.AlpakkaSqsWorkerConfigBuilder
import weco.messaging.typesafe.PekkoSQSWorkerConfigBuilder
import weco.monitoring.cloudwatch.CloudWatchMetrics
import weco.monitoring.typesafe.CloudWatchBuilder
import weco.storage.providers.azure.AzureBlobLocationPrefix
Expand Down Expand Up @@ -102,7 +102,7 @@ object Main extends WellcomeTypesafeApp {
DstPrefix
] =
new BagReplicatorWorker(
config = AlpakkaSqsWorkerConfigBuilder.build(config),
config = PekkoSQSWorkerConfigBuilder.build(config),
ingestUpdater = IngestUpdaterBuilder.build(config, operationName),
outgoingPublisher =
OutgoingPublisherBuilder.build(config, operationName),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package weco.storage_service.bag_replicator.services

import java.time.Instant
import java.util.UUID
import akka.actor.ActorSystem
import org.apache.pekko.actor.ActorSystem
import cats.instances.try_._
import io.circe.Decoder
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import weco.messaging.sqsworker.alpakka.AlpakkaSQSWorkerConfig
import weco.messaging.sqsworker.pekko.PekkoSQSWorkerConfig
import weco.monitoring.Metrics
import weco.storage_service.bag_replicator.config.ReplicatorDestinationConfig
import weco.storage_service.bag_replicator.replicator.Replicator
Expand All @@ -30,7 +30,7 @@ class BagReplicatorWorker[
DstLocation <: Location,
DstPrefix <: Prefix[DstLocation]
](
val config: AlpakkaSQSWorkerConfig,
val config: PekkoSQSWorkerConfig,
ingestUpdater: IngestUpdater[IngestDestination],
outgoingPublisher: OutgoingPublisher[OutgoingDestination],
lockingService: LockingService[IngestStepResult[
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package weco.storage_service.bag_replicator.fixtures

import java.util.UUID
import org.scalatest.Assertion
import weco.akka.fixtures.Akka
import weco.pekko.fixtures.Pekko
import weco.fixtures.TestWith
import weco.json.JsonUtil._
import weco.messaging.fixtures.SQS.Queue
import weco.messaging.fixtures.worker.AlpakkaSQSWorkerFixtures
import weco.messaging.fixtures.worker.PekkoSQSWorkerFixtures
import weco.messaging.memory.MemoryMessageSender
import weco.monitoring.memory.MemoryMetrics
import weco.storage_service.bag_replicator.config.ReplicatorDestinationConfig
Expand All @@ -31,9 +31,9 @@ import scala.concurrent.duration._
import scala.util.Try

trait BagReplicatorFixtures
extends Akka
extends Pekko
with OperationFixtures
with AlpakkaSQSWorkerFixtures
with PekkoSQSWorkerFixtures
with MemoryLockDaoFixtures
with S3TransferManagerFixtures {

Expand Down Expand Up @@ -99,7 +99,7 @@ trait BagReplicatorFixtures
val replicator = new S3Replicator()

val service = new BagReplicatorWorker(
config = createAlpakkaSQSWorkerConfig(queue),
config = createPekkoSQSWorkerConfig(queue),
ingestUpdater = ingestUpdater,
outgoingPublisher = outgoingPublisher,
lockingService = lockingService,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package weco.storage_service.bag_root_finder

import akka.actor.ActorSystem
import org.apache.pekko.actor.ActorSystem
import com.typesafe.config.Config
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import weco.json.JsonUtil._
import weco.messaging.typesafe.AlpakkaSqsWorkerConfigBuilder
import weco.messaging.typesafe.PekkoSQSWorkerConfigBuilder
import weco.monitoring.cloudwatch.CloudWatchMetrics
import weco.monitoring.typesafe.CloudWatchBuilder
import weco.storage_service.bag_root_finder.services.{
Expand Down Expand Up @@ -39,7 +39,7 @@ object Main extends WellcomeTypesafeApp {
val operationName = OperationNameBuilder.getName(config)

new BagRootFinderWorker(
config = AlpakkaSqsWorkerConfigBuilder.build(config),
config = PekkoSQSWorkerConfigBuilder.build(config),
bagRootFinder = new BagRootFinder(),
ingestUpdater = IngestUpdaterBuilder.build(config, operationName),
outgoingPublisher = OutgoingPublisherBuilder.build(config, operationName)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package weco.storage_service.bag_root_finder.services

import akka.actor.ActorSystem
import org.apache.pekko.actor.ActorSystem
import io.circe.Decoder
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import weco.messaging.sqsworker.alpakka.AlpakkaSQSWorkerConfig
import weco.messaging.sqsworker.pekko.PekkoSQSWorkerConfig
import weco.monitoring.Metrics
import weco.storage_service._
import weco.storage_service.ingests.services.IngestUpdater
Expand All @@ -22,7 +22,7 @@ import scala.concurrent.Future
import scala.util.{Success, Try}

class BagRootFinderWorker[IngestDestination, OutgoingDestination](
val config: AlpakkaSQSWorkerConfig,
val config: PekkoSQSWorkerConfig,
bagRootFinder: BagRootFinder,
ingestUpdater: IngestUpdater[IngestDestination],
outgoingPublisher: OutgoingPublisher[OutgoingDestination]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package weco.storage_service.bag_root_finder.fixtures

import weco.akka.fixtures.Akka
import weco.pekko.fixtures.Pekko
import weco.fixtures.TestWith
import weco.json.JsonUtil._
import weco.messaging.fixtures.SQS.Queue
import weco.messaging.fixtures.worker.AlpakkaSQSWorkerFixtures
import weco.messaging.fixtures.worker.PekkoSQSWorkerFixtures
import weco.messaging.memory.MemoryMessageSender
import weco.monitoring.memory.MemoryMetrics
import weco.storage_service.fixtures.OperationFixtures
Expand All @@ -16,8 +16,8 @@ import weco.storage.fixtures.S3Fixtures

trait BagRootFinderFixtures
extends OperationFixtures
with Akka
with AlpakkaSQSWorkerFixtures
with Pekko
with PekkoSQSWorkerFixtures
with S3Fixtures {

def withWorkerService[R](
Expand All @@ -33,7 +33,7 @@ trait BagRootFinderFixtures
implicit val metrics: MemoryMetrics = new MemoryMetrics()

val worker = new BagRootFinderWorker(
config = createAlpakkaSQSWorkerConfig(queue),
config = createPekkoSQSWorkerConfig(queue),
bagRootFinder = new BagRootFinder(),
ingestUpdater = ingestUpdater,
outgoingPublisher = outgoingPublisher
Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package weco.storage_service.bag_tagger

import akka.actor.ActorSystem
import org.apache.pekko.actor.ActorSystem
import com.typesafe.config.Config
import software.amazon.awssdk.services.s3.S3Client
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import weco.json.JsonUtil._
import weco.messaging.typesafe.AlpakkaSqsWorkerConfigBuilder
import weco.messaging.typesafe.PekkoSQSWorkerConfigBuilder
import weco.monitoring.cloudwatch.CloudWatchMetrics
import weco.monitoring.typesafe.CloudWatchBuilder
import weco.storage_service.bag_tagger.services.{
ApplyTags,
BagTaggerWorker,
TagRules
}
import weco.storage_service.bag_tracker.client.AkkaBagTrackerClient
import weco.storage_service.bag_tracker.client.PekkoBagTrackerClient
import weco.typesafe.WellcomeTypesafeApp
import weco.typesafe.config.builders.EnrichConfig._

Expand All @@ -32,15 +32,15 @@ object Main extends WellcomeTypesafeApp {
implicit val sqsClient: SqsAsyncClient =
SqsAsyncClient.builder().build()

val bagTrackerClient = new AkkaBagTrackerClient(
val bagTrackerClient = new PekkoBagTrackerClient(
trackerHost = config.requireString("bags.tracker.host")
)

implicit val s3Client: S3Client =
S3Client.builder().build()

new BagTaggerWorker(
config = AlpakkaSqsWorkerConfigBuilder.build(config),
config = PekkoSQSWorkerConfigBuilder.build(config),
bagTrackerClient = bagTrackerClient,
applyTags = ApplyTags(),
tagRules = TagRules.chooseTags
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
package weco.storage_service.bag_tagger.services

import akka.actor.ActorSystem
import akka.stream.alpakka.sqs
import akka.stream.alpakka.sqs.MessageAction
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.stream.connectors.sqs
import org.apache.pekko.stream.connectors.sqs.MessageAction
import grizzled.slf4j.Logging
import io.circe.Decoder
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.model.Message
import weco.messaging.sqsworker.alpakka.{
AlpakkaSQSWorker,
AlpakkaSQSWorkerConfig
import weco.messaging.sqsworker.pekko.{
PekkoSQSWorker,
PekkoSQSWorkerConfig
}
import weco.messaging.worker.models.{Result, RetryableFailure, Successful}
import weco.monitoring.Metrics
Expand All @@ -25,7 +25,7 @@ import weco.typesafe.Runnable
import scala.concurrent.Future

class BagTaggerWorker(
config: AlpakkaSQSWorkerConfig,
config: PekkoSQSWorkerConfig,
bagTrackerClient: BagTrackerClient,
applyTags: ApplyTags,
tagRules: StorageManifest => Map[StorageManifestFile, Map[String, String]]
Expand Down Expand Up @@ -84,8 +84,8 @@ class BagTaggerWorker(
}
}

val worker: AlpakkaSQSWorker[BagRegistrationNotification, Unit] =
new AlpakkaSQSWorker[BagRegistrationNotification, Unit](config)(process) {
val worker: PekkoSQSWorker[BagRegistrationNotification, Unit] =
new PekkoSQSWorker[BagRegistrationNotification, Unit](config)(process) {
override val retryAction: Message => sqs.MessageAction =
(message: Message) =>
MessageAction.changeMessageVisibility(message, visibilityTimeout = 0)
Expand Down
Loading

0 comments on commit 83e5918

Please sign in to comment.