diff --git a/common/fixation_entry.go b/common/fixation_entry.go index f9e28a57da..fa795698c5 100644 --- a/common/fixation_entry.go +++ b/common/fixation_entry.go @@ -32,6 +32,8 @@ import ( // - GetEntry(index, *entry): get a copy (and reference) of the latest version of an entry // - PutEntry(index, block): drop reference to an existing entry with "index" and exact "block" (*) // - DelEntry(index, block): mark an entry as unavailable for new GetEntry() calls +// - HasEntry(index, block): checks for existence of a specific version of an entry +// - IsEntryStale(index, block): checks if a specific version of an entry is stale // - GetAllEntryIndices(): get all the entries indices (without versions) // - GetAllEntryVersions(index): get all the versions of an entry (for testing) // - GetEntryVersionsRange(index, block, delta): get range of entry versions (**) @@ -140,28 +142,28 @@ const ( timerStaleEntry = 0x03 ) -func encodeForTimer(index string, block uint64, kind byte) []byte { +func encodeForTimer(safeIndex types.SafeIndex, block uint64, kind byte) []byte { // NOTE: the encoding places callback type first to ensure the order of // callbacks when there are multiple at the same block (for some entry); // it is followed by the entry version (block) and index. - encodedKey := make([]byte, 8+1+len(index)) - copy(encodedKey[9:], []byte(index)) + encodedKey := make([]byte, 8+1+len(safeIndex)) + copy(encodedKey[9:], []byte(safeIndex)) binary.BigEndian.PutUint64(encodedKey[1:9], block) encodedKey[0] = kind return encodedKey } -func decodeFromTimer(encodedKey []byte) (index string, block uint64, kind byte) { - index = string(encodedKey[9:]) +func decodeFromTimer(encodedKey []byte) (safeIndex types.SafeIndex, block uint64, kind byte) { + safeIndex = types.SafeIndex(encodedKey[9:]) block = binary.BigEndian.Uint64(encodedKey[1:9]) kind = encodedKey[0] - return index, block, kind + return safeIndex, block, kind } -func (fs *FixationStore) getEntryStore(ctx sdk.Context, index string) *prefix.Store { +func (fs *FixationStore) getEntryStore(ctx sdk.Context, safeIndex types.SafeIndex) *prefix.Store { store := prefix.NewStore( ctx.KVStore(fs.storeKey), - types.KeyPrefix(fs.createEntryStoreKey(index))) + types.KeyPrefix(fs.createEntryStoreKey(string(safeIndex)))) return &store } @@ -169,20 +171,34 @@ func (fs *FixationStore) getEntryStore(ctx sdk.Context, index string) *prefix.St // the same expirty block. Useful, for example, when a newer entry takes responsibility // for a pending deletion from the previous owner. func (fs *FixationStore) transferTimer(ctx sdk.Context, prev, next types.Entry, block uint64, kind byte) { - key := encodeForTimer(prev.Index, prev.Block, kind) + key := encodeForTimer(prev.SafeIndex(), prev.Block, kind) fs.tstore.DelTimerByBlockHeight(ctx, block, key) - key = encodeForTimer(next.Index, next.Block, kind) + key = encodeForTimer(next.SafeIndex(), next.Block, kind) fs.tstore.AddTimerByBlockHeight(ctx, block, key, []byte{}) } +// hasEntry returns wether a specific entry exists in the store +// (any kind of entry, even deleted or stale) +func (fs *FixationStore) hasEntry(ctx sdk.Context, safeIndex types.SafeIndex, block uint64) bool { + store := fs.getEntryStore(ctx, safeIndex) + byteKey := types.EncodeKey(block) + return store.Has(byteKey) +} + // getEntry returns an existing entry in the store -func (fs *FixationStore) getEntry(ctx sdk.Context, safeIndex string, block uint64) (entry types.Entry) { +// (any kind of entry, even deleted or stale) +func (fs *FixationStore) getEntry(ctx sdk.Context, safeIndex types.SafeIndex, block uint64) (entry types.Entry) { store := fs.getEntryStore(ctx, safeIndex) byteKey := types.EncodeKey(block) b := store.Get(byteKey) if b == nil { - panic(fmt.Sprintf("getEntry: unknown entry: %s block: %d", types.DesanitizeIndex(safeIndex), block)) + // panic:ok: internal API that expects the to exist + utils.LavaFormatPanic("fixation: getEntry failed (unknown entry)", sdkerrors.ErrNotFound, + utils.Attribute{Key: "prefix", Value: fs.prefix}, + utils.Attribute{Key: "index", Value: types.DesanitizeIndex(safeIndex)}, + utils.Attribute{Key: "block", Value: block}, + ) } fs.cdc.MustUnmarshal(b, &entry) return entry @@ -190,7 +206,7 @@ func (fs *FixationStore) getEntry(ctx sdk.Context, safeIndex string, block uint6 // setEntry modifies an existing entry in the store func (fs *FixationStore) setEntry(ctx sdk.Context, entry types.Entry) { - store := fs.getEntryStore(ctx, entry.Index) + store := fs.getEntryStore(ctx, entry.SafeIndex()) byteKey := types.EncodeKey(entry.Block) marshaledEntry := fs.cdc.MustMarshal(&entry) store.Set(byteKey, marshaledEntry) @@ -222,7 +238,14 @@ func (fs *FixationStore) AppendEntry( fs.setEntryIndex(ctx, safeIndex, true) } else { if block < latestEntry.Block { - panic(fmt.Sprintf("AppendEntry for block %d < latest entry block %d", block, latestEntry.Block)) + // how come getUnmarshaledEntryForBlock lied to us?! + return utils.LavaFormatError("critical: AppendEntry block smaller than latest", + fmt.Errorf("block %d < latest entry block %d", block, latestEntry.Block), + utils.Attribute{Key: "prefix", Value: fs.prefix}, + utils.Attribute{Key: "index", Value: index}, + utils.Attribute{Key: "block", Value: block}, + utils.Attribute{Key: "latest", Value: latestEntry.Block}, + ) } // temporary: do not allow adding new entries for an index that was deleted @@ -242,7 +265,7 @@ func (fs *FixationStore) AppendEntry( } // if the previous latest entry is marked with DeleteAt which is set to expire after - // theis future entry's maturity (block), then transfer this DeleteAt to the future + // this future entry's maturity (block), then transfer this DeleteAt to the future // entry, and then replace the old timer with a new timer (below) // (note: deletion, if any, cannot be for the current block, since it would have been // processed at the beginning of the block, and AppendEntry would fail earlier). @@ -271,7 +294,7 @@ func (fs *FixationStore) AppendEntry( // create a new entry entry := types.Entry{ - Index: safeIndex, + Index: string(safeIndex), Block: block, StaleAt: math.MaxUint64, DeleteAt: deleteAt, @@ -299,23 +322,50 @@ func (fs *FixationStore) entryCallbackBeginBlock(ctx sdk.Context, key []byte, da fs.deleteMarkedEntry(ctx, safeIndex, block) case timerStaleEntry: fs.deleteStaleEntries(ctx, safeIndex, block) + default: + utils.LavaFormatPanic("fixation: timer callback unknown type", + // panic:ok: state is badly invalid, because we expect the kind of the timer + // to always be one of the above. + fmt.Errorf("unknown callback kind = %x", kind), + utils.Attribute{Key: "prefix", Value: fs.prefix}, + utils.Attribute{Key: "index", Value: types.DesanitizeIndex(safeIndex)}, + utils.Attribute{Key: "block", Value: ctx.BlockHeight()}, + ) } } -func (fs *FixationStore) updateFutureEntry(ctx sdk.Context, safeIndex string, block uint64) { - if block != uint64(ctx.BlockHeight()) { - panic(fmt.Sprintf("Future entry: future block %d != current block %d", block, ctx.BlockHeight())) +func (fs *FixationStore) updateFutureEntry(ctx sdk.Context, safeIndex types.SafeIndex, block uint64) { + // sanity check: future entries should get timeout on their block reachs now + ctxBlock := uint64(ctx.BlockHeight()) + if block != ctxBlock { + // panic:ok: state is badly invalid, because we expect the expiry block of a + // timer that expired now to be same as current block height. + utils.LavaFormatPanic("fixation: future callback block mismatch", + fmt.Errorf("wrong expiry block %d != current block %d", block, ctxBlock), + utils.Attribute{Key: "prefix", Value: fs.prefix}, + utils.Attribute{Key: "index", Value: types.DesanitizeIndex(safeIndex)}, + utils.Attribute{Key: "expiry", Value: block}, + utils.Attribute{Key: "block", Value: ctxBlock}, + ) } latestEntry, found := fs.getUnmarshaledEntryForBlock(ctx, safeIndex, block-1) if found { // previous latest entry should never have its DeleteAt set for this block: - // if our AppendEntry happened before the DelEntry, we would get marked (and - // not the previous latest entry); if the DelEntry happened first, then we - // would inherit the DeleteAt from the previous latest entry. + // if our AppendEntry happened before some DelEntry, we would get marked for + // delete with DeleteAt (and not the previous latest entry); if the DelEntry + // happened first, then upon our AppendEntry() we would inherit the DeleteAt + // from the previous latest entry. if latestEntry.HasDeleteAt() { - panic(fmt.Sprintf("Future entry: latest entry has DeleteAt %d", latestEntry.DeleteAt)) + // panic:ok: internal state mismatch, unknown outcome if we proceed + utils.LavaFormatPanic("fixation: future entry callback invalid state", + fmt.Errorf("previous latest entry marked delete at %d", latestEntry.DeleteAt), + utils.Attribute{Key: "prefix", Value: fs.prefix}, + utils.Attribute{Key: "index", Value: types.DesanitizeIndex(safeIndex)}, + utils.Attribute{Key: "block", Value: block}, + utils.Attribute{Key: "latest", Value: latestEntry.Block}, + ) } // latest entry had extra refcount for being the latest; so drop that refcount @@ -325,12 +375,19 @@ func (fs *FixationStore) updateFutureEntry(ctx sdk.Context, safeIndex string, bl } } -func (fs *FixationStore) deleteMarkedEntry(ctx sdk.Context, safeIndex string, block uint64) { +func (fs *FixationStore) deleteMarkedEntry(ctx sdk.Context, safeIndex types.SafeIndex, block uint64) { entry := fs.getEntry(ctx, safeIndex, block) ctxBlock := uint64(ctx.BlockHeight()) if entry.DeleteAt != ctxBlock { - panic(fmt.Sprintf("DelEntry entry deleted %d != current ctx block %d", entry.DeleteAt, ctxBlock)) + // panic:ok: internal state mismatch, unknown outcome if we proceed + utils.LavaFormatPanic("fixation: delete entry callback invalid state", + fmt.Errorf("entry delete at %d != current block %d", entry.DeleteAt, ctxBlock), + utils.Attribute{Key: "prefix", Value: fs.prefix}, + utils.Attribute{Key: "index", Value: types.DesanitizeIndex(safeIndex)}, + utils.Attribute{Key: "block", Value: ctxBlock}, + utils.Attribute{Key: "delete", Value: entry.DeleteAt}, + ) } fs.setEntryIndex(ctx, safeIndex, false) @@ -357,15 +414,14 @@ func (fs *FixationStore) deleteMarkedEntry(ctx sdk.Context, safeIndex string, bl } for _, entry := range entriesToRemove { - key := encodeForTimer(entry.Index, entry.Block, timerFutureEntry) + key := encodeForTimer(entry.SafeIndex(), entry.Block, timerFutureEntry) fs.tstore.DelTimerByBlockHeight(ctx, entry.Block, key) - fs.removeEntry(ctx, entry.Index, entry.Block) + fs.removeEntry(ctx, entry.SafeIndex(), entry.Block) } } -func (fs *FixationStore) deleteStaleEntries(ctx sdk.Context, safeIndex string, _ uint64) { +func (fs *FixationStore) deleteStaleEntries(ctx sdk.Context, safeIndex types.SafeIndex, _ uint64) { store := fs.getEntryStore(ctx, safeIndex) - ctxBlock := uint64(ctx.BlockHeight()) iterator := sdk.KVStorePrefixIterator(store, []byte{}) defer iterator.Close() @@ -415,7 +471,7 @@ func (fs *FixationStore) deleteStaleEntries(ctx sdk.Context, safeIndex string, _ } // entry is not stale: skip - if !entry.IsStaleBy(ctxBlock) { + if !entry.IsStale(ctx) { safeToDeleteEntry = false safeToDeleteIndex = false continue @@ -442,10 +498,15 @@ func (fs *FixationStore) deleteStaleEntries(ctx sdk.Context, safeIndex string, _ } // ReadEntry returns and existing entry with index and specific block +// (should be called only for existing entries; will panic otherwise) func (fs *FixationStore) ReadEntry(ctx sdk.Context, index string, block uint64, entryData codec.ProtoMarshaler) { safeIndex, err := types.SanitizeIndex(index) if err != nil { - panic("ReadEntry invalid non-ascii entry: " + index) + // panic:ok: entry expected to exist as is + utils.LavaFormatPanic("fixation: ReadEntry failed (invalid index)", err, + utils.Attribute{Key: "prefix", Value: fs.prefix}, + utils.Attribute{Key: "index", Value: index}, + ) } entry := fs.getEntry(ctx, safeIndex, block) @@ -453,10 +514,15 @@ func (fs *FixationStore) ReadEntry(ctx sdk.Context, index string, block uint64, } // ModifyEntry modifies an existing entry in the store +// (should be called only for existing entries; will panic otherwise) func (fs *FixationStore) ModifyEntry(ctx sdk.Context, index string, block uint64, entryData codec.ProtoMarshaler) { safeIndex, err := types.SanitizeIndex(index) if err != nil { - panic("ModifyEntry with non-ascii index: " + index) + // panic:ok: entry expected to exist as is + utils.LavaFormatPanic("fixation: ModifyEntry failed (invalid index)", err, + utils.Attribute{Key: "prefix", Value: fs.prefix}, + utils.Attribute{Key: "index", Value: index}, + ) } entry := fs.getEntry(ctx, safeIndex, block) @@ -466,7 +532,7 @@ func (fs *FixationStore) ModifyEntry(ctx sdk.Context, index string, block uint64 // getUnmarshaledEntryForBlock gets an entry version for an index that has // nearest-smaller block version for the given block arg. -func (fs *FixationStore) getUnmarshaledEntryForBlock(ctx sdk.Context, safeIndex string, block uint64) (types.Entry, bool) { +func (fs *FixationStore) getUnmarshaledEntryForBlock(ctx sdk.Context, safeIndex types.SafeIndex, block uint64) (types.Entry, bool) { types.AssertSanitizedIndex(safeIndex, fs.prefix) store := fs.getEntryStore(ctx, safeIndex) ctxBlock := uint64(ctx.BlockHeight()) @@ -516,7 +582,7 @@ func (fs *FixationStore) getUnmarshaledEntryForBlock(ctx sdk.Context, safeIndex func (fs *FixationStore) FindEntry2(ctx sdk.Context, index string, block uint64, entryData codec.ProtoMarshaler) (uint64, bool) { safeIndex, err := types.SanitizeIndex(index) if err != nil { - utils.LavaFormatError("FindEntry failed", err, + utils.LavaFormatError("FindEntry failed (invalid index)", err, utils.Attribute{Key: "index", Value: index}, ) return 0, false @@ -545,11 +611,37 @@ func (fs *FixationStore) FindEntry(ctx sdk.Context, index string, block uint64, return found } +// IsEntryStale returns true if an entry version exists and is stale. +func (fs *FixationStore) IsEntryStale(ctx sdk.Context, index string, block uint64) bool { + safeIndex, err := types.SanitizeIndex(index) + if err != nil { + utils.LavaFormatError("IsEntryStale failed (invalid index)", err, + utils.Attribute{Key: "index", Value: index}, + ) + return false + } + entry := fs.getEntry(ctx, safeIndex, block) + return entry.IsStale(ctx) +} + +// HasEntry returns true if an entry version exists for the given index, block tuple +// (any kind of entry, even deleted or stale). +func (fs *FixationStore) HasEntry(ctx sdk.Context, index string, block uint64) bool { + safeIndex, err := types.SanitizeIndex(index) + if err != nil { + utils.LavaFormatError("HasEntry failed (invalid index)", err, + utils.Attribute{Key: "index", Value: index}, + ) + return false + } + return fs.hasEntry(ctx, safeIndex, block) +} + // GetEntry returns the latest entry by index and increments the refcount func (fs *FixationStore) GetEntry(ctx sdk.Context, index string, entryData codec.ProtoMarshaler) bool { safeIndex, err := types.SanitizeIndex(index) if err != nil { - utils.LavaFormatError("GetEntry failed", err, + utils.LavaFormatError("GetEntry failed (invalid index)", err, utils.Attribute{Key: "index", Value: index}, ) return false @@ -572,7 +664,13 @@ func (fs *FixationStore) GetEntry(ctx sdk.Context, index string, entryData codec // putEntry decrements the refcount of an entry and marks for staleness if needed func (fs *FixationStore) putEntry(ctx sdk.Context, entry types.Entry) { if entry.Refcount == 0 { - panic("Fixation: prefix " + fs.prefix + ": negative refcount safeIndex: " + entry.Index) + // panic:ok: double putEntry() is bad news like double free + safeIndex := types.SafeIndex(entry.Index) + utils.LavaFormatPanic("fixation: putEntry invalid refcount state", + fmt.Errorf("unable to put entry with refcount 0"), + utils.Attribute{Key: "prefix", Value: fs.prefix}, + utils.Attribute{Key: "index", Value: types.DesanitizeIndex(safeIndex)}, + ) } entry.Refcount -= 1 @@ -580,7 +678,7 @@ func (fs *FixationStore) putEntry(ctx sdk.Context, entry types.Entry) { if entry.Refcount == 0 { // never overflows because ctx.BlockHeight is int64 entry.StaleAt = uint64(ctx.BlockHeight()) + uint64(types.STALE_ENTRY_TIME) - key := encodeForTimer(entry.Index, entry.Block, timerStaleEntry) + key := encodeForTimer(entry.SafeIndex(), entry.Block, timerStaleEntry) fs.tstore.AddTimerByBlockHeight(ctx, entry.StaleAt, key, []byte{}) } @@ -588,10 +686,15 @@ func (fs *FixationStore) putEntry(ctx sdk.Context, entry types.Entry) { } // PutEntry finds the entry by index and block and decrements the refcount +// (should be called only for existing entries; will panic otherwise) func (fs *FixationStore) PutEntry(ctx sdk.Context, index string, block uint64) { safeIndex, err := types.SanitizeIndex(index) if err != nil { - panic("PutEntry invalid non-ascii entry: " + index) + // panic:ok: entry expected to exist as is + utils.LavaFormatPanic("fixation: PutEntry failed (invalid index)", err, + utils.Attribute{Key: "prefix", Value: fs.prefix}, + utils.Attribute{Key: "index", Value: index}, + ) } entry := fs.getEntry(ctx, safeIndex, block) @@ -603,13 +706,16 @@ func (fs *FixationStore) PutEntry(ctx sdk.Context, index string, block uint64) { func (fs *FixationStore) DelEntry(ctx sdk.Context, index string, block uint64) error { safeIndex, err := types.SanitizeIndex(index) if err != nil { - return sdkerrors.ErrNotFound.Wrapf("invalid non-ascii index") + return sdkerrors.ErrNotFound.Wrapf("invalid non-ascii index: %s", index) } ctxBlock := uint64(ctx.BlockHeight()) if block < ctxBlock { - panic(fmt.Sprintf("DelEntry for block %d < current ctx block %d", block, ctxBlock)) + return utils.LavaFormatError("critical: DelEntry of past block", + fmt.Errorf("delete requested at an old block %d < %d", block, ctxBlock), + utils.Attribute{Key: "index", Value: index}, + ) } entry, found := fs.getUnmarshaledEntryForBlock(ctx, safeIndex, block) @@ -635,7 +741,7 @@ func (fs *FixationStore) DelEntry(ctx sdk.Context, index string, block uint64) e } // removeEntry removes an entry from the store -func (fs *FixationStore) removeEntry(ctx sdk.Context, safeIndex string, block uint64) { +func (fs *FixationStore) removeEntry(ctx sdk.Context, safeIndex types.SafeIndex, block uint64) { store := fs.getEntryStore(ctx, safeIndex) store.Delete(types.EncodeKey(block)) } @@ -680,7 +786,7 @@ func (fs *FixationStore) getEntryVersionsFilter(ctx sdk.Context, index string, b // and onward, and not more than delta blocks further (skip stale entries). func (fs *FixationStore) GetEntryVersionsRange(ctx sdk.Context, index string, block, delta uint64) (blocks []uint64) { filter := func(entry *types.Entry) bool { - if entry.IsStaleBy(uint64(ctx.BlockHeight())) { + if entry.IsStale(ctx) { return false } if entry.Block > block+delta { diff --git a/common/fixation_entry_index.go b/common/fixation_entry_index.go index 75b18dc44d..a514018506 100644 --- a/common/fixation_entry_index.go +++ b/common/fixation_entry_index.go @@ -30,21 +30,21 @@ func (fs *FixationStore) getEntryIndexStore(ctx sdk.Context) *prefix.Store { } // setEntryIndex stores an Entry index in the store -func (fs FixationStore) setEntryIndex(ctx sdk.Context, safeIndex string, live bool) { +func (fs FixationStore) setEntryIndex(ctx sdk.Context, safeIndex types.SafeIndex, live bool) { types.AssertSanitizedIndex(safeIndex, fs.prefix) store := fs.getEntryIndexStore(ctx) value := types.EntryIndexLive if !live { value = types.EntryIndexDead } - store.Set(types.KeyPrefix(safeIndex), value) + store.Set(types.KeyPrefix(string(safeIndex)), value) } // removeEntryIndex removes an Entry index from the store -func (fs FixationStore) removeEntryIndex(ctx sdk.Context, safeIndex string) { +func (fs FixationStore) removeEntryIndex(ctx sdk.Context, safeIndex types.SafeIndex) { types.AssertSanitizedIndex(safeIndex, fs.prefix) store := fs.getEntryIndexStore(ctx) - store.Delete(types.KeyPrefix(safeIndex)) + store.Delete(types.KeyPrefix(string(safeIndex))) } // AllEntryIndicesFilter returns all Entry indices with a given prefix and filtered @@ -59,7 +59,7 @@ func (fs FixationStore) AllEntryIndicesFilter(ctx sdk.Context, prefix string, fi indexList := []string{} for ; iterator.Valid(); iterator.Next() { key, value := iterator.Key(), iterator.Value() - safeIndex := string(key) + safeIndex := types.SafeIndex(key) types.AssertSanitizedIndex(safeIndex, fs.prefix) if filter == nil || filter(key, value) { indexList = append(indexList, types.DesanitizeIndex(safeIndex)) diff --git a/common/fixation_entry_test.go b/common/fixation_entry_test.go index 5a1c2e8333..6430e64f5c 100644 --- a/common/fixation_entry_test.go +++ b/common/fixation_entry_test.go @@ -88,6 +88,12 @@ func testWithFixationTemplate(t *testing.T, playbook []fixationTemplate, countOb } else { require.False(t, found, what) } + case "has": //nolint:goconst + has := fs[play.store].HasEntry(ctx, index, block) + require.Equal(t, !play.fail, has, what) + case "stale": + stale := fs[play.store].IsEntryStale(ctx, index, block) + require.Equal(t, play.fail, stale, what) case "get": found := fs[play.store].GetEntry(ctx, index, &dummy) if !play.fail { @@ -145,12 +151,18 @@ func TestFixationEntryAdditionAndRemoval(t *testing.T) { playbook := []fixationTemplate{ {op: "append", name: "entry #1", count: block0, coin: 0}, {op: "find", name: "entry #1", count: block0, coin: 0}, + {op: "has", name: "entry #1", count: block0, coin: 0}, + {op: "stale", name: "entry #1", count: block0, coin: 0}, {op: "getall", name: "to check exactly one index", count: 1}, {op: "append", name: "entry #2", count: block1, coin: 1}, + {op: "has", name: "entry #1 (again)", count: block0, coin: 0}, + {op: "has", name: "entry #2", count: block1, coin: 0}, // entry #1 not deleted because not enough time with refcount = zero + {op: "has", name: "entry #1 (not stale yet)", count: block0}, {op: "find", name: "entry #1 (not stale yet)", count: block0}, {op: "block", name: "add STALE_ENTRY_TIME+1", count: types.STALE_ENTRY_TIME + 1}, // entry #1 now deleted because blocks advanced by STALE_ENTRY_TIME+1 + {op: "has", name: "entry #1 (now stale/gone)", count: block0, fail: true}, {op: "find", name: "entry #1 (now stale/gone)", count: block0, fail: true}, {op: "find", name: "latest entry", coin: 1}, {op: "getall", name: "to check again exactly one index", count: 1}, @@ -223,13 +235,19 @@ func TestEntryStale(t *testing.T) { {op: "append", name: "entry #3", count: block2, coin: 2}, // entry #1 should not be deleted because it has refcount != zero); // entry #2 (refcount = zero) also not deleted because it is not oldest + {op: "has", name: "entry #1", count: block0, coin: 0}, + {op: "has", name: "entry #2", count: block1, coin: 1}, {op: "find", name: "entry #1", count: block0 + 1, coin: 0}, {op: "find", name: "entry #2", count: block1 + 1, coin: 1}, {op: "block", name: "add STALE_ENTRY_TIME+1", count: types.STALE_ENTRY_TIME + 1}, // entry #2 now stale and therefore should not be visible {op: "find", name: "entry #2", count: block1 + 1, fail: true}, + // but should still be positive for HasEntry() + {op: "has", name: "entry #2 (still positive)", count: block1}, + {op: "stale", name: "entry #2 (still positive)", count: block1, fail: true}, // entry #3 (refcount = zero) is old, but being the latest it always // remains visible (despite of refcount and age). + {op: "has", name: "entry #3", count: block2, coin: 2}, {op: "find", name: "entry #3", count: block2 + 1, coin: 2}, } @@ -323,7 +341,8 @@ func TestDelEntry(t *testing.T) { {op: "find", name: "entry #1 version 0", coin: 0, count: block0}, {op: "find", name: "entry #1 version 1", coin: 1, count: block1}, // entry #1 find beyond the delete should fail - {op: "find", name: "entry #1 version 1", count: block2, fail: true}, + {op: "has", name: "entry #1 version 1 (deleted)", count: block2, fail: true}, + {op: "find", name: "entry #1 version 1 (deleted)", count: block2, fail: true}, } testWithFixationTemplate(t, playbook, 3, 1) diff --git a/common/fixation_migrate.go b/common/fixation_migrate.go index 25fdfa6b42..24b4a19d1e 100644 --- a/common/fixation_migrate.go +++ b/common/fixation_migrate.go @@ -187,7 +187,7 @@ func fixationMigrate3to4(ctx sdk.Context, fs *FixationStore) error { // if StaleAt is set, then replace old style timer with new style timer if entry.StaleAt != math.MaxUint && entry.StaleAt > ctxBlock { fs.tstore.DelTimerByBlockHeight(ctx, entry.StaleAt, []byte{}) - key := encodeForTimer(entry.Index, entry.Block, timerStaleEntry) + key := encodeForTimer(entry.SafeIndex(), entry.Block, timerStaleEntry) fs.tstore.AddTimerByBlockHeight(ctx, entry.StaleAt, key, []byte{}) } diff --git a/common/fixation_migrate_test.go b/common/fixation_migrate_test.go index bebc59e3a7..197a27e8c2 100644 --- a/common/fixation_migrate_test.go +++ b/common/fixation_migrate_test.go @@ -104,11 +104,11 @@ type mockEntry2to3 struct { } // V2_setEntryIndex stores an Entry index in the store -func (fs FixationStore) V2_setEntryIndex(ctx sdk.Context, safeIndex string) { +func (fs FixationStore) V2_setEntryIndex(ctx sdk.Context, safeIndex types.SafeIndex) { storePrefix := types.EntryIndexPrefix + fs.prefix store := prefix.NewStore(ctx.KVStore(fs.storeKey), types.KeyPrefix(storePrefix)) appendedValue := []byte(safeIndex) // convert the index value to a byte array - store.Set(types.KeyPrefix(storePrefix+safeIndex), appendedValue) + store.Set(types.KeyPrefix(storePrefix+string(safeIndex)), appendedValue) } // V2_setEntry modifies an existing entry in the store @@ -167,7 +167,7 @@ func TestMigrate2to3(t *testing.T) { numHeads += 1 } entry := types.Entry{ - Index: safeIndex, + Index: string(safeIndex), Block: tt.block, StaleAt: math.MaxUint64, Data: fs.cdc.MustMarshal(&coin), diff --git a/common/timer.go b/common/timer.go index 8b3c61d798..70f9964453 100644 --- a/common/timer.go +++ b/common/timer.go @@ -22,6 +22,8 @@ import ( // - WithCallbackByBlockTime(callback): sets the callback for block-time timers // - AddTimerByBlockHeight(ctx, block, key, data): add a timer to expire at block height // - AddTimerByBlockTime(ctx, timestamp, key, data): add timer to expire at block timestamp +// - HasTimerByBlockHeight(ctx, block, key): check whether a timer exists at block height +// - HasTimerByBlockTime(ctx, timestamp, key): check whether a timer exists at block timestamp // - DelTimerByBlockHeight(ctx, block, key): delete a timer to expire at block height // - DelTimerByBlockTime(ctx, timestamp, key): delete timer to expire at block timestamp // - Tick(ctx): advance the timer to the ctx's block (height and timestamp) @@ -37,6 +39,10 @@ import ( // When the expiry block (or block time) arrives, the respective callback will be invoked with // the timer's _key_ and _data_. Adding the same timer again (i.e. same expiry block/block-time // and same key) will overwrite the exiting timer's data. +// Trying to add a timer with expiry block not in the future, or expiry time not later than +// current block's timestamp, will cause a panic. +// Existence of a timer can be checked using HasTimerByBlockHeight() and HasTimerByBlockTimer(), +// respectively. These return true if a timer exists that matches the block/block-time and key. // An existing timer can be deleted using DelTimerByBlockHeight() and AddTimerByBlockTime(), // respectively. The timer to be deleted must exactly match the block/block-time and key. Trying // to delete a non-existing timer will cause a panic. @@ -196,10 +202,18 @@ func (tstore *TimerStore) addTimer(ctx sdk.Context, which types.TimerType, value } } +func (tstore *TimerStore) hasTimer(ctx sdk.Context, which types.TimerType, value uint64, key []byte) bool { + store := tstore.getStoreTimer(ctx, which) + timerKey := types.EncodeBlockAndKey(value, key) + return store.Has(timerKey) +} + func (tstore *TimerStore) delTimer(ctx sdk.Context, which types.TimerType, value uint64, key []byte) { store := tstore.getStoreTimer(ctx, which) timerKey := types.EncodeBlockAndKey(value, key) if !store.Has(timerKey) { + // panic:ok: caller should only try to delete existing timers + // (use HasTimerByBlock{Height,Time} to check if a timer exists) panic(fmt.Sprintf("delTimer which %d block %d key %v: no such timer", which, value, key)) } store.Delete(timerKey) @@ -209,6 +223,7 @@ func (tstore *TimerStore) delTimer(ctx sdk.Context, which types.TimerType, value // If a timer for that tuple exists, it will be overridden. func (tstore *TimerStore) AddTimerByBlockHeight(ctx sdk.Context, block uint64, key []byte, data []byte) { if block <= uint64(ctx.BlockHeight()) { + // panic:ok: caller should never add a timer with past expiry panic(fmt.Sprintf("timer expiry block %d smaller than ctx block %d", block, uint64(ctx.BlockHeight()))) } @@ -219,18 +234,29 @@ func (tstore *TimerStore) AddTimerByBlockHeight(ctx sdk.Context, block uint64, k // If a timer for that tuple exists, it will be overridden. func (tstore *TimerStore) AddTimerByBlockTime(ctx sdk.Context, timestamp uint64, key []byte, data []byte) { if timestamp <= uint64(ctx.BlockTime().UTC().Unix()) { + // panic:ok: caller should never add a timer with past expiry panic(fmt.Sprintf("timer expiry time %d smaller than ctx time %d", timestamp, uint64(ctx.BlockTime().UTC().Unix()))) } tstore.addTimer(ctx, types.BlockTime, timestamp, key, data) } +// HasTimerByBlockHeight checks whether a timer exists for the tuple. +func (tstore *TimerStore) HasTimerByBlockHeight(ctx sdk.Context, block uint64, key []byte) bool { + return tstore.hasTimer(ctx, types.BlockHeight, block, key) +} + +// HasTimerByBlockTime checks whether a timer exists for the tuple. +func (tstore *TimerStore) HasTimerByBlockTime(ctx sdk.Context, timestamp uint64, key []byte) bool { + return tstore.hasTimer(ctx, types.BlockTime, timestamp, key) +} + // DelTimerByBlockHeight removes an existing timer for the tuple. func (tstore *TimerStore) DelTimerByBlockHeight(ctx sdk.Context, block uint64, key []byte) { tstore.delTimer(ctx, types.BlockHeight, block, key) } -// DelTimerByBlockHeight removes an existing timer for the tuple. +// DelTimerByBlockTime removes an existing timer for the tuple. func (tstore *TimerStore) DelTimerByBlockTime(ctx sdk.Context, timestamp uint64, key []byte) { tstore.delTimer(ctx, types.BlockTime, timestamp, key) } diff --git a/common/timer_test.go b/common/timer_test.go index c486764d3d..6747dde63f 100644 --- a/common/timer_test.go +++ b/common/timer_test.go @@ -78,10 +78,16 @@ func testWithTimerTemplate(t *testing.T, playbook []timerTemplate, countTS int) switch play.op { case "addheight": tstore[play.store].AddTimerByBlockHeight(ctx, play.value, key, data) + case "hasheight": + has := tstore[play.store].HasTimerByBlockHeight(ctx, play.value, key) + require.Equal(t, play.data == "has", has) case "delheight": tstore[play.store].DelTimerByBlockHeight(ctx, play.value, key) case "addtime": tstore[play.store].AddTimerByBlockTime(ctx, play.value, key, data) + case "hastime": + has := tstore[play.store].HasTimerByBlockTime(ctx, play.value, key) + require.Equal(t, play.data == "has", has) case "deltime": tstore[play.store].DelTimerByBlockTime(ctx, play.value, key) case "nextheight": @@ -118,11 +124,14 @@ func TestTimerBlockHeight(t *testing.T) { {op: "nextheight", name: "next timeout infinity", value: math.MaxUint64}, {op: "tickheight", name: "tick without timers", value: 100, fire: 0}, {op: "addheight", name: "add timer no-1", value: 120, key: "a", data: "no-1."}, + {op: "hasheight", name: "has timer no-1", value: 120, key: "a", data: "has"}, {op: "nextheight", name: "next timeout no-1", value: 120}, {op: "tickheight", name: "tick before timer no-1", value: 110, fire: 0}, {op: "tickheight", name: "tick after timer no-1", value: 130, key: "a", fire: 1, data: "no-1."}, + {op: "hasheight", name: "gone timer no-1", value: 120, key: "a", data: "gone"}, {op: "nextheight", name: "next timeout no-1", value: math.MaxUint64}, - {op: "addheight", name: "add timer no-2", value: 140, data: "no-2.", key: "a"}, + {op: "addheight", name: "add timer no-2", value: 140, key: "a", data: "no-2."}, + {op: "hasheight", name: "has timer no-2", value: 140, key: "a", data: "has"}, {op: "tickheight", name: "tick exactly on timer no-2", value: 140, key: "a", fire: 1, data: "no-2."}, {op: "nextheight", name: "next timeout infinity again", value: math.MaxUint64}, } @@ -136,9 +145,11 @@ func TestTimerBlockTime(t *testing.T) { {op: "nexttime", name: "next timeout infinity", value: math.MaxUint64}, {op: "ticktime", name: "tick without timers", value: 100, fire: 0}, {op: "addtime", name: "add timer no-1", value: 120, key: "b", data: "no-1."}, + {op: "hastime", name: "has timer no-1", value: 120, key: "b", data: "has"}, {op: "nexttime", name: "next timeout no-1", value: 120}, {op: "ticktime", name: "tick before timer no-1", value: 110, fire: 0}, {op: "ticktime", name: "tick after timer no-1", value: 130, key: "b", fire: 1, data: "no-1."}, + {op: "hastime", name: "gone timer no-1", value: 120, key: "b", data: "gone"}, } testWithTimerTemplate(t, playbook, 1) @@ -192,8 +203,10 @@ func TestDeleteTimers(t *testing.T) { {op: "addheight", name: "add timer no 3", value: 140, key: "c", data: "no-3."}, {op: "tickheight", name: "tick before all", value: 110, fire: 0}, {op: "delheight", name: "del timer no 2a", value: 130, key: "bx"}, + {op: "hasheight", name: "gone timer no 2a", value: 130, key: "bx", data: "gone"}, {op: "tickheight", name: "tick between no-2,no-3", value: 135, key: "aby", fire: 2, data: "no-1.no-2b."}, {op: "delheight", name: "del timer no 3", value: 140, key: "c"}, + {op: "hasheight", name: "gone timer no 3", value: 140, key: "c", data: "gone"}, {op: "nextheight", name: "next timeout no-3", value: 140}, {op: "tickheight", name: "tick after all", value: 155, fire: 0, key: "", data: ""}, } diff --git a/common/types/ascii.go b/common/types/ascii.go index 3bf766aadc..51fe071cd8 100644 --- a/common/types/ascii.go +++ b/common/types/ascii.go @@ -49,27 +49,3 @@ func ValidateString(s string, restrictType charRestrictionEnum, disallowedChars return true } - -// sanitizeIdnex checks that a string contains only visible ascii characters -// (i.e. Ascii 32-126), and appends a (ascii) DEL to the index; this ensures -// that an index can never be a prefix of another index. -func SanitizeIndex(index string) (string, error) { - for i := 0; i < len(index); i++ { - if index[i] < ASCII_MIN || index[i] > ASCII_MAX { - return index, ErrInvalidIndex - } - } - return index + string([]byte{ASCII_DEL}), nil -} - -// desantizeIndex reverts the effect of sanitizeIndex - removes the trailing -// (ascii) DEL terminator. -func DesanitizeIndex(safeIndex string) string { - return safeIndex[0 : len(safeIndex)-1] -} - -func AssertSanitizedIndex(safeIndex string, prefix string) { - if []byte(safeIndex)[len(safeIndex)-1] != ASCII_DEL { - panic("Fixation: prefix " + prefix + ": unsanitized safeIndex: " + safeIndex) - } -} diff --git a/common/types/fixationEntry.go b/common/types/fixationEntry.go index 50816a9033..d2f32d43be 100644 --- a/common/types/fixationEntry.go +++ b/common/types/fixationEntry.go @@ -6,7 +6,42 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" ) -// IsStale tests whether an entry is stale, i.e. has refcount zero _and_ +// SafeIndex is a sanitized string, i.e. contains only visible ascii characters +// (i.e. Ascii 32-126), and terminates with (ascii) DEL; this ensures that an +// index can never be a prefix of another index. +type SafeIndex string + +// sanitizeIndex checks that a string contains only visible ascii characters +// (i.e. Ascii 32-126), and appends a (ascii) DEL to the index; this ensures +// that an index can never be a prefix of another index. +func SanitizeIndex(index string) (SafeIndex, error) { + for i := 0; i < len(index); i++ { + if index[i] < ASCII_MIN || index[i] > ASCII_MAX { + return SafeIndex(""), ErrInvalidIndex + } + } + return SafeIndex(index + string([]byte{ASCII_DEL})), nil +} + +// desantizeIndex reverts the effect of SanitizeIndex - removes the trailing +// (ascii) DEL terminator. +func DesanitizeIndex(safeIndex SafeIndex) string { + return string(safeIndex[0 : len(safeIndex)-1]) +} + +func AssertSanitizedIndex(safeIndex SafeIndex, prefix string) { + if []byte(safeIndex)[len(safeIndex)-1] != ASCII_DEL { + // panic:ok: intended assertion + panic("Fixation: prefix " + prefix + ": unsanitized safeIndex: " + string(safeIndex)) + } +} + +// SafeIndex returns the entry's index +func (entry Entry) SafeIndex() SafeIndex { + return SafeIndex(entry.Index) +} + +// IsStaleBy tests whether an entry is stale, i.e. has refcount zero _and_ // has passed its stale_at time (more than STALE_ENTRY_TIME since deletion). func (entry Entry) IsStaleBy(block uint64) bool { if entry.GetRefcount() == 0 { @@ -17,6 +52,12 @@ func (entry Entry) IsStaleBy(block uint64) bool { return false } +// IsStale tests whether an entry is currently stale, i.e. has refcount zero _and_ +// has passed its stale_at time (more than STALE_ENTRY_TIME since deletion). +func (entry Entry) IsStale(ctx sdk.Context) bool { + return entry.IsStaleBy(uint64(ctx.BlockHeight())) +} + // IsDeletedBy tests whether an entry is deleted, with respect to a given // block, i.e. has entry.DeletAt smaller or equal to that that block. func (entry Entry) IsDeletedBy(block uint64) bool { diff --git a/utils/lavalog.go b/utils/lavalog.go index 537a737b21..e1246c72ba 100644 --- a/utils/lavalog.go +++ b/utils/lavalog.go @@ -19,6 +19,15 @@ const ( EventPrefix = "lava_" ) +const ( + LAVA_LOG_DEBUG = iota + LAVA_LOG_INFO + LAVA_LOG_WARN + LAVA_LOG_ERROR + LAVA_LOG_FATAL + LAVA_LOG_PANIC +) + var JsonFormat = false type Attribute struct { @@ -71,20 +80,22 @@ func LavaFormatLog(description string, err error, attributes []Attribute, severi var logEvent *zerolog.Event switch severity { - case 4: + case LAVA_LOG_PANIC: + // prefix = "Panic:" + logEvent = zerologlog.Panic() + case LAVA_LOG_FATAL: // prefix = "Fatal:" logEvent = zerologlog.Fatal() - - case 3: + case LAVA_LOG_ERROR: // prefix = "Error:" logEvent = zerologlog.Error() - case 2: + case LAVA_LOG_WARN: // prefix = "Warning:" logEvent = zerologlog.Warn() - case 1: + case LAVA_LOG_INFO: logEvent = zerologlog.Info() // prefix = "Info:" - case 0: + case LAVA_LOG_DEBUG: logEvent = zerologlog.Debug() // prefix = "Debug:" } @@ -152,26 +163,30 @@ func LavaFormatLog(description string, err error, attributes []Attribute, severi return errRet } +func LavaFormatPanic(description string, err error, attributes ...Attribute) { + attributes = append(attributes, Attribute{Key: "StackTrace", Value: debug.Stack()}) + LavaFormatLog(description, err, attributes, LAVA_LOG_PANIC) +} + func LavaFormatFatal(description string, err error, attributes ...Attribute) { attributes = append(attributes, Attribute{Key: "StackTrace", Value: debug.Stack()}) - LavaFormatLog(description, err, attributes, 4) - os.Exit(1) + LavaFormatLog(description, err, attributes, LAVA_LOG_FATAL) } func LavaFormatError(description string, err error, attributes ...Attribute) error { - return LavaFormatLog(description, err, attributes, 3) + return LavaFormatLog(description, err, attributes, LAVA_LOG_ERROR) } func LavaFormatWarning(description string, err error, attributes ...Attribute) error { - return LavaFormatLog(description, err, attributes, 2) + return LavaFormatLog(description, err, attributes, LAVA_LOG_WARN) } func LavaFormatInfo(description string, attributes ...Attribute) error { - return LavaFormatLog(description, nil, attributes, 1) + return LavaFormatLog(description, nil, attributes, LAVA_LOG_INFO) } func LavaFormatDebug(description string, attributes ...Attribute) error { - return LavaFormatLog(description, nil, attributes, 0) + return LavaFormatLog(description, nil, attributes, LAVA_LOG_DEBUG) } func FormatStringerList[T fmt.Stringer](description string, listToPrint []T) string { diff --git a/utils/lavaSerializer.go b/utils/serialize.go similarity index 52% rename from utils/lavaSerializer.go rename to utils/serialize.go index 5ce41ca083..fd39ea9f55 100644 --- a/utils/lavaSerializer.go +++ b/utils/serialize.go @@ -12,7 +12,9 @@ func Serialize(data any) []byte { binary.LittleEndian.PutUint64(res, castedData) return res } - panic(fmt.Sprintf("Lava can't Serialize typetype %T!\n", data)) + // panic:ok: validates that the data is of known type; would fail + // on start when all parameters are read in. + panic(fmt.Sprintf("unable to Serialize type %T", data)) } func Deserialize(raw []byte, data any) { @@ -21,5 +23,7 @@ func Deserialize(raw []byte, data any) { *casted = binary.LittleEndian.Uint64(raw) return } - panic(fmt.Sprintf("Lava can't DeSerialize typetype %T!\n", data)) + // panic:ok: validates that the data is of known type; would fail + // on start when all parameters are read in. + panic(fmt.Sprintf("unable to DeSerialize type %T", data)) } diff --git a/utils/vrf.go b/utils/vrf.go deleted file mode 100644 index 467051483d..0000000000 --- a/utils/vrf.go +++ /dev/null @@ -1,215 +0,0 @@ -package utils - -import ( - "bytes" - "encoding/binary" - "fmt" - - "github.com/99designs/keyring" - vrf "github.com/coniks-sys/coniks-go/crypto/vrf" - - "github.com/cosmos/cosmos-sdk/client" - "github.com/cosmos/cosmos-sdk/types/bech32" - sdkerrors "github.com/cosmos/cosmos-sdk/types/errors" - pairingtypes "github.com/lavanet/lava/x/pairing/types" - tendermintcrypto "github.com/tendermint/tendermint/crypto" -) - -const ( - bechPrefix = "vrf" - pk_vrf_prefix = "vrf-pk-" - sk_vrf_prefix = "vrf-sk-" -) - -var VRFValueAboveReliabilityThresholdError = sdkerrors.New("VRFValueAboveReliabilityThreshold Error", 1, "calculated vrf does not result in a smaller value than threshold") // client could'nt connect to any provider. - -func GetIndexForVrf(vrf []byte, providersCount uint32, reliabilityThreshold uint32) (index int64, err error) { - vrf_num := binary.LittleEndian.Uint32(vrf) - if vrf_num <= reliabilityThreshold { - // need to send relay with VRF - modulo := providersCount - index = int64(vrf_num % modulo) - } else { - index = -1 - err = VRFValueAboveReliabilityThresholdError.Wrapf("Vrf Does not meet threshold: %d VS threshold: %d", vrf_num, reliabilityThreshold) - } - return -} - -func CalculateVrfOnRelay(request *pairingtypes.RelayPrivateData, response *pairingtypes.RelayReply, vrf_sk vrf.PrivateKey, currentEpoch uint64) ([]byte, []byte) { - vrfData0 := FormatDataForVrf(request, response, false, currentEpoch) - vrfData1 := FormatDataForVrf(request, response, true, currentEpoch) - return vrf_sk.Compute(vrfData0), vrf_sk.Compute(vrfData1) -} - -func ProveVrfOnRelay(request *pairingtypes.RelayPrivateData, response *pairingtypes.RelayReply, vrf_sk vrf.PrivateKey, differentiator bool, currentEpoch uint64) (vrf_res []byte, proof []byte) { - vrfData := FormatDataForVrf(request, response, differentiator, currentEpoch) - return vrf_sk.Prove(vrfData) -} - -func CalculateQueryHash(relayReq pairingtypes.RelayPrivateData) (queryHash []byte) { - queryHash = tendermintcrypto.Sha256([]byte(relayReq.String())) - return -} - -func FormatDataForVrf(request *pairingtypes.RelayPrivateData, response *pairingtypes.RelayReply, differentiator bool, currentEpoch uint64) (data []byte) { - // vrf is calculated on: query hash, relayer signature and 0/1 byte - queryHash := CalculateQueryHash(*request) - currentEpochBytes := make([]byte, 8) - binary.LittleEndian.PutUint64(currentEpochBytes, currentEpoch) - if differentiator { - data = bytes.Join([][]byte{queryHash, currentEpochBytes, response.Sig, []uint8{1}}, nil) - } else { - data = bytes.Join([][]byte{queryHash, currentEpochBytes, response.Sig, []uint8{0}}, nil) - } - return -} - -func VerifyVRF(vrfpk string) error { - // everything is okay - if vrfpk == "" { - return fmt.Errorf("can't stake with an empty vrf pk bech32 string") - } - return nil -} - -func GeneratePrivateVRFKey() (vrf.PrivateKey, vrf.PublicKey, error) { - privateKey, err := vrf.GenerateKey(nil) - if err != nil { - return nil, nil, err - } - pk, success := privateKey.Public() - if !success { - return nil, nil, err - } - return privateKey, pk, nil -} - -func GetOrCreateVRFKey(clientCtx client.Context) (sk vrf.PrivateKey, pk *VrfPubKey, err error) { - sk, pk, err = LoadVRFKey(clientCtx) - if err != nil { - sk, pk, err = GenerateVRFKey(clientCtx) - fmt.Printf("Generated New VRF Key: {%X}\n", pk) - } - return -} - -func GenerateVRFKey(clientCtx client.Context) (vrf.PrivateKey, *VrfPubKey, error) { - kr, err := OpenKeyring(clientCtx) - if err != nil { - return nil, nil, err - } - sk, pk, err := GeneratePrivateVRFKey() - if err != nil { - return nil, nil, err - } - key := keyring.Item{Key: pk_vrf_prefix + clientCtx.FromName, Data: pk, Label: "pk", Description: "the vrf public key"} - err = kr.Set(key) - if err != nil { - return nil, nil, err - } - key = keyring.Item{Key: sk_vrf_prefix + clientCtx.FromName, Data: sk, Label: "sk", Description: "the vrf secret key"} - err = kr.Set(key) - if err != nil { - return nil, nil, err - } - return sk, &VrfPubKey{pk: pk}, nil -} - -func OpenKeyring(clientCtx client.Context) (keyring.Keyring, error) { - keyringConfig := keyring.Config{ - AllowedBackends: []keyring.BackendType{keyring.FileBackend}, - ServiceName: "vrf", - KeychainName: "vrf", - FileDir: clientCtx.KeyringDir, - FilePasswordFunc: func(_ string) (string, error) { - return "test", nil - }, - } - kr, err := keyring.Open(keyringConfig) - if err != nil { - return nil, err - } - return kr, nil -} - -func LoadVRFKey(clientCtx client.Context) (vrf.PrivateKey, *VrfPubKey, error) { - kr, err := OpenKeyring(clientCtx) - if err != nil { - return nil, nil, err - } - pkItem, err := kr.Get(pk_vrf_prefix + clientCtx.FromName) - if err != nil { - return nil, nil, err - } - skItem, err := kr.Get(sk_vrf_prefix + clientCtx.FromName) - return skItem.Data, &VrfPubKey{pk: pkItem.Data}, err -} - -// type PubKey interface { -// proto.Message - -// Address() Address -// Bytes() []byte -// VerifySignature(msg []byte, sig []byte) bool -// Equals(PubKey) bool -// Type() string -// } - -type VrfPubKey struct { - pk vrf.PublicKey -} - -func (pk *VrfPubKey) Bytes() []byte { - if pk == nil { - return nil - } - return pk.pk -} - -func (pk *VrfPubKey) DecodeFromBech32(bech32str string) (*VrfPubKey, error) { - hrp, bz, err := bech32.DecodeAndConvert(bech32str) - if hrp != bechPrefix { - return nil, fmt.Errorf("invalid prefix for bech string: %s", hrp) - } - pk.pk = bz - return pk, err -} - -func (pk *VrfPubKey) EncodeBech32() (string, error) { - return bech32.ConvertAndEncode(bechPrefix, pk.Bytes()) -} - -func (pk *VrfPubKey) Equals(pk2 VrfPubKey) bool { - return bytes.Equal(pk.Bytes(), pk2.Bytes()) -} - -func (pk *VrfPubKey) VerifySignature(m []byte, vrfBytes []byte, proof []byte) bool { - return pk.pk.Verify(m, vrfBytes, proof) -} - -// String returns a string representation of the public key -func (pk *VrfPubKey) String() string { - st, err := pk.EncodeBech32() - if err != nil { - return fmt.Sprintf("{%X}", pk.Bytes()) - } - return st -} - -func (pk *VrfPubKey) Reset() { *pk = VrfPubKey{} } - -// // **** Proto Marshaler **** - -// // MarshalTo implements proto.Marshaler interface. -func (pk *VrfPubKey) MarshalTo(dAtA []byte) (int, error) { - bz := pk.Bytes() - copy(dAtA, bz) - return len(bz), nil -} - -// // Unmarshal implements proto.Marshaler interface. -func (pk *VrfPubKey) Unmarshal(bz []byte) error { - pk.pk = bz - return nil -} diff --git a/x/epochstorage/keeper/epoch_details.go b/x/epochstorage/keeper/epoch_details.go index 4f122bb737..f312aa1ee5 100644 --- a/x/epochstorage/keeper/epoch_details.go +++ b/x/epochstorage/keeper/epoch_details.go @@ -1,8 +1,11 @@ package keeper import ( + "fmt" + "github.com/cosmos/cosmos-sdk/store/prefix" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/lavanet/lava/utils" "github.com/lavanet/lava/x/epochstorage/types" ) @@ -35,7 +38,12 @@ func (k Keeper) RemoveEpochDetails(ctx sdk.Context) { func (k Keeper) SetEpochDetailsStart(ctx sdk.Context, block uint64) { details, found := k.GetEpochDetails(ctx) if !found { - panic("did not find EpochDetails") + // panic:ok: EpochDetails is fundamental for operation + utils.LavaFormatPanic("critical: SetEpochDetailsStart failed", + fmt.Errorf("EpochDetails not found"), + utils.LogAttr("block", block), + utils.LogAttr("ctxBlock", ctx.BlockHeight()), + ) } details.StartBlock = block k.SetEpochDetails(ctx, details) @@ -44,7 +52,11 @@ func (k Keeper) SetEpochDetailsStart(ctx sdk.Context, block uint64) { func (k Keeper) GetEpochStart(ctx sdk.Context) uint64 { details, found := k.GetEpochDetails(ctx) if !found { - panic("did not find EpochDetails") + // panic:ok: EpochDetails is fundamental for operation + utils.LavaFormatPanic("critical: GetEpochStart failed", + fmt.Errorf("EpochDetails not found"), + utils.LogAttr("ctxBlock", ctx.BlockHeight()), + ) } return details.StartBlock } @@ -52,7 +64,11 @@ func (k Keeper) GetEpochStart(ctx sdk.Context) uint64 { func (k Keeper) GetEarliestEpochStart(ctx sdk.Context) uint64 { details, found := k.GetEpochDetails(ctx) if !found { - panic("did not find EpochDetails") + // panic:ok: EpochDetails is fundamental for operation + utils.LavaFormatPanic("critical: GetEarliestEpochStart failed", + fmt.Errorf("EpochDetails not found"), + utils.LogAttr("ctxBlock", ctx.BlockHeight()), + ) } return details.EarliestStart } @@ -60,7 +76,11 @@ func (k Keeper) GetEarliestEpochStart(ctx sdk.Context) uint64 { func (k Keeper) GetDeletedEpochs(ctx sdk.Context) []uint64 { details, found := k.GetEpochDetails(ctx) if !found { - panic("did not find EpochDetails") + // panic:ok: EpochDetails is fundamental for operation + utils.LavaFormatPanic("critical: GetDeletedEpochs failed", + fmt.Errorf("EpochDetails not found"), + utils.LogAttr("ctxBlock", ctx.BlockHeight()), + ) } return details.DeletedEpochs } @@ -68,7 +88,12 @@ func (k Keeper) GetDeletedEpochs(ctx sdk.Context) []uint64 { func (k Keeper) SetEarliestEpochStart(ctx sdk.Context, block uint64, deletedEpochs []uint64) { details, found := k.GetEpochDetails(ctx) if !found { - panic("did not find EpochDetails") + // panic:ok: EpochDetails is fundamental for operation + utils.LavaFormatPanic("critical: SetEarliestEpochStart failed", + fmt.Errorf("EpochDetails not found"), + utils.LogAttr("block", block), + utils.LogAttr("ctxBlock", ctx.BlockHeight()), + ) } details.DeletedEpochs = deletedEpochs details.EarliestStart = block diff --git a/x/epochstorage/keeper/keeper.go b/x/epochstorage/keeper/keeper.go index 34918db697..45687d7f16 100644 --- a/x/epochstorage/keeper/keeper.go +++ b/x/epochstorage/keeper/keeper.go @@ -64,7 +64,8 @@ func (k Keeper) Logger(ctx sdk.Context) log.Logger { func (k *Keeper) AddFixationRegistry(fixationKey string, getParamFunction func(sdk.Context) any) { if _, ok := k.fixationRegistries[fixationKey]; ok { - panic(fmt.Sprintf("duplicate fixation registry %s", fixationKey)) + // panic:ok: duplicate fixation registry is severe (triggered at init time) + panic("duplicate fixation registry: " + fixationKey) } k.fixationRegistries[fixationKey] = getParamFunction } diff --git a/x/epochstorage/keeper/params.go b/x/epochstorage/keeper/params.go index cf15085f93..149f4b6f09 100644 --- a/x/epochstorage/keeper/params.go +++ b/x/epochstorage/keeper/params.go @@ -91,6 +91,10 @@ func (k Keeper) IsEpochStart(ctx sdk.Context) (res bool) { return blockInEpoch == 0 } +func (k Keeper) BlocksToSaveRaw(ctx sdk.Context) (res uint64) { + return k.EpochsToSaveRaw(ctx) * k.EpochBlocksRaw(ctx) +} + func (k Keeper) BlocksToSave(ctx sdk.Context, block uint64) (res uint64, erro error) { epochsToSave, err := k.EpochsToSave(ctx, block) epochBlocks, err2 := k.EpochBlocks(ctx, block) diff --git a/x/epochstorage/keeper/stake_storage.go b/x/epochstorage/keeper/stake_storage.go index c18f0ce573..3a8da427ab 100644 --- a/x/epochstorage/keeper/stake_storage.go +++ b/x/epochstorage/keeper/stake_storage.go @@ -4,7 +4,6 @@ import ( "fmt" "sort" "strconv" - "strings" "github.com/cosmos/cosmos-sdk/store/prefix" sdk "github.com/cosmos/cosmos-sdk/types" @@ -79,34 +78,45 @@ func (k Keeper) RemoveOldEpochData(ctx sdk.Context) { func (k *Keeper) UpdateEarliestEpochstart(ctx sdk.Context) { currentBlock := uint64(ctx.BlockHeight()) earliestEpochBlock := k.GetEarliestEpochStart(ctx) - blocksToSaveAtEarliestEpoch, err := k.BlocksToSave(ctx, earliestEpochBlock) // we take the epochs memory size at earliestEpochBlock, and not the current one - deletedEpochs := []uint64{} + + // we take the epochs memory size at earliestEpochBlock, and not the current one + blocksToSaveAtEarliestEpoch, err := k.BlocksToSave(ctx, earliestEpochBlock) if err != nil { - // this is critical, no recovery from this - panic(fmt.Sprintf("Critical Error: could not progress EarliestEpochstart %s\nearliestEpochBlock: %d, fixations: %+v", err, earliestEpochBlock, k.GetAllFixatedParams(ctx))) + // panic:ok: critical, no recovery, avoid further corruption + utils.LavaFormatPanic("critical: failed to advance EarliestEpochstart", err, + utils.LogAttr("earliestEpochBlock", earliestEpochBlock), + utils.LogAttr("fixations", k.GetAllFixatedParams(ctx)), + ) } + if currentBlock <= blocksToSaveAtEarliestEpoch { return } + lastBlockInMemory := currentBlock - blocksToSaveAtEarliestEpoch - changed := false + + deletedEpochs := []uint64{} for earliestEpochBlock < lastBlockInMemory { deletedEpochs = append(deletedEpochs, earliestEpochBlock) earliestEpochBlock, err = k.GetNextEpoch(ctx, earliestEpochBlock) if err != nil { - // this is critical, no recovery from this - panic(fmt.Sprintf("Critical Error: could not progress EarliestEpochstart %s", err)) + // panic:ok: critical, no recovery, avoid further corruption + utils.LavaFormatPanic("critical: failed to advance EarliestEpochstart", err, + utils.LogAttr("earliestEpochBlock", earliestEpochBlock), + utils.LogAttr("fixations", k.GetAllFixatedParams(ctx)), + ) } - changed = true } - if !changed { + if len(deletedEpochs) == 0 { return } - logger := k.Logger(ctx) + utils.LogLavaEvent(ctx, k.Logger(ctx), types.EarliestEpochEventName, + map[string]string{"block": strconv.FormatUint(earliestEpochBlock, 10)}, + "updated earliest epoch block") + // now update the earliest epoch start - utils.LogLavaEvent(ctx, logger, types.EarliestEpochEventName, map[string]string{"block": strconv.FormatUint(earliestEpochBlock, 10)}, "updated earliest epoch block") k.SetEarliestEpochStart(ctx, earliestEpochBlock, deletedEpochs) } @@ -114,35 +124,6 @@ func (k Keeper) StakeStorageKey(block uint64, chainID string) string { return strconv.FormatUint(block, 10) + chainID } -func (k Keeper) removeAllEntriesPriorToBlockNumber(ctx sdk.Context, block uint64, allChainID []string) { - allStorage := k.GetAllStakeStorage(ctx) - for _, chainId := range allChainID { - for _, entry := range allStorage { - if strings.Contains(entry.Index, chainId) { - if (len(chainId)) > len(entry.Index) { - panic(fmt.Sprintf("storageType + chainId length out of range %d vs %d\n more info: entry.Index: %s, chainId: %s", len(chainId), len(entry.Index), entry.Index, chainId)) - } - storageBlock := entry.Index[:(len(entry.Index) - len(chainId))] - blockHeight, err := strconv.ParseUint(storageBlock, 10, 64) - if err != nil { - if storageBlock == "" { - // if storageBlock is empty its stake entry current. so we dont remove it. - continue - } - panic("failed to convert storage block to int: " + storageBlock) - } - if blockHeight < block { - k.RemoveStakeStorage(ctx, entry.Index) - } - } - } - } -} - -func (k Keeper) RemoveAllEntriesPriorToBlockNumber(ctx sdk.Context, block uint64, allChainID []string) { - k.removeAllEntriesPriorToBlockNumber(ctx, block, allChainID) -} - func (k Keeper) RemoveStakeStorageByBlockAndChain(ctx sdk.Context, block uint64, chainID string) { key := k.StakeStorageKey(block, chainID) k.RemoveStakeStorage(ctx, key) @@ -170,14 +151,17 @@ func (k Keeper) stakeEntryIndexByAddress(ctx sdk.Context, stakeStorage types.Sta for idx, entry := range entries { entryAddr, err := sdk.AccAddressFromBech32(entry.Address) if err != nil { - panic("invalid account address inside StakeStorage: " + entry.Address) + // this should not happen; to avoid panic we simply skip this one (thus + // freeze the situation so it can be investigated and orderly resolved). + utils.LavaFormatError("critical: invalid account address inside StakeStorage", err, + utils.LogAttr("address", entry.Address), + utils.LogAttr("chainID", entry.Chain), + ) + continue } if entryAddr.Equals(address) { // found the right thing - index = uint64(idx) - found = true - // remove from the stakeStorage, i checked it supports idx == length-1 - return + return uint64(idx), true } } return 0, false @@ -258,7 +242,13 @@ func (k Keeper) ModifyStakeEntryCurrent(ctx sdk.Context, chainID string, stakeEn // this stake storage entries are sorted by stake amount stakeStorage, found := k.GetStakeStorageCurrent(ctx, chainID) if !found { - panic("called modify when there is no stakeStorage") + // should not happen since caller is expected to validate chainID first; + // do nothing and return to avoid panic. + utils.LavaFormatError("critical: ModifyStakeEntryCurrent with unknown chain", errors.ErrNotFound, + utils.LogAttr("chainID", chainID), + utils.LogAttr("stakeAddr", stakeEntry.Address), + ) + return } // TODO: more efficient: only create a new list once, after the second index is identified // remove the given index, then store the new entry in the sorted list at the right place @@ -315,7 +305,11 @@ func (k Keeper) ModifyUnstakeEntry(ctx sdk.Context, stakeEntry types.StakeEntry, // this stake storage entries are sorted by stake amount stakeStorage, found := k.GetStakeStorageUnstake(ctx) if !found { - panic("called modify when there is no stakeStorage") + // should not happen since stake storage must always exist; do nothing to avoid panic + utils.LavaFormatError("critical: ModifyUnstakeEntry failed to get stakeStorage", errors.ErrNotFound, + utils.LogAttr("stakeAddr", stakeEntry.Address), + ) + return } // TODO: more efficient: only create a new list once, after the second index is identified // remove the given index, then store the new entry in the sorted list at the right place diff --git a/x/epochstorage/keeper/stake_storage_test.go b/x/epochstorage/keeper/stake_storage_test.go index 82669f0c5a..cdc5ee281c 100644 --- a/x/epochstorage/keeper/stake_storage_test.go +++ b/x/epochstorage/keeper/stake_storage_test.go @@ -2,6 +2,7 @@ package keeper_test import ( "strconv" + "strings" "testing" sdk "github.com/cosmos/cosmos-sdk/types" @@ -59,6 +60,29 @@ func TestStakeStorageRemove(t *testing.T) { } } +func removeAllEntriesBeforeBlock(keeper keeper.Keeper, ctx sdk.Context, block uint64, allChainID []string) { + allStorage := keeper.GetAllStakeStorage(ctx) + for _, chainId := range allChainID { + for _, entry := range allStorage { + if strings.Contains(entry.Index, chainId) { + storageBlock := entry.Index[:(len(entry.Index) - len(chainId))] + blockHeight, err := strconv.ParseUint(storageBlock, 10, 64) + if err != nil { + if storageBlock == "" { + // empty storageBlock means stake entry current, so skip it + continue + } + panic("failed to decode storage block: " + strconv.Itoa(int(block)) + + "chainID: " + chainId + "index: " + entry.Index) + } + if blockHeight < block { + keeper.RemoveStakeStorage(ctx, entry.Index) + } + } + } + } +} + func TestStakeStorageRemoveAllPriorToBlock(t *testing.T) { // keeper, ctx := keepertest.EpochstorageKeeper(t) _, allkeepers, ctxx := testkeeper.InitAllKeepers(t) @@ -77,29 +101,29 @@ func TestStakeStorageRemoveAllPriorToBlock(t *testing.T) { testkeeper.AdvanceEpoch(ctxx, allkeepers) } - keeper.RemoveAllEntriesPriorToBlockNumber(ctx, 10, []string{"COS3ETH1LAV1COS4"}) + removeAllEntriesBeforeBlock(keeper, ctx, 10, []string{"COS3ETH1LAV1COS4"}) allStorage := keeper.GetAllStakeStorage(ctx) require.Equal(t, len(allStorage), stakeStorageSlots) // no entry was removed - keeper.RemoveAllEntriesPriorToBlockNumber(ctx, 10, []string{"COS3"}) + removeAllEntriesBeforeBlock(keeper, ctx, 10, []string{"COS3"}) allStorage = keeper.GetAllStakeStorage(ctx) require.Equal(t, len(allStorage), stakeStorageSlots) // no entry was removed - keeper.RemoveAllEntriesPriorToBlockNumber(ctx, 0, []string{chainID}) + removeAllEntriesBeforeBlock(keeper, ctx, 0, []string{chainID}) allStorage = keeper.GetAllStakeStorage(ctx) require.Equal(t, len(allStorage), stakeStorageSlots) // no entry was removed - keeper.RemoveAllEntriesPriorToBlockNumber(ctx, 9, []string{chainID}) + removeAllEntriesBeforeBlock(keeper, ctx, 9, []string{chainID}) allStorage = keeper.GetAllStakeStorage(ctx) require.Equal(t, len(allStorage), 1) // one provider - keeper.RemoveAllEntriesPriorToBlockNumber(ctx, 10, []string{chainID}) + removeAllEntriesBeforeBlock(keeper, ctx, 10, []string{chainID}) allStorage = keeper.GetAllStakeStorage(ctx) require.Equal(t, len(allStorage), 0) // zero entries left items[0].Index = strconv.FormatUint(uint64(10), 10) + "" keeper.SetStakeStorage(ctx, items[0]) - keeper.RemoveAllEntriesPriorToBlockNumber(ctx, 11, []string{""}) + removeAllEntriesBeforeBlock(keeper, ctx, 11, []string{""}) allStorage = keeper.GetAllStakeStorage(ctx) require.Equal(t, len(allStorage), 0) // zero entries left } diff --git a/x/epochstorage/module.go b/x/epochstorage/module.go index 706682f197..0db80cbda9 100644 --- a/x/epochstorage/module.go +++ b/x/epochstorage/module.go @@ -144,6 +144,7 @@ func (am AppModule) RegisterServices(cfg module.Configurator) { // register v2 -> v3 migration if err := cfg.RegisterMigration(types.ModuleName, 2, migrator.Migrate2to3); err != nil { + // panic:ok: at start up, migration cannot proceed anyhow panic(fmt.Errorf("%s: failed to register migration to v3: %w", types.ModuleName, err)) } } diff --git a/x/pairing/keeper/badge_used_cu.go b/x/pairing/keeper/badge_used_cu.go index 72ef1f7881..9788b4b8f8 100644 --- a/x/pairing/keeper/badge_used_cu.go +++ b/x/pairing/keeper/badge_used_cu.go @@ -1,8 +1,11 @@ package keeper import ( + "fmt" + "github.com/cosmos/cosmos-sdk/store/prefix" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/lavanet/lava/utils" "github.com/lavanet/lava/x/pairing/types" ) @@ -38,7 +41,15 @@ func (k Keeper) RemoveBadgeUsedCu( if store.Has(badgeUsedCuKey) { store.Delete(badgeUsedCuKey) } else { - panic("could not remove badgeUsedCu entry. key not found " + string(badgeUsedCuKey)) + // badge (epoch) timer has expired for an unknown badge: either the + // timer was set wrongly, or the badge was incorrectly removed; and + // we cannot even return an error about it. + utils.LavaFormatError("critical: epoch expiry for unknown badge, skipping", + fmt.Errorf("badge not found"), + utils.Attribute{Key: "badge", Value: string(badgeUsedCuKey)}, + utils.Attribute{Key: "block", Value: ctx.BlockHeight()}, + ) + return } } @@ -58,9 +69,15 @@ func (k Keeper) GetAllBadgeUsedCu(ctx sdk.Context) (list []types.BadgeUsedCu) { } func (k Keeper) BadgeUsedCuExpiry(ctx sdk.Context, badge types.Badge) uint64 { - blocksToSave, err := k.epochStorageKeeper.BlocksToSave(ctx, uint64(ctx.BlockHeight())) + blocksToSave, err := k.epochStorageKeeper.BlocksToSave(ctx, badge.Epoch) if err != nil { - panic("can't get blocksToSave param. err: " + err.Error()) + utils.LavaFormatError("critical: BadgeUsedCuExpiry failed to get BlocksToSave", err, + utils.LogAttr("badge", badge.Address), + utils.LogAttr("block", ctx.BlockHeight()), + ) + // on error, blocksToSave will be zero, so to avoid immediate expiry (and user + // discontent) use a reasonable default: the current EpochToSave * EpochBlocks + blocksToSave = k.epochStorageKeeper.BlocksToSaveRaw(ctx) } return badge.Epoch + blocksToSave diff --git a/x/pairing/keeper/epoch_payments.go b/x/pairing/keeper/epoch_payments.go index 306aa8de0a..ab3bde3d03 100644 --- a/x/pairing/keeper/epoch_payments.go +++ b/x/pairing/keeper/epoch_payments.go @@ -1,6 +1,7 @@ package keeper import ( + "fmt" "strconv" "github.com/cosmos/cosmos-sdk/store/prefix" @@ -64,11 +65,10 @@ func (k Keeper) GetAllEpochPayments(ctx sdk.Context) (list []types.EpochPayments } // Function to remove epochPayments objects from deleted epochs (older than the chain's memory) -func (k Keeper) RemoveOldEpochPayment(ctx sdk.Context) (err error) { +func (k Keeper) RemoveOldEpochPayment(ctx sdk.Context) { for _, epoch := range k.epochStorageKeeper.GetDeletedEpochs(ctx) { - err = k.RemoveAllEpochPaymentsForBlock(ctx, epoch) + k.RemoveAllEpochPaymentsForBlock(ctx, epoch) } - return } // Function to get the epochPayments object from a specific epoch. Note that it also returns the epochPayments object's key which is the epoch in hex representation (base 16) @@ -115,13 +115,11 @@ func (k Keeper) AddEpochPayment(ctx sdk.Context, chainID string, epoch uint64, p } // Function to remove all epochPayments objects from a specific epoch -func (k Keeper) RemoveAllEpochPaymentsForBlock(ctx sdk.Context, blockForDelete uint64) error { +func (k Keeper) RemoveAllEpochPaymentsForBlock(ctx sdk.Context, blockForDelete uint64) { // get the epochPayments object of blockForDelete epochPayments, found, key := k.GetEpochPaymentsFromBlock(ctx, blockForDelete) if !found { - // return fmt.Errorf("did not find any epochPayments for block %d", blockForDelete.Num) - // no epochPayments object -> do nothing - return nil + return } // go over the epochPayments object's providerPaymentStorageKeys @@ -144,9 +142,14 @@ func (k Keeper) RemoveAllEpochPaymentsForBlock(ctx sdk.Context, blockForDelete u // validate its an old entry, for sanity if uniquePaymentStorage.Block > blockForDelete { - errMsg := "trying to delete a new entry in epoch payments for block" - k.Logger(ctx).Error(errMsg) - panic(errMsg) + // this should not happen; to avoid panic we simply skip this one (thus + // freeze the situation so it can be investigated and orderly resolved). + utils.LavaFormatError("critical: failed to delete epoch payment", + fmt.Errorf("payment block greater than block for delete"), + utils.Attribute{Key: "paymentBlock", Value: uniquePaymentStorage.Block}, + utils.Attribute{Key: "deleteBlock", Value: blockForDelete}, + ) + continue } // delete the uniquePaymentStorageClientProvider object @@ -159,5 +162,4 @@ func (k Keeper) RemoveAllEpochPaymentsForBlock(ctx sdk.Context, blockForDelete u // after we're done deleting the providerPaymentStorage objects, delete the epochPayments object k.RemoveEpochPayments(ctx, key) - return nil } diff --git a/x/pairing/keeper/epoch_start.go b/x/pairing/keeper/epoch_start.go deleted file mode 100644 index 7fc002f42e..0000000000 --- a/x/pairing/keeper/epoch_start.go +++ /dev/null @@ -1,32 +0,0 @@ -package keeper - -import ( - sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/lavanet/lava/utils" -) - -// Function that calls all the functions that are supposed to run in epoch start -func (k Keeper) EpochStart(ctx sdk.Context, epochsNumToCheckCuForUnresponsiveProvider uint64, epochsNumToCheckForComplainers uint64) { - logOnErr := func(err error, failingFunc string) { - if err != nil { - utils.LavaFormatError("failing func: "+failingFunc, err) - } - } - // on session start we need to do: - // 1. remove old session payments - // 2. unstake any unstaking providers - // 3. unstake any unstaking users - // 4. unstake/jail unresponsive providers - - // 1. - err := k.RemoveOldEpochPayment(ctx) - logOnErr(err, "RemoveOldEpochPayment") - - // 2+3. - err = k.CheckUnstakingForCommit(ctx) - logOnErr(err, "CheckUnstakingForCommit") - - // 4. unstake unresponsive providers - err = k.UnstakeUnresponsiveProviders(ctx, epochsNumToCheckCuForUnresponsiveProvider, epochsNumToCheckForComplainers) - logOnErr(err, "UnstakeUnresponsiveProviders") -} diff --git a/x/pairing/keeper/keeper.go b/x/pairing/keeper/keeper.go index 0fdf069257..b6da4ba2f6 100644 --- a/x/pairing/keeper/keeper.go +++ b/x/pairing/keeper/keeper.go @@ -30,6 +30,16 @@ type ( } ) +// sanity checks at start time +func init() { + if types.EPOCHS_NUM_TO_CHECK_CU_FOR_UNRESPONSIVE_PROVIDER == 0 { + panic("types.EPOCHS_NUM_TO_CHECK_FOR_COMPLAINERS == 0") + } + if types.EPOCHS_NUM_TO_CHECK_FOR_COMPLAINERS == 0 { + panic("types.EPOCHS_NUM_TO_CHECK_FOR_COMPLAINERS == 0") + } +} + func NewKeeper( cdc codec.BinaryCodec, storeKey, @@ -80,8 +90,15 @@ func (k Keeper) Logger(ctx sdk.Context) log.Logger { func (k Keeper) BeginBlock(ctx sdk.Context) { k.badgeTimerStore.Tick(ctx) + if k.epochStorageKeeper.IsEpochStart(ctx) { - // run functions that are supposed to run in epoch start - k.EpochStart(ctx, types.EPOCHS_NUM_TO_CHECK_CU_FOR_UNRESPONSIVE_PROVIDER, types.EPOCHS_NUM_TO_CHECK_FOR_COMPLAINERS) + // remove old session payments + k.RemoveOldEpochPayment(ctx) + // unstake any unstaking providers + k.CheckUnstakingForCommit(ctx) + // unstake/jail unresponsive providers + k.UnstakeUnresponsiveProviders(ctx, + types.EPOCHS_NUM_TO_CHECK_CU_FOR_UNRESPONSIVE_PROVIDER, + types.EPOCHS_NUM_TO_CHECK_FOR_COMPLAINERS) } } diff --git a/x/pairing/keeper/msg_server_relay_payment.go b/x/pairing/keeper/msg_server_relay_payment.go index 5d2a336ce4..262d48737a 100644 --- a/x/pairing/keeper/msg_server_relay_payment.go +++ b/x/pairing/keeper/msg_server_relay_payment.go @@ -255,15 +255,19 @@ func (k msgServer) RelayPayment(goCtx context.Context, msg *types.MsgRelayPaymen if !rewardCoins.AmountOf(epochstoragetypes.TokenDenom).IsZero() { err = k.Keeper.bankKeeper.MintCoins(ctx, types.ModuleName, rewardCoins) if err != nil { - utils.LavaFormatError("MintCoins Failed", err) - panic(fmt.Sprintf("module failed to mint coins to give to provider: %s", err)) + // panic:ok: mint coins should never fail + utils.LavaFormatPanic("critical: failed to mint coins to reward provider", err, + utils.Attribute{Key: "provider", Value: providerAddr}, + utils.Attribute{Key: "reward", Value: rewardCoins}, + ) } - // - // Send to provider err = k.bankKeeper.SendCoinsFromModuleToAccount(ctx, types.ModuleName, providerAddr, rewardCoins) if err != nil { - utils.LavaFormatError("SendCoinsFromModuleToAccount Failed", err) - panic(fmt.Sprintf("failed to transfer minted new coins to provider, %s account: %s", err, providerAddr)) + // panic:ok: reward transfer should never fail + utils.LavaFormatPanic("critical: failed to send reward to provider", err, + utils.Attribute{Key: "provider", Value: providerAddr}, + utils.Attribute{Key: "reward", Value: rewardCoins}, + ) } } diff --git a/x/pairing/keeper/msg_server_relay_payment_test.go b/x/pairing/keeper/msg_server_relay_payment_test.go index 63e94e3d85..8709d4ad1b 100644 --- a/x/pairing/keeper/msg_server_relay_payment_test.go +++ b/x/pairing/keeper/msg_server_relay_payment_test.go @@ -833,8 +833,7 @@ func TestBadgeValidation(t *testing.T) { ts.ctx = testkeeper.AdvanceBlock(ts.ctx, ts.keepers) // remove past payments to avoid double spending error (first test had a successful payment) - err = ts.keepers.Pairing.RemoveAllEpochPaymentsForBlock(sdk.UnwrapSDKContext(ts.ctx), tt.epoch) - require.Nil(t, err) + ts.keepers.Pairing.RemoveAllEpochPaymentsForBlock(sdk.UnwrapSDKContext(ts.ctx), tt.epoch) } badge := types.CreateBadge(badgeCuAllocation, tt.epoch, tt.badgeAddress, tt.lavaChainID, []byte{}) sig, err := sigs.SignBadge(tt.badgeSigner.SK, *badge) diff --git a/x/pairing/keeper/pairing.go b/x/pairing/keeper/pairing.go index 958acddef1..1ba00e7c39 100644 --- a/x/pairing/keeper/pairing.go +++ b/x/pairing/keeper/pairing.go @@ -67,7 +67,13 @@ func (k Keeper) VerifyClientStake(ctx sdk.Context, chainID string, clientAddress for i, clientStakeEntry := range userStakedEntries { clientAddr, err := sdk.AccAddressFromBech32(clientStakeEntry.Address) if err != nil { - panic(fmt.Sprintf("invalid user address saved in keeper %s, err: %s", clientStakeEntry.Address, err)) + // this should not happen; to avoid panic we simply skip this one (thus + // freeze the situation so it can be investigated and orderly resolved). + utils.LavaFormatError("critical: invalid account address inside StakeStorage", err, + utils.LogAttr("address", clientStakeEntry.Address), + utils.LogAttr("chainID", clientStakeEntry.Chain), + ) + continue } if clientAddr.Equals(clientAddress) { if clientStakeEntry.StakeAppliedBlock > block { @@ -267,7 +273,13 @@ func (k Keeper) ValidatePairingForClient(ctx sdk.Context, chainID string, client for _, possibleAddr := range validAddresses { providerAccAddr, err := sdk.AccAddressFromBech32(possibleAddr.Address) if err != nil { - panic(fmt.Sprintf("invalid provider address saved in keeper %s, err: %s", providerAccAddr, err)) + // panic:ok: provider address saved on chain must be valid + utils.LavaFormatPanic("critical: invalid provider address for payment", err, + utils.Attribute{Key: "chainID", Value: chainID}, + utils.Attribute{Key: "client", Value: clientAddress}, + utils.Attribute{Key: "provider", Value: providerAccAddr}, + utils.Attribute{Key: "epochBlock", Value: epoch}, + ) } if providerAccAddr.Equals(providerAddress) { @@ -280,8 +292,14 @@ func (k Keeper) ValidatePairingForClient(ctx sdk.Context, chainID string, client func (k Keeper) calculatePairingForClient(ctx sdk.Context, providers []epochstoragetypes.StakeEntry, developerAddress string, epochStartBlock uint64, chainID string, epochHash []byte, providersToPair uint64) (validProviders []epochstoragetypes.StakeEntry, err error) { if epochStartBlock > uint64(ctx.BlockHeight()) { - k.Logger(ctx).Error("\ninvalid session start\n") - panic(fmt.Sprintf("invalid session start saved in keeper %d, current block was %d", epochStartBlock, uint64(ctx.BlockHeight()))) + // panic:ok: provider address saved on chain must be valid + utils.LavaFormatPanic("critical: pairing session start block mismatch", + fmt.Errorf("epoch start block beyond current block height"), + utils.Attribute{Key: "chainID", Value: chainID}, + utils.Attribute{Key: "developer", Value: developerAddress}, + utils.Attribute{Key: "epochBlock", Value: epochStartBlock}, + utils.Attribute{Key: "ctxBlock", Value: ctx.BlockHeight()}, + ) } spec, found := k.specKeeper.GetSpec(ctx, chainID) @@ -291,7 +309,6 @@ func (k Keeper) calculatePairingForClient(ctx sdk.Context, providers []epochstor if spec.ProvidersTypes == spectypes.Spec_dynamic { // calculates a hash and randomly chooses the providers - validProviders = k.returnSubsetOfProvidersByStake(ctx, developerAddress, providers, providersToPair, epochStartBlock, chainID, epochHash) } else { validProviders = k.returnSubsetOfProvidersByHighestStake(ctx, providers, providersToPair) diff --git a/x/pairing/keeper/staking.go b/x/pairing/keeper/staking.go index bd0ddf37b7..0b57cdc83c 100644 --- a/x/pairing/keeper/staking.go +++ b/x/pairing/keeper/staking.go @@ -67,7 +67,7 @@ func (k Keeper) StakeNewEntry(ctx sdk.Context, creator string, chainID string, a return utils.LavaFormatWarning("invalid endpoints implementation for the given spec", err, utils.Attribute{Key: "provider", Value: creator}, utils.Attribute{Key: "endpoints", Value: endpoints}, - utils.Attribute{Key: "Chain", Value: chainID}, + utils.Attribute{Key: "chain", Value: chainID}, utils.Attribute{Key: "geolocation", Value: geolocation}, ) } @@ -161,29 +161,41 @@ func (k Keeper) StakeNewEntry(ctx sdk.Context, creator string, chainID string, a } func (k Keeper) validateGeoLocationAndApiInterfaces(ctx sdk.Context, endpoints []epochstoragetypes.Endpoint, geolocation uint64, chainID string) (endpointsFormatted []epochstoragetypes.Endpoint, err error) { - expectedInterfaces := k.specKeeper.GetExpectedInterfacesForSpec(ctx, chainID, true) - allowedInterfaces := k.specKeeper.GetExpectedInterfacesForSpec(ctx, chainID, false) + expectedInterfaces, err := k.specKeeper.GetExpectedInterfacesForSpec(ctx, chainID, true) + if err != nil { + return nil, fmt.Errorf("expected interfaces: %w", err) + } + allowedInterfaces, err := k.specKeeper.GetExpectedInterfacesForSpec(ctx, chainID, false) + if err != nil { + return nil, fmt.Errorf("allowed interfaces: %w", err) + } + geolocMapRequired := map[epochstoragetypes.EndpointService]struct{}{} geolocMapAllowed := map[epochstoragetypes.EndpointService]struct{}{} geolocations := k.specKeeper.GeolocationCount(ctx) + geolocKey := func(intefaceName string, geolocation uint64, addon string) epochstoragetypes.EndpointService { return epochstoragetypes.EndpointService{ ApiInterface: intefaceName + "_" + strconv.FormatUint(geolocation, 10), Addon: addon, } } + for idx := uint64(0); idx < geolocations; idx++ { // geolocation is a bit mask for areas, each bit turns support for an area geolocZone := geolocation & (1 << idx) if geolocZone != 0 { for expectedEndpointService := range expectedInterfaces { - geolocMapRequired[geolocKey(expectedEndpointService.ApiInterface, geolocZone, expectedEndpointService.Addon)] = struct{}{} + key := geolocKey(expectedEndpointService.ApiInterface, geolocZone, expectedEndpointService.Addon) + geolocMapRequired[key] = struct{}{} } for expectedEndpointService := range allowedInterfaces { - geolocMapAllowed[geolocKey(expectedEndpointService.ApiInterface, geolocZone, expectedEndpointService.Addon)] = struct{}{} + key := geolocKey(expectedEndpointService.ApiInterface, geolocZone, expectedEndpointService.Addon) + geolocMapAllowed[key] = struct{}{} } } } + // check all endpoints only implement expected interfaces for idx, endpoint := range endpoints { endpoint.SetApiInterfacesFromAddons(allowedInterfaces) // support apiInterfaces inside addons list @@ -191,13 +203,12 @@ func (k Keeper) validateGeoLocationAndApiInterfaces(ctx sdk.Context, endpoints [ endpoints[idx] = endpoint for _, endpointService := range endpoint.GetSupportedServices() { key := geolocKey(endpointService.ApiInterface, endpoint.Geolocation, endpointService.Addon) - if _, ok := geolocMapAllowed[key]; ok { - continue - } else { - return nil, fmt.Errorf("servicer implemented api interfaces that are not allowed in the spec: %s, current allowed: %+v", key, geolocMapAllowed) + if _, ok := geolocMapAllowed[key]; !ok { + return nil, fmt.Errorf("servicer implements api interfaces not allowed in the spec: %s, current allowed: %+v", key, geolocMapAllowed) } } } + // check all expected api interfaces are implemented for _, endpoint := range endpoints { for _, endpointService := range endpoint.GetSupportedServices() { @@ -205,9 +216,11 @@ func (k Keeper) validateGeoLocationAndApiInterfaces(ctx sdk.Context, endpoints [ delete(geolocMapRequired, key) // remove this from expected implementations } } - if len(geolocMapRequired) == 0 { - // all interfaces and geolocations were implemented - return endpoints, nil + + if len(geolocMapRequired) != 0 { + return nil, fmt.Errorf("servicer does not implement all expected interfaces for all geolocations: %+v, missing implementation count: %d", geolocMapRequired, len(geolocMapRequired)) } - return nil, fmt.Errorf("not all expected interfaces are implemented for all geolocations: %+v, missing implementation count: %d", geolocMapRequired, len(geolocMapRequired)) + + // all interfaces and geolocations were implemented + return endpoints, nil } diff --git a/x/pairing/keeper/unique_payment_storage_client_provider.go b/x/pairing/keeper/unique_payment_storage_client_provider.go index 2c5fe5eefa..1be1bebf4f 100644 --- a/x/pairing/keeper/unique_payment_storage_client_provider.go +++ b/x/pairing/keeper/unique_payment_storage_client_provider.go @@ -1,8 +1,6 @@ package keeper import ( - "fmt" - "github.com/cosmos/cosmos-sdk/store/prefix" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/address" @@ -86,12 +84,7 @@ func maxAddressLengths() int { } func (k Keeper) EncodeUniquePaymentKey(ctx sdk.Context, projectID string, providerAddress sdk.AccAddress, uniqueIdentifier string, chainID string) string { - maxAdrLengthProvider := maxAddressLengths() - providerLength, clientLength := len(providerAddress.String()), len(projectID) - if providerLength > maxAdrLengthProvider { - panic(fmt.Sprintf("invalid providerAddress found! len(%s) != %d == %d", providerAddress.String(), maxAdrLengthProvider, len(providerAddress.String()))) - } - leadingChar := asciiNumberToChar(clientLength) + leadingChar := asciiNumberToChar(len(projectID)) key := string(leadingChar) + projectID + providerAddress.String() + uniqueIdentifier + chainID return key } diff --git a/x/pairing/keeper/unresponsive_provider.go b/x/pairing/keeper/unresponsive_provider.go index cd4d7577fd..323d00b48c 100644 --- a/x/pairing/keeper/unresponsive_provider.go +++ b/x/pairing/keeper/unresponsive_provider.go @@ -11,10 +11,11 @@ import ( ) // Function that returns a map that links between a provider that should be punished and its providerCuCounterForUnreponsiveness -func (k Keeper) UnstakeUnresponsiveProviders(ctx sdk.Context, epochsNumToCheckCUForUnresponsiveProvider uint64, epochsNumToCheckCUForComplainers uint64) error { +func (k Keeper) UnstakeUnresponsiveProviders(ctx sdk.Context, epochsNumToCheckCUForUnresponsiveProvider uint64, epochsNumToCheckCUForComplainers uint64) { // check the epochsNum consts if epochsNumToCheckCUForComplainers <= 0 || epochsNumToCheckCUForUnresponsiveProvider <= 0 { - return utils.LavaFormatError("epochsNumToCheckCUForUnresponsiveProvider or epochsNumToCheckCUForComplainers are smaller or equal than zero", fmt.Errorf("invalid unresponsive provider consts"), + utils.LavaFormatError("epoch to check CU for unresponsive provider or for complainer is zero", + fmt.Errorf("invalid unresponsive provider consts"), utils.Attribute{Key: "epochsNumToCheckCUForUnresponsiveProvider", Value: epochsNumToCheckCUForUnresponsiveProvider}, utils.Attribute{Key: "epochsNumToCheckCUForComplainers", Value: epochsNumToCheckCUForComplainers}, ) @@ -39,21 +40,22 @@ func (k Keeper) UnstakeUnresponsiveProviders(ctx sdk.Context, epochsNumToCheckCU minHistoryBlock, err := k.getBlockEpochsAgo(ctx, currentEpoch, largerEpochsNumConst+recommendedEpochNumToCollectPayment) if err != nil { // not enough history, do nothing - return nil + return } - // Get the current stake storages (from all chains). stake storages contain a list of stake entries. Each stake storage is for a different chain + // Get the current stake storages (from all chains). + // Stake storages contain a list of stake entries (each for a different chain). providerStakeStorageList := k.getCurrentProviderStakeStorageList(ctx) if len(providerStakeStorageList) == 0 { // no provider is staked -> no one to punish - return nil + return } // Go back recommendedEpochNumToCollectPayment minPaymentBlock, err := k.getBlockEpochsAgo(ctx, currentEpoch, recommendedEpochNumToCollectPayment) if err != nil { // not enough history, do nothiing - return nil + return } // find the minimum number of providers in all the plans @@ -84,7 +86,10 @@ func (k Keeper) UnstakeUnresponsiveProviders(ctx sdk.Context, epochsNumToCheckCU // update the CU count for this provider in providerCuCounterForUnreponsivenessMap providerPaymentStorageKeyList, err := k.countCuForUnresponsiveness(ctx, minPaymentBlock, epochsNumToCheckCUForUnresponsiveProvider, epochsNumToCheckCUForComplainers, providerStakeEntry) if err != nil { - return utils.LavaFormatError("couldn't count CU for unreponsiveness", err) + utils.LavaFormatError("unstake unresponsive providers failed to count CU", err, + utils.Attribute{Key: "provider", Value: providerStakeEntry.Address}, + ) + continue } // providerPaymentStorageKeyList is not empty -> provider should be punished @@ -92,13 +97,14 @@ func (k Keeper) UnstakeUnresponsiveProviders(ctx sdk.Context, epochsNumToCheckCU err = k.punishUnresponsiveProvider(ctx, minPaymentBlock, providerPaymentStorageKeyList, providerStakeEntry.GetAddress(), providerStakeEntry.GetChain()) existingProviders[providerStakeEntry.Geolocation]-- if err != nil { - return utils.LavaFormatError("couldn't punish unresponsive provider", err) + utils.LavaFormatError("unstake unresponsive providers failed to punish provider", err, + utils.Attribute{Key: "provider", Value: providerStakeEntry.Address}, + ) + continue } } } } - - return nil } // getBlockEpochsAgo returns the block numEpochs back from the given blockHeight diff --git a/x/pairing/keeper/unstaking.go b/x/pairing/keeper/unstaking.go index 3dc5989ddc..0e97df2edf 100644 --- a/x/pairing/keeper/unstaking.go +++ b/x/pairing/keeper/unstaking.go @@ -62,51 +62,62 @@ func (k Keeper) UnstakeEntry(ctx sdk.Context, chainID string, creator string, un return k.epochStorageKeeper.AppendUnstakeEntry(ctx, existingEntry, unstakeHoldBlocks) } -func (k Keeper) CheckUnstakingForCommit(ctx sdk.Context) error { +func (k Keeper) CheckUnstakingForCommit(ctx sdk.Context) { // this pops all the entries that had their deadline pass unstakingEntriesToCredit := k.epochStorageKeeper.PopUnstakeEntries(ctx, uint64(ctx.BlockHeight())) if unstakingEntriesToCredit != nil { - err := k.creditUnstakingEntries(ctx, unstakingEntriesToCredit) // true for providers - if err != nil { - panic(err.Error()) - } + k.creditUnstakingEntries(ctx, unstakingEntriesToCredit) // true for providers } +} +func (k Keeper) refundUnstakingProvider(ctx sdk.Context, addr sdk.AccAddress, neededAmount sdk.Coin) error { + moduleBalance := k.bankKeeper.GetBalance(ctx, k.accountKeeper.GetModuleAddress(types.ModuleName), epochstoragetypes.TokenDenom) + if moduleBalance.IsLT(neededAmount) { + return fmt.Errorf("insufficient balance to unstake %s (current balance: %s)", neededAmount, moduleBalance) + } + err := k.bankKeeper.SendCoinsFromModuleToAccount(ctx, types.ModuleName, addr, []sdk.Coin{neededAmount}) + if err != nil { + return fmt.Errorf("failed to send coins from module to %s: %w", addr, err) + } return nil } -func (k Keeper) creditUnstakingEntries(ctx sdk.Context, entriesToUnstake []epochstoragetypes.StakeEntry) error { +func (k Keeper) creditUnstakingEntries(ctx sdk.Context, entriesToUnstake []epochstoragetypes.StakeEntry) { logger := k.Logger(ctx) - verifySufficientAmountAndSendFromModuleToAddress := func(ctx sdk.Context, k Keeper, addr sdk.AccAddress, neededAmount sdk.Coin) (bool, error) { - moduleBalance := k.bankKeeper.GetBalance(ctx, k.accountKeeper.GetModuleAddress(types.ModuleName), epochstoragetypes.TokenDenom) - if moduleBalance.IsLT(neededAmount) { - return false, fmt.Errorf("insufficient balance for unstaking %s current balance: %s", neededAmount, moduleBalance) - } - err := k.bankKeeper.SendCoinsFromModuleToAccount(ctx, types.ModuleName, addr, []sdk.Coin{neededAmount}) - if err != nil { - return false, fmt.Errorf("invalid transfer coins from module, %s to account %s", err, addr) - } - return true, nil - } for _, unstakingEntry := range entriesToUnstake { - details := map[string]string{"spec": unstakingEntry.Chain, "provider": unstakingEntry.Address, "stake": unstakingEntry.Stake.String()} + details := map[string]string{ + "spec": unstakingEntry.Chain, + "provider": unstakingEntry.Address, + "stake": unstakingEntry.Stake.String(), + } + if unstakingEntry.StakeAppliedBlock <= uint64(ctx.BlockHeight()) { - // found an entry that needs handling receiverAddr, err := sdk.AccAddressFromBech32(unstakingEntry.Address) if err != nil { - panic(fmt.Sprintf("error getting AccAddress from : %s error: %s", unstakingEntry.Address, err)) + // this should not happen; to avoid panic we simply skip this one (thus + // freeze the situation so it can be investigated and orderly resolved). + utils.LavaFormatError("critical: failed to get unstaking provider address", err, + utils.Attribute{Key: "spec", Value: unstakingEntry.Chain}, + utils.Attribute{Key: "provider", Value: unstakingEntry.Address}, + ) + continue } if unstakingEntry.Stake.Amount.GT(sdk.ZeroInt()) { // transfer stake money to the stake entry account - valid, err := verifySufficientAmountAndSendFromModuleToAddress(ctx, k, receiverAddr, unstakingEntry.Stake) - if !valid { - details["error"] = err.Error() - utils.LavaFormatError("verifySufficientAmountAndSendFromModuleToAddress Failed", err) - panic(fmt.Sprintf("error unstaking : %s", err)) + err := k.refundUnstakingProvider(ctx, receiverAddr, unstakingEntry.Stake) + if err != nil { + // we should always be able to redund a provider that decides to unstake; + // but to avoid panic, just emit a critical error and proceed + utils.LavaFormatError("critical: failed to refund staked provider", err, + utils.Attribute{Key: "spec", Value: unstakingEntry.Chain}, + utils.Attribute{Key: "provider", Value: receiverAddr}, + utils.Attribute{Key: "stake", Value: unstakingEntry.Stake}, + ) + } else { + utils.LogLavaEvent(ctx, logger, types.ProviderUnstakeEventName, details, "Unstaking Providers Commit") } - utils.LogLavaEvent(ctx, logger, types.ProviderUnstakeEventName, details, "Unstaking Providers Commit") } } else { // found an entry that isn't handled now, but later because its stakeAppliedBlock isnt current block @@ -117,7 +128,6 @@ func (k Keeper) creditUnstakingEntries(ctx sdk.Context, entriesToUnstake []epoch ) } } - return nil } func (k Keeper) unstakeHoldBlocks(ctx sdk.Context, chainID string) (uint64, error) { diff --git a/x/pairing/types/expected_keepers.go b/x/pairing/types/expected_keepers.go index 97081281ae..56cc2a74f4 100644 --- a/x/pairing/types/expected_keepers.go +++ b/x/pairing/types/expected_keepers.go @@ -15,7 +15,7 @@ type SpecKeeper interface { IsSpecFoundAndActive(ctx sdk.Context, chainID string) (foundAndActive bool, found bool) GetSpec(ctx sdk.Context, index string) (val spectypes.Spec, found bool) GeolocationCount(ctx sdk.Context) uint64 - GetExpectedInterfacesForSpec(ctx sdk.Context, chainID string, mandatory bool) (expectedInterfaces map[epochstoragetypes.EndpointService]struct{}) + GetExpectedInterfacesForSpec(ctx sdk.Context, chainID string, mandatory bool) (expectedInterfaces map[epochstoragetypes.EndpointService]struct{}, err error) GetAllChainIDs(ctx sdk.Context) (chainIDs []string) } @@ -29,6 +29,7 @@ type EpochstorageKeeper interface { UnstakeHoldBlocksStatic(ctx sdk.Context, block uint64) (res uint64) IsEpochStart(ctx sdk.Context) (res bool) BlocksToSave(ctx sdk.Context, block uint64) (res uint64, erro error) + BlocksToSaveRaw(ctx sdk.Context) (res uint64) GetEpochStartForBlock(ctx sdk.Context, block uint64) (epochStart uint64, blockInEpoch uint64, err error) GetPreviousEpochStartForBlock(ctx sdk.Context, block uint64) (previousEpochStart uint64, erro error) PopUnstakeEntries(ctx sdk.Context, block uint64) (value []epochstoragetypes.StakeEntry) diff --git a/x/plans/module.go b/x/plans/module.go index 8b0a1811e5..34e134da39 100644 --- a/x/plans/module.go +++ b/x/plans/module.go @@ -138,18 +138,22 @@ func (am AppModule) RegisterServices(cfg module.Configurator) { // register v2 -> v3 migration if err := cfg.RegisterMigration(types.ModuleName, 2, migrator.Migrate2to3); err != nil { + // panic:ok: at start up, migration cannot proceed anyhow panic(fmt.Errorf("%s: failed to register migration to v3: %w", types.ModuleName, err)) } // register v3 -> v4 migration if err := cfg.RegisterMigration(types.ModuleName, 3, migrator.Migrate3to4); err != nil { + // panic:ok: at start up, migration cannot proceed anyhow panic(fmt.Errorf("%s: failed to register migration to v4: %w", types.ModuleName, err)) } // register v4 -> v5 migration if err := cfg.RegisterMigration(types.ModuleName, 4, migrator.Migrate4to5); err != nil { + // panic:ok: at start up, migration cannot proceed anyhow panic(fmt.Errorf("%s: failed to register migration to v5: %w", types.ModuleName, err)) } // register v5 -> v6 migration if err := cfg.RegisterMigration(types.ModuleName, 5, migrator.Migrate5to6); err != nil { + // panic:ok: at start up, migration cannot proceed anyhow panic(fmt.Errorf("%s: failed to register migration to v5: %w", types.ModuleName, err)) } } diff --git a/x/projects/keeper/creation.go b/x/projects/keeper/creation.go index cc87cb099f..524bf86b4c 100644 --- a/x/projects/keeper/creation.go +++ b/x/projects/keeper/creation.go @@ -224,13 +224,14 @@ func (k Keeper) unregisterKey(ctx sdk.Context, key types.ProjectKey, project *ty var devkeyData types.ProtoDeveloperData found = k.developerKeysFS.FindEntry(ctx, key.Key, epoch, &devkeyData) - // check again that the developer key is valid, and that it belongs to the - // project the developer key belongs to a different project. - if !found || devkeyData.ProjectID != project.GetIndex() { - // ... but we already checked above that it belongs to this project, - // so this should never happen! - return utils.LavaFormatError("critical: developer key mapped wrongly", - fmt.Errorf("developer key included in project but mapped to another"), + if !found { + return sdkerrors.ErrNotFound + } + + // the developer key belongs to a different project + if devkeyData.ProjectID != project.GetIndex() { + return utils.LavaFormatWarning("failed to unregister key", sdkerrors.ErrNotFound, + utils.Attribute{Key: "projectID", Value: project.Index}, utils.Attribute{Key: "key", Value: key.Key}, utils.Attribute{Key: "keyTypes", Value: key.Kinds}, utils.Attribute{Key: "projectID", Value: project.GetIndex()}, @@ -251,25 +252,30 @@ func (k Keeper) unregisterKey(ctx sdk.Context, key types.ProjectKey, project *ty func (k Keeper) SnapshotSubscriptionProjects(ctx sdk.Context, subscriptionAddr string) { projects := k.projectsFS.GetAllEntryIndicesWithPrefix(ctx, subscriptionAddr) for _, projectID := range projects { - err := k.snapshotProject(ctx, projectID) - if err != nil { - panic(err) - } + k.snapshotProject(ctx, projectID) } } // snapshot project, create a snapshot of a project and reset the cu -func (k Keeper) snapshotProject(ctx sdk.Context, projectID string) error { +func (k Keeper) snapshotProject(ctx sdk.Context, projectID string) { var project types.Project if found := k.projectsFS.FindEntry(ctx, projectID, uint64(ctx.BlockHeight()), &project); !found { - return utils.LavaFormatWarning("snapshot of project failed, project does not exist", - fmt.Errorf("project not found"), - utils.Attribute{Key: "projectID", Value: projectID}, + utils.LavaFormatError("critical: snapshot of project failed (find)", sdkerrors.ErrKeyNotFound, + utils.Attribute{Key: "project", Value: projectID}, + utils.Attribute{Key: "block", Value: ctx.BlockHeight()}, ) + return } project.UsedCu = 0 project.Snapshot += 1 - return k.projectsFS.AppendEntry(ctx, project.Index, uint64(ctx.BlockHeight()), &project) + err := k.projectsFS.AppendEntry(ctx, project.Index, uint64(ctx.BlockHeight()), &project) + if err != nil { + utils.LavaFormatError("critical: snapshot of project failed (append)", err, + utils.Attribute{Key: "project", Value: projectID}, + utils.Attribute{Key: "block", Value: ctx.BlockHeight()}, + ) + return + } } diff --git a/x/projects/module.go b/x/projects/module.go index 27d59fa0f2..bb11cae1a1 100644 --- a/x/projects/module.go +++ b/x/projects/module.go @@ -139,18 +139,22 @@ func (am AppModule) RegisterServices(cfg module.Configurator) { // register v2 -> v3 migration if err := cfg.RegisterMigration(types.ModuleName, 2, migrator.Migrate2to3); err != nil { + // panic:ok: at start up, migration cannot proceed anyhow panic(fmt.Errorf("%s: failed to register migration to v3: %w", types.ModuleName, err)) } // register v3 -> v4 migration if err := cfg.RegisterMigration(types.ModuleName, 3, migrator.Migrate3to4); err != nil { + // panic:ok: at start up, migration cannot proceed anyhow panic(fmt.Errorf("%s: failed to register migration to v4: %w", types.ModuleName, err)) } // register v4 -> v5 migration if err := cfg.RegisterMigration(types.ModuleName, 4, migrator.Migrate4to5); err != nil { + // panic:ok: at start up, migration cannot proceed anyhow panic(fmt.Errorf("%s: failed to register migration to v5: %w", types.ModuleName, err)) } // register v5 -> v6 migration if err := cfg.RegisterMigration(types.ModuleName, 5, migrator.Migrate5to6); err != nil { + // panic:ok: at start up, migration cannot proceed anyhow panic(fmt.Errorf("%s: failed to register migration to v5: %w", types.ModuleName, err)) } } diff --git a/x/spec/keeper/spec.go b/x/spec/keeper/spec.go index bfc068c8f7..1c647f4a00 100644 --- a/x/spec/keeper/spec.go +++ b/x/spec/keeper/spec.go @@ -205,13 +205,17 @@ func (k Keeper) GetAllChainIDs(ctx sdk.Context) (chainIDs []string) { } // returns map[apiInterface][]addons -func (k Keeper) GetExpectedInterfacesForSpec(ctx sdk.Context, chainID string, mandatory bool) (expectedInterfaces map[epochstoragetypes.EndpointService]struct{}) { +func (k Keeper) GetExpectedInterfacesForSpec(ctx sdk.Context, chainID string, mandatory bool) (expectedInterfaces map[epochstoragetypes.EndpointService]struct{}, err error) { expectedInterfaces = make(map[epochstoragetypes.EndpointService]struct{}) spec, found := k.GetSpec(ctx, chainID) if found && spec.Enabled { spec, err := k.ExpandSpec(ctx, spec) - if err != nil { // should not happen! (all specs on chain must be valid) - panic(err) + if err != nil { + // spec expansion should work because all specs on chain must be valid; + // to avoid panic return an error so the caller can bail. + return nil, utils.LavaFormatError("critical: failed to expand spec on chain", err, + utils.Attribute{Key: "chainID", Value: chainID}, + ) } expectedInterfaces = k.getExpectedInterfacesForSpecInner(&spec, expectedInterfaces, mandatory) } diff --git a/x/spec/module.go b/x/spec/module.go index 0e8cfdca4c..99bd16ff0e 100644 --- a/x/spec/module.go +++ b/x/spec/module.go @@ -144,6 +144,7 @@ func (am AppModule) RegisterServices(cfg module.Configurator) { // register v2 -> v3 migration if err := cfg.RegisterMigration(types.ModuleName, 2, migrator.Migrate2to3); err != nil { + // panic:ok: at start up, migration cannot proceed anyhow panic(fmt.Errorf("%s: failed to register migration to v3: %w", types.ModuleName, err)) } } diff --git a/x/subscription/keeper/subscription.go b/x/subscription/keeper/subscription.go index a5d1be3cb0..f37aab734a 100644 --- a/x/subscription/keeper/subscription.go +++ b/x/subscription/keeper/subscription.go @@ -210,7 +210,7 @@ func (k Keeper) advanceMonth(ctx sdk.Context, subkey []byte) { // subscription (monthly) timer has expired for an unknown subscription: // either the timer was set wrongly, or the subscription was incorrectly // removed; and we cannot even return an error about it. - utils.LavaFormatError("critical: month expirty for unknown subscription, skipping", + utils.LavaFormatError("critical: month expiry for unknown subscription, skipping", fmt.Errorf("subscription not found"), utils.Attribute{Key: "consumer", Value: consumer}, utils.Attribute{Key: "block", Value: block}, @@ -219,6 +219,9 @@ func (k Keeper) advanceMonth(ctx sdk.Context, subkey []byte) { } if sub.DurationLeft == 0 { + // subscription duration has already reached zero before and should have + // been removed before. Extend duration by another month (without adding + // CUs) to allow smooth operation. utils.LavaFormatError("critical: negative duration for subscription, extend by 1 month", fmt.Errorf("negative duration in AdvanceMonth()"), utils.Attribute{Key: "consumer", Value: consumer}, @@ -278,11 +281,13 @@ func (k Keeper) GetPlanFromSubscription(ctx sdk.Context, consumer string) (plans plan, found := k.plansKeeper.FindPlan(ctx, sub.PlanIndex, sub.PlanBlock) if !found { - err := utils.LavaFormatError("can't find plan from subscription with consumer address", sdkerrors.ErrKeyNotFound, + // normally would panic! but can "recover" by ignoring and returning error + err := utils.LavaFormatError("critical: failed to find existing subscription plan", sdkerrors.ErrKeyNotFound, utils.Attribute{Key: "consumer", Value: consumer}, - utils.Attribute{Key: "plan", Value: sub.PlanIndex}, + utils.Attribute{Key: "planIndex", Value: sub.PlanIndex}, + utils.Attribute{Key: "planBlock", Value: sub.PlanBlock}, ) - panic(err) + return plan, err } return plan, nil @@ -296,11 +301,12 @@ func (k Keeper) AddProjectToSubscription(ctx sdk.Context, consumer string, proje plan, found := k.plansKeeper.FindPlan(ctx, sub.PlanIndex, sub.PlanBlock) if !found { - err := utils.LavaFormatError("can't get plan with subscription", sdkerrors.ErrKeyNotFound, - utils.Attribute{Key: "subscription", Value: sub.Creator}, - utils.Attribute{Key: "plan", Value: sub.PlanIndex}, + err := utils.LavaFormatError("critical: failed to find existing subscriptio plan", sdkerrors.ErrKeyNotFound, + utils.Attribute{Key: "consumer", Value: sub.Creator}, + utils.Attribute{Key: "planIndex", Value: sub.PlanIndex}, + utils.Attribute{Key: "planBlock", Value: sub.PlanBlock}, ) - panic(err) + return err } return k.projectsKeeper.CreateProject(ctx, consumer, projectData, plan) diff --git a/x/subscription/module.go b/x/subscription/module.go index 53be601045..37691549ea 100644 --- a/x/subscription/module.go +++ b/x/subscription/module.go @@ -145,6 +145,7 @@ func (am AppModule) RegisterServices(cfg module.Configurator) { // register v2 -> v3 migration if err := cfg.RegisterMigration(types.ModuleName, 2, migrator.Migrate2to3); err != nil { + // panic:ok: at start up, migration cannot proceed anyhow panic(fmt.Errorf("%s: failed to register migration to v3: %w", types.ModuleName, err)) } }