Skip to content

Commit

Permalink
Fix OSS sealunwrapper adding extra get + put request to all storage g…
Browse files Browse the repository at this point in the history
…et requests (#29050)

* fix OSS sealunwrapper adding extra get + put request to all storage requests

* Add changelog entry
  • Loading branch information
andybao-dd authored Nov 29, 2024
1 parent 9bf3d11 commit 4b456ff
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 49 deletions.
4 changes: 4 additions & 0 deletions changelog/29050.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
```release-note:bug
core: fix bug in seal unwrapper that caused high storage latency in Vault CE. For every storage read request, the
seal unwrapper was performing the read twice, and would also issue an unnecessary storage write.
```
7 changes: 7 additions & 0 deletions sdk/physical/inmem/inmem_ha.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ func (i *InmemHABackend) HAEnabled() bool {
return true
}

func (i *InmemHABackend) Underlying() *InmemBackend {
if txBackend, ok := i.Backend.(*TransactionalInmemBackend); ok {
return &txBackend.InmemBackend
}
return i.Backend.(*InmemBackend)
}

// InmemLock is an in-memory Lock implementation for the HABackend
type InmemLock struct {
in *InmemHABackend
Expand Down
70 changes: 38 additions & 32 deletions vault/sealunwrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ import (
// NewSealUnwrapper creates a new seal unwrapper
func NewSealUnwrapper(underlying physical.Backend, logger log.Logger) physical.Backend {
ret := &sealUnwrapper{
underlying: underlying,
logger: logger,
locks: locksutil.CreateLocks(),
allowUnwraps: new(uint32),
underlying: underlying,
logger: logger,
locks: locksutil.CreateLocks(),
}

if underTxn, ok := underlying.(physical.Transactional); ok {
Expand All @@ -43,7 +42,7 @@ type sealUnwrapper struct {
underlying physical.Backend
logger log.Logger
locks []*locksutil.LockEntry
allowUnwraps *uint32
allowUnwraps atomic.Bool
}

// transactionalSealUnwrapper is a seal unwrapper that wraps a physical that is transactional
Expand All @@ -63,63 +62,70 @@ func (d *sealUnwrapper) Put(ctx context.Context, entry *physical.Entry) error {
return d.underlying.Put(ctx, entry)
}

// unwrap gets an entry from underlying storage and tries to unwrap it. If the entry was not wrapped, return
// value unwrappedEntry will be nil. If the entry is wrapped and encrypted, an error is returned.
func (d *sealUnwrapper) unwrap(ctx context.Context, key string) (entry, unwrappedEntry *physical.Entry, err error) {
entry, err = d.underlying.Get(ctx, key)
// unwrap gets an entry from underlying storage and tries to unwrap it.
// - If the entry is not wrapped: the entry will be returned unchanged and wasWrapped will be false
// - If the entry is wrapped and encrypted: an error is returned.
// - If the entry is wrapped but not encrypted: the entry will be unwrapped and returned. wasWrapped will be true.
func (d *sealUnwrapper) unwrap(ctx context.Context, key string) (unwrappedEntry *physical.Entry, wasWrapped bool, err error) {
entry, err := d.underlying.Get(ctx, key)
if err != nil {
return nil, nil, err
return nil, false, err
}
if entry == nil {
return nil, nil, err
return nil, false, nil
}

wrappedEntryValue, unmarshaled := UnmarshalSealWrappedValueWithCanary(entry.Value)
switch {
case !unmarshaled:
unwrappedEntry = entry
// Entry is not wrapped
return entry, false, nil
case wrappedEntryValue.isEncrypted():
return nil, nil, fmt.Errorf("cannot decode sealwrapped storage entry %q", entry.Key)
// Entry is wrapped and encrypted
return nil, true, fmt.Errorf("cannot decode sealwrapped storage entry %q", entry.Key)
default:
// Entry is wrapped and not encrypted
pt, err := wrappedEntryValue.getPlaintextValue()
if err != nil {
return nil, nil, err
return nil, true, err
}
unwrappedEntry = &physical.Entry{
return &physical.Entry{
Key: entry.Key,
Value: pt,
}
}, true, nil
}

return entry, unwrappedEntry, nil
}

func (d *sealUnwrapper) Get(ctx context.Context, key string) (*physical.Entry, error) {
entry, unwrappedEntry, err := d.unwrap(ctx, key)
entry, wasWrapped, err := d.unwrap(ctx, key)
switch {
case err != nil:
case err != nil: // Failed to get entry
return nil, err
case entry == nil:
case entry == nil: // Entry doesn't exist
return nil, nil
case atomic.LoadUint32(d.allowUnwraps) != 1:
return unwrappedEntry, nil
case !wasWrapped || !d.allowUnwraps.Load(): // Entry was not wrapped or unwrapping not allowed
return entry, nil
}

// Entry was wrapped, we need to replace it with the unwrapped value

// Grab locks because we are performing a write
locksutil.LockForKey(d.locks, key).Lock()
defer locksutil.LockForKey(d.locks, key).Unlock()

// At this point we need to re-read and re-check
entry, unwrappedEntry, err = d.unwrap(ctx, key)
// Read entry again in case it was changed while we were waiting for the lock
entry, wasWrapped, err = d.unwrap(ctx, key)
switch {
case err != nil:
case err != nil: // Failed to get entry
return nil, err
case entry == nil:
case entry == nil: // Entry doesn't exist
return nil, nil
case atomic.LoadUint32(d.allowUnwraps) != 1:
return unwrappedEntry, nil
case !wasWrapped || !d.allowUnwraps.Load(): // Entry was not wrapped or unwrapping not allowed
return entry, nil
}

return unwrappedEntry, d.underlying.Put(ctx, unwrappedEntry)
// Write out the unwrapped value
return entry, d.underlying.Put(ctx, entry)
}

func (d *sealUnwrapper) Delete(ctx context.Context, key string) error {
Expand Down Expand Up @@ -155,12 +161,12 @@ func (d *transactionalSealUnwrapper) Transaction(ctx context.Context, txns []*ph
// This should only run during preSeal which ensures that it can't be run
// concurrently and that it will be run only by the active node
func (d *sealUnwrapper) stopUnwraps() {
atomic.StoreUint32(d.allowUnwraps, 0)
d.allowUnwraps.Store(false)
}

func (d *sealUnwrapper) runUnwraps() {
// Allow key unwraps on key gets. This gets set only when running on the
// active node to prevent standbys from changing data underneath the
// primary
atomic.StoreUint32(d.allowUnwraps, 1)
d.allowUnwraps.Store(true)
}
53 changes: 36 additions & 17 deletions vault/sealunwrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,32 +21,38 @@ import (
func TestSealUnwrapper(t *testing.T) {
logger := corehelpers.NewTestLogger(t)

// Test without transactions
phys, err := inmem.NewInmemHA(nil, logger)
if err != nil {
t.Fatal(err)
}
performTestSealUnwrapper(t, phys, logger)
// Test with both cache enabled and disabled
for _, disableCache := range []bool{true, false} {
// Test without transactions
phys, err := inmem.NewInmemHA(nil, logger)
if err != nil {
t.Fatal(err)
}
performTestSealUnwrapper(t, phys, logger, disableCache)

// Test with transactions
tPhys, err := inmem.NewTransactionalInmemHA(nil, logger)
if err != nil {
t.Fatal(err)
// Test with transactions
tPhys, err := inmem.NewTransactionalInmemHA(nil, logger)
if err != nil {
t.Fatal(err)
}
performTestSealUnwrapper(t, tPhys, logger, disableCache)
}
performTestSealUnwrapper(t, tPhys, logger)
}

func performTestSealUnwrapper(t *testing.T, phys physical.Backend, logger log.Logger) {
func performTestSealUnwrapper(t *testing.T, phys physical.Backend, logger log.Logger, disableCache bool) {
ctx := context.Background()
base := &CoreConfig{
Physical: phys,
Physical: phys,
DisableCache: disableCache,
}
cluster := NewTestCluster(t, base, &TestClusterOptions{
Logger: logger,
})
cluster.Start()
defer cluster.Cleanup()

physImem := phys.(interface{ Underlying() *inmem.InmemBackend }).Underlying()

// Read a value and then save it back in a proto message
entry, err := phys.Get(ctx, "core/master")
if err != nil {
Expand Down Expand Up @@ -78,7 +84,15 @@ func performTestSealUnwrapper(t *testing.T, phys physical.Backend, logger log.Lo
// successfully decode it, but be able to unmarshal it when read back from
// the underlying physical store. When we read from active, it should both
// successfully decode it and persist it back.
checkValue := func(core *Core, wrapped bool) {
checkValue := func(core *Core, wrapped bool, ro bool) {
if ro {
physImem.FailPut(true)
physImem.FailDelete(true)
defer func() {
physImem.FailPut(false)
physImem.FailDelete(false)
}()
}
entry, err := core.physical.Get(ctx, "core/master")
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -106,7 +120,12 @@ func performTestSealUnwrapper(t *testing.T, phys physical.Backend, logger log.Lo
}

TestWaitActive(t, cluster.Cores[0].Core)
checkValue(cluster.Cores[2].Core, true)
checkValue(cluster.Cores[1].Core, true)
checkValue(cluster.Cores[0].Core, false)
checkValue(cluster.Cores[2].Core, true, true)
checkValue(cluster.Cores[1].Core, true, true)
checkValue(cluster.Cores[0].Core, false, false)

// The storage entry should now be unwrapped, so there should be no more writes to storage when we read it
checkValue(cluster.Cores[2].Core, false, true)
checkValue(cluster.Cores[1].Core, false, true)
checkValue(cluster.Cores[0].Core, false, true)
}

0 comments on commit 4b456ff

Please sign in to comment.