Skip to content

Commit

Permalink
change withdrawal indexing (#53)
Browse files Browse the repository at this point in the history
  • Loading branch information
sh-cha authored Nov 25, 2024
1 parent 919264e commit a6bf7bb
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 64 deletions.
7 changes: 7 additions & 0 deletions cmd/opinitd/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,13 @@ v0.1.9-2: Fill block hash of finalized tree
}

return executor.Migration0192(cmdCtx, db, rpcClient)
case "v0.1.10":
// Run migration for v0.1.10
db, err := db.NewDB(bot.GetDBPath(ctx.homePath, bottypes.BotTypeExecutor))
if err != nil {
return err
}
return executor.Migration0110(db)
default:
return fmt.Errorf("unknown migration version: %s", version)
}
Expand Down
43 changes: 32 additions & 11 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,15 @@ func (db *LevelDB) Close() error {
// PrefixedIterate iterates over the key-value pairs in the database with prefixing the keys.
//
// @dev: `LevelDB.prefix + prefix` is used as the prefix for the iteration.
func (db *LevelDB) PrefixedIterate(prefix []byte, start []byte, cb func(key, value []byte) (stop bool, err error)) error {
func (db *LevelDB) PrefixedIterate(prefix []byte, start []byte, cb func(key, value []byte) (stop bool, err error)) (iterErr error) {
iter := db.db.NewIterator(util.BytesPrefix(db.PrefixedKey(prefix)), nil)
defer iter.Release()
defer func() {
iter.Release()
if iterErr == nil {
iterErr = iter.Error()
}
}()

if start != nil {
iter.Seek(db.PrefixedKey(start))
} else {
Expand All @@ -105,14 +111,27 @@ func (db *LevelDB) PrefixedIterate(prefix []byte, start []byte, cb func(key, val
}
iter.Next()
}
return iter.Error()
return
}

func (db *LevelDB) PrefixedReverseIterate(prefix []byte, start []byte, cb func(key, value []byte) (stop bool, err error)) error {
func (db *LevelDB) PrefixedReverseIterate(prefix []byte, start []byte, cb func(key, value []byte) (stop bool, err error)) (iterErr error) {
iter := db.db.NewIterator(util.BytesPrefix(db.PrefixedKey(prefix)), nil)
defer iter.Release()
defer func() {
iter.Release()
if iterErr == nil {
iterErr = iter.Error()
}
}()

if start != nil {
iter.Seek(db.PrefixedKey(start))
if ok := iter.Seek(db.PrefixedKey(start)); ok || iter.Last() {
// if the valid key is not found, the iterator will be at the last key
// if the key is found, the iterator will be at the key
// or the previous key if the key is not found
if ok && !bytes.Equal(db.PrefixedKey(start), iter.Key()) {
iter.Prev()
}
}
} else {
iter.Last()
}
Expand All @@ -127,15 +146,20 @@ func (db *LevelDB) PrefixedReverseIterate(prefix []byte, start []byte, cb func(k

iter.Prev()
}
return iter.Error()
return
}

// SeekPrevInclusiveKey seeks the previous key-value pair in the database with prefixing the keys.
//
// @dev: `LevelDB.prefix + prefix` is used as the prefix for the iteration.
func (db *LevelDB) SeekPrevInclusiveKey(prefix []byte, key []byte) (k []byte, v []byte, err error) {
iter := db.db.NewIterator(util.BytesPrefix(db.PrefixedKey(prefix)), nil)
defer iter.Release()
defer func() {
iter.Release()
if err == nil {
err = iter.Error()
}
}()
if ok := iter.Seek(db.PrefixedKey(key)); ok || iter.Last() {
// if the valid key is not found, the iterator will be at the last key
// if the key is found, the iterator will be at the key
Expand All @@ -148,9 +172,6 @@ func (db *LevelDB) SeekPrevInclusiveKey(prefix []byte, key []byte) (k []byte, v
} else {
err = dbtypes.ErrNotFound
}
if iter.Error() != nil {
err = iter.Error()
}
return k, v, err
}

Expand Down
7 changes: 4 additions & 3 deletions executor/child/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (ch Child) QueryWithdrawal(sequence uint64) (executortypes.QueryWithdrawalR
}

func (ch Child) QueryWithdrawals(address string, offset uint64, limit uint64, descOrder bool) (executortypes.QueryWithdrawalsResponse, error) {
sequences, next, total, err := ch.GetSequencesByAddress(address, offset, limit, descOrder)
sequences, next, err := ch.GetSequencesByAddress(address, offset, limit, descOrder)
if err != nil {
return executortypes.QueryWithdrawalsResponse{}, err
}
Expand All @@ -65,8 +65,9 @@ func (ch Child) QueryWithdrawals(address string, offset uint64, limit uint64, de

res := executortypes.QueryWithdrawalsResponse{
Withdrawals: withdrawals,
Next: next,
Total: total,
}
if next != 0 {
res.Next = &next
}
return res, nil
}
60 changes: 12 additions & 48 deletions executor/child/withdraw.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,9 +202,9 @@ func (ch *Child) GetWithdrawal(sequence uint64) (executortypes.WithdrawalData, e
return data, err
}

func (ch *Child) GetSequencesByAddress(address string, offset uint64, limit uint64, descOrder bool) (sequences []uint64, next, total uint64, err error) {
func (ch *Child) GetSequencesByAddress(address string, offset uint64, limit uint64, descOrder bool) (sequences []uint64, next uint64, err error) {
if limit == 0 {
return nil, 0, 0, nil
return nil, 0, nil
}

count := uint64(0)
Expand All @@ -213,43 +213,32 @@ func (ch *Child) GetSequencesByAddress(address string, offset uint64, limit uint
if err != nil {
return true, err
}
sequences = append(sequences, sequence)
count++
if count >= limit {
next = sequence
return true, nil
}
sequences = append(sequences, sequence)
count++
return false, nil
}
total, err = ch.GetLastAddressIndex(address)
if err != nil {
return nil, 0, 0, err
}

if descOrder {
if offset > total || offset == 0 {
offset = total
var startKey []byte
if offset != 0 {
startKey = executortypes.PrefixedWithdrawalKeyAddressIndex(address, offset)
}
startKey := executortypes.PrefixedWithdrawalKeyAddressIndex(address, offset)
err = ch.DB().PrefixedReverseIterate(executortypes.PrefixedWithdrawalKeyAddress(address), startKey, fetchFn)
if err != nil {
return nil, 0, 0, err
return nil, 0, err
}

next = offset - count
} else {
if offset == 0 {
offset = 1
}
startKey := executortypes.PrefixedWithdrawalKeyAddressIndex(address, offset)
err := ch.DB().PrefixedIterate(executortypes.PrefixedWithdrawalKeyAddress(address), startKey, fetchFn)
if err != nil {
return nil, 0, 0, err
return nil, 0, err
}

next = offset + count
}

return sequences, next, total, nil
return sequences, next, nil
}

// SetWithdrawal store the withdrawal data for the given sequence to the database
Expand All @@ -265,38 +254,13 @@ func (ch *Child) WithdrawalToRawKVs(sequence uint64, data executortypes.Withdraw
Value: dataBytes,
})

addressIndex, err := ch.GetAddressIndex(data.To)
if err != nil {
return nil, err
}
ch.addressIndexMap[data.To] = addressIndex + 1
kvs = append(kvs, types.RawKV{
Key: ch.DB().PrefixedKey(executortypes.PrefixedWithdrawalKeyAddressIndex(data.To, ch.addressIndexMap[data.To])),
Key: ch.DB().PrefixedKey(executortypes.PrefixedWithdrawalKeyAddressIndex(data.To, sequence)),
Value: dbtypes.FromUint64(sequence),
})
return kvs, nil
}

func (ch *Child) GetAddressIndex(address string) (uint64, error) {
if index, ok := ch.addressIndexMap[address]; !ok {
lastIndex, err := ch.GetLastAddressIndex(address)
if err != nil {
return 0, err
}
return lastIndex, nil
} else {
return index, nil
}
}

func (ch *Child) GetLastAddressIndex(address string) (lastIndex uint64, err error) {
err = ch.DB().PrefixedReverseIterate(executortypes.PrefixedWithdrawalKeyAddress(address), nil, func(key, _ []byte) (bool, error) {
lastIndex = dbtypes.ToUint64Key(key[len(key)-8:])
return true, nil
})
return lastIndex, err
}

func (ch *Child) DeleteFutureWithdrawals(fromSequence uint64) error {
return ch.DB().PrefixedIterate(executortypes.WithdrawalKey, nil, func(key, _ []byte) (bool, error) {
if len(key) != len(executortypes.WithdrawalKey)+1+8 {
Expand Down
33 changes: 33 additions & 0 deletions executor/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,3 +237,36 @@ func Migration0192(ctx context.Context, db types.DB, rpcClient *rpcclient.RPCCli
return false, nil
})
}

func Migration0110(db types.DB) error {
nodeDB := db.WithPrefix([]byte(types.ChildName))
err := nodeDB.PrefixedIterate(executortypes.WithdrawalKey, nil, func(key, value []byte) (bool, error) {
// pass PrefixedWithdrawalKey ( WithdrawalKey / Sequence )
// we only delete PrefixedWithdrawalKeyAddressIndex ( WithdrawalKey / Address / Sequence )
if len(key) == len(executortypes.WithdrawalKey)+1+8 {
return false, nil
}
err := nodeDB.Delete(key)
if err != nil {
return true, err
}
return false, nil
})
if err != nil {
return err
}

return nodeDB.PrefixedIterate(executortypes.WithdrawalKey, nil, func(key, value []byte) (bool, error) {
sequence := dbtypes.ToUint64Key(key[len(key)-8:])
var data executortypes.WithdrawalData
err := json.Unmarshal(value, &data)
if err != nil {
return true, err
}
err = nodeDB.Set(executortypes.PrefixedWithdrawalKeyAddressIndex(data.To, sequence), dbtypes.FromUint64(sequence))
if err != nil {
return true, err
}
return false, nil
})
}
3 changes: 1 addition & 2 deletions executor/types/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,5 @@ type QueryWithdrawalResponse struct {

type QueryWithdrawalsResponse struct {
Withdrawals []QueryWithdrawalResponse `json:"withdrawals"`
Next uint64 `json:"next"`
Total uint64 `json:"total"`
Next *uint64 `json:"next,omitempty"`
}

0 comments on commit a6bf7bb

Please sign in to comment.