Skip to content

Commit

Permalink
replace membership with automatic variant
Browse files Browse the repository at this point in the history
  • Loading branch information
rmgk committed Dec 8, 2024
1 parent 22c2c14 commit 4862ff3
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class KeyValueReplica(val uid: Uid, val votingReplicas: Set[Uid]) {
log(s"${newState.log}")
// println(s"${pprint.tokenize(newState).mkString("")}")

for decidedRequest <- newState.readDecisionsSince(oldState.counter) do {
for decidedRequest <- newState.readDecisionsSince(oldState.rounds.counter) do {
val decision: String = decidedRequest match {
case ClusterData(KVOperation.Read(key), _) =>
kvCache.synchronized {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ class ClusterConsensus extends munit.FunSuite {
client.addLatentConnection(clientConnection.client(clientUid.toString))

client.read("test")
Thread.sleep(200)

assertEquals(nodes(0).currentState, nodes(1).currentState)
assertEquals(nodes(1).currentState, nodes(2).currentState)
Expand Down
4 changes: 2 additions & 2 deletions Modules/RDTs/.jvm/src/test/scala/MembershipSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,10 @@ class MembershipSpec[A: Arbitrary, C[_]: Consensus, D[_]: Consensus](
(res(index1), res(index2)) match
case (membership1, membership2) =>
(membership1.currentMembers.nonEmpty && membership2.currentMembers.nonEmpty) :| "set of members can never be empty" &&
((membership1.counter != membership2.counter) || membership1.currentMembers == membership2.currentMembers) :| "members for a given counter are the same for all indices" &&
((membership1.rounds.counter != membership2.rounds.counter) || membership1.currentMembers == membership2.currentMembers) :| "members for a given counter are the same for all indices" &&
(membership1.read.containsSlice(membership2.read) || membership2.read.containsSlice(
membership1.read
)) :| s"every log is a prefix of another log or vice versa, but we had:\n${membership1.counter}${membership1.read}\n${membership2.counter}${membership2.read}"
)) :| s"every log is a prefix of another log or vice versa, but we had:\n${membership1.rounds.counter}${membership1.read}\n${membership2.rounds.counter}${membership2.read}"

}

Expand Down
7 changes: 6 additions & 1 deletion Modules/RDTs/src/main/scala/rdts/base/Lattice.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,15 @@ object Lattice {
// Sometimes the merge extension on the lattice trait is not found, and it is unclear what needs to be imported.
// This could be just an extension method, but then would be ambiguous in cases where the extension on the interface is available.
// Thus, we put the extension into this implicit object, when `Lattice.syntax` is imported (or otherwise in the implicit scope) then it is eligible as the receiver for the extension method rewrite. For some reason, this never causes conflicts even if multiple objects are named `syntax` (as opposed to name conflicts with the extension method, which does cause conflicts).
// also, intellij does find these, but not the ones on the trait … ?
given syntax: {} with
extension [A: Lattice](left: A)
extension [A: Lattice](left: A) {
def merge(right: A): A = Lattice[A].merge(left, right)

/** Convenience method to apply delta mutation to grow current value */
def grow(f: A => A): A = Lattice.this.merge(left, f(left))
}

def latticeOrder[A: Lattice]: PartialOrdering[A] = new {
override def lteq(x: A, y: A): Boolean = Lattice.lteq(x, y)
override def tryCompare(x: A, y: A): Option[Int] =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,92 +1,128 @@
package rdts.datatypes.experiments.protocols

import rdts.base.Lattice.syntax
import rdts.base.LocalUid.replicaId
import rdts.base.{Lattice, LocalUid, Uid}
import rdts.base.{Bottom, Lattice, LocalUid, Uid}
import rdts.datatypes.Epoch
import rdts.datatypes.experiments.protocols.Consensus.given
import rdts.time.Time

import scala.collection.immutable.NumericRange

case class Membership[A, C[_], D[_]](
counter: Time,
case class MembershipRound[A, C[_], D[_]](
membersConsensus: C[Set[Uid]],
innerConsensus: D[A],
log: Map[Long, A],
membershipChanging: Boolean = false,
members: Set[Uid]
) {
private def sameRound(using Consensus[C], Consensus[D]): Membership[A, C, D] = Membership(
counter = counter,
membersConsensus = Consensus[C].empty,
innerConsensus = Consensus[D].empty,
log = Map.empty,
members = Set.empty
)

object MembershipRound {
given bottom[A, C[_], D[_]](using Consensus[C], Consensus[D]): Bottom[MembershipRound[A, C, D]] = Bottom.provide(
MembershipRound(
membersConsensus = Consensus[C].empty,
innerConsensus = Consensus[D].empty,
members = Set.empty
)
)

given Participants = Participants(members)
given lattice[A, C[_], D[_]](using Lattice[C[Set[Uid]]], Lattice[D[A]]): Lattice[MembershipRound[A, C, D]] =
given Lattice[Boolean] = _ || _
Lattice.derived
}

case class Membership[A, C[_], D[_]](
rounds: Epoch[MembershipRound[A, C, D]],
log: Map[Long, A] = Map.empty,
) {

private def bottomRound(using Consensus[C], Consensus[D]): MembershipRound[A, C, D] =
MembershipRound.bottom[A, C, D].empty

given Participants = Participants(rounds.value.members)

def currentMembers(using Consensus[C], Consensus[D]): Set[Uid] =
members
rounds.value.members

def addMember(id: Uid)(using LocalUid, Consensus[C], Consensus[D]): Membership[A, C, D] =
def counter: Time = rounds.counter
def innerConsensus: D[A] = rounds.value.innerConsensus
def membersConsensus: C[Set[Uid]] = rounds.value.membersConsensus

def writeRound(membershipRound: MembershipRound[A, C, D]): Membership[A, C, D] = {
Membership(rounds.write(membershipRound))
}

def addMember(id: Uid)(using LocalUid, Consensus[C], Consensus[D]): Membership[A, C, D] = writeRound {
if isMember then
sameRound.copy(
bottomRound.copy(
membershipChanging = true,
membersConsensus = membersConsensus.write(currentMembers + id)
membersConsensus = rounds.value.membersConsensus.write(currentMembers + id)
)
else sameRound
else bottomRound
}

def removeMember(id: Uid)(using LocalUid, Consensus[C], Consensus[D]): Membership[A, C, D] =
def removeMember(id: Uid)(using LocalUid, Consensus[C], Consensus[D]): Membership[A, C, D] = writeRound {
if currentMembers.size > 1 && isMember then // cannot remove last member
sameRound.copy(
bottomRound.copy(
membershipChanging = true,
membersConsensus = membersConsensus.write(currentMembers - id)
membersConsensus = rounds.value.membersConsensus.write(currentMembers - id)
)
else sameRound
else bottomRound
}

def read: List[A] = log.toList.sortBy(_._1).map(_._2)

def readDecisionsSince(time: Time): Iterable[A] =
NumericRange(time, counter, 1L).view.flatMap(log.get)
NumericRange(time, rounds.counter, 1L).view.flatMap(log.get)

def write(value: A)(using LocalUid, Consensus[C], Consensus[D]): Membership[A, C, D] =
if !membershipChanging && isMember then
sameRound.copy(
innerConsensus = innerConsensus.write(value)
def write(value: A)(using LocalUid, Consensus[C], Consensus[D]): Membership[A, C, D] = writeRound {
if !rounds.value.membershipChanging && isMember then
bottomRound.copy(
innerConsensus = rounds.value.innerConsensus.write(value)
)
else sameRound
else bottomRound
}

def isMember(using LocalUid, Consensus[C], Consensus[D]): Boolean = currentMembers.contains(replicaId)

def upkeep()(using rid: LocalUid, cc: Consensus[C], cd: Consensus[D]): Membership[A, C, D] =
if !isMember then return sameRound // do nothing if we are not a member anymore
val newMembers = membersConsensus.merge(membersConsensus.upkeep())
val newInner = innerConsensus.merge(innerConsensus.upkeep())
if !isMember then return writeRound(bottomRound) // do nothing if we are not a member anymore
val deltaMembers = rounds.value.membersConsensus.upkeep()
val newMembers = rounds.value.membersConsensus.merge(deltaMembers)
val deltaInner = rounds.value.innerConsensus.upkeep()
val newInner = rounds.value.innerConsensus.merge(deltaInner)
(newMembers.read, newInner.read) match
// member consensus reached -> members have changed
case (Some(members), _) =>
assert(!members.isEmpty, "members consensus reached but no members found")
copy(
counter = counter + 1,
membersConsensus = Consensus[C].empty,
innerConsensus = Consensus[D].empty,
membershipChanging = false,
members = members
)
Membership(rounds.epocheWrite(
MembershipRound(
membersConsensus = Consensus[C].empty,
innerConsensus = Consensus[D].empty,
membershipChanging = false,
members = members
)
))
// inner consensus is reached
case (None, Some(value)) if !membershipChanging =>
val newLog = Map(counter -> value)
copy(
counter = counter + 1,
membersConsensus = Consensus[C].empty,
innerConsensus = Consensus[D].empty,
case (None, Some(value)) if !rounds.value.membershipChanging =>
val newLog = Map(rounds.counter -> value)
Membership(
rounds.epocheWrite(
MembershipRound(
membersConsensus = Consensus[C].empty,
innerConsensus = Consensus[D].empty,
membershipChanging = false,
members = currentMembers
)
),
log = newLog
)
// nothing has changed
case _ =>
sameRound.copy(
membersConsensus = newMembers,
innerConsensus = newInner
writeRound(
bottomRound.copy(
membersConsensus = deltaMembers,
innerConsensus = deltaInner
)
)
}

Expand All @@ -98,30 +134,20 @@ object Membership {
): Membership[A, C, D] =
require(initialMembers.nonEmpty, "initial members can't be empty")
Membership(
counter = 0,
membersConsensus = Consensus[C].empty,
innerConsensus = Consensus[D].empty,
log = Map.empty,
members = initialMembers
Epoch(0, MembershipRound.bottom[A, C, D].empty.copy(members = initialMembers)),
Map.empty
)

def logLattice[A]: Lattice[Map[Long, A]] = Lattice.mapLattice(using Lattice.assertEquals)

given lattice[A, C[_], D[_]](using
Consensus[C],
Consensus[D]
): Lattice[Membership[A, C, D]] with
override def merge(left: Membership[A, C, D], right: Membership[A, C, D]): Membership[A, C, D] =
if left.counter > right.counter then left.copy(log = logLattice.merge(left.log, right.log))
else if right.counter > left.counter then right.copy(log = logLattice.merge(right.log, left.log))
else
Membership(
left.counter,
Lattice[C[Set[Uid]]].merge(left.membersConsensus, right.membersConsensus),
Lattice[D[A]].merge(left.innerConsensus, right.innerConsensus),
logLattice.merge(left.log, right.log),
left.membershipChanging || right.membershipChanging,
left.members `union` right.members
)

given latticeFromConsensus[A, C[_], D[_]](using
ccon: Consensus[C],
dcon: Consensus[D]
): Lattice[Membership[A, C, D]] =
lattice(using ccon.lattice, dcon.lattice)

def lattice[A, C[_], D[_]](using Lattice[C[Set[Uid]]], Lattice[D[A]]): Lattice[Membership[A, C, D]] =
// for the log
given Lattice[Map[Long, A]] =
given Lattice[A] = Lattice.assertEquals
Lattice.mapLattice
Lattice.derived
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class MembershipTest extends munit.FunSuite {
test("basic membership merge") {
val membership = Membership.init[Int, Paxos, Paxos](Set(id1, id2, id3).map(_.uid))
val delta = membership.upkeep()(using id1)
println(delta.members)
println(delta.rounds.value.members)
val res = delta `merge` membership
assertEquals(res, membership)
}
Expand Down

0 comments on commit 4862ff3

Please sign in to comment.