Skip to content

Commit

Permalink
optimize java io connection performance
Browse files Browse the repository at this point in the history
  • Loading branch information
Kiibou-chan committed Nov 5, 2024
1 parent c3979e8 commit 271c853
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 44 deletions.
14 changes: 12 additions & 2 deletions Modules/Channels/jvm-native/src/main/scala/channels/TCP.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@ object TCP {
executionContext: ExecutionContext
): JIOStreamConnection = {
println(s"handling new connection")
socket.setTcpNoDelay(true)
socket.setPerformancePreferences(1, 2, 0)
val conn = new JIOStreamConnection(socket.getInputStream, socket.getOutputStream, () => socket.close())
executionContext.execute: () =>
println(s"executing task")
conn.loopHandler(incoming)
conn
}

def connect(socketAddress: InetSocketAddress, executionContext: ExecutionContext): LatentConnection[MessageBuffer] =
connect(socketAddress.getHostName, socketAddress.getPort, executionContext: ExecutionContext)

def connect(host: String, port: Int, executionContext: ExecutionContext): LatentConnection[MessageBuffer] =
connect(() => new Socket(host, port), executionContext)

Expand All @@ -41,11 +46,16 @@ object TCP {
}
}

def defaultSocket(socketAddress: InetSocketAddress): () => ServerSocket =
defaultSocket(socketAddress.getHostName, socketAddress.getPort)

