From 99123521cb21601de203263c452e95dd939a08a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartosz=20R=C3=B3=C5=BCa=C5=84ski?= Date: Sun, 15 Oct 2023 14:49:44 +0200 Subject: [PATCH] Support multiple signers in beacon --- beacon/beacon.go | 255 ++++++++++++++++++++++------------- beacon/beacon_test.go | 90 +++++++------ beacon/handlers.go | 22 ++- beacon/state.go | 8 +- beacon/weakcoin/weak_coin.go | 4 +- node/node.go | 3 +- 6 files changed, 229 insertions(+), 153 deletions(-) diff --git a/beacon/beacon.go b/beacon/beacon.go index 092ce92d91f..879dad16b78 100644 --- a/beacon/beacon.go +++ b/beacon/beacon.go @@ -52,18 +52,6 @@ type ( } ) -type defaultFetcher struct { - cdb *datastore.CachedDB -} - -func (f defaultFetcher) VRFNonce(nodeID types.NodeID, epoch types.EpochID) (types.VRFPostIndex, error) { - nonce, err := f.cdb.VRFNonce(nodeID, epoch) - if err != nil { - return types.VRFPostIndex(0), fmt.Errorf("get vrf nonce: %w", err) - } - return nonce, nil -} - // Opt for configuring beacon protocol. type Opt func(*ProtocolDriver) @@ -88,9 +76,11 @@ func WithConfig(cfg Config) Opt { } } -func withWeakCoin(wc coin) Opt { +type weakCoinFactory func(signer *signing.EdSigner) coin + +func withWeakCoinFactory(f weakCoinFactory) Opt { return func(pd *ProtocolDriver) { - pd.weakCoin = wc + pd.createWeakCoin = f } } @@ -103,7 +93,6 @@ func withNonceFetcher(nf nonceFetcher) Opt { // New returns a new ProtocolDriver. func New( publisher pubsub.Publisher, - edSigner *signing.EdSigner, edVerifier *signing.EdVerifier, vrfVerifier vrfVerifier, cdb *datastore.CachedDB, @@ -115,11 +104,13 @@ func New( logger: log.NewNop(), config: DefaultConfig(), publisher: publisher, - edSigner: edSigner, edVerifier: edVerifier, vrfVerifier: vrfVerifier, + nonceFetcher: cdb, cdb: cdb, clock: clock, + signers: make(map[types.NodeID]participant), + createWeakCoin: nil, beacons: make(map[types.EpochID]types.Beacon), ballotsBeacons: make(map[types.EpochID]map[types.Beacon]*beaconWeight), states: make(map[types.EpochID]*state), @@ -135,22 +126,60 @@ func New( pd.ctx, pd.cancel = context.WithCancel(pd.ctx) pd.theta = new(big.Float).SetRat(pd.config.Theta) - if pd.nonceFetcher == nil { - pd.nonceFetcher = defaultFetcher{cdb: cdb} - } - if pd.weakCoin == nil { - pd.weakCoin = weakcoin.New(pd.publisher, edSigner.VRFSigner(), vrfVerifier, pd.nonceFetcher, pd, - pd.msgTimes, - weakcoin.WithLog(pd.logger.WithName("weakCoin")), - weakcoin.WithMaxRound(pd.config.RoundsNumber), - ) + if pd.createWeakCoin == nil { + pd.createWeakCoin = func(signer *signing.EdSigner) coin { + return weakcoin.New( + pd.publisher, + signer.VRFSigner(), + pd.vrfVerifier, + pd.nonceFetcher, + pd, + pd.msgTimes, + weakcoin.WithLog(pd.logger.WithName("weakCoin").WithName(signer.NodeID().ShortString())), + weakcoin.WithMaxRound(pd.config.RoundsNumber), + ) + } } - pd.metricsCollector = metrics.NewBeaconMetricsCollector(pd.gatherMetricsData, pd.logger.WithName("metrics")) return pd } +func (pd *ProtocolDriver) Register(s *signing.EdSigner) { + pd.mu.Lock() + defer pd.mu.Unlock() + if _, exists := pd.signers[s.NodeID()]; exists { + pd.logger.With().Error("signing key already registered", log.ShortStringer("key", s.NodeID())) + return + } + p := participant{ + signer: s, + coin: pd.createWeakCoin(s), + } + pd.logger.With().Info("registered signing key", p.Id()) + pd.signers[s.NodeID()] = p +} + +type participant struct { + signer *signing.EdSigner + coin coin +} + +type signerSession struct { + participant + nonce types.VRFPostIndex +} + +func (s *participant) Id() signerID { + return signerID(s.signer.NodeID()) +} + +type signerID types.NodeID + +func (id signerID) Field() log.Field { + return log.ShortStringer("id", types.NodeID(id)) +} + // ProtocolDriver is the driver for the beacon protocol. type ProtocolDriver struct { inProtocol uint64 @@ -160,14 +189,16 @@ type ProtocolDriver struct { cancel context.CancelFunc startOnce sync.Once - config Config - sync system.SyncStateProvider - publisher pubsub.Publisher - edSigner *signing.EdSigner + config Config + sync system.SyncStateProvider + publisher pubsub.Publisher + + signers map[types.NodeID]participant + createWeakCoin weakCoinFactory + edVerifier *signing.EdVerifier vrfVerifier vrfVerifier nonceFetcher nonceFetcher - weakCoin coin theta *big.Float clock layerClock @@ -536,10 +567,9 @@ func (pd *ProtocolDriver) initEpochStateIfNotPresent(logger log.Log, epoch types } var ( - epochWeight uint64 - miners = make(map[types.NodeID]*minerInfo) - active bool - nonce *types.VRFPostIndex + epochWeight uint64 + miners = make(map[types.NodeID]*minerInfo) + potentiallyActive = make(map[types.NodeID]participant) // w1 is the weight units at δ before the end of the previous epoch, used to calculate `thresholdStrict` // w2 is the weight units at the end of the previous epoch, used to calculate `threshold` w1, w2 int @@ -572,8 +602,9 @@ func (pd *ProtocolDriver) initEpochStateIfNotPresent(logger log.Log, epoch types log.Bool("malicious", malicious), log.Stringer("smesher", header.NodeID)) } - if header.NodeID == pd.edSigner.NodeID() { - active = true + + if s, ok := pd.signers[header.NodeID]; ok { + potentiallyActive[header.NodeID] = s } return nil }); err != nil { @@ -581,20 +612,23 @@ func (pd *ProtocolDriver) initEpochStateIfNotPresent(logger log.Log, epoch types } if epochWeight == 0 { - logger.With().Error("zero weight targeting epoch", log.Err(errZeroEpochWeight)) return nil, errZeroEpochWeight } - if active { - nnc, err := pd.nonceFetcher.VRFNonce(pd.edSigner.NodeID(), epoch) + active := map[types.NodeID]signerSession{} + for id, signer := range potentiallyActive { + nnc, err := pd.nonceFetcher.VRFNonce(id, epoch) if err != nil { - logger.With().Error("failed to get own VRF nonce", log.Err(err)) - return nil, fmt.Errorf("get own VRF nonce: %w", err) + return nil, fmt.Errorf("getting own VRF nonce: %w", err) } - nonce = &nnc + active[id] = signerSession{ + participant: signer, + nonce: nnc, + } + } checker := createProposalChecker(logger, pd.config, w1, w1+w2) - pd.states[epoch] = newState(logger, pd.config, nonce, epochWeight, miners, checker) + pd.states[epoch] = newState(logger, pd.config, active, epochWeight, miners, checker) return pd.states[epoch], nil } @@ -687,7 +721,7 @@ func (pd *ProtocolDriver) setRoundInProgress(round types.RoundID) { earliestVoteTime := nextRoundStartTime.Add(-pd.config.GracePeriodDuration) pd.logger.With().Debug("earliest vote time for next round", round, - log.Uint32("next_round", uint32(round+1)), + log.FieldNamed("next_round", round+1), log.Time("earliest_time", earliestVoteTime)) pd.mu.Lock() @@ -731,14 +765,16 @@ func (pd *ProtocolDriver) runProtocol(ctx context.Context, epoch types.EpochID, pd.setBeginProtocol(ctx) defer pd.setEndProtocol(ctx) - pd.weakCoin.StartEpoch(ctx, epoch) - defer pd.weakCoin.FinishEpoch(ctx, epoch) + for _, participant := range st.active { + participant.coin.StartEpoch(ctx, epoch) + defer participant.coin.FinishEpoch(ctx, epoch) + } if err := pd.runProposalPhase(ctx, epoch, st); err != nil { logger.With().Warning("proposal phase failed", log.Err(err)) return } - lastRoundOwnVotes, err := pd.runConsensusPhase(ctx, epoch, st.nonce) + lastRoundOwnVotes, err := pd.runConsensusPhase(ctx, epoch, st.active) if err != nil { logger.With().Warning("consensus phase failed", log.Err(err)) return @@ -781,9 +817,10 @@ func (pd *ProtocolDriver) runProposalPhase(ctx context.Context, epoch types.Epoc ctx, cancel := context.WithTimeout(ctx, pd.config.ProposalDuration) defer cancel() - if st.nonce != nil { + for _, session := range st.active { + session := session pd.eg.Go(func() error { - pd.sendProposal(ctx, epoch, *st.nonce, st.proposalChecker) + pd.sendProposal(ctx, epoch, session, st.proposalChecker) return nil }) } @@ -802,37 +839,37 @@ func (pd *ProtocolDriver) runProposalPhase(ctx context.Context, epoch types.Epoc return nil } -func (pd *ProtocolDriver) sendProposal(ctx context.Context, epoch types.EpochID, nonce types.VRFPostIndex, checker eligibilityChecker) { +func (pd *ProtocolDriver) sendProposal(ctx context.Context, epoch types.EpochID, s signerSession, checker eligibilityChecker) { if pd.isClosed() { return } - atx, malicious, err := pd.minerAtxHdr(epoch, pd.edSigner.NodeID()) + atx, malicious, err := pd.minerAtxHdr(epoch, s.signer.NodeID()) if err != nil || malicious { return } logger := pd.logger.WithContext(ctx).WithFields(epoch) - vrfSig := buildSignedProposal(ctx, pd.logger, pd.edSigner.VRFSigner(), epoch, nonce) + vrfSig := buildSignedProposal(ctx, pd.logger, s.signer.VRFSigner(), epoch, s.nonce) proposal := ProposalFromVrf(vrfSig) m := ProposalMessage{ EpochID: epoch, - NodeID: pd.edSigner.NodeID(), + NodeID: s.signer.NodeID(), VRFSignature: vrfSig, } if invalid == pd.classifyProposal(logger, m, atx.Received, time.Now(), checker) { - logger.With().Debug("own proposal doesn't pass threshold", log.Inline(proposal)) + logger.With().Debug("own proposal doesn't pass threshold", log.Inline(proposal), s.Id()) return } - logger.With().Debug("own proposal passes threshold", log.Inline(proposal)) + logger.With().Debug("own proposal passes threshold", log.Inline(proposal), s.Id()) pd.sendToGossip(ctx, pubsub.BeaconProposalProtocol, codec.MustEncode(&m)) - logger.With().Info("beacon proposal sent", log.Inline(proposal)) + logger.With().Info("beacon proposal sent", log.Inline(proposal), s.Id()) } // runConsensusPhase runs K voting rounds and returns result from last weak coin round. -func (pd *ProtocolDriver) runConsensusPhase(ctx context.Context, epoch types.EpochID, nonce *types.VRFPostIndex) (allVotes, error) { +func (pd *ProtocolDriver) runConsensusPhase(ctx context.Context, epoch types.EpochID, active map[types.NodeID]signerSession) (allVotes, error) { logger := pd.logger.WithContext(ctx).WithFields(epoch) logger.Info("starting consensus phase") @@ -848,21 +885,42 @@ func (pd *ProtocolDriver) runConsensusPhase(ctx context.Context, epoch types.Epo undecided proposalList err error ) - for round := types.FirstRound; round < pd.config.RoundsNumber; round++ { + + // First round + round := types.FirstRound + pd.setRoundInProgress(round) + if msg, err := pd.genFirstRoundMsgBody(epoch); err != nil { + logger.With().Error("failed to get firstround message", log.Err(err), round) + } else { + for _, session := range active { + session := session + pd.eg.Go(func() error { + if err := pd.sendFirstRoundVote(ctx, msg, session.signer); err != nil { + logger.With().Error("failed to send proposal vote", log.Err(err), session.Id(), round) + } + return nil + }) + } + } + select { + case <-timer.C: + case <-ctx.Done(): + return allVotes{}, fmt.Errorf("context done: %w", ctx.Err()) + } + + // Subsequent rounds + for round := types.FirstRound + 1; round < pd.config.RoundsNumber; round++ { round := round pd.setRoundInProgress(round) rLogger := logger.WithFields(round) + timer.Reset(pd.config.VotingRoundDuration) + votes := ownVotes - if nonce != nil { + for _, session := range active { + session := session pd.eg.Go(func() error { - if round == types.FirstRound { - if err := pd.sendFirstRoundVote(ctx, epoch); err != nil { - rLogger.With().Error("failed to send proposal vote", log.Err(err)) - } - } else { - if err := pd.sendFollowingVote(ctx, epoch, round, votes); err != nil { - rLogger.With().Error("failed to send following vote", log.Err(err)) - } + if err := pd.sendFollowingVote(ctx, epoch, round, votes, session.signer); err != nil { + rLogger.With().Error("failed to send following vote", log.Err(err), session.Id()) } return nil }) @@ -881,27 +939,35 @@ func (pd *ProtocolDriver) runConsensusPhase(ctx context.Context, epoch types.Epo if err != nil { return allVotes{}, err } - if round != types.FirstRound { - timer.Reset(pd.config.WeakCoinRoundDuration) + timer.Reset(pd.config.WeakCoinRoundDuration) + + for _, session := range active { + session := session pd.eg.Go(func() error { - pd.weakCoin.StartRound(ctx, round, nonce) + session.coin.StartRound(ctx, round, &session.nonce) return nil }) - select { - case <-timer.C: - case <-ctx.Done(): - return allVotes{}, fmt.Errorf("context done: %w", ctx.Err()) - } - pd.weakCoin.FinishRound(ctx) - flip, err := pd.weakCoin.Get(ctx, epoch, round) + } + select { + case <-timer.C: + case <-ctx.Done(): + return allVotes{}, fmt.Errorf("context done: %w", ctx.Err()) + } + for _, session := range active { + session.coin.FinishRound(ctx) + } + // All weak coin should have the same result, so we can just take the first one + var flip bool + for _, session := range active { + flip, err = session.coin.Get(ctx, epoch, round) if err != nil { rLogger.With().Error("failed to generate weak coin", log.Err(err)) return allVotes{}, err } - tallyUndecided(&ownVotes, undecided, flip) + break } - timer.Reset(pd.config.VotingRoundDuration) + tallyUndecided(&ownVotes, undecided, flip) } logger.Info("consensus phase finished") @@ -944,19 +1010,14 @@ func (pd *ProtocolDriver) genFirstRoundMsgBody(epoch types.EpochID) (FirstVoting }, nil } -func (pd *ProtocolDriver) sendFirstRoundVote(ctx context.Context, epoch types.EpochID) error { - mb, err := pd.genFirstRoundMsgBody(epoch) - if err != nil { - return fmt.Errorf("getting first round message: %w", err) - } - +func (pd *ProtocolDriver) sendFirstRoundVote(ctx context.Context, msg FirstVotingMessageBody, signer *signing.EdSigner) error { m := FirstVotingMessage{ - FirstVotingMessageBody: mb, - SmesherID: pd.edSigner.NodeID(), - Signature: pd.edSigner.Sign(signing.BEACON_FIRST_MSG, codec.MustEncode(&mb)), + FirstVotingMessageBody: msg, + SmesherID: signer.NodeID(), + Signature: signer.Sign(signing.BEACON_FIRST_MSG, codec.MustEncode(&msg)), } - pd.logger.WithContext(ctx).With().Debug("sending first round vote", epoch, types.FirstRound) + pd.logger.WithContext(ctx).With().Debug("sending first round vote", msg.EpochID, types.FirstRound) pd.sendToGossip(ctx, pubsub.BeaconFirstVotesProtocol, codec.MustEncode(&m)) return nil } @@ -973,10 +1034,10 @@ func (pd *ProtocolDriver) getFirstRoundVote(epoch types.EpochID, nodeID types.No return st.getMinerFirstRoundVote(nodeID) } -func (pd *ProtocolDriver) sendFollowingVote(ctx context.Context, epoch types.EpochID, round types.RoundID, ownCurrentRoundVotes allVotes) error { - firstRoundVotes, err := pd.getFirstRoundVote(epoch, pd.edSigner.NodeID()) +func (pd *ProtocolDriver) sendFollowingVote(ctx context.Context, epoch types.EpochID, round types.RoundID, ownCurrentRoundVotes allVotes, signer *signing.EdSigner) error { + firstRoundVotes, err := pd.getFirstRoundVote(epoch, signer.NodeID()) if err != nil { - return fmt.Errorf("get own first round votes %s: %w", pd.edSigner.NodeID(), err) + return fmt.Errorf("get own first round votes %s: %w", signer.NodeID(), err) } bitVector := encodeVotes(ownCurrentRoundVotes, firstRoundVotes) @@ -988,11 +1049,11 @@ func (pd *ProtocolDriver) sendFollowingVote(ctx context.Context, epoch types.Epo m := FollowingVotingMessage{ FollowingVotingMessageBody: mb, - SmesherID: pd.edSigner.NodeID(), - Signature: pd.edSigner.Sign(signing.BEACON_FOLLOWUP_MSG, codec.MustEncode(&mb)), + SmesherID: signer.NodeID(), + Signature: signer.Sign(signing.BEACON_FOLLOWUP_MSG, codec.MustEncode(&mb)), } - pd.logger.WithContext(ctx).With().Debug("sending following round vote", epoch, round) + pd.logger.WithContext(ctx).With().Debug("sending following round vote", epoch, round, signerID(signer.NodeID())) pd.sendToGossip(ctx, pubsub.BeaconFollowingVotesProtocol, codec.MustEncode(&m)) return nil } @@ -1086,7 +1147,7 @@ func buildSignedProposal(ctx context.Context, logger log.Log, signer vrfSigner, p := buildProposal(logger, epoch, nonce) vrfSig := signer.Sign(p) proposal := ProposalFromVrf(vrfSig) - logger.WithContext(ctx).With().Debug("calculated beacon proposal", epoch, nonce, log.Inline(proposal)) + logger.WithContext(ctx).With().Debug("calculated beacon proposal", epoch, nonce, log.Inline(proposal), signerID(signer.NodeID())) return vrfSig } diff --git a/beacon/beacon_test.go b/beacon/beacon_test.go index bde3738eb7b..d71049bbd61 100644 --- a/beacon/beacon_test.go +++ b/beacon/beacon_test.go @@ -31,10 +31,6 @@ import ( "github.com/spacemeshos/go-spacemesh/system/mocks" ) -const ( - numATXs = 10 -) - func coinValueMock(tb testing.TB, value bool) coin { ctrl := gomock.NewController(tb) coinMock := NewMockcoin(ctrl) @@ -71,42 +67,41 @@ func newPublisher(tb testing.TB) pubsub.Publisher { type testProtocolDriver struct { *ProtocolDriver - ctrl *gomock.Controller - cdb *datastore.CachedDB - mClock *MocklayerClock - mSync *mocks.MockSyncStateProvider - mVerifier *MockvrfVerifier - mNonceFetcher *MocknonceFetcher + ctrl *gomock.Controller + cdb *datastore.CachedDB + mClock *MocklayerClock + mSync *mocks.MockSyncStateProvider + mVerifier *MockvrfVerifier } func setUpProtocolDriver(tb testing.TB) *testProtocolDriver { - return newTestDriver(tb, UnitTestConfig(), newPublisher(tb)) + return newTestDriver(tb, UnitTestConfig(), newPublisher(tb), 3, "") } -func newTestDriver(tb testing.TB, cfg Config, p pubsub.Publisher) *testProtocolDriver { +func newTestDriver(tb testing.TB, cfg Config, p pubsub.Publisher, miners int, id string) *testProtocolDriver { ctrl := gomock.NewController(tb) tpd := &testProtocolDriver{ - ctrl: ctrl, - mClock: NewMocklayerClock(ctrl), - mSync: mocks.NewMockSyncStateProvider(ctrl), - mVerifier: NewMockvrfVerifier(ctrl), - mNonceFetcher: NewMocknonceFetcher(ctrl), + ctrl: ctrl, + mClock: NewMocklayerClock(ctrl), + mSync: mocks.NewMockSyncStateProvider(ctrl), + mVerifier: NewMockvrfVerifier(ctrl), } - edSgn, err := signing.NewEdSigner() - require.NoError(tb, err) - lg := logtest.New(tb) + lg := logtest.New(tb).Named(id) tpd.mVerifier.EXPECT().Verify(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(true) - tpd.mNonceFetcher.EXPECT().VRFNonce(gomock.Any(), gomock.Any()).AnyTimes().Return(types.VRFPostIndex(1), nil) tpd.cdb = datastore.NewCachedDB(sql.InMemory(), lg) - tpd.ProtocolDriver = New(p, edSgn, signing.NewEdVerifier(), tpd.mVerifier, tpd.cdb, tpd.mClock, + tpd.ProtocolDriver = New(p, signing.NewEdVerifier(), tpd.mVerifier, tpd.cdb, tpd.mClock, WithConfig(cfg), WithLogger(lg), - withWeakCoin(coinValueMock(tb, true)), - withNonceFetcher(tpd.mNonceFetcher), + withWeakCoinFactory(func(signer *signing.EdSigner) coin { return coinValueMock(tb, true) }), ) tpd.ProtocolDriver.SetSyncState(tpd.mSync) + for i := 0; i < miners; i++ { + edSgn, err := signing.NewEdSigner() + require.NoError(tb, err) + tpd.ProtocolDriver.Register(edSgn) + } return tpd } @@ -148,18 +143,20 @@ func TestMain(m *testing.M) { func TestBeacon_MultipleNodes(t *testing.T) { numNodes := 5 + numMinersPerNode := 7 testNodes := make([]*testProtocolDriver, 0, numNodes) publisher := pubsubmocks.NewMockPublisher(gomock.NewController(t)) publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, protocol string, data []byte) error { - for _, node := range testNodes { + for i, node := range testNodes { + peer := p2p.Peer(fmt.Sprint(i)) switch protocol { case pubsub.BeaconProposalProtocol: - require.NoError(t, node.HandleProposal(ctx, p2p.Peer(node.edSigner.NodeID().ShortString()), data)) + require.NoError(t, node.HandleProposal(ctx, peer, data)) case pubsub.BeaconFirstVotesProtocol: - require.NoError(t, node.HandleFirstVotes(ctx, p2p.Peer(node.edSigner.NodeID().ShortString()), data)) + require.NoError(t, node.HandleFirstVotes(ctx, peer, data)) case pubsub.BeaconFollowingVotesProtocol: - require.NoError(t, node.HandleFollowingVotes(ctx, p2p.Peer(node.edSigner.NodeID().ShortString()), data)) + require.NoError(t, node.HandleFollowingVotes(ctx, peer, data)) case pubsub.BeaconWeakCoinProtocol: } } @@ -173,7 +170,7 @@ func TestBeacon_MultipleNodes(t *testing.T) { bootstrap := types.Beacon{1, 2, 3, 4} now := time.Now() for i := 0; i < numNodes; i++ { - node := newTestDriver(t, cfg, publisher) + node := newTestDriver(t, cfg, publisher, numMinersPerNode, fmt.Sprintf("node-%d", i)) require.NoError(t, node.UpdateBeacon(types.EpochID(2), bootstrap)) node.mSync.EXPECT().IsSynced(gomock.Any()).Return(true).AnyTimes() node.mClock.EXPECT().CurrentLayer().Return(current).AnyTimes() @@ -192,8 +189,11 @@ func TestBeacon_MultipleNodes(t *testing.T) { // make the first node non-smeshing node continue } + for _, db := range dbs { - createATX(t, db, atxPublishLid, node.edSigner, 1, time.Now().Add(-1*time.Second)) + for _, s := range node.signers { + createATX(t, db, atxPublishLid, s.signer, 1, time.Now().Add(-1*time.Second)) + } } } var wg sync.WaitGroup @@ -221,14 +221,14 @@ func TestBeacon_MultipleNodes_OnlyOneHonest(t *testing.T) { publisher := pubsubmocks.NewMockPublisher(gomock.NewController(t)) publisher.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( func(ctx context.Context, protocol string, data []byte) error { - for _, node := range testNodes { + for i, node := range testNodes { switch protocol { case pubsub.BeaconProposalProtocol: - require.NoError(t, node.HandleProposal(ctx, p2p.Peer(node.edSigner.NodeID().ShortString()), data)) + require.NoError(t, node.HandleProposal(ctx, p2p.Peer(fmt.Sprint(i)), data)) case pubsub.BeaconFirstVotesProtocol: - require.NoError(t, node.HandleFirstVotes(ctx, p2p.Peer(node.edSigner.NodeID().ShortString()), data)) + require.NoError(t, node.HandleFirstVotes(ctx, p2p.Peer(fmt.Sprint(i)), data)) case pubsub.BeaconFollowingVotesProtocol: - require.NoError(t, node.HandleFollowingVotes(ctx, p2p.Peer(node.edSigner.NodeID().ShortString()), data)) + require.NoError(t, node.HandleFollowingVotes(ctx, p2p.Peer(fmt.Sprint(i)), data)) case pubsub.BeaconWeakCoinProtocol: } } @@ -242,7 +242,7 @@ func TestBeacon_MultipleNodes_OnlyOneHonest(t *testing.T) { bootstrap := types.Beacon{1, 2, 3, 4} now := time.Now() for i := 0; i < numNodes; i++ { - node := newTestDriver(t, cfg, publisher) + node := newTestDriver(t, cfg, publisher, 3, fmt.Sprintf("node-%d", i)) require.NoError(t, node.UpdateBeacon(types.EpochID(2), bootstrap)) node.mSync.EXPECT().IsSynced(gomock.Any()).Return(true).AnyTimes() node.mClock.EXPECT().CurrentLayer().Return(current).AnyTimes() @@ -258,9 +258,11 @@ func TestBeacon_MultipleNodes_OnlyOneHonest(t *testing.T) { } for i, node := range testNodes { for _, db := range dbs { - createATX(t, db, atxPublishLid, node.edSigner, 1, time.Now().Add(-1*time.Second)) - if i != 0 { - require.NoError(t, identities.SetMalicious(db, node.edSigner.NodeID(), []byte("bad"), time.Now())) + for _, s := range node.signers { + createATX(t, db, atxPublishLid, s.signer, 1, time.Now().Add(-1*time.Second)) + if i != 0 { + require.NoError(t, identities.SetMalicious(db, s.signer.NodeID(), []byte("bad"), time.Now())) + } } } } @@ -296,7 +298,7 @@ func TestBeacon_NoProposals(t *testing.T) { now := time.Now() bootstrap := types.Beacon{1, 2, 3, 4} for i := 0; i < numNodes; i++ { - node := newTestDriver(t, cfg, publisher) + node := newTestDriver(t, cfg, publisher, 3, fmt.Sprintf("node-%d", i)) require.NoError(t, node.UpdateBeacon(types.EpochID(2), bootstrap)) node.mSync.EXPECT().IsSynced(gomock.Any()).Return(true).AnyTimes() node.mClock.EXPECT().CurrentLayer().Return(current).AnyTimes() @@ -312,7 +314,9 @@ func TestBeacon_NoProposals(t *testing.T) { } for _, node := range testNodes { for _, db := range dbs { - createATX(t, db, atxPublishLid, node.edSigner, 1, time.Now().Add(-1*time.Second)) + for _, s := range node.signers { + createATX(t, db, atxPublishLid, s.signer, 1, time.Now().Add(-1*time.Second)) + } } } var wg sync.WaitGroup @@ -404,8 +408,10 @@ func TestBeaconWithMetrics(t *testing.T) { epoch := types.EpochID(3) for i := types.EpochID(2); i < epoch; i++ { lid := i.FirstLayer().Sub(1) - createATX(t, tpd.cdb, lid, tpd.edSigner, 199, time.Now()) - createRandomATXs(t, tpd.cdb, lid, numATXs-1) + for _, s := range tpd.signers { + createATX(t, tpd.cdb, lid, s.signer, 199, time.Now()) + } + createRandomATXs(t, tpd.cdb, lid, 9) } finalLayer := types.LayerID(types.GetLayersPerEpoch() * uint32(epoch)) beacon1 := types.RandomBeacon() diff --git a/beacon/handlers.go b/beacon/handlers.go index 1a7a5f413b5..b1636114c40 100644 --- a/beacon/handlers.go +++ b/beacon/handlers.go @@ -15,6 +15,7 @@ import ( "github.com/spacemeshos/go-spacemesh/p2p" "github.com/spacemeshos/go-spacemesh/p2p/pubsub" "github.com/spacemeshos/go-spacemesh/signing" + "golang.org/x/sync/errgroup" ) type category uint8 @@ -38,12 +39,21 @@ var ( ) // HandleWeakCoinProposal handles weakcoin proposal from gossip. -func (pd *ProtocolDriver) HandleWeakCoinProposal(ctx context.Context, peer p2p.Peer, msg []byte) error { +func (pd *ProtocolDriver) HandleWeakCoinProposal(ctx context.Context, peer p2p.Peer, msg []byte) (err error) { if !pd.isInProtocol() { return errBeaconProtocolInactive } + var eg errgroup.Group + pd.mu.RLock() + for _, signer := range pd.signers { + coin := signer.coin + eg.Go(func() error { + return coin.HandleProposal(ctx, peer, msg) + }) + } + pd.mu.Unlock() - return pd.weakCoin.HandleProposal(ctx, peer, msg) + return eg.Wait() } // HandleProposal handles beacon proposal from gossip. @@ -229,8 +239,8 @@ func (pd *ProtocolDriver) HandleFirstVotes(ctx context.Context, peer p2p.Peer, m currentEpoch := pd.currentEpoch() if m.EpochID != currentEpoch { logger.With().Debug("first votes from different epoch", - log.Uint32("current_epoch", uint32(currentEpoch)), - log.Uint32("message_epoch", uint32(m.EpochID))) + log.FieldNamed("current_epoch", currentEpoch), + log.FieldNamed("message_epoch", m.EpochID)) return errEpochNotActive } @@ -238,8 +248,8 @@ func (pd *ProtocolDriver) HandleFirstVotes(ctx context.Context, peer p2p.Peer, m currentRound := pd.currentRound() if currentRound > types.FirstRound { logger.With().Debug("first votes too late", - log.Uint32("current_round", uint32(currentRound)), - log.Uint32("message_round", uint32(types.FirstRound))) + log.FieldNamed("current_round", currentRound), + log.FieldNamed("message_round", types.FirstRound)) return errUntimelyMessage } diff --git a/beacon/state.go b/beacon/state.go index 45449349507..55391a0b547 100644 --- a/beacon/state.go +++ b/beacon/state.go @@ -19,7 +19,7 @@ type minerInfo struct { type state struct { cfg Config logger log.Log - nonce *types.VRFPostIndex + active map[types.NodeID]signerSession epochWeight uint64 // the original proposals as received, bucketed by validity. incomingProposals proposals @@ -38,7 +38,7 @@ type state struct { func newState( logger log.Log, cfg Config, - nonce *types.VRFPostIndex, + active map[types.NodeID]signerSession, epochWeight uint64, miners map[types.NodeID]*minerInfo, checker eligibilityChecker, @@ -47,7 +47,7 @@ func newState( cfg: cfg, logger: logger, epochWeight: epochWeight, - nonce: nonce, + active: active, minerAtxs: miners, firstRoundIncomingVotes: make(map[types.NodeID]proposalList), votesMargin: map[Proposal]*big.Int{}, @@ -101,8 +101,6 @@ func (s *state) addVote(proposal Proposal, vote uint, voteWeight *big.Int) { func (s *state) registerProposed(logger log.Log, nodeID types.NodeID) error { if _, ok := s.hasProposed[nodeID]; ok { - // see TODOs for registerVoted() - logger.Warning("already received proposal from miner") return fmt.Errorf("already made proposal (miner ID %v): %w", nodeID.ShortString(), errAlreadyProposed) } diff --git a/beacon/weakcoin/weak_coin.go b/beacon/weakcoin/weak_coin.go index 22f891819d6..d1736bf018a 100644 --- a/beacon/weakcoin/weak_coin.go +++ b/beacon/weakcoin/weak_coin.go @@ -70,7 +70,7 @@ type OptionFunc func(*WeakCoin) // WithLog changes logger. func WithLog(logger log.Log) OptionFunc { return func(wc *WeakCoin) { - wc.logger = logger + wc.logger = logger.WithFields(log.FieldNamed("id", wc.signer.NodeID())) } } @@ -193,7 +193,7 @@ func (wc *WeakCoin) FinishEpoch(ctx context.Context, epoch types.EpochID) { if epoch != wc.epoch { logger.With().Fatal("attempted to finish beacon weak coin for the wrong epoch", epoch, - log.Stringer("weak_coin_epoch", wc.epoch), + log.FieldNamed("weak_coin_epoch", wc.epoch), ) } wc.epochStarted = false diff --git a/node/node.go b/node/node.go index d114f4f9dbe..250e500e80a 100644 --- a/node/node.go +++ b/node/node.go @@ -610,11 +610,12 @@ func (app *App) initServices(ctx context.Context) error { app.edVerifier = signing.NewEdVerifier(signing.WithVerifierPrefix(app.Config.Genesis.GenesisID().Bytes())) vrfVerifier := signing.NewVRFVerifier() - beaconProtocol := beacon.New(app.host, app.edSgn, app.edVerifier, vrfVerifier, app.cachedDB, app.clock, + beaconProtocol := beacon.New(app.host, app.edVerifier, vrfVerifier, app.cachedDB, app.clock, beacon.WithContext(ctx), beacon.WithConfig(app.Config.Beacon), beacon.WithLogger(app.addLogger(BeaconLogger, lg)), ) + beaconProtocol.Register(app.edSgn) trtlCfg := app.Config.Tortoise trtlCfg.LayerSize = layerSize