From e3784ef1b5b9ee8df8f7fa70480150a7c3c82325 Mon Sep 17 00:00:00 2001 From: Lukas Holst Date: Wed, 21 Aug 2024 09:26:14 -0400 Subject: [PATCH] Added rdt client operation modes and made them configurable --- Modules/DTN/jvm/src/main/scala/dtn/Main.scala | 19 +++++++++++--- .../src/main/scala/dtn/RdtCreation.scala | 18 ++++++++++--- .../src/main/scala/dtn/rdt/Channel.scala | 26 ++++++++++++++----- 3 files changed, 50 insertions(+), 13 deletions(-) diff --git a/Modules/DTN/jvm/src/main/scala/dtn/Main.scala b/Modules/DTN/jvm/src/main/scala/dtn/Main.scala index 45bf104cd..3667b1806 100644 --- a/Modules/DTN/jvm/src/main/scala/dtn/Main.scala +++ b/Modules/DTN/jvm/src/main/scala/dtn/Main.scala @@ -6,7 +6,7 @@ import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future import routing.{BaseRouter, FloodingRouter, DirectRouter, EpidemicRouter, RdtRouter, RdtRouter2, SprayAndWaitRouter} -import rdt.Client +import rdt.{Client, ClientOperationMode} /* this file contains all jvm main methods @@ -21,6 +21,7 @@ commandline options: -p => host port | default: 5000 (for monitoring), 3000 (for everything else) -rs => routing strategy | default: epidemic (options: direct, flooding, epidemic, rdt, rdt2, spray, binary) -cr => client rdt selection | default: addwins.listen (options: addwins.listen, addwins.active, observeremove.listen, observeremove.active, lastwriterwins.listen, lastwriterwins.active) + -cm => client rdt operation mode | default: pushall (options: pushall, requestlater) -ma => monitoring address | default: 127.0.0.1 -mp => monitoring port | default: 5000 -mid => monitoring creation client id | default: dtn://n2/rdt/testapp @@ -46,6 +47,7 @@ commandline options: val add_wins_rdt_sleep_time_milliseconds: Long = keyword_args.getOrElse("-awt", "500").toLong val routing_strategy: String = keyword_args.getOrElse("-rs", "epidemic") val client_rdt: String = keyword_args.getOrElse("-cr", "addwins.listen") + val client_operation_mode = keyword_args.getOrElse("-cm", "pushall") method match case "monitoring" => start_monitoring_server(host_address, host_port) @@ -71,12 +73,18 @@ commandline options: ) } case "client" => { + val mode: ClientOperationMode = client_operation_mode match + case "pushall" => ClientOperationMode.PushAll + case "requestlater" => ClientOperationMode.RequestLater + case s => throw Exception(s"unknown rdt client operation mode: $s") + client_rdt match case "addwins.listen" => case_study_listen( host_address, host_port, MonitoringClient(monitoring_address, monitoring_port), + mode, AddWinsSetRDT(add_wins_rdt_number_of_additions, add_wins_rdt_sleep_time_milliseconds) ) case "addwins.active" => @@ -84,6 +92,7 @@ commandline options: host_address, host_port, MonitoringClient(monitoring_address, monitoring_port), + mode, AddWinsSetRDT(add_wins_rdt_number_of_additions, add_wins_rdt_sleep_time_milliseconds) ) case "observeremove.listen" => throw Exception("observeremove.listen not implemented yet") @@ -154,9 +163,10 @@ def case_study_listen( host: String, port: Int, monitoringClient: MonitoringClientInterface, - rdt: CaseStudyRdt + operationMode: ClientOperationMode, + rdt: CaseStudyRdt, ): Unit = { - rdt.connect(host, port, monitoringClient) + rdt.connect(host, port, monitoringClient, operationMode) rdt.caseStudyListen() } @@ -164,9 +174,10 @@ def case_study_active( host: String, port: Int, monitoringClient: MonitoringClientInterface, + operationMode: ClientOperationMode, rdt: CaseStudyRdt ): Unit = { - rdt.connect(host, port, monitoringClient) + rdt.connect(host, port, monitoringClient, operationMode) rdt.caseStudyActive() } diff --git a/Modules/DTN/shared/src/main/scala/dtn/RdtCreation.scala b/Modules/DTN/shared/src/main/scala/dtn/RdtCreation.scala index f6df6c57f..a18793317 100644 --- a/Modules/DTN/shared/src/main/scala/dtn/RdtCreation.scala +++ b/Modules/DTN/shared/src/main/scala/dtn/RdtCreation.scala @@ -7,9 +7,15 @@ import _root_.replication.JsoniterCodecs.given import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker import com.github.plokhotnyuk.jsoniter_scala.macros.CodecMakerConfig +import dtn.rdt.ClientOperationMode trait CaseStudyRdt { - def connect(host: String, port: Int, monitoringClient: MonitoringClientInterface): Unit + def connect( + host: String, + port: Int, + monitoringClient: MonitoringClientInterface, + operationMode: ClientOperationMode + ): Unit def caseStudyListen(): Unit def caseStudyActive(): Unit } @@ -25,13 +31,19 @@ class AddWinsSetRDT(number_of_additions: Int, sleep_time_milliseconds: Long) ext _ => () ) - def connect(host: String, port: Int, monitoringClient: MonitoringClientInterface): Unit = { + def connect( + host: String, + port: Int, + monitoringClient: MonitoringClientInterface, + operationMode: ClientOperationMode + ): Unit = { dataManager.addLatentConnection(Channel[RdtType]( host, port, "app1", scala.concurrent.ExecutionContext.global, - monitoringClient + monitoringClient, + operationMode )) } diff --git a/Modules/DTN/shared/src/main/scala/dtn/rdt/Channel.scala b/Modules/DTN/shared/src/main/scala/dtn/rdt/Channel.scala index 04bde7091..56d9cb2fd 100644 --- a/Modules/DTN/shared/src/main/scala/dtn/rdt/Channel.scala +++ b/Modules/DTN/shared/src/main/scala/dtn/rdt/Channel.scala @@ -13,14 +13,25 @@ import scala.concurrent.ExecutionContext import scala.util.{Failure, Success} import replication.ProtocolMessage -class ClientContext[T: JsonValueCodec](connection: Client, executionContext: ExecutionContext) - extends Connection[ProtocolMessage[T]] { +enum ClientOperationMode { + case PushAll + case RequestLater +} + +class ClientContext[T: JsonValueCodec]( + connection: Client, + executionContext: ExecutionContext, + operationMode: ClientOperationMode +) extends Connection[ProtocolMessage[T]] { override def send(message: ProtocolMessage[T]): Async[Any, Unit] = message match case ProtocolMessage.Request(sender, dots) => // we could send requests into the network. the routing handles them correctly. but they are unnecessary with the cb.succeed() down below. // todo: actually there should be no requests being sent anymore then. is that the case? - // connection.send(RdtMessageType.Request, Array(), dots).toAsync(using executionContext) + operationMode match + case ClientOperationMode.PushAll => Sync { () } + case ClientOperationMode.RequestLater => + connection.send(RdtMessageType.Request, Array(), dots).toAsync(using executionContext) Sync { () } case ProtocolMessage.Payload(sender, dots, data) => connection.send(RdtMessageType.Payload, writeToArray[T](data), dots).toAsync(using executionContext) @@ -36,7 +47,8 @@ class Channel[T: JsonValueCodec]( port: Int, appName: String, ec: ExecutionContext, - monitoringClient: MonitoringClientInterface = NoMonitoringClient + monitoringClient: MonitoringClientInterface = NoMonitoringClient, + operationMode: ClientOperationMode = ClientOperationMode.PushAll ) extends LatentConnection[ProtocolMessage[T]] { // We use a local dtnid instead of a remote replica ID to signify that the local DTNd is the one providing information. @@ -46,7 +58,7 @@ class Channel[T: JsonValueCodec]( override def prepare(incomingHandler: Handler[ProtocolMessage[T]]): Async[Abort, Connection[ProtocolMessage[T]]] = Async { val client: Client = Client(host, port, appName, monitoringClient).toAsync(using ec).bind - val conn = ClientContext[T](client, ec) + val conn = ClientContext[T](client, ec, operationMode) val cb = incomingHandler.getCallbackFor(conn) client.registerOnReceive { (message_type: RdtMessageType, payload: Array[Byte], dots: Dots) => @@ -57,7 +69,9 @@ class Channel[T: JsonValueCodec]( // This tells the rdt to send everything it has and new following stuff into the network. // It makes any requests unnecessary. - cb.succeed(ProtocolMessage.Request(dtnid, Dots.empty)) + operationMode match + case ClientOperationMode.PushAll => cb.succeed(ProtocolMessage.Request(dtnid, Dots.empty)) + case ClientOperationMode.RequestLater => {} conn }