diff --git a/plugins/indexing/auth/module.go b/plugins/indexing/auth/module.go index 38d8fb9d..e04ba00c 100644 --- a/plugins/indexing/auth/module.go +++ b/plugins/indexing/auth/module.go @@ -23,14 +23,14 @@ func (s wrappedAccount) MarshalJSON() ([]byte, error) { return s.cdc.MarshalInterfaceJSON(s.Account) } -func ExtractUpdate(cdc codec.Codec, logger *log.Logger, change *storetypes.StoreKVPair) (*types.Message, error) { +func ExtractUpdate(ctx *types.BlockContext, cdc codec.Codec, logger *log.Logger, change *storetypes.StoreKVPair) (*types.Message, error) { if _, found := bytes.CutPrefix(change.Key, authtypes.AddressStoreKeyPrefix); found { acc, err := codec.CollInterfaceValue[sdk.AccountI](cdc).Decode(change.Value) if err != nil { return nil, err } - return types.NewMessage("account", &wrappedAccount{cdc: cdc, Account: acc}), nil + return types.NewMessage("account", &wrappedAccount{cdc: cdc, Account: acc}, ctx), nil } logger.Trace("skipping change", "change", change) diff --git a/plugins/indexing/bank/module.go b/plugins/indexing/bank/module.go index aea2ce34..cbb74b64 100644 --- a/plugins/indexing/bank/module.go +++ b/plugins/indexing/bank/module.go @@ -16,7 +16,7 @@ import ( const StoreKey = banktypes.StoreKey -func ExtractUpdate(_ codec.Codec, logger *log.Logger, change *storetypes.StoreKVPair) (*types.Message, error) { +func ExtractUpdate(ctx *types.BlockContext, _ codec.Codec, logger *log.Logger, change *storetypes.StoreKVPair) (*types.Message, error) { if keyBytes, found := bytes.CutPrefix(change.Key, banktypes.SupplyKey); found { _, key, err := collections.StringKey.Decode(keyBytes) if err != nil { @@ -37,7 +37,7 @@ func ExtractUpdate(_ codec.Codec, logger *log.Logger, change *storetypes.StoreKV Amount: amount.String(), } - return types.NewMessage("supply", data), nil + return types.NewMessage("supply", data, ctx), nil } else if keyBytes, found := bytes.CutPrefix(change.Key, banktypes.BalancesPrefix); found { _, key, err := collections.PairKeyCodec(sdk.AccAddressKey, collections.StringKey).Decode(keyBytes) if err != nil { @@ -60,7 +60,7 @@ func ExtractUpdate(_ codec.Codec, logger *log.Logger, change *storetypes.StoreKV Denom: key.K2(), } - return types.NewMessage("account-balance", data), nil + return types.NewMessage("account-balance", data, ctx), nil } logger.Trace("skipping change", "change", change) diff --git a/plugins/indexing/base/block.go b/plugins/indexing/base/block.go index 483f0669..b04259ff 100644 --- a/plugins/indexing/base/block.go +++ b/plugins/indexing/base/block.go @@ -11,7 +11,7 @@ import ( types "github.com/sedaprotocol/seda-chain/plugins/indexing/types" ) -func ExtractBlockUpdate(req abci.RequestFinalizeBlock) (*types.Message, error) { +func ExtractBlockUpdate(ctx *types.BlockContext, req abci.RequestFinalizeBlock) (*types.Message, error) { hash := strings.ToUpper(hex.EncodeToString(req.Hash)) txCount := len(req.Txs) proposerAddress, err := sdk.ConsAddressFromHex(hex.EncodeToString(req.ProposerAddress)) @@ -31,5 +31,5 @@ func ExtractBlockUpdate(req abci.RequestFinalizeBlock) (*types.Message, error) { ProposerAddress: proposerAddress.String(), } - return types.NewMessage("block", data), nil + return types.NewMessage("block", data, ctx), nil } diff --git a/plugins/indexing/base/transaction.go b/plugins/indexing/base/transaction.go index d79f576f..eadb26da 100644 --- a/plugins/indexing/base/transaction.go +++ b/plugins/indexing/base/transaction.go @@ -22,7 +22,7 @@ func (s wrappedTx) MarshalJSON() ([]byte, error) { return s.cdc.MarshalJSON(s.Tx) } -func ExtractTransactionUpdates(cdc codec.Codec, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) ([]*types.Message, error) { +func ExtractTransactionUpdates(ctx *types.BlockContext, cdc codec.Codec, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) ([]*types.Message, error) { messages := make([]*types.Message, 0, len(req.Txs)) timestamp := req.Time @@ -49,7 +49,7 @@ func ExtractTransactionUpdates(cdc codec.Codec, req abci.RequestFinalizeBlock, r Result: txResult, } - messages = append(messages, types.NewMessage("tx", data)) + messages = append(messages, types.NewMessage("tx", data, ctx)) } return messages, nil diff --git a/plugins/indexing/plugin.go b/plugins/indexing/plugin.go index 5b2994fb..1e8b0ecc 100644 --- a/plugins/indexing/plugin.go +++ b/plugins/indexing/plugin.go @@ -33,14 +33,14 @@ var _ storetypes.ABCIListener = &IndexerPlugin{} // IndexerPlugin is the implementation of the baseapp.ABCIListener interface // For Go plugins this is all that is required to process data sent over gRPC. type IndexerPlugin struct { - blockHeight int64 - cdc codec.Codec - sqsClient *pluginsqs.SqsClient - logger *log.Logger + block *types.BlockContext + cdc codec.Codec + sqsClient *pluginsqs.SqsClient + logger *log.Logger } func (p *IndexerPlugin) publishToQueue(messages []*types.Message) error { - publishError := p.sqsClient.PublishToQueue(p.blockHeight, messages) + publishError := p.sqsClient.PublishToQueue(messages) if publishError != nil { p.logger.Error("Failed to publish messages to queue.", "error", publishError) return publishError @@ -51,18 +51,19 @@ func (p *IndexerPlugin) publishToQueue(messages []*types.Message) error { func (p *IndexerPlugin) ListenFinalizeBlock(_ context.Context, req abci.RequestFinalizeBlock, res abci.ResponseFinalizeBlock) error { p.logger.Debug(fmt.Sprintf("[%d] Start processing finalize block.", req.Height)) - p.blockHeight = req.Height + p.block = types.NewBlockContext(req.Height, req.Time) + // TODO(#229) Change to +2 to account for the votes message messages := make([]*types.Message, 0, len(req.Txs)+1) - blockMessage, err := base.ExtractBlockUpdate(req) + blockMessage, err := base.ExtractBlockUpdate(p.block, req) if err != nil { p.logger.Error("Failed to extract block update", "error", err) return err } messages = append(messages, blockMessage) - txMessages, err := base.ExtractTransactionUpdates(p.cdc, req, res) + txMessages, err := base.ExtractTransactionUpdates(p.block, p.cdc, req, res) if err != nil { p.logger.Error("Failed to extract Tx updates", "error", err) return err @@ -81,16 +82,16 @@ func (p *IndexerPlugin) ListenFinalizeBlock(_ context.Context, req abci.RequestF func (p *IndexerPlugin) extractUpdate(change *storetypes.StoreKVPair) (*types.Message, error) { switch change.StoreKey { case bankmodule.StoreKey: - return bankmodule.ExtractUpdate(p.cdc, p.logger, change) + return bankmodule.ExtractUpdate(p.block, p.cdc, p.logger, change) case authmodule.StoreKey: - return authmodule.ExtractUpdate(p.cdc, p.logger, change) + return authmodule.ExtractUpdate(p.block, p.cdc, p.logger, change) default: return nil, nil } } func (p *IndexerPlugin) ListenCommit(_ context.Context, _ abci.ResponseCommit, changeSet []*storetypes.StoreKVPair) error { - p.logger.Debug(fmt.Sprintf("[%d] Start processing commit", p.blockHeight)) + p.logger.Debug(fmt.Sprintf("[%d] Start processing commit", p.block.Height)) var messages []*types.Message for _, change := range changeSet { @@ -110,7 +111,7 @@ func (p *IndexerPlugin) ListenCommit(_ context.Context, _ abci.ResponseCommit, c return err } - p.logger.Debug(fmt.Sprintf("[%d] Processed commit", p.blockHeight)) + p.logger.Debug(fmt.Sprintf("[%d] Processed commit", p.block.Height)) return nil } diff --git a/plugins/indexing/sqs/sqs_client.go b/plugins/indexing/sqs/sqs_client.go index 87cf1291..94fad7e3 100644 --- a/plugins/indexing/sqs/sqs_client.go +++ b/plugins/indexing/sqs/sqs_client.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "strconv" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/sqs" @@ -31,7 +32,7 @@ func (sc *SqsClient) sendMessageBatch(batch []*sqs.SendMessageBatchRequestEntry) return result.Failed, nil } -func (sc *SqsClient) PublishToQueue(height int64, data []*types.Message) error { +func (sc *SqsClient) PublishToQueue(data []*types.Message) error { // Remember max message size is 262,144 bytes entries := make([]*sqs.SendMessageBatchRequestEntry, 0, 10) @@ -48,7 +49,11 @@ func (sc *SqsClient) PublishToQueue(height int64, data []*types.Message) error { MessageAttributes: map[string]*sqs.MessageAttributeValue{ "height": { DataType: aws.String("Number"), - StringValue: aws.String(strconv.FormatInt(height, 10)), + StringValue: aws.String(strconv.FormatInt(message.Block.Height, 10)), + }, + "time": { + DataType: aws.String("String"), + StringValue: aws.String(message.Block.Time.Format(time.RFC3339)), }, }, MessageBody: aws.String(string(serialisedMessage)), diff --git a/plugins/indexing/types/block_context.go b/plugins/indexing/types/block_context.go new file mode 100644 index 00000000..e2462ab2 --- /dev/null +++ b/plugins/indexing/types/block_context.go @@ -0,0 +1,15 @@ +package types + +import "time" + +type BlockContext struct { + Height int64 + Time time.Time +} + +func NewBlockContext(height int64, timestamp time.Time) *BlockContext { + return &BlockContext{ + Height: height, + Time: timestamp, + } +} diff --git a/plugins/indexing/types/message.go b/plugins/indexing/types/message.go index a476c86b..a194d231 100644 --- a/plugins/indexing/types/message.go +++ b/plugins/indexing/types/message.go @@ -1,13 +1,15 @@ package types type Message struct { - Type string `json:"type"` - Data interface{} `json:"data"` + Block *BlockContext `json:"-"` + Type string `json:"type"` + Data interface{} `json:"data"` } -func NewMessage(messageType string, data interface{}) *Message { +func NewMessage(messageType string, data interface{}, block *BlockContext) *Message { return &Message{ - Type: messageType, - Data: data, + Block: block, + Type: messageType, + Data: data, } }