From 9d9b9911eea2fc76f3141199d7a178cc4b18e3a6 Mon Sep 17 00:00:00 2001 From: Iulian Pascalau Date: Tue, 6 Feb 2024 17:54:11 +0200 Subject: [PATCH] - refactored remove get concurrency to work also with more than one pending batch --- leveldb/batch.go | 36 ++++++++++---- leveldb/export_test.go | 6 +++ leveldb/leveldb.go | 14 +++--- leveldb/leveldbSerial.go | 32 ++++++++---- leveldb/leveldbSerial_test.go | 85 ++++++++++++++++++++++++++++++++ leveldb/mapKeysRemovedHandler.go | 52 +++++++++++++++++++ 6 files changed, 200 insertions(+), 25 deletions(-) create mode 100644 leveldb/export_test.go create mode 100644 leveldb/mapKeysRemovedHandler.go diff --git a/leveldb/batch.go b/leveldb/batch.go index 7367d732..8171ee67 100644 --- a/leveldb/batch.go +++ b/leveldb/batch.go @@ -10,19 +10,23 @@ import ( var _ types.Batcher = (*batch)(nil) type batch struct { - batch *leveldb.Batch - cachedData map[string][]byte - removedData map[string]struct{} - mutBatch sync.RWMutex + batch *leveldb.Batch + cachedData map[string][]byte + removedData map[string]struct{} + id uint64 + keysRemovedHandler keysRemovedHandler + mutBatch sync.RWMutex } // NewBatch creates a batch -func NewBatch() *batch { +func NewBatch(id uint64, keysRemovedHandler keysRemovedHandler) *batch { return &batch{ - batch: &leveldb.Batch{}, - cachedData: make(map[string][]byte), - removedData: make(map[string]struct{}), - mutBatch: sync.RWMutex{}, + batch: &leveldb.Batch{}, + cachedData: make(map[string][]byte), + removedData: make(map[string]struct{}), + mutBatch: sync.RWMutex{}, + id: id, + keysRemovedHandler: keysRemovedHandler, } } @@ -69,8 +73,20 @@ func (b *batch) IsRemoved(key []byte) bool { defer b.mutBatch.RUnlock() _, found := b.removedData[string(key)] + if found { + return true + } + _, found = b.cachedData[string(key)] + if found { + return false + } + + return b.keysRemovedHandler.hasRemovedKeys(key) +} - return found +// ID returns the batch's ID +func (b *batch) ID() uint64 { + return b.id } // IsInterfaceNil returns true if there is no value under the interface diff --git a/leveldb/export_test.go b/leveldb/export_test.go new file mode 100644 index 00000000..dcd88bf4 --- /dev/null +++ b/leveldb/export_test.go @@ -0,0 +1,6 @@ +package leveldb + +// PutBatch will call the unexported putBatch function +func (s *SerialDB) PutBatch() { + _ = s.putBatch() +} diff --git a/leveldb/leveldb.go b/leveldb/leveldb.go index 61448a7b..2169f46e 100644 --- a/leveldb/leveldb.go +++ b/leveldb/leveldb.go @@ -28,6 +28,7 @@ var log = logger.GetOrCreate("storage/leveldb") // DB holds a pointer to the leveldb database and the path to where it is stored. type DB struct { + keysRemovedHandler *baseLevelDb maxBatchSize int batchDelaySeconds int @@ -76,11 +77,12 @@ func NewDB(path string, batchDelaySeconds int, maxBatchSize int, maxOpenFiles in ctx, cancel := context.WithCancel(context.Background()) dbStore := &DB{ - baseLevelDb: bldb, - maxBatchSize: maxBatchSize, - batchDelaySeconds: batchDelaySeconds, - sizeBatch: 0, - cancel: cancel, + baseLevelDb: bldb, + maxBatchSize: maxBatchSize, + batchDelaySeconds: batchDelaySeconds, + sizeBatch: 0, + cancel: cancel, + keysRemovedHandler: newMapKeysRemovedHandler(), } dbStore.batch = dbStore.createBatch() @@ -220,7 +222,7 @@ func (s *DB) Has(key []byte) error { // CreateBatch returns a batcher to be used for batch writing data to the database func (s *DB) createBatch() types.Batcher { - return NewBatch() + return NewBatch(0, s.keysRemovedHandler) } // putBatch writes the Batch data into the database diff --git a/leveldb/leveldbSerial.go b/leveldb/leveldbSerial.go index fbb8c5e6..293c70e7 100644 --- a/leveldb/leveldbSerial.go +++ b/leveldb/leveldbSerial.go @@ -19,13 +19,21 @@ import ( var _ types.Persister = (*SerialDB)(nil) +type keysRemovedHandler interface { + addRemovedKeys(keys map[string]struct{}, batchID uint64) + deleteRemovedKeys(batchID uint64) + hasRemovedKeys(key []byte) bool +} + // SerialDB holds a pointer to the leveldb database and the path to where it is stored. type SerialDB struct { *baseLevelDb + keysRemovedHandler maxBatchSize int batchDelaySeconds int sizeBatch int batch types.Batcher + batchID uint64 mutBatch sync.RWMutex dbAccess chan serialQueryer cancel context.CancelFunc @@ -71,16 +79,18 @@ func NewSerialDB(path string, batchDelaySeconds int, maxBatchSize int, maxOpenFi ctx, cancel := context.WithCancel(context.Background()) dbStore := &SerialDB{ - baseLevelDb: bldb, - maxBatchSize: maxBatchSize, - batchDelaySeconds: batchDelaySeconds, - sizeBatch: 0, - dbAccess: make(chan serialQueryer), - cancel: cancel, - closer: closing.NewSafeChanCloser(), + keysRemovedHandler: newMapKeysRemovedHandler(), + baseLevelDb: bldb, + maxBatchSize: maxBatchSize, + batchDelaySeconds: batchDelaySeconds, + sizeBatch: 0, + dbAccess: make(chan serialQueryer), + cancel: cancel, + closer: closing.NewSafeChanCloser(), + batchID: 0, } - dbStore.batch = NewBatch() + dbStore.batch = NewBatch(dbStore.batchID, dbStore.keysRemovedHandler) go dbStore.batchTimeoutHandle(ctx) go dbStore.processLoop(ctx) @@ -246,7 +256,9 @@ func (s *SerialDB) putBatch() error { return common.ErrInvalidBatch } s.sizeBatch = 0 - s.batch = NewBatch() + s.batchID++ + s.batch = NewBatch(s.batchID, s.keysRemovedHandler) + s.keysRemovedHandler.addRemovedKeys(dbBatch.removedData, dbBatch.ID()) s.mutBatch.Unlock() ch := make(chan error) @@ -262,6 +274,8 @@ func (s *SerialDB) putBatch() error { result := <-ch close(ch) + s.keysRemovedHandler.deleteRemovedKeys(dbBatch.ID()) + return result } diff --git a/leveldb/leveldbSerial_test.go b/leveldb/leveldbSerial_test.go index f8b39827..6c3d90e7 100644 --- a/leveldb/leveldbSerial_test.go +++ b/leveldb/leveldbSerial_test.go @@ -4,6 +4,7 @@ import ( "fmt" "math/big" "sync" + "sync/atomic" "testing" "time" @@ -358,3 +359,87 @@ func TestSerialDB_ConcurrentOperations(t *testing.T) { wg.Wait() } + +func TestSerialDB_PutRemoveGet(t *testing.T) { + if testing.Short() { + t.Skip("this is not a short test") + } + + ldb := createSerialLevelDb(t, 100000, 1000000, 10) + + numKeys := 10000 + for i := 0; i < numKeys; i++ { + _ = ldb.Put([]byte(fmt.Sprintf("key %d", i)), []byte("val")) + } + + time.Sleep(time.Second * 2) + + numErr := uint32(0) + + for i := 0; i < numKeys; i++ { + key := []byte(fmt.Sprintf("key %d", i)) + + recoveredVal, _ := ldb.Get(key) + assert.NotEmpty(t, recoveredVal) + + wg := &sync.WaitGroup{} + wg.Add(2) + + // emulate the following scenario: + // the sequence Remove(key) -> Get(key) is done while the putBatch is called. So the actual edgecase is + // go routine 1: Remove(key) -----------------> Get(key) + // go routine 2: putBatch() + + go func() { + time.Sleep(time.Millisecond * 1) + ldb.PutBatch() + wg.Done() + }() + go func() { + _ = ldb.Remove(key) + + time.Sleep(time.Millisecond * 1) + + recoveredVal2, _ := ldb.Get(key) + if len(recoveredVal2) > 0 { + // the key-value was not removed + atomic.AddUint32(&numErr, 1) + } + + wg.Done() + }() + + wg.Wait() + + require.Zero(t, atomic.LoadUint32(&numErr), "iteration %d out of %d", i, numKeys) + } + + _ = ldb.Close() +} + +func TestSerialDB_PutRemovePutHas(t *testing.T) { + ldb := createSerialLevelDb(t, 100000, 1000000, 10) + + key := []byte("key") + value := []byte("value") + + _ = ldb.Put(key, value) + + // manually put the pair in storage + ldb.PutBatch() + time.Sleep(time.Second) + assert.Nil(t, ldb.Has(key)) // key was found + + // we now remove the key + _ = ldb.Remove(key) + + // manually delete the key from the storage + ldb.PutBatch() + time.Sleep(time.Second) + assert.NotNil(t, ldb.Has(key)) // missing key + + _ = ldb.Put(key, value) // put the key again + assert.Nil(t, ldb.Has(key)) // key was found + + _ = ldb.Close() +} diff --git a/leveldb/mapKeysRemovedHandler.go b/leveldb/mapKeysRemovedHandler.go new file mode 100644 index 00000000..cf03d247 --- /dev/null +++ b/leveldb/mapKeysRemovedHandler.go @@ -0,0 +1,52 @@ +package leveldb + +import ( + "sync" +) + +type mapKeysRemovedHandler struct { + mut sync.RWMutex + removedByBatch map[uint64]map[string]struct{} +} + +func newMapKeysRemovedHandler() *mapKeysRemovedHandler { + return &mapKeysRemovedHandler{ + removedByBatch: make(map[uint64]map[string]struct{}), + } +} + +// addRemovedKeys will add a map containing removed keys for a specified batch ID +func (handler *mapKeysRemovedHandler) addRemovedKeys(keys map[string]struct{}, batchID uint64) { + handler.mut.Lock() + defer handler.mut.Unlock() + + if len(keys) == 0 { + // no need to add an empty map here + return + } + + handler.removedByBatch[batchID] = keys +} + +// deleteRemovedKeys will delete the map containing removed keys for a specified batch ID +func (handler *mapKeysRemovedHandler) deleteRemovedKeys(batchID uint64) { + handler.mut.Lock() + defer handler.mut.Unlock() + + delete(handler.removedByBatch, batchID) +} + +// hasRemovedKeys will return true if the key is contained in any of the inner maps +func (handler *mapKeysRemovedHandler) hasRemovedKeys(key []byte) bool { + handler.mut.RLock() + defer handler.mut.RUnlock() + + for _, m := range handler.removedByBatch { + _, found := m[string(key)] + if found { + return true + } + } + + return false +}