From 91fb8057d70bf836f53dc25ba5029c4c6a350b50 Mon Sep 17 00:00:00 2001
From: Svenja Kernig
Date: Tue, 29 Oct 2024 14:12:54 +0100
Subject: [PATCH] improve key-value store for benchmarks

- fix request ordering
- fix concurrency issues in the DataManager
---
 .../scala/channels/JIOStreamConnection.scala  |  2 +-
 .../Protocol Benchmarks/args/client-1-1       |  5 ++
 .../Protocol Benchmarks/args/client-1-2       |  5 ++
 .../Protocol Benchmarks/args/client-1-3       |  5 ++
 .../src/main/scala/probench/Client.scala      | 61 +++++++++++----
 .../src/main/scala/probench/Node.scala        | 75 +++++++++++++------
 .../src/main/scala/probench/cli.scala         | 19 ++---
 .../src/main/scala/probench/data/Data.scala   |  6 +-
 .../scala/probench/data/DataManager.scala     | 16 ++--
 .../datatypes/contextual/CausalQueue.scala    | 17 ++---
 .../experiments/protocols/Membership.scala    |  9 +--
 .../main/scala/rdts/time/VectorClock.scala    |  6 +-
 .../test/scala/test/rdts/DataGenerator.scala  | 25 ++++---
 .../main/scala/replication/DataManager.scala  |  1 +
 14 files changed, 169 insertions(+), 83 deletions(-)
 create mode 100644 Modules/Examples/Protocol Benchmarks/args/client-1-1
 create mode 100644 Modules/Examples/Protocol Benchmarks/args/client-1-2
 create mode 100644 Modules/Examples/Protocol Benchmarks/args/client-1-3

diff --git a/Modules/Channels/shared/src/main/scala/channels/JIOStreamConnection.scala b/Modules/Channels/shared/src/main/scala/channels/JIOStreamConnection.scala
index 3229f6212..ecd5194e9 100644
--- a/Modules/Channels/shared/src/main/scala/channels/JIOStreamConnection.scala
+++ b/Modules/Channels/shared/src/main/scala/channels/JIOStreamConnection.scala
@@ -2,7 +2,7 @@ package channels
 
 import de.rmgk.delay.{Async, Callback}
 
-import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream, EOFException, IOException, InputStream, OutputStream}
+import java.io.*
 
 class SendingClosedException extends IOException
 
diff --git a/Modules/Examples/Protocol Benchmarks/args/client-1-1 b/Modules/Examples/Protocol Benchmarks/args/client-1-1
new file mode 100644
index 000000000..159597666
--- /dev/null
+++ b/Modules/Examples/Protocol Benchmarks/args/client-1-1
@@ -0,0 +1,5 @@
+# wait-for-res false
+multiput key%n valueX%n 200
+multiget key%n 200
+# wait
+exit
diff --git a/Modules/Examples/Protocol Benchmarks/args/client-1-2 b/Modules/Examples/Protocol Benchmarks/args/client-1-2
new file mode 100644
index 000000000..c8d95a1cc
--- /dev/null
+++ b/Modules/Examples/Protocol Benchmarks/args/client-1-2
@@ -0,0 +1,5 @@
+# wait-for-res false
+multiput key%n valueY%n 200
+multiget key%n 200
+# wait
+exit
diff --git a/Modules/Examples/Protocol Benchmarks/args/client-1-3 b/Modules/Examples/Protocol Benchmarks/args/client-1-3
new file mode 100644
index 000000000..124721dcf
--- /dev/null
+++ b/Modules/Examples/Protocol Benchmarks/args/client-1-3
@@ -0,0 +1,5 @@
+# wait-for-res false
+multiput key%n valueZ%n 200
+multiget key%n 200
+# wait
+exit
diff --git a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Client.scala b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Client.scala
index 78f72e2de..41285c0e7 100644
--- a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Client.scala
+++ b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Client.scala
@@ -1,31 +1,42 @@
 package probench
 
-import probench.data.{ClientNodeState, DataManager, KVOperation, Request, Response}
+import probench.data.*
 import rdts.base.{Bottom, LocalUid, Uid}
 import rdts.datatypes.contextual.CausalQueue
 import rdts.dotted.Dotted
 import rdts.syntax.DeltaBuffer
 
+import scala.io.StdIn
 import scala.io.StdIn.readLine
 import scala.util.matching.Regex
 
-class Client {
+class Client(val name: Uid) {
 
-  given localUid: LocalUid = LocalUid.gen()
+  given localUid: LocalUid = LocalUid(name)
   private val dataManager = DataManager[ClientNodeState](localUid, Bottom[ClientNodeState].empty, onStateChange)
 
   private val lock                       = new Object()
   private var currentOp: Option[Request] = None
+  private var waitForOp: Boolean         = true
 
-  val get: Regex = """get (\w+)""".r
-  val put: Regex = """put (\w+) (\w+)""".r
+  private val commented: Regex  = """#.*""".r
+  private val waitForRes: Regex = """wait-for-res (true|false)""".r
+  private val get: Regex        = """get ([\w%]+)""".r
+  private val put: Regex        = """put ([\w%]+) ([\w%]+)""".r
+  private val multiget: Regex   = """multiget ([\w%]+) (\d+)""".r
+  private val multiput: Regex   = """multiput ([\w%]+) ([\w%]+) (\d+)""".r
 
   private def onStateChange(oldState: ClientNodeState, newState: ClientNodeState): Unit = {
+    /* val diff = newState.responses.data.values.size - oldState.responses.data.values.size
+    if diff > 0 then {
+      println(s"Got $diff result(s): ${newState.responses.data.values.toList.reverseIterator.take(diff).toList.reverse.map(_.value)}")
+    } */
+
     for {
-      op                                                   <- currentOp
-      CausalQueue.QueueElement(res@Response(req, _), _, _) <- newState.responses.data.values if req == op
+      op                                                     <- currentOp
+      CausalQueue.QueueElement(res @ Response(req, _), _, _) <- newState.responses.data.values if req == op
    } {
-      println(res.response)
+      println(res.payload)
 
       currentOp = None
 
@@ -41,12 +52,18 @@ class Client {
     val req = Request(op)
     currentOp = Some(req)
 
+    // println(s"Put $req")
+
     dataManager.transform { current =>
       current.mod(it => it.copy(requests = it.requests.mod(_.enqueue(req))))
     }
 
-    lock.synchronized {
-      lock.wait()
+    // println(s"New Requests ${dataManager.mergedState.data.requests.data.values.toList.map(_.value)}")
+
+    if waitForOp then {
+      lock.synchronized {
+        lock.wait()
+      }
     }
   }
 
@@ -58,16 +75,28 @@ class Client {
     handleOp(KVOperation.Write(key, value))
   }
 
+  private def multiget(key: String, times: Int): Unit = {
+    for i <- 1 to times do read(key.replace("%n", i.toString))
+  }
+
+  private def multiput(key: String, value: String, times: Int): Unit = {
+    for i <- 1 to times do write(key.replace("%n", i.toString), value.replace("%n", i.toString))
+  }
+
   def startCLI(): Unit = {
     while true do {
       print("client> ")
-      val line = readLine()
-
+      val line = Option(readLine())
       line match {
-        case get(key)        => read(key)
-        case put(key, value) => write(key, value)
-        case "exit"          => System.exit(0)
-        case _               => println(s"Error parsing: $line")
+        case Some(commented())                 => // ignore
+        case Some(get(key))                    => read(key)
+        case Some(put(key, value))             => write(key, value)
+        case Some(multiget(key, times))        => multiget(key, times.toInt)
+        case Some(multiput(key, value, times)) => multiput(key, value, times.toInt)
+        case Some(waitForRes(flag))            => waitForOp = flag.toBoolean
+        case Some("wait")                      => lock.synchronized { lock.wait() }
+        case Some("exit")                      => System.exit(0)
+        case None | Some(_)                    => println(s"Error parsing: $line")
       }
     }
   }
diff --git a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Node.scala b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Node.scala
index 064cf1b1b..81c8a6e18 100644
--- a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Node.scala
+++ b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/Node.scala
@@ -8,49 +8,76 @@ import rdts.datatypes.experiments.protocols.{LogHack, Membership}
 import rdts.dotted.Dotted
 import rdts.syntax.DeltaBuffer
 
-class Node(val name: String, val initialClusterIds: Set[Uid]) {
+class Node(val name: Uid, val initialClusterIds: Set[Uid]) {
 
   private type ClusterState = Membership[Request, Paxos, Paxos]
 
-  given localUid: LocalUid = LocalUid.predefined(name)
+  given localUid: LocalUid = LocalUid(name)
+  given LogHack             = new LogHack(false)
+
   private val clientDataManager =
     DataManager[ClientNodeState](localUid, Bottom[ClientNodeState].empty, onClientStateChange)
   private val clusterDataManager =
     DataManager[ClusterState](localUid, Membership.init(initialClusterIds), onClusterStateChange)
 
   private def onClientStateChange(oldState: ClientNodeState, newState: ClientNodeState): Unit = {
-    if newState.requests.data.values.nonEmpty then {
-      clusterDataManager.transform(_.mod(_.write(newState.requests.data.values.last.value)))
+    /*
+    val diff = newState.requests.data.values.size - oldState.requests.data.values.size
+    if diff > 0 then {
+      println(s"Requests: ${newState.requests.data.values.toList.map(_.value)}")
+      println(s"Sorted : ${newState.requests.data.values.toList.sortBy(_.order)(using VectorClock.vectorClockTotalOrdering).map(it => it.order -> it.value)}")
+      println(s"Dots : ${newState.requests.data.dots}")
+      println(s"Time : ${newState.requests.data.clock}")
+    }
+     */
+
+    if newState.requests.data.values.size == 1 then {
+      clusterDataManager.transform(_.mod(_.write(newState.requests.data.head)))
     }
   }
 
-  given LogHack = new LogHack(true)
-
   private def onClusterStateChange(oldState: ClusterState, newState: ClusterState): Unit = {
-    val upkept: ClusterState = newState.merge(newState.upkeep())
+    val delta                = newState.upkeep()
+    val upkept: ClusterState = newState.merge(delta)
 
-    if !(upkept <= newState) then {
-      clusterDataManager.transform(_.mod(_ => upkept))
+    if !(upkept <= newState) || upkept.log.size > newState.log.size then {
+      clusterDataManager.transform(_.mod(_ => delta))
     }
 
-    if newState.log.size > oldState.log.size then {
-      val op = newState.log.last
+    if upkept.log.size > oldState.log.size then {
+      val diff = upkept.log.size - oldState.log.size
+      // println(s"DIFF $diff")
 
-      val res: String = op match {
-        case Request(KVOperation.Read(key), _) =>
-          newState.log.reverseIterator.collectFirst {
-            case Request(KVOperation.Write(writeKey, value), _) if writeKey == key => value
-          }.getOrElse(s"Key $key has not been written to!")
-        case Request(KVOperation.Write(_, _), _) => "OK"
-      }
+      for op <- upkept.log.reverseIterator.take(diff).toList.reverseIterator do {
+
+        val res: String = op match {
+          case Request(KVOperation.Read(key), _) =>
+            upkept.log.reverseIterator.collectFirst {
+              case Request(KVOperation.Write(writeKey, value), _) if writeKey == key => s"$key=$value"
+            }.getOrElse(s"Key '$key' has not been written to!")
+          case Request(KVOperation.Write(key, value), _) => s"$key=$value; OK"
+        }
 
-      clientDataManager.transform(_.mod(state =>
-        state.copy(
-          requests = state.requests.mod(_.removeBy(_ == op)),
-          responses = state.responses.mod(_.enqueue(Response(op, res))),
-        )
-      ))
+        clientDataManager.transform { it =>
+          if it.state.requests.data.values.exists { e => e.value == op } then {
+            it.mod { state =>
+              // println(s"Writing Response: $op -> $res")
+              val newState = state.copy(
+                requests = state.requests.mod(_.removeBy(_ == op)),
+                responses = state.responses.mod(_.enqueue(Response(op, res))),
+              )
+              // println(s"Remaining Requests: ${newState.requests.data.values.toList.map(_.value)}")
+              newState
+            }
+          } else it
+        }
+        val clientState = clientDataManager.mergedState.data
+
+        if clientState.requests.data.values.nonEmpty then {
+          clusterDataManager.transform(_.mod(_.write(clientState.requests.data.head)))
+        }
+      }
     }
   }
diff --git a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/cli.scala b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/cli.scala
index 64754c6d4..b93568488 100644
--- a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/cli.scala
+++ b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/cli.scala
@@ -19,11 +19,11 @@ object cli {
   private val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
 
   def main(args: Array[String]): Unit = {
-    val name       = named[String]("--name", "")
+    val clientPort = named[Int]("--listen-client-port", "")
     val peerPort   = named[Int]("--listen-peer-port", "")
 
-    val ipAndPort = """(.+):(\d*)""".r
+    val ipAndPort = """(.+):(\d+)""".r
 
     given ipAndPortParser: ArgumentValueParser[(String, Int)] with
      override def apply(args: List[String]): (Option[(String, Int)], List[String]) =
@@ -32,7 +32,7 @@ object cli {
        case _ => (None, args)
      }
 
-      override def valueDescription: String = "[]"
+      override def valueDescription: String = ""
    end ipAndPortParser
 
    given uidParser: ArgumentValueParser[Uid] with
@@ -42,18 +42,19 @@ object cli {
        case _ => (None, args)
      }
 
-      override def valueDescription: String = "[uid]"
+      override def valueDescription: String = ""
    end uidParser
 
    given JsonValueCodec[ClientNodeState] = JsonCodecMaker.make(CodecMakerConfig.withMapAsArray(true))
+    given JsonValueCodec[Membership[Request, Paxos, Paxos]] = JsonCodecMaker.make(CodecMakerConfig.withMapAsArray(true))
 
    val argparse = argumentParser {
-      inline def cluster           = named[List[(String, Int)]]("--cluster", "[]")
-      inline def initialClusterIds = named[List[Uid]]("--initial-cluster-ids", "[name]")
-
-      inline def clientNode = named[(String, Int)]("--node", "")
+      inline def cluster           = named[List[(String, Int)]]("--cluster", "")
+      inline def initialClusterIds = named[List[Uid]]("--initial-cluster-ids", "")
+      inline def clientNode        = named[(String, Int)]("--node", "")
+      inline def name              = named[Uid]("--name", "", Uid.gen())
 
      subcommand("node", "starts a cluster node") {
        val node = Node(name.value, initialClusterIds.value.toSet)
@@ -67,7 +68,7 @@ object cli {
      }.value
 
      subcommand("client", "starts a client to interact with a node") {
-        val client = Client()
+        val client = Client(name.value)
 
        val (ip, port) = clientNode.value
 
diff --git a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/data/Data.scala b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/data/Data.scala
index 4862d2ed6..7ac483196 100644
--- a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/data/Data.scala
+++ b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/data/Data.scala
@@ -1,9 +1,10 @@
 package probench.data
 
-import rdts.base.{Bottom, Lattice, Uid}
+import rdts.base.{Bottom, Lattice, LocalUid, Uid}
 import rdts.datatypes.contextual.CausalQueue
 import rdts.datatypes.{GrowOnlyList, GrowOnlyMap}
 import rdts.dotted.Dotted
+import rdts.time.VectorClock
 
 enum KVOperation[Key, Value] {
   def key: Key
@@ -13,9 +14,10 @@ enum KVOperation[Key, Value] {
 }
 
 case class Request(op: KVOperation[String, String], requestUid: Uid = Uid.gen())
-case class Response(request: Request, response: String)
+case class Response(request: Request, payload: String)
 
 case class ClientNodeState(
     requests: Dotted[CausalQueue[Request]],
     responses: Dotted[CausalQueue[Response]],
 ) derives Lattice, Bottom
+
diff --git a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/data/DataManager.scala b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/data/DataManager.scala
index ed5d88015..36dd81c99 100644
--- a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/data/DataManager.scala
+++ b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/data/DataManager.scala
@@ -1,9 +1,10 @@
 package probench.data
 
+import channels.LatentConnection
 import rdts.base.{Lattice, LocalUid}
 import rdts.syntax.DeltaBuffer
 import rdts.time.Dots
-import replication.{ProtocolDots, DataManager as RepDataManager}
+import replication.{ProtocolDots, ProtocolMessage, DataManager as RepDataManager}
 
 class DataManager[State: Lattice](
     val localReplicaId: LocalUid,
@@ -11,20 +12,23 @@ class DataManager[State: Lattice](
     val onChange: (State, State) => Unit,
 ) {
   given Lattice[ProtocolDots[State]] = Lattice.derived
-  private val dataManager = RepDataManager[State](localReplicaId, _ => (), receivedChanges)
-  private var mergedState: ProtocolDots[State] = dataManager.allDeltas.foldLeft(ProtocolDots(initialState, Dots.empty))(Lattice[ProtocolDots[State]].merge)
+  private val dataManager = RepDataManager[State](localReplicaId, _ => (), receivedChanges)
+
+  var mergedState: ProtocolDots[State] =
+    dataManager.allDeltas.foldLeft(ProtocolDots(initialState, Dots.empty))(Lattice[ProtocolDots[State]].merge)
 
   private def receivedChanges(changes: ProtocolDots[State]): Unit = {
     val oldState = mergedState
-    mergedState = mergedState.merge(changes)
-
+    dataManager.lock.synchronized {
+      mergedState = mergedState.merge(changes)
+    }
     onChange(oldState.data, mergedState.data)
   }
 
   def transform(fun: DeltaBuffer[State] => DeltaBuffer[State]): Unit = dataManager.lock.synchronized {
     val current: DeltaBuffer[State] = DeltaBuffer(mergedState.data)
     val next: DeltaBuffer[State]    = fun(current)
-
+
     next.deltaBuffer.foreach { delta =>
       dataManager.applyLocalDelta(ProtocolDots(
         delta,
diff --git a/Modules/RDTs/src/main/scala/rdts/datatypes/contextual/CausalQueue.scala b/Modules/RDTs/src/main/scala/rdts/datatypes/contextual/CausalQueue.scala
index 55cf6f866..490502482 100644
--- a/Modules/RDTs/src/main/scala/rdts/datatypes/contextual/CausalQueue.scala
+++ b/Modules/RDTs/src/main/scala/rdts/datatypes/contextual/CausalQueue.scala
@@ -7,16 +7,16 @@ import rdts.time.{Dot, Dots, VectorClock}
 
 import scala.collection.immutable.Queue
 
-case class CausalQueue[T](values: Queue[QueueElement[T]]) {
+case class CausalQueue[T](values: Queue[QueueElement[T]], clock: VectorClock) {
 
   type Delta = Dotted[CausalQueue[T]]
 
   def enqueue(using LocalUid)(e: T)(using context: Dots): Delta =
-    val time = context.clock.inc(LocalUid.replicaId)
+    val time = clock.merge(clock.inc(LocalUid.replicaId))
     val dot  = time.dotOf(LocalUid.replicaId)
-    Dotted(CausalQueue(Queue(QueueElement(e, dot, time))), Dots.single(dot))
+    Dotted(CausalQueue(Queue(QueueElement(e, dot, time)), time), Dots.single(dot))
 
-  def head =
+  def head: T =
     val QueueElement(e, _, _) = values.head
     e
 
@@ -41,22 +41,21 @@ object CausalQueue:
       if left.order < right.order then right else left
   }
 
-  def empty[T]: CausalQueue[T] = CausalQueue(Queue())
+  def empty[T]: CausalQueue[T] = CausalQueue(Queue(), VectorClock.zero)
 
   given hasDots[A]: HasDots[CausalQueue[A]] with {
     extension (value: CausalQueue[A])
       override def dots: Dots = Dots.from(value.values.view.map(_.dot))
 
      override def removeDots(dots: Dots): Option[CausalQueue[A]] =
-        Some(CausalQueue(value.values.filter(qe => !dots.contains(qe.dot))))
+        Some(CausalQueue(value.values.filter(qe => !dots.contains(qe.dot)), value.clock))
   }
 
   given bottomInstance[T]: Bottom[CausalQueue[T]] = Bottom.derived
 
   given lattice[A]: Lattice[CausalQueue[A]] with {
     override def merge(left: CausalQueue[A], right: CausalQueue[A]): CausalQueue[A] =
-      CausalQueue:
-        (left.values concat right.values)
-          .sortBy { qe => qe.order }(using VectorClock.vectorClockTotalOrdering).distinct
+      CausalQueue((left.values concat right.values)
+        .sortBy { qe => qe.order }(using Ordering.fromLessThan(VectorClock.vectorClockOrdering.lt)).distinct, left.clock.merge(right.clock))
 
   }
diff --git a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Membership.scala b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Membership.scala
index 290d69a8a..a919c6518 100644
--- a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Membership.scala
+++ b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Membership.scala
@@ -6,7 +6,7 @@ import rdts.datatypes.experiments.protocols.Consensus.given
 import rdts.time.Time
 
 class LogHack(on: Boolean) {
-  inline def info(arg: String): Unit = if on then println(arg) else ()
+  inline def info(arg: => String): Unit = if on then println(arg) else ()
 }
 
 case class Membership[A, C[_], D[_]](
@@ -28,7 +28,7 @@ case class Membership[A, C[_], D[_]](
   given Participants = Participants(members)
 
   override def toString: String =
-    s"Membership(counter: $counter, members: $membersConsensus,log: $log, membershipChanging: $membershipChanging)".stripMargin
+    s"$Membership(counter: $counter, members: $membersConsensus,log: $log, membershipChanging: $membershipChanging)".stripMargin
 
   def currentMembers(using Consensus[C], Consensus[D]): Set[Uid] = members
 
@@ -67,7 +67,7 @@ case class Membership[A, C[_], D[_]](
     (newMembers.read, newInner.read) match
       // member consensus reached -> members have changed
       case (Some(members), _) =>
-        logger.info(s"Member consensus reached on members $members")
+        logger.info { s"Member consensus reached on members $members" }
        copy(
          counter = counter + 1,
          membersConsensus = Consensus[C].empty,
@@ -78,8 +78,7 @@ case class Membership[A, C[_], D[_]](
      // inner consensus is reached
      case (None, Some(value)) if !membershipChanging =>
        val newLog = log :+ value
-        if newLog.length > 1 then
-          logger.info(s"Inner consensus reached on value $value, log: $newLog")
+        logger.info { s"$rid: Inner consensus reached on value $value, log: $newLog" }
        copy(
          counter = counter + 1,
          membersConsensus = Consensus[C].empty,
diff --git a/Modules/RDTs/src/main/scala/rdts/time/VectorClock.scala b/Modules/RDTs/src/main/scala/rdts/time/VectorClock.scala
index b50297b41..21f030b0c 100644
--- a/Modules/RDTs/src/main/scala/rdts/time/VectorClock.scala
+++ b/Modules/RDTs/src/main/scala/rdts/time/VectorClock.scala
@@ -1,6 +1,6 @@
 package rdts.time
 
-import rdts.base.{Bottom, Lattice, Uid}
+import rdts.base.{Bottom, Lattice, LocalUid, Uid}
 
 import scala.annotation.tailrec
 import scala.math.PartialOrdering
@@ -30,6 +30,10 @@ object VectorClock {
    given Lattice[Time] = _ max _
    Lattice.derived
 
+  given bottom: Bottom[VectorClock] with {
+    def empty: VectorClock = zero
+  }
+
  val vectorClockTotalOrdering: Ordering[VectorClock] = new Ordering[VectorClock] {
    override def compare(x: VectorClock, y: VectorClock): Int =
      vectorClockOrdering.tryCompare(x, y) match
diff --git a/Modules/RDTs/src/test/scala/test/rdts/DataGenerator.scala b/Modules/RDTs/src/test/scala/test/rdts/DataGenerator.scala
index 3acacb558..f1d888d5a 100644
--- a/Modules/RDTs/src/test/scala/test/rdts/DataGenerator.scala
+++ b/Modules/RDTs/src/test/scala/test/rdts/DataGenerator.scala
@@ -13,7 +13,7 @@ import rdts.dotted.*
 import rdts.dotted.HasDots.mapInstance
 import rdts.time.*
 
-import scala.annotation.nowarn
+import scala.annotation.{nowarn, tailrec}
 import scala.collection.immutable.Queue
 
 object DataGenerator {
@@ -38,7 +38,7 @@ object DataGenerator {
      value: List[Long] <- Gen.listOfN(ids.size, Gen.oneOf(0L to 100L))
    yield VectorClock.fromMap(ids.zip(value).toMap)
 
-  val smallNum = Gen.chooseNum(-10, 10)
+  val smallNum: Gen[Int] = Gen.chooseNum(-10, 10)
 
   given arbCausalTime: Arbitrary[CausalTime] = Arbitrary:
    for
@@ -84,12 +84,16 @@ object DataGenerator {
    Arbitrary(map)
 
   given arbCausalQueue[A: Arbitrary]: Arbitrary[CausalQueue[A]] =
-    Arbitrary:
-      Gen.listOf(Gen.zip(uniqueDot, Arbitrary.arbitrary[A])).map: list =>
-        CausalQueue:
-          Queue.from:
-            list.map: (dot, value) =>
-              QueueElement(value, dot, VectorClock(Map(dot.place -> dot.time)))
+    Arbitrary {
+      Gen.listOf(
+        Gen.zip(uniqueDot, Arbitrary.arbitrary[A])
+      ).map { list =>
+        val queue = Queue.from(list.map((dot, value) => {
+          QueueElement(value, dot, VectorClock(Map(dot.place -> dot.time)))
+        }))
+        CausalQueue(queue, VectorClock(queue.map(it => it.dot.place -> it.dot.time).toMap))
+      }
+    }
 
   val genDot: Gen[Dot] =
    for
@@ -123,6 +127,7 @@ object DataGenerator {
   given arbDotFun[A](using g: Arbitrary[A]): Arbitrary[Map[Dot, A]] =
    Arbitrary(genDotFun)
 
+  @tailrec
   def makeUnique(rem: List[Dots], acc: List[Dots], state: Dots): List[Dots] =
    rem match
      case Nil => acc
@@ -132,7 +137,7 @@ object DataGenerator {
 
   given Arbitrary[SmallTimeSet] = Arbitrary(for
    contents <- Gen.listOf(Gen.chooseNum(0L, 100L))
-  yield (SmallTimeSet(contents.toSet)))
+  yield SmallTimeSet(contents.toSet))
 
   given arbGrowOnlyList[E](using arb: Arbitrary[E]): Arbitrary[GrowOnlyList[E]] = Arbitrary:
    Gen.listOf(arb.arbitrary).map: list =>
@@ -142,7 +147,7 @@ object DataGenerator {
    Gen.listOf(arbLww(using arb).arbitrary).map: list =>
      val elems: List[Node.Elem[LastWriterWins[E]]] = list.map(GrowOnlyList.Node.Elem.apply)
      val pairs = elems.distinct.sortBy(_.value.timestamp).sliding(2).flatMap:
-        case Seq(l, r) => Some((l) -> (r))
+        case Seq(l, r) => Some(l -> r)
        case _         => None // filters out uneven numbers of elements
      val all = elems.headOption.map(GrowOnlyList.Node.Head -> _) concat pairs
 
diff --git a/Modules/Replication/shared/src/main/scala/replication/DataManager.scala b/Modules/Replication/shared/src/main/scala/replication/DataManager.scala
index 2d5fb8005..fe286d40a 100644
--- a/Modules/Replication/shared/src/main/scala/replication/DataManager.scala
+++ b/Modules/Replication/shared/src/main/scala/replication/DataManager.scala
@@ -93,6 +93,7 @@ class DataManager[State](
        lock.synchronized {
          connections = conn :: connections
        }
+        conn.send(Request(replicaId.uid, selfContext)).run(using ())(debugCallbackAndRemoveCon(conn))
      case Failure(ex) =>
        println(s"exception during connection activation")
        ex.printStackTrace()
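
The request-ordering fix above hinges on CausalQueue carrying its own VectorClock: every enqueue merges and increments that clock, so a new element's timestamp dominates everything already in the queue, and replicas sort merged elements by those timestamps. The following self-contained Scala sketch illustrates that idea with simplified stand-ins; Clock, Element, CQueue and the sum-based sort key are illustrative only and are not the rdts API.

    object CausalOrderSketch {

      // Simplified stand-in for rdts.time.VectorClock (illustration only).
      final case class Clock(timestamps: Map[String, Long]) {
        def inc(replica: String): Clock =
          Clock(timestamps.updated(replica, timestamps.getOrElse(replica, 0L) + 1))
        def merge(other: Clock): Clock =
          Clock((timestamps.keySet ++ other.timestamps.keySet).map { k =>
            k -> math.max(timestamps.getOrElse(k, 0L), other.timestamps.getOrElse(k, 0L))
          }.toMap)
      }

      // A deterministic total order used only for sorting; the patch sorts by the
      // vector-clock ordering instead, for which this is a simplified stand-in.
      given Ordering[Clock] =
        Ordering.by(c => (c.timestamps.values.sum, c.timestamps.toSeq.sorted.mkString))

      final case class Element[T](value: T, order: Clock)

      // Mirrors the CausalQueue change: the queue carries its own clock, every
      // enqueue advances it, and merging sorts elements by their clocks.
      final case class CQueue[T](elements: Vector[Element[T]], clock: Clock) {
        def enqueue(replica: String, value: T): CQueue[T] = {
          val time = clock.merge(clock.inc(replica))
          CQueue(elements :+ Element(value, time), time)
        }
        def merge(other: CQueue[T]): CQueue[T] =
          CQueue(
            (elements ++ other.elements).distinct.sortBy(_.order),
            clock.merge(other.clock)
          )
      }

      def main(args: Array[String]): Unit = {
        val empty = CQueue[String](Vector.empty, Clock(Map.empty))
        val a     = empty.enqueue("replica-a", "put k1").enqueue("replica-a", "put k2")
        val b     = empty.enqueue("replica-b", "put k3")
        // both replicas compute the same merged order, and k1 stays before k2
        println(a.merge(b).elements.map(_.value))
      }
    }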
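
The concurrency fix guards both paths that touch mergedState in probench.data.DataManager, remote merges in receivedChanges and local transforms, with the same dataManager.lock, so a delta applied between the read and the write-back can no longer be lost. Below is a minimal sketch of that locking discipline, assuming a plain Map in place of the lattice state; Manager, State and the thread demo are illustrative stand-ins, not the project's API.

    object MergeUnderLockSketch {
      type State = Map[String, String]

      final class Manager(initial: State, onChange: (State, State) => Unit) {
        private val lock          = new Object
        private var merged: State = initial

        // Remote deltas and local transformations go through the same lock, so no
        // update to `merged` is lost between the read and the write-back.
        def receivedChanges(delta: State): Unit = {
          val (oldState, newState) = lock.synchronized {
            val before = merged
            merged = merged ++ delta
            (before, merged)
          }
          onChange(oldState, newState)
        }

        def transform(fun: State => State): Unit = lock.synchronized {
          merged = fun(merged)
        }

        def current: State = lock.synchronized(merged)
      }

      def main(args: Array[String]): Unit = {
        val manager = new Manager(Map.empty, (_, _) => ())
        val writers = (1 to 4).map { i =>
          new Thread(() => (1 to 100).foreach(j => manager.receivedChanges(Map(s"k$i-$j" -> "v"))))
        }
        writers.foreach(_.start())
        writers.foreach(_.join())
        // all 400 distinct keys survive the concurrent merges
        println(manager.current.size)
      }
    }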