Skip to content

Commit

Permalink
feat(oracle-proto):update caches with recache for aggregatorContext, …
Browse files Browse the repository at this point in the history
…refactor
  • Loading branch information
leonz789 committed Mar 18, 2024
1 parent 7a228f0 commit 03cc0ef
Show file tree
Hide file tree
Showing 8 changed files with 262 additions and 160 deletions.
20 changes: 12 additions & 8 deletions x/oracle/keeper/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"math/big"
"time"

"github.com/ExocoreNetwork/exocore/x/oracle/keeper/cache"
"github.com/ExocoreNetwork/exocore/x/oracle/keeper/common"
"github.com/ExocoreNetwork/exocore/x/oracle/types"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -113,7 +114,7 @@ func (agc *AggregatorContext) checkMsg(msg *types.MsgCreatePrice) error {
return nil
}

func (agc *AggregatorContext) FillPrice(msg *types.MsgCreatePrice) (*priceItemKV, *cacheItemM, error) {
func (agc *AggregatorContext) FillPrice(msg *types.MsgCreatePrice) (*priceItemKV, *cache.CacheItemM, error) {
feederWorker := agc.aggregators[msg.FeederId]
//worker initialzed here reduce workload for Endblocker
if feederWorker == nil {
Expand All @@ -135,17 +136,17 @@ func (agc *AggregatorContext) FillPrice(msg *types.MsgCreatePrice) (*priceItemKV
//TODO: check the format
Timestamp: time.Now().String(),
RoundId: agc.rounds[msg.FeederId].nextRoundId,
}}, &cacheItemM{feederId: msg.FeederId}, nil
}}, &cache.CacheItemM{FeederId: msg.FeederId}, nil
}
return nil, &cacheItemM{msg.FeederId, listFilled, msg.Creator}, nil
return nil, &cache.CacheItemM{msg.FeederId, listFilled, msg.Creator}, nil
}

return nil, nil, errors.New("")
}

