Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove network migration and optimize network event retry #3510

Merged
merged 2 commits into from
Oct 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 0 additions & 91 deletions network/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,69 +96,6 @@ func newDAG(db stoabs.KVStore) *dag {
return &dag{db: db}
}

func (d *dag) Migrate() error {
return d.db.Write(context.Background(), func(tx stoabs.WriteTx) error {
writer := tx.GetShelfWriter(metadataShelf)
// Migrate highest LC value
// Todo: remove after V5 release
_, err := writer.Get(stoabs.BytesKey(highestClockValue))
if errors.Is(err, stoabs.ErrKeyNotFound) {
log.Logger().Info("Highest LC value not stored, migrating...")
highestLC := d.getHighestClockLegacy(tx)
err = d.setHighestClockValue(tx, highestLC)
}
if err != nil {
return err
}

// Migrate number of TXs
// Todo: remove after V5 release
_, err = writer.Get(stoabs.BytesKey(numberOfTransactionsKey))
if errors.Is(err, stoabs.ErrKeyNotFound) {
log.Logger().Info("Number of transactions not stored, migrating...")
numberOfTXs := d.getNumberOfTransactionsLegacy(tx)
err = d.setNumberOfTransactions(tx, numberOfTXs)
}
if err != nil {
return err
}

// Migrate headsLegacy to single head
// Todo: remove after V6 release => then remove headsShelf
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm assuming the second half of this todo points to a now unused constant that should be removed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused shelf, but since the entire DB will be removed in the future...

_, err = writer.Get(stoabs.BytesKey(headRefKey))
if errors.Is(err, stoabs.ErrKeyNotFound) {
log.Logger().Info("Head not stored in metadata, migrating...")
heads := d.headsLegacy(tx)
err = nil // reset error
if len(heads) != 0 { // ignore for empty node
var latestHead hash.SHA256Hash
var latestLC uint32

for _, ref := range heads {
transaction, err := getTransaction(ref, tx)
if err != nil {
if errors.Is(err, ErrTransactionNotFound) {
return fmt.Errorf("database migration failed: %w (%s=%s)", err, core.LogFieldTransactionRef, ref)
}
return err
}
if transaction.Clock() >= latestLC {
latestHead = ref
latestLC = transaction.Clock()
}
}

err = d.setHead(tx, latestHead)
}
}
if err != nil {
return err
}

return nil
})
}

