From 3b272ddddeb004dfe83ecbffd1a85bcdfea2c798 Mon Sep 17 00:00:00 2001 From: ragnar Date: Mon, 4 Nov 2024 22:53:15 +0100 Subject: [PATCH] experiment with unix domain sockets --- .../jvm/src/main/scala/channels/NioTCP.scala | 17 ++++++++--------- .../src/main/scala/probench/Client.scala | 3 ++- .../src/main/scala/probench/cli.scala | 7 +++++-- 3 files changed, 15 insertions(+), 12 deletions(-) diff --git a/Modules/Channels/jvm/src/main/scala/channels/NioTCP.scala b/Modules/Channels/jvm/src/main/scala/channels/NioTCP.scala index 74c6d593e..d0ea53287 100644 --- a/Modules/Channels/jvm/src/main/scala/channels/NioTCP.scala +++ b/Modules/Channels/jvm/src/main/scala/channels/NioTCP.scala @@ -1,14 +1,12 @@ package channels -import channels.Abort import de.rmgk.delay import de.rmgk.delay.{Async, Callback} import java.io.IOException -import java.net.{InetAddress, InetSocketAddress, ServerSocket, Socket, SocketAddress, SocketException, StandardProtocolFamily, UnixDomainSocketAddress} +import java.net.{SocketAddress, SocketException, StandardProtocolFamily, UnixDomainSocketAddress} import java.nio.ByteBuffer import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel} -import java.nio.file.Files import scala.concurrent.ExecutionContext import scala.util.control.NonFatal import scala.util.{Failure, Success} @@ -27,9 +25,11 @@ object NioTCP { buffer.put(bytes) buffer.flip() - while buffer.hasRemaining() do { - val res = clientChannel.write(buffer) - () + NioTCP.synchronized { + while buffer.hasRemaining() do { + val res = clientChannel.write(buffer) + () + } } () @@ -112,7 +112,7 @@ object NioTCP { def socketChannel: SocketChannel = { val pf = socketAddress match case _: UnixDomainSocketAddress => StandardProtocolFamily.UNIX - case other => StandardProtocolFamily.INET + case other => StandardProtocolFamily.INET val channel = SocketChannel.open(pf) channel.connect(socketAddress) channel.configureBlocking(false) @@ -136,14 +136,13 @@ object NioTCP { def defaultSocket(socketAddress: SocketAddress): () => ServerSocketChannel = () => { val pf = socketAddress match case _: UnixDomainSocketAddress => StandardProtocolFamily.UNIX - case other => StandardProtocolFamily.INET + case other => StandardProtocolFamily.INET val socket = ServerSocketChannel.open(pf) socket.configureBlocking(false) socket.bind(socketAddress) socket } - def listen( bindsocket: () => ServerSocketChannel, executionContext: ExecutionContext diff --git a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Client.scala b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Client.scala index 31bd2248d..30e2856d9 100644 --- a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Client.scala +++ b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Client.scala @@ -25,6 +25,7 @@ class Client(val name: Uid) { private val put: Regex = """put ([\w%]+) ([\w%]+)""".r private val multiget: Regex = """multiget ([\w%]+) (\d+)""".r private val multiput: Regex = """multiput ([\w%]+) ([\w%]+) (\d+)""".r + private val mp: Regex = """mp (\d+)""".r private def onStateChange(oldState: ClientNodeState, newState: ClientNodeState): Unit = { /* val diff = newState.responses.data.values.size - oldState.responses.data.values.size @@ -98,7 +99,7 @@ class Client(val name: Uid) { case Some("wait") => lock.synchronized { lock.wait() } case Some("ping") => dataManager.pingAll() case Some("exit") => running = false - case Some("mp") => multiput("key%n", "value%n", 1000) + case Some(mp(times)) => multiput("key%n", "value%n", times.toInt) case None => running = false case other => println("assuming put") diff --git a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/cli.scala b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/cli.scala index 9e6025b77..9dead0eb9 100644 --- a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/cli.scala +++ b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/cli.scala @@ -53,7 +53,10 @@ object cli { JsonCodecMaker.make(CodecMakerConfig.withMapAsArray(true)) def socketPath(name: String) = { - UnixDomainSocketAddress.of(name) + val p = Path.of(s"target/sockets/$name") + Files.createDirectories(p.getParent) + p.toFile.deleteOnExit() + UnixDomainSocketAddress.of(p) } val argparse = argumentParser { @@ -61,7 +64,7 @@ object cli { inline def initialClusterIds = named[List[Uid]]("--initial-cluster-ids", "") inline def clientNode = named[(String, Int)]("--node", "") inline def name = named[Uid]("--name", "", Uid.gen()) - + subcommand("node", "starts a cluster node") { val node = Node(name.value, initialClusterIds.value.toSet)