Skip to content

Commit

Permalink
add ErrDatabase (#25)
Browse files Browse the repository at this point in the history
  • Loading branch information
gerardsn authored Sep 30, 2022
1 parent 1419b53 commit b439144
Show file tree
Hide file tree
Showing 9 changed files with 263 additions and 35 deletions.
34 changes: 24 additions & 10 deletions bbolt/bbolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func CreateBBoltStore(filePath string, opts ...stoabs.Option) (stoabs.KVStore, e
func createBBoltStore(filePath string, options *bbolt.Options, cfg stoabs.Config) (stoabs.KVStore, error) {
err := os.MkdirAll(path.Dir(filePath), os.ModePerm) // TODO: Right permissions?
if err != nil {
return nil, err
return nil, stoabs.DatabaseError(err)
}

done := make(chan bool, 1)
Expand All @@ -80,7 +80,7 @@ func createBBoltStore(filePath string, options *bbolt.Options, cfg stoabs.Config
db, err := bbolt.Open(filePath, os.FileMode(0640), options) // TODO: Right permissions?
done <- true
if err != nil {
return nil, err
return nil, stoabs.DatabaseError(err)
}

return Wrap(db, cfg), nil
Expand All @@ -104,9 +104,13 @@ type store struct {
}

func (b *store) Close(ctx context.Context) error {
return util.CallWithTimeout(ctx, b.db.Close, func() {
err := util.CallWithTimeout(ctx, b.db.Close, func() {
b.log.Error("Closing of BBolt store timed out, store may not shut down correctly.")
})
if err != nil {
return stoabs.DatabaseError(err)
}
return nil
}

func (b *store) Write(ctx context.Context, fn func(stoabs.WriteTx) error, opts ...stoabs.TxOption) error {
Expand Down Expand Up @@ -160,7 +164,10 @@ func (b *store) doTX(ctx context.Context, fn func(tx *bbolt.Tx) error, writable
dbTX, err := b.db.Begin(writable)
if err != nil {
unlock()
return err
if err == bbolt.ErrDatabaseNotOpen {
return stoabs.ErrStoreIsClosed
}
return stoabs.DatabaseError(err)
}

// Perform TX action(s)
Expand Down Expand Up @@ -224,7 +231,7 @@ func (b bboltTx) GetShelfReader(shelfName string) stoabs.Reader {
func (b bboltTx) GetShelfWriter(shelfName string) (stoabs.Writer, error) {
bucket, err := b.tx.CreateBucketIfNotExists([]byte(shelfName))
if err != nil {
return nil, err
return nil, stoabs.DatabaseError(err)
}
return &bboltShelf{bucket: bucket, ctx: b.ctx}, nil
}
Expand Down Expand Up @@ -257,11 +264,17 @@ func (t bboltShelf) Get(key stoabs.Key) ([]byte, error) {
}

func (t bboltShelf) Put(key stoabs.Key, value []byte) error {
return t.bucket.Put(key.Bytes(), value)
if err := t.bucket.Put(key.Bytes(), value); err != nil {
return stoabs.DatabaseError(err)
}
return nil
}

func (t bboltShelf) Delete(key stoabs.Key) error {
return t.bucket.Delete(key.Bytes())
if err := t.bucket.Delete(key.Bytes()); err != nil {
return stoabs.DatabaseError(err)
}
return nil
}

func (t bboltShelf) Stats() stoabs.ShelfStats {
Expand All @@ -276,13 +289,14 @@ func (t bboltShelf) Iterate(callback stoabs.CallerFn, keyType stoabs.Key) error
for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
// Potentially long-running operation, check context for cancellation
if t.ctx.Err() != nil {
return t.ctx.Err()
return stoabs.DatabaseError(t.ctx.Err())
}
// return a copy to avoid data manipulation
vCopy := append(v[:0:0], v...)
key, err := keyType.FromBytes(k)
if err != nil {
return nil
// should never happen
return err
}
if err := callback(key, vCopy); err != nil {
return err
Expand All @@ -297,7 +311,7 @@ func (t bboltShelf) Range(from stoabs.Key, to stoabs.Key, callback stoabs.Caller
for k, v := cursor.Seek(from.Bytes()); k != nil && bytes.Compare(k, to.Bytes()) < 0; k, v = cursor.Next() {
// Potentially long-running operation, check context for cancellation
if t.ctx.Err() != nil {
return t.ctx.Err()
return stoabs.DatabaseError(t.ctx.Err())
}
key, err := from.FromBytes(k)
if err != nil {
Expand Down
25 changes: 19 additions & 6 deletions kvtests/common_tests.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,27 +274,33 @@ func TestRange(t *testing.T, storeProvider StoreProvider) {
})

assert.EqualError(t, err, "failure")
assert.NotErrorIs(t, err, stoabs.ErrDatabase{})
})

t.Run("TX context cancelled", func(t *testing.T) {
store := createStore(t, storeProvider)

// Write some data
_ = store.WriteShelf(ctx, shelf, func(writer stoabs.Writer) error {
return writer.Put(bytesKey, bytesValue)
for _, e := range input {
_ = writer.Put(e.key, e.value)
}
return nil
})

// Cancel read context
ctx, cancel := context.WithCancel(ctx)
cancel()

err := store.ReadShelf(ctx, shelf, func(reader stoabs.Reader) error {
return reader.Range(bytesKey, largerBytesKey, func(key stoabs.Key, value []byte) error {
// cancel within Range to make sure the context cancellation is caught during Range()
cancel()
return nil
}, false)
})

assert.ErrorIs(t, err, context.Canceled)
assert.ErrorIs(t, err, stoabs.ErrDatabase{})
})
})
}
Expand Down Expand Up @@ -410,6 +416,7 @@ func TestIterate(t *testing.T, storeProvider StoreProvider) {
return err
})
assert.EqualError(t, err, "failure")
assert.NotErrorIs(t, err, stoabs.ErrDatabase{})
})

t.Run("TX context cancelled", func(t *testing.T) {
Expand All @@ -422,6 +429,7 @@ func TestIterate(t *testing.T, storeProvider StoreProvider) {

// Cancel read context
ctx, cancel := context.WithCancel(ctx)
// TODO: move cancellation inside iterator. Currently returns before iterator is reached
cancel()

err := store.ReadShelf(ctx, shelf, func(reader stoabs.Reader) error {
Expand All @@ -431,6 +439,7 @@ func TestIterate(t *testing.T, storeProvider StoreProvider) {
})

assert.ErrorIs(t, err, context.Canceled)
assert.ErrorIs(t, err, stoabs.ErrDatabase{})
})
})
}
Expand Down Expand Up @@ -485,6 +494,7 @@ func TestWriteTransactions(t *testing.T, storeProvider StoreProvider) {
return errors.New("failure")
})
assert.Error(t, err)
assert.NotErrorIs(t, err, stoabs.ErrDatabase{}) // user error
assert.Equal(t, bytesValue, actualValue)

// Assert that the first key can be read, but the second and third keys not
Expand Down Expand Up @@ -580,10 +590,12 @@ func TestWriteTransactions(t *testing.T, storeProvider StoreProvider) {
cancel()

// Transaction should now have been aborted because context was cancelled
assert.ErrorIs(t, <-errs, context.Canceled)
err := <-errs
assert.ErrorIs(t, err, context.Canceled)
assert.ErrorIs(t, err, stoabs.ErrDatabase{})
// Assert value hasn't been ommitted
var actual []byte
err := store.ReadShelf(context.Background(), shelf, func(reader stoabs.Reader) error {
err = store.ReadShelf(context.Background(), shelf, func(reader stoabs.Reader) error {
var err error
actual, err = reader.Get(bytesKey)
return err
Expand Down Expand Up @@ -650,6 +662,7 @@ func TestTransactionWriteLock(t *testing.T, storeProvider StoreProvider) {
}, stoabs.WithWriteLock())

assert.ErrorIs(t, err, context.DeadlineExceeded)
assert.ErrorIs(t, err, stoabs.ErrDatabase{})
})
})
}
Expand Down Expand Up @@ -705,14 +718,14 @@ func TestClose(t *testing.T, storeProvider StoreProvider) {
err := store.WriteShelf(ctx, shelf, func(writer stoabs.Writer) error {
return writer.Put(bytesKey, bytesValue)
})
assert.Equal(t, err, stoabs.ErrStoreIsClosed)
assert.Equal(t, stoabs.ErrStoreIsClosed, err)
})
t.Run("timeout", func(t *testing.T) {
store := createStore(t, storeProvider)
ctx, cancel := context.WithCancel(context.Background())
cancel()
err := store.Close(ctx)
assert.Equal(t, err, context.Canceled)
assert.Equal(t, stoabs.DatabaseError(context.Canceled), err)
})
})
}
Expand Down
35 changes: 23 additions & 12 deletions redis7/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func Wrap(prefix string, client *redis.Client, opts ...stoabs.Option) (stoabs.KV
time.Sleep(PingAttemptBackoff)
}
if err != nil {
return nil, fmt.Errorf("unable to connect to Redis database: %w", err)
return nil, fmt.Errorf("unable to connect to Redis database: %w", stoabs.DatabaseError(err))
}
result.log.Debug("Connection check successful")

