Skip to content

Commit

Permalink
forward fit Akka compat changes (#1211)
Browse files Browse the repository at this point in the history
* Handle mixed akka/pekko protocol names

* add extra changes needed to get akka cluster support

* add default for pekko.remote.akka.version (#1112)

* add default for pekko.cluster.akka.version

* refactor configs

* Update reference.conf

* add validations for config settings

* Update RemoteSettings.scala

* Update RemoteSettingsSpec.scala

* scalafmt

---------

Co-authored-by: Matthew de Detrich <[email protected]>
  • Loading branch information
pjfanning and mdedetrich authored Mar 22, 2024
1 parent 4bb851d commit 0646754
Show file tree
Hide file tree
Showing 11 changed files with 199 additions and 26 deletions.
54 changes: 54 additions & 0 deletions cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.cluster

import com.typesafe.config.{ Config, ConfigValue, ConfigValueFactory, ConfigValueType }

import scala.annotation.nowarn

private[cluster] object ConfigUtil {

@nowarn("msg=deprecated")
def addAkkaConfig(cfg: Config, akkaVersion: String): Config = {
import scala.collection.JavaConverters._
val innerSet = cfg.entrySet().asScala
.filter(e => e.getKey.startsWith("pekko.") && e.getValue.valueType() != ConfigValueType.OBJECT)
.map { entry =>
entry.getKey.replace("pekko", "akka") -> adjustPackageNameIfNecessary(entry.getValue)
}
var newConfig = cfg
innerSet.foreach { case (key, value) =>
newConfig = newConfig.withValue(key, value)
}
newConfig.withValue("akka.version", ConfigValueFactory.fromAnyRef(akkaVersion))
}

private def adjustPackageNameIfNecessary(cv: ConfigValue): ConfigValue = {
if (cv.valueType() == ConfigValueType.STRING) {
val str = cv.unwrapped().toString
if (str.startsWith("org.apache.pekko")) {
ConfigValueFactory.fromAnyRef(str.replace("org.apache.pekko", "akka"))
} else {
cv
}
} else {
cv
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,21 @@ private[cluster] abstract class SeedNodeProcess(joinConfigCompatChecker: JoinCon
"Note that disabling it will allow the formation of a cluster with nodes having incompatible configuration settings. " +
"This node will be shutdown!"

private lazy val needsAkkaConfig: Boolean = {
context.system.settings.config
.getStringList("pekko.remote.accept-protocol-names")
.contains("akka")
}

private lazy val akkaVersion: String = {
val cfg = context.system.settings.config
if (cfg.hasPath("akka.version")) {
cfg.getString("akka.version")
} else {
cfg.getString("pekko.remote.akka.version")
}
}

private def stopOrBecome(behavior: Option[Actor.Receive]): Unit =
behavior match {
case Some(done) => context.become(done) // JoinSeedNodeProcess
Expand All @@ -65,8 +80,12 @@ private[cluster] abstract class SeedNodeProcess(joinConfigCompatChecker: JoinCon
val configToValidate =
JoinConfigCompatChecker.filterWithKeys(requiredNonSensitiveKeys, context.system.settings.config)

val adjustedConfig = if (needsAkkaConfig)
ConfigUtil.addAkkaConfig(configToValidate, akkaVersion)
else configToValidate

seedNodes.foreach { a =>
context.actorSelection(context.parent.path.toStringWithAddress(a)) ! InitJoin(configToValidate)
context.actorSelection(context.parent.path.toStringWithAddress(a)) ! InitJoin(adjustedConfig)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ abstract class RemotingFeaturesSpec(val multiNodeConfig: RemotingFeaturesConfig)
remotePath,
Nobody,
None,
None)
None,
Set("pekko", "akka"))

rar.start()
rar
Expand Down
27 changes: 27 additions & 0 deletions remote/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,33 @@ pekko {
# is 'off'. Set this to 'off' to suppress these.
warn-unsafe-watch-outside-cluster = on

# When receiving requests from other remote actors, what are the valid
# prefixes to check against. Useful for when dealing with rolling cluster
# migrations with compatible systems such as Lightbend's Akka.
# By default, we only support "pekko" protocol.
# If you want to also support Akka, change this config to:
# pekko.remote.accept-protocol-names = ["pekko", "akka"]
# A ConfigurationException will be thrown at runtime if the array is empty
# or contains values other than "pekko" and/or "akka".
accept-protocol-names = ["pekko"]

# The protocol name to use when sending requests to other remote actors.
# Useful when dealing with rolling migration, i.e. temporarily change
# the protocol name to match another compatible actor implementation
# such as Lightbend's "akka" (whilst making sure accept-protocol-names
# contains "akka") so that you can gracefully migrate all nodes to Apache
# Pekko and then change the protocol-name back to "pekko" once all
# nodes have been are running on Apache Pekko.
# A ConfigurationException will be thrown at runtime if the value is not
# set to "pekko" or "akka".
protocol-name = "pekko"

# When pekko.remote.accept-protocol-names contains "akka", then we
# need to know the Akka version. If you include the Akka jars on the classpath,
# we can use the akka.version from their configuration. This configuration
# setting is only used if we can't find an akka.version setting.
akka.version = "2.6.21"

# Settings for the Phi accrual failure detector (http://www.jaist.ac.jp/~defago/files/pdf/IS_RR_2004_010.pdf
# [Hayashibara et al]) used for remote death watch.
# The default PhiAccrualFailureDetector will trigger if there are no heartbeats within
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ object BoundAddressesExtension extends ExtensionId[BoundAddressesExtension] with

class BoundAddressesExtension(val system: ExtendedActorSystem) extends Extension {

private val remoteSettings: RemoteSettings = new RemoteSettings(system.settings.config)

/**
* Returns a mapping from a protocol to a set of bound addresses.
*/
def boundAddresses: Map[String, Set[Address]] = system.provider.asInstanceOf[RemoteActorRefProvider].transport match {
case artery: ArteryTransport => Map(ArteryTransport.ProtocolName -> Set(artery.bindAddress.address))
case artery: ArteryTransport => Map(remoteSettings.ProtocolName -> Set(artery.bindAddress.address))
case remoting: Remoting => remoting.boundAddresses
case other => throw new IllegalStateException(s"Unexpected transport type: ${other.getClass}")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,8 @@ private[pekko] class RemoteActorRefProvider(
val rpath =
(RootActorPath(address) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements)
.withUid(path.uid)
new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d))
new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d),
remoteSettings.AcceptProtocolNames)
} else {
warnIfNotRemoteActorRef(path)
local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async)
Expand All @@ -488,7 +489,8 @@ private[pekko] class RemoteActorRefProvider(
RootActorPath(address),
Nobody,
props = None,
deploy = None)
deploy = None,
acceptProtocolNames = remoteSettings.AcceptProtocolNames)
} catch {
case NonFatal(e) =>
log.error(e, "No root guardian at [{}]", address)
Expand All @@ -513,7 +515,8 @@ private[pekko] class RemoteActorRefProvider(
RootActorPath(address) / elems,
Nobody,
props = None,
deploy = None)
deploy = None,
acceptProtocolNames = remoteSettings.AcceptProtocolNames)
} catch {
case NonFatal(e) =>
log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage)
Expand Down Expand Up @@ -555,7 +558,8 @@ private[pekko] class RemoteActorRefProvider(
rootPath,
Nobody,
props = None,
deploy = None)
deploy = None,
acceptProtocolNames = remoteSettings.AcceptProtocolNames)
} catch {
case NonFatal(e) =>
log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage)
Expand All @@ -578,7 +582,8 @@ private[pekko] class RemoteActorRefProvider(
path,
Nobody,
props = None,
deploy = None)
deploy = None,
acceptProtocolNames = remoteSettings.AcceptProtocolNames)
} catch {
case NonFatal(e) =>
log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage)
Expand Down Expand Up @@ -672,18 +677,26 @@ private[pekko] class RemoteActorRef private[pekko] (
val path: ActorPath,
val getParent: InternalActorRef,
props: Option[Props],
deploy: Option[Deploy])
deploy: Option[Deploy],
val acceptProtocolNames: Set[String])
extends InternalActorRef
with RemoteRef {

if (path.address.hasLocalScope)
throw new IllegalArgumentException(s"Unexpected local address in RemoteActorRef [$this]")

remote match {
case t: ArteryTransport =>
// detect mistakes such as using "pekko.tcp" with Artery
if (path.address.protocol != t.localAddress.address.protocol)
throw new IllegalArgumentException(s"Wrong protocol of [$path], expected [${t.localAddress.address.protocol}]")
case _: ArteryTransport =>
// detect mistakes such as using "pekko.tcp" with Artery, also handles pekko.remote.accept-protocol-names
if (!acceptProtocolNames.contains(path.address.protocol)) {
val expectedString = if (acceptProtocolNames.size == 1)
"expected"
else
"expected one of"

throw new IllegalArgumentException(
s"Wrong protocol of [$path], $expectedString [${acceptProtocolNames.mkString}]")
}
case _ =>
}
@volatile private[remote] var cachedAssociation: artery.Association = null
Expand All @@ -697,7 +710,8 @@ private[pekko] class RemoteActorRef private[pekko] (
s.headOption match {
case None => this
case Some("..") => getParent.getChild(name)
case _ => new RemoteActorRef(remote, localAddressToUse, path / s, Nobody, props = None, deploy = None)
case _ => new RemoteActorRef(remote, localAddressToUse, path / s, Nobody, props = None, deploy = None,
acceptProtocolNames = acceptProtocolNames)
}
}

Expand Down
25 changes: 25 additions & 0 deletions remote/src/main/scala/org/apache/pekko/remote/RemoteSettings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,31 @@ final class RemoteSettings(val config: Config) {
@deprecated("Classic remoting is deprecated, use Artery", "Akka 2.6.0")
val Adapters: Map[String, String] = configToMap(getConfig("pekko.remote.classic.adapters"))

private val AllowableProtocolNames = Set("pekko", "akka")

val ProtocolName: String = {
val setting = getString("pekko.remote.protocol-name")
if (!AllowableProtocolNames.contains(setting)) {
throw new ConfigurationException("The only allowed values for pekko.remote.protocol-name " +
"are \"pekko\" and \"akka\".")
}
setting
}

val AcceptProtocolNames: Set[String] = {
val set = immutableSeq(getStringList("pekko.remote.accept-protocol-names")).toSet
if (set.isEmpty) {
throw new ConfigurationException("pekko.remote.accept-protocol-names setting must not be empty. " +
"The setting is an array and the only acceptable values are \"pekko\" and \"akka\".")
}
val filteredSet = set.filterNot(AllowableProtocolNames.contains)
if (filteredSet.nonEmpty) {
throw new ConfigurationException("pekko.remote.accept-protocol-names is an array setting " +
"that only accepts the values \"pekko\" and \"akka\".")
}
set
}

private def transportNames: immutable.Seq[String] =
immutableSeq(getStringList("pekko.remote.classic.enabled-transports"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,12 +393,12 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
val (port, boundPort) = bindInboundStreams()

_localAddress = UniqueAddress(
Address(ArteryTransport.ProtocolName, system.name, settings.Canonical.Hostname, port),
Address(provider.remoteSettings.ProtocolName, system.name, settings.Canonical.Hostname, port),
AddressUidExtension(system).longAddressUid)
_addresses = Set(_localAddress.address)

_bindAddress = UniqueAddress(
Address(ArteryTransport.ProtocolName, system.name, settings.Bind.Hostname, boundPort),
Address(provider.remoteSettings.ProtocolName, system.name, settings.Bind.Hostname, boundPort),
AddressUidExtension(system).longAddressUid)

flightRecorder.transportUniqueAddressSet(_localAddress)
Expand Down Expand Up @@ -954,8 +954,6 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
*/
private[remote] object ArteryTransport {

val ProtocolName = "pekko"

// Note that the used version of the header format for outbound messages is defined in
// `ArterySettings.Version` because that may depend on configuration settings.
// This is the highest supported version on receiving (decoding) side.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,11 @@ private[remote] class PekkoProtocolSettings(config: Config) {
}

val ManagerNamePrefix: String = config.getString("pekko.remote.classic.manager-name-prefix")
val PekkoScheme: String = new RemoteSettings(config).ProtocolName
}

@nowarn("msg=deprecated")
private[remote] object PekkoProtocolTransport { // Couldn't these go into the Remoting Extension/ RemoteSettings instead?
val PekkoScheme: String = "pekko"
val PekkoOverhead: Int = 0 // Don't know yet
val UniqueId = new java.util.concurrent.atomic.AtomicInteger(0)

Expand Down Expand Up @@ -124,7 +124,7 @@ private[remote] class PekkoProtocolTransport(
private val codec: PekkoPduCodec)
extends ActorTransportAdapter(wrappedTransport, system) {

override val addedSchemeIdentifier: String = PekkoScheme
override val addedSchemeIdentifier: String = new RemoteSettings(system.settings.config).ProtocolName

override def managementCommand(cmd: Any): Future[Boolean] = wrappedTransport.managementCommand(cmd)

Expand Down Expand Up @@ -232,8 +232,9 @@ private[remote] class PekkoProtocolHandle(
_wrappedHandle: AssociationHandle,
val handshakeInfo: HandshakeInfo,
private val stateActor: ActorRef,
private val codec: PekkoPduCodec)
extends AbstractTransportAdapterHandle(_localAddress, _remoteAddress, _wrappedHandle, PekkoScheme) {
private val codec: PekkoPduCodec,
override val addedSchemeIdentifier: String)
extends AbstractTransportAdapterHandle(_localAddress, _remoteAddress, _wrappedHandle, addedSchemeIdentifier) {

override def write(payload: ByteString): Boolean = wrappedHandle.write(codec.constructPayload(payload))

Expand Down Expand Up @@ -716,7 +717,8 @@ private[remote] class ProtocolStateActor(
wrappedHandle,
handshakeInfo,
self,
codec))
codec,
settings.PekkoScheme))
readHandlerPromise.future
}

Expand All @@ -736,7 +738,8 @@ private[remote] class ProtocolStateActor(
wrappedHandle,
handshakeInfo,
self,
codec)))
codec,
settings.PekkoScheme)))
readHandlerPromise.future
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ pekko.actor.warn-about-java-serializer-usage = off
extinctPath,
Nobody,
props = None,
deploy = None)
deploy = None,
acceptProtocolNames = Set("pekko", "akka"))

val probe = TestProbe()
probe.watch(extinctRef)
Expand Down
Loading

0 comments on commit 0646754

Please sign in to comment.