Skip to content

Commit

Permalink
fix Paxos using given Participants
Browse files Browse the repository at this point in the history
  • Loading branch information
haaase committed Oct 27, 2024
1 parent 70a6eb2 commit 31c8c4f
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class ConsensusPropertySpec[A: Arbitrary, C[_]: Consensus](
for
numDevices <- Gen.choose(minDevices, maxDevices)
ids = Range(0, numDevices).map(_ => LocalUid.gen()).toList
yield ids.map(id => (id, Consensus.init(ids.map(_.uid).toSet))).toMap
yield ids.map(id => (id, Consensus[C].empty)).toMap

// generators
override def genCommand(state: State): Gen[Command] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ trait Consensus[C[_]] {
extension [A](c: C[A]) def members(using Participants): Set[Uid] = participants
extension [A](c: C[A]) def upkeep()(using LocalUid, Participants): C[A]

def init[A](members: Set[Uid]): C[A]
def empty[A]: C[A]
def lattice[A]: Lattice[C[A]]
}
Expand All @@ -21,7 +20,4 @@ object Consensus {
override def empty: C[A] = Consensus[C].empty

def apply[C[_]](using ev: Consensus[C]): Consensus[C] = ev
def init[A, C[_]](newMembers: Set[Uid])(using Consensus[C]): C[A] =
val a: Consensus[C] = apply[C]
a.init(newMembers)
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package rdts.datatypes.experiments.protocols

import rdts.base.LocalUid.replicaId
import rdts.base.{Lattice, LocalUid, Uid}
import rdts.base.{Bottom, Lattice, LocalUid, Uid}
import rdts.datatypes.experiments.protocols.Consensus.given
import rdts.time.Time

Expand Down Expand Up @@ -70,9 +70,10 @@ case class Membership[A, C[_], D[_]](
logger.info(s"Member consensus reached on members $members")
copy(
counter = counter + 1,
membersConsensus = Consensus.init(members),
innerConsensus = Consensus.init(members),
membershipChanging = false
membersConsensus = Consensus[C].empty,
innerConsensus = Consensus[D].empty,
membershipChanging = false,
members = members
)
// inner consensus is reached
case (None, Some(value)) if !membershipChanging =>
Expand All @@ -81,8 +82,8 @@ case class Membership[A, C[_], D[_]](
logger.info(s"Inner consensus reached on value $value, log: $newLog")
copy(
counter = counter + 1,
membersConsensus = Consensus.init(currentMembers),
innerConsensus = Consensus.init(currentMembers),
membersConsensus = Consensus[C].empty,
innerConsensus = Consensus[D].empty,
log = newLog
)
// nothing has changed
Expand All @@ -102,8 +103,8 @@ object Membership {
require(initialMembers.nonEmpty, "initial members can't be empty")
Membership(
counter = 0,
membersConsensus = Consensus[C].init[Set[Uid]](initialMembers),
innerConsensus = Consensus[D].init[A](initialMembers),
membersConsensus = Consensus[C].empty,
innerConsensus = Consensus[D].empty,
log = List(),
members = initialMembers
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ object Paxos {
override def read(using Participants): Option[A] = c.read
extension [A](c: Paxos[A])
override def upkeep()(using LocalUid, Participants): Paxos[A] = c.upkeep()
override def init[A](members: GrowOnlySet[Uid]): Paxos[A] = Paxos.init(members = members)

override def empty[A]: Paxos[A] = Paxos.unchanged

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import rdts.base.{Lattice, LocalUid, Uid}
import rdts.datatypes.LastWriterWins
import rdts.datatypes.experiments.protocols.{Consensus, Participants}

case class GeneralizedPaxos[A](rounds: Map[BallotNum, (LeaderElection, SimpleVoting[A])] =
Map.empty[BallotNum, (LeaderElection, SimpleVoting[A])], myValue: Option[LastWriterWins[A]] = None):
case class GeneralizedPaxos[A](
rounds: Map[BallotNum, (LeaderElection, SimpleVoting[A])] =
Map.empty[BallotNum, (LeaderElection, SimpleVoting[A])],
myValue: Option[LastWriterWins[A]] = None
):

def voteFor(leader: Uid, value: A)(using LocalUid, Participants): (LeaderElection, SimpleVoting[A]) =
(SimpleVoting[Uid]().voteFor(leader), SimpleVoting[A]().voteFor(value))
Expand All @@ -20,7 +23,7 @@ case class GeneralizedPaxos[A](rounds: Map[BallotNum, (LeaderElection, SimpleVot

def phase1b(using LocalUid, Participants): GeneralizedPaxos[A] =
// return greatest decided value
val r = rounds.filter { case (b, (l, v)) => v.result != None }
val r = rounds.filter { case (b, (l, v)) => v.result.isDefined }
val decidedVal = r.maxByOption { case (b, (l, v)) => b }.flatMap(_._2._2.result)

// vote for newest leader election
Expand All @@ -37,7 +40,7 @@ case class GeneralizedPaxos[A](rounds: Map[BallotNum, (LeaderElection, SimpleVot
def phase2a(using LocalUid, Participants): GeneralizedPaxos[A] =
// check if leader
myHighestBallot match
case Some((ballotNum, (leaderElection, voting))) if leaderElection.result == Some(replicaId) =>
case Some((ballotNum, (leaderElection, voting))) if leaderElection.result.contains(replicaId) =>
val value =
if voting.votes.nonEmpty then
voting.votes.head.value
Expand Down Expand Up @@ -70,7 +73,7 @@ case class GeneralizedPaxos[A](rounds: Map[BallotNum, (LeaderElection, SimpleVot
rounds.filter { case (b, (l, v)) => b.uid == replicaId }.maxByOption { case (b, (l, v)) => b }

def newestDecidedVal(using Participants): Option[A] =
val r = rounds.filter { case (b, (l, v)) => v.result != None }
val r = rounds.filter { case (b, (l, v)) => v.result.isDefined }
r.maxByOption { case (b, (l, v)) => b }.flatMap(_._2._2.result)

object GeneralizedPaxos:
Expand All @@ -89,15 +92,18 @@ object GeneralizedPaxos:
extension [A](c: GeneralizedPaxos[A])(using Participants)
override def read: Option[A] = c.newestDecidedVal
extension [A](c: GeneralizedPaxos[A])
override def upkeep()(using LocalUid): GeneralizedPaxos[A] =
override def upkeep()(using LocalUid, Participants): GeneralizedPaxos[A] =
// check which phase we are in

???



override def init[A](members: Set[Uid]): GeneralizedPaxos[A] = GeneralizedPaxos()
c.highestBallot match
// we have a leader -> phase 2
case Some((ballotNum, (leaderElection, voting))) if leaderElection.result.nonEmpty =>
c.phase2b
// we are in the process of electing a new leader
case None =>
c.phase1b

override def empty[A]: GeneralizedPaxos[A] = GeneralizedPaxos()

override def lattice[A]: Lattice[GeneralizedPaxos[A]] = lattice

// TODO: define lteq
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ case class Paxos[A](
) {
// override def toString: String = pprint.apply(this).render

private def quorum: Int = members.size / 2 + 1
private def quorum(using Participants): Int = participants.size / 2 + 1

def myHighestPromise(using LocalUid): Option[Promise[A]] =
promises.filter(_.acceptor == replicaId).maxByOption(_.proposal)
Expand Down Expand Up @@ -66,7 +66,7 @@ case class Paxos[A](
)

// phase 2a
def propose(proposal: ProposalNum, v: A)(using LocalUid): Paxos[A] =
def propose(proposal: ProposalNum, v: A)(using LocalUid, Participants): Paxos[A] =
// check if I have received enough promises and have not proposed yet
val myPromises = promises.filter(_.proposal == proposal)
val hasProposed = accepts.exists(_.proposal == proposal)
Expand All @@ -84,7 +84,7 @@ case class Paxos[A](
else
Paxos.unchanged // quorum not reached, do nothing

def propose(v: A)(using LocalUid): Paxos[A] =
def propose(v: A)(using LocalUid, Participants): Paxos[A] =
// find my newest proposalNum
val proposalNum = prepares.filter(_.proposal.proposer == replicaId).maxByOption(_.proposal)
proposalNum match
Expand Down Expand Up @@ -129,7 +129,7 @@ object Paxos:
given consensus: Consensus[Paxos] with
extension [A](c: Paxos[A])
override def write(value: A)(using LocalUid, Participants): Paxos[A] =
if c.members.contains(replicaId) then
if participants.contains(replicaId) then
def becomeLeader = c.prepare().copy(members = c.members.updated(replicaId, Some(LastWriterWins.now(value))))

val myNewestProposal = c.prepares.filter(_.proposal.proposer == replicaId).map(_.proposal).maxOption
Expand Down Expand Up @@ -190,8 +190,6 @@ object Paxos:
// there are no prepare messages, do nothing
Paxos.unchanged

override def init[A](members: GrowOnlySet[Uid]): Paxos[A] = Paxos.init(members = members)

override def empty[A]: Paxos[A] = Paxos.unchanged

override def lattice[A]: Lattice[Paxos[A]] = Paxos.lattice
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ class MembershipTest extends munit.FunSuite {
val id1 = LocalUid.gen()
val id2 = LocalUid.gen()
val id3 = LocalUid.gen()
val id4 = LocalUid.gen()

test("Membership happy path") {
given LogHack(false)
var membership = Membership.init[Int, Paxos, Paxos](Set(id1, id2, id3).map(_.uid))
// id1 writes -> prepare
assert(membership.isMember(using id1))
membership = membership.merge(membership.write(1)(using id1))
assert(membership.isMember(using id1))
assert(membership.isMember(using id2))
Expand All @@ -36,4 +38,69 @@ class MembershipTest extends munit.FunSuite {
assertEquals(membership.currentMembers, Set(id1, id2, id3).map(_.uid))
assertEquals(membership.read, List(1))
}

test("Membership with member change") {
given LogHack(false)
var membership = Membership.init[Int, Paxos, Paxos](Set(id1, id2, id3).map(_.uid))
// id1 writes -> prepare
assert(membership.isMember(using id1))
membership = membership.merge(membership.write(1)(using id1))
assert(membership.isMember(using id1))
assert(membership.isMember(using id2))
assert(membership.isMember(using id3))
// id2 vote kicks id1
membership = membership.merge(membership.removeMember(id1.uid)(using id2))
assert(membership.isMember(using id1))
assert(membership.isMember(using id2))
assert(membership.isMember(using id3))
// all upkeep --> promise and enter phase 2
membership = membership
.merge(membership.upkeep()(using id1))
.merge(membership.upkeep()(using id2))
.merge(membership.upkeep()(using id3))
// all upkeep again -> propose value
membership = membership
.merge(membership.upkeep()(using id1))
.merge(membership.upkeep()(using id2))
.merge(membership.upkeep()(using id3))
// all upkeep again -> accept value
membership = membership
.merge(membership.upkeep()(using id1))
.merge(membership.upkeep()(using id2))
.merge(membership.upkeep()(using id3))
assertEquals(membership.currentMembers, Set(id2, id3).map(_.uid))
assertEquals(membership.read, List())
// non-members should not be able to modify consensus
assertEquals(
membership.merge(membership.write(1)(using id1)),
membership
)
assertNotEquals(
membership.merge(membership.write(1)(using id2)),
membership
)
assertNotEquals(
membership.merge(membership.write(1)(using id3)),
membership
)
}

test("Fixed counterexample from suite") {
given LogHack(false)
var membership1 = Membership.init[Int, Paxos, Paxos](Set(id1, id2, id3, id4).map(_.uid))
var membership2 = Membership.init[Int, Paxos, Paxos](Set(id1, id2, id3, id4).map(_.uid))
// id1 writes -> prepare
membership1 = membership1.merge(membership1.write(1)(using id1))
// upkeep on same id
membership1 = membership1.merge(membership1.upkeep()(using id1))
// write on id2
membership2 = membership2.merge(membership2.write(2)(using id2))
// upkeep on id2
membership2 = membership2.merge(membership2.upkeep()(using id2))
// merge id1 and id2
membership1 = membership1.merge(membership2)
membership2 = membership2.merge(membership1)
// -> logs should diverge
assertEquals(membership1.log, membership2.log)
}
}

0 comments on commit 31c8c4f

Please sign in to comment.