// NewCreatePrice receives msgCreatePrice message, and goes process: filter->aggregator, filter->calculator->aggregator
// non-deterministic data will goes directly into aggregator, and deterministic data will goes into calculator first to get consensus on the deterministic id.
func (agc *AggregatorContext) NewCreatePrice(ctx sdk.Context, msg *types.MsgCreatePrice) (*priceItemKV, *cacheItemM, error) {
func (agc *AggregatorContext) NewCreatePrice(ctx sdk.Context, msg *types.MsgCreatePrice) (*priceItemKV, *cache.CacheItemM, error) {

if err := agc.checkMsg(msg); err != nil {
return nil, nil, err
Expand All @@ -157,8 +158,9 @@ func (agc *AggregatorContext) NewCreatePrice(ctx sdk.Context, msg *types.MsgCrea
// prepare for new roundInfo, just update the status kept in memory
// executed at EndBlock stage, seall all success or expired roundInfo
// including possible aggregation and state update
// when validatorSet update, set force to true, to seal all alive round
// returns: 1st successful sealed, need to be written to KVStore, 2nd: failed sealed tokenId, use previous price to write to KVStore
func (agc *AggregatorContext) SealRound(ctx sdk.Context) (success []*priceItemKV, failed []int32) {
func (agc *AggregatorContext) SealRound(ctx sdk.Context, force bool) (success []*priceItemKV, failed []int32) {
//1. check validatorSet udpate
//TODO: if validatoSet has been updated in current block, just seal all active rounds and return
//1. for sealed worker, the KVStore has been updated
Expand All @@ -171,7 +173,7 @@ func (agc *AggregatorContext) SealRound(ctx sdk.Context) (success []*priceItemKV
case 1:
expired := ctx.BlockHeight() >= feeder.EndBlock
outOfWindow := uint64(ctx.BlockHeight())-round.basedBlock >= uint64(common.MaxNonce)
if expired || outOfWindow {
if expired || outOfWindow || force {
//TODO: WRITE TO KVSTORE with previous round data for this round
failed = append(failed, feeder.TokenId)
if expired {
Expand All @@ -180,8 +182,6 @@ func (agc *AggregatorContext) SealRound(ctx sdk.Context) (success []*priceItemKV
} else {
round.status = 2
agc.aggregators[feederId] = nil
//TODO: WRITE TO KVSTORE with previous round data for this round
failed = append(failed, feeder.TokenId)
}
}
}
Expand All @@ -194,6 +194,10 @@ func (agc *AggregatorContext) SealRound(ctx sdk.Context) (success []*priceItemKV
return
}

//func (agc *AggregatorContext) ForceSeal(ctx sdk.Context) (success []*priceItemKV, failed []int32) {
//
//}

func (agc *AggregatorContext) PrepareRound(ctx sdk.Context, block uint64) {
//block>0 means recache initialization, all roundInfo is empty
if block == 0 {
Expand Down
180 changes: 49 additions & 131 deletions x/oracle/keeper/cache/caches.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,19 @@ package cache
import (
"math/big"

"github.com/ExocoreNetwork/exocore/x/oracle/keeper/aggregator"
"github.com/ExocoreNetwork/exocore/x/oracle/keeper/common"
"github.com/ExocoreNetwork/exocore/x/oracle/types"
sdk "github.com/cosmos/cosmos-sdk/types"
stakingTypes "github.com/cosmos/cosmos-sdk/x/staking/types"
)

var zeroBig = big.NewInt(0)

type cacheItemV map[string]*big.Int
type cacheItemP *common.Params
type cacheItemM struct {
feederId int32
pSources []*types.PriceWithSource
validator string
type CacheItemV map[string]*big.Int
type CacheItemP *common.Params
type CacheItemM struct {
FeederId int32
PSources []*types.PriceWithSource
Validator string
}

type Cache struct {
Expand All @@ -26,7 +24,7 @@ type Cache struct {
params *cacheParams
}

type cacheMsgs map[int32][]*cacheItemM
type cacheMsgs map[int32][]*CacheItemM

// used to track validator change
type cacheValidator struct {
Expand All @@ -40,11 +38,11 @@ type cacheParams struct {
update bool
}

func (c cacheMsgs) add(item *cacheItemM) {
c[item.feederId] = append(c[item.feederId], item)
func (c cacheMsgs) add(item *CacheItemM) {
c[item.FeederId] = append(c[item.FeederId], item)
}
func (c cacheMsgs) remove(item *cacheItemM) {
delete(c, item.feederId)
func (c cacheMsgs) remove(item *CacheItemM) {
delete(c, item.FeederId)
}

func (c cacheMsgs) commit(ctx sdk.Context, k common.KeeperOracle) {
Expand All @@ -56,9 +54,9 @@ func (c cacheMsgs) commit(ctx sdk.Context, k common.KeeperOracle) {
for _, msgs4Feeder := range c {
for _, msg := range msgs4Feeder {
recentMsgs.Msgs = append(recentMsgs.Msgs, &types.MsgItem{
FeederId: msg.feederId,
PSources: msg.pSources,
Validator: msg.validator,
FeederId: msg.FeederId,
PSources: msg.PSources,
Validator: msg.Validator,
})
}
}
Expand Down Expand Up @@ -124,29 +122,58 @@ func (c *cacheParams) commit(ctx sdk.Context, k common.KeeperOracle) {
// memory cache
func (c *Cache) AddCache(i any, k common.KeeperOracle) {
switch item := i.(type) {
case *cacheItemM:
case *CacheItemM:
c.msg.add(item)
// case *params:
case cacheItemP:
case CacheItemP:
c.params.add(item)
case cacheItemV:
case CacheItemV:
c.validators.add(item)
default:
panic("no other types are support")
}
}
func (c *Cache) RemoveCache(i any, k common.KeeperOracle) {
switch item := i.(type) {
case *cacheItemM:
case *CacheItemM:
c.msg.remove(item)
default:
}
}

func (c *Cache) GetCache(i any) bool {
switch item := i.(type) {
case CacheItemV:
if item == nil {
return false
}
for addr, power := range c.validators.validators {
item[addr] = power
}
case CacheItemP:
if item == nil {
return false
}
*item = *(c.params.params)
case *[]*CacheItemM:
if item == nil {
return false
}
tmp := make([]*CacheItemM, 0, len(c.msg))
for _, msgs := range c.msg {
tmp = append(tmp, msgs...)
}
*item = tmp
default:
return false
}
return true
}

func (c *Cache) CommitCache(ctx sdk.Context, reset bool, k common.KeeperOracle) {
if len(c.msg) > 0 {
c.msg.commit(ctx, k)
c.msg = make(map[int32][]*cacheItemM)
c.msg = make(map[int32][]*CacheItemM)
}

if c.validators.update {
Expand All @@ -169,7 +196,7 @@ func (c *Cache) ResetCaches() {

func NewCache() *Cache {
return &Cache{
msg: make(map[int32][]*cacheItemM),
msg: make(map[int32][]*CacheItemM),
validators: &cacheValidator{
validators: make(map[string]*big.Int),
},
Expand All @@ -178,112 +205,3 @@ func NewCache() *Cache {
},
}
}

func (c *Cache) RecacheAggregatorContext(ctx sdk.Context, agc *aggregator.AggregatorContext, k common.KeeperOracle) bool {

from := uint64(ctx.BlockHeight()) - common.MaxNonce
to := uint64(ctx.BlockHeight()) - 1

h, ok := k.GetValidatorUpdateBlock(ctx)
recentParamsMap := k.GetAllRecentParamsAsMap(ctx)
if !ok || recentParamsMap == nil {
//no cache, this is the very first running, so go to initial proces instead
return false
}

if h.Block > from {
from = h.Block
}

totalPower := big.NewInt(0)
validatorPowers := make(map[string]*big.Int)
k.IterateBondedValidatorsByPower(ctx, func(index int64, validator stakingTypes.ValidatorI) bool {
power := big.NewInt(validator.GetConsensusPower(validator.GetBondedTokens()))
addr := string(validator.GetOperator())
validatorPowers[addr] = power
totalPower = new(big.Int).Add(totalPower, power)
return false
})
agc.SetValidatorPowers(validatorPowers)
agc.SetTotalPower(totalPower)
//TODO: test only
if k.GetLastTotalPower(ctx).Cmp(totalPower) != 0 {
panic("something wrong when get validatorsPower from staking module")
}

//reset validators
c.AddCache(cacheItemV(validatorPowers), k)

recentMsgs := k.GetAllRecentMsgAsMap(ctx)
var pTmp common.Params
for ; from < to; from++ {
//fill params
for b, recentParams := range recentParamsMap {
prev := uint64(0)
if b <= from && b > prev {
pTmp = common.Params(*recentParams)
agc.SetParams(&pTmp)
if prev > 0 {
//TODO: safe delete
delete(recentParamsMap, prev)
}
prev = b
}
}

agc.PrepareRound(ctx, from)

if msgs := recentMsgs[from+1]; msgs != nil {
for _, msg := range msgs {
//these messages are retreived for recache, just skip the validation check and fill the memory cache
agc.FillPrice(&types.MsgCreatePrice{
Creator: msg.Validator,
FeederId: msg.FeederId,
Prices: msg.PSources,
})
}
}
agc.SealRound(ctx)
}

//fill params cache
c.AddCache(cacheItemP(&pTmp), k)

agc.PrepareRound(ctx, to)

return true
}

func (c *Cache) InitAggregatorContext(ctx sdk.Context, agc *aggregator.AggregatorContext, k common.KeeperOracle) error {
//set params
p := k.GetParams(ctx)
m := make(map[uint64]*types.Params)
m[uint64(ctx.BlockHeight())] = &p
// k.setParams4CacheRecover(m) //used to trace tokenFeeder's update during cache recover
pTmp := common.Params(p)
agc.SetParams(&pTmp)
//set params cache
c.AddCache(cacheItemP(&pTmp), k)

totalPower := big.NewInt(0)
validatorPowers := make(map[string]*big.Int)
k.IterateBondedValidatorsByPower(ctx, func(index int64, validator stakingTypes.ValidatorI) bool {
power := big.NewInt(validator.GetConsensusPower(validator.GetBondedTokens()))
addr := string(validator.GetOperator())
//agc.validatorsPower[addr] = power
validatorPowers[addr] = power
totalPower = new(big.Int).Add(totalPower, power)
return false
})
agc.SetTotalPower(totalPower)
agc.SetValidatorPowers(validatorPowers)
if k.GetLastTotalPower(ctx).Cmp(totalPower) != 0 {
panic("-")
}

//set validatorPower cache
c.AddCache(cacheItemV(validatorPowers), k)

agc.PrepareRound(ctx, uint64(ctx.BlockHeight())-1)
return nil
}
4 changes: 4 additions & 0 deletions x/oracle/keeper/common/expected_keepers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package common
import (
"math/big"

// "cosmossdk.io/api/tendermint/abci"
"github.com/ExocoreNetwork/exocore/x/oracle/types"
abci "github.com/cometbft/cometbft/abci/types"
sdk "github.com/cosmos/cosmos-sdk/types"
stakingTypes "github.com/cosmos/cosmos-sdk/x/staking/types"
)
Expand All @@ -13,6 +15,8 @@ type KeeperOracle interface {

IterateBondedValidatorsByPower(sdk.Context, func(index int64, validator stakingTypes.ValidatorI) bool)
GetLastTotalPower(sdk.Context) *big.Int
GetValidatorUpdates(sdk.Context) []abci.ValidatorUpdate
GetValidatorByConsAddr(sdk.Context, sdk.ConsAddress) (stakingTypes.Validator, bool)

GetIndexRecentMsg(sdk.Context) (types.IndexRecentMsg, bool)
GetAllRecentMsgAsMap(sdk.Context) map[uint64][]*types.MsgItem
Expand Down
10 changes: 10 additions & 0 deletions x/oracle/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"fmt"
"math/big"

// "cosmossdk.io/api/tendermint/abci"
abci "github.com/cometbft/cometbft/abci/types"
"github.com/cometbft/cometbft/libs/log"
"github.com/cosmos/cosmos-sdk/codec"
storetypes "github.com/cosmos/cosmos-sdk/store/types"
Expand Down Expand Up @@ -60,3 +62,11 @@ func (k Keeper) GetLastTotalPower(ctx sdk.Context) *big.Int {
func (k Keeper) IterateBondedValidatorsByPower(ctx sdk.Context, f func(index int64, validator stakingtypes.ValidatorI) bool) {
k.stakingKeeper.IterateBondedValidatorsByPower(ctx, f)
}

func (k Keeper) GetValidatorUpdates(ctx sdk.Context) []abci.ValidatorUpdate {
return k.stakingKeeper.GetValidatorUpdates(ctx)
}

func (k Keeper) GetValidatorByConsAddr(ctx sdk.Context, addr sdk.ConsAddress) (stakingtypes.Validator, bool) {
return k.stakingKeeper.GetValidatorByConsAddr(ctx, addr)
}
3 changes: 2 additions & 1 deletion x/oracle/keeper/msg_server_create_price.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ func (k msgServer) CreatePrice(goCtx context.Context, msg *types.MsgCreatePrice)
3. check the rule fulfilled(sources check), check the decimal of the 1st mathc the params' definition(among prices the decimal had been checked in ante stage), timestamp:later than previous block's timestamp, [not future than now(+1s), this is checked in anteHandler], timestamp verification is not necessary
**/

newItem, caches, _ := GetAggregatorContext(ctx, k.Keeper).NewCreatePrice(ctx, msg)
//newItem, caches, _ := k.GetAggregatorContext(ctx, k.Keeper).NewCreatePrice(ctx, msg)
newItem, caches, _ := GetAggregatorContext(ctx, &k.Keeper).NewCreatePrice(ctx, msg)

if caches != nil {
if newItem != nil {
Expand Down
Loading

0 comments on commit 03cc0ef

Please sign in to comment.