From 48f0a543146a11dac3accef0f5282639045eac54 Mon Sep 17 00:00:00 2001 From: Mingkuan Date: Wed, 1 Jul 2020 23:54:13 +0800 Subject: [PATCH] fix: panic when update and iterate simultaneously (#232) * Fix panic when update and iteration simultaneously. 1. Copy keys' hashed value instead of copy keys' index in ByteQueue. 2. Skip ErrNotFound during iteration when the key has been evicted. * byte queue set full is false after allocated addition memory * Add panic recover in cleanUp to prevent the program exited. * Revert "Add panic recover in cleanUp to prevent the program exited." This reverts commit 665c3a9849ee6bc159f361ebd1296ac4a2eb44b7. --- iterator.go | 84 +++++++++++++++++++++++------------- iterator_test.go | 90 +++++++++++++++++++++++++++++++++++++-- queue/bytes_queue.go | 2 + queue/bytes_queue_test.go | 23 ++++++++++ shard.go | 19 ++++++--- 5 files changed, 178 insertions(+), 40 deletions(-) diff --git a/iterator.go b/iterator.go index 70b98d90..e55f03ea 100644 --- a/iterator.go +++ b/iterator.go @@ -1,6 +1,8 @@ package bigcache -import "sync" +import ( + "sync" +) type iteratorError string @@ -22,6 +24,7 @@ type EntryInfo struct { hash uint64 key string value []byte + err error } // Key returns entry's underlying key @@ -46,13 +49,14 @@ func (e EntryInfo) Value() []byte { // EntryInfoIterator allows to iterate over entries in the cache type EntryInfoIterator struct { - mutex sync.Mutex - cache *BigCache - currentShard int - currentIndex int - elements []uint32 - elementsCount int - valid bool + mutex sync.Mutex + cache *BigCache + currentShard int + currentIndex int + currentEntryInfo EntryInfo + elements []uint64 + elementsCount int + valid bool } // SetNext moves to next element and returns true if it exists. @@ -64,28 +68,66 @@ func (it *EntryInfoIterator) SetNext() bool { if it.elementsCount > it.currentIndex { it.valid = true + + empty := it.setCurrentEntry() it.mutex.Unlock() - return true + + if empty { + return it.SetNext() + } else { + return true + } } for i := it.currentShard + 1; i < it.cache.config.Shards; i++ { - it.elements, it.elementsCount = it.cache.shards[i].copyKeys() + it.elements, it.elementsCount = it.cache.shards[i].copyHashedKeys() // Non empty shard - stick with it if it.elementsCount > 0 { it.currentIndex = 0 it.currentShard = i it.valid = true + + empty := it.setCurrentEntry() it.mutex.Unlock() - return true + + if empty { + return it.SetNext() + } else { + return true + } } } it.mutex.Unlock() return false } +func (it *EntryInfoIterator) setCurrentEntry() bool { + var entryNotFound = false + entry, err := it.cache.shards[it.currentShard].getEntry(it.elements[it.currentIndex]) + + if err == ErrEntryNotFound { + it.currentEntryInfo = emptyEntryInfo + entryNotFound = true + } else if err != nil { + it.currentEntryInfo = EntryInfo{ + err: err, + } + } else { + it.currentEntryInfo = EntryInfo{ + timestamp: readTimestampFromEntry(entry), + hash: readHashFromEntry(entry), + key: readKeyFromEntry(entry), + value: readEntry(entry), + err: err, + } + } + + return entryNotFound +} + func newIterator(cache *BigCache) *EntryInfoIterator { - elements, count := cache.shards[0].copyKeys() + elements, count := cache.shards[0].copyHashedKeys() return &EntryInfoIterator{ cache: cache, @@ -98,25 +140,9 @@ func newIterator(cache *BigCache) *EntryInfoIterator { // Value returns current value from the iterator func (it *EntryInfoIterator) Value() (EntryInfo, error) { - it.mutex.Lock() - if !it.valid { - it.mutex.Unlock() return emptyEntryInfo, ErrInvalidIteratorState } - entry, err := it.cache.shards[it.currentShard].getEntry(int(it.elements[it.currentIndex])) - - if err != nil { - it.mutex.Unlock() - return emptyEntryInfo, ErrCannotRetrieveEntry - } - it.mutex.Unlock() - - return EntryInfo{ - timestamp: readTimestampFromEntry(entry), - hash: readHashFromEntry(entry), - key: readKeyFromEntry(entry), - value: readEntry(entry), - }, nil + return it.currentEntryInfo, it.currentEntryInfo.err } diff --git a/iterator_test.go b/iterator_test.go index 5ae1b687..63e2b723 100644 --- a/iterator_test.go +++ b/iterator_test.go @@ -1,7 +1,9 @@ package bigcache import ( + "context" "fmt" + "math/rand" "runtime" "strconv" "sync" @@ -109,11 +111,11 @@ func TestEntriesIteratorWithConcurrentUpdate(t *testing.T) { } current, err := iterator.Value() + assertEqual(t, nil, err) + assertEqual(t, []byte("value"), current.Value()) - // then - assertEqual(t, ErrCannotRetrieveEntry, err) - assertEqual(t, "Could not retrieve entry from cache", err.Error()) - assertEqual(t, EntryInfo{}, current) + next := iterator.SetNext() + assertEqual(t, false, next) } func TestEntriesIteratorWithAllShardsEmpty(t *testing.T) { @@ -184,3 +186,83 @@ func TestEntriesIteratorParallelAdd(t *testing.T) { } wg.Wait() } + +func TestParallelSetAndIteration(t *testing.T) { + t.Parallel() + + rand.Seed(0) + + cache, _ := NewBigCache(Config{ + Shards: 1, + LifeWindow: time.Second, + MaxEntriesInWindow: 100, + MaxEntrySize: 256, + HardMaxCacheSize: 1, + Verbose: true, + }) + + entrySize := 1024 * 100 + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + wg := sync.WaitGroup{} + wg.Add(2) + + go func() { + defer func() { + err := recover() + // no panic + assertEqual(t, err, nil) + }() + + defer wg.Done() + + isTimeout := false + + for { + if isTimeout { + break + } + select { + case <-ctx.Done(): + isTimeout = true + default: + err := cache.Set(strconv.Itoa(rand.Intn(100)), blob('a', entrySize)) + noError(t, err) + } + } + }() + + go func() { + defer func() { + err := recover() + // no panic + assertEqual(t, nil, err) + }() + + defer wg.Done() + + isTimeout := false + + for { + if isTimeout { + break + } + select { + case <-ctx.Done(): + isTimeout = true + default: + iter := cache.Iterator() + for iter.SetNext() { + entry, err := iter.Value() + + // then + noError(t, err) + assertEqual(t, entrySize, len(entry.Value())) + } + } + } + }() + + wg.Wait() +} diff --git a/queue/bytes_queue.go b/queue/bytes_queue.go index 0a1cb389..927a1783 100644 --- a/queue/bytes_queue.go +++ b/queue/bytes_queue.go @@ -129,6 +129,8 @@ func (q *BytesQueue) allocateAdditionalMemory(minimum int) { } } + q.full = false + if q.verbose { log.Printf("Allocated new queue in %s; Capacity: %d \n", time.Since(start), q.capacity) } diff --git a/queue/bytes_queue_test.go b/queue/bytes_queue_test.go index 3b197cc8..545bf7da 100644 --- a/queue/bytes_queue_test.go +++ b/queue/bytes_queue_test.go @@ -379,6 +379,29 @@ func TestMaxSizeLimit(t *testing.T) { assertEqual(t, blob('b', 5), pop(queue)) } +func TestPushEntryAfterAllocateAdditionMemory(t *testing.T) { + t.Parallel() + + // given + queue := NewBytesQueue(9, 20, true) + + // when + queue.Push([]byte("aaa")) + queue.Push([]byte("bb")) + queue.Pop() + queue.Push([]byte("c")) + queue.Push([]byte("d")) + + // allocate more memory + assertEqual(t, 9, queue.Capacity()) + queue.Push([]byte("c")) + assertEqual(t, 18, queue.Capacity()) + + // push after allocate + _, err := queue.Push([]byte("d")) + noError(t, err) +} + func pop(queue *BytesQueue) []byte { entry, err := queue.Pop() if err != nil { diff --git a/shard.go b/shard.go index ada18884..a7e9f119 100644 --- a/shard.go +++ b/shard.go @@ -264,20 +264,25 @@ func (s *cacheShard) cleanUp(currentTimestamp uint64) { s.lock.Unlock() } -func (s *cacheShard) getEntry(index int) ([]byte, error) { +func (s *cacheShard) getEntry(hashedKey uint64) ([]byte, error) { s.lock.RLock() - entry, err := s.entries.Get(index) + + entry, err := s.getWrappedEntry(hashedKey) + // copy entry + newEntry := make([]byte, len(entry)) + copy(newEntry, entry) + s.lock.RUnlock() - return entry, err + return newEntry, err } -func (s *cacheShard) copyKeys() (keys []uint32, next int) { +func (s *cacheShard) copyHashedKeys() (keys []uint64, next int) { s.lock.RLock() - keys = make([]uint32, len(s.hashmap)) + keys = make([]uint64, len(s.hashmap)) - for _, index := range s.hashmap { - keys[next] = index + for key := range s.hashmap { + keys[next] = key next++ }