Expand Down Expand Up @@ -122,7 +122,10 @@ func (s *store) Close(ctx context.Context) error {
s.log.Error("Closing of Redis client timed out")
})
s.client = nil
return err
if err != nil {
return stoabs.DatabaseError(err)
}
return nil
}

func (s *store) Write(ctx context.Context, fn func(stoabs.WriteTx) error, opts ...stoabs.TxOption) error {
Expand Down Expand Up @@ -199,7 +202,7 @@ func (s *store) doTX(ctx context.Context, fn func(ctx context.Context, tx redis.
txMutex = s.rs.NewMutex(lockName, redsync.WithExpiry(lockExpiry))
err := txMutex.LockContext(lockCtx)
if err != nil {
return fmt.Errorf("unable to obtain Redis transaction-level write lock: %w", err)
return fmt.Errorf("unable to obtain Redis transaction-level write lock: %w", stoabs.DatabaseError(err))
}
unlock = func() {
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
Expand Down Expand Up @@ -241,7 +244,7 @@ func (s *store) doTX(ctx context.Context, fn func(ctx context.Context, tx redis.
s.log.Error("Unable to commit Redis transaction, transaction timed out.")
unlock()
stoabs.OnRollbackOption{}.Invoke(opts)
return ctx.Err()
return stoabs.DatabaseError(ctx.Err())
}

// Everything looks OK, commit
Expand All @@ -253,7 +256,7 @@ func (s *store) doTX(ctx context.Context, fn func(ctx context.Context, tx redis.
}
unlock()
stoabs.OnRollbackOption{}.Invoke(opts)
return stoabs.ErrCommitFailed
return util.WrapError(stoabs.ErrCommitFailed, err)
}

// Success
Expand Down Expand Up @@ -310,19 +313,25 @@ func (s shelf) Put(key stoabs.Key, value []byte) error {
if err := s.store.checkOpen(); err != nil {
return err
}
return s.writer.Set(s.ctx, s.toRedisKey(key), value, 0).Err()
if err := s.writer.Set(s.ctx, s.toRedisKey(key), value, 0).Err(); err != nil {
return stoabs.DatabaseError(err)
}
return nil
}

func (s shelf) Delete(key stoabs.Key) error {
return s.writer.Del(s.ctx, s.toRedisKey(key)).Err()
if err := s.writer.Del(s.ctx, s.toRedisKey(key)).Err(); err != nil {
return stoabs.DatabaseError(err)
}
return nil
}

func (s shelf) Get(key stoabs.Key) ([]byte, error) {
result, err := s.reader.Get(s.ctx, s.toRedisKey(key)).Result()
if err == redis.Nil {
return nil, nil
} else if err != nil {
return nil, err
return nil, stoabs.DatabaseError(err)
}
return []byte(result), nil
}
Expand All @@ -335,7 +344,7 @@ func (s shelf) Iterate(callback stoabs.CallerFn, keyType stoabs.Key) error {
scanCmd := s.reader.Scan(s.ctx, cursor, s.toRedisKey(stoabs.BytesKey(""))+"*", int64(resultCount))
keys, cursor, err = scanCmd.Result()
if err != nil {
return err
return stoabs.DatabaseError(err)
}
if len(keys) > 0 {
_, err := s.visitKeys(keys, callback, keyType, false)
Expand All @@ -356,8 +365,9 @@ func (s shelf) Range(from stoabs.Key, to stoabs.Key, callback stoabs.CallerFn, s
// Iterate from..to (start inclusive, end exclusive)
var numKeys = 0
for curr := from; !curr.Equals(to); curr = curr.Next() {
// Potentially long-running operation, check context for cancellation
if s.ctx.Err() != nil {
return s.ctx.Err()
return stoabs.DatabaseError(s.ctx.Err())
}
if curr.Equals(to) {
// Reached end (exclusive)
Expand All @@ -383,13 +393,13 @@ func (s shelf) Range(from stoabs.Key, to stoabs.Key, callback stoabs.CallerFn, s
// visitKeys retrieves the values of the given keys and invokes the callback with each key and value.
// It returns a bool indicating whether subsequent calls to visitKeys (with larger keys) should be attempted.
// Behavior when encountering a non-existing key depends on stopAtNil:
// - If stopAtNil is true, it stops processing keys and returns false (no futher calls to visitKeys should be made).
// - If stopAtNil is true, it stops processing keys and returns false (no further calls to visitKeys should be made).
// - If stopAtNil is false, it proceeds with the next key.
// If an error occurs it also returns false.
func (s shelf) visitKeys(keys []string, callback stoabs.CallerFn, keyType stoabs.Key, stopAtNil bool) (bool, error) {
values, err := s.reader.MGet(s.ctx, keys...).Result()
if err != nil {
return false, err
return false, stoabs.DatabaseError(err)
}
for i, value := range values {
if values[i] == nil {
Expand Down Expand Up @@ -428,6 +438,7 @@ func (s shelf) toRedisKey(key stoabs.Key) string {
}

func (s shelf) fromRedisKey(key string, keyType stoabs.Key) (stoabs.Key, error) {
// returned errors are the result of invalid input, hence not wrapped in ErrDatabase
if len(s.prefix) > 0 {
dbPrefix := s.prefix + ":"
if !strings.HasPrefix(key, dbPrefix) {
Expand Down
Loading

0 comments on commit b439144

Please sign in to comment.