func (d *dag) diagnostics(ctx context.Context) []core.DiagnosticResult {
var stats Statistics
_ = d.db.Read(ctx, func(tx stoabs.ReadTx) error {
Expand Down Expand Up @@ -296,27 +233,6 @@ func (d dag) getHighestClockValue(tx stoabs.ReadTx) uint32 {
return bytesToClock(value)
}

// getHighestClockLegacy is used for migration.
// Remove after V5 or V6 release?
func (d dag) getHighestClockLegacy(tx stoabs.ReadTx) uint32 {
reader := tx.GetShelfReader(clockShelf)
var clock uint32
err := reader.Iterate(func(key stoabs.Key, _ []byte) error {
currentClock := uint32(key.(stoabs.Uint32Key))
if currentClock > clock {
clock = currentClock
}
return nil
}, stoabs.Uint32Key(0))
if err != nil {
log.Logger().
WithError(err).
Error("Failed to read clock shelf")
return 0
}
return clock
}

func (d dag) getHead(tx stoabs.ReadTx) (hash.SHA256Hash, error) {
head, err := tx.GetShelfReader(metadataShelf).Get(stoabs.BytesKey(headRefKey))
if errors.Is(err, stoabs.ErrKeyNotFound) {
Expand All @@ -329,13 +245,6 @@ func (d dag) getHead(tx stoabs.ReadTx) (hash.SHA256Hash, error) {
return hash.FromSlice(head), nil
}

// getNumberOfTransactionsLegacy is used for migration.
// Remove after V5 or V6 release?
func (d dag) getNumberOfTransactionsLegacy(tx stoabs.ReadTx) uint64 {
reader := tx.GetShelfReader(transactionsShelf)
return uint64(reader.Stats().NumEntries)
}

func (d dag) setHighestClockValue(tx stoabs.WriteTx, count uint32) error {
writer := tx.GetShelfWriter(metadataShelf)
bytes := make([]byte, 4)
Expand Down
98 changes: 0 additions & 98 deletions network/dag/dag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,104 +107,6 @@ func TestDAG_Get(t *testing.T) {
})
}

func TestDAG_Migrate(t *testing.T) {
ctx := context.Background()
txRoot := CreateTestTransactionWithJWK(0)
tx1 := CreateTestTransactionWithJWK(1, txRoot)
tx2 := CreateTestTransactionWithJWK(2, tx1)

t.Run("migrate LC value and transaction count to metadata storage", func(t *testing.T) {
graph := CreateDAG(t)

// Setup: add transactions, remove metadata
addTx(t, graph, txRoot, tx1, tx2)
err := graph.db.WriteShelf(ctx, metadataShelf, func(writer stoabs.Writer) error {
return writer.Iterate(func(key stoabs.Key, _ []byte) error {
return writer.Delete(key)
}, stoabs.BytesKey{})
})
require.NoError(t, err)

// Check values return 0
var stats Statistics
var lc uint32
_ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error {
stats = graph.statistics(tx)
lc = graph.getHighestClockValue(tx)
return nil
})
assert.Equal(t, uint(0), stats.NumberOfTransactions)
assert.Equal(t, uint32(0), lc)

// Migrate
err = graph.Migrate()
require.NoError(t, err)

// Assert
_ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error {
stats = graph.statistics(tx)
lc = graph.getHighestClockValue(tx)
return nil
})
assert.Equal(t, uint(3), stats.NumberOfTransactions)
assert.Equal(t, tx2.Clock(), lc)
})
t.Run("migrate head to metadata storage", func(t *testing.T) {
graph := CreateDAG(t)

// Setup: add transactions, remove metadata, add to headsShelf
addTx(t, graph, txRoot, tx1, tx2)
err := graph.db.WriteShelf(ctx, metadataShelf, func(writer stoabs.Writer) error {
return writer.Iterate(func(key stoabs.Key, _ []byte) error {
return writer.Delete(key)
}, stoabs.BytesKey{})
})
require.NoError(t, err)
err = graph.db.WriteShelf(ctx, headsShelf, func(writer stoabs.Writer) error {
_ = writer.Put(stoabs.BytesKey(txRoot.Ref().Slice()), []byte{1})
_ = writer.Put(stoabs.BytesKey(tx2.Ref().Slice()), []byte{1})
return writer.Put(stoabs.BytesKey(tx1.Ref().Slice()), []byte{1})
})
require.NoError(t, err)

// Check current head is nil
var head hash.SHA256Hash
_ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error {
head, _ = graph.getHead(tx)
return nil
})
assert.Equal(t, hash.EmptyHash(), head)

// Migrate
err = graph.Migrate()
require.NoError(t, err)

// Assert
_ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error {
head, _ = graph.getHead(tx)
return nil
})
assert.Equal(t, tx2.Ref(), head)
})
t.Run("nothing to migrate", func(t *testing.T) {
graph := CreateDAG(t)
addTx(t, graph, txRoot, tx1, tx2)

err := graph.Migrate()
require.NoError(t, err)

stats := Statistics{}
var lc uint32
_ = graph.db.Read(ctx, func(tx stoabs.ReadTx) error {
stats = graph.statistics(tx)
lc = graph.getHighestClockValue(tx)
return nil
})
assert.Equal(t, uint(3), stats.NumberOfTransactions)
assert.Equal(t, tx2.Clock(), lc)
})
}

