Skip to content

Commit

Permalink
Added RandomSprayRouter + minor adjusmtents
Browse files Browse the repository at this point in the history
  • Loading branch information
lh70 committed Aug 22, 2024
1 parent 41c19bb commit ff90775
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 29 deletions.
66 changes: 47 additions & 19 deletions Modules/DTN/jvm/src/main/scala/dtn/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ import scala.concurrent.Future
import routing.{BaseRouter, FloodingRouter, DirectRouter, EpidemicRouter, RdtRouter, RdtRouter2, SprayAndWaitRouter}
import rdt.{Client, ClientOperationMode}

import _root_.replication.DataManager
import rdts.base.LocalUid
import dtn.rdt.Channel
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
import com.github.plokhotnyuk.jsoniter_scala.macros.CodecMakerConfig
import dtn.routing.RandomSprayRouter

/*
this file contains all jvm main methods
*/
Expand All @@ -16,19 +24,19 @@ import rdt.{Client, ClientOperationMode}
if args.isEmpty || Set("-?", "-h", "--h", "help", "--help").contains(args(0)) || args.length % 2 != 0 then {
println("""
commandline options:
-m => method (mandatory) | available options: monitoring, routing, client, print.received, print.forwarded, print.statedev
-a => host address | default: 0.0.0.0 (for monitoring), 127.0.0.1 (everything else)
-p => host port | default: 5000 (for monitoring), 3000 (for everything else)
-rs => routing strategy | default: epidemic (options: direct, flooding, epidemic, rdt, rdt2, spray, binary)
-cr => client rdt selection | default: addwins.listen (options: addwins.listen, addwins.active, observeremove.listen, observeremove.active, lastwriterwins.listen, lastwriterwins.active)
-cm => client rdt operation mode | default: pushall (options: pushall, requestlater)
-ma => monitoring address | default: 127.0.0.1
-mp => monitoring port | default: 5000
-mid => monitoring creation client id | default: dtn://n2/rdt/testapp
-awa => add-wins rdt number of additions | default: 1000
-awt => add-wins rdt sleep time milliseconds | default: 500
-rrn => rdt-router n total nodes to deliver to | default: 10
-rrt => rdt-router top n nodes to forward to | default: 3
-m => method (mandatory) | available options: monitoring, routing, client, print.received, print.forwarded, print.statedev
-a => host address | default: 0.0.0.0 (for monitoring), 127.0.0.1 (everything else)
-p => host port | default: 5000 (for monitoring), 3000 (for everything else)
-rs => routing strategy | default: epidemic (options: direct, flooding, epidemic, rdt, rdt2, spray, random)
-cr => client rdt selection | default: addwins.listen (options: addwins.listen, addwins.active, observeremove.listen, observeremove.active, lastwriterwins.listen, lastwriterwins.active)
-cm => client rdt operation mode | default: pushall (options: pushall, requestlater)
-ma => monitoring address | default: 127.0.0.1
-mp => monitoring port | default: 5000
-mid => monitoring creation client id | default: dtn://n2/rdt/testapp
-awa => add-wins rdt number of additions | default: 1000
-awt => add-wins rdt sleep time milliseconds | default: 500
-rrn => rdt-router (and random-router) n total nodes to deliver to | default: 10
-rrt => rdt-router (and random-router) top n nodes to forward to | default: 3
""")
} else {
var keyword_args: Map[String, String] = Map()
Expand Down Expand Up @@ -76,8 +84,14 @@ commandline options:
RdtRouter2(host_address, host_port, MonitoringClient(monitoring_address, monitoring_port))
case "spray" =>
SprayAndWaitRouter(host_address, host_port, MonitoringClient(monitoring_address, monitoring_port))
case "binary" =>
throw Exception("binary spray and wait not implemented yet")
case "random" =>
RandomSprayRouter(
host_address,
host_port,
MonitoringClient(monitoring_address, monitoring_port),
rdt_router_n_total_nodes,
rdt_router_top_n_neighbours
)
case s =>
throw Exception(s"unknown routing strategy (-rs): ${s}")
)
Expand Down Expand Up @@ -192,11 +206,25 @@ def case_study_active(
}

