diff --git a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/KeyValueReplica.scala b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/KeyValueReplica.scala index 61174c569..33ddb1f4d 100644 --- a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/KeyValueReplica.scala +++ b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/KeyValueReplica.scala @@ -14,20 +14,6 @@ import java.util.concurrent.Executors import scala.collection.mutable import scala.concurrent.ExecutionContext -object Time { - - var current: Long = System.nanoTime() - - def report(name: => String = ""): Unit = if true then - println { - synchronized { - val last = current - current = System.nanoTime() - s"$name took ${(current - last).doubleValue / 1000_000}ms" - } - } -} - class KeyValueReplica(val uid: Uid, val votingReplicas: Set[Uid]) { val executionContext = @@ -57,9 +43,11 @@ class KeyValueReplica(val uid: Uid, val votingReplicas: Set[Uid]) { def publish(delta: ClusterState): ClusterState = currentStateLock.synchronized { if !(delta <= currentState) then { + println(s"[$uid] publishing") currentState = currentState.merge(delta) executionContext.execute(() => clusterDataManager.applyDelta(delta)) - } + } else + println(s"[$uid] skip") currentState } @@ -67,44 +55,36 @@ class KeyValueReplica(val uid: Uid, val votingReplicas: Set[Uid]) { f(currentStateLock.synchronized(currentState)) ) - def handleIncoming(change: ClusterState): Unit = { + def handleIncoming(change: ClusterState): Unit = currentStateLock.synchronized { + println(s"[$uid] handling incoming") val (old, changed) = currentStateLock.synchronized { val old = currentState currentState = currentState `merge` change (old, currentState) } val upkept = changed.upkeep() - maybeAnswerClient(old, publish(upkept)) + if upkept <= currentState + then println(s"[$uid] no changes") + else println(s"[$uid] upkeep") + assert(changed == currentState) + // else println(s"[$uid] upkept: ${pprint(upkept)}") + val newState = publish(upkept) + maybeAnswerClient(old, newState) } private val kvCache = mutable.HashMap[String, String]() private def onClientStateChange(oldState: ClientNodeState, newState: ClientNodeState): Unit = { newState.firstUnansweredRequest.foreach { req => - println(s"applying client request $req on $uid") - transform(_.write(ClusterData(req.value, req.dot))) + println(s"[$uid] applying client request $req") + currentStateLock.synchronized { transform(_.write(ClusterData(req.value, req.dot))) } } } - var counter = 0 - private def maybeAnswerClient(oldState: ClusterState, newState: ClusterState): Unit = { - val start = System.nanoTime() - var last = start - val tid = synchronized { - counter += +1 - counter - } - - Time.report(s"[$tid] cluster changed") - - def timeStep(msg: => String): Unit = if false then - println { - val current = last - last = System.nanoTime() - s"[$tid] $msg after ${(last - current).doubleValue / 1000_000}ms" - } + println(s"[$uid] ${newState.log}") + // println(s"${pprint.tokenize(newState).mkString("")}") for decidedRequest <- newState.readDecisionsSince(oldState.counter) do { val decision: String = decidedRequest match { @@ -140,8 +120,6 @@ class KeyValueReplica(val uid: Uid, val votingReplicas: Set[Uid]) { */ } - timeStep("done") - if false then println(s"[$tid] total ${(System.nanoTime() - start).doubleValue / 1000_000}ms") } export clientDataManager.addLatentConnection as addClientConnection diff --git a/build.sbt b/build.sbt index 3f862ce90..d9c678e83 100644 --- a/build.sbt +++ b/build.sbt @@ -242,6 +242,7 @@ lazy val proBench = project.in(file("Modules/Examples/Protocol Benchmarks")) Dependencies.munit, Dependencies.slips.options, Dependencies.jetcd, + Dependencies.pprint, ) lazy val rdts = crossProject(JVMPlatform, JSPlatform, NativePlatform).crossType(CrossType.Pure)