-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[R4R] feat(oracle) #24
Changes from 1 commit
fc501d1
7e0cb8e
f5cf831
8d2d2ac
42f0f6c
30f2d25
4a3cd7d
1ae5769
28bd930
cba6cb1
58cc010
6787b14
7695c4f
2e65519
cc373c9
c0e19aa
81b69ff
1b1e17f
3fbf0a9
5df805d
4208917
c006454
a2aabfe
4a19ace
161f081
6cb3677
f15339c
5d08798
7557592
a945c0b
1a1d063
9800fb8
07bb9e5
8aa9a1b
9acca96
5ada640
86d7212
e988fc0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,288 @@ | ||
package aggregator | ||
|
||
import ( | ||
"errors" | ||
"math/big" | ||
"time" | ||
|
||
"github.com/ExocoreNetwork/exocore/x/oracle/keeper/cache" | ||
"github.com/ExocoreNetwork/exocore/x/oracle/keeper/common" | ||
"github.com/ExocoreNetwork/exocore/x/oracle/types" | ||
sdk "github.com/cosmos/cosmos-sdk/types" | ||
) | ||
|
||
type cacheItemM struct { | ||
feederId int32 | ||
pSources []*types.PriceWithSource | ||
validator string | ||
} | ||
|
||
type priceItemKV struct { | ||
TokenId int32 | ||
PriceTR types.PriceWithTimeAndRound | ||
} | ||
|
||
type roundInfo struct { | ||
//this round of price will start from block basedBlock+1, the basedBlock served as a trigger to notify validators to submit prices | ||
basedBlock uint64 | ||
//next round id of the price oracle service, price with thie id will be record on block basedBlock+1 if all prices submitted by validators(for v1, validators serve as oracle nodes) get to consensus immedately | ||
nextRoundId uint64 | ||
//indicate if this round is open for collecting prices or closed in either condition that success with a consensused price or not | ||
//1: open, 2: closed | ||
status int32 | ||
} | ||
|
||
// AggregatorContext keeps memory cache for state params, validatorset, and updatedthese values as they udpated on chain. And it keeps the infomation to track all tokenFeeders' status and data collection | ||
type AggregatorContext struct { | ||
params *common.Params | ||
|
||
//validator->power | ||
validatorsPower map[string]*big.Int | ||
totalPower *big.Int | ||
|
||
//each active feederToken has a roundInfo | ||
rounds map[int32]*roundInfo | ||
|
||
//each roundInfo has a worker | ||
aggregators map[int32]*worker | ||
} | ||
|
||
func (agc *AggregatorContext) sanityCheck(msg *types.MsgCreatePrice) error { | ||
//sanity check | ||
//TODO: check nonce [1,3] in anteHandler, related to params, may not able | ||
//TODO: check the msgCreatePrice's Decimal is correct with params setting | ||
//TODO: check len(price.prices)>0, len(price.prices._range_eachPriceWithSource.Prices)>0, at least has one source, and for each source has at least one price | ||
//TODO: check for each source, at most maxDetId count price (now in filter, ->anteHandler) | ||
if agc.validatorsPower[msg.Creator] == nil { | ||
return errors.New("signer is not validator") | ||
} | ||
|
||
if msg.Nonce < 1 || msg.Nonce > common.MaxNonce { | ||
return errors.New("nonce invalid") | ||
} | ||
|
||
//TODO: sanity check for price(no more than maxDetId count for each source, this should be take care in anteHandler) | ||
if msg.Prices == nil || len(msg.Prices) == 0 { | ||
return errors.New("msg should provide at least one price") | ||
} | ||
|
||
for _, pSource := range msg.Prices { | ||
if pSource.Prices == nil || len(pSource.Prices) == 0 || len(pSource.Prices) > common.MaxDetId || !agc.params.IsValidSource(pSource.SourceId) { | ||
return errors.New("source should be valid and provide at least one price") | ||
} | ||
//check with params is coressponding source is deteministic | ||
if agc.params.IsDeterministicSource(pSource.SourceId) { | ||
for _, pDetId := range pSource.Prices { | ||
//TODO: verify the format of DetId is correct, since this is string, and we will make consensus with validator's power, so it's ok not to verify the format | ||
//just make sure the DetId won't mess up with NS's placeholder id, the limitation of maximum count one validator can submit will be check by filter | ||
if len(pDetId.DetId) == 0 { | ||
//deterministic must have specified deterministicId | ||
return errors.New("ds should have roundid") | ||
} | ||
//DS's price value will go through consensus process, so it's safe to skip the check here | ||
} | ||
} else { | ||
//sanity check: NS submit only one price with detId=="" | ||
if len(pSource.Prices) > 1 || len(pSource.Prices[0].DetId) > 0 { | ||
return errors.New("ns should not have roundid") | ||
} | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func (agc *AggregatorContext) checkMsg(msg *types.MsgCreatePrice) error { | ||
if err := agc.sanityCheck(msg); err != nil { | ||
return err | ||
} | ||
|
||
//check feeder is active | ||
feederContext := agc.rounds[msg.FeederId] | ||
if feederContext == nil || feederContext.status != 1 { | ||
//feederId does not exist or not alive | ||
return errors.New("context not exist or not available") | ||
} | ||
//senity check on basedBlock | ||
if msg.BasedBlock != feederContext.basedBlock { | ||
return errors.New("baseblock not match") | ||
} | ||
|
||
//check sources rule matches | ||
if ok, err := agc.params.CheckRules(msg.FeederId, msg.Prices); !ok { | ||
return err | ||
} | ||
return nil | ||
} | ||
|
||
func (agc *AggregatorContext) FillPrice(msg *types.MsgCreatePrice) (*priceItemKV, *cache.CacheItemM, error) { | ||
// fmt.Println("debug agc fillprice") | ||
feederWorker := agc.aggregators[msg.FeederId] | ||
//worker initialzed here reduce workload for Endblocker | ||
if feederWorker == nil { | ||
feederWorker = newWorker(msg.FeederId, agc) | ||
agc.aggregators[msg.FeederId] = feederWorker | ||
} | ||
|
||
if feederWorker.sealed { | ||
return nil, nil, errors.New("") | ||
} | ||
|
||
if listFilled := feederWorker.do(msg); listFilled != nil { | ||
if finalPrice := feederWorker.aggregate(); finalPrice != nil { | ||
agc.rounds[msg.FeederId].status = 2 | ||
feederWorker.seal() | ||
return &priceItemKV{agc.params.GetTokenFeeder(msg.FeederId).TokenId, types.PriceWithTimeAndRound{ | ||
Price: finalPrice.String(), | ||
Decimal: agc.params.GetTokenInfo(msg.FeederId).Decimal, | ||
//TODO: check the format | ||
Timestamp: time.Now().String(), | ||
Check warning Code scanning / CodeQL Calling the system time Warning
Calling the system time may be a possible source of non-determinism
|
||
RoundId: agc.rounds[msg.FeederId].nextRoundId, | ||
}}, &cache.CacheItemM{FeederId: msg.FeederId}, nil | ||
} | ||
return nil, &cache.CacheItemM{msg.FeederId, listFilled, msg.Creator}, nil | ||
} | ||
|
||
return nil, nil, errors.New("") | ||
} | ||
|
||
// NewCreatePrice receives msgCreatePrice message, and goes process: filter->aggregator, filter->calculator->aggregator | ||
// non-deterministic data will goes directly into aggregator, and deterministic data will goes into calculator first to get consensus on the deterministic id. | ||
func (agc *AggregatorContext) NewCreatePrice(ctx sdk.Context, msg *types.MsgCreatePrice) (*priceItemKV, *cache.CacheItemM, error) { | ||
// fmt.Println("debug agc.newcreateprice") | ||
if err := agc.checkMsg(msg); err != nil { | ||
// fmt.Println("debug agc.newcreateprice.error", err) | ||
return nil, nil, err | ||
} | ||
// fmt.Println("debug before agc.fillprice") | ||
return agc.FillPrice(msg) | ||
} | ||
|
||
// prepare for new roundInfo, just update the status kept in memory | ||
// executed at EndBlock stage, seall all success or expired roundInfo | ||
// including possible aggregation and state update | ||
// when validatorSet update, set force to true, to seal all alive round | ||
// returns: 1st successful sealed, need to be written to KVStore, 2nd: failed sealed tokenId, use previous price to write to KVStore | ||
func (agc *AggregatorContext) SealRound(ctx sdk.Context, force bool) (success []*priceItemKV, failed []int32) { | ||
//1. check validatorSet udpate | ||
//TODO: if validatoSet has been updated in current block, just seal all active rounds and return | ||
//1. for sealed worker, the KVStore has been updated | ||
for feederId, round := range agc.rounds { | ||
if round.status == 1 { | ||
feeder := agc.params.GetTokenFeeder(feederId) | ||
//TODO: for mode=1, we don't do aggregate() here, since if it donesn't success in the transaction execution stage, it won't success here | ||
//but it's not always the same for other modes, switch modes | ||
switch common.Mode { | ||
case 1: | ||
expired := feeder.EndBlock > 0 && ctx.BlockHeight() >= feeder.EndBlock | ||
outOfWindow := uint64(ctx.BlockHeight())-round.basedBlock >= uint64(common.MaxNonce) | ||
if expired || outOfWindow || force { | ||
//TODO: WRITE TO KVSTORE with previous round data for this round | ||
failed = append(failed, feeder.TokenId) | ||
if expired { | ||
delete(agc.rounds, feederId) | ||
delete(agc.aggregators, feederId) | ||
} else { | ||
round.status = 2 | ||
agc.aggregators[feederId] = nil | ||
} | ||
} | ||
} | ||
} | ||
//all status: 1->2, remove its aggregator | ||
if agc.aggregators[feederId] != nil && agc.aggregators[feederId].sealed { | ||
agc.aggregators[feederId] = nil | ||
} | ||
} | ||
Check warning Code scanning / CodeQL Iteration over map Warning
Iteration over map may be a possible source of non-determinism
|
||
return | ||
} | ||
|
||
//func (agc *AggregatorContext) ForceSeal(ctx sdk.Context) (success []*priceItemKV, failed []int32) { | ||
// | ||
//} | ||
|
||
func (agc *AggregatorContext) PrepareRound(ctx sdk.Context, block uint64) { | ||
//block>0 means recache initialization, all roundInfo is empty | ||
if block == 0 { | ||
block = uint64(ctx.BlockHeight()) | ||
} | ||
|
||
// fmt.Println("debug agc.prepareround, height:", block) | ||
for feederId, feeder := range agc.params.GetTokenFeeders() { | ||
if feederId == 0 { | ||
continue | ||
} | ||
// fmt.Println("debug agc.prepareround, feederId:", feederId) | ||
if (feeder.EndBlock > 0 && uint64(feeder.EndBlock) <= block) || uint64(feeder.StartBaseBlock) > block { | ||
|
||
// fmt.Println("debug agc.prepareround 2, feederId:", feederId, feeder.StartBaseBlock, block) | ||
//this feeder is inactive | ||
continue | ||
} | ||
|
||
// fmt.Println("debug agc.prepareround 3, feederId:", feederId) | ||
|
||
delta := (block - uint64(feeder.StartBaseBlock)) | ||
left := delta % uint64(feeder.Interval) | ||
count := delta / uint64(feeder.Interval) | ||
latestBasedblock := block - left | ||
latestNextRoundId := uint64(feeder.StartRoundId) + count | ||
|
||
feederIdInt32 := int32(feederId) | ||
round := agc.rounds[feederIdInt32] | ||
if round == nil { | ||
round = &roundInfo{ | ||
basedBlock: latestBasedblock, | ||
nextRoundId: latestNextRoundId, | ||
} | ||
if left >= common.MaxNonce { | ||
round.status = 2 | ||
} else { | ||
round.status = 1 | ||
} | ||
agc.rounds[feederIdInt32] = round | ||
} else { | ||
//prepare a new round for exist roundInfo | ||
if left == 0 { | ||
round.basedBlock = latestBasedblock | ||
round.nextRoundId = latestNextRoundId | ||
round.status = 1 | ||
//drop previous worker | ||
agc.aggregators[feederIdInt32] = nil | ||
} else if round.status == 1 && left >= common.MaxNonce { | ||
//this shouldn't happend, if do sealround properly before prepareRound, basically for test only | ||
round.status = 2 | ||
//TODO: just modify the status here, since sealRound should do all the related seal actios already when parepare invoked | ||
} | ||
} | ||
} | ||
} | ||
|
||
func (agc *AggregatorContext) SetParams(p *common.Params) { | ||
agc.params = p | ||
} | ||
|
||
func (agc *AggregatorContext) SetValidatorPowers(vp map[string]*big.Int) { | ||
// t := big.NewInt(0) | ||
agc.totalPower = big.NewInt(0) | ||
agc.validatorsPower = make(map[string]*big.Int) | ||
for addr, power := range vp { | ||
agc.validatorsPower[addr] = power | ||
agc.totalPower = new(big.Int).Add(agc.totalPower, power) | ||
} | ||
Comment on lines
+252
to
+255
Check warning Code scanning / CodeQL Iteration over map Warning
Iteration over map may be a possible source of non-determinism
|
||
} | ||
func (agc *AggregatorContext) GetValidatorPowers() (vp map[string]*big.Int) { | ||
return agc.validatorsPower | ||
} | ||
|
||
//func (agc *AggregatorContext) SetTotalPower(power *big.Int) { | ||
// agc.totalPower = power | ||
//} | ||
|
||
func NewAggregatorContext() *AggregatorContext { | ||
return &AggregatorContext{ | ||
validatorsPower: make(map[string]*big.Int), | ||
totalPower: big.NewInt(0), | ||
rounds: make(map[int32]*roundInfo), | ||
aggregators: make(map[int32]*worker), | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The status field uses an integer type to indicate status (1 for open, 2 for closed). Using enumerated types or constant definitions instead of magic numbers makes the code easier to understand and maintain.