From ff90775b77de139e8c0c42065c514a2164031a90 Mon Sep 17 00:00:00 2001 From: Lukas Holst Date: Thu, 22 Aug 2024 14:08:53 -0400 Subject: [PATCH] Added RandomSprayRouter + minor adjusmtents --- Modules/DTN/jvm/src/main/scala/dtn/Main.scala | 66 +++++++--- .../jvm/src/main/scala/dtn/Monitoring.scala | 2 +- .../scala/dtn/routing/RandomSprayRouter.scala | 121 ++++++++++++++++++ Modules/DTN/simulation/shared/run-dgs.py | 32 +++-- 4 files changed, 192 insertions(+), 29 deletions(-) create mode 100644 Modules/DTN/shared/src/main/scala/dtn/routing/RandomSprayRouter.scala diff --git a/Modules/DTN/jvm/src/main/scala/dtn/Main.scala b/Modules/DTN/jvm/src/main/scala/dtn/Main.scala index 656e33e00..5e3a7d7de 100644 --- a/Modules/DTN/jvm/src/main/scala/dtn/Main.scala +++ b/Modules/DTN/jvm/src/main/scala/dtn/Main.scala @@ -8,6 +8,14 @@ import scala.concurrent.Future import routing.{BaseRouter, FloodingRouter, DirectRouter, EpidemicRouter, RdtRouter, RdtRouter2, SprayAndWaitRouter} import rdt.{Client, ClientOperationMode} +import _root_.replication.DataManager +import rdts.base.LocalUid +import dtn.rdt.Channel +import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec +import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker +import com.github.plokhotnyuk.jsoniter_scala.macros.CodecMakerConfig +import dtn.routing.RandomSprayRouter + /* this file contains all jvm main methods */ @@ -16,19 +24,19 @@ import rdt.{Client, ClientOperationMode} if args.isEmpty || Set("-?", "-h", "--h", "help", "--help").contains(args(0)) || args.length % 2 != 0 then { println(""" commandline options: - -m => method (mandatory) | available options: monitoring, routing, client, print.received, print.forwarded, print.statedev - -a => host address | default: 0.0.0.0 (for monitoring), 127.0.0.1 (everything else) - -p => host port | default: 5000 (for monitoring), 3000 (for everything else) - -rs => routing strategy | default: epidemic (options: direct, flooding, epidemic, rdt, rdt2, spray, binary) - -cr => client rdt selection | default: addwins.listen (options: addwins.listen, addwins.active, observeremove.listen, observeremove.active, lastwriterwins.listen, lastwriterwins.active) - -cm => client rdt operation mode | default: pushall (options: pushall, requestlater) - -ma => monitoring address | default: 127.0.0.1 - -mp => monitoring port | default: 5000 - -mid => monitoring creation client id | default: dtn://n2/rdt/testapp - -awa => add-wins rdt number of additions | default: 1000 - -awt => add-wins rdt sleep time milliseconds | default: 500 - -rrn => rdt-router n total nodes to deliver to | default: 10 - -rrt => rdt-router top n nodes to forward to | default: 3 + -m => method (mandatory) | available options: monitoring, routing, client, print.received, print.forwarded, print.statedev + -a => host address | default: 0.0.0.0 (for monitoring), 127.0.0.1 (everything else) + -p => host port | default: 5000 (for monitoring), 3000 (for everything else) + -rs => routing strategy | default: epidemic (options: direct, flooding, epidemic, rdt, rdt2, spray, random) + -cr => client rdt selection | default: addwins.listen (options: addwins.listen, addwins.active, observeremove.listen, observeremove.active, lastwriterwins.listen, lastwriterwins.active) + -cm => client rdt operation mode | default: pushall (options: pushall, requestlater) + -ma => monitoring address | default: 127.0.0.1 + -mp => monitoring port | default: 5000 + -mid => monitoring creation client id | default: dtn://n2/rdt/testapp + -awa => add-wins rdt number of additions | default: 1000 + -awt => add-wins rdt sleep time milliseconds | default: 500 + -rrn => rdt-router (and random-router) n total nodes to deliver to | default: 10 + -rrt => rdt-router (and random-router) top n nodes to forward to | default: 3 """) } else { var keyword_args: Map[String, String] = Map() @@ -76,8 +84,14 @@ commandline options: RdtRouter2(host_address, host_port, MonitoringClient(monitoring_address, monitoring_port)) case "spray" => SprayAndWaitRouter(host_address, host_port, MonitoringClient(monitoring_address, monitoring_port)) - case "binary" => - throw Exception("binary spray and wait not implemented yet") + case "random" => + RandomSprayRouter( + host_address, + host_port, + MonitoringClient(monitoring_address, monitoring_port), + rdt_router_n_total_nodes, + rdt_router_top_n_neighbours + ) case s => throw Exception(s"unknown routing strategy (-rs): ${s}") ) @@ -192,11 +206,25 @@ def case_study_active( } @main def test(): Unit = { - WSEndpointClient("127.0.0.1", 3000).map(client => { - println(client.nodeId) - }) + type RdtType = Set[String] - while true do { + given JsonValueCodec[RdtType] = JsonCodecMaker.make(CodecMakerConfig.withMapAsArray(true)) + + val dataManager: DataManager[RdtType] = DataManager[RdtType]( + LocalUid.gen(), + state => println("replica received new state information"), + _ => () + ) + + dataManager.addLatentConnection(Channel[RdtType]( + "127.0.0.1", + 3000, + "app1", + scala.concurrent.ExecutionContext.global + )) + + for i <- 0 to 10 do { Thread.sleep(1000) + dataManager.applyUnrelatedDelta(Set(s"hello world ${i} from ${dataManager.replicaId}")) } } diff --git a/Modules/DTN/jvm/src/main/scala/dtn/Monitoring.scala b/Modules/DTN/jvm/src/main/scala/dtn/Monitoring.scala index d6276aad7..f3419f97f 100644 --- a/Modules/DTN/jvm/src/main/scala/dtn/Monitoring.scala +++ b/Modules/DTN/jvm/src/main/scala/dtn/Monitoring.scala @@ -210,7 +210,7 @@ class MonitoringStateDevelopmentPrinter(creationClientId: String, paths: Monitor for (clientId: String, dots: Dots) <- deliveredStates do { val deliveredStateNum: Double = dots.size.toDouble val ratio = deliveredStateNum / creationStateNum - println(s"${clientId} | ration: ${ratio}, num dots delivered: ${deliveredStateNum}") + println(s"${clientId} | ratio: ${ratio}, num dots delivered: ${deliveredStateNum}") } println(s"\nNum bundles created at other nodes: ${bundlesCreatedAtOtherNodesCounter}") println(s"Num bundles delivered at creation node: ${bundlesDeliveredAtCreationCounter}") diff --git a/Modules/DTN/shared/src/main/scala/dtn/routing/RandomSprayRouter.scala b/Modules/DTN/shared/src/main/scala/dtn/routing/RandomSprayRouter.scala new file mode 100644 index 000000000..5af34e888 --- /dev/null +++ b/Modules/DTN/shared/src/main/scala/dtn/routing/RandomSprayRouter.scala @@ -0,0 +1,121 @@ +package dtn.routing + +import dtn.{DtnPeer, Packet, Sender, WSEroutingClient, MonitoringClientInterface, NoMonitoringClient} + +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.Future +import scala.jdk.CollectionConverters.* +import java.util.concurrent.ConcurrentHashMap +import java.time.ZonedDateTime +import dtn.Endpoint +import scala.util.Random +import dtn.PreviousNodeBlock + +/* + The RandomSprayRouter implements the backup strategy only of the RdtRouter + */ + +class RandomSprayRouter( + ws: WSEroutingClient, + monitoringClient: MonitoringClientInterface, + nTotalNodes: Int, + topNNeighbours: Int +) extends BaseRouter(ws: WSEroutingClient, monitoringClient: MonitoringClientInterface) { + + // will grow indefinitely as we do not garbage collect here + val tempPreviousNodeStore: ConcurrentHashMap[String, Endpoint] = ConcurrentHashMap() + val delivered = ConcurrentHashMap[String, Set[String]]() + + override def onRequestSenderForBundle(packet: Packet.RequestSenderForBundle) + : Option[Packet.ResponseSenderForBundle] = { + println(s"received sender-request for bundle: ${packet.bp}") + + if delivered.getOrDefault(packet.bp.id, Set()).size >= nTotalNodes then { + println("bundle was forwarded to enough unique neighbours. deleting.") + tempPreviousNodeStore.remove(packet.bp.id) + return Option(Packet.ResponseSenderForBundle(bp = packet.bp, clas = List(), delete_afterwards = true)) + } + + val source_node: Endpoint = packet.bp.source.extract_node_endpoint() + + var random_peers: Iterable[DtnPeer] = peers.values().asScala + + // remove previous node and source node from peers if available + random_peers = tempPreviousNodeStore.get(packet.bp.id) match + case null => random_peers.filter(p => !p.eid.equals(source_node)) + case previous_node: Endpoint => + random_peers.filter(p => !p.eid.equals(previous_node) && !p.eid.equals(source_node)) + + println(s"filtered random peers available: ${List.from(random_peers).map(peer => peer.eid)}") + + // use peer-info and available clas' to build a list of cla-connections to forward the bundle over + val selected_clas: List[Sender] = Random.shuffle(random_peers).take(topNNeighbours).flatMap(target => { + target.cla_list + .filter((agent, port_option) => packet.clas.contains(agent)) + .map((agent, port_option) => + Sender(remote = target.addr, port = port_option, agent = agent, next_hop = target.eid) + ) + }).toList + println(s"selected clas: $selected_clas") + + println(s"time: ${ZonedDateTime.now()}") + + return Option(Packet.ResponseSenderForBundle( + bp = packet.bp, + clas = selected_clas, + delete_afterwards = false + )) + } + + override def onError(packet: Packet.Error): Unit = { + println(s"received error from dtnd: ${packet.reason}") + } + + override def onTimeout(packet: Packet.Timeout): Unit = { + println(s"sending ran into timeout for bundle-forward-response ${packet.bp}") + } + + override def onSendingFailed(packet: Packet.SendingFailed): Unit = { + println(s"sending failed for bundle ${packet.bid} on cla ${packet.cla_sender}") + } + + override def onSendingSucceeded(packet: Packet.SendingSucceeded): Unit = { + println(s"sending succeeded for bundle ${packet.bid} on cla ${packet.cla_sender}.") + delivered.get(packet.bid) match { + case null => + delivered.put(packet.bid, Set(packet.cla_sender)) + () + case x: Set[String] => + delivered.put(packet.bid, (x + packet.cla_sender)) + () + } + } + + override def onIncomingBundle(packet: Packet.IncomingBundle): Unit = { + println("received incoming bundle. extracting previous node info") + packet.bndl.other_blocks.collectFirst { + case x: PreviousNodeBlock => x + } match { + case None => println("received incoming bundle without previous node block. ignoring") + case Some(previous_node_block) => tempPreviousNodeStore.put(packet.bndl.id, previous_node_block.previous_node_id) + } + () + } + + override def onIncomingBundleWithoutPreviousNode(packet: Packet.IncomingBundleWithoutPreviousNode): Unit = { + println("received incoming bundle without previous node. information not used for routing. ignoring.") + } +} +object RandomSprayRouter { + val N_TOTAL_NODES = 10 + val TOP_N_NEIGHBOURS = 3 + + def apply( + host: String, + port: Int, + monitoringClient: MonitoringClientInterface = NoMonitoringClient, + nTotalNodes: Int = N_TOTAL_NODES, + topNNeighbours: Int = TOP_N_NEIGHBOURS, + ): Future[RandomSprayRouter] = + WSEroutingClient(host, port).map(ws => new RandomSprayRouter(ws, monitoringClient, nTotalNodes, topNNeighbours)) +} diff --git a/Modules/DTN/simulation/shared/run-dgs.py b/Modules/DTN/simulation/shared/run-dgs.py index 23321859b..178d099b4 100644 --- a/Modules/DTN/simulation/shared/run-dgs.py +++ b/Modules/DTN/simulation/shared/run-dgs.py @@ -14,24 +14,32 @@ cutoff_after_x_steps = 120 wait_time_per_step_seconds = 5.0 -janitor_interval_milliseconds = 1500 -discovery_interval_milliseconds = 300 +janitor_interval_milliseconds = 2500 +discovery_interval_milliseconds = 500 rdt_variant = "addwins" # options: "addwins", "observeremove", "lastwriterwins" clients = { + "n1": "listen", "n2": "active", - "n3": "listen" + "n3": "listen", + "n4": "listen", + "n5": "listen", + "n6": "listen", + "n7": "listen", + "n8": "listen" } router_variant = "rdt" # options: "flooding", "epidemic", "spray", "binary", "rdt", "rdt2" rdt_client_operation_mode = "pushall" # options: "pushall", "requestlater" +dtnd_cla = "udp" + # special configs -addwins_rdt_number_of_additions = 1000 -addwins_rdt_sleep_time_milliseconds = 500 -router_rdt_n_total_nodes = 10 -router_rdt_top_n_neighbours = 3 +addwins_rdt_number_of_additions = 2000 +addwins_rdt_sleep_time_milliseconds = 250 +router_rdt_n_total_nodes = 5 +router_rdt_top_n_neighbours = 2 # WARNING # this script is custom tailored for my simulation use case and other simulations might use parts of this script because it works, @@ -127,6 +135,8 @@ def add_discovery_address(self, node_id, address): def _substitute_config_file(self, node_id): file_contents = self.configs[node_id] + #file_contents = re.sub(r'debug = false', 'debug = true', file_contents, 1) + file_contents = re.sub(r'cla.0.id = "mtcp"', f'cla.0.id = "{dtnd_cla}"', file_contents, 1) file_contents = re.sub(r'strategy = "epidemic"', 'strategy = "external"', file_contents, 1) file_contents = re.sub(r'interval = "2s"', f'interval = "{discovery_interval_milliseconds}ms"', file_contents, 1) file_contents = re.sub(r'janitor = "10s"', f'janitor = "{janitor_interval_milliseconds}ms"', file_contents, 1) @@ -335,7 +345,7 @@ def get_substituted_config_files(self): link_name_map[link_name] = (node_min_id, node_max_id) - print(f"activated link {link_name} between {node1_name} and {node2_name}") + print(f"activated link {link_name} between {node_min_id} and {node_max_id}") elif action == "de": node_min_id, node_max_id = link_name_map[link_name] @@ -350,7 +360,7 @@ def get_substituted_config_files(self): options=LinkOptions(loss=100) ) - print(f"deactivated link {link_name} between {node1_name} and {node2_name}") + print(f"deactivated link {link_name} between {node_min_id} and {node_max_id}") else: raise Exception(f"unknown action '{line}'") @@ -361,3 +371,7 @@ def get_substituted_config_files(self): if len(dgs_lines) <= 0: break + +print("reached end of simulation, shutting down") +core.set_session_state(session_id, SessionState.SHUTDOWN) +print("done")