@main def test(): Unit = {
WSEndpointClient("127.0.0.1", 3000).map(client => {
println(client.nodeId)
})
type RdtType = Set[String]

while true do {
given JsonValueCodec[RdtType] = JsonCodecMaker.make(CodecMakerConfig.withMapAsArray(true))

val dataManager: DataManager[RdtType] = DataManager[RdtType](
LocalUid.gen(),
state => println("replica received new state information"),
_ => ()
)

dataManager.addLatentConnection(Channel[RdtType](
"127.0.0.1",
3000,
"app1",
scala.concurrent.ExecutionContext.global
))

for i <- 0 to 10 do {
Thread.sleep(1000)
dataManager.applyUnrelatedDelta(Set(s"hello world ${i} from ${dataManager.replicaId}"))
}
}
2 changes: 1 addition & 1 deletion Modules/DTN/jvm/src/main/scala/dtn/Monitoring.scala
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ class MonitoringStateDevelopmentPrinter(creationClientId: String, paths: Monitor
for (clientId: String, dots: Dots) <- deliveredStates do {
val deliveredStateNum: Double = dots.size.toDouble
val ratio = deliveredStateNum / creationStateNum
println(s"${clientId} | ration: ${ratio}, num dots delivered: ${deliveredStateNum}")
println(s"${clientId} | ratio: ${ratio}, num dots delivered: ${deliveredStateNum}")
}
println(s"\nNum bundles created at other nodes: ${bundlesCreatedAtOtherNodesCounter}")
println(s"Num bundles delivered at creation node: ${bundlesDeliveredAtCreationCounter}")
Expand Down
121 changes: 121 additions & 0 deletions Modules/DTN/shared/src/main/scala/dtn/routing/RandomSprayRouter.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package dtn.routing

import dtn.{DtnPeer, Packet, Sender, WSEroutingClient, MonitoringClientInterface, NoMonitoringClient}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.jdk.CollectionConverters.*
import java.util.concurrent.ConcurrentHashMap
import java.time.ZonedDateTime
import dtn.Endpoint
import scala.util.Random
import dtn.PreviousNodeBlock

/*
The RandomSprayRouter implements the backup strategy only of the RdtRouter
*/

class RandomSprayRouter(
ws: WSEroutingClient,
monitoringClient: MonitoringClientInterface,
nTotalNodes: Int,
topNNeighbours: Int
) extends BaseRouter(ws: WSEroutingClient, monitoringClient: MonitoringClientInterface) {

// will grow indefinitely as we do not garbage collect here
val tempPreviousNodeStore: ConcurrentHashMap[String, Endpoint] = ConcurrentHashMap()
val delivered = ConcurrentHashMap[String, Set[String]]()

override def onRequestSenderForBundle(packet: Packet.RequestSenderForBundle)
: Option[Packet.ResponseSenderForBundle] = {
println(s"received sender-request for bundle: ${packet.bp}")

if delivered.getOrDefault(packet.bp.id, Set()).size >= nTotalNodes then {
println("bundle was forwarded to enough unique neighbours. deleting.")
tempPreviousNodeStore.remove(packet.bp.id)
return Option(Packet.ResponseSenderForBundle(bp = packet.bp, clas = List(), delete_afterwards = true))
}

val source_node: Endpoint = packet.bp.source.extract_node_endpoint()

var random_peers: Iterable[DtnPeer] = peers.values().asScala

// remove previous node and source node from peers if available
random_peers = tempPreviousNodeStore.get(packet.bp.id) match
case null => random_peers.filter(p => !p.eid.equals(source_node))
case previous_node: Endpoint =>
random_peers.filter(p => !p.eid.equals(previous_node) && !p.eid.equals(source_node))

println(s"filtered random peers available: ${List.from(random_peers).map(peer => peer.eid)}")

// use peer-info and available clas' to build a list of cla-connections to forward the bundle over
val selected_clas: List[Sender] = Random.shuffle(random_peers).take(topNNeighbours).flatMap(target => {
target.cla_list
.filter((agent, port_option) => packet.clas.contains(agent))
.map((agent, port_option) =>
Sender(remote = target.addr, port = port_option, agent = agent, next_hop = target.eid)
)
}).toList
println(s"selected clas: $selected_clas")

println(s"time: ${ZonedDateTime.now()}")

return Option(Packet.ResponseSenderForBundle(
bp = packet.bp,
clas = selected_clas,
delete_afterwards = false
))
}

override def onError(packet: Packet.Error): Unit = {
println(s"received error from dtnd: ${packet.reason}")
}

override def onTimeout(packet: Packet.Timeout): Unit = {
println(s"sending ran into timeout for bundle-forward-response ${packet.bp}")
}

override def onSendingFailed(packet: Packet.SendingFailed): Unit = {
println(s"sending failed for bundle ${packet.bid} on cla ${packet.cla_sender}")
}

override def onSendingSucceeded(packet: Packet.SendingSucceeded): Unit = {
println(s"sending succeeded for bundle ${packet.bid} on cla ${packet.cla_sender}.")
delivered.get(packet.bid) match {
case null =>
delivered.put(packet.bid, Set(packet.cla_sender))
()
case x: Set[String] =>
delivered.put(packet.bid, (x + packet.cla_sender))
()
}
}

override def onIncomingBundle(packet: Packet.IncomingBundle): Unit = {
println("received incoming bundle. extracting previous node info")
packet.bndl.other_blocks.collectFirst {
case x: PreviousNodeBlock => x
} match {
case None => println("received incoming bundle without previous node block. ignoring")
case Some(previous_node_block) => tempPreviousNodeStore.put(packet.bndl.id, previous_node_block.previous_node_id)
}
()
}

override def onIncomingBundleWithoutPreviousNode(packet: Packet.IncomingBundleWithoutPreviousNode): Unit = {
println("received incoming bundle without previous node. information not used for routing. ignoring.")
}
}
object RandomSprayRouter {
val N_TOTAL_NODES = 10
val TOP_N_NEIGHBOURS = 3

def apply(
host: String,
port: Int,
monitoringClient: MonitoringClientInterface = NoMonitoringClient,
nTotalNodes: Int = N_TOTAL_NODES,
topNNeighbours: Int = TOP_N_NEIGHBOURS,
): Future[RandomSprayRouter] =
WSEroutingClient(host, port).map(ws => new RandomSprayRouter(ws, monitoringClient, nTotalNodes, topNNeighbours))
}
32 changes: 23 additions & 9 deletions Modules/DTN/simulation/shared/run-dgs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,32 @@

cutoff_after_x_steps = 120
wait_time_per_step_seconds = 5.0
janitor_interval_milliseconds = 1500
discovery_interval_milliseconds = 300
janitor_interval_milliseconds = 2500
discovery_interval_milliseconds = 500

rdt_variant = "addwins" # options: "addwins", "observeremove", "lastwriterwins"
clients = {
"n1": "listen",
"n2": "active",
"n3": "listen"
"n3": "listen",
"n4": "listen",
"n5": "listen",
"n6": "listen",
"n7": "listen",
"n8": "listen"
}

router_variant = "rdt" # options: "flooding", "epidemic", "spray", "binary", "rdt", "rdt2"

rdt_client_operation_mode = "pushall" # options: "pushall", "requestlater"

dtnd_cla = "udp"

# special configs
addwins_rdt_number_of_additions = 1000
addwins_rdt_sleep_time_milliseconds = 500
router_rdt_n_total_nodes = 10
router_rdt_top_n_neighbours = 3
addwins_rdt_number_of_additions = 2000
addwins_rdt_sleep_time_milliseconds = 250
router_rdt_n_total_nodes = 5
router_rdt_top_n_neighbours = 2

# WARNING
# this script is custom tailored for my simulation use case and other simulations might use parts of this script because it works,
Expand Down Expand Up @@ -127,6 +135,8 @@ def add_discovery_address(self, node_id, address):
def _substitute_config_file(self, node_id):
file_contents = self.configs[node_id]

#file_contents = re.sub(r'debug = false', 'debug = true', file_contents, 1)
file_contents = re.sub(r'cla.0.id = "mtcp"', f'cla.0.id = "{dtnd_cla}"', file_contents, 1)
file_contents = re.sub(r'strategy = "epidemic"', 'strategy = "external"', file_contents, 1)
file_contents = re.sub(r'interval = "2s"', f'interval = "{discovery_interval_milliseconds}ms"', file_contents, 1)
file_contents = re.sub(r'janitor = "10s"', f'janitor = "{janitor_interval_milliseconds}ms"', file_contents, 1)
Expand Down Expand Up @@ -335,7 +345,7 @@ def get_substituted_config_files(self):

link_name_map[link_name] = (node_min_id, node_max_id)

print(f"activated link {link_name} between {node1_name} and {node2_name}")
print(f"activated link {link_name} between {node_min_id} and {node_max_id}")
elif action == "de":
node_min_id, node_max_id = link_name_map[link_name]

Expand All @@ -350,7 +360,7 @@ def get_substituted_config_files(self):
options=LinkOptions(loss=100)
)

print(f"deactivated link {link_name} between {node1_name} and {node2_name}")
print(f"deactivated link {link_name} between {node_min_id} and {node_max_id}")
else:
raise Exception(f"unknown action '{line}'")

Expand All @@ -361,3 +371,7 @@ def get_substituted_config_files(self):

if len(dgs_lines) <= 0:
break

print("reached end of simulation, shutting down")
core.set_session_state(session_id, SessionState.SHUTDOWN)
print("done")

0 comments on commit ff90775

Please sign in to comment.