diff --git a/internal/bft/controller.go b/internal/bft/controller.go index b3af84d0..6471277e 100644 --- a/internal/bft/controller.go +++ b/internal/bft/controller.go @@ -468,24 +468,13 @@ func (c *Controller) ViewChanged(newViewNumber uint64, newProposalSequence uint6 c.viewChange <- viewInfo{proposalSeq: newProposalSequence, viewNumber: newViewNumber} } -func (c *Controller) getNextBatch() [][]byte { - var validRequests [][]byte - for len(validRequests) == 0 { // no valid requests in this batch - requests := c.Batcher.NextBatch() - if c.stopped() || c.Batcher.Closed() { - return nil - } - validRequests = append(validRequests, requests...) - } - return validRequests -} - func (c *Controller) propose() { - nextBatch := c.getNextBatch() - if len(nextBatch) == 0 { - // If our next batch is empty, - // it can only be because - // the batcher is stopped and so are we. + if c.stopped() || c.Batcher.Closed() { + return + } + nextBatch := c.Batcher.NextBatch() + if len(nextBatch) == 0 { // no requests in this batch + c.acquireLeaderToken() // try again later return } metadata := c.currView.GetMetadata() diff --git a/internal/bft/controller_test.go b/internal/bft/controller_test.go index c4a73546..e3cffcb9 100644 --- a/internal/bft/controller_test.go +++ b/internal/bft/controller_test.go @@ -584,6 +584,7 @@ func TestControllerLeaderRequestHandling(t *testing.T) { batcher := &mocks.Batcher{} batcher.On("Close") + batcher.On("Closed").Return(false) batcher.On("Reset") batcher.On("NextBatch").Run(func(arguments mock.Arguments) { time.Sleep(time.Hour) diff --git a/test/basic_test.go b/test/basic_test.go index 98dc5306..69946a11 100644 --- a/test/basic_test.go +++ b/test/basic_test.go @@ -1249,6 +1249,70 @@ func TestLeaderModifiesPreprepare(t *testing.T) { } } +func TestLeaderCatchUpWithoutSync(t *testing.T) { + t.Parallel() + network := NewNetwork() + defer network.Shutdown() + + testDir, err := os.MkdirTemp("", t.Name()) + assert.NoErrorf(t, err, "generate temporary test dir") + defer os.RemoveAll(testDir) + + numberOfNodes := 4 + nodes := make([]*App, 0) + for i := 1; i <= numberOfNodes; i++ { + n := newNode(uint64(i), network, t.Name(), testDir, false, 0) + n.Consensus.Config.SyncOnStart = false + nodes = append(nodes, n) + } + + restartWG := sync.WaitGroup{} + restartWG.Add(1) + + restoredWG := sync.WaitGroup{} + restoredWG.Add(1) + + baseLogger := nodes[0].logger.Desugar() + nodes[0].logger = baseLogger.WithOptions(zap.Hooks(func(entry zapcore.Entry) error { + if strings.Contains(entry.Message, "Processed prepares for proposal with seq 1") { + restartWG.Done() + } + if strings.Contains(entry.Message, "Restored proposal with sequence 1") { + restoredWG.Done() + } + return nil + })).Sugar() + nodes[0].Setup() + + startNodes(nodes, network) + + nodes[0].Submit(Request{ID: "1", ClientID: "alice"}) + + restartWG.Wait() + nodes[0].RestartSync(false) + restoredWG.Wait() + + data := make([]*AppRecord, 0) + for i := 0; i < numberOfNodes; i++ { + d := <-nodes[i].Delivered + data = append(data, d) + } + for i := 0; i < numberOfNodes-1; i++ { + assert.Equal(t, data[i], data[i+1]) + } + + nodes[0].Submit(Request{ID: "2", ClientID: "alice"}) + + data = make([]*AppRecord, 0) + for i := 0; i < numberOfNodes; i++ { + d := <-nodes[i].Delivered + data = append(data, d) + } + for i := 0; i < numberOfNodes-1; i++ { + assert.Equal(t, data[i], data[i+1]) + } +} + func TestGradualStart(t *testing.T) { // Scenario: initially the network has only one node // a transaction is submitted and committed with that node diff --git a/test/test_app.go b/test/test_app.go index 9589ec0b..ab2395ef 100644 --- a/test/test_app.go +++ b/test/test_app.go @@ -127,10 +127,15 @@ func (a *App) Sync() types.SyncResponse { // Restart restarts the node func (a *App) Restart() { + a.RestartSync(true) +} + +func (a *App) RestartSync(sync bool) { a.Consensus.Stop() a.Node.Lock() defer a.Node.Unlock() a.Setup() + a.Consensus.Config.SyncOnStart = sync if err := a.Consensus.Start(); err != nil { a.logger.Panicf("Consensus start returned an error : %v", err) }