Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up set index refs when referenced values are deleted. Fixes #80 #81

Merged
merged 1 commit into from
Jul 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions boltz/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,16 @@ func TestSetIndex_CheckIntegrity(t *testing.T) {
expectedMsg := "for index on employees.roleAttributes, val bash references id invalid, which doesn't exist"
test.requireIntegrityErrorAndFixResult(test.empStore, expectedMsg, true)

err = test.db.Update(func(tx *bbolt.Tx) error {
index := test.empStore.indexRoles.(*setIndex)
indexBucket := Path(tx, index.indexPath...)
return indexBucket.GetOrCreateBucket(string("bash")).GetError()
})
test.NoError(err)

expectedMsg = "for index on employees.roleAttributes, index value bash has no referenced values"
test.requireIntegrityErrorAndFixResult(test.empStore, expectedMsg, true)

err = test.db.Update(func(tx *bbolt.Tx) error {
index := test.empStore.indexRoles.(*setIndex)
indexBucket := Path(tx, index.indexPath...)
Expand Down
56 changes: 56 additions & 0 deletions boltz/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ type SetReadIndex interface {

type Constraint interface {
Checkable
Label() string
ProcessBeforeUpdate(ctx *IndexingContext)
ProcessAfterUpdate(ctx *IndexingContext)
ProcessBeforeDelete(ctx *IndexingContext)
Expand All @@ -281,6 +282,10 @@ type uniqueIndex struct {
indexPath []string
}

func (index *uniqueIndex) Label() string {
return fmt.Sprintf("unique index on %s.%s", index.symbol.GetStore().GetEntityType(), index.symbol.GetName())
}

func (index *uniqueIndex) Read(tx *bbolt.Tx, val []byte) []byte {
indexBucket := index.getIndexBucket(tx)
if indexBucket.Err != nil {
Expand Down Expand Up @@ -428,6 +433,10 @@ type setIndex struct {
listeners []SetChangeListener
}

func (index *setIndex) Label() string {
return fmt.Sprintf("set index on %s.%s", index.symbol.GetStore().GetEntityType(), index.symbol.GetName())
}

func (index *setIndex) AddListener(listener SetChangeListener) {
index.listeners = append(index.listeners, listener)
}
Expand Down Expand Up @@ -560,6 +569,9 @@ func (index *setIndex) ProcessBeforeDelete(ctx *IndexingContext) {
for _, val := range values {
indexBucket := index.getIndexBucket(ctx.Tx(), val.Value)
ctx.ErrHolder.SetError(indexBucket.DeleteListEntry(TypeString, ctx.RowId).Err)
if k, _ := indexBucket.Cursor().First(); k == nil {
ctx.ErrHolder.SetError(index.deleteIndexKey(ctx.Tx(), val.Value))
}
}
}
}
Expand Down Expand Up @@ -590,18 +602,24 @@ func (index *setIndex) deleteIndexKey(tx *bbolt.Tx, key []byte) error {
func (index *setIndex) CheckIntegrity(ctx MutateContext, fix bool, errorSink func(error, bool)) error {
tx := ctx.Tx()
if indexBaseBucket := Path(tx, index.indexPath...); indexBaseBucket != nil {
var toDelete []string
cursor := indexBaseBucket.Cursor()
for key, _ := cursor.First(); key != nil; key, _ = cursor.Next() {
hadRefs := false
if indexBucket := indexBaseBucket.Bucket.Bucket(key); indexBucket != nil {
idsCursor := indexBucket.Cursor()
referenceCount := 0
for val, _ := idsCursor.First(); val != nil; val, _ = idsCursor.Next() {
hadRefs = true
referenceCount++
_, id := GetTypeAndValue(val)
if !index.symbol.GetStore().IsEntityPresent(tx, string(id)) {
// entry has been deleted, remove
if fix {
if err := idsCursor.Delete(); err != nil {
return err
}
referenceCount--
}
errorSink(errors.Errorf("for index on %v.%v, val %v references id %v, which doesn't exist",
index.symbol.GetStore().GetEntityType(), index.GetSymbol().GetName(),
Expand All @@ -621,20 +639,36 @@ func (index *setIndex) CheckIntegrity(ctx MutateContext, fix bool, errorSink fun
if err := idsCursor.Delete(); err != nil {
return err
}
referenceCount--
}
errorSink(errors.Errorf("for index on %v.%v, val %v references id %v, which doesn't contain the value",
index.symbol.GetStore().GetEntityType(), index.GetSymbol().GetName(),
string(key), string(id)), fix)
}
}
}
if referenceCount == 0 {
if fix {
toDelete = append(toDelete, string(key))
}
if !hadRefs {
errorSink(errors.Errorf("for index on %s.%s, index value %s has no referenced values",
index.symbol.GetStore().GetEntityType(), index.GetSymbol().GetName(), string(key)), fix)
}
}
} else {
// If key has no values, delete the key
if err := cursor.Delete(); err != nil {
return err
}
}
}

for _, deleteKey := range toDelete {
if err := indexBaseBucket.DeleteBucket([]byte(deleteKey)); err != nil {
return errors.Wrapf(err, "error deleting unused key %s from index %s", deleteKey, index.Label())
}
}
}

for idsCursor := index.symbol.GetStore().IterateValidIds(tx, ast.BoolNodeTrue); idsCursor.IsValid(); idsCursor.Next() {
Expand Down Expand Up @@ -670,6 +704,12 @@ type fkIndex struct {
nullable bool
}

func (index *fkIndex) Label() string {
return fmt.Sprintf("fk index %s.%s -> %s.%s",
index.symbol.GetStore().GetEntityType(), index.symbol.GetName(),
index.fkSymbol.GetStore().GetEntityType(), index.fkSymbol.GetName())
}

func (index *fkIndex) ProcessBeforeUpdate(ctx *IndexingContext) {
if !ctx.ErrHolder.HasError() {
_, fieldValue := index.symbol.Eval(ctx.Tx(), ctx.RowId)
Expand Down Expand Up @@ -828,6 +868,12 @@ type fkDeleteConstraint struct {
fkSymbol EntitySymbol
}

func (index *fkDeleteConstraint) Label() string {
return fmt.Sprintf("fk delete contraint %s.%s -> %s.%s",
index.symbol.GetStore().GetEntityType(), index.symbol.GetName(),
index.fkSymbol.GetStore().GetEntityType(), index.fkSymbol.GetName())
}

func (index *fkDeleteConstraint) ProcessBeforeUpdate(_ *IndexingContext) {
}

Expand Down Expand Up @@ -870,6 +916,11 @@ type fkConstraint struct {
nullable bool
}

func (index *fkConstraint) Label() string {
return fmt.Sprintf("fk constraint %s.%s",
index.symbol.GetStore().GetEntityType(), index.symbol.GetName())
}

func (index *fkConstraint) ProcessBeforeUpdate(ctx *IndexingContext) {
if !ctx.ErrHolder.HasError() {
_, fieldValue := index.symbol.Eval(ctx.Tx(), ctx.RowId)
Expand Down Expand Up @@ -944,6 +995,11 @@ type fkDeleteCascadeConstraint struct {
cascadeType CascadeType
}

func (index *fkDeleteCascadeConstraint) Label() string {
return fmt.Sprintf("fk delete cascade index %s.%s",
index.symbol.GetStore().GetEntityType(), index.symbol.GetName())
}

func (index *fkDeleteCascadeConstraint) ProcessBeforeUpdate(*IndexingContext) {
}

Expand Down
7 changes: 6 additions & 1 deletion boltz/store_crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package boltz

import (
"github.com/google/uuid"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/foundation/v2/errorz"
"github.com/openziti/foundation/v2/stringz"
"github.com/openziti/storage/ast"
Expand Down Expand Up @@ -545,12 +546,16 @@ func (*BaseStore[E]) IteratorMatchingAnyOf(readIndex SetReadIndex, values []stri
func (store *BaseStore[E]) CheckIntegrity(ctx MutateContext, fix bool, errorSink func(err error, fixed bool)) error {
for _, linkCollection := range store.links {
if err := linkCollection.CheckIntegrity(ctx, fix, errorSink); err != nil {
pfxlog.Logger().WithError(err).Infof("error checking link collection %s.%s -> %s.%s",
linkCollection.GetFieldSymbol().GetStore().GetEntityType(), linkCollection.GetFieldSymbol().GetName(),
linkCollection.GetLinkedSymbol().GetStore().GetEntityType(), linkCollection.GetLinkedSymbol().GetName())
return err
}
}
for _, constraint := range store.Indexer.constraints {
if err := constraint.CheckIntegrity(ctx, fix, errorSink); err != nil {
return err
pfxlog.Logger().WithError(err).Infof("error checking link constraint: %s", constraint.Label())
return errors.Wrapf(err, "error checking constraint: %s", constraint.Label())
}
}
return nil
Expand Down
6 changes: 6 additions & 0 deletions boltz/system_entity_constraint.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package boltz

import (
"fmt"
"github.com/openziti/foundation/v2/errorz"
"github.com/pkg/errors"
"go.etcd.io/bbolt"
Expand All @@ -17,6 +18,11 @@ type systemEntityConstraint struct {
systemFlagSymbol EntitySymbol
}

func (index *systemEntityConstraint) Label() string {
return fmt.Sprintf("system entity constraint %s.%s",
index.systemFlagSymbol.GetStore().GetEntityType(), index.systemFlagSymbol.GetName())
}

func (self *systemEntityConstraint) checkOperation(operation string, ctx *IndexingContext) error {
t, val := self.systemFlagSymbol.Eval(ctx.Tx(), ctx.RowId)
isSystem := FieldToBool(t, val)
Expand Down
2 changes: 1 addition & 1 deletion version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.2
0.3
Loading