diff --git a/beacon/beacon.go b/beacon/beacon.go index 092ce92d91f..fcea789e899 100644 --- a/beacon/beacon.go +++ b/beacon/beacon.go @@ -11,6 +11,7 @@ import ( "github.com/ALTree/bigfloat" "github.com/spacemeshos/fixed" + "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" "github.com/spacemeshos/go-spacemesh/beacon/metrics" @@ -52,18 +53,6 @@ type ( } ) -type defaultFetcher struct { - cdb *datastore.CachedDB -} - -func (f defaultFetcher) VRFNonce(nodeID types.NodeID, epoch types.EpochID) (types.VRFPostIndex, error) { - nonce, err := f.cdb.VRFNonce(nodeID, epoch) - if err != nil { - return types.VRFPostIndex(0), fmt.Errorf("get vrf nonce: %w", err) - } - return nonce, nil -} - // Opt for configuring beacon protocol. type Opt func(*ProtocolDriver) @@ -94,16 +83,9 @@ func withWeakCoin(wc coin) Opt { } } -func withNonceFetcher(nf nonceFetcher) Opt { - return func(pd *ProtocolDriver) { - pd.nonceFetcher = nf - } -} - // New returns a new ProtocolDriver. func New( publisher pubsub.Publisher, - edSigner *signing.EdSigner, edVerifier *signing.EdVerifier, vrfVerifier vrfVerifier, cdb *datastore.CachedDB, @@ -115,11 +97,12 @@ func New( logger: log.NewNop(), config: DefaultConfig(), publisher: publisher, - edSigner: edSigner, edVerifier: edVerifier, vrfVerifier: vrfVerifier, + nonceFetcher: cdb, cdb: cdb, clock: clock, + signers: make(map[types.NodeID]*signing.EdSigner), beacons: make(map[types.EpochID]types.Beacon), ballotsBeacons: make(map[types.EpochID]map[types.Beacon]*beaconWeight), states: make(map[types.EpochID]*state), @@ -135,12 +118,13 @@ func New( pd.ctx, pd.cancel = context.WithCancel(pd.ctx) pd.theta = new(big.Float).SetRat(pd.config.Theta) - if pd.nonceFetcher == nil { - pd.nonceFetcher = defaultFetcher{cdb: cdb} - } if pd.weakCoin == nil { - pd.weakCoin = weakcoin.New(pd.publisher, edSigner.VRFSigner(), vrfVerifier, pd.nonceFetcher, pd, + pd.weakCoin = weakcoin.New( + pd.publisher, + pd.vrfVerifier, + pd.nonceFetcher, + pd, pd.msgTimes, weakcoin.WithLog(pd.logger.WithName("weakCoin")), weakcoin.WithMaxRound(pd.config.RoundsNumber), @@ -151,6 +135,33 @@ func New( return pd } +func (pd *ProtocolDriver) Register(s *signing.EdSigner) { + pd.mu.Lock() + defer pd.mu.Unlock() + if _, exists := pd.signers[s.NodeID()]; exists { + pd.logger.With().Error("signing key already registered", log.ShortStringer("id", s.NodeID())) + return + } + + pd.logger.With().Info("registered signing key", log.ShortStringer("id", s.NodeID())) + pd.signers[s.NodeID()] = s +} + +type participant struct { + signer *signing.EdSigner + nonce types.VRFPostIndex +} + +func (s *participant) Id() log.Field { + return log.ShortStringer("id", s.signer.NodeID()) +} + +func (s participant) MarshalLogObject(enc zapcore.ObjectEncoder) error { + enc.AddString("id", s.signer.NodeID().ShortString()) + enc.AddUint64("nonce", uint64(s.nonce)) + return nil +} + // ProtocolDriver is the driver for the beacon protocol. type ProtocolDriver struct { inProtocol uint64 @@ -160,14 +171,16 @@ type ProtocolDriver struct { cancel context.CancelFunc startOnce sync.Once - config Config - sync system.SyncStateProvider - publisher pubsub.Publisher - edSigner *signing.EdSigner + config Config + sync system.SyncStateProvider + publisher pubsub.Publisher + + signers map[types.NodeID]*signing.EdSigner + weakCoin coin + edVerifier *signing.EdVerifier vrfVerifier vrfVerifier nonceFetcher nonceFetcher - weakCoin coin theta *big.Float clock layerClock @@ -536,10 +549,9 @@ func (pd *ProtocolDriver) initEpochStateIfNotPresent(logger log.Log, epoch types } var ( - epochWeight uint64 - miners = make(map[types.NodeID]*minerInfo) - active bool - nonce *types.VRFPostIndex + epochWeight uint64 + miners = make(map[types.NodeID]*minerInfo) + potentiallyActive = make(map[types.NodeID]*signing.EdSigner) // w1 is the weight units at δ before the end of the previous epoch, used to calculate `thresholdStrict` // w2 is the weight units at the end of the previous epoch, used to calculate `threshold` w1, w2 int @@ -554,7 +566,7 @@ func (pd *ProtocolDriver) initEpochStateIfNotPresent(logger log.Log, epoch types if !malicious { epochWeight += header.GetWeight() } else { - pd.logger.With().Debug("malicious miner get 0 weight", log.Stringer("smesher", header.NodeID)) + logger.With().Debug("malicious miner get 0 weight", log.Stringer("smesher", header.NodeID)) } if _, ok := miners[header.NodeID]; !ok { miners[header.NodeID] = &minerInfo{ @@ -567,13 +579,14 @@ func (pd *ProtocolDriver) initEpochStateIfNotPresent(logger log.Log, epoch types w2++ } } else { - pd.logger.With().Warning("ignoring malicious atx from miner", + logger.With().Warning("ignoring malicious atx from miner", header.ID, log.Bool("malicious", malicious), log.Stringer("smesher", header.NodeID)) } - if header.NodeID == pd.edSigner.NodeID() { - active = true + + if s, ok := pd.signers[header.NodeID]; ok { + potentiallyActive[header.NodeID] = s } return nil }); err != nil { @@ -581,20 +594,34 @@ func (pd *ProtocolDriver) initEpochStateIfNotPresent(logger log.Log, epoch types } if epochWeight == 0 { - logger.With().Error("zero weight targeting epoch", log.Err(errZeroEpochWeight)) return nil, errZeroEpochWeight } - if active { - nnc, err := pd.nonceFetcher.VRFNonce(pd.edSigner.NodeID(), epoch) - if err != nil { - logger.With().Error("failed to get own VRF nonce", log.Err(err)) - return nil, fmt.Errorf("get own VRF nonce: %w", err) + active := map[types.NodeID]participant{} + for id, signer := range potentiallyActive { + if nonce, err := pd.nonceFetcher.VRFNonce(id, epoch); err != nil { + logger.With().Error("getting own VRF nonce", id, log.Err(err)) + } else { + active[id] = participant{ + signer: signer, + nonce: nonce, + } } - nonce = &nnc } + + logger.With().Info( + "selected active signers", + log.Int("count", len(active)), + log.Array("signers", zapcore.ArrayMarshalerFunc(func(enc zapcore.ArrayEncoder) error { + for _, p := range active { + enc.AppendObject(p) + } + return nil + })), + ) + checker := createProposalChecker(logger, pd.config, w1, w1+w2) - pd.states[epoch] = newState(logger, pd.config, nonce, epochWeight, miners, checker) + pd.states[epoch] = newState(logger, pd.config, active, epochWeight, miners, checker) return pd.states[epoch], nil } @@ -726,7 +753,7 @@ func (pd *ProtocolDriver) onNewEpoch(ctx context.Context, epoch types.EpochID) e func (pd *ProtocolDriver) runProtocol(ctx context.Context, epoch types.EpochID, st *state) { ctx = log.WithNewSessionID(ctx) targetEpoch := epoch + 1 - logger := pd.logger.WithContext(ctx).WithFields(epoch, log.FieldNamed("target_epoch", targetEpoch)) + logger := pd.logger.WithContext(ctx).WithFields(epoch, log.Uint32("target_epoch", uint32(targetEpoch))) pd.setBeginProtocol(ctx) defer pd.setEndProtocol(ctx) @@ -738,7 +765,7 @@ func (pd *ProtocolDriver) runProtocol(ctx context.Context, epoch types.EpochID, logger.With().Warning("proposal phase failed", log.Err(err)) return } - lastRoundOwnVotes, err := pd.runConsensusPhase(ctx, epoch, st.nonce) + lastRoundOwnVotes, err := pd.runConsensusPhase(ctx, epoch, st) if err != nil { logger.With().Warning("consensus phase failed", log.Err(err)) return @@ -761,7 +788,7 @@ func (pd *ProtocolDriver) runProtocol(ctx context.Context, epoch types.EpochID, } func calcBeacon(logger log.Log, set proposalSet) types.Beacon { - allProposals := set.sort() + allProposals := set.sorted() // Beacon should appear to have the same entropy as the initial proposals, hence cropping it // to the same size as the proposal @@ -781,9 +808,10 @@ func (pd *ProtocolDriver) runProposalPhase(ctx context.Context, epoch types.Epoc ctx, cancel := context.WithTimeout(ctx, pd.config.ProposalDuration) defer cancel() - if st.nonce != nil { + for _, session := range st.active { + session := session pd.eg.Go(func() error { - pd.sendProposal(ctx, epoch, *st.nonce, st.proposalChecker) + pd.sendProposal(ctx, epoch, session, st.proposalChecker) return nil }) } @@ -794,45 +822,46 @@ func (pd *ProtocolDriver) runProposalPhase(ctx context.Context, epoch types.Epoc return pd.ctx.Err() } - if err := pd.markProposalPhaseFinished(epoch, time.Now()); err != nil { - return err - } - - logger.Info("beacon proposal phase finished") + finished := time.Now() + pd.markProposalPhaseFinished(st, finished) + logger.With().Info("proposal phase finished", log.Time("finished_at", finished)) return nil } -func (pd *ProtocolDriver) sendProposal(ctx context.Context, epoch types.EpochID, nonce types.VRFPostIndex, checker eligibilityChecker) { +func (pd *ProtocolDriver) sendProposal(ctx context.Context, epoch types.EpochID, s participant, checker eligibilityChecker) { if pd.isClosed() { return } - atx, malicious, err := pd.minerAtxHdr(epoch, pd.edSigner.NodeID()) + atx, malicious, err := pd.minerAtxHdr(epoch, s.signer.NodeID()) if err != nil || malicious { return } logger := pd.logger.WithContext(ctx).WithFields(epoch) - vrfSig := buildSignedProposal(ctx, pd.logger, pd.edSigner.VRFSigner(), epoch, nonce) + vrfSig := buildSignedProposal(ctx, pd.logger, s.signer.VRFSigner(), epoch, s.nonce) proposal := ProposalFromVrf(vrfSig) m := ProposalMessage{ EpochID: epoch, - NodeID: pd.edSigner.NodeID(), + NodeID: s.signer.NodeID(), VRFSignature: vrfSig, } if invalid == pd.classifyProposal(logger, m, atx.Received, time.Now(), checker) { - logger.With().Debug("own proposal doesn't pass threshold", log.Inline(proposal)) + logger.With().Debug("own proposal doesn't pass threshold", log.Inline(proposal), s.Id()) return } - logger.With().Debug("own proposal passes threshold", log.Inline(proposal)) - pd.sendToGossip(ctx, pubsub.BeaconProposalProtocol, codec.MustEncode(&m)) - logger.With().Info("beacon proposal sent", log.Inline(proposal)) + logger.With().Debug("own proposal passes threshold", log.Inline(proposal), s.Id()) + if err := pd.sendToGossip(ctx, pubsub.BeaconProposalProtocol, codec.MustEncode(&m)); err != nil { + logger.With().Error("failed to broadcast", log.Err(err), log.Inline(proposal), s.Id()) + } else { + logger.With().Info("beacon proposal sent", log.Inline(proposal), s.Id()) + } } // runConsensusPhase runs K voting rounds and returns result from last weak coin round. -func (pd *ProtocolDriver) runConsensusPhase(ctx context.Context, epoch types.EpochID, nonce *types.VRFPostIndex) (allVotes, error) { +func (pd *ProtocolDriver) runConsensusPhase(ctx context.Context, epoch types.EpochID, st *state) (allVotes, error) { logger := pd.logger.WithContext(ctx).WithFields(epoch) logger.Info("starting consensus phase") @@ -846,23 +875,48 @@ func (pd *ProtocolDriver) runConsensusPhase(ctx context.Context, epoch types.Epo var ( ownVotes allVotes undecided proposalList - err error ) - for round := types.FirstRound; round < pd.config.RoundsNumber; round++ { + + // First round + round := types.FirstRound + pd.setRoundInProgress(round) + pd.mu.RLock() // shared lock is fine as sorting doesn't modify the state + msg := FirstVotingMessageBody{ + EpochID: epoch, + ValidProposals: st.incomingProposals.valid.sorted(), + PotentiallyValidProposals: st.incomingProposals.potentiallyValid.sorted(), + } + pd.mu.RUnlock() + for _, session := range st.active { + session := session + pd.eg.Go(func() error { + if err := pd.sendFirstRoundVote(ctx, msg, session.signer); err != nil { + logger.With().Error("failed to send proposal vote", log.Err(err), session.Id(), round) + } + return nil + }) + } + + select { + case <-timer.C: + case <-ctx.Done(): + return allVotes{}, fmt.Errorf("context done: %w", ctx.Err()) + } + ownVotes, _ = pd.calcVotesBeforeWeakCoin(logger, st) + + // Subsequent rounds + for round := types.FirstRound + 1; round < pd.config.RoundsNumber; round++ { round := round pd.setRoundInProgress(round) rLogger := logger.WithFields(round) + timer.Reset(pd.config.VotingRoundDuration) + votes := ownVotes - if nonce != nil { + for _, session := range st.active { + session := session pd.eg.Go(func() error { - if round == types.FirstRound { - if err := pd.sendFirstRoundVote(ctx, epoch); err != nil { - rLogger.With().Error("failed to send proposal vote", log.Err(err)) - } - } else { - if err := pd.sendFollowingVote(ctx, epoch, round, votes); err != nil { - rLogger.With().Error("failed to send following vote", log.Err(err)) - } + if err := pd.sendFollowingVote(ctx, epoch, round, votes, session.signer); err != nil { + rLogger.With().Error("failed to send following vote", log.Err(err), session.Id()) } return nil }) @@ -874,91 +928,67 @@ func (pd *ProtocolDriver) runConsensusPhase(ctx context.Context, epoch types.Epo return allVotes{}, fmt.Errorf("context done: %w", ctx.Err()) } - // note that votes after this calcVotes() call will _not_ be counted towards our votes + // note that votes after this call will _not_ be counted towards our votes // for this round, as the late votes can be cast after the weak coin is revealed. we // count them towards our votes in the next round. - ownVotes, undecided, err = pd.calcVotesBeforeWeakCoin(rLogger, epoch) + ownVotes, undecided = pd.calcVotesBeforeWeakCoin(rLogger, st) + + timer.Reset(pd.config.WeakCoinRoundDuration) + + pd.eg.Go(func() error { + participants := make([]weakcoin.Participant, 0, len(st.active)) + for _, session := range st.active { + participants = append(participants, weakcoin.Participant{ + Signer: session.signer.VRFSigner(), + Nonce: session.nonce, + }) + } + pd.weakCoin.StartRound(ctx, round, participants) + return nil + }) + + select { + case <-timer.C: + case <-ctx.Done(): + return allVotes{}, fmt.Errorf("context done: %w", ctx.Err()) + } + + pd.weakCoin.FinishRound(ctx) + + flip, err := pd.weakCoin.Get(ctx, epoch, round) if err != nil { + rLogger.With().Error("failed to generate weak coin", log.Err(err)) return allVotes{}, err } - if round != types.FirstRound { - timer.Reset(pd.config.WeakCoinRoundDuration) - pd.eg.Go(func() error { - pd.weakCoin.StartRound(ctx, round, nonce) - return nil - }) - select { - case <-timer.C: - case <-ctx.Done(): - return allVotes{}, fmt.Errorf("context done: %w", ctx.Err()) - } - pd.weakCoin.FinishRound(ctx) - flip, err := pd.weakCoin.Get(ctx, epoch, round) - if err != nil { - rLogger.With().Error("failed to generate weak coin", log.Err(err)) - return allVotes{}, err - } - tallyUndecided(&ownVotes, undecided, flip) - } - timer.Reset(pd.config.VotingRoundDuration) + tallyUndecided(&ownVotes, undecided, flip) } logger.Info("consensus phase finished") return ownVotes, nil } -func (pd *ProtocolDriver) markProposalPhaseFinished(epoch types.EpochID, finishedAt time.Time) error { - pd.logger.With().Debug("proposal phase finished", epoch, log.Time("finished_at", finishedAt)) +func (pd *ProtocolDriver) markProposalPhaseFinished(st *state, finishedAt time.Time) { pd.mu.Lock() defer pd.mu.Unlock() - if _, ok := pd.states[epoch]; !ok { - return errEpochNotActive - } - pd.states[epoch].proposalPhaseFinishedTime = finishedAt - return nil + st.proposalPhaseFinishedTime = finishedAt } -func (pd *ProtocolDriver) calcVotesBeforeWeakCoin(logger log.Log, epoch types.EpochID) (allVotes, proposalList, error) { +func (pd *ProtocolDriver) calcVotesBeforeWeakCoin(logger log.Log, st *state) (allVotes, proposalList) { pd.mu.RLock() defer pd.mu.RUnlock() - if _, ok := pd.states[epoch]; !ok { - return allVotes{}, nil, errEpochNotActive - } - decided, undecided := calcVotes(logger, pd.theta, pd.states[epoch]) - return decided, undecided, nil -} - -func (pd *ProtocolDriver) genFirstRoundMsgBody(epoch types.EpochID) (FirstVotingMessageBody, error) { - pd.mu.RLock() - defer pd.mu.RUnlock() - - if _, ok := pd.states[epoch]; !ok { - return FirstVotingMessageBody{}, errEpochNotActive - } - s := pd.states[epoch] - return FirstVotingMessageBody{ - EpochID: epoch, - ValidProposals: s.incomingProposals.valid.sort(), - PotentiallyValidProposals: s.incomingProposals.potentiallyValid.sort(), - }, nil + return calcVotes(logger, pd.theta, st) } -func (pd *ProtocolDriver) sendFirstRoundVote(ctx context.Context, epoch types.EpochID) error { - mb, err := pd.genFirstRoundMsgBody(epoch) - if err != nil { - return fmt.Errorf("getting first round message: %w", err) - } - +func (pd *ProtocolDriver) sendFirstRoundVote(ctx context.Context, msg FirstVotingMessageBody, signer *signing.EdSigner) error { m := FirstVotingMessage{ - FirstVotingMessageBody: mb, - SmesherID: pd.edSigner.NodeID(), - Signature: pd.edSigner.Sign(signing.BEACON_FIRST_MSG, codec.MustEncode(&mb)), + FirstVotingMessageBody: msg, + SmesherID: signer.NodeID(), + Signature: signer.Sign(signing.BEACON_FIRST_MSG, codec.MustEncode(&msg)), } - pd.logger.WithContext(ctx).With().Debug("sending first round vote", epoch, types.FirstRound) - pd.sendToGossip(ctx, pubsub.BeaconFirstVotesProtocol, codec.MustEncode(&m)) - return nil + pd.logger.WithContext(ctx).With().Debug("sending first round vote", msg.EpochID, types.FirstRound, log.ShortStringer("id", signer.NodeID())) + return pd.sendToGossip(ctx, pubsub.BeaconFirstVotesProtocol, codec.MustEncode(&m)) } func (pd *ProtocolDriver) getFirstRoundVote(epoch types.EpochID, nodeID types.NodeID) (proposalList, error) { @@ -973,10 +1003,10 @@ func (pd *ProtocolDriver) getFirstRoundVote(epoch types.EpochID, nodeID types.No return st.getMinerFirstRoundVote(nodeID) } -func (pd *ProtocolDriver) sendFollowingVote(ctx context.Context, epoch types.EpochID, round types.RoundID, ownCurrentRoundVotes allVotes) error { - firstRoundVotes, err := pd.getFirstRoundVote(epoch, pd.edSigner.NodeID()) +func (pd *ProtocolDriver) sendFollowingVote(ctx context.Context, epoch types.EpochID, round types.RoundID, ownCurrentRoundVotes allVotes, signer *signing.EdSigner) error { + firstRoundVotes, err := pd.getFirstRoundVote(epoch, signer.NodeID()) if err != nil { - return fmt.Errorf("get own first round votes %s: %w", pd.edSigner.NodeID(), err) + return fmt.Errorf("get own first round votes %s: %w", signer.NodeID(), err) } bitVector := encodeVotes(ownCurrentRoundVotes, firstRoundVotes) @@ -988,13 +1018,12 @@ func (pd *ProtocolDriver) sendFollowingVote(ctx context.Context, epoch types.Epo m := FollowingVotingMessage{ FollowingVotingMessageBody: mb, - SmesherID: pd.edSigner.NodeID(), - Signature: pd.edSigner.Sign(signing.BEACON_FOLLOWUP_MSG, codec.MustEncode(&mb)), + SmesherID: signer.NodeID(), + Signature: signer.Sign(signing.BEACON_FOLLOWUP_MSG, codec.MustEncode(&mb)), } - pd.logger.WithContext(ctx).With().Debug("sending following round vote", epoch, round) - pd.sendToGossip(ctx, pubsub.BeaconFollowingVotesProtocol, codec.MustEncode(&m)) - return nil + pd.logger.WithContext(ctx).With().Debug("sending following round vote", epoch, round, log.ShortStringer("id", signer.NodeID())) + return pd.sendToGossip(ctx, pubsub.BeaconFollowingVotesProtocol, codec.MustEncode(&m)) } type proposalChecker struct { @@ -1086,7 +1115,7 @@ func buildSignedProposal(ctx context.Context, logger log.Log, signer vrfSigner, p := buildProposal(logger, epoch, nonce) vrfSig := signer.Sign(p) proposal := ProposalFromVrf(vrfSig) - logger.WithContext(ctx).With().Debug("calculated beacon proposal", epoch, nonce, log.Inline(proposal)) + logger.WithContext(ctx).With().Debug("calculated beacon proposal", epoch, nonce, log.Inline(proposal), log.ShortStringer("id", signer.NodeID())) return vrfSig } @@ -1099,14 +1128,11 @@ func buildProposal(logger log.Log, epoch types.EpochID, nonce types.VRFPostIndex return codec.MustEncode(message) } -func (pd *ProtocolDriver) sendToGossip(ctx context.Context, protocol string, serialized []byte) { - // NOTE(dshulyak) moved to goroutine because self-broadcast is applied synchronously - pd.eg.Go(func() error { - if err := pd.publisher.Publish(ctx, protocol, serialized); err != nil { - pd.logger.With().Error("failed to broadcast", log.String("protocol", protocol), log.Err(err)) - } - return nil - }) +func (pd *ProtocolDriver) sendToGossip(ctx context.Context, protocol string, serialized []byte) error { + if err := pd.publisher.Publish(ctx, protocol, serialized); err != nil { + return fmt.Errorf("publishing on protocol %s: %w", protocol, err) + } + return nil } func (pd *ProtocolDriver) gatherMetricsData() ([]*metrics.BeaconStats, *metrics.BeaconStats) { diff --git a/beacon/beacon_test.go b/beacon/beacon_test.go index bde3738eb7b..cb26ba80aa4 100644 --- a/beacon/beacon_test.go +++ b/beacon/beacon_test.go @@ -17,6 +17,7 @@ import ( "go.uber.org/mock/gomock" "github.com/spacemeshos/go-spacemesh/activation" + "github.com/spacemeshos/go-spacemesh/beacon/weakcoin" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/common/types/result" "github.com/spacemeshos/go-spacemesh/datastore" @@ -31,10 +32,6 @@ import ( "github.com/spacemeshos/go-spacemesh/system/mocks" ) -const ( - numATXs = 10 -) - func coinValueMock(tb testing.TB, value bool) coin { ctrl := gomock.NewController(tb) coinMock := NewMockcoin(ctrl) @@ -43,10 +40,9 @@ func coinValueMock(tb testing.TB, value bool) coin { gomock.AssignableToTypeOf(types.EpochID(0)), ).AnyTimes() coinMock.EXPECT().FinishEpoch(gomock.Any(), gomock.AssignableToTypeOf(types.EpochID(0))).AnyTimes() - nonce := types.VRFPostIndex(0) coinMock.EXPECT().StartRound(gomock.Any(), gomock.AssignableToTypeOf(types.RoundID(0)), - gomock.AssignableToTypeOf(&nonce), + gomock.AssignableToTypeOf([]weakcoin.Participant{}), ).AnyTimes() coinMock.EXPECT().FinishRound(gomock.Any()).AnyTimes() coinMock.EXPECT().Get( @@ -71,42 +67,41 @@ func newPublisher(tb testing.TB) pubsub.Publisher { type testProtocolDriver struct { *ProtocolDriver - ctrl *gomock.Controller - cdb *datastore.CachedDB - mClock *MocklayerClock - mSync *mocks.MockSyncStateProvider - mVerifier *MockvrfVerifier - mNonceFetcher *MocknonceFetcher + ctrl *gomock.Controller + cdb *datastore.CachedDB + mClock *MocklayerClock + mSync *mocks.MockSyncStateProvider + mVerifier *MockvrfVerifier } func setUpProtocolDriver(tb testing.TB) *testProtocolDriver { - return newTestDriver(tb, UnitTestConfig(), newPublisher(tb)) + return newTestDriver(tb, UnitTestConfig(), newPublisher(tb), 3, "") } -func newTestDriver(tb testing.TB, cfg Config, p pubsub.Publisher) *testProtocolDriver { +func newTestDriver(tb testing.TB, cfg Config, p pubsub.Publisher, miners int, id string) *testProtocolDriver { ctrl := gomock.NewController(tb) tpd := &testProtocolDriver{ - ctrl: ctrl, - mClock: NewMocklayerClock(ctrl), - mSync: mocks.NewMockSyncStateProvider(ctrl), - mVerifier: NewMockvrfVerifier(ctrl), - mNonceFetcher: NewMocknonceFetcher(ctrl), + ctrl: ctrl, + mClock: NewMocklayerClock(ctrl), + mSync: mocks.NewMockSyncStateProvider(ctrl), + mVerifier: NewMockvrfVerifier(ctrl), } - edSgn, err := signing.NewEdSigner() - require.NoError(tb, err) - lg := logtest.New(tb) + lg := logtest.New(tb).Named(id) tpd.mVerifier.EXPECT().Verify(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(true) - tpd.mNonceFetcher.EXPECT().VRFNonce(gomock.Any(), gomock.Any()).AnyTimes().Return(types.VRFPostIndex(1), nil) tpd.cdb = datastore.NewCachedDB(sql.InMemory(), lg) - tpd.ProtocolDriver = New(p, edSgn, signing.NewEdVerifier(), tpd.mVerifier, tpd.cdb, tpd.mClock, + tpd.ProtocolDriver = New(p, signing.NewEdVerifier(), tpd.mVerifier, tpd.cdb, tpd.mClock, WithConfig(cfg), WithLogger(lg), withWeakCoin(coinValueMock(tb, true)), - withNonceFetcher(tpd.mNonceFetcher), ) tpd.ProtocolDriver.SetSyncState(tpd.mSync) + for i := 0; i < miners; i++ { + edSgn, err := signing.NewEdSigner() + require.NoError(tb, err) + tpd.ProtocolDriver.Register(edSgn) + } return tpd } @@ -148,18 +143,20 @@ func TestMain(m *testing.M) { func TestBeacon_MultipleNodes(t *testing.T) { numNodes := 5 + numMinersPerNode := 7 testNodes := make([]*testProtocolDriver, 0, numNodes) publisher := pubsubmocks.NewMockPublisher(gomock.NewController(t)) publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, protocol string, data []byte) error { - for _, node := range testNodes { + for i, node := range testNodes { + peer := p2p.Peer(fmt.Sprint(i)) switch protocol { case pubsub.BeaconProposalProtocol: - require.NoError(t, node.HandleProposal(ctx, p2p.Peer(node.edSigner.NodeID().ShortString()), data)) + require.NoError(t, node.HandleProposal(ctx, peer, data)) case pubsub.BeaconFirstVotesProtocol: - require.NoError(t, node.HandleFirstVotes(ctx, p2p.Peer(node.edSigner.NodeID().ShortString()), data)) + require.NoError(t, node.HandleFirstVotes(ctx, peer, data)) case pubsub.BeaconFollowingVotesProtocol: - require.NoError(t, node.HandleFollowingVotes(ctx, p2p.Peer(node.edSigner.NodeID().ShortString()), data)) + require.NoError(t, node.HandleFollowingVotes(ctx, peer, data)) case pubsub.BeaconWeakCoinProtocol: } } @@ -173,7 +170,7 @@ func TestBeacon_MultipleNodes(t *testing.T) { bootstrap := types.Beacon{1, 2, 3, 4} now := time.Now() for i := 0; i < numNodes; i++ { - node := newTestDriver(t, cfg, publisher) + node := newTestDriver(t, cfg, publisher, numMinersPerNode, fmt.Sprintf("node-%d", i)) require.NoError(t, node.UpdateBeacon(types.EpochID(2), bootstrap)) node.mSync.EXPECT().IsSynced(gomock.Any()).Return(true).AnyTimes() node.mClock.EXPECT().CurrentLayer().Return(current).AnyTimes() @@ -192,8 +189,11 @@ func TestBeacon_MultipleNodes(t *testing.T) { // make the first node non-smeshing node continue } + for _, db := range dbs { - createATX(t, db, atxPublishLid, node.edSigner, 1, time.Now().Add(-1*time.Second)) + for _, s := range node.signers { + createATX(t, db, atxPublishLid, s, 1, time.Now().Add(-1*time.Second)) + } } } var wg sync.WaitGroup @@ -221,14 +221,14 @@ func TestBeacon_MultipleNodes_OnlyOneHonest(t *testing.T) { publisher := pubsubmocks.NewMockPublisher(gomock.NewController(t)) publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, protocol string, data []byte) error { - for _, node := range testNodes { + for i, node := range testNodes { switch protocol { case pubsub.BeaconProposalProtocol: - require.NoError(t, node.HandleProposal(ctx, p2p.Peer(node.edSigner.NodeID().ShortString()), data)) + require.NoError(t, node.HandleProposal(ctx, p2p.Peer(fmt.Sprint(i)), data)) case pubsub.BeaconFirstVotesProtocol: - require.NoError(t, node.HandleFirstVotes(ctx, p2p.Peer(node.edSigner.NodeID().ShortString()), data)) + require.NoError(t, node.HandleFirstVotes(ctx, p2p.Peer(fmt.Sprint(i)), data)) case pubsub.BeaconFollowingVotesProtocol: - require.NoError(t, node.HandleFollowingVotes(ctx, p2p.Peer(node.edSigner.NodeID().ShortString()), data)) + require.NoError(t, node.HandleFollowingVotes(ctx, p2p.Peer(fmt.Sprint(i)), data)) case pubsub.BeaconWeakCoinProtocol: } } @@ -242,7 +242,7 @@ func TestBeacon_MultipleNodes_OnlyOneHonest(t *testing.T) { bootstrap := types.Beacon{1, 2, 3, 4} now := time.Now() for i := 0; i < numNodes; i++ { - node := newTestDriver(t, cfg, publisher) + node := newTestDriver(t, cfg, publisher, 3, fmt.Sprintf("node-%d", i)) require.NoError(t, node.UpdateBeacon(types.EpochID(2), bootstrap)) node.mSync.EXPECT().IsSynced(gomock.Any()).Return(true).AnyTimes() node.mClock.EXPECT().CurrentLayer().Return(current).AnyTimes() @@ -258,9 +258,11 @@ func TestBeacon_MultipleNodes_OnlyOneHonest(t *testing.T) { } for i, node := range testNodes { for _, db := range dbs { - createATX(t, db, atxPublishLid, node.edSigner, 1, time.Now().Add(-1*time.Second)) - if i != 0 { - require.NoError(t, identities.SetMalicious(db, node.edSigner.NodeID(), []byte("bad"), time.Now())) + for _, s := range node.signers { + createATX(t, db, atxPublishLid, s, 1, time.Now().Add(-1*time.Second)) + if i != 0 { + require.NoError(t, identities.SetMalicious(db, s.NodeID(), []byte("bad"), time.Now())) + } } } } @@ -296,7 +298,7 @@ func TestBeacon_NoProposals(t *testing.T) { now := time.Now() bootstrap := types.Beacon{1, 2, 3, 4} for i := 0; i < numNodes; i++ { - node := newTestDriver(t, cfg, publisher) + node := newTestDriver(t, cfg, publisher, 3, fmt.Sprintf("node-%d", i)) require.NoError(t, node.UpdateBeacon(types.EpochID(2), bootstrap)) node.mSync.EXPECT().IsSynced(gomock.Any()).Return(true).AnyTimes() node.mClock.EXPECT().CurrentLayer().Return(current).AnyTimes() @@ -312,7 +314,9 @@ func TestBeacon_NoProposals(t *testing.T) { } for _, node := range testNodes { for _, db := range dbs { - createATX(t, db, atxPublishLid, node.edSigner, 1, time.Now().Add(-1*time.Second)) + for _, s := range node.signers { + createATX(t, db, atxPublishLid, s, 1, time.Now().Add(-1*time.Second)) + } } } var wg sync.WaitGroup @@ -404,8 +408,10 @@ func TestBeaconWithMetrics(t *testing.T) { epoch := types.EpochID(3) for i := types.EpochID(2); i < epoch; i++ { lid := i.FirstLayer().Sub(1) - createATX(t, tpd.cdb, lid, tpd.edSigner, 199, time.Now()) - createRandomATXs(t, tpd.cdb, lid, numATXs-1) + for _, s := range tpd.signers { + createATX(t, tpd.cdb, lid, s, 199, time.Now()) + } + createRandomATXs(t, tpd.cdb, lid, 9) } finalLayer := types.LayerID(types.GetLayersPerEpoch() * uint32(epoch)) beacon1 := types.RandomBeacon() diff --git a/beacon/handlers.go b/beacon/handlers.go index 1a7a5f413b5..6663b1edd63 100644 --- a/beacon/handlers.go +++ b/beacon/handlers.go @@ -38,11 +38,10 @@ var ( ) // HandleWeakCoinProposal handles weakcoin proposal from gossip. -func (pd *ProtocolDriver) HandleWeakCoinProposal(ctx context.Context, peer p2p.Peer, msg []byte) error { +func (pd *ProtocolDriver) HandleWeakCoinProposal(ctx context.Context, peer p2p.Peer, msg []byte) (err error) { if !pd.isInProtocol() { return errBeaconProtocolInactive } - return pd.weakCoin.HandleProposal(ctx, peer, msg) } @@ -229,8 +228,9 @@ func (pd *ProtocolDriver) HandleFirstVotes(ctx context.Context, peer p2p.Peer, m currentEpoch := pd.currentEpoch() if m.EpochID != currentEpoch { logger.With().Debug("first votes from different epoch", - log.Uint32("current_epoch", uint32(currentEpoch)), - log.Uint32("message_epoch", uint32(m.EpochID))) + log.Uint32("current_epoch", uint32((currentEpoch))), + log.Uint32("message_epoch", uint32((m.EpochID))), + ) return errEpochNotActive } diff --git a/beacon/handlers_test.go b/beacon/handlers_test.go index 795cf764f72..55ecc7de3eb 100644 --- a/beacon/handlers_test.go +++ b/beacon/handlers_test.go @@ -42,11 +42,12 @@ func createProtocolDriverWithFirstRoundVotes( return tpd, plist } -func createEpochState(tb testing.TB, pd *ProtocolDriver, epoch types.EpochID, minerAtxs map[types.NodeID]*minerInfo, checker eligibilityChecker) { +func createEpochState(tb testing.TB, pd *ProtocolDriver, epoch types.EpochID, minerAtxs map[types.NodeID]*minerInfo, checker eligibilityChecker) *state { tb.Helper() pd.mu.Lock() defer pd.mu.Unlock() pd.states[epoch] = newState(pd.logger, pd.config, nil, epochWeight, minerAtxs, checker) + return pd.states[epoch] } func setOwnFirstRoundVotes(t *testing.T, pd *ProtocolDriver, epoch types.EpochID, ownFirstRound proposalList) { @@ -225,7 +226,7 @@ func Test_HandleProposal_Success(t *testing.T) { signer1.NodeID(): {atxid: createATX(t, tpd.cdb, epoch.FirstLayer().Sub(1), signer1, 10, epochStart.Add(-1*time.Minute))}, signer2.NodeID(): {atxid: createATX(t, tpd.cdb, epoch.FirstLayer().Sub(1), signer2, 10, epochStart.Add(-2*time.Minute))}, } - createEpochState(t, tpd.ProtocolDriver, epoch, minerAtxs, mockChecker) + st := createEpochState(t, tpd.ProtocolDriver, epoch, minerAtxs, mockChecker) tpd.mClock.EXPECT().LayerToTime(epoch.FirstLayer()).Return(epochStart).AnyTimes() msg1 := createProposal(t, vrfSigner1, epoch, false) @@ -236,7 +237,7 @@ func Test_HandleProposal_Success(t *testing.T) { mockChecker.EXPECT().PassStrictThreshold(gomock.Any()).Return(true) require.NoError(t, tpd.HandleProposal(context.Background(), "peerID", msgBytes1)) - require.NoError(t, tpd.markProposalPhaseFinished(epoch, time.Now().Add(-20*time.Millisecond))) + tpd.markProposalPhaseFinished(st, time.Now().Add(-20*time.Millisecond)) msg2 := createProposal(t, vrfSigner2, epoch, false) msgBytes2, err := codec.Encode(msg2) @@ -278,7 +279,7 @@ func Test_HandleProposal_Malicious(t *testing.T) { signer1.NodeID(): {atxid: createATX(t, tpd.cdb, epoch.FirstLayer().Sub(1), signer1, 10, epochStart.Add(-2*time.Minute)), malicious: true}, signer2.NodeID(): {atxid: createATX(t, tpd.cdb, epoch.FirstLayer().Sub(1), signer2, 10, epochStart.Add(-1*time.Minute))}, } - createEpochState(t, tpd.ProtocolDriver, epoch, minerAtxs, mockChecker) + st := createEpochState(t, tpd.ProtocolDriver, epoch, minerAtxs, mockChecker) tpd.mClock.EXPECT().LayerToTime(epoch.FirstLayer()).Return(epochStart).AnyTimes() msg1 := createProposal(t, vrfSigner1, epoch, false) @@ -289,7 +290,7 @@ func Test_HandleProposal_Malicious(t *testing.T) { mockChecker.EXPECT().PassStrictThreshold(gomock.Any()).Return(true) require.NoError(t, tpd.HandleProposal(context.Background(), "peerID", msgBytes1)) - require.NoError(t, tpd.markProposalPhaseFinished(epoch, time.Now().Add(-20*time.Millisecond))) + tpd.markProposalPhaseFinished(st, time.Now().Add(-20*time.Millisecond)) msg2 := createProposal(t, vrfSigner2, epoch, false) msgBytes2, err := codec.Encode(msg2) diff --git a/beacon/interface.go b/beacon/interface.go index 3c0ac20c40e..4d751a09c5d 100644 --- a/beacon/interface.go +++ b/beacon/interface.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/spacemeshos/go-spacemesh/beacon/weakcoin" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/p2p" ) @@ -12,7 +13,7 @@ import ( type coin interface { StartEpoch(context.Context, types.EpochID) - StartRound(context.Context, types.RoundID, *types.VRFPostIndex) + StartRound(context.Context, types.RoundID, []weakcoin.Participant) FinishRound(context.Context) Get(context.Context, types.EpochID, types.RoundID) (bool, error) FinishEpoch(context.Context, types.EpochID) @@ -33,7 +34,6 @@ type layerClock interface { type vrfSigner interface { Sign(msg []byte) types.VrfSignature NodeID() types.NodeID - LittleEndian() bool } type vrfVerifier interface { diff --git a/beacon/mocks.go b/beacon/mocks.go index 9d62418cf4b..39fb83e353f 100644 --- a/beacon/mocks.go +++ b/beacon/mocks.go @@ -13,6 +13,7 @@ import ( reflect "reflect" time "time" + weakcoin "github.com/spacemeshos/go-spacemesh/beacon/weakcoin" types "github.com/spacemeshos/go-spacemesh/common/types" p2p "github.com/spacemeshos/go-spacemesh/p2p" gomock "go.uber.org/mock/gomock" @@ -227,7 +228,7 @@ func (c *coinStartEpochCall) DoAndReturn(f func(context.Context, types.EpochID)) } // StartRound mocks base method. -func (m *Mockcoin) StartRound(arg0 context.Context, arg1 types.RoundID, arg2 *types.VRFPostIndex) { +func (m *Mockcoin) StartRound(arg0 context.Context, arg1 types.RoundID, arg2 []weakcoin.Participant) { m.ctrl.T.Helper() m.ctrl.Call(m, "StartRound", arg0, arg1, arg2) } @@ -251,13 +252,13 @@ func (c *coinStartRoundCall) Return() *coinStartRoundCall { } // Do rewrite *gomock.Call.Do -func (c *coinStartRoundCall) Do(f func(context.Context, types.RoundID, *types.VRFPostIndex)) *coinStartRoundCall { +func (c *coinStartRoundCall) Do(f func(context.Context, types.RoundID, []weakcoin.Participant)) *coinStartRoundCall { c.Call = c.Call.Do(f) return c } // DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *coinStartRoundCall) DoAndReturn(f func(context.Context, types.RoundID, *types.VRFPostIndex)) *coinStartRoundCall { +func (c *coinStartRoundCall) DoAndReturn(f func(context.Context, types.RoundID, []weakcoin.Participant)) *coinStartRoundCall { c.Call = c.Call.DoAndReturn(f) return c } @@ -521,44 +522,6 @@ func (m *MockvrfSigner) EXPECT() *MockvrfSignerMockRecorder { return m.recorder } -// LittleEndian mocks base method. -func (m *MockvrfSigner) LittleEndian() bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "LittleEndian") - ret0, _ := ret[0].(bool) - return ret0 -} - -// LittleEndian indicates an expected call of LittleEndian. -func (mr *MockvrfSignerMockRecorder) LittleEndian() *vrfSignerLittleEndianCall { - mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LittleEndian", reflect.TypeOf((*MockvrfSigner)(nil).LittleEndian)) - return &vrfSignerLittleEndianCall{Call: call} -} - -// vrfSignerLittleEndianCall wrap *gomock.Call -type vrfSignerLittleEndianCall struct { - *gomock.Call -} - -// Return rewrite *gomock.Call.Return -func (c *vrfSignerLittleEndianCall) Return(arg0 bool) *vrfSignerLittleEndianCall { - c.Call = c.Call.Return(arg0) - return c -} - -// Do rewrite *gomock.Call.Do -func (c *vrfSignerLittleEndianCall) Do(f func() bool) *vrfSignerLittleEndianCall { - c.Call = c.Call.Do(f) - return c -} - -// DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *vrfSignerLittleEndianCall) DoAndReturn(f func() bool) *vrfSignerLittleEndianCall { - c.Call = c.Call.DoAndReturn(f) - return c -} - // NodeID mocks base method. func (m *MockvrfSigner) NodeID() types.NodeID { m.ctrl.T.Helper() diff --git a/beacon/proposal_set.go b/beacon/proposal_set.go index 55c1fea2fe5..5bf367abe83 100644 --- a/beacon/proposal_set.go +++ b/beacon/proposal_set.go @@ -9,8 +9,8 @@ import ( type proposalSet map[Proposal]struct{} -func (vs proposalSet) sort() proposalList { - return proposalList(maps.Keys(vs)).sort() +func (p proposalSet) sorted() proposalList { + return proposalList(maps.Keys(p)).sort() } func (p proposalSet) MarshalLogArray(enc zapcore.ArrayEncoder) error { diff --git a/beacon/state.go b/beacon/state.go index 45449349507..9295210b966 100644 --- a/beacon/state.go +++ b/beacon/state.go @@ -19,7 +19,7 @@ type minerInfo struct { type state struct { cfg Config logger log.Log - nonce *types.VRFPostIndex + active map[types.NodeID]participant epochWeight uint64 // the original proposals as received, bucketed by validity. incomingProposals proposals @@ -38,7 +38,7 @@ type state struct { func newState( logger log.Log, cfg Config, - nonce *types.VRFPostIndex, + active map[types.NodeID]participant, epochWeight uint64, miners map[types.NodeID]*minerInfo, checker eligibilityChecker, @@ -47,7 +47,7 @@ func newState( cfg: cfg, logger: logger, epochWeight: epochWeight, - nonce: nonce, + active: active, minerAtxs: miners, firstRoundIncomingVotes: make(map[types.NodeID]proposalList), votesMargin: map[Proposal]*big.Int{}, @@ -101,8 +101,6 @@ func (s *state) addVote(proposal Proposal, vote uint, voteWeight *big.Int) { func (s *state) registerProposed(logger log.Log, nodeID types.NodeID) error { if _, ok := s.hasProposed[nodeID]; ok { - // see TODOs for registerVoted() - logger.Warning("already received proposal from miner") return fmt.Errorf("already made proposal (miner ID %v): %w", nodeID.ShortString(), errAlreadyProposed) } diff --git a/beacon/weakcoin/interface.go b/beacon/weakcoin/interface.go index 92a2b14ac6c..7b7164bfd0d 100644 --- a/beacon/weakcoin/interface.go +++ b/beacon/weakcoin/interface.go @@ -9,7 +9,6 @@ import ( type vrfSigner interface { Sign(msg []byte) types.VrfSignature NodeID() types.NodeID - LittleEndian() bool } type vrfVerifier interface { diff --git a/beacon/weakcoin/mocks.go b/beacon/weakcoin/mocks.go index 1bc89bc68f1..59b6fed82bf 100644 --- a/beacon/weakcoin/mocks.go +++ b/beacon/weakcoin/mocks.go @@ -38,44 +38,6 @@ func (m *MockvrfSigner) EXPECT() *MockvrfSignerMockRecorder { return m.recorder } -// LittleEndian mocks base method. -func (m *MockvrfSigner) LittleEndian() bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "LittleEndian") - ret0, _ := ret[0].(bool) - return ret0 -} - -// LittleEndian indicates an expected call of LittleEndian. -func (mr *MockvrfSignerMockRecorder) LittleEndian() *vrfSignerLittleEndianCall { - mr.mock.ctrl.T.Helper() - call := mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LittleEndian", reflect.TypeOf((*MockvrfSigner)(nil).LittleEndian)) - return &vrfSignerLittleEndianCall{Call: call} -} - -// vrfSignerLittleEndianCall wrap *gomock.Call -type vrfSignerLittleEndianCall struct { - *gomock.Call -} - -// Return rewrite *gomock.Call.Return -func (c *vrfSignerLittleEndianCall) Return(arg0 bool) *vrfSignerLittleEndianCall { - c.Call = c.Call.Return(arg0) - return c -} - -// Do rewrite *gomock.Call.Do -func (c *vrfSignerLittleEndianCall) Do(f func() bool) *vrfSignerLittleEndianCall { - c.Call = c.Call.Do(f) - return c -} - -// DoAndReturn rewrite *gomock.Call.DoAndReturn -func (c *vrfSignerLittleEndianCall) DoAndReturn(f func() bool) *vrfSignerLittleEndianCall { - c.Call = c.Call.DoAndReturn(f) - return c -} - // NodeID mocks base method. func (m *MockvrfSigner) NodeID() types.NodeID { m.ctrl.T.Helper() diff --git a/beacon/weakcoin/weak_coin.go b/beacon/weakcoin/weak_coin.go index 22f891819d6..d0da52e5a7a 100644 --- a/beacon/weakcoin/weak_coin.go +++ b/beacon/weakcoin/weak_coin.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "golang.org/x/sync/errgroup" + "github.com/spacemeshos/go-spacemesh/codec" "github.com/spacemeshos/go-spacemesh/common/types" "github.com/spacemeshos/go-spacemesh/log" @@ -107,7 +109,6 @@ type messageTime interface { // New creates an instance of weak coin protocol. func New( publisher pubsub.Publisher, - signer vrfSigner, verifier vrfVerifier, nonceFetcher nonceFetcher, allowance allowance, @@ -117,7 +118,6 @@ func New( wc := &WeakCoin{ logger: log.NewNop(), config: defaultConfig(), - signer: signer, nonceFetcher: nonceFetcher, allowance: allowance, publisher: publisher, @@ -138,7 +138,6 @@ type WeakCoin struct { logger log.Log config config verifier vrfVerifier - signer vrfSigner nonceFetcher nonceFetcher publisher pubsub.Publisher @@ -193,7 +192,7 @@ func (wc *WeakCoin) FinishEpoch(ctx context.Context, epoch types.EpochID) { if epoch != wc.epoch { logger.With().Fatal("attempted to finish beacon weak coin for the wrong epoch", epoch, - log.Stringer("weak_coin_epoch", wc.epoch), + log.Uint32("weak_coin_epoch", uint32(wc.epoch)), ) } wc.epochStarted = false @@ -202,8 +201,13 @@ func (wc *WeakCoin) FinishEpoch(ctx context.Context, epoch types.EpochID) { logger.Info("weak coin finished epoch") } +type Participant struct { + Signer vrfSigner + Nonce types.VRFPostIndex +} + // StartRound process any buffered messages for this round and broadcast our proposal. -func (wc *WeakCoin) StartRound(ctx context.Context, round types.RoundID, nonce *types.VRFPostIndex) { +func (wc *WeakCoin) StartRound(ctx context.Context, round types.RoundID, participants []Participant) { wc.mu.Lock() logger := wc.logger.WithContext(ctx).WithFields(wc.epoch, round) logger.Info("started beacon weak coin round") @@ -221,9 +225,15 @@ func (wc *WeakCoin) StartRound(ctx context.Context, round types.RoundID, nonce * wc.nextRoundBuffer = wc.nextRoundBuffer[:0] wc.mu.Unlock() - if nonce != nil { - wc.publishProposal(ctx, wc.epoch, *nonce, wc.round) + var eg errgroup.Group + for _, p := range participants { + p := p + eg.Go(func() error { + wc.publishProposal(ctx, wc.epoch, p.Signer, p.Nonce, round) + return nil + }) } + eg.Wait() } func (wc *WeakCoin) updateProposal(ctx context.Context, message Message) error { @@ -245,8 +255,8 @@ func (wc *WeakCoin) updateProposal(ctx context.Context, message Message) error { return wc.updateSmallest(ctx, message.VRFSignature) } -func (wc *WeakCoin) prepareProposal(epoch types.EpochID, nonce types.VRFPostIndex, round types.RoundID) ([]byte, types.VrfSignature) { - minerAllowance := wc.allowance.MinerAllowance(wc.epoch, wc.signer.NodeID()) +func (wc *WeakCoin) prepareProposal(epoch types.EpochID, signer vrfSigner, nonce types.VRFPostIndex, round types.RoundID) ([]byte, types.VrfSignature) { + minerAllowance := wc.allowance.MinerAllowance(wc.epoch, signer.NodeID()) if minerAllowance == 0 { return nil, types.EmptyVrfSignature } @@ -254,7 +264,7 @@ func (wc *WeakCoin) prepareProposal(epoch types.EpochID, nonce types.VRFPostInde var smallest *types.VrfSignature for unit := uint32(0); unit < minerAllowance; unit++ { proposal := wc.encodeProposal(epoch, nonce, round, unit) - signature := wc.signer.Sign(proposal) + signature := signer.Sign(proposal) if wc.aboveThreshold(signature) { continue } @@ -263,7 +273,7 @@ func (wc *WeakCoin) prepareProposal(epoch types.EpochID, nonce types.VRFPostInde Epoch: epoch, Round: round, Unit: unit, - NodeID: wc.signer.NodeID(), + NodeID: signer.NodeID(), VRFSignature: signature, } broadcast = codec.MustEncode(&message) @@ -279,8 +289,8 @@ func (wc *WeakCoin) prepareProposal(epoch types.EpochID, nonce types.VRFPostInde return broadcast, *smallest } -func (wc *WeakCoin) publishProposal(ctx context.Context, epoch types.EpochID, nonce types.VRFPostIndex, round types.RoundID) { - msg, proposal := wc.prepareProposal(epoch, nonce, round) +func (wc *WeakCoin) publishProposal(ctx context.Context, epoch types.EpochID, signer vrfSigner, nonce types.VRFPostIndex, round types.RoundID) { + msg, proposal := wc.prepareProposal(epoch, signer, nonce, round) if msg == nil { return } diff --git a/beacon/weakcoin/weak_coin_test.go b/beacon/weakcoin/weak_coin_test.go index a5636df0ed7..6366da32a0c 100644 --- a/beacon/weakcoin/weak_coin_test.go +++ b/beacon/weakcoin/weak_coin_test.go @@ -41,7 +41,6 @@ func staticSigner(tb testing.TB, ctrl *gomock.Controller, nodeId types.NodeID, s signer := weakcoin.NewMockvrfSigner(ctrl) signer.EXPECT().Sign(gomock.Any()).Return(sig).AnyTimes() signer.EXPECT().NodeID().Return(nodeId).AnyTimes() - signer.EXPECT().LittleEndian().Return(true).AnyTimes() return signer } @@ -82,16 +81,14 @@ func TestWeakCoin(t *testing.T) { higherThreshold[79] = 0xff for _, tc := range []struct { - desc string - nodeSig types.VrfSignature - mining, expected bool - msg []byte - result func(error) bool + desc string + participants []weakcoin.Participant + expected bool + msg []byte + result func(error) bool }{ { desc: "node not mining", - nodeSig: oneLSBSig, - mining: false, expected: false, msg: codec.MustEncode(&weakcoin.Message{ Epoch: epoch, @@ -103,9 +100,11 @@ func TestWeakCoin(t *testing.T) { result: nilErr, }, { - desc: "node mining", - nodeSig: oneLSBSig, - mining: true, + desc: "node mining", + participants: []weakcoin.Participant{{ + Signer: staticSigner(t, ctrl, types.RandomNodeID(), oneLSBSig), + Nonce: types.VRFPostIndex(1), + }}, expected: true, msg: codec.MustEncode(&weakcoin.Message{ Epoch: epoch, @@ -117,9 +116,11 @@ func TestWeakCoin(t *testing.T) { result: isErr, }, { - desc: "node mining but exceed threshold", - nodeSig: higherThreshold, - mining: true, + desc: "node mining but exceed threshold", + participants: []weakcoin.Participant{{ + Signer: staticSigner(t, ctrl, types.RandomNodeID(), higherThreshold), + Nonce: types.VRFPostIndex(1), + }}, expected: false, msg: codec.MustEncode(&weakcoin.Message{ Epoch: epoch, @@ -131,19 +132,31 @@ func TestWeakCoin(t *testing.T) { result: nilErr, }, { - desc: "node only miner", - nodeSig: oneLSBSig, - mining: true, + desc: "node only miner", + participants: []weakcoin.Participant{{ + Signer: staticSigner(t, ctrl, types.RandomNodeID(), oneLSBSig), + Nonce: types.VRFPostIndex(1), + }}, expected: true, }, + { + desc: "two signers", + participants: []weakcoin.Participant{ + { + Signer: staticSigner(t, ctrl, types.RandomNodeID(), types.VrfSignature{0b111}), + Nonce: types.VRFPostIndex(1), + }, + { + Signer: staticSigner(t, ctrl, types.RandomNodeID(), types.VrfSignature{0b110}), + Nonce: types.VRFPostIndex(2), + }, + }, + expected: false, + }, } { tc := tc t.Run(tc.desc, func(t *testing.T) { - miner := 0 - if tc.mining { - // once for generating, once for pubsub validation - miner += 2 - } + miner := len(tc.participants) * 2 if len(tc.msg) > 0 { miner++ } @@ -160,7 +173,6 @@ func TestWeakCoin(t *testing.T) { threshold[79] = 0xfe wc = weakcoin.New( mockPublisher, - staticSigner(t, ctrl, types.RandomNodeID(), tc.nodeSig), sigVerifier(t, ctrl), nonceFetcher(t, ctrl), mockAllowance, @@ -170,12 +182,7 @@ func TestWeakCoin(t *testing.T) { ) wc.StartEpoch(context.Background(), epoch) - nonce := types.VRFPostIndex(1) - if tc.mining { - wc.StartRound(context.Background(), round, &nonce) - } else { - wc.StartRound(context.Background(), round, nil) - } + wc.StartRound(context.Background(), round, tc.participants) if len(tc.msg) > 0 { require.True(t, tc.result(wc.HandleProposal(context.Background(), "", tc.msg))) @@ -199,7 +206,6 @@ func TestWeakCoin_HandleProposal(t *testing.T) { oneLSBMiner = types.NodeID{0b0001} oneLSBSig = types.VrfSignature{0b0001} - zeroLSBSig = types.VrfSignature{0b0110} highLSBMiner = types.NodeID{0xff} higherThreshold types.VrfSignature ) @@ -320,7 +326,6 @@ func TestWeakCoin_HandleProposal(t *testing.T) { threshold[79] = 0xfe wc := weakcoin.New( noopBroadcaster(t, ctrl), - staticSigner(t, ctrl, types.RandomNodeID(), zeroLSBSig), sigVerifier(t, ctrl), nonceFetcher(t, ctrl), mockAllowance, @@ -330,7 +335,7 @@ func TestWeakCoin_HandleProposal(t *testing.T) { ) wc.StartEpoch(context.Background(), tc.startedEpoch) - wc.StartRound(context.Background(), tc.startedRound, nil) + wc.StartRound(context.Background(), tc.startedRound, []weakcoin.Participant{}) require.True(t, tc.expected(wc.HandleProposal(context.Background(), "", tc.msg))) wc.FinishRound(context.Background()) @@ -356,7 +361,6 @@ func TestWeakCoinNextRoundBufferOverflow(t *testing.T) { mockAllowance.EXPECT().MinerAllowance(epoch, gomock.Any()).Return(uint32(1)).AnyTimes() wc := weakcoin.New( noopBroadcaster(t, ctrl), - staticSigner(t, ctrl, types.RandomNodeID(), oneLSBSig), sigVerifier(t, ctrl), nonceFetcher(t, ctrl), mockAllowance, @@ -422,7 +426,6 @@ func TestWeakCoinEncodingRegression(t *testing.T) { }) instance := weakcoin.New( broadcaster, - vrfSig, signing.NewVRFVerifier(), nonceFetcher(t, ctrl), mockAllowance, @@ -430,8 +433,12 @@ func TestWeakCoinEncodingRegression(t *testing.T) { weakcoin.WithLog(logtest.New(t)), ) instance.StartEpoch(context.Background(), epoch) - nonce := types.VRFPostIndex(1) - instance.StartRound(context.Background(), round, &nonce) + instance.StartRound(context.Background(), round, []weakcoin.Participant{ + { + Signer: vrfSig, + Nonce: types.VRFPostIndex(1), + }, + }) require.Equal(t, "78f523319fd2cdf3812a3bc3905561acb2f7f1b7e47de71f92811d7bb82460e5999a048051cefa2d1b6f3f16656de83c2756b7539b33fa563a3e8fea5130235e66e8dce914d69bd40f13174f3914ad07", @@ -476,7 +483,6 @@ func TestWeakCoinExchangeProposals(t *testing.T) { for i := range instances { instances[i] = weakcoin.New( broadcasters[i], - vrfSigners[i], signing.NewVRFVerifier(), nonceFetcher(t, ctrl), mockAllowance, @@ -485,7 +491,6 @@ func TestWeakCoinExchangeProposals(t *testing.T) { ) } - nonce := types.VRFPostIndex(1) for epoch := epochStart; epoch <= epochEnd; epoch++ { for _, instance := range instances { instance.StartEpoch(context.Background(), epoch) @@ -495,7 +500,12 @@ func TestWeakCoinExchangeProposals(t *testing.T) { if i == 0 { instance.StartRound(context.Background(), current, nil) } else { - instance.StartRound(context.Background(), current, &nonce) + instance.StartRound(context.Background(), current, []weakcoin.Participant{ + { + Signer: vrfSigners[i], + Nonce: types.VRFPostIndex(1), + }, + }) } } for _, instance := range instances { diff --git a/node/node.go b/node/node.go index 9a5bf0f060f..4acedc0056f 100644 --- a/node/node.go +++ b/node/node.go @@ -638,7 +638,6 @@ func (app *App) initServices(ctx context.Context) error { vrfVerifier := signing.NewVRFVerifier() beaconProtocol := beacon.New( app.host, - app.edSgn, app.edVerifier, vrfVerifier, app.cachedDB, @@ -647,6 +646,7 @@ func (app *App) initServices(ctx context.Context) error { beacon.WithConfig(app.Config.Beacon), beacon.WithLogger(app.addLogger(BeaconLogger, lg)), ) + beaconProtocol.Register(app.edSgn) trtlCfg := app.Config.Tortoise trtlCfg.LayerSize = layerSize diff --git a/signing/vrf.go b/signing/vrf.go index d74ca32d077..e64f271a730 100644 --- a/signing/vrf.go +++ b/signing/vrf.go @@ -28,11 +28,6 @@ func (s VRFSigner) PublicKey() *PublicKey { return NewPublicKey(s.nodeID.Bytes()) } -// LittleEndian indicates whether byte order in a signature is little-endian. -func (s VRFSigner) LittleEndian() bool { - return true -} - type VRFVerifier func(types.NodeID, []byte, types.VrfSignature) bool func NewVRFVerifier() VRFVerifier {