remove network migration and optimize network event retry (#3510)
* remove network migration and optimize network event retry

* more cleanup
woutslakhorst authored Oct 22, 2024
1 parent bfe1d26 commit 08ad0e5
Showing 10 changed files with 13 additions and 262 deletions.
91 changes: 0 additions & 91 deletions network/dag/dag.go
@@ -96,69 +96,6 @@ func newDAG(db stoabs.KVStore) *dag {
return &dag{db: db}
}

func (d *dag) Migrate() error {
return d.db.Write(context.Background(), func(tx stoabs.WriteTx) error {
writer := tx.GetShelfWriter(metadataShelf)
// Migrate highest LC value
// Todo: remove after V5 release
_, err := writer.Get(stoabs.BytesKey(highestClockValue))
if errors.Is(err, stoabs.ErrKeyNotFound) {
log.Logger().Info("Highest LC value not stored, migrating...")
highestLC := d.getHighestClockLegacy(tx)
err = d.setHighestClockValue(tx, highestLC)
}
if err != nil {
return err
}

// Migrate number of TXs
// Todo: remove after V5 release
_, err = writer.Get(stoabs.BytesKey(numberOfTransactionsKey))
if errors.Is(err, stoabs.ErrKeyNotFound) {
log.Logger().Info("Number of transactions not stored, migrating...")
numberOfTXs := d.getNumberOfTransactionsLegacy(tx)
err = d.setNumberOfTransactions(tx, numberOfTXs)
}
if err != nil {
return err
}

// Migrate headsLegacy to single head
// Todo: remove after V6 release => then remove headsShelf
_, err = writer.Get(stoabs.BytesKey(headRefKey))
if errors.Is(err, stoabs.ErrKeyNotFound) {
log.Logger().Info("Head not stored in metadata, migrating...")
heads := d.headsLegacy(tx)
err = nil // reset error
if len(heads) != 0 { // ignore for empty node
var latestHead hash.SHA256Hash
var latestLC uint32

for _, ref := range heads {
transaction, err := getTransaction(ref, tx)
if err != nil {
if errors.Is(err, ErrTransactionNotFound) {
return fmt.Errorf("database migration failed: %w (%s=%s)", err, core.LogFieldTransactionRef, ref)
}
return err
}
if transaction.Clock() >= latestLC {
latestHead = ref
latestLC = transaction.Clock()
}
}

err = d.setHead(tx, latestHead)
}
}
if err != nil {
return err
}

return nil
})
}

func (d *dag) diagnostics(ctx context.Context) []core.DiagnosticResult {
var stats Statistics
_ = d.db.Read(ctx, func(tx stoabs.ReadTx) error {
@@ -296,27 +233,6 @@ func (d dag) getHighestClockValue(tx stoabs.ReadTx) uint32 {
return bytesToClock(value)
}

// getHighestClockLegacy is used for migration.
// Remove after V5 or V6 release?
func (d dag) getHighestClockLegacy(tx stoabs.ReadTx) uint32 {
reader := tx.GetShelfReader(clockShelf)
var clock uint32
err := reader.Iterate(func(key stoabs.Key, _ []byte) error {
currentClock := uint32(key.(stoabs.Uint32Key))
if currentClock > clock {
clock = currentClock
}
return nil
}, stoabs.Uint32Key(0))
if err != nil {
log.Logger().
WithError(err).
Error("Failed to read clock shelf")
return 0
}
return clock
}

func (d dag) getHead(tx stoabs.ReadTx) (hash.SHA256Hash, error) {
head, err := tx.GetShelfReader(metadataShelf).Get(stoabs.BytesKey(headRefKey))
if errors.Is(err, stoabs.ErrKeyNotFound) {
@@ -329,13 +245,6 @@ func (d dag) getHead(tx stoabs.ReadTx) (hash.SHA256Hash, error) {
return hash.FromSlice(head), nil
}

// getNumberOfTransactionsLegacy is used for migration.
// Remove after V5 or V6 release?
func (d dag) getNumberOfTransactionsLegacy(tx stoabs.ReadTx) uint64 {
reader := tx.GetShelfReader(transactionsShelf)
return uint64(reader.Stats().NumEntries)
}

func (d dag) setHighestClockValue(tx stoabs.WriteTx, count uint32) error {
writer := tx.GetShelfWriter(metadataShelf)
bytes := make([]byte, 4)
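The helpers above read and write the highest Lamport clock as a fixed 4-byte entry on the metadata shelf; the encoding itself is truncated in this diff. A minimal sketch of what `bytesToClock` and its write-side counterpart are assumed to do (big-endian byte order is an assumption; only the `make([]byte, 4)` allocation is visible above):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// clockToBytes encodes the highest Lamport clock as a fixed 4-byte value for the
// metadata shelf. Big-endian byte order is an assumption, not shown in the diff.
func clockToBytes(clock uint32) []byte {
	bytes := make([]byte, 4)
	binary.BigEndian.PutUint32(bytes, clock)
	return bytes
}

// bytesToClock decodes the stored value; a missing or malformed entry falls back to 0.
func bytesToClock(value []byte) uint32 {
	if len(value) < 4 {
		return 0
	}
	return binary.BigEndian.Uint32(value)
}

func main() {
	fmt.Println(bytesToClock(clockToBytes(42))) // prints 42
}
```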
98 changes: 0 additions & 98 deletions network/dag/dag_test.go
@@ -107,104 +107,6 @@ func TestDAG_Get(t *testing.T) {
})
}

func TestDAG_Migrate(t *testing.T) {
ctx := context.Background()
txRoot := CreateTestTransactionWithJWK(0)
tx1 := CreateTestTransactionWithJWK(1, txRoot)
tx2 := CreateTestTransactionWithJWK(2, tx1)

t.Run("migrate LC value and transaction count to metadata storage", func(t *testing.T) {
graph := CreateDAG(t)

// Setup: add transactions, remove metadata
addTx(t, graph, txRoot, tx1, tx2)
err := graph.db.WriteShelf(ctx, metadataShelf, func(writer stoabs.Writer) error {
return writer.Iterate(func(key stoabs.Key, _ []byte) error {
return writer.Delete(key)
}, stoabs.BytesKey{})
})
require.NoError(t, err)

// Check values return 0
var stats Statistics
var lc uint32
_ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error {
stats = graph.statistics(tx)
lc = graph.getHighestClockValue(tx)
return nil
})
assert.Equal(t, uint(0), stats.NumberOfTransactions)
assert.Equal(t, uint32(0), lc)

// Migrate
err = graph.Migrate()
require.NoError(t, err)

// Assert
_ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error {
stats = graph.statistics(tx)
lc = graph.getHighestClockValue(tx)
return nil
})
assert.Equal(t, uint(3), stats.NumberOfTransactions)
assert.Equal(t, tx2.Clock(), lc)
})
t.Run("migrate head to metadata storage", func(t *testing.T) {
graph := CreateDAG(t)

// Setup: add transactions, remove metadata, add to headsShelf
addTx(t, graph, txRoot, tx1, tx2)
err := graph.db.WriteShelf(ctx, metadataShelf, func(writer stoabs.Writer) error {
return writer.Iterate(func(key stoabs.Key, _ []byte) error {
return writer.Delete(key)
}, stoabs.BytesKey{})
})
require.NoError(t, err)
err = graph.db.WriteShelf(ctx, headsShelf, func(writer stoabs.Writer) error {
_ = writer.Put(stoabs.BytesKey(txRoot.Ref().Slice()), []byte{1})
_ = writer.Put(stoabs.BytesKey(tx2.Ref().Slice()), []byte{1})
return writer.Put(stoabs.BytesKey(tx1.Ref().Slice()), []byte{1})
})
require.NoError(t, err)

// Check current head is nil
var head hash.SHA256Hash
_ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error {
head, _ = graph.getHead(tx)
return nil
})
assert.Equal(t, hash.EmptyHash(), head)

// Migrate
err = graph.Migrate()
require.NoError(t, err)

// Assert
_ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error {
head, _ = graph.getHead(tx)
return nil
})
assert.Equal(t, tx2.Ref(), head)
})
t.Run("nothing to migrate", func(t *testing.T) {
graph := CreateDAG(t)
addTx(t, graph, txRoot, tx1, tx2)

err := graph.Migrate()
require.NoError(t, err)

stats := Statistics{}
var lc uint32
_ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error {
stats = graph.statistics(tx)
lc = graph.getHighestClockValue(tx)
return nil
})
assert.Equal(t, uint(3), stats.NumberOfTransactions)
assert.Equal(t, tx2.Clock(), lc)
})
}

func TestDAG_Add(t *testing.T) {
ctx := context.Background()
t.Run("ok", func(t *testing.T) {
17 changes: 11 additions & 6 deletions network/dag/notifier.go
@@ -243,6 +243,7 @@ func (p *notifier) Run() error {
}
// we're going to retry all events synchronously at startup. For the ones that fail we'll start the retry loop
failedAtStartup := make([]Event, 0)
readyToRetry := make([]Event, 0)
err := p.db.ReadShelf(p.ctx, p.shelfName(), func(reader stoabs.Reader) error {
return reader.Iterate(func(k stoabs.Key, v []byte) error {
event := Event{}
@@ -253,19 +254,23 @@ func (p *notifier) Run() error {
return nil
}

if err := p.notifyNow(event); err != nil {
if event.Retries < maxRetries {
failedAtStartup = append(failedAtStartup, event)
}
}

readyToRetry = append(readyToRetry, event)
return nil
}, stoabs.BytesKey{})
})
if err != nil {
return err
}

// notify outside of the read transaction to prevent a long-running read
for _, event := range readyToRetry {
if err := p.notifyNow(event); err != nil {
if event.Retries < maxRetries {
failedAtStartup = append(failedAtStartup, event)
}
}
}

// for all events from failedAtStartup, call retry
// this may still produce errors in the logs, or even duplicate errors since notifyNow also failed,
// but duplicate errors are preferable to errors produced by overloading the DB with transactions
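The gist of the change above: events are only collected into `readyToRetry` while the read transaction is open, and the potentially slow `notifyNow` calls happen afterwards, so subscribers are no longer notified from inside a long-running read. A self-contained sketch of that collect-then-notify pattern (stand-in types, simulated store; `maxRetries` is an assumed constant, not the notifier's real value):

```go
package main

import "fmt"

// Event is a stand-in for the notifier's persisted event type (hypothetical fields).
type Event struct {
	Ref     string
	Retries int
}

// maxRetries is an assumed limit; the real value lives in the notifier package.
const maxRetries = 100

// collectThenNotify gathers all events while the (simulated) read transaction is
// open, then notifies outside of it so the read stays short-lived.
func collectThenNotify(readShelf func(visit func(Event)) error, notifyNow func(Event) error) []Event {
	readyToRetry := make([]Event, 0)
	// Phase 1: only copy events out of the shelf during the read.
	_ = readShelf(func(e Event) { readyToRetry = append(readyToRetry, e) })

	// Phase 2: do the slow notification work outside the read transaction.
	failedAtStartup := make([]Event, 0)
	for _, event := range readyToRetry {
		if err := notifyNow(event); err != nil && event.Retries < maxRetries {
			failedAtStartup = append(failedAtStartup, event)
		}
	}
	return failedAtStartup
}

func main() {
	events := []Event{{Ref: "tx1"}, {Ref: "tx2", Retries: 3}}
	failed := collectThenNotify(
		func(visit func(Event)) error { // simulated ReadShelf + Iterate
			for _, e := range events {
				visit(e)
			}
			return nil
		},
		func(e Event) error { // simulated notifyNow that fails for tx2
			if e.Ref == "tx2" {
				return fmt.Errorf("receiver not ready")
			}
			return nil
		},
	)
	fmt.Println("failed at startup:", failed)
}
```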
2 changes: 1 addition & 1 deletion network/dag/state.go
@@ -60,7 +60,7 @@ type state struct {
}

func (s *state) Migrate() error {
return s.graph.Migrate()
return nil
}

// NewState returns a new State. The State is used as entry point, it's methods will start transactions and will notify observers from within those transactions.
2 changes: 0 additions & 2 deletions network/interface.go
@@ -37,8 +37,6 @@ type Transactions interface {
Subscribe(name string, receiver dag.ReceiverFn, filters ...SubscriberOption) error
// Subscribers returns the list of notifiers on the DAG that emit events to subscribers.
Subscribers() []dag.Notifier
// CleanupSubscriberEvents removes events. Example use is cleaning up events that errored but should be removed due to a bugfix.
CleanupSubscriberEvents(subscriberName, errorPrefix string) error
// GetTransactionPayload retrieves the transaction Payload for the given transaction.
// If the transaction or Payload is not found, dag.ErrPayloadNotFound is returned.
GetTransactionPayload(transactionRef hash.SHA256Hash) ([]byte, error)
14 changes: 0 additions & 14 deletions network/mock.go

Some generated files are not rendered by default.

4 changes: 1 addition & 3 deletions vcr/ambassador.go
@@ -99,9 +99,7 @@ func (n ambassador) Start() error {
return fmt.Errorf("failed to subscribe to REPROCESS event stream: %v", err)
}

// removing failed events required for #1743
// remove after v6 release
return n.networkClient.CleanupSubscriberEvents("vcr_vcs", "canonicalization failed: unable to normalize the json-ld document: loading remote context failed: Dereferencing a URL did not result in a valid JSON-LD context")
return nil
}

func (n ambassador) handleNetworkVCs(event dag.Event) (bool, error) {
1 change: 0 additions & 1 deletion vcr/test.go
@@ -176,7 +176,6 @@ func newMockContext(t *testing.T) mockContext {
tx.EXPECT().WithPersistency().AnyTimes()
tx.EXPECT().Subscribe("vcr_vcs", gomock.Any(), gomock.Any())
tx.EXPECT().Subscribe("vcr_revocations", gomock.Any(), gomock.Any())
tx.EXPECT().CleanupSubscriberEvents("vcr_vcs", gomock.Any())
tx.EXPECT().Disabled().AnyTimes()
didResolver := resolver.NewMockDIDResolver(ctrl)
documentOwner := didsubject.NewMockDocumentOwner(ctrl)
10 changes: 0 additions & 10 deletions vdr/vdr.go
@@ -221,16 +221,6 @@ func (r *Module) Start() error {
return err
}

// VDR migration needs to be started after ambassador has started!
count, err := r.store.DocumentCount()
if err != nil {
return err
}
if count == 0 {
// remove after v6 release
_, err = r.network.Reprocess(context.Background(), "application/did+json")
}

// start DID Document rollback loop
r.routines.Add(1)
go func() {
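The removed block triggered a one-time `Reprocess` of DID documents when the store was empty; that bootstrap no longer happens automatically at startup. If a re-import is ever needed, it can presumably still be requested through the network layer. A hedged sketch, with the interface shape inferred from the deleted call (the real return type of `Reprocess` is not visible in this diff):

```go
package example

import "context"

// Reprocessor captures just the method the removed migration relied on; the
// signature is inferred from the deleted call above, so treat it as an assumption.
type Reprocessor interface {
	Reprocess(ctx context.Context, contentType string) (interface{}, error)
}

// reprocessDIDDocuments asks the network layer to re-deliver all transactions
// carrying DID documents, mirroring what the removed startup migration did.
func reprocessDIDDocuments(ctx context.Context, n Reprocessor) error {
	_, err := n.Reprocess(ctx, "application/did+json")
	return err
}
```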
36 changes: 0 additions & 36 deletions vdr/vdr_test.go
@@ -25,7 +25,6 @@ import (
"crypto/rand"
"encoding/base64"
"encoding/json"
"errors"
"github.com/lestrrat-go/jwx/v2/jwk"
ssi "github.com/nuts-foundation/go-did"
"github.com/nuts-foundation/go-did/did"
@@ -107,41 +106,6 @@ func TestNewVDR(t *testing.T) {
assert.IsType(t, &Module{}, vdr)
}

func TestVDR_Start(t *testing.T) {
t.Run("migration", func(t *testing.T) {
t.Run("migrate on 0 document count", func(t *testing.T) {
ctx := newVDRTestCtx(t)
ctx.mockAmbassador.EXPECT().Start()
ctx.mockStore.EXPECT().DocumentCount().Return(uint(0), nil)
ctx.mockNetwork.EXPECT().Reprocess(context.Background(), "application/did+json").Return(nil, nil)

err := ctx.vdr.Start()

require.NoError(t, err)
})
t.Run("don't migrate on > 0 document count", func(t *testing.T) {
ctx := newVDRTestCtx(t)
ctx.mockAmbassador.EXPECT().Start()
ctx.mockStore.EXPECT().DocumentCount().Return(uint(1), nil)

err := ctx.vdr.Start()

require.NoError(t, err)
})
t.Run("error on migration error", func(t *testing.T) {
ctx := newVDRTestCtx(t)
ctx.mockAmbassador.EXPECT().Start()
testError := errors.New("test")
ctx.mockStore.EXPECT().DocumentCount().Return(uint(0), testError)

err := ctx.vdr.Start()

assert.Equal(t, testError, err)
})
})

}

func TestVDR_ConflictingDocuments(t *testing.T) {

t.Run("diagnostics", func(t *testing.T) {
