From d1d1033e47ef16abbab35560de63994474791751 Mon Sep 17 00:00:00 2001
From: Wout Slakhorst
Date: Tue, 22 Oct 2024 14:26:43 +0200
Subject: [PATCH] remove network migration and optimize network event retry

---
 network/dag/dag.go      | 63 --------------------------
 network/dag/dag_test.go | 98 -----------------------------------------
 network/dag/notifier.go | 17 ++++---
 network/dag/state.go    |  2 +-
 4 files changed, 12 insertions(+), 168 deletions(-)

diff --git a/network/dag/dag.go b/network/dag/dag.go
index 05ad9e77b5..b209fa907c 100644
--- a/network/dag/dag.go
+++ b/network/dag/dag.go
@@ -96,69 +96,6 @@ func newDAG(db stoabs.KVStore) *dag {
 	return &dag{db: db}
 }
 
-func (d *dag) Migrate() error {
-	return d.db.Write(context.Background(), func(tx stoabs.WriteTx) error {
-		writer := tx.GetShelfWriter(metadataShelf)
-		// Migrate highest LC value
-		// Todo: remove after V5 release
-		_, err := writer.Get(stoabs.BytesKey(highestClockValue))
-		if errors.Is(err, stoabs.ErrKeyNotFound) {
-			log.Logger().Info("Highest LC value not stored, migrating...")
-			highestLC := d.getHighestClockLegacy(tx)
-			err = d.setHighestClockValue(tx, highestLC)
-		}
-		if err != nil {
-			return err
-		}
-
-		// Migrate number of TXs
-		// Todo: remove after V5 release
-		_, err = writer.Get(stoabs.BytesKey(numberOfTransactionsKey))
-		if errors.Is(err, stoabs.ErrKeyNotFound) {
-			log.Logger().Info("Number of transactions not stored, migrating...")
-			numberOfTXs := d.getNumberOfTransactionsLegacy(tx)
-			err = d.setNumberOfTransactions(tx, numberOfTXs)
-		}
-		if err != nil {
-			return err
-		}
-
-		// Migrate headsLegacy to single head
-		// Todo: remove after V6 release => then remove headsShelf
-		_, err = writer.Get(stoabs.BytesKey(headRefKey))
-		if errors.Is(err, stoabs.ErrKeyNotFound) {
-			log.Logger().Info("Head not stored in metadata, migrating...")
-			heads := d.headsLegacy(tx)
-			err = nil // reset error
-			if len(heads) != 0 { // ignore for empty node
-				var latestHead hash.SHA256Hash
-				var latestLC uint32
-
-				for _, ref := range heads {
-					transaction, err := getTransaction(ref, tx)
-					if err != nil {
-						if errors.Is(err, ErrTransactionNotFound) {
-							return fmt.Errorf("database migration failed: %w (%s=%s)", err, core.LogFieldTransactionRef, ref)
-						}
-						return err
-					}
-					if transaction.Clock() >= latestLC {
-						latestHead = ref
-						latestLC = transaction.Clock()
-					}
-				}
-
-				err = d.setHead(tx, latestHead)
-			}
-		}
-		if err != nil {
-			return err
-		}
-
-		return nil
-	})
-}
-
 func (d *dag) diagnostics(ctx context.Context) []core.DiagnosticResult {
 	var stats Statistics
 	_ = d.db.Read(ctx, func(tx stoabs.ReadTx) error {
diff --git a/network/dag/dag_test.go b/network/dag/dag_test.go
index 935f85691c..4b072621fe 100644
--- a/network/dag/dag_test.go
+++ b/network/dag/dag_test.go
@@ -107,104 +107,6 @@ func TestDAG_Get(t *testing.T) {
 	})
 }
 
-func TestDAG_Migrate(t *testing.T) {
-	ctx := context.Background()
-	txRoot := CreateTestTransactionWithJWK(0)
-	tx1 := CreateTestTransactionWithJWK(1, txRoot)
-	tx2 := CreateTestTransactionWithJWK(2, tx1)
-
-	t.Run("migrate LC value and transaction count to metadata storage", func(t *testing.T) {
-		graph := CreateDAG(t)
-
-		// Setup: add transactions, remove metadata
-		addTx(t, graph, txRoot, tx1, tx2)
-		err := graph.db.WriteShelf(ctx, metadataShelf, func(writer stoabs.Writer) error {
-			return writer.Iterate(func(key stoabs.Key, _ []byte) error {
-				return writer.Delete(key)
-			}, stoabs.BytesKey{})
-		})
-		require.NoError(t, err)
-
-		// Check values return 0
-		var stats Statistics
-		var lc uint32
-		_ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error {
-			stats = graph.statistics(tx)
-			lc = graph.getHighestClockValue(tx)
-			return nil
-		})
-		assert.Equal(t, uint(0), stats.NumberOfTransactions)
-		assert.Equal(t, uint32(0), lc)
-
-		// Migrate
-		err = graph.Migrate()
-		require.NoError(t, err)
-
-		// Assert
-		_ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error {
-			stats = graph.statistics(tx)
-			lc = graph.getHighestClockValue(tx)
-			return nil
-		})
-		assert.Equal(t, uint(3), stats.NumberOfTransactions)
-		assert.Equal(t, tx2.Clock(), lc)
-	})
-	t.Run("migrate head to metadata storage", func(t *testing.T) {
-		graph := CreateDAG(t)
-
-		// Setup: add transactions, remove metadata, add to headsShelf
-		addTx(t, graph, txRoot, tx1, tx2)
-		err := graph.db.WriteShelf(ctx, metadataShelf, func(writer stoabs.Writer) error {
-			return writer.Iterate(func(key stoabs.Key, _ []byte) error {
-				return writer.Delete(key)
-			}, stoabs.BytesKey{})
-		})
-		require.NoError(t, err)
-		err = graph.db.WriteShelf(ctx, headsShelf, func(writer stoabs.Writer) error {
-			_ = writer.Put(stoabs.BytesKey(txRoot.Ref().Slice()), []byte{1})
-			_ = writer.Put(stoabs.BytesKey(tx2.Ref().Slice()), []byte{1})
-			return writer.Put(stoabs.BytesKey(tx1.Ref().Slice()), []byte{1})
-		})
-		require.NoError(t, err)
-
-		// Check current head is nil
-		var head hash.SHA256Hash
-		_ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error {
-			head, _ = graph.getHead(tx)
-			return nil
-		})
-		assert.Equal(t, hash.EmptyHash(), head)
-
-		// Migrate
-		err = graph.Migrate()
-		require.NoError(t, err)
-
-		// Assert
-		_ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error {
-			head, _ = graph.getHead(tx)
-			return nil
-		})
-		assert.Equal(t, tx2.Ref(), head)
-	})
-	t.Run("nothing to migrate", func(t *testing.T) {
-		graph := CreateDAG(t)
-		addTx(t, graph, txRoot, tx1, tx2)
-
-		err := graph.Migrate()
-		require.NoError(t, err)
-
-		stats := Statistics{}
-		var lc uint32
-		_ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error {
-			stats = graph.statistics(tx)
-			lc = graph.getHighestClockValue(tx)
-			return nil
-		})
-		assert.Equal(t, uint(3), stats.NumberOfTransactions)
-		assert.Equal(t, tx2.Clock(), lc)
-	})
-}
-
 func TestDAG_Add(t *testing.T) {
 	ctx := context.Background()
 	t.Run("ok", func(t *testing.T) {
diff --git a/network/dag/notifier.go b/network/dag/notifier.go
index 675b3882bf..4b6fa4890a 100644
--- a/network/dag/notifier.go
+++ b/network/dag/notifier.go
@@ -243,6 +243,7 @@ func (p *notifier) Run() error {
 	}
 	// we're going to retry all events synchronously at startup. For the ones that fail we'll start the retry loop
 	failedAtStartup := make([]Event, 0)
+	readyToRetry := make([]Event, 0)
 	err := p.db.ReadShelf(p.ctx, p.shelfName(), func(reader stoabs.Reader) error {
 		return reader.Iterate(func(k stoabs.Key, v []byte) error {
 			event := Event{}
@@ -253,12 +254,7 @@ func (p *notifier) Run() error {
 				return nil
 			}
 
-			if err := p.notifyNow(event); err != nil {
-				if event.Retries < maxRetries {
-					failedAtStartup = append(failedAtStartup, event)
-				}
-			}
-
+			readyToRetry = append(readyToRetry, event)
 			return nil
 		}, stoabs.BytesKey{})
 	})
@@ -266,6 +262,15 @@ func (p *notifier) Run() error {
 		return err
 	}
 
+	// do outside of main loop to prevent long running read
+	for _, event := range readyToRetry {
+		if err := p.notifyNow(event); err != nil {
+			if event.Retries < maxRetries {
+				failedAtStartup = append(failedAtStartup, event)
+			}
+		}
+	}
+
 	// for all events from failedAtStartup, call retry
 	// this may still produce errors in the logs or even duplicate errors since notifyNow also failed
 	// but rather duplicate errors then errors produced from overloading the DB with transactions
diff --git a/network/dag/state.go b/network/dag/state.go
index 81c2edf519..fe60a74525 100644
--- a/network/dag/state.go
+++ b/network/dag/state.go
@@ -60,7 +60,7 @@ type state struct {
 }
 
 func (s *state) Migrate() error {
-	return s.graph.Migrate()
+	return nil
 }
 
 // NewState returns a new State. The State is used as entry point, it's methods will start transactions and will notify observers from within those transactions.
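
Illustrative sketch (not part of the patch): the notifier.go change above replaces per-event notification inside the read transaction with a collect-then-notify pattern, so the read transaction only gathers events and stays short. The self-contained Go program below sketches that pattern under simplified assumptions; event, notify, persisted, and maxRetries are stand-ins for the real Event type, p.notifyNow, the stoabs shelf read, and the notifier's retry limit.

package main

import "fmt"

const maxRetries = 100 // stand-in for the notifier package's retry limit

// event is a simplified stand-in for dag.Event.
type event struct {
	Ref     string
	Retries int
}

// notify is a stand-in for p.notifyNow; here it simply fails for events that already have retries.
func notify(e event) error {
	if e.Retries > 0 {
		return fmt.Errorf("receiver not ready for %s", e.Ref)
	}
	return nil
}

func main() {
	// Step 1: inside the (simulated) read transaction, only collect the persisted
	// events, so the transaction is not held open while notifying.
	persisted := []event{{Ref: "tx1"}, {Ref: "tx2", Retries: 3}}
	readyToRetry := make([]event, 0, len(persisted))
	readyToRetry = append(readyToRetry, persisted...)

	// Step 2: outside the transaction, notify each event; failures that still have
	// retries left are queued for the asynchronous retry loop.
	failedAtStartup := make([]event, 0)
	for _, e := range readyToRetry {
		if err := notify(e); err != nil && e.Retries < maxRetries {
			failedAtStartup = append(failedAtStartup, e)
		}
	}
	fmt.Printf("queued for retry: %d\n", len(failedAtStartup))
}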