Skip to content

Commit

Permalink
back to tcp
Browse files Browse the repository at this point in the history
  • Loading branch information
rmgk committed Nov 4, 2024
1 parent 3b272dd commit ebbdc76
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 15 deletions.
3 changes: 2 additions & 1 deletion Modules/Channels/jvm/src/main/scala/channels/NioTCP.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import de.rmgk.delay
import de.rmgk.delay.{Async, Callback}

import java.io.IOException
import java.net.{SocketAddress, SocketException, StandardProtocolFamily, UnixDomainSocketAddress}
import java.net.{SocketAddress, SocketException, StandardProtocolFamily, StandardSocketOptions, UnixDomainSocketAddress}
import java.nio.ByteBuffer
import java.nio.channels.{SelectionKey, Selector, ServerSocketChannel, SocketChannel}
import scala.concurrent.ExecutionContext
Expand Down Expand Up @@ -139,6 +139,7 @@ object NioTCP {
case other => StandardProtocolFamily.INET
val socket = ServerSocketChannel.open(pf)
socket.configureBlocking(false)

socket.bind(socketAddress)
socket
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package probench

import channels.{Abort, NioTCP, TCP, UDP}
import channels.{Abort, NioTCP, UDP}
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
import com.github.plokhotnyuk.jsoniter_scala.macros.{CodecMakerConfig, JsonCodecMaker}
import de.rmgk.options.*
Expand All @@ -9,8 +9,7 @@ import rdts.base.Uid
import rdts.datatypes.experiments.protocols.Membership
import rdts.datatypes.experiments.protocols.simplified.Paxos

import java.net.{DatagramSocket, InetSocketAddress, Socket, UnixDomainSocketAddress}
import java.nio.file.{Files, Path}
import java.net.{DatagramSocket, InetSocketAddress}
import java.util.Timer
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.ExecutionContext
Expand Down Expand Up @@ -52,11 +51,14 @@ object cli {
given JsonValueCodec[Membership[Request, Paxos, Paxos]] =
JsonCodecMaker.make(CodecMakerConfig.withMapAsArray(true))

def socketPath(name: String) = {
val p = Path.of(s"target/sockets/$name")
Files.createDirectories(p.getParent)
p.toFile.deleteOnExit()
UnixDomainSocketAddress.of(p)
def socketPath(host: String, port: Int) = {
// val p = Path.of(s"target/sockets/$name")
// Files.createDirectories(p.getParent)
// p.toFile.deleteOnExit()
// UnixDomainSocketAddress.of(p)

InetSocketAddress(host, port)

}

val argparse = argumentParser {
Expand All @@ -65,25 +67,30 @@ object cli {
inline def clientNode = named[(String, Int)]("--node", "<ip:port>")
inline def name = named[Uid]("--name", "", Uid.gen())


subcommand("node", "starts a cluster node") {
val node = Node(name.value, initialClusterIds.value.toSet)

node.addClientConnection(NioTCP.listen(
NioTCP.defaultSocket(
socketPath(clientPort.value.toString)
socketPath("localhost", clientPort.value)
),
ec
))
node.addClusterConnection(NioTCP.listen(NioTCP.defaultSocket(
socketPath(peerPort.value.toString)),
node.addClusterConnection(NioTCP.listen(
NioTCP.defaultSocket(
socketPath("localhost", peerPort.value)
),
ec
))

Timer().schedule(() => node.clusterDataManager.pingAll(), 1000, 1000)

cluster.value.foreach { (ip, port) =>
node.addClusterConnection(NioTCP.connect(socketPath(port.toString), ec, Abort()))
node.addClusterConnection(NioTCP.connect(
socketPath(ip, port),
ec,
Abort()
))
}
}.value

Expand All @@ -106,7 +113,10 @@ object cli {
val (ip, port) = clientNode.value

client.addLatentConnection(NioTCP.connect(
socketPath(s"$port"), ec, Abort()))
socketPath(ip, port),
ec,
Abort()
))

client.startCLI()
}.value
Expand Down

0 comments on commit ebbdc76

Please sign in to comment.