From 74a461c832b6d4b97ac43e0cb70331e1bfb31112 Mon Sep 17 00:00:00 2001 From: Svenja Kernig Date: Fri, 8 Nov 2024 15:33:27 +0100 Subject: [PATCH] add benchmark data collection code --- .../src/main/scala/channels/TCP.scala | 4 +- .../src/main/scala/channels/Channels.scala | 23 ++--- .../Protocol Benchmarks/args/bench-1-1 | 6 ++ .../Protocol Benchmarks/args/client-1-1 | 4 +- .../Protocol Benchmarks/args/client-1-2 | 4 +- .../Protocol Benchmarks/args/client-1-3 | 4 +- .../src/main/scala/probench/Client.scala | 97 +++++++++++++------ .../probench/benchmark/BenchmarkData.scala | 15 +++ .../scala/probench/benchmark/CSVWriter.scala | 30 ++++++ .../src/main/scala/probench/cli.scala | 2 +- .../scala/probench/data/ProDataManager.scala | 11 ++- 11 files changed, 147 insertions(+), 53 deletions(-) create mode 100644 Modules/Examples/Protocol Benchmarks/args/bench-1-1 create mode 100644 Modules/Examples/Protocol Benchmarks/src/main/scala/probench/benchmark/BenchmarkData.scala create mode 100644 Modules/Examples/Protocol Benchmarks/src/main/scala/probench/benchmark/CSVWriter.scala diff --git a/Modules/Channels/jvm-native/src/main/scala/channels/TCP.scala b/Modules/Channels/jvm-native/src/main/scala/channels/TCP.scala index 4e2dd881e..6aa8bc547 100644 --- a/Modules/Channels/jvm-native/src/main/scala/channels/TCP.scala +++ b/Modules/Channels/jvm-native/src/main/scala/channels/TCP.scala @@ -23,7 +23,7 @@ object TCP { ): JIOStreamConnection = { println(s"handling new connection") socket.setTcpNoDelay(true) - socket.setPerformancePreferences(1, 2, 0) + socket.setPerformancePreferences(1, 0, 2) val conn = new JIOStreamConnection(socket.getInputStream, socket.getOutputStream, () => socket.close()) executionContext.execute: () => println(s"executing task") @@ -54,7 +54,7 @@ object TCP { try { socket.setReuseAddress(true) - socket.setPerformancePreferences(1, 2, 0) + socket.setPerformancePreferences(1, 0, 2) } catch { case _: SocketException => // some implementations may not allow SO_REUSEADDR to be set diff --git a/Modules/Channels/shared/src/main/scala/channels/Channels.scala b/Modules/Channels/shared/src/main/scala/channels/Channels.scala index 94d890651..71f87537a 100644 --- a/Modules/Channels/shared/src/main/scala/channels/Channels.scala +++ b/Modules/Channels/shared/src/main/scala/channels/Channels.scala @@ -54,28 +54,29 @@ trait LatentConnection[T] { def prepare(incomingHandler: Handler[T]): Async[Abort, Connection[T]] } -object ConnectionMapper { - - class ConnectionMapper[A, B](f: B => A, acc: Connection[A]) extends Connection[B] { - override def send(message: B): Async[Any, Unit] = - acc.send(f(message)) +class ConnectionMapper[A, B](f: B => A, acc: Connection[A]) extends Connection[B] { + override def send(message: B): Async[Any, Unit] = + acc.send(f(message)) - override def close(): Unit = acc.close() - } + override def close(): Unit = acc.close() +} - def adapt[A, B](f: A => B, g: B => A)(la: LatentConnection[A]): LatentConnection[B] = { +object ConnectionMapper { + def adapt[A, B](f: A => B, g: B => A)(latentConnection: LatentConnection[A]): LatentConnection[B] = { new LatentConnection[B] { def prepare(incomingHandler: Handler[B]): Async[Abort, Connection[B]] = Async[Abort] { - val conn = Async.bind: - la.prepare: conn => + val conn = Async.bind { + latentConnection.prepare { conn => val mapped = ConnectionMapper(g, conn) val cb = incomingHandler.getCallbackFor(mapped) rs => cb.complete(rs.map(f)) + } + } ConnectionMapper(g, conn) } } - } + } diff --git a/Modules/Examples/Protocol Benchmarks/args/bench-1-1 b/Modules/Examples/Protocol Benchmarks/args/bench-1-1 new file mode 100644 index 000000000..f876647b0 --- /dev/null +++ b/Modules/Examples/Protocol Benchmarks/args/bench-1-1 @@ -0,0 +1,6 @@ +multiput key value 100 +benchmark +multiput keyX%n valueX%n 10000 +# multiget keyX%n 1000 +save-benchmark bench-results/ +exit diff --git a/Modules/Examples/Protocol Benchmarks/args/client-1-1 b/Modules/Examples/Protocol Benchmarks/args/client-1-1 index 159597666..17f254694 100644 --- a/Modules/Examples/Protocol Benchmarks/args/client-1-1 +++ b/Modules/Examples/Protocol Benchmarks/args/client-1-1 @@ -1,5 +1,5 @@ # wait-for-res false -multiput key%n valueX%n 200 -multiget key%n 200 +multiput key%n valueX%n 1000 +multiget key%n 1000 # wait exit diff --git a/Modules/Examples/Protocol Benchmarks/args/client-1-2 b/Modules/Examples/Protocol Benchmarks/args/client-1-2 index c8d95a1cc..96a41a417 100644 --- a/Modules/Examples/Protocol Benchmarks/args/client-1-2 +++ b/Modules/Examples/Protocol Benchmarks/args/client-1-2 @@ -1,5 +1,5 @@ # wait-for-res false -multiput key%n valueY%n 200 -multiget key%n 200 +multiput key%n valueY%n 1000 +multiget key%n 1000 # wait exit diff --git a/Modules/Examples/Protocol Benchmarks/args/client-1-3 b/Modules/Examples/Protocol Benchmarks/args/client-1-3 index 124721dcf..49d6563e6 100644 --- a/Modules/Examples/Protocol Benchmarks/args/client-1-3 +++ b/Modules/Examples/Protocol Benchmarks/args/client-1-3 @@ -1,5 +1,5 @@ # wait-for-res false -multiput key%n valueZ%n 200 -multiget key%n 200 +multiput key%n valueZ%n 1000 +multiget key%n 1000 # wait exit diff --git a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Client.scala b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Client.scala index a2d56c54a..de2dce3a5 100644 --- a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Client.scala +++ b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Client.scala @@ -1,11 +1,14 @@ package probench +import probench.benchmark.{BenchmarkData, CSVWriter} import probench.data.* import rdts.base.{Bottom, LocalUid, Uid} import rdts.datatypes.contextual.CausalQueue import rdts.dotted.Dotted import rdts.syntax.DeltaBuffer +import java.nio.file.Path +import scala.collection.mutable import scala.io.StdIn import scala.io.StdIn.readLine import scala.util.matching.Regex @@ -15,24 +18,23 @@ class Client(val name: Uid) { given localUid: LocalUid = LocalUid(name) private val dataManager = ProDataManager[ClientNodeState](localUid, Bottom[ClientNodeState].empty, onStateChange) - private val lock = new Object() - private var currentOp: Option[Request] = None - private var waitForOp: Boolean = true - - private val commented: Regex = """#.*""".r - private val waitForRes: Regex = """wait-for-res (true|false)""".r - private val get: Regex = """get ([\w%]+)""".r - private val put: Regex = """put ([\w%]+) ([\w%]+)""".r - private val multiget: Regex = """multiget ([\w%]+) (\d+)""".r - private val multiput: Regex = """multiput ([\w%]+) ([\w%]+) (\d+)""".r - private val mp: Regex = """mp (\d+)""".r + private val lock = new Object() + private var currentOp: Option[Request] = None + private var waitForOp: Boolean = true + private var doBenchmark: Boolean = false + private val benchmarkData: mutable.ListBuffer[BenchmarkData] = mutable.ListBuffer.empty + + private val commented: Regex = """#.*""".r + private val waitForRes: Regex = """wait-for-res (true|false)""".r + private val get: Regex = """get ([\w%]+)""".r + private val put: Regex = """put ([\w%]+) ([\w%]+)""".r + private val multiget: Regex = """multiget ([\w%]+) ([\d_]+)""".r + private val multiput: Regex = """multiput ([\w%]+) ([\w%]+) ([\d_]+)""".r + private val mp: Regex = """mp ([\d_]+)""".r + private val benchmark: Regex = """benchmark""".r + private val saveBenchmark: Regex = """save-benchmark ([\w\\/.\-]+)""".r private def onStateChange(oldState: ClientNodeState, newState: ClientNodeState): Unit = { - /* val diff = newState.responses.data.values.size - oldState.responses.data.values.size - if diff > 0 then { - println(s"Got $diff result(s): ${newState.responses.data.values.toList.reverseIterator.take(diff).toList.reverse.map(_.value)}") - } */ - for { op <- currentOp CausalQueue.QueueElement(res @ Response(req, _), _, _) <- newState.responses.data.values if req == op @@ -53,18 +55,35 @@ class Client(val name: Uid) { val req = Request(op) currentOp = Some(req) - // println(s"Put $req") + val start = if doBenchmark then System.nanoTime() else 0 dataManager.transform { current => current.mod(it => it.copy(requests = it.requests.mod(_.enqueue(req)))) } - // println(s"New Requests ${dataManager.mergedState.data.requests.data.values.toList.map(_.value)}") - if waitForOp then { lock.synchronized { lock.wait() } + + if doBenchmark then { + val end = System.nanoTime() + val opString = op match + case KVOperation.Read(_) => "get" + case KVOperation.Write(_, _) => "put" + val args = op match + case KVOperation.Read(key) => key + case KVOperation.Write(key, value) => s"$key $value" + benchmarkData.append(BenchmarkData( + name.delegate, + opString, + args, + start / 1000, + end / 1000, + (end - start).toDouble / 1000, + "µs" + )) + } } } @@ -97,20 +116,42 @@ class Client(val name: Uid) { case Some(commented()) => // ignore case Some(get(key)) => read(key) case Some(put(key, value)) => write(key, value) - case Some(multiget(key, times)) => multiget(key, times.toInt) - case Some(multiput(key, value, times)) => multiput(key, value, times.toInt) - case Some(waitForRes(flag)) => waitForOp = flag.toBoolean - case Some("wait") => lock.synchronized { lock.wait() } - case Some("ping") => dataManager.pingAll() - case Some("exit") => running = false - case Some(mp(times)) => multiput("key%n", "value%n", times.toInt) - case None => running = false - case other => + case Some(multiget(key, times)) => multiget(key, times.replace("_", "").toInt) + case Some(multiput(key, value, times)) => multiput(key, value, times.replace("_", "").toInt) + case Some(mp(times)) => multiput("key%n", "value%n", times.replace("_", "").toInt) + case Some(waitForRes(flag)) => + if doBenchmark then println("Can't change waiting mode while benchmarking!") + else waitForOp = flag.toBoolean + case Some(benchmark()) => + doBenchmark = true + waitForOp = true + case Some(saveBenchmark(path)) => + println(path) + val benchmarkPath = Path.of(path) + val runId = Uid.gen().delegate + val writer = new CSVWriter(";", benchmarkPath, s"${name.delegate}-$runId", BenchmarkData.header) + benchmarkData.foreach { row => + writer.writeRow( + s"${row.name}-$runId", + row.op, + row.args, + row.sendTime.toString, + row.receiveTime.toString, + row.latency.toString, + row.unit + ) + } + writer.close() + case Some("ping") => dataManager.pingAll() + case Some("wait") => lock.synchronized { lock.wait() } + case None | Some("exit") => running = false + case _ => println("assuming put") write("key", "value") } } println(s"ended") + System.exit(1) } export dataManager.addLatentConnection diff --git a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/benchmark/BenchmarkData.scala b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/benchmark/BenchmarkData.scala new file mode 100644 index 000000000..2849cd283 --- /dev/null +++ b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/benchmark/BenchmarkData.scala @@ -0,0 +1,15 @@ +package probench.benchmark + +case class BenchmarkData( + name: String, + op: String, + args: String, + sendTime: Long, + receiveTime: Long, + latency: Double, + unit: String +) + +object BenchmarkData { + val header: Seq[String] = Seq("name", "op", "args", "send-time", "receive-time", "latency", "unit") +} diff --git a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/benchmark/CSVWriter.scala b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/benchmark/CSVWriter.scala new file mode 100644 index 000000000..13d58d902 --- /dev/null +++ b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/benchmark/CSVWriter.scala @@ -0,0 +1,30 @@ +package probench.benchmark + +import java.io.{BufferedWriter, FileOutputStream, OutputStreamWriter} +import java.nio.file.Path + +class CSVWriter( + private val separator: String, + private val path: Path, + private val fileName: String, + private val header: Seq[String] +) { + path.toFile.mkdirs() + private val file = path.resolve(s"$fileName.csv").toFile + private val writer = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file))) + + writer.write(header.mkString(separator)) + writer.write("\n") + + def writeRow(data: String*): Unit = { + writer.write(data.mkString(separator)) + writer.write("\n") + } + + def close(): Unit = { + writer.flush() + println(s"Saving to ${path.toAbsolutePath}") + writer.close() + } + +} diff --git a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/cli.scala b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/cli.scala index e60322afb..a95c1506f 100644 --- a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/cli.scala +++ b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/cli.scala @@ -121,7 +121,7 @@ object cli { val (ip, port) = clientNode.value - client.addLatentConnection(TCP.connect(socketPath(ip, port), ec)) + client.addLatentConnection(NioTCP.connect(socketPath(ip, port), ec, Abort())) client.startCLI() }.value diff --git a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/data/ProDataManager.scala b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/data/ProDataManager.scala index 76795744c..93d293406 100644 --- a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/data/ProDataManager.scala +++ b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/data/ProDataManager.scala @@ -18,14 +18,15 @@ class ProDataManager[State: Lattice]( dataManager.allDeltas.foldLeft(ProtocolDots(initialState, Dots.empty))(Lattice[ProtocolDots[State]].merge) private def receivedChanges(changes: ProtocolDots[State]): Unit = { - val oldState = mergedState - dataManager.lock.synchronized { - mergedState = mergedState.merge(changes) + val (o, n) = synchronized { + val oldState = mergedState + mergedState = oldState.merge(changes) + (oldState, mergedState) } - onChange(oldState.data, mergedState.data) + onChange(o.data, n.data) } - def transform(fun: DeltaBuffer[State] => DeltaBuffer[State]): Unit = dataManager.lock.synchronized { + def transform(fun: DeltaBuffer[State] => DeltaBuffer[State]): Unit = { val current: DeltaBuffer[State] = DeltaBuffer(mergedState.data) val next: DeltaBuffer[State] = fun(current)