diff --git a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/KeyValueReplica.scala b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/KeyValueReplica.scala index 4bc767836..edac956f6 100644 --- a/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/KeyValueReplica.scala +++ b/Modules/Examples/Protocol Benchmarks/src/main/scala/probench/KeyValueReplica.scala @@ -96,7 +96,7 @@ class KeyValueReplica(val uid: Uid, val votingReplicas: Set[Uid]) { log(s"${newState.log}") // println(s"${pprint.tokenize(newState).mkString("")}") - for decidedRequest <- newState.readDecisionsSince(oldState.counter) do { + for decidedRequest <- newState.readDecisionsSince(oldState.rounds.counter) do { val decision: String = decidedRequest match { case ClusterData(KVOperation.Read(key), _) => kvCache.synchronized { diff --git a/Modules/Examples/Protocol Benchmarks/src/test/scala/probench/ClusterConsensus.scala b/Modules/Examples/Protocol Benchmarks/src/test/scala/probench/ClusterConsensus.scala index af166338b..34102c0a2 100644 --- a/Modules/Examples/Protocol Benchmarks/src/test/scala/probench/ClusterConsensus.scala +++ b/Modules/Examples/Protocol Benchmarks/src/test/scala/probench/ClusterConsensus.scala @@ -37,7 +37,6 @@ class ClusterConsensus extends munit.FunSuite { client.addLatentConnection(clientConnection.client(clientUid.toString)) client.read("test") - Thread.sleep(200) assertEquals(nodes(0).currentState, nodes(1).currentState) assertEquals(nodes(1).currentState, nodes(2).currentState) diff --git a/Modules/RDTs/.jvm/src/test/scala/MembershipSuite.scala b/Modules/RDTs/.jvm/src/test/scala/MembershipSuite.scala index 5d512e673..ba9619224 100644 --- a/Modules/RDTs/.jvm/src/test/scala/MembershipSuite.scala +++ b/Modules/RDTs/.jvm/src/test/scala/MembershipSuite.scala @@ -131,10 +131,10 @@ class MembershipSpec[A: Arbitrary, C[_]: Consensus, D[_]: Consensus]( (res(index1), res(index2)) match case (membership1, membership2) => (membership1.currentMembers.nonEmpty && membership2.currentMembers.nonEmpty) :| "set of members can never be empty" && - ((membership1.counter != membership2.counter) || membership1.currentMembers == membership2.currentMembers) :| "members for a given counter are the same for all indices" && + ((membership1.rounds.counter != membership2.rounds.counter) || membership1.currentMembers == membership2.currentMembers) :| "members for a given counter are the same for all indices" && (membership1.read.containsSlice(membership2.read) || membership2.read.containsSlice( membership1.read - )) :| s"every log is a prefix of another log or vice versa, but we had:\n${membership1.counter}${membership1.read}\n${membership2.counter}${membership2.read}" + )) :| s"every log is a prefix of another log or vice versa, but we had:\n${membership1.rounds.counter}${membership1.read}\n${membership2.rounds.counter}${membership2.read}" } diff --git a/Modules/RDTs/src/main/scala/rdts/base/Lattice.scala b/Modules/RDTs/src/main/scala/rdts/base/Lattice.scala index 2ab95bf30..eb0fe8d29 100644 --- a/Modules/RDTs/src/main/scala/rdts/base/Lattice.scala +++ b/Modules/RDTs/src/main/scala/rdts/base/Lattice.scala @@ -52,10 +52,15 @@ object Lattice { // Sometimes the merge extension on the lattice trait is not found, and it is unclear what needs to be imported. // This could be just an extension method, but then would be ambiguous in cases where the extension on the interface is available. // Thus, we put the extension into this implicit object, when `Lattice.syntax` is imported (or otherwise in the implicit scope) then it is eligible as the receiver for the extension method rewrite. For some reason, this never causes conflicts even if multiple objects are named `syntax` (as opposed to name conflicts with the extension method, which does cause conflicts). + // also, intellij does find these, but not the ones on the trait … ? given syntax: {} with - extension [A: Lattice](left: A) + extension [A: Lattice](left: A) { def merge(right: A): A = Lattice[A].merge(left, right) + /** Convenience method to apply delta mutation to grow current value */ + def grow(f: A => A): A = Lattice.this.merge(left, f(left)) + } + def latticeOrder[A: Lattice]: PartialOrdering[A] = new { override def lteq(x: A, y: A): Boolean = Lattice.lteq(x, y) override def tryCompare(x: A, y: A): Option[Int] = diff --git a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Membership.scala b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Membership.scala index eb39b31ea..ec170ae20 100644 --- a/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Membership.scala +++ b/Modules/RDTs/src/main/scala/rdts/datatypes/experiments/protocols/Membership.scala @@ -1,92 +1,128 @@ package rdts.datatypes.experiments.protocols +import rdts.base.Lattice.syntax import rdts.base.LocalUid.replicaId -import rdts.base.{Lattice, LocalUid, Uid} +import rdts.base.{Bottom, Lattice, LocalUid, Uid} +import rdts.datatypes.Epoch import rdts.datatypes.experiments.protocols.Consensus.given import rdts.time.Time import scala.collection.immutable.NumericRange -case class Membership[A, C[_], D[_]]( - counter: Time, +case class MembershipRound[A, C[_], D[_]]( membersConsensus: C[Set[Uid]], innerConsensus: D[A], - log: Map[Long, A], membershipChanging: Boolean = false, members: Set[Uid] -) { - private def sameRound(using Consensus[C], Consensus[D]): Membership[A, C, D] = Membership( - counter = counter, - membersConsensus = Consensus[C].empty, - innerConsensus = Consensus[D].empty, - log = Map.empty, - members = Set.empty +) + +object MembershipRound { + given bottom[A, C[_], D[_]](using Consensus[C], Consensus[D]): Bottom[MembershipRound[A, C, D]] = Bottom.provide( + MembershipRound( + membersConsensus = Consensus[C].empty, + innerConsensus = Consensus[D].empty, + members = Set.empty + ) ) - given Participants = Participants(members) + given lattice[A, C[_], D[_]](using Lattice[C[Set[Uid]]], Lattice[D[A]]): Lattice[MembershipRound[A, C, D]] = + given Lattice[Boolean] = _ || _ + Lattice.derived +} + +case class Membership[A, C[_], D[_]]( + rounds: Epoch[MembershipRound[A, C, D]], + log: Map[Long, A] = Map.empty, +) { + + private def bottomRound(using Consensus[C], Consensus[D]): MembershipRound[A, C, D] = + MembershipRound.bottom[A, C, D].empty + + given Participants = Participants(rounds.value.members) def currentMembers(using Consensus[C], Consensus[D]): Set[Uid] = - members + rounds.value.members - def addMember(id: Uid)(using LocalUid, Consensus[C], Consensus[D]): Membership[A, C, D] = + def counter: Time = rounds.counter + def innerConsensus: D[A] = rounds.value.innerConsensus + def membersConsensus: C[Set[Uid]] = rounds.value.membersConsensus + + def writeRound(membershipRound: MembershipRound[A, C, D]): Membership[A, C, D] = { + Membership(rounds.write(membershipRound)) + } + + def addMember(id: Uid)(using LocalUid, Consensus[C], Consensus[D]): Membership[A, C, D] = writeRound { if isMember then - sameRound.copy( + bottomRound.copy( membershipChanging = true, - membersConsensus = membersConsensus.write(currentMembers + id) + membersConsensus = rounds.value.membersConsensus.write(currentMembers + id) ) - else sameRound + else bottomRound + } - def removeMember(id: Uid)(using LocalUid, Consensus[C], Consensus[D]): Membership[A, C, D] = + def removeMember(id: Uid)(using LocalUid, Consensus[C], Consensus[D]): Membership[A, C, D] = writeRound { if currentMembers.size > 1 && isMember then // cannot remove last member - sameRound.copy( + bottomRound.copy( membershipChanging = true, - membersConsensus = membersConsensus.write(currentMembers - id) + membersConsensus = rounds.value.membersConsensus.write(currentMembers - id) ) - else sameRound + else bottomRound + } def read: List[A] = log.toList.sortBy(_._1).map(_._2) def readDecisionsSince(time: Time): Iterable[A] = - NumericRange(time, counter, 1L).view.flatMap(log.get) + NumericRange(time, rounds.counter, 1L).view.flatMap(log.get) - def write(value: A)(using LocalUid, Consensus[C], Consensus[D]): Membership[A, C, D] = - if !membershipChanging && isMember then - sameRound.copy( - innerConsensus = innerConsensus.write(value) + def write(value: A)(using LocalUid, Consensus[C], Consensus[D]): Membership[A, C, D] = writeRound { + if !rounds.value.membershipChanging && isMember then + bottomRound.copy( + innerConsensus = rounds.value.innerConsensus.write(value) ) - else sameRound + else bottomRound + } def isMember(using LocalUid, Consensus[C], Consensus[D]): Boolean = currentMembers.contains(replicaId) def upkeep()(using rid: LocalUid, cc: Consensus[C], cd: Consensus[D]): Membership[A, C, D] = - if !isMember then return sameRound // do nothing if we are not a member anymore - val newMembers = membersConsensus.merge(membersConsensus.upkeep()) - val newInner = innerConsensus.merge(innerConsensus.upkeep()) + if !isMember then return writeRound(bottomRound) // do nothing if we are not a member anymore + val deltaMembers = rounds.value.membersConsensus.upkeep() + val newMembers = rounds.value.membersConsensus.merge(deltaMembers) + val deltaInner = rounds.value.innerConsensus.upkeep() + val newInner = rounds.value.innerConsensus.merge(deltaInner) (newMembers.read, newInner.read) match // member consensus reached -> members have changed case (Some(members), _) => assert(!members.isEmpty, "members consensus reached but no members found") - copy( - counter = counter + 1, - membersConsensus = Consensus[C].empty, - innerConsensus = Consensus[D].empty, - membershipChanging = false, - members = members - ) + Membership(rounds.epocheWrite( + MembershipRound( + membersConsensus = Consensus[C].empty, + innerConsensus = Consensus[D].empty, + membershipChanging = false, + members = members + ) + )) // inner consensus is reached - case (None, Some(value)) if !membershipChanging => - val newLog = Map(counter -> value) - copy( - counter = counter + 1, - membersConsensus = Consensus[C].empty, - innerConsensus = Consensus[D].empty, + case (None, Some(value)) if !rounds.value.membershipChanging => + val newLog = Map(rounds.counter -> value) + Membership( + rounds.epocheWrite( + MembershipRound( + membersConsensus = Consensus[C].empty, + innerConsensus = Consensus[D].empty, + membershipChanging = false, + members = currentMembers + ) + ), log = newLog ) // nothing has changed case _ => - sameRound.copy( - membersConsensus = newMembers, - innerConsensus = newInner + writeRound( + bottomRound.copy( + membersConsensus = deltaMembers, + innerConsensus = deltaInner + ) ) } @@ -98,30 +134,20 @@ object Membership { ): Membership[A, C, D] = require(initialMembers.nonEmpty, "initial members can't be empty") Membership( - counter = 0, - membersConsensus = Consensus[C].empty, - innerConsensus = Consensus[D].empty, - log = Map.empty, - members = initialMembers + Epoch(0, MembershipRound.bottom[A, C, D].empty.copy(members = initialMembers)), + Map.empty ) - def logLattice[A]: Lattice[Map[Long, A]] = Lattice.mapLattice(using Lattice.assertEquals) - - given lattice[A, C[_], D[_]](using - Consensus[C], - Consensus[D] - ): Lattice[Membership[A, C, D]] with - override def merge(left: Membership[A, C, D], right: Membership[A, C, D]): Membership[A, C, D] = - if left.counter > right.counter then left.copy(log = logLattice.merge(left.log, right.log)) - else if right.counter > left.counter then right.copy(log = logLattice.merge(right.log, left.log)) - else - Membership( - left.counter, - Lattice[C[Set[Uid]]].merge(left.membersConsensus, right.membersConsensus), - Lattice[D[A]].merge(left.innerConsensus, right.innerConsensus), - logLattice.merge(left.log, right.log), - left.membershipChanging || right.membershipChanging, - left.members `union` right.members - ) - + given latticeFromConsensus[A, C[_], D[_]](using + ccon: Consensus[C], + dcon: Consensus[D] + ): Lattice[Membership[A, C, D]] = + lattice(using ccon.lattice, dcon.lattice) + + def lattice[A, C[_], D[_]](using Lattice[C[Set[Uid]]], Lattice[D[A]]): Lattice[Membership[A, C, D]] = + // for the log + given Lattice[Map[Long, A]] = + given Lattice[A] = Lattice.assertEquals + Lattice.mapLattice + Lattice.derived } diff --git a/Modules/RDTs/src/test/scala/test/rdts/protocols/MembershipTest.scala b/Modules/RDTs/src/test/scala/test/rdts/protocols/MembershipTest.scala index a2b10ffc6..ed6b2d04a 100644 --- a/Modules/RDTs/src/test/scala/test/rdts/protocols/MembershipTest.scala +++ b/Modules/RDTs/src/test/scala/test/rdts/protocols/MembershipTest.scala @@ -15,7 +15,7 @@ class MembershipTest extends munit.FunSuite { test("basic membership merge") { val membership = Membership.init[Int, Paxos, Paxos](Set(id1, id2, id3).map(_.uid)) val delta = membership.upkeep()(using id1) - println(delta.members) + println(delta.rounds.value.members) val res = delta `merge` membership assertEquals(res, membership) }