Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TECH-631: Atomic C-Chain txs on the cblocks query #31

Draft
wants to merge 1 commit into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions caching/aggregates_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"net/url"
"time"

"github.com/chain4travel/magellan/db"

"github.com/chain4travel/magellan/cfg"
"github.com/chain4travel/magellan/models"
"github.com/chain4travel/magellan/services/indexes/params"
Expand Down Expand Up @@ -221,12 +223,17 @@ func (ac *aggregatesCache) GetAggregatesAndUpdate(chains map[string]cfg.Chain, c
}

// 3rd step: we obtain the count of the transactions based on the block range we acquired from the previous steps
// we will construct based on the block numbers the relevant block_idx filters since this is our main index in the cvm_transactions_txdata table
// we will construct based on the block numbers the relevant block_idx filters since this is our main index
// in the cvm_transactions_txdata and cvm_transactions_atomic tables
union := dbr.Union(
dbRunner.Select("block_idx").From(db.TableCvmTransactionsTxdata),
dbRunner.Select("block_idx").From(db.TableCvmTransactionsAtomic),
).As("union_q")
builder = dbRunner.
Select("count(*) as transaction_count").
From("cvm_transactions_txdata").
Where("cvm_transactions_txdata.block_idx >= concat(?,'000')", firstBlockValue.Block).
Where("cvm_transactions_txdata.block_idx <= concat(?,'999')", lastBlockValue.Block)
Select("count(*) as transaction_count").
From(union).
Where("union_q.block_idx >= concat(?,'000')", firstBlockValue.Block).
Where("union_q.block_idx <= concat(?,'999')", lastBlockValue.Block)

cvmIntervals := models.AggregatesList{}

