From 271c8530d86cd157890c112d1248e09cea99626b Mon Sep 17 00:00:00 2001 From: Svenja Kernig Date: Tue, 5 Nov 2024 01:52:12 +0100 Subject: [PATCH] optimize java io connection performance --- .../src/main/scala/channels/TCP.scala | 14 ++++- .../src/main/scala/probench/Client.scala | 4 ++ .../src/main/scala/probench/Node.scala | 40 ++++++++------ .../src/main/scala/probench/cli.scala | 53 ++++++++++--------- .../main/scala/replication/DataManager.scala | 1 - 5 files changed, 68 insertions(+), 44 deletions(-) diff --git a/Modules/Channels/jvm-native/src/main/scala/channels/TCP.scala b/Modules/Channels/jvm-native/src/main/scala/channels/TCP.scala index 9748c33b2..4e2dd881e 100644 --- a/Modules/Channels/jvm-native/src/main/scala/channels/TCP.scala +++ b/Modules/Channels/jvm-native/src/main/scala/channels/TCP.scala @@ -22,6 +22,8 @@ object TCP { executionContext: ExecutionContext ): JIOStreamConnection = { println(s"handling new connection") + socket.setTcpNoDelay(true) + socket.setPerformancePreferences(1, 2, 0) val conn = new JIOStreamConnection(socket.getInputStream, socket.getOutputStream, () => socket.close()) executionContext.execute: () => println(s"executing task") @@ -29,6 +31,9 @@ object TCP { conn } + def connect(socketAddress: InetSocketAddress, executionContext: ExecutionContext): LatentConnection[MessageBuffer] = + connect(socketAddress.getHostName, socketAddress.getPort, executionContext: ExecutionContext) + def connect(host: String, port: Int, executionContext: ExecutionContext): LatentConnection[MessageBuffer] = connect(() => new Socket(host, port), executionContext) @@ -41,11 +46,16 @@ object TCP { } } + def defaultSocket(socketAddress: InetSocketAddress): () => ServerSocket = + defaultSocket(socketAddress.getHostName, socketAddress.getPort) + def defaultSocket(interface: String, port: Int): () => ServerSocket = () => { val socket = new ServerSocket - try socket.setReuseAddress(true) - catch { + try { + socket.setReuseAddress(true) + socket.setPerformancePreferences(1, 2, 0) + } catch { case _: SocketException => // some implementations may not allow SO_REUSEADDR to be set } diff --git a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Client.scala b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Client.scala index 30e2856d9..a2d56c54a 100644 --- a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Client.scala +++ b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Client.scala @@ -77,11 +77,15 @@ class Client(val name: Uid) { } private def multiget(key: String, times: Int): Unit = { + val start = System.nanoTime() for i <- 1 to times do read(key.replace("%n", i.toString)) + println(s"Did $times get queries in ${(System.nanoTime() - start) / 1_000_000}ms") } private def multiput(key: String, value: String, times: Int): Unit = { + val start = System.nanoTime() for i <- 1 to times do write(key.replace("%n", i.toString), value.replace("%n", i.toString)) + println(s"Did $times put queries in ${(System.nanoTime() - start) / 1_000_000}ms") } def startCLI(): Unit = { diff --git a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Node.scala b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Node.scala index 12f85e3a3..586981ac8 100644 --- a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Node.scala +++ b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Node.scala @@ -11,11 +11,17 @@ import rdts.syntax.DeltaBuffer import scala.util.chaining.scalaUtilChainingOps object Time { - var current = System.nanoTime() - def report(name: String = "") = synchronized { - val last = current - current = System.nanoTime() - println(s"$name took ${(current - last).doubleValue / 1000_000}ms") + + val logger = LogHack(false) + + var current: Long = System.nanoTime() + + def report(name: => String = ""): Unit = logger.info { + synchronized { + val last = current + current = System.nanoTime() + s"$name took ${(current - last).doubleValue / 1000_000}ms" + } } } @@ -26,9 +32,9 @@ class Node(val name: Uid, val initialClusterIds: Set[Uid]) { given localUid: LocalUid = LocalUid(name) given LogHack = new LogHack(false) - val clientDataManager = + val clientDataManager: ProDataManager[ClientNodeState] = ProDataManager[ClientNodeState](localUid, Bottom[ClientNodeState].empty, onClientStateChange) - val clusterDataManager = + val clusterDataManager: ProDataManager[ClusterState] = ProDataManager[ClusterState](localUid, Membership.init(initialClusterIds), onClusterStateChange) private def onClientStateChange(oldState: ClientNodeState, newState: ClientNodeState): Unit = { @@ -54,19 +60,19 @@ class Node(val name: Uid, val initialClusterIds: Set[Uid]) { private def onClusterStateChange(oldState: ClusterState, newState: ClusterState): Unit = { val start = System.nanoTime() - var last = start - val tid = - synchronized { - counter = counter + 1 - counter - } + var last = start + val tid = synchronized { + counter += + 1 + counter + } - Time.report(s"[${tid}] cluster changed") + Time.report(s"[$tid] cluster changed") - def timeStep(msg: String) = + def timeStep(msg: => String): Unit = Time.logger.info { val current = last last = System.nanoTime() - println(s"[$tid] $msg after ${(last - current).doubleValue / 1000_000}ms") + s"[$tid] $msg after ${(last - current).doubleValue / 1000_000}ms" + } val delta = newState.upkeep() val upkept: ClusterState = newState.merge(delta) @@ -117,7 +123,7 @@ class Node(val name: Uid, val initialClusterIds: Set[Uid]) { } timeStep("done") - println(s"[${tid}] total ${(System.nanoTime() - start).doubleValue / 1000_000}ms") + Time.logger.info(s"[$tid] total ${(System.nanoTime() - start).doubleValue / 1000_000}ms") } export clientDataManager.addLatentConnection as addClientConnection diff --git a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/cli.scala b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/cli.scala index a890de91c..201a97dff 100644 --- a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/cli.scala +++ b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/cli.scala @@ -1,6 +1,6 @@ package probench -import channels.{Abort, NioTCP, UDP} +import channels.{Abort, NioTCP, TCP, UDP} import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec import com.github.plokhotnyuk.jsoniter_scala.macros.{CodecMakerConfig, JsonCodecMaker} import de.rmgk.options.* @@ -70,31 +70,30 @@ object cli { subcommand("node", "starts a cluster node") { val node = Node(name.value, initialClusterIds.value.toSet) - node.addClientConnection(NioTCP.listen( - NioTCP.defaultSocket( - socketPath("localhost", clientPort.value) - ), - ec - )) - node.addClusterConnection(NioTCP.listen( - NioTCP.defaultSocket( - socketPath("localhost", peerPort.value) - ), - ec - )) + node.addClientConnection(TCP.listen(TCP.defaultSocket(socketPath("localhost", clientPort.value)), ec)) + node.addClusterConnection(TCP.listen(TCP.defaultSocket(socketPath("localhost", peerPort.value)), ec)) Timer().schedule(() => node.clusterDataManager.pingAll(), 1000, 1000) cluster.value.foreach { (ip, port) => - node.addClusterConnection(NioTCP.connect( - socketPath(ip, port), - ec, - Abort() - )) + node.addClusterConnection(TCP.connect(socketPath(ip, port), ec)) } }.value - subcommand("udpnode", "starts a cluster node") { + subcommand("nio-node", "starts a cluster node") { + val node = Node(name.value, initialClusterIds.value.toSet) + + node.addClientConnection(NioTCP.listen(NioTCP.defaultSocket(socketPath("localhost", clientPort.value)), ec)) + node.addClusterConnection(NioTCP.listen(NioTCP.defaultSocket(socketPath("localhost", peerPort.value)), ec)) + + Timer().schedule(() => node.clusterDataManager.pingAll(), 1000, 1000) + + cluster.value.foreach { (ip, port) => + node.addClusterConnection(NioTCP.connect(socketPath(ip, port), ec, Abort())) + } + }.value + + subcommand("udp-node", "starts a cluster node") { val node = Node(name.value, initialClusterIds.value.toSet) node.addClientConnection(UDP.listen(() => new DatagramSocket(clientPort.value), ec)) @@ -112,11 +111,17 @@ object cli { val (ip, port) = clientNode.value - client.addLatentConnection(NioTCP.connect( - socketPath(ip, port), - ec, - Abort() - )) + client.addLatentConnection(NioTCP.connect(socketPath(ip, port), ec, Abort())) + + client.startCLI() + }.value + + subcommand("nio-client", "starts a client to interact with a node") { + val client = Client(name.value) + + val (ip, port) = clientNode.value + + client.addLatentConnection(TCP.connect(socketPath(ip, port), ec)) client.startCLI() }.value diff --git a/Modules/Replication/shared/src/main/scala/replication/DataManager.scala b/Modules/Replication/shared/src/main/scala/replication/DataManager.scala index 596571b6d..e314c50e3 100644 --- a/Modules/Replication/shared/src/main/scala/replication/DataManager.scala +++ b/Modules/Replication/shared/src/main/scala/replication/DataManager.scala @@ -102,7 +102,6 @@ class DataManager[State]( lock.synchronized { connections = conn :: connections } - conn.send(Ping(System.nanoTime())).run(debugCallbackAndRemoveCon(conn)) conn.send(Request(replicaId.uid, selfContext)).run(using ())(debugCallbackAndRemoveCon(conn)) case Failure(ex) => println(s"exception during connection activation")