Skip to content

Commit

Permalink
add benchmark data collection code
Browse files Browse the repository at this point in the history
  • Loading branch information
Kiibou-chan committed Nov 8, 2024
1 parent 7e78049 commit 74a461c
Show file tree
Hide file tree
Showing 11 changed files with 147 additions and 53 deletions.
4 changes: 2 additions & 2 deletions Modules/Channels/jvm-native/src/main/scala/channels/TCP.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ object TCP {
): JIOStreamConnection = {
println(s"handling new connection")
socket.setTcpNoDelay(true)
socket.setPerformancePreferences(1, 2, 0)
socket.setPerformancePreferences(1, 0, 2)
val conn = new JIOStreamConnection(socket.getInputStream, socket.getOutputStream, () => socket.close())
executionContext.execute: () =>
println(s"executing task")
Expand Down Expand Up @@ -54,7 +54,7 @@ object TCP {

try {
socket.setReuseAddress(true)
socket.setPerformancePreferences(1, 2, 0)
socket.setPerformancePreferences(1, 0, 2)
} catch {
case _: SocketException =>
// some implementations may not allow SO_REUSEADDR to be set
Expand Down
23 changes: 12 additions & 11 deletions Modules/Channels/shared/src/main/scala/channels/Channels.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,28 +54,29 @@ trait LatentConnection[T] {
def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]]
}

object ConnectionMapper {

class ConnectionMapper[A, B](f: B => A, acc: Connection[A]) extends Connection[B] {
override def send(message: B): Async[Any, Unit] =
acc.send(f(message))
class ConnectionMapper[A, B](f: B => A, acc: Connection[A]) extends Connection[B] {
override def send(message: B): Async[Any, Unit] =
acc.send(f(message))

override def close(): Unit = acc.close()
}
override def close(): Unit = acc.close()
}

def adapt[A, B](f: A => B, g: B => A)(la: LatentConnection[A]): LatentConnection[B] = {
object ConnectionMapper {

def adapt[A, B](f: A => B, g: B => A)(latentConnection: LatentConnection[A]): LatentConnection[B] = {
new LatentConnection[B] {
def prepare(incomingHandler: Handler[B]): Async[Abort, Connection[B]] =
Async[Abort] {
val conn = Async.bind:
la.prepare: conn =>
val conn = Async.bind {
latentConnection.prepare { conn =>
val mapped = ConnectionMapper(g, conn)
val cb = incomingHandler.getCallbackFor(mapped)
rs => cb.complete(rs.map(f))
}
}
ConnectionMapper(g, conn)
}
}

}

}
6 changes: 6 additions & 0 deletions Modules/Examples/Protocol Benchmarks/args/bench-1-1
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
multiput key value 100
benchmark
multiput keyX%n valueX%n 10000
# multiget keyX%n 1000
save-benchmark bench-results/
exit
4 changes: 2 additions & 2 deletions Modules/Examples/Protocol Benchmarks/args/client-1-1
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# wait-for-res false
multiput key%n valueX%n 200
multiget key%n 200
multiput key%n valueX%n 1000
multiget key%n 1000
# wait
exit
4 changes: 2 additions & 2 deletions Modules/Examples/Protocol Benchmarks/args/client-1-2
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# wait-for-res false
multiput key%n valueY%n 200
multiget key%n 200
multiput key%n valueY%n 1000
multiget key%n 1000
# wait
exit
4 changes: 2 additions & 2 deletions Modules/Examples/Protocol Benchmarks/args/client-1-3
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# wait-for-res false
multiput key%n valueZ%n 200
multiget key%n 200
multiput key%n valueZ%n 1000
multiget key%n 1000
# wait
exit
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package probench

import probench.benchmark.{BenchmarkData, CSVWriter}
import probench.data.*
import rdts.base.{Bottom, LocalUid, Uid}
import rdts.datatypes.contextual.CausalQueue
import rdts.dotted.Dotted
import rdts.syntax.DeltaBuffer

import java.nio.file.Path
import scala.collection.mutable
import scala.io.StdIn
import scala.io.StdIn.readLine
import scala.util.matching.Regex
Expand All @@ -15,24 +18,23 @@ class Client(val name: Uid) {
given localUid: LocalUid = LocalUid(name)
private val dataManager = ProDataManager[ClientNodeState](localUid, Bottom[ClientNodeState].empty, onStateChange)

private val lock = new Object()
private var currentOp: Option[Request] = None
private var waitForOp: Boolean = true

private val commented: Regex = """#.*""".r
private val waitForRes: Regex = """wait-for-res (true|false)""".r
private val get: Regex = """get ([\w%]+)""".r
private val put: Regex = """put ([\w%]+) ([\w%]+)""".r
private val multiget: Regex = """multiget ([\w%]+) (\d+)""".r
private val multiput: Regex = """multiput ([\w%]+) ([\w%]+) (\d+)""".r
private val mp: Regex = """mp (\d+)""".r
private val lock = new Object()
private var currentOp: Option[Request] = None
private var waitForOp: Boolean = true
private var doBenchmark: Boolean = false
private val benchmarkData: mutable.ListBuffer[BenchmarkData] = mutable.ListBuffer.empty

private val commented: Regex = """#.*""".r
private val waitForRes: Regex = """wait-for-res (true|false)""".r
private val get: Regex = """get ([\w%]+)""".r
private val put: Regex = """put ([\w%]+) ([\w%]+)""".r
private val multiget: Regex = """multiget ([\w%]+) ([\d_]+)""".r
private val multiput: Regex = """multiput ([\w%]+) ([\w%]+) ([\d_]+)""".r
private val mp: Regex = """mp ([\d_]+)""".r
private val benchmark: Regex = """benchmark""".r
private val saveBenchmark: Regex = """save-benchmark ([\w\\/.\-]+)""".r

private def onStateChange(oldState: ClientNodeState, newState: ClientNodeState): Unit = {
/* val diff = newState.responses.data.values.size - oldState.responses.data.values.size
if diff > 0 then {
println(s"Got $diff result(s): ${newState.responses.data.values.toList.reverseIterator.take(diff).toList.reverse.map(_.value)}")
} */

for {
op <- currentOp
CausalQueue.QueueElement(res @ Response(req, _), _, _) <- newState.responses.data.values if req == op
Expand All @@ -53,18 +55,35 @@ class Client(val name: Uid) {
val req = Request(op)
currentOp = Some(req)

// println(s"Put $req")
val start = if doBenchmark then System.nanoTime() else 0

dataManager.transform { current =>
current.mod(it => it.copy(requests = it.requests.mod(_.enqueue(req))))
}

// println(s"New Requests ${dataManager.mergedState.data.requests.data.values.toList.map(_.value)}")

if waitForOp then {
lock.synchronized {
lock.wait()
}

if doBenchmark then {
val end = System.nanoTime()
val opString = op match
case KVOperation.Read(_) => "get"
case KVOperation.Write(_, _) => "put"
val args = op match
case KVOperation.Read(key) => key
case KVOperation.Write(key, value) => s"$key $value"
benchmarkData.append(BenchmarkData(
name.delegate,
opString,
args,
start / 1000,
end / 1000,
(end - start).toDouble / 1000,
"µs"
))
}
}
}

Expand Down Expand Up @@ -97,20 +116,42 @@ class Client(val name: Uid) {
case Some(commented()) => // ignore
case Some(get(key)) => read(key)
case Some(put(key, value)) => write(key, value)
case Some(multiget(key, times)) => multiget(key, times.toInt)
case Some(multiput(key, value, times)) => multiput(key, value, times.toInt)
case Some(waitForRes(flag)) => waitForOp = flag.toBoolean
case Some("wait") => lock.synchronized { lock.wait() }
case Some("ping") => dataManager.pingAll()
case Some("exit") => running = false
case Some(mp(times)) => multiput("key%n", "value%n", times.toInt)
case None => running = false
case other =>
case Some(multiget(key, times)) => multiget(key, times.replace("_", "").toInt)
case Some(multiput(key, value, times)) => multiput(key, value, times.replace("_", "").toInt)
case Some(mp(times)) => multiput("key%n", "value%n", times.replace("_", "").toInt)
case Some(waitForRes(flag)) =>
if doBenchmark then println("Can't change waiting mode while benchmarking!")
else waitForOp = flag.toBoolean
case Some(benchmark()) =>
doBenchmark = true
waitForOp = true
case Some(saveBenchmark(path)) =>
println(path)
val benchmarkPath = Path.of(path)
val runId = Uid.gen().delegate
val writer = new CSVWriter(";", benchmarkPath, s"${name.delegate}-$runId", BenchmarkData.header)
benchmarkData.foreach { row =>
writer.writeRow(
s"${row.name}-$runId",
row.op,
row.args,
row.sendTime.toString,
row.receiveTime.toString,
row.latency.toString,
row.unit
)
}
writer.close()
case Some("ping") => dataManager.pingAll()
case Some("wait") => lock.synchronized { lock.wait() }
case None | Some("exit") => running = false
case _ =>
println("assuming put")
write("key", "value")
}
}
println(s"ended")
System.exit(1)
}

export dataManager.addLatentConnection
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package probench.benchmark

case class BenchmarkData(
name: String,
op: String,
args: String,
sendTime: Long,
receiveTime: Long,
latency: Double,
unit: String
)

object BenchmarkData {
val header: Seq[String] = Seq("name", "op", "args", "send-time", "receive-time", "latency", "unit")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package probench.benchmark

import java.io.{BufferedWriter, FileOutputStream, OutputStreamWriter}
import java.nio.file.Path

class CSVWriter(
private val separator: String,
private val path: Path,
private val fileName: String,
private val header: Seq[String]
) {
path.toFile.mkdirs()
private val file = path.resolve(s"$fileName.csv").toFile
private val writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file)))

writer.write(header.mkString(separator))
writer.write("\n")

def writeRow(data: String*): Unit = {
writer.write(data.mkString(separator))
writer.write("\n")
}

def close(): Unit = {
writer.flush()
println(s"Saving to ${path.toAbsolutePath}")
writer.close()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ object cli {

val (ip, port) = clientNode.value

client.addLatentConnection(TCP.connect(socketPath(ip, port), ec))
client.addLatentConnection(NioTCP.connect(socketPath(ip, port), ec, Abort()))

client.startCLI()
}.value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ class ProDataManager[State: Lattice](
dataManager.allDeltas.foldLeft(ProtocolDots(initialState, Dots.empty))(Lattice[ProtocolDots[State]].merge)

private def receivedChanges(changes: ProtocolDots[State]): Unit = {
val oldState = mergedState
dataManager.lock.synchronized {
mergedState = mergedState.merge(changes)
val (o, n) = synchronized {
val oldState = mergedState
mergedState = oldState.merge(changes)
(oldState, mergedState)
}
onChange(oldState.data, mergedState.data)
onChange(o.data, n.data)
}

def transform(fun: DeltaBuffer[State] => DeltaBuffer[State]): Unit = dataManager.lock.synchronized {
def transform(fun: DeltaBuffer[State] => DeltaBuffer[State]): Unit = {
val current: DeltaBuffer[State] = DeltaBuffer(mergedState.data)
val next: DeltaBuffer[State] = fun(current)

Expand Down

0 comments on commit 74a461c

Please sign in to comment.