fix: panic when updating and iterating simultaneously (#232)
* Fix panic when updating and iterating simultaneously.

1. Copy keys' hashed values in the iterator instead of their indexes into the BytesQueue.
2. Skip ErrEntryNotFound during iteration when the key has been evicted.

* Set the byte queue's full flag to false after allocating additional memory.

* Add panic recovery in cleanUp to prevent the program from exiting.

* Revert "Add panic recovery in cleanUp to prevent the program from exiting."

This reverts commit 665c3a9.
WideLee authored Jul 1, 2020
1 parent bbbffd3 commit 48f0a54
Showing 5 changed files with 178 additions and 40 deletions.
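In consumer terms, the fix changes the iterator's contract: an iterator held open while another goroutine keeps writing no longer panics on stale queue offsets; entries evicted mid-iteration are silently skipped by SetNext, and any remaining read failure surfaces through Value's error. A minimal sketch of that contract against the public API, assuming the v1 import path github.com/allegro/bigcache (the keys and counts here are made up):

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/allegro/bigcache"
)

func main() {
	cache, err := bigcache.NewBigCache(bigcache.DefaultConfig(time.Minute))
	if err != nil {
		log.Fatal(err)
	}

	// Writer: keeps overwriting a small key space, which can evict older
	// entries from a shard's BytesQueue while the iterator is running.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for i := 0; i < 10000; i++ {
			_ = cache.Set(fmt.Sprintf("key-%d", i%100), []byte("value"))
		}
	}()

	// Reader: after this fix, SetNext resolves each entry eagerly and
	// skips hashed keys whose entries were evicted in the meantime,
	// instead of panicking on a stale BytesQueue index.
	iter := cache.Iterator()
	for iter.SetNext() {
		entry, err := iter.Value()
		if err != nil {
			continue // the entry became unreadable mid-iteration
		}
		_ = entry.Key()
	}
	<-done
}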
84 changes: 55 additions & 29 deletions iterator.go
@@ -1,6 +1,8 @@
 package bigcache
 
-import "sync"
+import (
+	"sync"
+)
 
 type iteratorError string
 
@@ -22,6 +24,7 @@ type EntryInfo struct {
 	hash      uint64
 	key       string
 	value     []byte
+	err       error
 }
 
 // Key returns entry's underlying key
@@ -46,13 +49,14 @@ func (e EntryInfo) Value() []byte {
 
 // EntryInfoIterator allows to iterate over entries in the cache
 type EntryInfoIterator struct {
-	mutex         sync.Mutex
-	cache         *BigCache
-	currentShard  int
-	currentIndex  int
-	elements      []uint32
-	elementsCount int
-	valid         bool
+	mutex            sync.Mutex
+	cache            *BigCache
+	currentShard     int
+	currentIndex     int
+	currentEntryInfo EntryInfo
+	elements         []uint64
+	elementsCount    int
+	valid            bool
 }
 
 // SetNext moves to next element and returns true if it exists.
@@ -64,28 +68,66 @@ func (it *EntryInfoIterator) SetNext() bool {
 
 	if it.elementsCount > it.currentIndex {
 		it.valid = true
+
+		empty := it.setCurrentEntry()
 		it.mutex.Unlock()
-		return true
+
+		if empty {
+			return it.SetNext()
+		} else {
+			return true
+		}
 	}
 
 	for i := it.currentShard + 1; i < it.cache.config.Shards; i++ {
-		it.elements, it.elementsCount = it.cache.shards[i].copyKeys()
+		it.elements, it.elementsCount = it.cache.shards[i].copyHashedKeys()
 
 		// Non empty shard - stick with it
 		if it.elementsCount > 0 {
 			it.currentIndex = 0
 			it.currentShard = i
 			it.valid = true
+
+			empty := it.setCurrentEntry()
 			it.mutex.Unlock()
-			return true
+
+			if empty {
+				return it.SetNext()
+			} else {
+				return true
+			}
 		}
 	}
 	it.mutex.Unlock()
 	return false
 }
 
+func (it *EntryInfoIterator) setCurrentEntry() bool {
+	var entryNotFound = false
+	entry, err := it.cache.shards[it.currentShard].getEntry(it.elements[it.currentIndex])
+
+	if err == ErrEntryNotFound {
+		it.currentEntryInfo = emptyEntryInfo
+		entryNotFound = true
+	} else if err != nil {
+		it.currentEntryInfo = EntryInfo{
+			err: err,
+		}
+	} else {
+		it.currentEntryInfo = EntryInfo{
+			timestamp: readTimestampFromEntry(entry),
+			hash:      readHashFromEntry(entry),
+			key:       readKeyFromEntry(entry),
+			value:     readEntry(entry),
+			err:       err,
+		}
+	}
+
+	return entryNotFound
+}
+
 func newIterator(cache *BigCache) *EntryInfoIterator {
-	elements, count := cache.shards[0].copyKeys()
+	elements, count := cache.shards[0].copyHashedKeys()
 
 	return &EntryInfoIterator{
 		cache: cache,
@@ -98,25 +140,9 @@ func newIterator(cache *BigCache) *EntryInfoIterator {
 
 // Value returns current value from the iterator
 func (it *EntryInfoIterator) Value() (EntryInfo, error) {
-	it.mutex.Lock()
-
-	if !it.valid {
-		it.mutex.Unlock()
-		return emptyEntryInfo, ErrInvalidIteratorState
-	}
-
-	entry, err := it.cache.shards[it.currentShard].getEntry(int(it.elements[it.currentIndex]))
-
-	if err != nil {
-		it.mutex.Unlock()
-		return emptyEntryInfo, ErrCannotRetrieveEntry
-	}
-	it.mutex.Unlock()
-
-	return EntryInfo{
-		timestamp: readTimestampFromEntry(entry),
-		hash:      readHashFromEntry(entry),
-		key:       readKeyFromEntry(entry),
-		value:     readEntry(entry),
-	}, nil
+	return it.currentEntryInfo, it.currentEntryInfo.err
 }
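The heart of the fix is visible above: the iterator now snapshots hashed keys, which stay meaningful across evictions, instead of BytesQueue offsets, which do not, and SetNext resolves each entry eagerly via setCurrentEntry, skipping hashes whose entries vanished between the snapshot and the read. Below is a self-contained sketch of that resolve-at-advance pattern under simplified, hypothetical types: a plain map stands in for a shard, and a loop replaces bigcache's recursive SetNext call, which is equivalent here.

package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("entry not found")

// store is a hypothetical stand-in for a cache shard: a snapshot of its
// hashed keys can go stale because entries may be evicted afterwards.
type store struct{ m map[uint64]string }

func (s *store) get(hash uint64) (string, error) {
	v, ok := s.m[hash]
	if !ok {
		return "", errNotFound
	}
	return v, nil
}

// iterator resolves each entry eagerly on advance, mirroring
// SetNext/setCurrentEntry: hashes whose entries were evicted between
// the snapshot and the read are skipped instead of causing a failure.
type iterator struct {
	s       *store
	hashes  []uint64 // snapshot taken at iterator creation
	i       int
	current string
	err     error
}

func (it *iterator) next() bool {
	for it.i < len(it.hashes) {
		h := it.hashes[it.i]
		it.i++
		v, err := it.s.get(h)
		if errors.Is(err, errNotFound) {
			continue // evicted in the meantime: skip silently
		}
		it.current, it.err = v, err
		return true
	}
	return false
}

func main() {
	s := &store{m: map[uint64]string{1: "a", 3: "c"}}
	it := &iterator{s: s, hashes: []uint64{1, 2, 3}} // hash 2: already evicted
	for it.next() {
		fmt.Println(it.current, it.err) // prints "a <nil>" then "c <nil>"
	}
}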
90 changes: 86 additions & 4 deletions iterator_test.go
@@ -1,7 +1,9 @@
 package bigcache
 
 import (
+	"context"
 	"fmt"
+	"math/rand"
 	"runtime"
 	"strconv"
 	"sync"
@@ -109,11 +111,11 @@ func TestEntriesIteratorWithConcurrentUpdate(t *testing.T) {
 	}
 
 	current, err := iterator.Value()
+	assertEqual(t, nil, err)
+	assertEqual(t, []byte("value"), current.Value())
 
 	// then
-	assertEqual(t, ErrCannotRetrieveEntry, err)
-	assertEqual(t, "Could not retrieve entry from cache", err.Error())
-	assertEqual(t, EntryInfo{}, current)
+	next := iterator.SetNext()
+	assertEqual(t, false, next)
 }
 
 func TestEntriesIteratorWithAllShardsEmpty(t *testing.T) {
@@ -184,3 +186,83 @@ func TestEntriesIteratorParallelAdd(t *testing.T) {
 	}
 	wg.Wait()
 }
+
+func TestParallelSetAndIteration(t *testing.T) {
+	t.Parallel()
+
+	rand.Seed(0)
+
+	cache, _ := NewBigCache(Config{
+		Shards:             1,
+		LifeWindow:         time.Second,
+		MaxEntriesInWindow: 100,
+		MaxEntrySize:       256,
+		HardMaxCacheSize:   1,
+		Verbose:            true,
+	})
+
+	entrySize := 1024 * 100
+	ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
+	defer cancel()
+
+	wg := sync.WaitGroup{}
+	wg.Add(2)
+
+	go func() {
+		defer func() {
+			err := recover()
+			// no panic
+			assertEqual(t, err, nil)
+		}()
+
+		defer wg.Done()
+
+		isTimeout := false
+
+		for {
+			if isTimeout {
+				break
+			}
+			select {
+			case <-ctx.Done():
+				isTimeout = true
+			default:
+				err := cache.Set(strconv.Itoa(rand.Intn(100)), blob('a', entrySize))
+				noError(t, err)
+			}
+		}
+	}()
+
+	go func() {
+		defer func() {
+			err := recover()
+			// no panic
+			assertEqual(t, nil, err)
+		}()
+
+		defer wg.Done()
+
+		isTimeout := false
+
+		for {
+			if isTimeout {
+				break
+			}
+			select {
+			case <-ctx.Done():
+				isTimeout = true
+			default:
+				iter := cache.Iterator()
+				for iter.SetNext() {
+					entry, err := iter.Value()
+
+					// then
+					noError(t, err)
+					assertEqual(t, entrySize, len(entry.Value()))
+				}
+			}
+		}
+	}()
+
+	wg.Wait()
+}
2 changes: 2 additions & 0 deletions queue/bytes_queue.go
@@ -129,6 +129,8 @@ func (q *BytesQueue) allocateAdditionalMemory(minimum int) {
 		}
 	}
 
+	q.full = false
+
 	if q.verbose {
 		log.Printf("Allocated new queue in %s; Capacity: %d \n", time.Since(start), q.capacity)
 	}
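The one-line change above fixes stale state rather than adding behavior: per the commit message, the queue's full flag could remain set even after allocateAdditionalMemory had grown the buffer, so the next Push still behaved as if no space existed. The test below exercises the real BytesQueue; as a distilled illustration of the invariant, here is a toy queue (a hypothetical type, much simpler than bigcache's ring buffer) where clearing the cached flag on growth is exactly what keeps later pushes working:

package main

import "fmt"

// toyQueue is a deliberately simplified stand-in for a byte queue that
// caches a "full" flag (hypothetical type, not bigcache's BytesQueue).
type toyQueue struct {
	buf  []byte
	used int
	full bool
}

func (q *toyQueue) canInsert(n int) bool {
	return !q.full && q.used+n <= len(q.buf)
}

func (q *toyQueue) push(data []byte) error {
	if !q.canInsert(len(data)) {
		q.grow(len(data))
	}
	if !q.canInsert(len(data)) {
		return fmt.Errorf("queue full")
	}
	copy(q.buf[q.used:], data)
	q.used += len(data)
	if q.used == len(q.buf) {
		q.full = true // no free byte left; remember it
	}
	return nil
}

func (q *toyQueue) grow(minimum int) {
	newBuf := make([]byte, (len(q.buf)+minimum)*2)
	copy(newBuf, q.buf[:q.used])
	q.buf = newBuf
	// The fix mirrored from the hunk above: clear the cached flag once
	// capacity grows, or every later push keeps taking the "full" path.
	q.full = false
}

func main() {
	q := &toyQueue{buf: make([]byte, 4)}
	fmt.Println(q.push([]byte("abcd"))) // <nil>; buffer is now exactly full
	fmt.Println(q.push([]byte("ef")))   // <nil>, but only because grow cleared full
}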
23 changes: 23 additions & 0 deletions queue/bytes_queue_test.go
@@ -379,6 +379,29 @@ func TestMaxSizeLimit(t *testing.T) {
 	assertEqual(t, blob('b', 5), pop(queue))
 }
 
+func TestPushEntryAfterAllocateAdditionMemory(t *testing.T) {
+	t.Parallel()
+
+	// given
+	queue := NewBytesQueue(9, 20, true)
+
+	// when
+	queue.Push([]byte("aaa"))
+	queue.Push([]byte("bb"))
+	queue.Pop()
+	queue.Push([]byte("c"))
+	queue.Push([]byte("d"))
+
+	// allocate more memory
+	assertEqual(t, 9, queue.Capacity())
+	queue.Push([]byte("c"))
+	assertEqual(t, 18, queue.Capacity())
+
+	// push after allocate
+	_, err := queue.Push([]byte("d"))
+	noError(t, err)
+}
+
 func pop(queue *BytesQueue) []byte {
 	entry, err := queue.Pop()
 	if err != nil {
19 changes: 12 additions & 7 deletions shard.go
@@ -264,20 +264,25 @@ func (s *cacheShard) cleanUp(currentTimestamp uint64) {
 	s.lock.Unlock()
 }
 
-func (s *cacheShard) getEntry(index int) ([]byte, error) {
+func (s *cacheShard) getEntry(hashedKey uint64) ([]byte, error) {
 	s.lock.RLock()
-	entry, err := s.entries.Get(index)
+
+	entry, err := s.getWrappedEntry(hashedKey)
+	// copy entry
+	newEntry := make([]byte, len(entry))
+	copy(newEntry, entry)
+
 	s.lock.RUnlock()
 
-	return entry, err
+	return newEntry, err
 }
 
-func (s *cacheShard) copyKeys() (keys []uint32, next int) {
+func (s *cacheShard) copyHashedKeys() (keys []uint64, next int) {
 	s.lock.RLock()
-	keys = make([]uint32, len(s.hashmap))
+	keys = make([]uint64, len(s.hashmap))
 
-	for _, index := range s.hashmap {
-		keys[next] = index
+	for key := range s.hashmap {
+		keys[next] = key
 		next++
 	}
 
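Two things happen in this file: getEntry is re-keyed by hash, so the iterator no longer dereferences BytesQueue offsets that eviction can invalidate, and it now returns a copy of the entry bytes, since the slice returned by the lookup aliases the shard's ring buffer and may be overwritten by concurrent writes once the read lock is released. A hedged sketch of that copy-under-lock pattern in isolation, with a hypothetical lookup callback standing in for getWrappedEntry:

package main

import (
	"fmt"
	"sync"
)

// copyUnderRLock returns a stable snapshot of the bytes produced by
// lookup. lookup is a hypothetical stand-in for getWrappedEntry: it
// returns a slice aliasing shared storage that writers may later reuse.
func copyUnderRLock(mu *sync.RWMutex, lookup func() []byte) []byte {
	mu.RLock()
	defer mu.RUnlock()

	src := lookup()
	dst := make([]byte, len(src)) // snapshot while the lock still holds
	copy(dst, src)
	return dst
}

func main() {
	var mu sync.RWMutex
	shared := []byte("value")

	snapshot := copyUnderRLock(&mu, func() []byte { return shared })
	shared[0] = 'X' // a later write no longer affects the snapshot
	fmt.Printf("%s\n", snapshot)
}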