func TestDAG_Add(t *testing.T) {
ctx := context.Background()
t.Run("ok", func(t *testing.T) {
Expand Down
17 changes: 11 additions & 6 deletions network/dag/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ func (p *notifier) Run() error {
}
// we're going to retry all events synchronously at startup. For the ones that fail we'll start the retry loop
failedAtStartup := make([]Event, 0)
readyToRetry := make([]Event, 0)
err := p.db.ReadShelf(p.ctx, p.shelfName(), func(reader stoabs.Reader) error {
return reader.Iterate(func(k stoabs.Key, v []byte) error {
event := Event{}
Expand All @@ -253,19 +254,23 @@ func (p *notifier) Run() error {
return nil
}

if err := p.notifyNow(event); err != nil {
if event.Retries < maxRetries {
failedAtStartup = append(failedAtStartup, event)
}
}

readyToRetry = append(readyToRetry, event)
return nil
}, stoabs.BytesKey{})
})
if err != nil {
return err
}

// do outside of main loop to prevent long running read
for _, event := range readyToRetry {
if err := p.notifyNow(event); err != nil {
if event.Retries < maxRetries {
failedAtStartup = append(failedAtStartup, event)
}
}
}

// for all events from failedAtStartup, call retry
// this may still produce errors in the logs or even duplicate errors since notifyNow also failed
// but rather duplicate errors then errors produced from overloading the DB with transactions
Expand Down
2 changes: 1 addition & 1 deletion network/dag/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type state struct {
}

func (s *state) Migrate() error {
return s.graph.Migrate()
return nil
}

// NewState returns a new State. The State is used as entry point, it's methods will start transactions and will notify observers from within those transactions.
Expand Down
2 changes: 0 additions & 2 deletions network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ type Transactions interface {
Subscribe(name string, receiver dag.ReceiverFn, filters ...SubscriberOption) error
// Subscribers returns the list of notifiers on the DAG that emit events to subscribers.
Subscribers() []dag.Notifier
// CleanupSubscriberEvents removes events. Example use is cleaning up events that errored but should be removed due to a bugfix.
CleanupSubscriberEvents(subcriberName, errorPrefix string) error
// GetTransactionPayload retrieves the transaction Payload for the given transaction.
// If the transaction or Payload is not found, dag.ErrPayloadNotFound is returned.
GetTransactionPayload(transactionRef hash.SHA256Hash) ([]byte, error)
Expand Down
14 changes: 0 additions & 14 deletions network/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 1 addition & 3 deletions vcr/ambassador.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,7 @@ func (n ambassador) Start() error {
return fmt.Errorf("failed to subscribe to REPROCESS event stream: %v", err)
}

// removing failed events required for #1743
// remove after v6 release
return n.networkClient.CleanupSubscriberEvents("vcr_vcs", "canonicalization failed: unable to normalize the json-ld document: loading remote context failed: Dereferencing a URL did not result in a valid JSON-LD context")
return nil
}

func (n ambassador) handleNetworkVCs(event dag.Event) (bool, error) {
Expand Down
1 change: 0 additions & 1 deletion vcr/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ func newMockContext(t *testing.T) mockContext {
tx.EXPECT().WithPersistency().AnyTimes()
tx.EXPECT().Subscribe("vcr_vcs", gomock.Any(), gomock.Any())
tx.EXPECT().Subscribe("vcr_revocations", gomock.Any(), gomock.Any())
tx.EXPECT().CleanupSubscriberEvents("vcr_vcs", gomock.Any())
tx.EXPECT().Disabled().AnyTimes()
didResolver := resolver.NewMockDIDResolver(ctrl)
documentOwner := didsubject.NewMockDocumentOwner(ctrl)
Expand Down
10 changes: 0 additions & 10 deletions vdr/vdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,16 +221,6 @@ func (r *Module) Start() error {
return err
}

// VDR migration needs to be started after ambassador has started!
count, err := r.store.DocumentCount()
if err != nil {
return err
}
if count == 0 {
// remove after v6 release
_, err = r.network.Reprocess(context.Background(), "application/did+json")
}

// start DID Document rollback loop
r.routines.Add(1)
go func() {
Expand Down
36 changes: 0 additions & 36 deletions vdr/vdr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"crypto/rand"
"encoding/base64"
"encoding/json"
"errors"
"github.com/lestrrat-go/jwx/v2/jwk"
ssi "github.com/nuts-foundation/go-did"
"github.com/nuts-foundation/go-did/did"
Expand Down Expand Up @@ -107,41 +106,6 @@ func TestNewVDR(t *testing.T) {
assert.IsType(t, &Module{}, vdr)
}

func TestVDR_Start(t *testing.T) {
t.Run("migration", func(t *testing.T) {
t.Run("migrate on 0 document count", func(t *testing.T) {
ctx := newVDRTestCtx(t)
ctx.mockAmbassador.EXPECT().Start()
ctx.mockStore.EXPECT().DocumentCount().Return(uint(0), nil)
ctx.mockNetwork.EXPECT().Reprocess(context.Background(), "application/did+json").Return(nil, nil)

err := ctx.vdr.Start()

require.NoError(t, err)
})
t.Run("don't migrate on > 0 document count", func(t *testing.T) {
ctx := newVDRTestCtx(t)
ctx.mockAmbassador.EXPECT().Start()
ctx.mockStore.EXPECT().DocumentCount().Return(uint(1), nil)

err := ctx.vdr.Start()

require.NoError(t, err)
})
t.Run("error on migration error", func(t *testing.T) {
ctx := newVDRTestCtx(t)
ctx.mockAmbassador.EXPECT().Start()
testError := errors.New("test")
ctx.mockStore.EXPECT().DocumentCount().Return(uint(0), testError)

err := ctx.vdr.Start()

assert.Equal(t, testError, err)
})
})

}

func TestVDR_ConflictingDocuments(t *testing.T) {

t.Run("diagnostics", func(t *testing.T) {
Expand Down
Loading