def defaultSocket(interface: String, port: Int): () => ServerSocket = () => {
val socket = new ServerSocket

try socket.setReuseAddress(true)
catch {
try {
socket.setReuseAddress(true)
socket.setPerformancePreferences(1, 2, 0)
} catch {
case _: SocketException =>
// some implementations may not allow SO_REUSEADDR to be set
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,15 @@ class Client(val name: Uid) {
}

private def multiget(key: String, times: Int): Unit = {
val start = System.nanoTime()
for i <- 1 to times do read(key.replace("%n", i.toString))
println(s"Did $times get queries in ${(System.nanoTime() - start) / 1_000_000}ms")
}

private def multiput(key: String, value: String, times: Int): Unit = {
val start = System.nanoTime()
for i <- 1 to times do write(key.replace("%n", i.toString), value.replace("%n", i.toString))
println(s"Did $times put queries in ${(System.nanoTime() - start) / 1_000_000}ms")
}

def startCLI(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,17 @@ import rdts.syntax.DeltaBuffer
import scala.util.chaining.scalaUtilChainingOps

object Time {
var current = System.nanoTime()
def report(name: String = "") = synchronized {
val last = current
current = System.nanoTime()
println(s"$name took ${(current - last).doubleValue / 1000_000}ms")

val logger = LogHack(false)

var current: Long = System.nanoTime()

def report(name: => String = ""): Unit = logger.info {
synchronized {
val last = current
current = System.nanoTime()
s"$name took ${(current - last).doubleValue / 1000_000}ms"
}
}
}

Expand All @@ -26,9 +32,9 @@ class Node(val name: Uid, val initialClusterIds: Set[Uid]) {
given localUid: LocalUid = LocalUid(name)
given LogHack = new LogHack(false)

val clientDataManager =
val clientDataManager: ProDataManager[ClientNodeState] =
ProDataManager[ClientNodeState](localUid, Bottom[ClientNodeState].empty, onClientStateChange)
val clusterDataManager =
val clusterDataManager: ProDataManager[ClusterState] =
ProDataManager[ClusterState](localUid, Membership.init(initialClusterIds), onClusterStateChange)

private def onClientStateChange(oldState: ClientNodeState, newState: ClientNodeState): Unit = {
Expand All @@ -54,19 +60,19 @@ class Node(val name: Uid, val initialClusterIds: Set[Uid]) {
private def onClusterStateChange(oldState: ClusterState, newState: ClusterState): Unit = {

val start = System.nanoTime()
var last = start
val tid =
synchronized {
counter = counter + 1
counter
}
var last = start
val tid = synchronized {
counter += + 1
counter
}

Time.report(s"[${tid}] cluster changed")
Time.report(s"[$tid] cluster changed")

def timeStep(msg: String) =
def timeStep(msg: => String): Unit = Time.logger.info {
val current = last
last = System.nanoTime()
println(s"[$tid] $msg after ${(last - current).doubleValue / 1000_000}ms")
s"[$tid] $msg after ${(last - current).doubleValue / 1000_000}ms"
}

val delta = newState.upkeep()
val upkept: ClusterState = newState.merge(delta)
Expand Down Expand Up @@ -117,7 +123,7 @@ class Node(val name: Uid, val initialClusterIds: Set[Uid]) {
}

timeStep("done")
println(s"[${tid}] total ${(System.nanoTime() - start).doubleValue / 1000_000}ms")
Time.logger.info(s"[$tid] total ${(System.nanoTime() - start).doubleValue / 1000_000}ms")
}

export clientDataManager.addLatentConnection as addClientConnection
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package probench

import channels.{Abort, NioTCP, UDP}
import channels.{Abort, NioTCP, TCP, UDP}
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
import com.github.plokhotnyuk.jsoniter_scala.macros.{CodecMakerConfig, JsonCodecMaker}
import de.rmgk.options.*
Expand Down Expand Up @@ -70,31 +70,30 @@ object cli {
subcommand("node", "starts a cluster node") {
val node = Node(name.value, initialClusterIds.value.toSet)

node.addClientConnection(NioTCP.listen(
NioTCP.defaultSocket(
socketPath("localhost", clientPort.value)
),
ec
))
node.addClusterConnection(NioTCP.listen(
NioTCP.defaultSocket(
socketPath("localhost", peerPort.value)
),
ec
))
node.addClientConnection(TCP.listen(TCP.defaultSocket(socketPath("localhost", clientPort.value)), ec))
node.addClusterConnection(TCP.listen(TCP.defaultSocket(socketPath("localhost", peerPort.value)), ec))

Timer().schedule(() => node.clusterDataManager.pingAll(), 1000, 1000)

cluster.value.foreach { (ip, port) =>
node.addClusterConnection(NioTCP.connect(
socketPath(ip, port),
ec,
Abort()
))
node.addClusterConnection(TCP.connect(socketPath(ip, port), ec))
}
}.value

subcommand("udpnode", "starts a cluster node") {
subcommand("nio-node", "starts a cluster node") {
val node = Node(name.value, initialClusterIds.value.toSet)

node.addClientConnection(NioTCP.listen(NioTCP.defaultSocket(socketPath("localhost", clientPort.value)), ec))
node.addClusterConnection(NioTCP.listen(NioTCP.defaultSocket(socketPath("localhost", peerPort.value)), ec))

Timer().schedule(() => node.clusterDataManager.pingAll(), 1000, 1000)

cluster.value.foreach { (ip, port) =>
node.addClusterConnection(NioTCP.connect(socketPath(ip, port), ec, Abort()))
}
}.value

subcommand("udp-node", "starts a cluster node") {
val node = Node(name.value, initialClusterIds.value.toSet)

node.addClientConnection(UDP.listen(() => new DatagramSocket(clientPort.value), ec))
Expand All @@ -112,11 +111,17 @@ object cli {

val (ip, port) = clientNode.value

client.addLatentConnection(NioTCP.connect(
socketPath(ip, port),
ec,
Abort()
))
client.addLatentConnection(NioTCP.connect(socketPath(ip, port), ec, Abort()))

client.startCLI()
}.value

subcommand("nio-client", "starts a client to interact with a node") {
val client = Client(name.value)

val (ip, port) = clientNode.value

client.addLatentConnection(TCP.connect(socketPath(ip, port), ec))

client.startCLI()
}.value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@ class DataManager[State](
lock.synchronized {
connections = conn :: connections
}
conn.send(Ping(System.nanoTime())).run(debugCallbackAndRemoveCon(conn))
conn.send(Request(replicaId.uid, selfContext)).run(using ())(debugCallbackAndRemoveCon(conn))
case Failure(ex) =>
println(s"exception during connection activation")
Expand Down

0 comments on commit 271c853

Please sign in to comment.