Skip to content

Commit

Permalink
experiment with unix domain sockets
Browse files Browse the repository at this point in the history
  • Loading branch information
rmgk committed Nov 4, 2024
1 parent 2421042 commit 3b272dd
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 12 deletions.
17 changes: 8 additions & 9 deletions Modules/Channels/jvm/src/main/scala/channels/NioTCP.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package channels

import channels.Abort
import de.rmgk.delay
import de.rmgk.delay.{Async, Callback}

import java.io.IOException
import java.net.{InetAddress, InetSocketAddress, ServerSocket, Socket, SocketAddress, SocketException, StandardProtocolFamily, UnixDomainSocketAddress}
import java.net.{SocketAddress, SocketException, StandardProtocolFamily, UnixDomainSocketAddress}
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import java.nio.file.Files
import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal
import scala.util.{Failure, Success}
Expand All @@ -27,9 +25,11 @@ object NioTCP {
buffer.put(bytes)
buffer.flip()

while buffer.hasRemaining() do {
val res = clientChannel.write(buffer)
()
NioTCP.synchronized {
while buffer.hasRemaining() do {
val res = clientChannel.write(buffer)
()
}
}
()

Expand Down Expand Up @@ -112,7 +112,7 @@ object NioTCP {
def socketChannel: SocketChannel = {
val pf = socketAddress match
case _: UnixDomainSocketAddress => StandardProtocolFamily.UNIX
case other => StandardProtocolFamily.INET
case other => StandardProtocolFamily.INET
val channel = SocketChannel.open(pf)
channel.connect(socketAddress)
channel.configureBlocking(false)
Expand All @@ -136,14 +136,13 @@ object NioTCP {
def defaultSocket(socketAddress: SocketAddress): () => ServerSocketChannel = () => {
val pf = socketAddress match
case _: UnixDomainSocketAddress => StandardProtocolFamily.UNIX
case other => StandardProtocolFamily.INET
case other => StandardProtocolFamily.INET
val socket = ServerSocketChannel.open(pf)
socket.configureBlocking(false)
socket.bind(socketAddress)
socket
}


def listen(
bindsocket: () => ServerSocketChannel,
executionContext: ExecutionContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class Client(val name: Uid) {
private val put: Regex = """put ([\w%]+) ([\w%]+)""".r
private val multiget: Regex = """multiget ([\w%]+) (\d+)""".r
private val multiput: Regex = """multiput ([\w%]+) ([\w%]+) (\d+)""".r
private val mp: Regex = """mp (\d+)""".r

private def onStateChange(oldState: ClientNodeState, newState: ClientNodeState): Unit = {
/* val diff = newState.responses.data.values.size - oldState.responses.data.values.size
Expand Down Expand Up @@ -98,7 +99,7 @@ class Client(val name: Uid) {
case Some("wait") => lock.synchronized { lock.wait() }
case Some("ping") => dataManager.pingAll()
case Some("exit") => running = false
case Some("mp") => multiput("key%n", "value%n", 1000)
case Some(mp(times)) => multiput("key%n", "value%n", times.toInt)
case None => running = false
case other =>
println("assuming put")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,18 @@ object cli {
JsonCodecMaker.make(CodecMakerConfig.withMapAsArray(true))

def socketPath(name: String) = {
UnixDomainSocketAddress.of(name)
val p = Path.of(s"target/sockets/$name")
Files.createDirectories(p.getParent)
p.toFile.deleteOnExit()
UnixDomainSocketAddress.of(p)
}

val argparse = argumentParser {
inline def cluster = named[List[(String, Int)]]("--cluster", "")
inline def initialClusterIds = named[List[Uid]]("--initial-cluster-ids", "")
inline def clientNode = named[(String, Int)]("--node", "<ip:port>")
inline def name = named[Uid]("--name", "", Uid.gen())


subcommand("node", "starts a cluster node") {
val node = Node(name.value, initialClusterIds.value.toSet)
Expand Down

0 comments on commit 3b272dd

Please sign in to comment.