Skip to content

Commit

Permalink
chore(plugin): publish timestamp
Browse files Browse the repository at this point in the history
Also refactor SQS publishing to allow messages to carry the associated
block height and time.

Closes: #242
  • Loading branch information
Thomasvdam committed Apr 11, 2024
1 parent b6974d3 commit 24feec8
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 28 deletions.
4 changes: 2 additions & 2 deletions plugins/indexing/auth/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ func (s wrappedAccount) MarshalJSON() ([]byte, error) {
return s.cdc.MarshalInterfaceJSON(s.Account)
}

func ExtractUpdate(cdc codec.Codec, logger *log.Logger, change *storetypes.StoreKVPair) (*types.Message, error) {
func ExtractUpdate(ctx *types.BlockContext, cdc codec.Codec, logger *log.Logger, change *storetypes.StoreKVPair) (*types.Message, error) {
if _, found := bytes.CutPrefix(change.Key, authtypes.AddressStoreKeyPrefix); found {
acc, err := codec.CollInterfaceValue[sdk.AccountI](cdc).Decode(change.Value)
if err != nil {
return nil, err
}

return types.NewMessage("account", &wrappedAccount{cdc: cdc, Account: acc}), nil
return types.NewMessage("account", &wrappedAccount{cdc: cdc, Account: acc}, ctx), nil
}

logger.Trace("skipping change", "change", change)
Expand Down
6 changes: 3 additions & 3 deletions plugins/indexing/bank/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (

const StoreKey = banktypes.StoreKey

func ExtractUpdate(_ codec.Codec, logger *log.Logger, change *storetypes.StoreKVPair) (*types.Message, error) {
func ExtractUpdate(ctx *types.BlockContext, _ codec.Codec, logger *log.Logger, change *storetypes.StoreKVPair) (*types.Message, error) {
if keyBytes, found := bytes.CutPrefix(change.Key, banktypes.SupplyKey); found {
_, key, err := collections.StringKey.Decode(keyBytes)
if err != nil {
Expand All @@ -37,7 +37,7 @@ func ExtractUpdate(_ codec.Codec, logger *log.Logger, change *storetypes.StoreKV
Amount: amount.String(),
}

return types.NewMessage("supply", data), nil
return types.NewMessage("supply", data, ctx), nil
} else if keyBytes, found := bytes.CutPrefix(change.Key, banktypes.BalancesPrefix); found {
_, key, err := collections.PairKeyCodec(sdk.AccAddressKey, collections.StringKey).Decode(keyBytes)
if err != nil {
Expand All @@ -60,7 +60,7 @@ func ExtractUpdate(_ codec.Codec, logger *log.Logger, change *storetypes.StoreKV
Denom: key.K2(),
}

return types.NewMessage("account-balance", data), nil
return types.NewMessage("account-balance", data, ctx), nil
}

logger.Trace("skipping change", "change", change)
Expand Down
4 changes: 2 additions & 2 deletions plugins/indexing/base/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
types "github.com/sedaprotocol/seda-chain/plugins/indexing/types"
)

func ExtractBlockUpdate(req abci.RequestFinalizeBlock) (*types.Message, error) {
func ExtractBlockUpdate(ctx *types.BlockContext, req abci.RequestFinalizeBlock) (*types.Message, error) {
hash := strings.ToUpper(hex.EncodeToString(req.Hash))
txCount := len(req.Txs)
proposerAddress, err := sdk.ConsAddressFromHex(hex.EncodeToString(req.ProposerAddress))
Expand All @@ -31,5 +31,5 @@ func ExtractBlockUpdate(req abci.RequestFinalizeBlock) (*types.Message, error) {
ProposerAddress: proposerAddress.String(),
}

return types.NewMessage("block", data), nil
return types.NewMessage("block", data, ctx), nil
}
4 changes: 2 additions & 2 deletions plugins/indexing/base/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (s wrappedTx) MarshalJSON() ([]byte, error) {
return s.cdc.MarshalJSON(s.Tx)
}

func ExtractTransactionUpdates(cdc codec.Codec, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) ([]*types.Message, error) {
func ExtractTransactionUpdates(ctx *types.BlockContext, cdc codec.Codec, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) ([]*types.Message, error) {
messages := make([]*types.Message, 0, len(req.Txs))

timestamp := req.Time
Expand All @@ -49,7 +49,7 @@ func ExtractTransactionUpdates(cdc codec.Codec, req abci.RequestFinalizeBlock, r
Result: txResult,
}

messages = append(messages, types.NewMessage("tx", data))
messages = append(messages, types.NewMessage("tx", data, ctx))
}

return messages, nil
Expand Down
25 changes: 13 additions & 12 deletions plugins/indexing/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ var _ storetypes.ABCIListener = &IndexerPlugin{}
// IndexerPlugin is the implementation of the baseapp.ABCIListener interface
// For Go plugins this is all that is required to process data sent over gRPC.
type IndexerPlugin struct {
blockHeight int64
cdc codec.Codec
sqsClient *pluginsqs.SqsClient
logger *log.Logger
block *types.BlockContext
cdc codec.Codec
sqsClient *pluginsqs.SqsClient
logger *log.Logger
}

func (p *IndexerPlugin) publishToQueue(messages []*types.Message) error {
publishError := p.sqsClient.PublishToQueue(p.blockHeight, messages)
publishError := p.sqsClient.PublishToQueue(messages)
if publishError != nil {
p.logger.Error("Failed to publish messages to queue.", "error", publishError)
return publishError
Expand All @@ -51,18 +51,19 @@ func (p *IndexerPlugin) publishToQueue(messages []*types.Message) error {

func (p *IndexerPlugin) ListenFinalizeBlock(_ context.Context, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) error {
p.logger.Debug(fmt.Sprintf("[%d] Start processing finalize block.", req.Height))
p.blockHeight = req.Height
p.block = types.NewBlockContext(req.Height, req.Time)

// TODO(#229) Change to +2 to account for the votes message
messages := make([]*types.Message, 0, len(req.Txs)+1)

blockMessage, err := base.ExtractBlockUpdate(req)
blockMessage, err := base.ExtractBlockUpdate(p.block, req)
if err != nil {
p.logger.Error("Failed to extract block update", "error", err)
return err
}
messages = append(messages, blockMessage)

txMessages, err := base.ExtractTransactionUpdates(p.cdc, req, res)
txMessages, err := base.ExtractTransactionUpdates(p.block, p.cdc, req, res)
if err != nil {
p.logger.Error("Failed to extract Tx updates", "error", err)
return err
Expand All @@ -81,16 +82,16 @@ func (p *IndexerPlugin) ListenFinalizeBlock(_ context.Context, req abci.RequestF
func (p *IndexerPlugin) extractUpdate(change *storetypes.StoreKVPair) (*types.Message, error) {
switch change.StoreKey {
case bankmodule.StoreKey:
return bankmodule.ExtractUpdate(p.cdc, p.logger, change)
return bankmodule.ExtractUpdate(p.block, p.cdc, p.logger, change)
case authmodule.StoreKey:
return authmodule.ExtractUpdate(p.cdc, p.logger, change)
return authmodule.ExtractUpdate(p.block, p.cdc, p.logger, change)
default:
return nil, nil
}
}

func (p *IndexerPlugin) ListenCommit(_ context.Context, _ abci.ResponseCommit, changeSet []*storetypes.StoreKVPair) error {
p.logger.Debug(fmt.Sprintf("[%d] Start processing commit", p.blockHeight))
p.logger.Debug(fmt.Sprintf("[%d] Start processing commit", p.block.Height))
var messages []*types.Message

for _, change := range changeSet {
Expand All @@ -110,7 +111,7 @@ func (p *IndexerPlugin) ListenCommit(_ context.Context, _ abci.ResponseCommit, c
return err
}

p.logger.Debug(fmt.Sprintf("[%d] Processed commit", p.blockHeight))
p.logger.Debug(fmt.Sprintf("[%d] Processed commit", p.block.Height))
return nil
}

Expand Down
9 changes: 7 additions & 2 deletions plugins/indexing/sqs/sqs_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"strconv"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/sqs"
Expand All @@ -31,7 +32,7 @@ func (sc *SqsClient) sendMessageBatch(batch []*sqs.SendMessageBatchRequestEntry)
return result.Failed, nil
}

func (sc *SqsClient) PublishToQueue(height int64, data []*types.Message) error {
func (sc *SqsClient) PublishToQueue(data []*types.Message) error {
// Remember max message size is 262,144 bytes
entries := make([]*sqs.SendMessageBatchRequestEntry, 0, 10)

Expand All @@ -48,7 +49,11 @@ func (sc *SqsClient) PublishToQueue(height int64, data []*types.Message) error {
MessageAttributes: map[string]*sqs.MessageAttributeValue{
"height": {
DataType: aws.String("Number"),
StringValue: aws.String(strconv.FormatInt(height, 10)),
StringValue: aws.String(strconv.FormatInt(message.Block.Height, 10)),
},
"time": {
DataType: aws.String("String"),
StringValue: aws.String(message.Block.Time.Format(time.RFC3339)),
},
},
MessageBody: aws.String(string(serialisedMessage)),
Expand Down
15 changes: 15 additions & 0 deletions plugins/indexing/types/block_context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package types

import "time"

type BlockContext struct {
Height int64
Time time.Time
}

func NewBlockContext(height int64, timestamp time.Time) *BlockContext {
return &BlockContext{
Height: height,
Time: timestamp,
}
}
12 changes: 7 additions & 5 deletions plugins/indexing/types/message.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package types

type Message struct {
Type string `json:"type"`
Data interface{} `json:"data"`
Block *BlockContext `json:"-"`
Type string `json:"type"`
Data interface{} `json:"data"`
}

func NewMessage(messageType string, data interface{}) *Message {
func NewMessage(messageType string, data interface{}, block *BlockContext) *Message {
return &Message{
Type: messageType,
Data: data,
Block: block,
Type: messageType,
Data: data,
}
}

0 comments on commit 24feec8

Please sign in to comment.