From 08ad0e587cbb4375e7b55ce27a76609510ada0d0 Mon Sep 17 00:00:00 2001 From: Wout Slakhorst Date: Tue, 22 Oct 2024 15:09:04 +0200 Subject: [PATCH] remove network migration and optimize network event retry (#3510) * remove network migration and optimize network event retry * more cleanup --- network/dag/dag.go | 91 -------------------------------------- network/dag/dag_test.go | 98 ----------------------------------------- network/dag/notifier.go | 17 ++++--- network/dag/state.go | 2 +- network/interface.go | 2 - network/mock.go | 14 ------ vcr/ambassador.go | 4 +- vcr/test.go | 1 - vdr/vdr.go | 10 ----- vdr/vdr_test.go | 36 --------------- 10 files changed, 13 insertions(+), 262 deletions(-) diff --git a/network/dag/dag.go b/network/dag/dag.go index 05ad9e77b5..a8e6260019 100644 --- a/network/dag/dag.go +++ b/network/dag/dag.go @@ -96,69 +96,6 @@ func newDAG(db stoabs.KVStore) *dag { return &dag{db: db} } -func (d *dag) Migrate() error { - return d.db.Write(context.Background(), func(tx stoabs.WriteTx) error { - writer := tx.GetShelfWriter(metadataShelf) - // Migrate highest LC value - // Todo: remove after V5 release - _, err := writer.Get(stoabs.BytesKey(highestClockValue)) - if errors.Is(err, stoabs.ErrKeyNotFound) { - log.Logger().Info("Highest LC value not stored, migrating...") - highestLC := d.getHighestClockLegacy(tx) - err = d.setHighestClockValue(tx, highestLC) - } - if err != nil { - return err - } - - // Migrate number of TXs - // Todo: remove after V5 release - _, err = writer.Get(stoabs.BytesKey(numberOfTransactionsKey)) - if errors.Is(err, stoabs.ErrKeyNotFound) { - log.Logger().Info("Number of transactions not stored, migrating...") - numberOfTXs := d.getNumberOfTransactionsLegacy(tx) - err = d.setNumberOfTransactions(tx, numberOfTXs) - } - if err != nil { - return err - } - - // Migrate headsLegacy to single head - // Todo: remove after V6 release => then remove headsShelf - _, err = writer.Get(stoabs.BytesKey(headRefKey)) - if errors.Is(err, stoabs.ErrKeyNotFound) { - log.Logger().Info("Head not stored in metadata, migrating...") - heads := d.headsLegacy(tx) - err = nil // reset error - if len(heads) != 0 { // ignore for empty node - var latestHead hash.SHA256Hash - var latestLC uint32 - - for _, ref := range heads { - transaction, err := getTransaction(ref, tx) - if err != nil { - if errors.Is(err, ErrTransactionNotFound) { - return fmt.Errorf("database migration failed: %w (%s=%s)", err, core.LogFieldTransactionRef, ref) - } - return err - } - if transaction.Clock() >= latestLC { - latestHead = ref - latestLC = transaction.Clock() - } - } - - err = d.setHead(tx, latestHead) - } - } - if err != nil { - return err - } - - return nil - }) -} - func (d *dag) diagnostics(ctx context.Context) []core.DiagnosticResult { var stats Statistics _ = d.db.Read(ctx, func(tx stoabs.ReadTx) error { @@ -296,27 +233,6 @@ func (d dag) getHighestClockValue(tx stoabs.ReadTx) uint32 { return bytesToClock(value) } -// getHighestClockLegacy is used for migration. -// Remove after V5 or V6 release? -func (d dag) getHighestClockLegacy(tx stoabs.ReadTx) uint32 { - reader := tx.GetShelfReader(clockShelf) - var clock uint32 - err := reader.Iterate(func(key stoabs.Key, _ []byte) error { - currentClock := uint32(key.(stoabs.Uint32Key)) - if currentClock > clock { - clock = currentClock - } - return nil - }, stoabs.Uint32Key(0)) - if err != nil { - log.Logger(). - WithError(err). - Error("Failed to read clock shelf") - return 0 - } - return clock -} - func (d dag) getHead(tx stoabs.ReadTx) (hash.SHA256Hash, error) { head, err := tx.GetShelfReader(metadataShelf).Get(stoabs.BytesKey(headRefKey)) if errors.Is(err, stoabs.ErrKeyNotFound) { @@ -329,13 +245,6 @@ func (d dag) getHead(tx stoabs.ReadTx) (hash.SHA256Hash, error) { return hash.FromSlice(head), nil } -// getNumberOfTransactionsLegacy is used for migration. -// Remove after V5 or V6 release? -func (d dag) getNumberOfTransactionsLegacy(tx stoabs.ReadTx) uint64 { - reader := tx.GetShelfReader(transactionsShelf) - return uint64(reader.Stats().NumEntries) -} - func (d dag) setHighestClockValue(tx stoabs.WriteTx, count uint32) error { writer := tx.GetShelfWriter(metadataShelf) bytes := make([]byte, 4) diff --git a/network/dag/dag_test.go b/network/dag/dag_test.go index 935f85691c..4b072621fe 100644 --- a/network/dag/dag_test.go +++ b/network/dag/dag_test.go @@ -107,104 +107,6 @@ func TestDAG_Get(t *testing.T) { }) } -func TestDAG_Migrate(t *testing.T) { - ctx := context.Background() - txRoot := CreateTestTransactionWithJWK(0) - tx1 := CreateTestTransactionWithJWK(1, txRoot) - tx2 := CreateTestTransactionWithJWK(2, tx1) - - t.Run("migrate LC value and transaction count to metadata storage", func(t *testing.T) { - graph := CreateDAG(t) - - // Setup: add transactions, remove metadata - addTx(t, graph, txRoot, tx1, tx2) - err := graph.db.WriteShelf(ctx, metadataShelf, func(writer stoabs.Writer) error { - return writer.Iterate(func(key stoabs.Key, _ []byte) error { - return writer.Delete(key) - }, stoabs.BytesKey{}) - }) - require.NoError(t, err) - - // Check values return 0 - var stats Statistics - var lc uint32 - _ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error { - stats = graph.statistics(tx) - lc = graph.getHighestClockValue(tx) - return nil - }) - assert.Equal(t, uint(0), stats.NumberOfTransactions) - assert.Equal(t, uint32(0), lc) - - // Migrate - err = graph.Migrate() - require.NoError(t, err) - - // Assert - _ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error { - stats = graph.statistics(tx) - lc = graph.getHighestClockValue(tx) - return nil - }) - assert.Equal(t, uint(3), stats.NumberOfTransactions) - assert.Equal(t, tx2.Clock(), lc) - }) - t.Run("migrate head to metadata storage", func(t *testing.T) { - graph := CreateDAG(t) - - // Setup: add transactions, remove metadata, add to headsShelf - addTx(t, graph, txRoot, tx1, tx2) - err := graph.db.WriteShelf(ctx, metadataShelf, func(writer stoabs.Writer) error { - return writer.Iterate(func(key stoabs.Key, _ []byte) error { - return writer.Delete(key) - }, stoabs.BytesKey{}) - }) - require.NoError(t, err) - err = graph.db.WriteShelf(ctx, headsShelf, func(writer stoabs.Writer) error { - _ = writer.Put(stoabs.BytesKey(txRoot.Ref().Slice()), []byte{1}) - _ = writer.Put(stoabs.BytesKey(tx2.Ref().Slice()), []byte{1}) - return writer.Put(stoabs.BytesKey(tx1.Ref().Slice()), []byte{1}) - }) - require.NoError(t, err) - - // Check current head is nil - var head hash.SHA256Hash - _ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error { - head, _ = graph.getHead(tx) - return nil - }) - assert.Equal(t, hash.EmptyHash(), head) - - // Migrate - err = graph.Migrate() - require.NoError(t, err) - - // Assert - _ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error { - head, _ = graph.getHead(tx) - return nil - }) - assert.Equal(t, tx2.Ref(), head) - }) - t.Run("nothing to migrate", func(t *testing.T) { - graph := CreateDAG(t) - addTx(t, graph, txRoot, tx1, tx2) - - err := graph.Migrate() - require.NoError(t, err) - - stats := Statistics{} - var lc uint32 - _ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error { - stats = graph.statistics(tx) - lc = graph.getHighestClockValue(tx) - return nil - }) - assert.Equal(t, uint(3), stats.NumberOfTransactions) - assert.Equal(t, tx2.Clock(), lc) - }) -} - func TestDAG_Add(t *testing.T) { ctx := context.Background() t.Run("ok", func(t *testing.T) { diff --git a/network/dag/notifier.go b/network/dag/notifier.go index 675b3882bf..4b6fa4890a 100644 --- a/network/dag/notifier.go +++ b/network/dag/notifier.go @@ -243,6 +243,7 @@ func (p *notifier) Run() error { } // we're going to retry all events synchronously at startup. For the ones that fail we'll start the retry loop failedAtStartup := make([]Event, 0) + readyToRetry := make([]Event, 0) err := p.db.ReadShelf(p.ctx, p.shelfName(), func(reader stoabs.Reader) error { return reader.Iterate(func(k stoabs.Key, v []byte) error { event := Event{} @@ -253,12 +254,7 @@ func (p *notifier) Run() error { return nil } - if err := p.notifyNow(event); err != nil { - if event.Retries < maxRetries { - failedAtStartup = append(failedAtStartup, event) - } - } - + readyToRetry = append(readyToRetry, event) return nil }, stoabs.BytesKey{}) }) @@ -266,6 +262,15 @@ func (p *notifier) Run() error { return err } + // do outside of main loop to prevent long running read + for _, event := range readyToRetry { + if err := p.notifyNow(event); err != nil { + if event.Retries < maxRetries { + failedAtStartup = append(failedAtStartup, event) + } + } + } + // for all events from failedAtStartup, call retry // this may still produce errors in the logs or even duplicate errors since notifyNow also failed // but rather duplicate errors then errors produced from overloading the DB with transactions diff --git a/network/dag/state.go b/network/dag/state.go index 81c2edf519..fe60a74525 100644 --- a/network/dag/state.go +++ b/network/dag/state.go @@ -60,7 +60,7 @@ type state struct { } func (s *state) Migrate() error { - return s.graph.Migrate() + return nil } // NewState returns a new State. The State is used as entry point, it's methods will start transactions and will notify observers from within those transactions. diff --git a/network/interface.go b/network/interface.go index aef5ed4cfa..847d6e58e7 100644 --- a/network/interface.go +++ b/network/interface.go @@ -37,8 +37,6 @@ type Transactions interface { Subscribe(name string, receiver dag.ReceiverFn, filters ...SubscriberOption) error // Subscribers returns the list of notifiers on the DAG that emit events to subscribers. Subscribers() []dag.Notifier - // CleanupSubscriberEvents removes events. Example use is cleaning up events that errored but should be removed due to a bugfix. - CleanupSubscriberEvents(subcriberName, errorPrefix string) error // GetTransactionPayload retrieves the transaction Payload for the given transaction. // If the transaction or Payload is not found, dag.ErrPayloadNotFound is returned. GetTransactionPayload(transactionRef hash.SHA256Hash) ([]byte, error) diff --git a/network/mock.go b/network/mock.go index 90c0ab062c..4a2c5c8ecb 100644 --- a/network/mock.go +++ b/network/mock.go @@ -58,20 +58,6 @@ func (mr *MockTransactionsMockRecorder) AddressBook() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddressBook", reflect.TypeOf((*MockTransactions)(nil).AddressBook)) } -// CleanupSubscriberEvents mocks base method. -func (m *MockTransactions) CleanupSubscriberEvents(subcriberName, errorPrefix string) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CleanupSubscriberEvents", subcriberName, errorPrefix) - ret0, _ := ret[0].(error) - return ret0 -} - -// CleanupSubscriberEvents indicates an expected call of CleanupSubscriberEvents. -func (mr *MockTransactionsMockRecorder) CleanupSubscriberEvents(subcriberName, errorPrefix any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CleanupSubscriberEvents", reflect.TypeOf((*MockTransactions)(nil).CleanupSubscriberEvents), subcriberName, errorPrefix) -} - // CreateTransaction mocks base method. func (m *MockTransactions) CreateTransaction(ctx context.Context, spec Template) (dag.Transaction, error) { m.ctrl.T.Helper() diff --git a/vcr/ambassador.go b/vcr/ambassador.go index 817e103170..63dd274816 100644 --- a/vcr/ambassador.go +++ b/vcr/ambassador.go @@ -99,9 +99,7 @@ func (n ambassador) Start() error { return fmt.Errorf("failed to subscribe to REPROCESS event stream: %v", err) } - // removing failed events required for #1743 - // remove after v6 release - return n.networkClient.CleanupSubscriberEvents("vcr_vcs", "canonicalization failed: unable to normalize the json-ld document: loading remote context failed: Dereferencing a URL did not result in a valid JSON-LD context") + return nil } func (n ambassador) handleNetworkVCs(event dag.Event) (bool, error) { diff --git a/vcr/test.go b/vcr/test.go index c67fbffd66..845225887f 100644 --- a/vcr/test.go +++ b/vcr/test.go @@ -176,7 +176,6 @@ func newMockContext(t *testing.T) mockContext { tx.EXPECT().WithPersistency().AnyTimes() tx.EXPECT().Subscribe("vcr_vcs", gomock.Any(), gomock.Any()) tx.EXPECT().Subscribe("vcr_revocations", gomock.Any(), gomock.Any()) - tx.EXPECT().CleanupSubscriberEvents("vcr_vcs", gomock.Any()) tx.EXPECT().Disabled().AnyTimes() didResolver := resolver.NewMockDIDResolver(ctrl) documentOwner := didsubject.NewMockDocumentOwner(ctrl) diff --git a/vdr/vdr.go b/vdr/vdr.go index 0672bf339c..f1c8289370 100644 --- a/vdr/vdr.go +++ b/vdr/vdr.go @@ -221,16 +221,6 @@ func (r *Module) Start() error { return err } - // VDR migration needs to be started after ambassador has started! - count, err := r.store.DocumentCount() - if err != nil { - return err - } - if count == 0 { - // remove after v6 release - _, err = r.network.Reprocess(context.Background(), "application/did+json") - } - // start DID Document rollback loop r.routines.Add(1) go func() { diff --git a/vdr/vdr_test.go b/vdr/vdr_test.go index 1c57bdc8f8..36382e69d0 100644 --- a/vdr/vdr_test.go +++ b/vdr/vdr_test.go @@ -25,7 +25,6 @@ import ( "crypto/rand" "encoding/base64" "encoding/json" - "errors" "github.com/lestrrat-go/jwx/v2/jwk" ssi "github.com/nuts-foundation/go-did" "github.com/nuts-foundation/go-did/did" @@ -107,41 +106,6 @@ func TestNewVDR(t *testing.T) { assert.IsType(t, &Module{}, vdr) } -func TestVDR_Start(t *testing.T) { - t.Run("migration", func(t *testing.T) { - t.Run("migrate on 0 document count", func(t *testing.T) { - ctx := newVDRTestCtx(t) - ctx.mockAmbassador.EXPECT().Start() - ctx.mockStore.EXPECT().DocumentCount().Return(uint(0), nil) - ctx.mockNetwork.EXPECT().Reprocess(context.Background(), "application/did+json").Return(nil, nil) - - err := ctx.vdr.Start() - - require.NoError(t, err) - }) - t.Run("don't migrate on > 0 document count", func(t *testing.T) { - ctx := newVDRTestCtx(t) - ctx.mockAmbassador.EXPECT().Start() - ctx.mockStore.EXPECT().DocumentCount().Return(uint(1), nil) - - err := ctx.vdr.Start() - - require.NoError(t, err) - }) - t.Run("error on migration error", func(t *testing.T) { - ctx := newVDRTestCtx(t) - ctx.mockAmbassador.EXPECT().Start() - testError := errors.New("test") - ctx.mockStore.EXPECT().DocumentCount().Return(uint(0), testError) - - err := ctx.vdr.Start() - - assert.Equal(t, testError, err) - }) - }) - -} - func TestVDR_ConflictingDocuments(t *testing.T) { t.Run("diagnostics", func(t *testing.T) {