Skip to content

Commit

Permalink
remove network migration and optimize network event retry
Browse files Browse the repository at this point in the history
  • Loading branch information
woutslakhorst committed Oct 22, 2024
1 parent 4c62ff1 commit d1d1033
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 168 deletions.
63 changes: 0 additions & 63 deletions network/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,69 +96,6 @@ func newDAG(db stoabs.KVStore) *dag {
return &dag{db: db}
}

func (d *dag) Migrate() error {
return d.db.Write(context.Background(), func(tx stoabs.WriteTx) error {
writer := tx.GetShelfWriter(metadataShelf)
// Migrate highest LC value
// Todo: remove after V5 release
_, err := writer.Get(stoabs.BytesKey(highestClockValue))
if errors.Is(err, stoabs.ErrKeyNotFound) {
log.Logger().Info("Highest LC value not stored, migrating...")
highestLC := d.getHighestClockLegacy(tx)
err = d.setHighestClockValue(tx, highestLC)
}
if err != nil {
return err
}

// Migrate number of TXs
// Todo: remove after V5 release
_, err = writer.Get(stoabs.BytesKey(numberOfTransactionsKey))
if errors.Is(err, stoabs.ErrKeyNotFound) {
log.Logger().Info("Number of transactions not stored, migrating...")
numberOfTXs := d.getNumberOfTransactionsLegacy(tx)
err = d.setNumberOfTransactions(tx, numberOfTXs)
}
if err != nil {
return err
}

// Migrate headsLegacy to single head
// Todo: remove after V6 release => then remove headsShelf
_, err = writer.Get(stoabs.BytesKey(headRefKey))
if errors.Is(err, stoabs.ErrKeyNotFound) {
log.Logger().Info("Head not stored in metadata, migrating...")
heads := d.headsLegacy(tx)
err = nil // reset error
if len(heads) != 0 { // ignore for empty node
var latestHead hash.SHA256Hash
var latestLC uint32

for _, ref := range heads {
transaction, err := getTransaction(ref, tx)
if err != nil {
if errors.Is(err, ErrTransactionNotFound) {
return fmt.Errorf("database migration failed: %w (%s=%s)", err, core.LogFieldTransactionRef, ref)
}
return err
}
if transaction.Clock() >= latestLC {
latestHead = ref
latestLC = transaction.Clock()
}
}

err = d.setHead(tx, latestHead)
}
}
if err != nil {
return err
}

return nil
})
}

func (d *dag) diagnostics(ctx context.Context) []core.DiagnosticResult {
var stats Statistics
_ = d.db.Read(ctx, func(tx stoabs.ReadTx) error {
Expand Down
98 changes: 0 additions & 98 deletions network/dag/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,104 +107,6 @@ func TestDAG_Get(t *testing.T) {
})
}

func TestDAG_Migrate(t *testing.T) {
ctx := context.Background()
txRoot := CreateTestTransactionWithJWK(0)
tx1 := CreateTestTransactionWithJWK(1, txRoot)
tx2 := CreateTestTransactionWithJWK(2, tx1)

t.Run("migrate LC value and transaction count to metadata storage", func(t *testing.T) {
graph := CreateDAG(t)

// Setup: add transactions, remove metadata
addTx(t, graph, txRoot, tx1, tx2)
err := graph.db.WriteShelf(ctx, metadataShelf, func(writer stoabs.Writer) error {
return writer.Iterate(func(key stoabs.Key, _ []byte) error {
return writer.Delete(key)
}, stoabs.BytesKey{})
})
require.NoError(t, err)

// Check values return 0
var stats Statistics
var lc uint32
_ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error {
stats = graph.statistics(tx)
lc = graph.getHighestClockValue(tx)
return nil
})
assert.Equal(t, uint(0), stats.NumberOfTransactions)
assert.Equal(t, uint32(0), lc)

// Migrate
err = graph.Migrate()
require.NoError(t, err)

// Assert
_ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error {
stats = graph.statistics(tx)
lc = graph.getHighestClockValue(tx)
return nil
})
assert.Equal(t, uint(3), stats.NumberOfTransactions)
assert.Equal(t, tx2.Clock(), lc)
})
t.Run("migrate head to metadata storage", func(t *testing.T) {
graph := CreateDAG(t)

// Setup: add transactions, remove metadata, add to headsShelf
addTx(t, graph, txRoot, tx1, tx2)
err := graph.db.WriteShelf(ctx, metadataShelf, func(writer stoabs.Writer) error {
return writer.Iterate(func(key stoabs.Key, _ []byte) error {
return writer.Delete(key)
}, stoabs.BytesKey{})
})
require.NoError(t, err)
err = graph.db.WriteShelf(ctx, headsShelf, func(writer stoabs.Writer) error {
_ = writer.Put(stoabs.BytesKey(txRoot.Ref().Slice()), []byte{1})
_ = writer.Put(stoabs.BytesKey(tx2.Ref().Slice()), []byte{1})
return writer.Put(stoabs.BytesKey(tx1.Ref().Slice()), []byte{1})
})
require.NoError(t, err)

// Check current head is nil
var head hash.SHA256Hash
_ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error {
head, _ = graph.getHead(tx)
return nil
})
assert.Equal(t, hash.EmptyHash(), head)

// Migrate
err = graph.Migrate()
require.NoError(t, err)

// Assert
_ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error {
head, _ = graph.getHead(tx)
return nil
})
assert.Equal(t, tx2.Ref(), head)
})
t.Run("nothing to migrate", func(t *testing.T) {
graph := CreateDAG(t)
addTx(t, graph, txRoot, tx1, tx2)

err := graph.Migrate()
require.NoError(t, err)

stats := Statistics{}
var lc uint32
_ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error {
stats = graph.statistics(tx)
lc = graph.getHighestClockValue(tx)
return nil
})
assert.Equal(t, uint(3), stats.NumberOfTransactions)
assert.Equal(t, tx2.Clock(), lc)
})
}

func TestDAG_Add(t *testing.T) {
ctx := context.Background()
t.Run("ok", func(t *testing.T) {
Expand Down
17 changes: 11 additions & 6 deletions network/dag/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func (p *notifier) Run() error {
}
// we're going to retry all events synchronously at startup. For the ones that fail we'll start the retry loop
failedAtStartup := make([]Event, 0)
readyToRetry := make([]Event, 0)
err := p.db.ReadShelf(p.ctx, p.shelfName(), func(reader stoabs.Reader) error {
return reader.Iterate(func(k stoabs.Key, v []byte) error {
event := Event{}
Expand All @@ -253,19 +254,23 @@ func (p *notifier) Run() error {
return nil
}

if err := p.notifyNow(event); err != nil {
if event.Retries < maxRetries {
failedAtStartup = append(failedAtStartup, event)
}
}

readyToRetry = append(readyToRetry, event)
return nil
}, stoabs.BytesKey{})
})
if err != nil {
return err
}

// do outside of main loop to prevent long running read
for _, event := range readyToRetry {
if err := p.notifyNow(event); err != nil {
if event.Retries < maxRetries {
failedAtStartup = append(failedAtStartup, event)
}
}
}

// for all events from failedAtStartup, call retry
// this may still produce errors in the logs or even duplicate errors since notifyNow also failed
// but rather duplicate errors then errors produced from overloading the DB with transactions
Expand Down
2 changes: 1 addition & 1 deletion network/dag/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type state struct {
}

func (s *state) Migrate() error {
return s.graph.Migrate()
return nil
}

// NewState returns a new State. The State is used as entry point, it's methods will start transactions and will notify observers from within those transactions.
Expand Down

0 comments on commit d1d1033

Please sign in to comment.