Skip to content

Commit

Permalink
disable logging
Browse files Browse the repository at this point in the history
  • Loading branch information
rmgk committed Dec 3, 2024
1 parent 1356580 commit 9befad2
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import scala.concurrent.ExecutionContext

class KeyValueReplica(val uid: Uid, val votingReplicas: Set[Uid]) {

def log(msg: String) =
if false then println(s"[$uid] $msg")

val executionContext =
ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor)

Expand Down Expand Up @@ -43,11 +46,11 @@ class KeyValueReplica(val uid: Uid, val votingReplicas: Set[Uid]) {

def publish(delta: ClusterState): ClusterState = currentStateLock.synchronized {
if !(delta <= currentState) then {
println(s"[$uid] publishing")
log(s"publishing")
currentState = currentState.merge(delta)
executionContext.execute(() => clusterDataManager.applyDelta(delta))
} else
println(s"[$uid] skip")
log(s"skip")
currentState
}

Expand All @@ -66,34 +69,36 @@ class KeyValueReplica(val uid: Uid, val votingReplicas: Set[Uid]) {
)

def handleIncoming(change: ClusterState): Unit = currentStateLock.synchronized {
println(s"[$uid] handling incoming")
log(s"handling incoming")
val (old, changed) = currentStateLock.synchronized {
val old = currentState
currentState = currentState `merge` change
(old, currentState)
}
val upkept = changed.upkeep()
if upkept <= currentState
then println(s"[$uid] no changes")
else println(s"[$uid] upkeep")
assert(changed == currentState)
// else println(s"[$uid] upkept: ${pprint(upkept)}")
val newState = publish(upkept)
maybeAnswerClient(old, newState)
if old != changed then {
val upkept = changed.upkeep()
if upkept <= currentState
then log(s"no changes")
else log(s"upkeep")
assert(changed == currentState)
// else log(s"upkept: ${pprint(upkept)}")
val newState = publish(upkept)
maybeAnswerClient(old, newState)
}
}

private val kvCache = mutable.HashMap[String, String]()

private def onClientStateChange(oldState: ClientNodeState, newState: ClientNodeState): Unit = {
newState.firstUnansweredRequest.foreach { req =>
println(s"[$uid] applying client request $req")
log(s"applying client request $req")
currentStateLock.synchronized { transform(_.write(ClusterData(req.value, req.dot))) }
}
}

private def maybeAnswerClient(oldState: ClusterState, newState: ClusterState): Unit = {

println(s"[$uid] ${newState.log}")
log(s"${newState.log}")
// println(s"${pprint.tokenize(newState).mkString("")}")

for decidedRequest <- newState.readDecisionsSince(oldState.counter) do {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package probench
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
import com.github.plokhotnyuk.jsoniter_scala.macros.{CodecMakerConfig, JsonCodecMaker}
import de.rmgk.options.*
import probench.clients.{ClientCLI, ProBenchClient}
import probench.clients.ProBenchClient
import probench.data.{ClientNodeState, ClusterData, KVOperation}
import rdts.base.{LocalUid, Uid}
import rdts.datatypes.experiments.protocols.Membership
Expand Down Expand Up @@ -33,7 +33,7 @@ class ClusterConsensus extends munit.FunSuite {
primary.addClientConnection(clientConnection.server)

val clientUid = Uid.gen()
val client = ProBenchClient(clientUid, blocking = false)
val client = ProBenchClient(clientUid, blocking = true)
client.addLatentConnection(clientConnection.client(clientUid.toString))

client.read("test")
Expand Down Expand Up @@ -78,5 +78,12 @@ class ClusterConsensus extends munit.FunSuite {

nodes.foreach(noUpkeep)

assertEquals(nodes(0).currentState, nodes(1).currentState)
assertEquals(nodes(1).currentState, nodes(2).currentState)
assertEquals(nodes(2).currentState, nodes(0).currentState)


println(s"================ at the end of the tests")

}
}

0 comments on commit 9befad2

Please sign in to comment.