Expand Down
20 changes: 14 additions & 6 deletions db/dbmodel.go
Original file line number Diff line number Diff line change
Expand Up @@ -1282,8 +1282,11 @@ func (p *persist) InsertCvmAddresses(
type CvmTransactionsAtomic struct {
TransactionID string
Block string
Idx uint64
FromAddr string
ChainID string
Type models.CChainType
Serialization []byte
CreatedAt time.Time
}

Expand All @@ -1296,8 +1299,11 @@ func (p *persist) QueryCvmTransactionsAtomic(
err := sess.Select(
"transaction_id",
"cast(block as char) as block",
"idx",
"from_addr",
"chain_id",
"type",
"serialization",
"created_at",
).From(TableCvmTransactionsAtomic).
Where("transaction_id=?", q.TransactionID).
Expand All @@ -1313,16 +1319,16 @@ func (p *persist) InsertCvmTransactionsAtomic(
) error {
var err error
_, err = sess.
InsertBySql("insert into "+TableCvmTransactionsAtomic+" (transaction_id,block,chain_id,type,created_at) values(?,"+v.Block+",?,?,?)",
v.TransactionID, v.ChainID, v.Type, v.CreatedAt).
InsertBySql("insert into "+TableCvmTransactionsAtomic+" (transaction_id,block,idx,from_addr,chain_id,type,serialization,created_at) values(?,"+v.Block+",?,?,?,?,?,?)",
v.TransactionID, v.Idx, v.FromAddr, v.ChainID, v.Type, v.Serialization, v.CreatedAt).
ExecContext(ctx)
if err != nil && !utils.ErrIsDuplicateEntryError(err) {
return EventErr(TableCvmTransactionsAtomic, false, err)
}
if upd {
_, err = sess.
UpdateBySql("update "+TableCvmTransactionsAtomic+" set block="+v.Block+",chain_id=?,type=?,created_at=? where transaction_id=?",
v.ChainID, v.Type, v.CreatedAt, v.TransactionID).
UpdateBySql("update "+TableCvmTransactionsAtomic+" set block="+v.Block+",idx=?,from_addr=?,chain_id=?,type=?,serialization=?,created_at=? where transaction_id=?",
v.Idx, v.FromAddr, v.ChainID, v.Type, v.Serialization, v.CreatedAt, v.TransactionID).
ExecContext(ctx)
if err != nil {
return EventErr(TableCvmTransactionsAtomic, true, err)
Expand Down Expand Up @@ -2559,7 +2565,8 @@ func (p *persist) InsertMultisigAlias(ctx context.Context, session dbr.SessionRu
func (p *persist) QueryMultisigAliasForOwner(
ctx context.Context,
session dbr.SessionRunner,
owner string) (*[]MultisigAlias, error) {
owner string,
) (*[]MultisigAlias, error) {
v := &[]MultisigAlias{}
_, err := session.Select(
"alias, memo, bech32_address, owner, transaction_id, created_at",
Expand All @@ -2572,7 +2579,8 @@ func (p *persist) QueryMultisigAliasForOwner(
func (p *persist) DeleteMultisigAlias(
ctx context.Context,
session dbr.SessionRunner,
alias string) error {
alias string,
) error {
_, err := session.DeleteFrom(TableMultisigAliases).Where("alias=?", alias).ExecContext(ctx)
if err != nil {
return EventErr(TableMultisigAliases, false, err)
Expand Down
8 changes: 6 additions & 2 deletions services/db/migrations/020_cvm.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@ create table `cvm_blocks`

create table `cvm_transactions_atomic`
(
transaction_id varchar(50) not null primary key,
transaction_id varchar(50) not null,
block decimal(65) not null,
idx bigint unsigned not null,
from_addr varchar(50) not null,
chain_id varchar(50) not null,
type smallint not null,
created_at timestamp not null default current_timestamp
serialization mediumblob,
created_at timestamp not null default current_timestamp,
primary key(block,idx)
);

create table `cvm_addresses`
Expand Down
6 changes: 5 additions & 1 deletion services/db/migrations/045_block_idx_index.down.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
DROP INDEX cvm_transactions_txdata_block_idx on cvm_transactions_txdata;
ALTER TABLE cvm_transactions_txdata DROP COLUMN block_idx;
CREATE UNIQUE INDEX cvm_transactions_txdata_block ON cvm_transactions_txdata (block, idx);
CREATE UNIQUE INDEX cvm_transactions_txdata_block ON cvm_transactions_txdata (block, idx);

DROP INDEX cvm_transactions_atomic_block_idx on cvm_transactions_atomic;
ALTER TABLE cvm_transactions_atomic DROP COLUMN block_idx;
CREATE UNIQUE INDEX cvm_transactions_atomic_block ON cvm_transactions_atomic (block, idx);
8 changes: 7 additions & 1 deletion services/db/migrations/045_block_idx_index.up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,10 @@ ALTER TABLE cvm_transactions_txdata MODIFY block BigInt UNSIGNED NOT NULL;
ALTER TABLE cvm_transactions_txdata MODIFY idx SmallInt UNSIGNED NOT NULL;
DROP INDEX cvm_transactions_txdata_block on cvm_transactions_txdata;
ALTER TABLE cvm_transactions_txdata ADD COLUMN block_idx BIGINT UNSIGNED GENERATED ALWAYS AS (BLOCK*1000 + cast(999 - cast(idx AS SIGNED) AS UNSIGNED)) STORED;
CREATE UNIQUE INDEX cvm_transactions_txdata_block_idx ON cvm_transactions_txdata (block_idx);
CREATE UNIQUE INDEX cvm_transactions_txdata_block_idx ON cvm_transactions_txdata (block_idx);

ALTER TABLE cvm_transactions_atomic MODIFY block BigInt UNSIGNED NOT NULL;
ALTER TABLE cvm_transactions_atomic MODIFY idx SmallInt UNSIGNED NOT NULL;
DROP INDEX cvm_transactions_atomic_block on cvm_transactions_atomic;
ALTER TABLE cvm_transactions_atomic ADD COLUMN block_idx BIGINT UNSIGNED GENERATED ALWAYS AS (BLOCK*1000 + cast(999 - cast(idx AS SIGNED) AS UNSIGNED)) STORED;
CREATE UNIQUE INDEX cvm_transactions_atomic_block_idx ON cvm_transactions_atomic (block_idx);
46 changes: 32 additions & 14 deletions services/indexes/avax/reader_list_cblocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import (
"strings"
"time"

"github.com/gocraft/dbr/v2"

"github.com/chain4travel/magellan/cfg"
"github.com/chain4travel/magellan/db"
"github.com/chain4travel/magellan/models"
"github.com/chain4travel/magellan/services/indexes/params"
"github.com/gocraft/dbr/v2"
)

func (r *Reader) ListCBlocks(ctx context.Context, p *params.ListCBlocksParams) (*models.CBlockList, error) {
Expand Down Expand Up @@ -93,19 +94,36 @@ func (r *Reader) ListCBlocks(ctx context.Context, p *params.ListCBlocksParams) (
GasPrice uint64
}

sq := dbRunner.Select(
"serialization",
"created_at",
"F.address AS from_addr",
"block",
"idx",
"status",
"gas_used",
"gas_price",
"block_idx",
).
From(db.TableCvmTransactionsTxdata).
LeftJoin(dbr.I(db.TableCvmAccounts).As("F"), "id_from_addr = F.id")
// get cvm txs data
union := dbr.Union(
dbRunner.Select(
"serialization",
"created_at",
"F.address AS from_addr",
"block",
"idx",
"status",
"gas_used",
"gas_price",
"block_idx",
).
From(db.TableCvmTransactionsTxdata).
LeftJoin(dbr.I(db.TableCvmAccounts).As("F"), "id_from_addr = F.id"),
dbRunner.Select(
"serialization",
"created_at",
"from_addr",
"block",
"idx",
"0",
"0",
"0",
"block_idx",
).
From(db.TableCvmTransactionsAtomic),
).As("union_q")

sq := dbRunner.Select("*").From(union)

if p.ListParams.StartTimeProvided {
sq = sq.Where("created_at >= ?", p.ListParams.StartTime)
Expand Down
74 changes: 54 additions & 20 deletions services/indexes/cvm/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,31 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"time"

"github.com/ava-labs/avalanchego/codec"
"github.com/ava-labs/avalanchego/genesis"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/utils/crypto"
"github.com/ava-labs/avalanchego/utils/formatting/address"
"github.com/ava-labs/avalanchego/utils/math"
"github.com/ava-labs/avalanchego/version"
"github.com/ava-labs/avalanchego/vms/components/verify"
"github.com/ava-labs/avalanchego/vms/proposervm/block"
"github.com/ava-labs/avalanchego/vms/secp256k1fx"
"github.com/ava-labs/coreth/core/types"
"github.com/ava-labs/coreth/plugin/evm"
"github.com/chain4travel/magellan/cfg"
"github.com/chain4travel/magellan/db"
"github.com/chain4travel/magellan/models"
"github.com/chain4travel/magellan/modelsc"
"github.com/chain4travel/magellan/services"
avaxIndexer "github.com/chain4travel/magellan/services/indexes/avax"
"github.com/chain4travel/magellan/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"

avaxIndexer "github.com/chain4travel/magellan/services/indexes/avax"
)

var ErrUnknownBlockType = errors.New("unknown block type")
Expand Down Expand Up @@ -180,34 +185,77 @@ func (w *Writer) indexBlock(ctx services.ConsumerCtx, blockBytes []byte) error {
}

func (w *Writer) indexBlockInternal(ctx services.ConsumerCtx, atomicTXs []*evm.Tx, proposer *models.BlockProposal, block *types.Block) error {
txIDs := make([]string, len(atomicTXs))

var typ models.CChainType = 0
var err error
var fromAddr string
var toAddr string
ecdsaRecoveryFactory := crypto.FactorySECP256K1R{}
var amount uint64

// OPT: Store maybe only TX bytes instead whole ExtData
for i, atomicTX := range atomicTXs {
txID := atomicTX.ID()
txIDs[i] = txID.String()
switch atx := atomicTX.UnsignedAtomicTx.(type) {
case *evm.UnsignedExportTx:
atxOutput := atx.ExportedOutputs[0].Out.(*secp256k1fx.TransferOutput)
toAddr, err = address.FormatBech32(models.Bech32HRP, atxOutput.Addrs[0].Bytes())
if err != nil {
return err
}
fromAddr = atx.Ins[0].Address.String()
amount = atxOutput.Amount()
typ = models.CChainExport
err = w.indexExportTx(ctx, txID, atx, block.ExtData())
if err != nil {
return err
}
case *evm.UnsignedImportTx:
unsignedBytes, err := w.codec.Marshal(0, &atomicTX.UnsignedAtomicTx)
cred, ok := atomicTX.Creds[0].(*secp256k1fx.Credential)
if !ok {
return err
}
publicKey, err := ecdsaRecoveryFactory.RecoverPublicKey(unsignedBytes, cred.Sigs[0][:])
if err != nil {
return err
}

fromAddr, err = address.FormatBech32(models.Bech32HRP, publicKey.Address().Bytes())
if err != nil {
return err
}
toAddr = atx.Outs[0].Address.String()
amount = atx.Outs[0].Amount
typ = models.CChainImport
err = w.indexImportTx(ctx, txID, atx, atomicTX.Creds, block.ExtData(), unsignedBytes)
if err != nil {
return err
}
default:
}

txData, err := json.Marshal(models.CTransactionDataBase{
Hash: txID.String(),
To: toAddr,
Amount: strconv.FormatUint(amount, 10),
})
if err != nil {
return err
}

cvmTransaction := &db.CvmTransactionsAtomic{
TransactionID: txID.String(),
Block: block.Header().Number.String(),
Idx: uint64(i),
FromAddr: fromAddr,
ChainID: ctx.ChainID(),
Type: typ,
Serialization: txData,
CreatedAt: ctx.Time(),
}
err = ctx.Persist().InsertCvmTransactionsAtomic(ctx.Ctx(), ctx.DB(), cvmTransaction, cfg.PerformUpdates)
if err != nil {
return err
}
}

for ipos, rawtx := range block.Transactions() {
Expand Down Expand Up @@ -279,20 +327,6 @@ func (w *Writer) indexBlockInternal(ctx services.ConsumerCtx, atomicTXs []*evm.T
}
}

for _, txIDString := range txIDs {
cvmTransaction := &db.CvmTransactionsAtomic{
TransactionID: txIDString,
Block: block.Header().Number.String(),
ChainID: ctx.ChainID(),
Type: typ,
CreatedAt: ctx.Time(),
}
err = ctx.Persist().InsertCvmTransactionsAtomic(ctx.Ctx(), ctx.DB(), cvmTransaction, cfg.PerformUpdates)
if err != nil {
return err
}
}

blockBytes, err := json.Marshal(block.Header())
if err != nil {
return err
Expand All @@ -303,7 +337,7 @@ func (w *Writer) indexBlockInternal(ctx services.ConsumerCtx, atomicTXs []*evm.T
Hash: block.Hash().String(),
ChainID: ctx.ChainID(),
EvmTx: int16(len(block.Transactions())),
AtomicTx: int16(len(txIDs)),
AtomicTx: int16(len(atomicTXs)),
Serialization: blockBytes,
CreatedAt: ctx.Time(),
Proposer: proposer.Proposer,
Expand Down