diff --git a/backend/cmd/store/main.go b/backend/cmd/store/main.go new file mode 100644 index 000000000..279723a7e --- /dev/null +++ b/backend/cmd/store/main.go @@ -0,0 +1,31 @@ +package main + +import ( + "errors" + "net/http" + "os" + + "github.com/gobitfly/beaconchain/pkg/commons/db2/database" + "github.com/gobitfly/beaconchain/pkg/commons/log" + "github.com/gobitfly/beaconchain/pkg/commons/utils" +) + +func main() { + args := os.Args[1:] + project := args[0] + instance := args[1] + table := args[2] + + bt, err := database.NewBigTable(project, instance, nil) + if err != nil { + panic(err) + } + remote := database.NewRemote(database.Wrap(bt, table)) + go func() { + log.Info("starting remote raw store on port 8087") + if err := http.ListenAndServe("0.0.0.0:8087", remote.Routes()); err != nil && !errors.Is(err, http.ErrServerClosed) { + panic(err) + } + }() + utils.WaitForCtrlC() +} diff --git a/backend/pkg/commons/db2/data/data.go b/backend/pkg/commons/db2/data/data.go new file mode 100644 index 000000000..24879dc01 --- /dev/null +++ b/backend/pkg/commons/db2/data/data.go @@ -0,0 +1,278 @@ +package data + +import ( + "encoding/hex" + "errors" + "fmt" + "sort" + "strings" + "time" + + "github.com/ethereum/go-ethereum/common" + "golang.org/x/exp/maps" + "google.golang.org/protobuf/proto" + + "github.com/gobitfly/beaconchain/pkg/commons/db2/database" + "github.com/gobitfly/beaconchain/pkg/commons/types" +) + +type Store struct { + db database.Database +} + +func NewStore(store database.Database) Store { + return Store{ + db: store, + } +} + +type TransferWithIndexes struct { + Indexed *types.Eth1ERC20Indexed + TxIndex int + LogIndex int +} + +func (store Store) BlockERC20TransfersToItems(chainID string, transfers []TransferWithIndexes) (map[string][]database.Item, error) { + items := make(map[string][]database.Item) + for _, transfer := range transfers { + b, err := proto.Marshal(transfer.Indexed) + if err != nil { + return nil, err + } + key := keyERC20(chainID, transfer.Indexed.ParentHash, transfer.LogIndex) + item := []database.Item{{Family: defaultFamily, Column: key}} + items[key] = []database.Item{{Family: defaultFamily, Column: dataColumn, Data: b}} + + items[keyERC20Time(chainID, transfer.Indexed, transfer.Indexed.From, transfer.TxIndex, transfer.LogIndex)] = item + items[keyERC20Time(chainID, transfer.Indexed, transfer.Indexed.To, transfer.TxIndex, transfer.LogIndex)] = item + + items[keyERC20ContractAllTime(chainID, transfer.Indexed, transfer.TxIndex, transfer.LogIndex)] = item + items[keyERC20ContractTime(chainID, transfer.Indexed, transfer.Indexed.From, transfer.TxIndex, transfer.LogIndex)] = item + items[keyERC20ContractTime(chainID, transfer.Indexed, transfer.Indexed.To, transfer.TxIndex, transfer.LogIndex)] = item + + items[keyERC20To(chainID, transfer.Indexed, transfer.TxIndex, transfer.LogIndex)] = item + items[keyERC20From(chainID, transfer.Indexed, transfer.TxIndex, transfer.LogIndex)] = item + items[keyERC20Sent(chainID, transfer.Indexed, transfer.TxIndex, transfer.LogIndex)] = item + items[keyERC20Received(chainID, transfer.Indexed, transfer.TxIndex, transfer.LogIndex)] = item + } + return items, nil +} + +func (store Store) AddBlockERC20Transfers(chainID string, transactions []TransferWithIndexes) error { + items, err := store.BlockERC20TransfersToItems(chainID, transactions) + if err != nil { + return err + } + return store.db.BulkAdd(items) +} + +func (store Store) BlockTransactionsToItems(chainID string, transactions []*types.Eth1TransactionIndexed) 
(map[string][]database.Item, error) { + items := make(map[string][]database.Item) + for i, transaction := range transactions { + b, err := proto.Marshal(transaction) + if err != nil { + return nil, err + } + key := keyTx(chainID, transaction.GetHash()) + item := []database.Item{{Family: defaultFamily, Column: key}} + items[key] = []database.Item{{Family: defaultFamily, Column: dataColumn, Data: b}} + items[keyTxSent(chainID, transaction, i)] = item + items[keyTxReceived(chainID, transaction, i)] = item + + items[keyTxTime(chainID, transaction, transaction.To, i)] = item + items[keyTxBlock(chainID, transaction, transaction.To, i)] = item + items[keyTxMethod(chainID, transaction, transaction.To, i)] = item + + items[keyTxTime(chainID, transaction, transaction.From, i)] = item + items[keyTxBlock(chainID, transaction, transaction.From, i)] = item + items[keyTxMethod(chainID, transaction, transaction.From, i)] = item + + if transaction.ErrorMsg != "" { + items[keyTxError(chainID, transaction, transaction.To, i)] = item + items[keyTxError(chainID, transaction, transaction.From, i)] = item + } + + if transaction.IsContractCreation { + items[keyTxContractCreation(chainID, transaction, transaction.To, i)] = item + items[keyTxContractCreation(chainID, transaction, transaction.From, i)] = item + } + } + return items, nil +} + +func (store Store) AddBlockTransactions(chainID string, transactions []*types.Eth1TransactionIndexed) error { + items, err := store.BlockTransactionsToItems(chainID, transactions) + if err != nil { + return err + } + return store.db.BulkAdd(items) +} + +func (store Store) Get(chainIDs []string, addresses []common.Address, prefixes map[string]map[string]string, limit int64, opts ...Option) ([]*Interaction, map[string]map[string]string, error) { + sources := map[formatType]unMarshalInteraction{ + typeTx: unMarshalTx, + typeTransfer: unMarshalTransfer, + } + options := apply(opts) + if options.ignoreTxs { + delete(sources, typeTx) + } + if options.ignoreTransfers { + delete(sources, typeTransfer) + } + var interactions []*interactionWithInfo + for interactionType, unMarshalFunc := range sources { + filter, err := makeFilters(options, interactionType) + if err != nil { + return nil, nil, err + } + temp, err := store.getBy(unMarshalFunc, chainIDs, addresses, prefixes, limit, filter) + if err != nil { + return nil, nil, err + } + interactions = append(interactions, temp...) 
+ } + sort.Sort(byTimeDesc(interactions)) + if int64(len(interactions)) > limit { + interactions = interactions[:limit] + } + + var res []*Interaction + if prefixes == nil { + prefixes = make(map[string]map[string]string) + } + for i := 0; i < len(interactions); i++ { + if prefixes[interactions[i].chainID] == nil { + prefixes[interactions[i].chainID] = make(map[string]string) + } + prefixes[interactions[i].chainID][interactions[i].root] = interactions[i].key + res = append(res, interactions[i].Interaction) + } + return res, prefixes, nil +} + +func (store Store) getBy(unMarshal unMarshalInteraction, chainIDs []string, addresses []common.Address, prefixes map[string]map[string]string, limit int64, condition filter) ([]*interactionWithInfo, error) { + var interactions []*interactionWithInfo + for _, chainID := range chainIDs { + for _, address := range addresses { + root := condition.get(chainID, address) + prefix := root + if prefixes != nil && prefixes[chainID] != nil && prefixes[chainID][root] != "" { + prefix = prefixes[chainID][root] + } + upper := condition.limit(root) + indexRows, err := store.db.GetRowsRange(upper, prefix, database.WithLimit(limit), database.WithOpenRange(true)) + if err != nil { + if errors.Is(err, database.ErrNotFound) { + continue + } + return nil, err + } + txKeys := make(map[string]string) + for _, row := range indexRows { + for key := range row.Values { + txKey := strings.TrimPrefix(key, fmt.Sprintf("%s:", defaultFamily)) + txKeys[txKey] = row.Key + } + } + txRows, err := store.db.GetRowsWithKeys(maps.Keys(txKeys)) + if err != nil { + return nil, err + } + for _, row := range txRows { + interaction, err := unMarshal(row.Values[fmt.Sprintf("%s:%s", defaultFamily, dataColumn)]) + if err != nil { + return nil, err + } + interaction.ChainID = chainID + interactions = append(interactions, &interactionWithInfo{ + Interaction: interaction, + chainID: chainID, + root: root, + key: txKeys[row.Key], + }) + } + } + } + return interactions, nil +} + +type interactionWithInfo struct { + *Interaction + chainID string + root string + key string +} + +type byTimeDesc []*interactionWithInfo + +func (c byTimeDesc) Len() int { return len(c) } +func (c byTimeDesc) Swap(i, j int) { c[i], c[j] = c[j], c[i] } +func (c byTimeDesc) Less(i, j int) bool { + t1 := c[i].Interaction.Time + t2 := c[j].Interaction.Time + if t1.Equal(t2) { + return c[i].key < c[j].key + } + return t1.After(t2) +} + +type Interaction struct { + ChainID string + Hash []byte + Method []byte + Time time.Time + Type string + Value []byte + Asset string + From string + To string +} + +var erc20Transfer, _ = hex.DecodeString("a9059cbb") + +type unMarshalInteraction func(b []byte) (*Interaction, error) + +func unMarshalTx(b []byte) (*Interaction, error) { + tx := &types.Eth1TransactionIndexed{} + if err := proto.Unmarshal(b, tx); err != nil { + return nil, err + } + return parseTx(tx), nil +} + +func unMarshalTransfer(b []byte) (*Interaction, error) { + tx := &types.Eth1ERC20Indexed{} + if err := proto.Unmarshal(b, tx); err != nil { + return nil, err + } + return parseTransfer(tx), nil +} + +func parseTransfer(transfer *types.Eth1ERC20Indexed) *Interaction { + return &Interaction{ + ChainID: "", + Hash: transfer.ParentHash, + Method: erc20Transfer, + Time: transfer.Time.AsTime(), + Type: "ERC20", + Value: transfer.Value, + Asset: hex.EncodeToString(transfer.TokenAddress), + From: hex.EncodeToString(transfer.From), + To: hex.EncodeToString(transfer.To), + } +} + +func parseTx(tx *types.Eth1TransactionIndexed) 
*Interaction { + return &Interaction{ + ChainID: "", + Hash: tx.Hash, + Method: tx.MethodId, + Time: tx.Time.AsTime(), + Type: "Transaction", + Value: tx.Value, + Asset: "ETH", + From: hex.EncodeToString(tx.From), + To: hex.EncodeToString(tx.To), + } +} diff --git a/backend/pkg/commons/db2/data/data_external_test.go b/backend/pkg/commons/db2/data/data_external_test.go new file mode 100644 index 000000000..04afd2bd3 --- /dev/null +++ b/backend/pkg/commons/db2/data/data_external_test.go @@ -0,0 +1,109 @@ +package data_test + +import ( + "encoding/hex" + "os" + "testing" + + "github.com/ethereum/go-ethereum/common" + + "github.com/gobitfly/beaconchain/pkg/commons/db2/data" + "github.com/gobitfly/beaconchain/pkg/commons/db2/database" +) + +var chainIDs = []string{ + "1", + "17000", + "100", +} + +var addresses = []common.Address{ + common.HexToAddress("0x95222290DD7278Aa3Ddd389Cc1E1d165CC4BAfe5"), + common.HexToAddress("0x388C818CA8B9251b393131C08a736A67ccB19297"), + common.HexToAddress("0x6d2e03b7EfFEae98BD302A9F836D0d6Ab0002766"), + common.HexToAddress("0x10e4597ff93cbee194f4879f8f1d54a370db6969"), +} + +func dbFromEnv(t *testing.T, table string) database.Database { + project := os.Getenv("BIGTABLE_PROJECT") + instance := os.Getenv("BIGTABLE_INSTANCE") + remote := os.Getenv("REMOTE_URL") + if project != "" && instance != "" { + db, err := database.NewBigTable(project, instance, nil) + if err != nil { + t.Fatal(err) + } + return database.Wrap(db, table) + } + if remote != "" { + return database.NewRemoteClient(remote) + } + t.Skip("skipping test, set BIGTABLE_PROJECT and BIGTABLE_INSTANCE or REMOTE_URL") + return nil +} + +func TestStoreExternal(t *testing.T) { + db := dbFromEnv(t, data.Table) + store := data.NewStore(db) + + /* + list of filter + data.ByMethod(method) + data.ByAsset(asset) + data.OnlyReceived() + data.OnlySent() + data.IgnoreTransactions() + data.IgnoreTransfers() + data.WithTimeRange(from, to) + */ + tests := []struct { + name string + limit int64 + chainIDs []string + scroll int + addresses []common.Address + opts []data.Option + }{ + { + name: "all no filter", + limit: 25, + scroll: 2, + chainIDs: chainIDs, + addresses: addresses, + }, + { + name: "tx with erc20 transfer method", + limit: 10, + chainIDs: chainIDs, + addresses: addresses, + opts: []data.Option{ + data.IgnoreTransfers(), + data.ByMethod("a9059cbb"), + }, + }, + { + name: "transfer", + limit: 10, + chainIDs: chainIDs, + addresses: addresses, + opts: []data.Option{ + data.IgnoreTransactions(), + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var lastPrefixes map[string]map[string]string + for i := 0; i < tt.scroll+1; i++ { + interactions, prefixes, err := store.Get(chainIDs, addresses, lastPrefixes, tt.limit, tt.opts...) 
+ if err != nil { + t.Fatal(err) + } + for _, interaction := range interactions { + t.Log(interaction.ChainID, "0x"+interaction.From, "0x"+interaction.To, "0x"+hex.EncodeToString(interaction.Hash), interaction.Time) + } + lastPrefixes = prefixes + } + }) + } +} diff --git a/backend/pkg/commons/db2/data/data_test.go b/backend/pkg/commons/db2/data/data_test.go new file mode 100644 index 000000000..0942331a6 --- /dev/null +++ b/backend/pkg/commons/db2/data/data_test.go @@ -0,0 +1,360 @@ +package data + +import ( + "context" + "encoding/hex" + "slices" + "sort" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "golang.org/x/exp/maps" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/gobitfly/beaconchain/pkg/commons/db2/database" + "github.com/gobitfly/beaconchain/pkg/commons/db2/databasetest" + "github.com/gobitfly/beaconchain/pkg/commons/types" +) + +var ( + alice = common.HexToAddress("0x000000000000000000000000000000000000abba") + bob = common.HexToAddress("0x000000000000000000000000000000000000beef") + carl = common.HexToAddress("0x000000000000000000000000000000000000cafe") + usdc = common.HexToAddress("0x000000000000000000000000000000000000dead") +) + +func TestStore(t *testing.T) { + client, admin := databasetest.NewBigTable(t) + + s, err := database.NewBigTableWithClient(context.Background(), client, admin, Schema) + if err != nil { + t.Fatal(err) + } + store := Store{ + db: database.Wrap(s, Table), + } + + tests := []struct { + name string + txs map[string][][]*types.Eth1TransactionIndexed // map[chainID][block][txPosition]*types.Eth1TransactionIndexed + transfers map[string][][]TransferWithIndexes + limit int64 + opts []Option + addresses []common.Address + expectedHashes []string + }{ + { + name: "one sender one chain ID", + txs: map[string][][]*types.Eth1TransactionIndexed{ + "1": { + {newTx("hash1", alice, bob, "", 0)}, + {newTx("hash2", alice, bob, "", 1)}, + {newTx("hash3", alice, bob, "", 2)}, + }, + }, + limit: 1, + addresses: []common.Address{alice}, + expectedHashes: []string{"hash3", "hash2", "hash1"}, + }, + { + name: "two sender one chain ID", + txs: map[string][][]*types.Eth1TransactionIndexed{ + "1": { + {newTx("hash1", alice, bob, "", 0)}, + {newTx("hash2", carl, bob, "", 1)}, + }, + }, + limit: 2, + addresses: []common.Address{alice, carl}, + expectedHashes: []string{"hash2", "hash1"}, + }, + { + name: "two sender one chain ID with limit", + txs: map[string][][]*types.Eth1TransactionIndexed{ + "1": { + {newTx("hash1", alice, bob, "", 0)}, + {newTx("hash2", carl, bob, "", 1)}, + {newTx("hash3", alice, bob, "", 2)}, + {newTx("hash4", carl, bob, "", 3)}, + }, + }, + limit: 2, + addresses: []common.Address{alice, carl}, + expectedHashes: []string{"hash4", "hash3", "hash2", "hash1"}, + }, + { + name: "two sender each on one chain", + txs: map[string][][]*types.Eth1TransactionIndexed{ + "1": { + {newTx("hash1", alice, bob, "", 0)}, + {newTx("hash3", alice, bob, "", 2)}, + }, + "2": { + {newTx("hash2", carl, bob, "", 1)}, + {newTx("hash4", carl, bob, "", 3)}, + }, + }, + limit: 2, + addresses: []common.Address{alice, carl}, + expectedHashes: []string{"hash4", "hash3", "hash2", "hash1"}, + }, + { + name: "two sender both on two chain", + txs: map[string][][]*types.Eth1TransactionIndexed{ + "1": { + {newTx("hash1", alice, bob, "", 0)}, + {newTx("hash3", carl, bob, "", 2)}, + }, + "2": { + {newTx("hash2", carl, bob, "", 1)}, + {newTx("hash4", alice, bob, "", 3)}, + }, + }, + limit: 2, + addresses: []common.Address{alice, carl}, + 
expectedHashes: []string{"hash4", "hash3", "hash2", "hash1"}, + }, + { + name: "by method", + txs: map[string][][]*types.Eth1TransactionIndexed{ + "1": { + {newTx("hash1", alice, bob, "foo", 0)}, + {newTx("hash2", alice, bob, "bar", 1)}, + {newTx("hash3", carl, bob, "foo", 2)}, + }, + }, + limit: 1, + opts: []Option{IgnoreTransfers(), ByMethod(hex.EncodeToString([]byte("foo")))}, + addresses: []common.Address{alice}, + expectedHashes: []string{"hash1"}, + }, + { + name: "by time range", + txs: map[string][][]*types.Eth1TransactionIndexed{ + "1": { + {newTx("hash1", alice, bob, "", 0)}, + {newTx("hash2", alice, bob, "", 1)}, + {newTx("hash3", alice, bob, "", 2)}, + }, + }, + limit: 1, + opts: []Option{WithTimeRange(timestamppb.New(t0), timestamppb.New(t0.Add(1*time.Second)))}, + addresses: []common.Address{alice}, + expectedHashes: []string{"hash2", "hash1"}, + }, + { + name: "by sender", + txs: map[string][][]*types.Eth1TransactionIndexed{ + "1": { + {newTx("hash1", alice, bob, "", 0)}, + {newTx("hash2", bob, alice, "", 1)}, + {newTx("hash3", alice, bob, "", 2)}, + }, + }, + limit: 1, + opts: []Option{OnlySent()}, + addresses: []common.Address{alice}, + expectedHashes: []string{"hash3", "hash1"}, + }, + { + name: "by receiver", + txs: map[string][][]*types.Eth1TransactionIndexed{ + "1": { + {newTx("hash1", alice, bob, "", 0)}, + {newTx("hash2", bob, alice, "", 1)}, + {newTx("hash3", alice, bob, "", 2)}, + }, + }, + limit: 1, + opts: []Option{OnlyReceived()}, + addresses: []common.Address{bob}, + expectedHashes: []string{"hash3", "hash1"}, + }, { + name: "only transfers", + txs: map[string][][]*types.Eth1TransactionIndexed{ + "1": { + {newTx("hash2", alice, bob, "", 1)}, + }, + }, + transfers: map[string][][]TransferWithIndexes{ + "1": { + {newTransfer("hash1", alice, bob, common.Address{}, 0)}, + {newTransfer("hash3", alice, bob, common.Address{}, 2)}, + }, + }, + limit: 1, + opts: []Option{IgnoreTransactions()}, + addresses: []common.Address{alice}, + expectedHashes: []string{"hash3", "hash1"}, + }, + { + name: "only txs", + txs: map[string][][]*types.Eth1TransactionIndexed{ + "1": { + {newTx("hash1", alice, bob, "", 0)}, + {newTx("hash3", alice, bob, "", 2)}, + }, + }, + transfers: map[string][][]TransferWithIndexes{ + "1": { + {newTransfer("hash2", alice, bob, common.Address{}, 1)}, + }, + }, + limit: 1, + opts: []Option{IgnoreTransfers()}, + addresses: []common.Address{alice}, + expectedHashes: []string{"hash3", "hash1"}, + }, + { + name: "mix of both", + txs: map[string][][]*types.Eth1TransactionIndexed{ + "1": { + {newTx("hash1", alice, bob, "", 0)}, + {newTx("hash3", alice, bob, "", 2)}, + {newTx("hash5", alice, bob, "", 4)}, + {newTx("hash7", alice, bob, "", 6)}, + {newTx("hash9", alice, bob, "", 8)}, + }, + }, + transfers: map[string][][]TransferWithIndexes{ + "1": { + {newTransfer("hash2", alice, bob, common.Address{}, 1)}, + {newTransfer("hash4", alice, bob, common.Address{}, 3)}, + {newTransfer("hash6", alice, bob, common.Address{}, 5)}, + {newTransfer("hash8", alice, bob, common.Address{}, 7)}, + {newTransfer("hash10", alice, bob, common.Address{}, 9)}, + }, + }, + limit: 2, + addresses: []common.Address{alice}, + expectedHashes: []string{"hash10", "hash9", "hash8", "hash7", "hash6", "hash5", "hash4", "hash3", "hash2", "hash1"}, + }, + { + name: "by asset with time range", + transfers: map[string][][]TransferWithIndexes{ + "1": { + {newTransfer("hash1", alice, bob, usdc, 0)}, + {newTransfer("hash2", alice, bob, usdc, 1)}, + {newTransfer("hash3", alice, bob, usdc, 2)}, + 
{newTransfer("hash4", alice, bob, usdc, 3)}, + {newTransfer("hash5", alice, bob, usdc, 4)}, + }, + }, + limit: 2, + opts: []Option{IgnoreTransactions(), ByAsset(usdc), WithTimeRange(timestamppb.New(t0.Add(1*time.Second)), timestamppb.New(t0.Add(3*time.Second)))}, + addresses: []common.Address{alice}, + expectedHashes: []string{"hash4", "hash3", "hash2"}, + }, + { + name: "by asset and sender with time range", + transfers: map[string][][]TransferWithIndexes{ + "1": { + {newTransfer("hash1", alice, bob, usdc, 0)}, + {newTransfer("hash2", bob, alice, usdc, 1)}, + {newTransfer("hash3", alice, bob, usdc, 2)}, + {newTransfer("hash4", bob, alice, usdc, 3)}, + {newTransfer("hash5", alice, bob, usdc, 4)}, + }, + }, + limit: 2, + opts: []Option{IgnoreTransactions(), OnlySent(), ByAsset(usdc), WithTimeRange(timestamppb.New(t0.Add(1*time.Second)), timestamppb.New(t0.Add(3*time.Second)))}, + addresses: []common.Address{alice}, + expectedHashes: []string{"hash3"}, + }, + { + name: "by asset and receiver with time range", + transfers: map[string][][]TransferWithIndexes{ + "1": { + {newTransfer("hash1", bob, alice, usdc, 0)}, + {newTransfer("hash2", alice, bob, usdc, 1)}, + {newTransfer("hash3", bob, alice, usdc, 2)}, + {newTransfer("hash4", alice, bob, usdc, 3)}, + {newTransfer("hash5", bob, alice, usdc, 4)}, + }, + }, + limit: 2, + opts: []Option{IgnoreTransactions(), OnlyReceived(), ByAsset(usdc), WithTimeRange(timestamppb.New(t0.Add(1*time.Second)), timestamppb.New(t0.Add(3*time.Second)))}, + addresses: []common.Address{alice}, + expectedHashes: []string{"hash3"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + defer func() { _ = s.Clear() }() + for chainID, blocks := range tt.txs { + for _, txs := range blocks { + if err := store.AddBlockTransactions(chainID, txs); err != nil { + t.Fatal(err) + } + } + } + for chainID, blocks := range tt.transfers { + for _, transfers := range blocks { + if err := store.AddBlockERC20Transfers(chainID, transfers); err != nil { + t.Fatal(err) + } + } + } + chainIDs := append(maps.Keys(tt.txs), maps.Keys(tt.transfers)...) + sort.Strings(chainIDs) + chainIDs = slices.Compact(chainIDs) + var suffix map[string]map[string]string + for i := int64(0); i < int64(len(tt.expectedHashes))/tt.limit; i++ { + txs, newSuffix, err := store.Get(chainIDs, tt.addresses, suffix, tt.limit, tt.opts...) 
+ if err != nil { + t.Fatalf("tx %d: %v", i, err) + } + if len(txs) == 0 { + t.Fatalf("tx %d: no transactions found", i) + } + if got, want := int64(len(txs)), tt.limit; got != want { + t.Errorf("got %v, want %v", got, want) + } + for j := int64(0); j < tt.limit; j++ { + if got, want := string(txs[j].Hash), tt.expectedHashes[i*tt.limit+j]; got != want { + t.Errorf("got %v, want %v", got, want) + } + } + suffix = newSuffix + } + }) + } +} + +var t0 = time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC) + +func newTx(hash string, from, to common.Address, method string, delta int64) *types.Eth1TransactionIndexed { + return &types.Eth1TransactionIndexed{ + Hash: []byte(hash), + BlockNumber: 0, + Time: timestamppb.New(t0.Add(time.Duration(delta) * time.Second)), + MethodId: []byte(method), + From: from.Bytes(), + To: to.Bytes(), + Value: nil, + TxFee: nil, + GasPrice: nil, + IsContractCreation: false, + InvokesContract: false, + ErrorMsg: "", + BlobTxFee: nil, + BlobGasPrice: nil, + } +} + +func newTransfer(hash string, from, to, contract common.Address, delta int64) TransferWithIndexes { + return TransferWithIndexes{ + Indexed: &types.Eth1ERC20Indexed{ + ParentHash: []byte(hash), + BlockNumber: 0, + TokenAddress: contract.Bytes(), + Time: timestamppb.New(t0.Add(time.Duration(delta) * time.Second)), + From: from.Bytes(), + To: to.Bytes(), + Value: nil, + }, + TxIndex: 0, + LogIndex: 0, + } +} diff --git a/backend/pkg/commons/db2/data/filter.go b/backend/pkg/commons/db2/data/filter.go new file mode 100644 index 000000000..aea5b19f6 --- /dev/null +++ b/backend/pkg/commons/db2/data/filter.go @@ -0,0 +1,250 @@ +package data + +import ( + "fmt" + "strings" + + "github.com/ethereum/go-ethereum/common" + "github.com/golang/protobuf/ptypes/timestamp" +) + +type formatType string + +const ( + typeTx = formatType("tx") + typeTransfer = formatType("transfer") +) + +type filterType string + +const ( + byMethod = filterType("byMethod") + bySent = filterType("bySent") + byReceived = filterType("byReceived") + byAsset = filterType("byAsset") + byAssetSent = filterType("byAssetSent") + byAssetReceived = filterType("byAssetReceived") +) + +type filter interface { + get(chainID string, address common.Address) string + limit(prefix string) string +} + +type chainFilter interface { + addByMethod(method string) error + addBySent() error + addByReceived() error + addByAsset(asset common.Address) error + addTimeRange(from *timestamp.Timestamp, to *timestamp.Timestamp) error + valid() error + filterType() filterType + filter +} + +type chainFilterTx struct { + base string + filtered filterType + + method *string + from *timestamp.Timestamp + to *timestamp.Timestamp +} + +func newChainFilterTx() *chainFilterTx { + return &chainFilterTx{ + base: ":I:TX:
:TIME", + } +} + +func (c *chainFilterTx) addByMethod(method string) error { + if c.filtered != "" { + return fmt.Errorf("filter tx already filtered by %s", c.filtered) + } + c.base = ":I:TX:
:METHOD:" + c.method = &method + c.filtered = byMethod + return nil +} + +func (c *chainFilterTx) addBySent() error { + if c.filtered != "" { + return fmt.Errorf("filter tx already filtered by %s", c.filtered) + } + c.base = ":I:TX:
:TO" + c.filtered = bySent + return nil +} + +func (c *chainFilterTx) addByReceived() error { + if c.filtered != "" { + return fmt.Errorf("filter tx already filtered by %s", c.filtered) + } + c.base = ":I:TX:
:FROM" + c.filtered = byReceived + return nil +} + +func (c *chainFilterTx) addByAsset(common.Address) error { + return fmt.Errorf("cannot filter tx by asset") +} + +func (c *chainFilterTx) addTimeRange(from *timestamp.Timestamp, to *timestamp.Timestamp) error { + if from == nil || to == nil { + return fmt.Errorf("invalid time range: empty border") + } + c.from = from + c.to = to + return nil +} + +func (c *chainFilterTx) filterType() filterType { + return c.filtered +} + +func (c *chainFilterTx) valid() error { + return nil +} + +func (c *chainFilterTx) get(chainID string, address common.Address) string { + query := strings.Replace(c.base, "", chainID, 1) + query = strings.Replace(query, "
", fmt.Sprintf("%x", address.Bytes()), 1) + if c.method != nil { + query = strings.Replace(query, "", *c.method, 1) + } + if c.to != nil { + query = fmt.Sprintf("%s:%s", query, reversePaddedTimestamp(c.to)) + } + return query +} + +func (c *chainFilterTx) limit(prefix string) string { + if c.from != nil { + index := strings.LastIndex(prefix, ":") + return toSuccessor(fmt.Sprintf("%s:%s", prefix[:index], reversePaddedTimestamp(c.from))) + } + return toSuccessor(prefix) +} + +type chainFilterTransfer struct { + base string + filtered filterType + + asset *common.Address + from *timestamp.Timestamp + to *timestamp.Timestamp +} + +func newChainFilterTransfer() *chainFilterTransfer { + return &chainFilterTransfer{ + base: ":I:ERC20:
:TIME", + } +} + +func (c *chainFilterTransfer) addByMethod(string) error { + return fmt.Errorf("cannot filter transfer by method") +} + +func (c *chainFilterTransfer) addBySent() error { + if c.filtered != "" { + if c.filtered != byAsset { + return fmt.Errorf("filter transfer already filtered by %s", c.filtered) + } + return c.addByAssetSent() + } + c.base = ":I:ERC20:
:TOKEN_SENT" + c.filtered = bySent + return nil +} + +func (c *chainFilterTransfer) addByReceived() error { + if c.filtered != "" { + if c.filtered != byAsset { + return fmt.Errorf("filter transfer already filtered by %s", c.filtered) + } + return c.addByAssetReceived() + } + c.base = ":I:ERC20:
:TOKEN_RECEIVED" + c.filtered = byReceived + return nil +} + +func (c *chainFilterTransfer) addByAssetReceived() error { + c.base = ":I:ERC20:
:TOKEN_RECEIVED:" + c.filtered = byAssetReceived + return nil +} + +func (c *chainFilterTransfer) addByAssetSent() error { + c.base = ":I:ERC20:
:TOKEN_SENT:" + c.filtered = byAssetSent + return nil +} + +func (c *chainFilterTransfer) addByAsset(asset common.Address) error { + if c.filtered != "" { + if c.filtered != byReceived && c.filtered != bySent { + return fmt.Errorf("filter transfer already filtered by %s", c.filtered) + } + } + c.asset = &asset + if c.filtered == byReceived { + return c.addByAssetReceived() + } + if c.filtered == bySent { + return c.addByAssetSent() + } + c.base = ":I:ERC20::
:TIME" + c.filtered = byAsset + return nil +} + +func (c *chainFilterTransfer) addTimeRange(from, to *timestamp.Timestamp) error { + if from == nil || to == nil { + return fmt.Errorf("invalid time range: empty border") + } + if c.filtered == byReceived || c.filtered == bySent { + return fmt.Errorf("cannot apply range over filter by %s", c.filtered) + } + c.from = from + c.to = to + return nil +} + +func (c *chainFilterTransfer) filterType() filterType { + return c.filtered +} + +func (c *chainFilterTransfer) valid() error { + if (c.from != nil || c.to != nil) && (c.filtered == bySent || c.filtered == byReceived) { + return fmt.Errorf("cannot apply range over filter by %s", c.filtered) + } + return nil +} + +func (c *chainFilterTransfer) get(chainID string, address common.Address) string { + query := strings.Replace(c.base, "", chainID, 1) + query = strings.Replace(query, "
", fmt.Sprintf("%x", address.Bytes()), 1) + if c.asset != nil { + query = strings.Replace(query, "", fmt.Sprintf("%x", c.asset.Bytes()), 1) + } + if c.to != nil { + query = fmt.Sprintf("%s:%s", query, reversePaddedTimestamp(c.to)) + } + return query +} + +func (c *chainFilterTransfer) limit(prefix string) string { + if c.from != nil { + index := strings.LastIndex(prefix, ":") + return toSuccessor(fmt.Sprintf("%s:%s", prefix[:index], reversePaddedTimestamp(c.from))) + } + return toSuccessor(prefix) +} + +// toSuccessor add suffix ";" has it comes after ":" in the ascii order +// this is a simple way to have an infinite bound limit +// prefix must be a real prefix and not a key +func toSuccessor(prefix string) string { + return prefix + ";" +} diff --git a/backend/pkg/commons/db2/data/filter_test.go b/backend/pkg/commons/db2/data/filter_test.go new file mode 100644 index 000000000..8bf258cd4 --- /dev/null +++ b/backend/pkg/commons/db2/data/filter_test.go @@ -0,0 +1,140 @@ +package data + +import ( + "testing" + + "github.com/ethereum/go-ethereum/common" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func TestFilter(t *testing.T) { + tests := []struct { + name string + filter chainFilter + add func(chainFilter) error + expectErr bool + expectType filterType + }{ + { + name: "tx by asset should err", + filter: newChainFilterTx(), + add: func(c chainFilter) error { + return c.addByAsset(common.Address{}) + }, + expectErr: true, + }, + { + name: "tx invalid time range", + filter: newChainFilterTx(), + add: func(c chainFilter) error { + return c.addTimeRange(nil, nil) + }, + expectErr: true, + }, + { + name: "transfer by method should err", + filter: newChainFilterTransfer(), + add: func(c chainFilter) error { + return c.addByMethod("") + }, + expectErr: true, + }, + { + name: "transfer by asset sent", + filter: newChainFilterTransfer(), + add: func(c chainFilter) error { + if err := c.addByAsset(common.Address{}); err != nil { + return err + } + return c.addBySent() + }, + expectType: byAssetSent, + }, + { + name: "transfer by asset received", + filter: newChainFilterTransfer(), + add: func(c chainFilter) error { + if err := c.addByAsset(common.Address{}); err != nil { + return err + } + return c.addByReceived() + }, + expectType: byAssetReceived, + }, + { + name: "transfer by sent asset", + filter: newChainFilterTransfer(), + add: func(c chainFilter) error { + if err := c.addBySent(); err != nil { + return err + } + return c.addByAsset(common.Address{}) + }, + expectType: byAssetSent, + }, + { + name: "transfer by received asset", + filter: newChainFilterTransfer(), + add: func(c chainFilter) error { + if err := c.addByReceived(); err != nil { + return err + } + return c.addByAsset(common.Address{}) + }, + expectType: byAssetReceived, + }, + { + name: "transfer invalid time range", + filter: newChainFilterTransfer(), + add: func(c chainFilter) error { + return c.addTimeRange(nil, nil) + }, + expectErr: true, + }, + { + name: "transfer time range over bySent should err", + filter: newChainFilterTransfer(), + add: func(c chainFilter) error { + if err := c.addTimeRange(timestamppb.New(t0), timestamppb.New(t0)); err != nil { + return err + } + if err := c.addBySent(); err != nil { + return err + } + return c.valid() + }, + expectErr: true, + }, + { + name: "transfer time range over byReceived should err", + filter: newChainFilterTransfer(), + add: func(c chainFilter) error { + if err := c.addTimeRange(timestamppb.New(t0), timestamppb.New(t0)); err != nil { + return err + } + if err := 
+					return err
+				}
+				return c.valid()
+			},
+			expectErr: true,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			err := tt.add(tt.filter)
+			if err != nil {
+				if !tt.expectErr {
+					t.Errorf("unexpected err: %s", err)
+				}
+				return
+			}
+			if tt.expectErr {
+				t.Error("expected err but got nil")
+			}
+			if got, want := tt.filter.filterType(), tt.expectType; got != want {
+				t.Errorf("got %v, want %v", got, want)
+			}
+		})
+	}
+}
diff --git a/backend/pkg/commons/db2/data/keys.go b/backend/pkg/commons/db2/data/keys.go
new file mode 100644
index 000000000..a039db47c
--- /dev/null
+++ b/backend/pkg/commons/db2/data/keys.go
@@ -0,0 +1,228 @@
+package data
+
+import (
+	"fmt"
+	"strings"
+
+	"google.golang.org/protobuf/types/known/timestamppb"
+
+	"github.com/gobitfly/beaconchain/pkg/commons/log"
+	"github.com/gobitfly/beaconchain/pkg/commons/types"
+)
+
+const (
+	maxInt                       = 9223372036854775807
+	maxExecutionLayerBlockNumber = 1000000000
+
+	txPerBlockLimit = 10_000
+	logPerTxLimit   = 100_000
+)
+
+func reversePaddedIndex(i int, maxValue int) string {
+	if i > maxValue {
+		log.Fatal(nil, fmt.Sprintf("padded index %v is greater than the max index of %v", i, maxValue), 0)
+	}
+	// TODO probably a bug here
+	// TODO -1 means that the result will be 100, 99, 01
+	// TODO meaning index 0 (100) will be placed before index 1 (99)
+	// TODO it will be index 1 (99), ..., index 90 (10), index 0 (100), index 91 (09)
+	length := fmt.Sprintf("%d", len(fmt.Sprintf("%d", maxValue))-1)
+	fmtStr := "%0" + length + "d"
+	return fmt.Sprintf(fmtStr, maxValue-i)
+}
+
+func reversePaddedTimestamp(timestamp *timestamppb.Timestamp) string {
+	if timestamp == nil {
+		log.Fatal(nil, fmt.Sprintf("unknown timestamp: %v", timestamp), 0)
+	}
+	return fmt.Sprintf("%019d", maxInt-timestamp.Seconds)
+}
+
+func reversedPaddedBlockNumber(blockNumber uint64) string {
+	return fmt.Sprintf("%09d", maxExecutionLayerBlockNumber-blockNumber)
+}
+
+func keyTx(chainID string, hash []byte) string {
+	format := "<chainID>:TX:<hash>"
+	replacer := strings.NewReplacer("<chainID>", chainID, "<hash>", fmt.Sprintf("%x", hash))
+	return replacer.Replace(format)
+}
+
+func keyTxSent(chainID string, tx *types.Eth1TransactionIndexed, index int) string {
+	format := ":I:TX::TO::