Skip to content

Commit

Permalink
Added rdt client operation modes and made them configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
lh70 committed Aug 21, 2024
1 parent 4c275e0 commit e3784ef
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 13 deletions.
19 changes: 15 additions & 4 deletions Modules/DTN/jvm/src/main/scala/dtn/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

import routing.{BaseRouter, FloodingRouter, DirectRouter, EpidemicRouter, RdtRouter, RdtRouter2, SprayAndWaitRouter}
import rdt.Client
import rdt.{Client, ClientOperationMode}

/*
this file contains all jvm main methods
Expand All @@ -21,6 +21,7 @@ commandline options:
-p => host port | default: 5000 (for monitoring), 3000 (for everything else)
-rs => routing strategy | default: epidemic (options: direct, flooding, epidemic, rdt, rdt2, spray, binary)
-cr => client rdt selection | default: addwins.listen (options: addwins.listen, addwins.active, observeremove.listen, observeremove.active, lastwriterwins.listen, lastwriterwins.active)
-cm => client rdt operation mode | default: pushall (options: pushall, requestlater)
-ma => monitoring address | default: 127.0.0.1
-mp => monitoring port | default: 5000
-mid => monitoring creation client id | default: dtn://n2/rdt/testapp
Expand All @@ -46,6 +47,7 @@ commandline options:
val add_wins_rdt_sleep_time_milliseconds: Long = keyword_args.getOrElse("-awt", "500").toLong
val routing_strategy: String = keyword_args.getOrElse("-rs", "epidemic")
val client_rdt: String = keyword_args.getOrElse("-cr", "addwins.listen")
val client_operation_mode = keyword_args.getOrElse("-cm", "pushall")

method match
case "monitoring" => start_monitoring_server(host_address, host_port)
Expand All @@ -71,19 +73,26 @@ commandline options:
)
}
case "client" => {
val mode: ClientOperationMode = client_operation_mode match
case "pushall" => ClientOperationMode.PushAll
case "requestlater" => ClientOperationMode.RequestLater
case s => throw Exception(s"unknown rdt client operation mode: $s")

client_rdt match
case "addwins.listen" =>
case_study_listen(
host_address,
host_port,
MonitoringClient(monitoring_address, monitoring_port),
mode,
AddWinsSetRDT(add_wins_rdt_number_of_additions, add_wins_rdt_sleep_time_milliseconds)
)
case "addwins.active" =>
case_study_active(
host_address,
host_port,
MonitoringClient(monitoring_address, monitoring_port),
mode,
AddWinsSetRDT(add_wins_rdt_number_of_additions, add_wins_rdt_sleep_time_milliseconds)
)
case "observeremove.listen" => throw Exception("observeremove.listen not implemented yet")
Expand Down Expand Up @@ -154,19 +163,21 @@ def case_study_listen(
host: String,
port: Int,
monitoringClient: MonitoringClientInterface,
rdt: CaseStudyRdt
operationMode: ClientOperationMode,
rdt: CaseStudyRdt,
): Unit = {
rdt.connect(host, port, monitoringClient)
rdt.connect(host, port, monitoringClient, operationMode)
rdt.caseStudyListen()
}

def case_study_active(
host: String,
port: Int,
monitoringClient: MonitoringClientInterface,
operationMode: ClientOperationMode,
rdt: CaseStudyRdt
): Unit = {
rdt.connect(host, port, monitoringClient)
rdt.connect(host, port, monitoringClient, operationMode)
rdt.caseStudyActive()
}

Expand Down
18 changes: 15 additions & 3 deletions Modules/DTN/shared/src/main/scala/dtn/RdtCreation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,15 @@ import _root_.replication.JsoniterCodecs.given
import com.github.plokhotnyuk.jsoniter_scala.core.JsonValueCodec
import com.github.plokhotnyuk.jsoniter_scala.macros.JsonCodecMaker
import com.github.plokhotnyuk.jsoniter_scala.macros.CodecMakerConfig
import dtn.rdt.ClientOperationMode

trait CaseStudyRdt {
def connect(host: String, port: Int, monitoringClient: MonitoringClientInterface): Unit
def connect(
host: String,
port: Int,
monitoringClient: MonitoringClientInterface,
operationMode: ClientOperationMode
): Unit
def caseStudyListen(): Unit
def caseStudyActive(): Unit
}
Expand All @@ -25,13 +31,19 @@ class AddWinsSetRDT(number_of_additions: Int, sleep_time_milliseconds: Long) ext
_ => ()
)

def connect(host: String, port: Int, monitoringClient: MonitoringClientInterface): Unit = {
def connect(
host: String,
port: Int,
monitoringClient: MonitoringClientInterface,
operationMode: ClientOperationMode
): Unit = {
dataManager.addLatentConnection(Channel[RdtType](
host,
port,
"app1",
scala.concurrent.ExecutionContext.global,
monitoringClient
monitoringClient,
operationMode
))
}

Expand Down
26 changes: 20 additions & 6 deletions Modules/DTN/shared/src/main/scala/dtn/rdt/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,25 @@ import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}
import replication.ProtocolMessage

class ClientContext[T: JsonValueCodec](connection: Client, executionContext: ExecutionContext)
extends Connection[ProtocolMessage[T]] {
enum ClientOperationMode {
case PushAll
case RequestLater
}

class ClientContext[T: JsonValueCodec](
connection: Client,
executionContext: ExecutionContext,
operationMode: ClientOperationMode
) extends Connection[ProtocolMessage[T]] {
override def send(message: ProtocolMessage[T]): Async[Any, Unit] =
message match
case ProtocolMessage.Request(sender, dots) =>
// we could send requests into the network. the routing handles them correctly. but they are unnecessary with the cb.succeed() down below.
// todo: actually there should be no requests being sent anymore then. is that the case?
// connection.send(RdtMessageType.Request, Array(), dots).toAsync(using executionContext)
operationMode match
case ClientOperationMode.PushAll => Sync { () }
case ClientOperationMode.RequestLater =>
connection.send(RdtMessageType.Request, Array(), dots).toAsync(using executionContext)
Sync { () }
case ProtocolMessage.Payload(sender, dots, data) =>
connection.send(RdtMessageType.Payload, writeToArray[T](data), dots).toAsync(using executionContext)
Expand All @@ -36,7 +47,8 @@ class Channel[T: JsonValueCodec](
port: Int,
appName: String,
ec: ExecutionContext,
monitoringClient: MonitoringClientInterface = NoMonitoringClient
monitoringClient: MonitoringClientInterface = NoMonitoringClient,
operationMode: ClientOperationMode = ClientOperationMode.PushAll
) extends LatentConnection[ProtocolMessage[T]] {

// We use a local dtnid instead of a remote replica ID to signify that the local DTNd is the one providing information.
Expand All @@ -46,7 +58,7 @@ class Channel[T: JsonValueCodec](
override def prepare(incomingHandler: Handler[ProtocolMessage[T]]): Async[Abort, Connection[ProtocolMessage[T]]] =
Async {
val client: Client = Client(host, port, appName, monitoringClient).toAsync(using ec).bind
val conn = ClientContext[T](client, ec)
val conn = ClientContext[T](client, ec, operationMode)
val cb = incomingHandler.getCallbackFor(conn)

client.registerOnReceive { (message_type: RdtMessageType, payload: Array[Byte], dots: Dots) =>
Expand All @@ -57,7 +69,9 @@ class Channel[T: JsonValueCodec](

// This tells the rdt to send everything it has and new following stuff into the network.
// It makes any requests unnecessary.
cb.succeed(ProtocolMessage.Request(dtnid, Dots.empty))
operationMode match
case ClientOperationMode.PushAll => cb.succeed(ProtocolMessage.Request(dtnid, Dots.empty))
case ClientOperationMode.RequestLater => {}

conn
}
Expand Down

0 comments on commit e3784ef

Please sign in to comment.