Skip to content

Example Go code for subscribing to the block stream and parsing txs

alexjlan edited this page May 4, 2020 · 2 revisions
package main
import (
	"context"
	"errors"
	"log"
	"github.com/Oneledger/explorer/common"
	"github.com/Oneledger/protocol/action"
	"github.com/Oneledger/protocol/action/transfer"
	"github.com/Oneledger/protocol/serialize"
	tmrpc "github.com/tendermint/tendermint/rpc/client"
	ctypes "github.com/tendermint/tendermint/rpc/core/types"
	tmtypes "github.com/tendermint/tendermint/types"
)
// Fee is the JSON-serializable fee section of a Transaction.
type Fee struct {
	// Amount is the fee price rendered as a string; convertSendTx fills it
	// from the signed transaction's fee.Price.String().
	Amount string `json:"amount"`

	// Gas is the gas value copied from the signed transaction's fee.
	Gas int64 `json:"gas"`
}
// Recipient is one receiving side of a Transaction in JSON-serializable form.
type Recipient struct {
	// Account is the receiving address; convertSendTx fills it with the
	// humanized form of the send's To field.
	Account string `json:"account"`

	// Amount is the transferred amount rendered as a string.
	Amount string `json:"amount"`
}
// Transaction is the flattened, JSON-serializable view of a send transaction
// that this example logs for every decoded tx.
type Transaction struct {
	// Type is the string form of the decoded action's type.
	Type string `json:"type"`

	// BlockHeight is the height of the block the tx was found in; main sets
	// it after conversion.
	BlockHeight int64 `json:"blockHeight"`

	// Hash is the hash of the raw tx bytes; main sets it after conversion.
	Hash common.HexBytes `json:"hash"`

	// ChainID is the chain ID of the containing block; main sets it after
	// conversion.
	ChainID string `json:"chainID"`

	// From is the humanized sender address.
	From string `json:"from"`

	// Recipients lists the receiving accounts; omitted from JSON when empty.
	Recipients []Recipient `json:"recipients,omitempty"`

	// Fee is the fee attached to the signed transaction.
	Fee Fee `json:"fee"`

	// Memo is the memo carried by the signed transaction.
	Memo string `json:"memo"`
}
// main connects to a Oneledger fullnode through the Tendermint RPC client,
// subscribes to NewBlock events and logs every transaction in each block that
// decodes as a send transaction.
//
// Setup failures (client creation, start, subscription) terminate the process
// via log.Fatalf instead of being ignored. Once the event channel is closed,
// main panics with the reason reported by the listener goroutine.
func main() {
	quit := make(chan string)

	// Oneledger fullnode rpc port
	tmAddress := "tcp://127.0.0.1:26600"
	tmClient, err := tmrpc.NewHTTP(tmAddress, "/websocket")
	if err != nil {
		log.Fatalf("creating tendermint client for %s: %v", tmAddress, err)
	}

	// NOTE(review): OnStart is kept from the original example; presumably the
	// public Start method is the intended entry point — confirm against the
	// tendermint version in use.
	if err = tmClient.OnStart(); err != nil {
		log.Fatalf("starting tendermint client: %v", err)
	}
	log.Println("Successfully connected tendermint clients!")

	ctx := context.Background()
	blocksQuery := "tm.event = 'NewBlock'"
	blockEvents, err := tmClient.Subscribe(ctx, "tx-explorer-watcher", blocksQuery)
	if err != nil {
		// Without a subscription there is no event channel to listen on.
		log.Fatalf("subscribing to %q: %v", blocksQuery, err)
	}

	// Listen for new blocks
	go func(blockEvents <-chan ctypes.ResultEvent) {
		for event := range blockEvents {
			b, ok := event.Data.(tmtypes.EventDataNewBlock)
			if !ok {
				// Report the payload that was actually received; b is only a
				// zero value when the assertion fails.
				log.Printf("Incorrect block event type in blockStream goroutine: %T", event.Data)
				continue
			}
			log.Println("Incoming block: ", b.Block.Height)

			for _, txn := range b.Block.Data.Txs {
				signedTx, err := ConvertFromBytes(txn)
				if err != nil {
					log.Printf("block %d: decoding signed tx: %v", b.Block.Height, err)
					continue
				}

				tx, err := convertSendTx(signedTx)
				if err != nil {
					log.Printf("block %d: skipping tx: %v", b.Block.Height, err)
					continue
				}

				// Block-level context is not part of the tx payload itself.
				tx.BlockHeight = b.Block.Height
				tx.Hash = txn.Hash()
				tx.ChainID = b.Block.ChainID
				log.Println("tx: ", tx)
			}
		}
		// The range loop only ends when the event channel is closed.
		quit <- "Failed to get tendermint event stream"
	}(blockEvents)

	// Block until the listener reports that the stream ended.
	panic(<-quit)
}
// ConvertFromBytes decodes raw transaction bytes into an action.SignedTx
// using the NETWORK serializer. On failure it returns a zero SignedTx together
// with the serializer's error.
func ConvertFromBytes(tx []byte) (action.SignedTx, error) {
	var signed action.SignedTx
	if err := serialize.GetSerializer(serialize.NETWORK).Deserialize(tx, &signed); err != nil {
		return action.SignedTx{}, err
	}
	return signed, nil
}
// convertSendTx decodes the payload of a signed transaction as a
// transfer.Send and flattens it into a Transaction.
//
// The returned Transaction carries the type, fee, sender, a single recipient
// and the memo; BlockHeight, Hash and ChainID are left at their zero values.
// An error is returned when base.Data cannot be deserialized into a
// transfer.Send; its message includes the serializer's error text so the
// cause is not lost.
func convertSendTx(base action.SignedTx) (Transaction, error) {
	var send transfer.Send
	if err := serialize.GetSerializer(serialize.NETWORK).Deserialize(base.Data, &send); err != nil {
		return Transaction{}, errors.New("failed to deserialize tx data: " + err.Error())
	}

	return Transaction{
		Type: send.Type().String(),
		Fee: Fee{
			Amount: base.Fee.Price.String(),
			Gas:    base.Fee.Gas,
		},
		From:       send.From.Humanize(),
		Recipients: []Recipient{{Account: send.To.Humanize(), Amount: send.Amount.String()}},
		Memo:       base.Memo,
	}, nil
}