From 03cc0ef1a445c786c2e012d076f66b63593d63ec Mon Sep 17 00:00:00 2001 From: leonz789 Date: Mon, 18 Mar 2024 23:12:01 +0800 Subject: [PATCH] feat(oracle-proto):update caches with recache for aggregatorContext, refactor --- x/oracle/keeper/aggregator/aggregator.go | 20 ++- x/oracle/keeper/cache/caches.go | 180 ++++++--------------- x/oracle/keeper/common/expected_keepers.go | 4 + x/oracle/keeper/keeper.go | 10 ++ x/oracle/keeper/msg_server_create_price.go | 3 +- x/oracle/keeper/prices.go | 34 ++-- x/oracle/keeper/single.go | 119 +++++++++++++- x/oracle/module.go | 52 +++++- 8 files changed, 262 insertions(+), 160 deletions(-) diff --git a/x/oracle/keeper/aggregator/aggregator.go b/x/oracle/keeper/aggregator/aggregator.go index cdaf7cfdb..60a499510 100644 --- a/x/oracle/keeper/aggregator/aggregator.go +++ b/x/oracle/keeper/aggregator/aggregator.go @@ -5,6 +5,7 @@ import ( "math/big" "time" + "github.com/ExocoreNetwork/exocore/x/oracle/keeper/cache" "github.com/ExocoreNetwork/exocore/x/oracle/keeper/common" "github.com/ExocoreNetwork/exocore/x/oracle/types" sdk "github.com/cosmos/cosmos-sdk/types" @@ -113,7 +114,7 @@ func (agc *AggregatorContext) checkMsg(msg *types.MsgCreatePrice) error { return nil } -func (agc *AggregatorContext) FillPrice(msg *types.MsgCreatePrice) (*priceItemKV, *cacheItemM, error) { +func (agc *AggregatorContext) FillPrice(msg *types.MsgCreatePrice) (*priceItemKV, *cache.CacheItemM, error) { feederWorker := agc.aggregators[msg.FeederId] //worker initialzed here reduce workload for Endblocker if feederWorker == nil { @@ -135,9 +136,9 @@ func (agc *AggregatorContext) FillPrice(msg *types.MsgCreatePrice) (*priceItemKV //TODO: check the format Timestamp: time.Now().String(), RoundId: agc.rounds[msg.FeederId].nextRoundId, - }}, &cacheItemM{feederId: msg.FeederId}, nil + }}, &cache.CacheItemM{FeederId: msg.FeederId}, nil } - return nil, &cacheItemM{msg.FeederId, listFilled, msg.Creator}, nil + return nil, &cache.CacheItemM{msg.FeederId, listFilled, msg.Creator}, nil } return nil, nil, errors.New("") @@ -145,7 +146,7 @@ func (agc *AggregatorContext) FillPrice(msg *types.MsgCreatePrice) (*priceItemKV // NewCreatePrice receives msgCreatePrice message, and goes process: filter->aggregator, filter->calculator->aggregator // non-deterministic data will goes directly into aggregator, and deterministic data will goes into calculator first to get consensus on the deterministic id. -func (agc *AggregatorContext) NewCreatePrice(ctx sdk.Context, msg *types.MsgCreatePrice) (*priceItemKV, *cacheItemM, error) { +func (agc *AggregatorContext) NewCreatePrice(ctx sdk.Context, msg *types.MsgCreatePrice) (*priceItemKV, *cache.CacheItemM, error) { if err := agc.checkMsg(msg); err != nil { return nil, nil, err @@ -157,8 +158,9 @@ func (agc *AggregatorContext) NewCreatePrice(ctx sdk.Context, msg *types.MsgCrea // prepare for new roundInfo, just update the status kept in memory // executed at EndBlock stage, seall all success or expired roundInfo // including possible aggregation and state update +// when validatorSet update, set force to true, to seal all alive round // returns: 1st successful sealed, need to be written to KVStore, 2nd: failed sealed tokenId, use previous price to write to KVStore -func (agc *AggregatorContext) SealRound(ctx sdk.Context) (success []*priceItemKV, failed []int32) { +func (agc *AggregatorContext) SealRound(ctx sdk.Context, force bool) (success []*priceItemKV, failed []int32) { //1. check validatorSet udpate //TODO: if validatoSet has been updated in current block, just seal all active rounds and return //1. for sealed worker, the KVStore has been updated @@ -171,7 +173,7 @@ func (agc *AggregatorContext) SealRound(ctx sdk.Context) (success []*priceItemKV case 1: expired := ctx.BlockHeight() >= feeder.EndBlock outOfWindow := uint64(ctx.BlockHeight())-round.basedBlock >= uint64(common.MaxNonce) - if expired || outOfWindow { + if expired || outOfWindow || force { //TODO: WRITE TO KVSTORE with previous round data for this round failed = append(failed, feeder.TokenId) if expired { @@ -180,8 +182,6 @@ func (agc *AggregatorContext) SealRound(ctx sdk.Context) (success []*priceItemKV } else { round.status = 2 agc.aggregators[feederId] = nil - //TODO: WRITE TO KVSTORE with previous round data for this round - failed = append(failed, feeder.TokenId) } } } @@ -194,6 +194,10 @@ func (agc *AggregatorContext) SealRound(ctx sdk.Context) (success []*priceItemKV return } +//func (agc *AggregatorContext) ForceSeal(ctx sdk.Context) (success []*priceItemKV, failed []int32) { +// +//} + func (agc *AggregatorContext) PrepareRound(ctx sdk.Context, block uint64) { //block>0 means recache initialization, all roundInfo is empty if block == 0 { diff --git a/x/oracle/keeper/cache/caches.go b/x/oracle/keeper/cache/caches.go index 67e09a9b4..b3b3dec12 100644 --- a/x/oracle/keeper/cache/caches.go +++ b/x/oracle/keeper/cache/caches.go @@ -3,21 +3,19 @@ package cache import ( "math/big" - "github.com/ExocoreNetwork/exocore/x/oracle/keeper/aggregator" "github.com/ExocoreNetwork/exocore/x/oracle/keeper/common" "github.com/ExocoreNetwork/exocore/x/oracle/types" sdk "github.com/cosmos/cosmos-sdk/types" - stakingTypes "github.com/cosmos/cosmos-sdk/x/staking/types" ) var zeroBig = big.NewInt(0) -type cacheItemV map[string]*big.Int -type cacheItemP *common.Params -type cacheItemM struct { - feederId int32 - pSources []*types.PriceWithSource - validator string +type CacheItemV map[string]*big.Int +type CacheItemP *common.Params +type CacheItemM struct { + FeederId int32 + PSources []*types.PriceWithSource + Validator string } type Cache struct { @@ -26,7 +24,7 @@ type Cache struct { params *cacheParams } -type cacheMsgs map[int32][]*cacheItemM +type cacheMsgs map[int32][]*CacheItemM // used to track validator change type cacheValidator struct { @@ -40,11 +38,11 @@ type cacheParams struct { update bool } -func (c cacheMsgs) add(item *cacheItemM) { - c[item.feederId] = append(c[item.feederId], item) +func (c cacheMsgs) add(item *CacheItemM) { + c[item.FeederId] = append(c[item.FeederId], item) } -func (c cacheMsgs) remove(item *cacheItemM) { - delete(c, item.feederId) +func (c cacheMsgs) remove(item *CacheItemM) { + delete(c, item.FeederId) } func (c cacheMsgs) commit(ctx sdk.Context, k common.KeeperOracle) { @@ -56,9 +54,9 @@ func (c cacheMsgs) commit(ctx sdk.Context, k common.KeeperOracle) { for _, msgs4Feeder := range c { for _, msg := range msgs4Feeder { recentMsgs.Msgs = append(recentMsgs.Msgs, &types.MsgItem{ - FeederId: msg.feederId, - PSources: msg.pSources, - Validator: msg.validator, + FeederId: msg.FeederId, + PSources: msg.PSources, + Validator: msg.Validator, }) } } @@ -124,12 +122,12 @@ func (c *cacheParams) commit(ctx sdk.Context, k common.KeeperOracle) { // memory cache func (c *Cache) AddCache(i any, k common.KeeperOracle) { switch item := i.(type) { - case *cacheItemM: + case *CacheItemM: c.msg.add(item) // case *params: - case cacheItemP: + case CacheItemP: c.params.add(item) - case cacheItemV: + case CacheItemV: c.validators.add(item) default: panic("no other types are support") @@ -137,16 +135,45 @@ func (c *Cache) AddCache(i any, k common.KeeperOracle) { } func (c *Cache) RemoveCache(i any, k common.KeeperOracle) { switch item := i.(type) { - case *cacheItemM: + case *CacheItemM: c.msg.remove(item) default: } } +func (c *Cache) GetCache(i any) bool { + switch item := i.(type) { + case CacheItemV: + if item == nil { + return false + } + for addr, power := range c.validators.validators { + item[addr] = power + } + case CacheItemP: + if item == nil { + return false + } + *item = *(c.params.params) + case *[]*CacheItemM: + if item == nil { + return false + } + tmp := make([]*CacheItemM, 0, len(c.msg)) + for _, msgs := range c.msg { + tmp = append(tmp, msgs...) + } + *item = tmp + default: + return false + } + return true +} + func (c *Cache) CommitCache(ctx sdk.Context, reset bool, k common.KeeperOracle) { if len(c.msg) > 0 { c.msg.commit(ctx, k) - c.msg = make(map[int32][]*cacheItemM) + c.msg = make(map[int32][]*CacheItemM) } if c.validators.update { @@ -169,7 +196,7 @@ func (c *Cache) ResetCaches() { func NewCache() *Cache { return &Cache{ - msg: make(map[int32][]*cacheItemM), + msg: make(map[int32][]*CacheItemM), validators: &cacheValidator{ validators: make(map[string]*big.Int), }, @@ -178,112 +205,3 @@ func NewCache() *Cache { }, } } - -func (c *Cache) RecacheAggregatorContext(ctx sdk.Context, agc *aggregator.AggregatorContext, k common.KeeperOracle) bool { - - from := uint64(ctx.BlockHeight()) - common.MaxNonce - to := uint64(ctx.BlockHeight()) - 1 - - h, ok := k.GetValidatorUpdateBlock(ctx) - recentParamsMap := k.GetAllRecentParamsAsMap(ctx) - if !ok || recentParamsMap == nil { - //no cache, this is the very first running, so go to initial proces instead - return false - } - - if h.Block > from { - from = h.Block - } - - totalPower := big.NewInt(0) - validatorPowers := make(map[string]*big.Int) - k.IterateBondedValidatorsByPower(ctx, func(index int64, validator stakingTypes.ValidatorI) bool { - power := big.NewInt(validator.GetConsensusPower(validator.GetBondedTokens())) - addr := string(validator.GetOperator()) - validatorPowers[addr] = power - totalPower = new(big.Int).Add(totalPower, power) - return false - }) - agc.SetValidatorPowers(validatorPowers) - agc.SetTotalPower(totalPower) - //TODO: test only - if k.GetLastTotalPower(ctx).Cmp(totalPower) != 0 { - panic("something wrong when get validatorsPower from staking module") - } - - //reset validators - c.AddCache(cacheItemV(validatorPowers), k) - - recentMsgs := k.GetAllRecentMsgAsMap(ctx) - var pTmp common.Params - for ; from < to; from++ { - //fill params - for b, recentParams := range recentParamsMap { - prev := uint64(0) - if b <= from && b > prev { - pTmp = common.Params(*recentParams) - agc.SetParams(&pTmp) - if prev > 0 { - //TODO: safe delete - delete(recentParamsMap, prev) - } - prev = b - } - } - - agc.PrepareRound(ctx, from) - - if msgs := recentMsgs[from+1]; msgs != nil { - for _, msg := range msgs { - //these messages are retreived for recache, just skip the validation check and fill the memory cache - agc.FillPrice(&types.MsgCreatePrice{ - Creator: msg.Validator, - FeederId: msg.FeederId, - Prices: msg.PSources, - }) - } - } - agc.SealRound(ctx) - } - - //fill params cache - c.AddCache(cacheItemP(&pTmp), k) - - agc.PrepareRound(ctx, to) - - return true -} - -func (c *Cache) InitAggregatorContext(ctx sdk.Context, agc *aggregator.AggregatorContext, k common.KeeperOracle) error { - //set params - p := k.GetParams(ctx) - m := make(map[uint64]*types.Params) - m[uint64(ctx.BlockHeight())] = &p - // k.setParams4CacheRecover(m) //used to trace tokenFeeder's update during cache recover - pTmp := common.Params(p) - agc.SetParams(&pTmp) - //set params cache - c.AddCache(cacheItemP(&pTmp), k) - - totalPower := big.NewInt(0) - validatorPowers := make(map[string]*big.Int) - k.IterateBondedValidatorsByPower(ctx, func(index int64, validator stakingTypes.ValidatorI) bool { - power := big.NewInt(validator.GetConsensusPower(validator.GetBondedTokens())) - addr := string(validator.GetOperator()) - //agc.validatorsPower[addr] = power - validatorPowers[addr] = power - totalPower = new(big.Int).Add(totalPower, power) - return false - }) - agc.SetTotalPower(totalPower) - agc.SetValidatorPowers(validatorPowers) - if k.GetLastTotalPower(ctx).Cmp(totalPower) != 0 { - panic("-") - } - - //set validatorPower cache - c.AddCache(cacheItemV(validatorPowers), k) - - agc.PrepareRound(ctx, uint64(ctx.BlockHeight())-1) - return nil -} diff --git a/x/oracle/keeper/common/expected_keepers.go b/x/oracle/keeper/common/expected_keepers.go index b6d4a6ac7..0eec0d1ff 100644 --- a/x/oracle/keeper/common/expected_keepers.go +++ b/x/oracle/keeper/common/expected_keepers.go @@ -3,7 +3,9 @@ package common import ( "math/big" + // "cosmossdk.io/api/tendermint/abci" "github.com/ExocoreNetwork/exocore/x/oracle/types" + abci "github.com/cometbft/cometbft/abci/types" sdk "github.com/cosmos/cosmos-sdk/types" stakingTypes "github.com/cosmos/cosmos-sdk/x/staking/types" ) @@ -13,6 +15,8 @@ type KeeperOracle interface { IterateBondedValidatorsByPower(sdk.Context, func(index int64, validator stakingTypes.ValidatorI) bool) GetLastTotalPower(sdk.Context) *big.Int + GetValidatorUpdates(sdk.Context) []abci.ValidatorUpdate + GetValidatorByConsAddr(sdk.Context, sdk.ConsAddress) (stakingTypes.Validator, bool) GetIndexRecentMsg(sdk.Context) (types.IndexRecentMsg, bool) GetAllRecentMsgAsMap(sdk.Context) map[uint64][]*types.MsgItem diff --git a/x/oracle/keeper/keeper.go b/x/oracle/keeper/keeper.go index 24e7bb091..4247f0088 100644 --- a/x/oracle/keeper/keeper.go +++ b/x/oracle/keeper/keeper.go @@ -4,6 +4,8 @@ import ( "fmt" "math/big" + // "cosmossdk.io/api/tendermint/abci" + abci "github.com/cometbft/cometbft/abci/types" "github.com/cometbft/cometbft/libs/log" "github.com/cosmos/cosmos-sdk/codec" storetypes "github.com/cosmos/cosmos-sdk/store/types" @@ -60,3 +62,11 @@ func (k Keeper) GetLastTotalPower(ctx sdk.Context) *big.Int { func (k Keeper) IterateBondedValidatorsByPower(ctx sdk.Context, f func(index int64, validator stakingtypes.ValidatorI) bool) { k.stakingKeeper.IterateBondedValidatorsByPower(ctx, f) } + +func (k Keeper) GetValidatorUpdates(ctx sdk.Context) []abci.ValidatorUpdate { + return k.stakingKeeper.GetValidatorUpdates(ctx) +} + +func (k Keeper) GetValidatorByConsAddr(ctx sdk.Context, addr sdk.ConsAddress) (stakingtypes.Validator, bool) { + return k.stakingKeeper.GetValidatorByConsAddr(ctx, addr) +} diff --git a/x/oracle/keeper/msg_server_create_price.go b/x/oracle/keeper/msg_server_create_price.go index eec99af8b..171d29e70 100644 --- a/x/oracle/keeper/msg_server_create_price.go +++ b/x/oracle/keeper/msg_server_create_price.go @@ -15,7 +15,8 @@ func (k msgServer) CreatePrice(goCtx context.Context, msg *types.MsgCreatePrice) 3. check the rule fulfilled(sources check), check the decimal of the 1st mathc the params' definition(among prices the decimal had been checked in ante stage), timestamp:later than previous block's timestamp, [not future than now(+1s), this is checked in anteHandler], timestamp verification is not necessary **/ - newItem, caches, _ := GetAggregatorContext(ctx, k.Keeper).NewCreatePrice(ctx, msg) + //newItem, caches, _ := k.GetAggregatorContext(ctx, k.Keeper).NewCreatePrice(ctx, msg) + newItem, caches, _ := GetAggregatorContext(ctx, &k.Keeper).NewCreatePrice(ctx, msg) if caches != nil { if newItem != nil { diff --git a/x/oracle/keeper/prices.go b/x/oracle/keeper/prices.go index 4f6c108ae..3550592ad 100644 --- a/x/oracle/keeper/prices.go +++ b/x/oracle/keeper/prices.go @@ -5,7 +5,6 @@ import ( "github.com/ExocoreNetwork/exocore/x/oracle/types" "github.com/cosmos/cosmos-sdk/store/prefix" - storetypes "github.com/cosmos/cosmos-sdk/store/types" sdk "github.com/cosmos/cosmos-sdk/types" ) @@ -26,9 +25,9 @@ func (k Keeper) GetPrices( tokenId int32, ) (val types.Prices, found bool) { - store := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.PricesKeyPrefix)) - store = prefix.NewStore(store, types.PricesKey(tokenId)) - + // store := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.PricesKeyPrefix)) + // store = prefix.NewStore(store, types.PricesKey(tokenId)) + store := k.getPriceTRStore(ctx, tokenId) nextRoundIdB := store.Get(types.PricesNextRountIdKey) if nextRoundIdB == nil { return val, false @@ -59,8 +58,9 @@ func (k Keeper) RemovePrices( tokenId int32, ) { - store := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.PricesKeyPrefix)) - store = prefix.NewStore(store, types.PricesKey(tokenId)) + // store := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.PricesKeyPrefix)) + // store = prefix.NewStore(store, types.PricesKey(tokenId)) + store := k.getPriceTRStore(ctx, tokenId) // iterator := sdk.KVStorePrefixIterator(store, []byte{}) iterator := store.Iterator(nil, nil) defer iterator.Close() @@ -90,7 +90,7 @@ func (k Keeper) AppendPriceTR(ctx sdk.Context, tokenId int32, priceTR types.Pric if nextRoundId != priceTR.RoundId { return } - store := getPriceTRStore(ctx, k.storeKey, tokenId) + store := k.getPriceTRStore(ctx, tokenId) b := k.cdc.MustMarshal(&priceTR) store.Set(types.PricesRoundKey(nextRoundId), b) } @@ -112,9 +112,9 @@ func (k Keeper) GetPriceTRRoundId(ctx sdk.Context, tokenId int32, roundId uint64 } func (k Keeper) GetPriceTRLatest(ctx sdk.Context, tokenId int32) (price types.PriceWithTimeAndRound, found bool) { - store := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.PricesKeyPrefix)) - store = prefix.NewStore(store, types.PricesKey(tokenId)) - + // store := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.PricesKeyPrefix)) + // store = prefix.NewStore(store, types.PricesKey(tokenId)) + store := k.getPriceTRStore(ctx, tokenId) nextRoundIdB := store.Get(types.PricesNextRountIdKey) if nextRoundIdB == nil { return @@ -131,7 +131,8 @@ func (k Keeper) GetPriceTRLatest(ctx sdk.Context, tokenId int32) (price types.Pr func (k Keeper) GetNextRoundId(ctx sdk.Context, tokenId int32) (nextRoundId uint64) { nextRoundId = 1 - store := getPriceTRStore(ctx, k.storeKey, tokenId) + //store := getPriceTRStore(ctx, k.storeKey, tokenId) + store := k.getPriceTRStore(ctx, tokenId) nextRoundIdB := store.Get(types.PricesNextRountIdKey) if nextRoundIdB != nil { nextRoundId = binary.BigEndian.Uint64(nextRoundIdB) @@ -139,9 +140,12 @@ func (k Keeper) GetNextRoundId(ctx sdk.Context, tokenId int32) (nextRoundId uint return } -func getPriceTRStore(ctx sdk.Context, storeKey storetypes.StoreKey, tokenId int32) prefix.Store { - store := prefix.NewStore(ctx.KVStore(storeKey), types.KeyPrefix(types.PricesKeyPrefix)) +//func getPriceTRStore(ctx sdk.Context, storeKey storetypes.StoreKey, tokenId int32) prefix.Store { +// store := prefix.NewStore(ctx.KVStore(storeKey), types.KeyPrefix(types.PricesKeyPrefix)) +// return prefix.NewStore(store, types.PricesKey(tokenId)) +//} + +func (k Keeper) getPriceTRStore(ctx sdk.Context, tokenId int32) prefix.Store { + store := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.PricesKeyPrefix)) return prefix.NewStore(store, types.PricesKey(tokenId)) } - -//TODO: Get(tokenId, roundId), GetLatestRound(tokenId), remove is not cared for now diff --git a/x/oracle/keeper/single.go b/x/oracle/keeper/single.go index cd2c18c5c..4ca489673 100644 --- a/x/oracle/keeper/single.go +++ b/x/oracle/keeper/single.go @@ -1,9 +1,14 @@ package keeper import ( + "math/big" + "github.com/ExocoreNetwork/exocore/x/oracle/keeper/aggregator" "github.com/ExocoreNetwork/exocore/x/oracle/keeper/cache" + "github.com/ExocoreNetwork/exocore/x/oracle/keeper/common" + "github.com/ExocoreNetwork/exocore/x/oracle/types" sdk "github.com/cosmos/cosmos-sdk/types" + stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types" ) var cs *cache.Cache @@ -26,9 +31,119 @@ func GetAggregatorContext(ctx sdk.Context, k Keeper) *aggregator.AggregatorConte c := GetCaches() c.ResetCaches() agc = aggregator.NewAggregatorContext() - if ok := c.RecacheAggregatorContext(ctx, agc, k); !ok { + if ok := recacheAggregatorContext(ctx, agc, k, c); !ok { //this is the very first time oracle has been started, fill relalted info as initialization - c.InitAggregatorContext(ctx, agc, k) + initAggregatorContext(ctx, agc, k, c) } return agc } + +// func recacheAggregatorContext(ctx sdk.Context, agc *aggregator.AggregatorContext, k common.KeeperOracle, c *cache.Cache) bool { +func recacheAggregatorContext(ctx sdk.Context, agc *aggregator.AggregatorContext, k Keeper, c *cache.Cache) bool { + + from := uint64(ctx.BlockHeight()) - common.MaxNonce + to := uint64(ctx.BlockHeight()) - 1 + + h, ok := k.GetValidatorUpdateBlock(ctx) + recentParamsMap := k.GetAllRecentParamsAsMap(ctx) + if !ok || recentParamsMap == nil { + //no cache, this is the very first running, so go to initial proces instead + return false + } + + if h.Block > from { + from = h.Block + } + + totalPower := big.NewInt(0) + validatorPowers := make(map[string]*big.Int) + k.IterateBondedValidatorsByPower(ctx, func(index int64, validator stakingtypes.ValidatorI) bool { + power := big.NewInt(validator.GetConsensusPower(validator.GetBondedTokens())) + addr := string(validator.GetOperator()) + validatorPowers[addr] = power + totalPower = new(big.Int).Add(totalPower, power) + return false + }) + agc.SetValidatorPowers(validatorPowers) + agc.SetTotalPower(totalPower) + //TODO: test only + if k.GetLastTotalPower(ctx).Cmp(totalPower) != 0 { + panic("something wrong when get validatorsPower from staking module") + } + + //reset validators + c.AddCache(cache.CacheItemV(validatorPowers), k) + + recentMsgs := k.GetAllRecentMsgAsMap(ctx) + var pTmp common.Params + for ; from < to; from++ { + //fill params + for b, recentParams := range recentParamsMap { + prev := uint64(0) + if b <= from && b > prev { + pTmp = common.Params(*recentParams) + agc.SetParams(&pTmp) + if prev > 0 { + //TODO: safe delete + delete(recentParamsMap, prev) + } + prev = b + } + } + + agc.PrepareRound(ctx, from) + + if msgs := recentMsgs[from+1]; msgs != nil { + for _, msg := range msgs { + //these messages are retreived for recache, just skip the validation check and fill the memory cache + agc.FillPrice(&types.MsgCreatePrice{ + Creator: msg.Validator, + FeederId: msg.FeederId, + Prices: msg.PSources, + }) + } + } + agc.SealRound(ctx, false) + } + + //fill params cache + c.AddCache(cache.CacheItemP(&pTmp), k) + + agc.PrepareRound(ctx, to) + + return true +} + +func initAggregatorContext(ctx sdk.Context, agc *aggregator.AggregatorContext, k common.KeeperOracle, c *cache.Cache) error { + //set params + p := k.GetParams(ctx) + m := make(map[uint64]*types.Params) + m[uint64(ctx.BlockHeight())] = &p + // k.setParams4CacheRecover(m) //used to trace tokenFeeder's update during cache recover + pTmp := common.Params(p) + agc.SetParams(&pTmp) + //set params cache + c.AddCache(cache.CacheItemP(&pTmp), k) + + totalPower := big.NewInt(0) + validatorPowers := make(map[string]*big.Int) + k.IterateBondedValidatorsByPower(ctx, func(index int64, validator stakingtypes.ValidatorI) bool { + power := big.NewInt(validator.GetConsensusPower(validator.GetBondedTokens())) + addr := string(validator.GetOperator()) + //agc.validatorsPower[addr] = power + validatorPowers[addr] = power + totalPower = new(big.Int).Add(totalPower, power) + return false + }) + agc.SetTotalPower(totalPower) + agc.SetValidatorPowers(validatorPowers) + if k.GetLastTotalPower(ctx).Cmp(totalPower) != 0 { + panic("-") + } + + //set validatorPower cache + c.AddCache(cache.CacheItemV(validatorPowers), k) + + agc.PrepareRound(ctx, uint64(ctx.BlockHeight())-1) + return nil +} diff --git a/x/oracle/module.go b/x/oracle/module.go index 9e568dfdb..2b6b2aec3 100644 --- a/x/oracle/module.go +++ b/x/oracle/module.go @@ -4,20 +4,22 @@ import ( "context" "encoding/json" "fmt" + "math/big" // this line is used by starport scaffolding # 1 "github.com/grpc-ecosystem/grpc-gateway/runtime" "github.com/spf13/cobra" - abci "github.com/cometbft/cometbft/abci/types" - "github.com/ExocoreNetwork/exocore/x/oracle/client/cli" "github.com/ExocoreNetwork/exocore/x/oracle/keeper" + "github.com/ExocoreNetwork/exocore/x/oracle/keeper/cache" "github.com/ExocoreNetwork/exocore/x/oracle/types" + abci "github.com/cometbft/cometbft/abci/types" "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/codec" cdctypes "github.com/cosmos/cosmos-sdk/codec/types" + cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/module" ) @@ -144,12 +146,56 @@ func (AppModule) ConsensusVersion() uint64 { return 1 } func (am AppModule) BeginBlock(_ sdk.Context, _ abci.RequestBeginBlock) {} // EndBlock contains the logic that is automatically triggered at the end of each block -func (am AppModule) EndBlock(_ sdk.Context, _ abci.RequestEndBlock) []abci.ValidatorUpdate { +func (am AppModule) EndBlock(ctx sdk.Context, _ abci.RequestEndBlock) []abci.ValidatorUpdate { //TODO: //1. check validator update //if {validatorSetUpdate} -> update roundInfo(seal all active) //check roundInfo -> seal {success, fail} //{params} -> prepareRoundInfo //sealRounds() -> prepareRounds() + // am.keeper.GetCaches().CommitCache(ctx, true, am.keeper) + //TODO: udpate the validatorset first + cs := keeper.GetCaches() + validatorUpdates := am.keeper.GetValidatorUpdates(ctx) + forceSeal := false + agc := keeper.GetAggregatorContext(ctx, am.keeper) + + if len(validatorUpdates) > 0 { + //validatorUpdates := am.keeper.GetValidatorUpdates(ctx) + validatorList := make(map[string]*big.Int) + for _, vu := range validatorUpdates { + pubKey, _ := cryptocodec.FromTmProtoPublicKey(vu.PubKey) + validator, _ := am.keeper.GetValidatorByConsAddr(ctx, sdk.GetConsAddress(pubKey)) + validatorList[validator.OperatorAddress] = big.NewInt(vu.Power) + } + // cs.AddCache(validatorList, am.keeper) + validatorPowers := make(map[string]*big.Int) + cs.GetCache(cache.CacheItemV(validatorPowers)) + //update validatorPowerList in aggregatorContext + // keeper.GetAggregatorContext(ctx, am.keeper).SetValidatorPowers(validatorPowers) + // agc := keeper.GetAggregatorContext(ctx, am.keeper) + agc.SetValidatorPowers(validatorPowers) + //TODO: seal all alive round since validatorSet changed here + forceSeal = true + } + + //TODO: for v1 use mode==1, just check the failed feeders + _, failed := agc.SealRound(ctx, forceSeal) + //append new round with previous price for fail-seal token + for _, tokenId := range failed { + if pTR, ok := am.keeper.GetPriceTRLatest(ctx, tokenId); ok { + pTR.RoundId++ + am.keeper.AppendPriceTR(ctx, tokenId, pTR) + } else { + nextRoundId := am.keeper.GetNextRoundId(ctx, tokenId) + am.keeper.AppendPriceTR(ctx, tokenId, types.PriceWithTimeAndRound{ + RoundId: nextRoundId, + }) + } + } + + agc.PrepareRound(ctx, 0) + + cs.CommitCache(ctx, true, am.keeper) return []abci.ValidatorUpdate{} }