From 00735c4740a409ff5538e59649f9ba3628287220 Mon Sep 17 00:00:00 2001
From: Hagar Meir
Date: Tue, 15 Aug 2023 15:47:33 +0300
Subject: [PATCH] Fix bug: leader stuck in propose
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

After a restart, the leader would grab the leader token and wait for a
non-empty batch. In the new test, the view thread of the restarted
leader is just about to call the controller's Decide. However, before
this fix, the controller was busy waiting for a new batch, so without
any new transactions the leader was stuck.
With the fix, if the next batch is empty, the controller re-acquires
the leader token and returns from propose, thus allowing the view to
call Decide.

Signed-off-by: Фёдор Партанский
---
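Illustration only, not part of the change: below is a minimal,
self-contained sketch of how the leader-token cycle is assumed to drive
propose(). The controller type, channel layout and run loop here are
simplified stand-ins rather than the library's actual implementation;
only the empty-batch handling mirrors the patched propose().

package main

import "fmt"

// controller is a simplified stand-in for the library's Controller:
// a buffered leaderToken channel drives propose() from the event loop.
type controller struct {
	leaderToken chan struct{}
	batches     [][][]byte // canned NextBatch results for this sketch
}

// acquireLeaderToken hands the token back so the loop retries propose later.
func (c *controller) acquireLeaderToken() {
	select {
	case c.leaderToken <- struct{}{}:
	default: // a retry is already pending
	}
}

// nextBatch pops the next canned batch; an empty result models "no requests yet".
func (c *controller) nextBatch() [][]byte {
	if len(c.batches) == 0 {
		return nil
	}
	batch := c.batches[0]
	c.batches = c.batches[1:]
	return batch
}

// propose mirrors the patched logic: on an empty batch, re-acquire the
// token and return instead of busy waiting for requests.
func (c *controller) propose() {
	batch := c.nextBatch()
	if len(batch) == 0 { // no requests in this batch
		c.acquireLeaderToken() // try again later
		fmt.Println("empty batch: token re-acquired, loop stays responsive")
		return
	}
	fmt.Printf("proposing a batch of %d request(s)\n", len(batch))
}

func main() {
	c := &controller{
		leaderToken: make(chan struct{}, 1),
		batches:     [][][]byte{nil, {[]byte("tx1")}}, // first batch is empty
	}
	c.acquireLeaderToken()

	decided := make(chan struct{}, 1)
	decided <- struct{}{} // a decision the view is about to deliver, as in the test

	// Three iterations of a simplified run loop: the empty batch no longer
	// blocks it, so the pending decision still gets processed.
	for i := 0; i < 3; i++ {
		select {
		case <-c.leaderToken:
			c.propose()
		case <-decided:
			fmt.Println("view delivered the decision")
		}
	}
}

Re-acquiring the token instead of blocking in a busy-wait (as the
removed getNextBatch did) keeps the run loop free to handle the view's
decision, which is the scenario the new test reproduces.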
 internal/bft/controller.go      | 23 ++++--------
 internal/bft/controller_test.go |  1 +
 test/basic_test.go              | 64 +++++++++++++++++++++++++++++++++
 test/test_app.go                |  5 +++
 4 files changed, 76 insertions(+), 17 deletions(-)

diff --git a/internal/bft/controller.go b/internal/bft/controller.go
index b3af84d0..6471277e 100644
--- a/internal/bft/controller.go
+++ b/internal/bft/controller.go
@@ -468,24 +468,13 @@ func (c *Controller) ViewChanged(newViewNumber uint64, newProposalSequence uint64) {
 	c.viewChange <- viewInfo{proposalSeq: newProposalSequence, viewNumber: newViewNumber}
 }
 
-func (c *Controller) getNextBatch() [][]byte {
-	var validRequests [][]byte
-	for len(validRequests) == 0 { // no valid requests in this batch
-		requests := c.Batcher.NextBatch()
-		if c.stopped() || c.Batcher.Closed() {
-			return nil
-		}
-		validRequests = append(validRequests, requests...)
-	}
-	return validRequests
-}
-
 func (c *Controller) propose() {
-	nextBatch := c.getNextBatch()
-	if len(nextBatch) == 0 {
-		// If our next batch is empty,
-		// it can only be because
-		// the batcher is stopped and so are we.
+	if c.stopped() || c.Batcher.Closed() {
+		return
+	}
+	nextBatch := c.Batcher.NextBatch()
+	if len(nextBatch) == 0 { // no requests in this batch
+		c.acquireLeaderToken() // try again later
 		return
 	}
 	metadata := c.currView.GetMetadata()
diff --git a/internal/bft/controller_test.go b/internal/bft/controller_test.go
index c4a73546..e3cffcb9 100644
--- a/internal/bft/controller_test.go
+++ b/internal/bft/controller_test.go
@@ -584,6 +584,7 @@ func TestControllerLeaderRequestHandling(t *testing.T) {
 
 	batcher := &mocks.Batcher{}
 	batcher.On("Close")
+	batcher.On("Closed").Return(false)
 	batcher.On("Reset")
 	batcher.On("NextBatch").Run(func(arguments mock.Arguments) {
 		time.Sleep(time.Hour)
diff --git a/test/basic_test.go b/test/basic_test.go
index 98dc5306..69946a11 100644
--- a/test/basic_test.go
+++ b/test/basic_test.go
@@ -1249,6 +1249,70 @@ func TestLeaderModifiesPreprepare(t *testing.T) {
 	}
 }
 
+func TestLeaderCatchUpWithoutSync(t *testing.T) {
+	t.Parallel()
+	network := NewNetwork()
+	defer network.Shutdown()
+
+	testDir, err := os.MkdirTemp("", t.Name())
+	assert.NoErrorf(t, err, "generate temporary test dir")
+	defer os.RemoveAll(testDir)
+
+	numberOfNodes := 4
+	nodes := make([]*App, 0)
+	for i := 1; i <= numberOfNodes; i++ {
+		n := newNode(uint64(i), network, t.Name(), testDir, false, 0)
+		n.Consensus.Config.SyncOnStart = false
+		nodes = append(nodes, n)
+	}
+
+	restartWG := sync.WaitGroup{}
+	restartWG.Add(1)
+
+	restoredWG := sync.WaitGroup{}
+	restoredWG.Add(1)
+
+	baseLogger := nodes[0].logger.Desugar()
+	nodes[0].logger = baseLogger.WithOptions(zap.Hooks(func(entry zapcore.Entry) error {
+		if strings.Contains(entry.Message, "Processed prepares for proposal with seq 1") {
+			restartWG.Done()
+		}
+		if strings.Contains(entry.Message, "Restored proposal with sequence 1") {
+			restoredWG.Done()
+		}
+		return nil
+	})).Sugar()
+	nodes[0].Setup()
+
+	startNodes(nodes, network)
+
+	nodes[0].Submit(Request{ID: "1", ClientID: "alice"})
+
+	restartWG.Wait()
+	nodes[0].RestartSync(false)
+	restoredWG.Wait()
+
+	data := make([]*AppRecord, 0)
+	for i := 0; i < numberOfNodes; i++ {
+		d := <-nodes[i].Delivered
+		data = append(data, d)
+	}
+	for i := 0; i < numberOfNodes-1; i++ {
+		assert.Equal(t, data[i], data[i+1])
+	}
+
+	nodes[0].Submit(Request{ID: "2", ClientID: "alice"})
+
+	data = make([]*AppRecord, 0)
+	for i := 0; i < numberOfNodes; i++ {
+		d := <-nodes[i].Delivered
+		data = append(data, d)
+	}
+	for i := 0; i < numberOfNodes-1; i++ {
+		assert.Equal(t, data[i], data[i+1])
+	}
+}
+
 func TestGradualStart(t *testing.T) {
 	// Scenario: initially the network has only one node
 	// a transaction is submitted and committed with that node
diff --git a/test/test_app.go b/test/test_app.go
index 9589ec0b..ab2395ef 100644
--- a/test/test_app.go
+++ b/test/test_app.go
@@ -127,10 +127,15 @@ func (a *App) Sync() types.SyncResponse {
 
 // Restart restarts the node
 func (a *App) Restart() {
+	a.RestartSync(true)
+}
+
+func (a *App) RestartSync(sync bool) {
 	a.Consensus.Stop()
 	a.Node.Lock()
 	defer a.Node.Unlock()
 	a.Setup()
+	a.Consensus.Config.SyncOnStart = sync
 	if err := a.Consensus.Start(); err != nil {
 		a.logger.Panicf("Consensus start returned an error : %v", err)
 	}