Skip to content

Commit

Permalink
more debug output for kvstore
Browse files Browse the repository at this point in the history
  • Loading branch information
rmgk committed Dec 3, 2024
1 parent d80c002 commit 8ca0f9b
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,6 @@ import java.util.concurrent.Executors
import scala.collection.mutable
import scala.concurrent.ExecutionContext

object Time {

var current: Long = System.nanoTime()

def report(name: => String = ""): Unit = if true then
println {
synchronized {
val last = current
current = System.nanoTime()
s"$name took ${(current - last).doubleValue / 1000_000}ms"
}
}
}

class KeyValueReplica(val uid: Uid, val votingReplicas: Set[Uid]) {

val executionContext =
Expand Down Expand Up @@ -57,54 +43,48 @@ class KeyValueReplica(val uid: Uid, val votingReplicas: Set[Uid]) {

def publish(delta: ClusterState): ClusterState = currentStateLock.synchronized {
if !(delta <= currentState) then {
println(s"[$uid] publishing")
currentState = currentState.merge(delta)
executionContext.execute(() => clusterDataManager.applyDelta(delta))
}
} else
println(s"[$uid] skip")
currentState
}

def transform(f: ClusterState => ClusterState) = publish(
f(currentStateLock.synchronized(currentState))
)

def handleIncoming(change: ClusterState): Unit = {
def handleIncoming(change: ClusterState): Unit = currentStateLock.synchronized {
println(s"[$uid] handling incoming")
val (old, changed) = currentStateLock.synchronized {
val old = currentState
currentState = currentState `merge` change
(old, currentState)
}
val upkept = changed.upkeep()
maybeAnswerClient(old, publish(upkept))
if upkept <= currentState
then println(s"[$uid] no changes")
else println(s"[$uid] upkeep")
assert(changed == currentState)
// else println(s"[$uid] upkept: ${pprint(upkept)}")
val newState = publish(upkept)
maybeAnswerClient(old, newState)
}

private val kvCache = mutable.HashMap[String, String]()

private def onClientStateChange(oldState: ClientNodeState, newState: ClientNodeState): Unit = {
newState.firstUnansweredRequest.foreach { req =>
println(s"applying client request $req on $uid")
transform(_.write(ClusterData(req.value, req.dot)))
println(s"[$uid] applying client request $req")
currentStateLock.synchronized { transform(_.write(ClusterData(req.value, req.dot))) }
}
}

var counter = 0

private def maybeAnswerClient(oldState: ClusterState, newState: ClusterState): Unit = {

val start = System.nanoTime()
var last = start
val tid = synchronized {
counter += +1
counter
}

Time.report(s"[$tid] cluster changed")

def timeStep(msg: => String): Unit = if false then
println {
val current = last
last = System.nanoTime()
s"[$tid] $msg after ${(last - current).doubleValue / 1000_000}ms"
}
println(s"[$uid] ${newState.log}")
// println(s"${pprint.tokenize(newState).mkString("")}")

for decidedRequest <- newState.readDecisionsSince(oldState.counter) do {
val decision: String = decidedRequest match {
Expand Down Expand Up @@ -140,8 +120,6 @@ class KeyValueReplica(val uid: Uid, val votingReplicas: Set[Uid]) {
*/
}

timeStep("done")
if false then println(s"[$tid] total ${(System.nanoTime() - start).doubleValue / 1000_000}ms")
}

export clientDataManager.addLatentConnection as addClientConnection
Expand Down
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ lazy val proBench = project.in(file("Modules/Examples/Protocol Benchmarks"))
Dependencies.munit,
Dependencies.slips.options,
Dependencies.jetcd,
Dependencies.pprint,
)

lazy val rdts = crossProject(JVMPlatform, JSPlatform, NativePlatform).crossType(CrossType.Pure)
Expand Down

0 comments on commit 8ca0f9b

Please sign in to comment.