From 7d2356de0bf79431dd2a18b04d98c12cd95188c2 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Mon, 16 Dec 2024 21:12:59 +0100 Subject: [PATCH] Revert "revert #1568 due to test failures (#1587)" This reverts commit 7af03e5215dc69df9702803dba7f22d94b49ead4. --- .../apache/pekko/cluster/ClusterDaemon.scala | 14 +-- .../cluster/MixedProtocolClusterSpec.scala | 92 +++++++++++++++++++ 2 files changed, 100 insertions(+), 6 deletions(-) create mode 100644 cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala index 34320067be1..c703bdd6b12 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/ClusterDaemon.scala @@ -365,6 +365,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh val statsEnabled = PublishStatsInterval.isFinite var gossipStats = GossipStats() + val acceptedProtocols = context.system.settings.config.getStringList("pekko.remote.accept-protocol-names") + var seedNodes = SeedNodes var seedNodeProcess: Option[ActorRef] = None var seedNodeProcessCounter = 0 // for unique names @@ -701,10 +703,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh * which will reply with a `Welcome` message. */ def join(address: Address): Unit = { - if (address.protocol != selfAddress.protocol) + if (!acceptedProtocols.contains(address.protocol)) logWarning( - "Trying to join member with wrong protocol, but was ignored, expected [{}] but was [{}]", - selfAddress.protocol, + "Trying to join member with wrong protocol, but was ignored, expected any of [{}] but was [{}]", + acceptedProtocols, address.protocol) else if (address.system != selfAddress.system) logWarning( @@ -750,10 +752,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh def joining(joiningNode: UniqueAddress, roles: Set[String], appVersion: Version): Unit = { if (!preparingForShutdown) { val selfStatus = latestGossip.member(selfUniqueAddress).status - if (joiningNode.address.protocol != selfAddress.protocol) + if (!acceptedProtocols.contains(joiningNode.address.protocol)) logWarning( - "Member with wrong protocol tried to join, but was ignored, expected [{}] but was [{}]", - selfAddress.protocol, + "Member with wrong protocol tried to join, but was ignored, expected any of {} but was [{}]", + acceptedProtocols, joiningNode.address.protocol) else if (joiningNode.address.system != selfAddress.system) logWarning( diff --git a/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala new file mode 100644 index 00000000000..45382f4b353 --- /dev/null +++ b/cluster/src/test/scala/org/apache/pekko/cluster/MixedProtocolClusterSpec.scala @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.cluster + +import com.typesafe.config.{ Config, ConfigFactory } + +import org.apache.pekko.testkit.{ LongRunningTest, PekkoSpec } + +object MixedProtocolClusterSpec { + + val baseConfig: Config = + ConfigFactory.parseString(""" + pekko.actor.provider = "cluster" + pekko.coordinated-shutdown.terminate-actor-system = on + + pekko.remote.classic.netty.tcp.port = 0 + pekko.remote.artery.canonical.port = 0 + pekko.remote.artery.advanced.aeron.idle-cpu-level = 3 + pekko.remote.accept-protocol-names = ["pekko", "akka"] + + pekko.cluster.jmx.multi-mbeans-in-same-jvm = on + pekko.cluster.configuration-compatibility-check.enforce-on-join = off + """) + + val configWithPekko: Config = + ConfigFactory.parseString(""" + pekko.remote.protocol-name = "pekko" + """).withFallback(baseConfig) + + val configWithAkka: Config = + ConfigFactory.parseString(""" + pekko.remote.protocol-name = "akka" + """).withFallback(baseConfig) +} + +class MixedProtocolClusterSpec extends PekkoSpec with ClusterTestKit { + + import MixedProtocolClusterSpec._ + + "A node using the akka protocol" must { + + "be allowed to join a cluster with a node using the pekko protocol" taggedAs LongRunningTest in { + + val clusterTestUtil = new ClusterTestUtil(system.name) + // start the first node with the "pekko" protocol + clusterTestUtil.newActorSystem(configWithPekko) + + // have a node using the "akka" protocol join + val joiningNode = clusterTestUtil.newActorSystem(configWithAkka) + clusterTestUtil.formCluster() + + try { + awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'") + } finally { + clusterTestUtil.shutdownAll() + } + } + + "allow a node using the pekko protocol to join the cluster" taggedAs LongRunningTest in { + + val clusterTestUtil = new ClusterTestUtil(system.name) + + // create the first node with the "akka" protocol + clusterTestUtil.newActorSystem(configWithAkka) + + // have a node using the "pekko" protocol join + val joiningNode = clusterTestUtil.newActorSystem(configWithPekko) + clusterTestUtil.formCluster() + + try { + awaitCond(clusterTestUtil.isMemberUp(joiningNode), message = "awaiting joining node to be 'Up'") + } finally { + clusterTestUtil.shutdownAll() + } + } + } +}