diff --git a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/KeyValueReplica.scala b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/KeyValueReplica.scala index fcc1904ba..2c6027fd4 100644 --- a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/KeyValueReplica.scala +++ b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/KeyValueReplica.scala @@ -16,6 +16,9 @@ import scala.concurrent.ExecutionContext class KeyValueReplica(val uid: Uid, val votingReplicas: Set[Uid]) { + def log(msg: String) = + if false then println(s"[$uid] $msg") + val executionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor) @@ -43,11 +46,11 @@ class KeyValueReplica(val uid: Uid, val votingReplicas: Set[Uid]) { def publish(delta: ClusterState): ClusterState = currentStateLock.synchronized { if !(delta <= currentState) then { - println(s"[$uid] publishing") + log(s"publishing") currentState = currentState.merge(delta) executionContext.execute(() => clusterDataManager.applyDelta(delta)) } else - println(s"[$uid] skip") + log(s"skip") currentState } @@ -66,34 +69,36 @@ class KeyValueReplica(val uid: Uid, val votingReplicas: Set[Uid]) { ) def handleIncoming(change: ClusterState): Unit = currentStateLock.synchronized { - println(s"[$uid] handling incoming") + log(s"handling incoming") val (old, changed) = currentStateLock.synchronized { val old = currentState currentState = currentState `merge` change (old, currentState) } - val upkept = changed.upkeep() - if upkept <= currentState - then println(s"[$uid] no changes") - else println(s"[$uid] upkeep") - assert(changed == currentState) - // else println(s"[$uid] upkept: ${pprint(upkept)}") - val newState = publish(upkept) - maybeAnswerClient(old, newState) + if old != changed then { + val upkept = changed.upkeep() + if upkept <= currentState + then log(s"no changes") + else log(s"upkeep") + assert(changed == currentState) + // else log(s"upkept: ${pprint(upkept)}") + val newState = publish(upkept) + maybeAnswerClient(old, newState) + } } private val kvCache = mutable.HashMap[String, String]() private def onClientStateChange(oldState: ClientNodeState, newState: ClientNodeState): Unit = { newState.firstUnansweredRequest.foreach { req => - println(s"[$uid] applying client request $req") + log(s"applying client request $req") currentStateLock.synchronized { transform(_.write(ClusterData(req.value, req.dot))) } } } private def maybeAnswerClient(oldState: ClusterState, newState: ClusterState): Unit = { - println(s"[$uid] ${newState.log}") + log(s"${newState.log}") // println(s"${pprint.tokenize(newState).mkString("")}") for decidedRequest <- newState.readDecisionsSince(oldState.counter) do { diff --git a/Modules/Examples/Protocol Benchmarks/src/test/scala/probench/ClusterConsensus.scala b/Modules/Examples/Protocol Benchmarks/src/test/scala/probench/ClusterConsensus.scala index c6b18b827..af166338b 100644 --- a/Modules/Examples/Protocol Benchmarks/src/test/scala/probench/ClusterConsensus.scala +++ b/Modules/Examples/Protocol Benchmarks/src/test/scala/probench/ClusterConsensus.scala @@ -3,7 +3,7 @@ package probench import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec import com.github.plokhotnyuk.jsoniter_scala.macros.{CodecMakerConfig, JsonCodecMaker} import de.rmgk.options.* -import probench.clients.{ClientCLI, ProBenchClient} +import probench.clients.ProBenchClient import probench.data.{ClientNodeState, ClusterData, KVOperation} import rdts.base.{LocalUid, Uid} import rdts.datatypes.experiments.protocols.Membership @@ -33,7 +33,7 @@ class ClusterConsensus extends munit.FunSuite { primary.addClientConnection(clientConnection.server) val clientUid = Uid.gen() - val client = ProBenchClient(clientUid, blocking = false) + val client = ProBenchClient(clientUid, blocking = true) client.addLatentConnection(clientConnection.client(clientUid.toString)) client.read("test") @@ -78,5 +78,12 @@ class ClusterConsensus extends munit.FunSuite { nodes.foreach(noUpkeep) + assertEquals(nodes(0).currentState, nodes(1).currentState) + assertEquals(nodes(1).currentState, nodes(2).currentState) + assertEquals(nodes(2).currentState, nodes(0).currentState) + + + println(s"================ at the end of the tests") + } }