Skip to content

Commit

Permalink
Merge pull request #4293 from ElrondNetwork/activeDB-markup-exception…
Browse files Browse the repository at this point in the history
…-master

[FIX]Active db markup exception on master
  • Loading branch information
iulianpascalau authored Jul 22, 2022
2 parents db10dc2 + 92c1438 commit 3ec53be
Show file tree
Hide file tree
Showing 15 changed files with 579 additions and 61 deletions.
5 changes: 3 additions & 2 deletions common/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ type StorageManager interface {
Get(key []byte) ([]byte, error)
GetFromCurrentEpoch(key []byte) ([]byte, error)
PutInEpoch(key []byte, val []byte, epoch uint32) error
TakeSnapshot(rootHash []byte, mainTrieRootHash []byte, leavesChan chan core.KeyValueHolder, stats SnapshotStatisticsHandler, epoch uint32)
SetCheckpoint(rootHash []byte, mainTrieRootHash []byte, leavesChan chan core.KeyValueHolder, stats SnapshotStatisticsHandler)
TakeSnapshot(rootHash []byte, mainTrieRootHash []byte, leavesChan chan core.KeyValueHolder, errChan chan error, stats SnapshotStatisticsHandler, epoch uint32)
SetCheckpoint(rootHash []byte, mainTrieRootHash []byte, leavesChan chan core.KeyValueHolder, errChan chan error, stats SnapshotStatisticsHandler)
GetLatestStorageEpoch() (uint32, error)
IsPruningEnabled() bool
IsPruningBlocked() bool
Expand All @@ -57,6 +57,7 @@ type StorageManager interface {
Remove(hash []byte) error
SetEpochForPutOperation(uint32)
ShouldTakeSnapshot() bool
IsClosed() bool
Close() error
IsInterfaceNil() bool

Expand Down
57 changes: 43 additions & 14 deletions state/accountsDB.go
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,7 @@ func (adb *AccountsDB) SnapshotState(rootHash []byte) {
return
}

log.Debug("starting snapshot", "rootHash", rootHash, "epoch", epoch)
log.Info("starting snapshot", "rootHash", rootHash, "epoch", epoch)

adb.lastSnapshot.rootHash = rootHash
adb.lastSnapshot.epoch = epoch
Expand All @@ -1102,28 +1102,41 @@ func (adb *AccountsDB) SnapshotState(rootHash []byte) {

trieStorageManager.EnterPruningBufferingMode()

errChan := make(chan error, 1)
stats := newSnapshotStatistics(1)
go func() {
leavesChannel := make(chan core.KeyValueHolder, leavesChannelSize)
stats.NewSnapshotStarted()
trieStorageManager.TakeSnapshot(rootHash, rootHash, leavesChannel, stats, epoch)
adb.snapshotUserAccountDataTrie(true, rootHash, leavesChannel, stats, epoch)
trieStorageManager.TakeSnapshot(rootHash, rootHash, leavesChannel, errChan, stats, epoch)
adb.snapshotUserAccountDataTrie(true, rootHash, leavesChannel, errChan, stats, epoch)
trieStorageManager.ExitPruningBufferingMode()

stats.wg.Done()
}()

go func() {
printStats(stats, "snapshotState user trie", rootHash)

log.Debug("set activeDB in epoch", "epoch", epoch)
errPut := trieStorageManager.PutInEpoch([]byte(common.ActiveDBKey), []byte(common.ActiveDBVal), epoch)
handleLoggingWhenError("error while putting active DB value into main storer", errPut)
}()
go adb.markActiveDBAfterSnapshot(stats, errChan, rootHash, "snapshotState user trie", epoch)

adb.waitForCompletionIfRunningInImportDB(stats)
}

func (adb *AccountsDB) markActiveDBAfterSnapshot(stats *snapshotStatistics, errChan chan error, rootHash []byte, message string, epoch uint32) {
printStats(stats, message, rootHash)

trieStorageManager := adb.mainTrie.GetStorageManager()
containsErrorDuringSnapshot := emptyErrChanReturningHadContained(errChan)
shouldNotMarkActive := trieStorageManager.IsClosed() || containsErrorDuringSnapshot
if shouldNotMarkActive {
log.Debug("will not set activeDB in epoch as the snapshot might be incomplete",
"epoch", epoch, "trie storage manager closed", trieStorageManager.IsClosed(),
"errors during snapshot found", containsErrorDuringSnapshot)
return
}

log.Debug("set activeDB in epoch", "epoch", epoch)
errPut := trieStorageManager.PutInEpoch([]byte(common.ActiveDBKey), []byte(common.ActiveDBVal), epoch)
handleLoggingWhenError("error while putting active DB value into main storer", errPut)
}

func printStats(stats *snapshotStatistics, identifier string, rootHash []byte) {
stats.wg.Wait()

Expand All @@ -1140,10 +1153,23 @@ func printStats(stats *snapshotStatistics, identifier string, rootHash []byte) {
)
}

func emptyErrChanReturningHadContained(errChan chan error) bool {
contained := false
for {
select {
case <-errChan:
contained = true
default:
return contained
}
}
}

func (adb *AccountsDB) snapshotUserAccountDataTrie(
isSnapshot bool,
mainTrieRootHash []byte,
leavesChannel chan core.KeyValueHolder,
errChan chan error,
stats common.SnapshotStatisticsHandler,
epoch uint32,
) {
Expand All @@ -1163,11 +1189,11 @@ func (adb *AccountsDB) snapshotUserAccountDataTrie(
stats.NewDataTrie()

if isSnapshot {
adb.mainTrie.GetStorageManager().TakeSnapshot(account.RootHash, mainTrieRootHash, nil, stats, epoch)
adb.mainTrie.GetStorageManager().TakeSnapshot(account.RootHash, mainTrieRootHash, nil, errChan, stats, epoch)
continue
}

adb.mainTrie.GetStorageManager().SetCheckpoint(account.RootHash, mainTrieRootHash, nil, stats)
adb.mainTrie.GetStorageManager().SetCheckpoint(account.RootHash, mainTrieRootHash, nil, errChan, stats)
}
}

Expand All @@ -1185,16 +1211,19 @@ func (adb *AccountsDB) setStateCheckpoint(rootHash []byte) {
trieStorageManager.EnterPruningBufferingMode()

stats := newSnapshotStatistics(1)
errChan := make(chan error, 1)
go func() {
leavesChannel := make(chan core.KeyValueHolder, leavesChannelSize)
stats.NewSnapshotStarted()
trieStorageManager.SetCheckpoint(rootHash, rootHash, leavesChannel, stats)
adb.snapshotUserAccountDataTrie(false, rootHash, leavesChannel, stats, 0)
trieStorageManager.SetCheckpoint(rootHash, rootHash, leavesChannel, errChan, stats)
adb.snapshotUserAccountDataTrie(false, rootHash, leavesChannel, errChan, stats, 0)
trieStorageManager.ExitPruningBufferingMode()

stats.wg.Done()
}()

// TODO decide if we need to take some actions whenever we hit an error that occurred in the checkpoint process
// that will be present in the errChan var
go printStats(stats, "setStateCheckpoint user trie", rootHash)

adb.waitForCompletionIfRunningInImportDB(stats)
Expand Down
173 changes: 167 additions & 6 deletions state/accountsDB_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func TestNewAccountsDB(t *testing.T) {
})
}

//------- SaveAccount
// ------- SaveAccount

func TestAccountsDB_SaveAccountNilAccountShouldErr(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -905,7 +905,7 @@ func TestAccountsDB_SnapshotState(t *testing.T) {
trieStub := &trieMock.TrieStub{
GetStorageManagerCalled: func() common.StorageManager {
return &testscommon.StorageManagerStub{
TakeSnapshotCalled: func(_ []byte, _ []byte, _ chan core.KeyValueHolder, _ common.SnapshotStatisticsHandler, _ uint32) {
TakeSnapshotCalled: func(_ []byte, _ []byte, _ chan core.KeyValueHolder, _ chan error, _ common.SnapshotStatisticsHandler, _ uint32) {
snapshotMut.Lock()
takeSnapshotWasCalled = true
snapshotMut.Unlock()
Expand All @@ -922,6 +922,112 @@ func TestAccountsDB_SnapshotState(t *testing.T) {
snapshotMut.Unlock()
}

func TestAccountsDB_SnapshotStateOnAClosedStorageManagerShouldNotMarkActiveDB(t *testing.T) {
t.Parallel()

mut := sync.RWMutex{}
lastSnapshotStartedWasPut := false
activeDBWasPut := false
trieStub := &trieMock.TrieStub{
GetStorageManagerCalled: func() common.StorageManager {
return &testscommon.StorageManagerStub{
ShouldTakeSnapshotCalled: func() bool {
return true
},
TakeSnapshotCalled: func(_ []byte, _ []byte, ch chan core.KeyValueHolder, _ chan error, stats common.SnapshotStatisticsHandler, _ uint32) {
close(ch)
stats.SnapshotFinished()
},
IsClosedCalled: func() bool {
return true
},
PutCalled: func(key []byte, val []byte) error {
mut.Lock()
defer mut.Unlock()

if string(key) == state.LastSnapshotStarted {
lastSnapshotStartedWasPut = true
}

return nil
},
PutInEpochCalled: func(key []byte, val []byte, epoch uint32) error {
mut.Lock()
defer mut.Unlock()

if string(key) == common.ActiveDBKey {
activeDBWasPut = true
}

return nil
},
}
},
}
adb := generateAccountDBFromTrie(trieStub)
adb.SnapshotState([]byte("roothash"))
time.Sleep(time.Second)

mut.RLock()
defer mut.RUnlock()
assert.True(t, lastSnapshotStartedWasPut)
assert.False(t, activeDBWasPut)
}

func TestAccountsDB_SnapshotStateWithErrorsShouldNotMarkActiveDB(t *testing.T) {
t.Parallel()

mut := sync.RWMutex{}
lastSnapshotStartedWasPut := false
activeDBWasPut := false
expectedErr := errors.New("expected error")
trieStub := &trieMock.TrieStub{
GetStorageManagerCalled: func() common.StorageManager {
return &testscommon.StorageManagerStub{
ShouldTakeSnapshotCalled: func() bool {
return true
},
TakeSnapshotCalled: func(_ []byte, _ []byte, ch chan core.KeyValueHolder, errChan chan error, stats common.SnapshotStatisticsHandler, _ uint32) {
errChan <- expectedErr
close(ch)
stats.SnapshotFinished()
},
IsClosedCalled: func() bool {
return false
},
PutCalled: func(key []byte, val []byte) error {
mut.Lock()
defer mut.Unlock()

if string(key) == state.LastSnapshotStarted {
lastSnapshotStartedWasPut = true
}

return nil
},
PutInEpochCalled: func(key []byte, val []byte, epoch uint32) error {
mut.Lock()
defer mut.Unlock()

if string(key) == common.ActiveDBKey {
activeDBWasPut = true
}

return nil
},
}
},
}
adb := generateAccountDBFromTrie(trieStub)
adb.SnapshotState([]byte("roothash"))
time.Sleep(time.Second)

mut.RLock()
defer mut.RUnlock()
assert.True(t, lastSnapshotStartedWasPut)
assert.False(t, activeDBWasPut)
}

func TestAccountsDB_SnapshotStateGetLatestStorageEpochErrDoesNotSnapshot(t *testing.T) {
t.Parallel()

Expand All @@ -932,7 +1038,7 @@ func TestAccountsDB_SnapshotStateGetLatestStorageEpochErrDoesNotSnapshot(t *test
GetLatestStorageEpochCalled: func() (uint32, error) {
return 0, fmt.Errorf("new error")
},
TakeSnapshotCalled: func(_ []byte, _ []byte, _ chan core.KeyValueHolder, _ common.SnapshotStatisticsHandler, _ uint32) {
TakeSnapshotCalled: func(_ []byte, _ []byte, _ chan core.KeyValueHolder, _ chan error, _ common.SnapshotStatisticsHandler, _ uint32) {
takeSnapshotCalled = true
},
}
Expand All @@ -959,7 +1065,7 @@ func TestAccountsDB_SnapshotStateSnapshotSameRootHash(t *testing.T) {
GetLatestStorageEpochCalled: func() (uint32, error) {
return latestEpoch, nil
},
TakeSnapshotCalled: func(_ []byte, _ []byte, _ chan core.KeyValueHolder, _ common.SnapshotStatisticsHandler, _ uint32) {
TakeSnapshotCalled: func(_ []byte, _ []byte, _ chan core.KeyValueHolder, _ chan error, _ common.SnapshotStatisticsHandler, _ uint32) {
snapshotMutex.Lock()
takeSnapshotCalled++
snapshotMutex.Unlock()
Expand Down Expand Up @@ -1063,7 +1169,7 @@ func TestAccountsDB_SetStateCheckpoint(t *testing.T) {
trieStub := &trieMock.TrieStub{
GetStorageManagerCalled: func() common.StorageManager {
return &testscommon.StorageManagerStub{
SetCheckpointCalled: func(_ []byte, _ []byte, _ chan core.KeyValueHolder, _ common.SnapshotStatisticsHandler) {
SetCheckpointCalled: func(_ []byte, _ []byte, _ chan core.KeyValueHolder, _ chan error, _ common.SnapshotStatisticsHandler) {
snapshotMut.Lock()
setCheckPointWasCalled = true
snapshotMut.Unlock()
Expand Down Expand Up @@ -2274,7 +2380,7 @@ func TestAccountsDB_NewAccountsDbStartsSnapshotAfterRestart(t *testing.T) {
ShouldTakeSnapshotCalled: func() bool {
return true
},
TakeSnapshotCalled: func(_ []byte, _ []byte, _ chan core.KeyValueHolder, _ common.SnapshotStatisticsHandler, _ uint32) {
TakeSnapshotCalled: func(_ []byte, _ []byte, _ chan core.KeyValueHolder, _ chan error, _ common.SnapshotStatisticsHandler, _ uint32) {
takeSnapshotCalled.SetValue(true)
},
}
Expand Down Expand Up @@ -2377,3 +2483,58 @@ func TestAccountsDB_waitForCompletionIfRunningInImportDB(t *testing.T) {
assert.True(t, waitForSnapshotsToFinishCalled)
})
}

func TestEmptyErrChanReturningHadContained(t *testing.T) {
t.Parallel()

t.Run("empty chan should return false", func(t *testing.T) {
t.Parallel()

t.Run("unbuffered chan", func(t *testing.T) {
t.Parallel()

errChan := make(chan error)
assert.False(t, state.EmptyErrChanReturningHadContained(errChan))
assert.Equal(t, 0, len(errChan))
})
t.Run("buffered chan", func(t *testing.T) {
t.Parallel()

for i := 1; i < 10; i++ {
errChan := make(chan error, i)
assert.False(t, state.EmptyErrChanReturningHadContained(errChan))
assert.Equal(t, 0, len(errChan))
}
})
})
t.Run("chan containing elements should return true", func(t *testing.T) {
t.Parallel()

t.Run("unbuffered chan", func(t *testing.T) {
t.Parallel()

errChan := make(chan error)
go func() {
errChan <- errors.New("test")
}()

time.Sleep(time.Second) // allow the go routine to start

assert.True(t, state.EmptyErrChanReturningHadContained(errChan))
assert.Equal(t, 0, len(errChan))
})
t.Run("buffered chan", func(t *testing.T) {
t.Parallel()

for i := 1; i < 10; i++ {
errChan := make(chan error, i)
for j := 0; j < i; j++ {
errChan <- errors.New("test")
}

assert.True(t, state.EmptyErrChanReturningHadContained(errChan))
assert.Equal(t, 0, len(errChan))
}
})
})
}
8 changes: 8 additions & 0 deletions state/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
vmcommon "github.com/ElrondNetwork/elrond-vm-common"
)

// LastSnapshotStarted -
const LastSnapshotStarted = lastSnapshotStarted

// NewEmptyBaseAccount -
func NewEmptyBaseAccount(address []byte, tracker DataTrieTracker) *baseAccount {
return &baseAccount{
Expand Down Expand Up @@ -65,3 +68,8 @@ func (accountsDB *accountsDBApi) SetLastRootHash(rootHash []byte) {
accountsDB.lastRootHash = rootHash
accountsDB.mutLastRootHash.Unlock()
}

// EmptyErrChanReturningHadContained -
func EmptyErrChanReturningHadContained(errChan chan error) bool {
return emptyErrChanReturningHadContained(errChan)
}
Loading

0 comments on commit 3ec53be

Please sign in to comment.