Skip to content

Commit

Permalink
fix(sensor): accurately timestamp data on arrival (#418)
Browse files Browse the repository at this point in the history
* accurately timestamp data on arrival

* lint

* fix: timestamp (#419)

---------

Co-authored-by: Minh Vu <[email protected]>
  • Loading branch information
rebelArtists and minhd-vu authored Oct 31, 2024
1 parent 649ac10 commit efea2e2
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 61 deletions.
3 changes: 1 addition & 2 deletions cmd/p2p/sensor/sensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,7 @@ var SensorCmd = &cobra.Command{
if err := removePeerMessages(msgCounter, server.Peers()); err != nil {
log.Error().Err(err).Msg("Failed to clean up peer messages")
}

db.WritePeers(context.Background(), server.Peers())
db.WritePeers(context.Background(), server.Peers(), time.Now())
case peer := <-opts.Peers:
// Lock the peers map before modifying it.
peersMutex.Lock()
Expand Down
13 changes: 7 additions & 6 deletions p2p/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package database
import (
"context"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
Expand All @@ -17,27 +18,27 @@ import (
type Database interface {
// WriteBlock will write the both the block and block event to the database
// if ShouldWriteBlocks and ShouldWriteBlockEvents return true, respectively.
WriteBlock(context.Context, *enode.Node, *types.Block, *big.Int)
WriteBlock(context.Context, *enode.Node, *types.Block, *big.Int, time.Time)

// WriteBlockHeaders will write the block headers if ShouldWriteBlocks
// returns true.
WriteBlockHeaders(context.Context, []*types.Header)
WriteBlockHeaders(context.Context, []*types.Header, time.Time)

// WriteBlockHashes will write the block hashes if ShouldWriteBlockEvents
// returns true.
WriteBlockHashes(context.Context, *enode.Node, []common.Hash)
WriteBlockHashes(context.Context, *enode.Node, []common.Hash, time.Time)

// WriteBlockBodies will write the block bodies if ShouldWriteBlocks returns
// true.
WriteBlockBody(context.Context, *eth.BlockBody, common.Hash)
WriteBlockBody(context.Context, *eth.BlockBody, common.Hash, time.Time)

// WriteTransactions will write the both the transaction and transaction
// event to the database if ShouldWriteTransactions and
// ShouldWriteTransactionEvents return true, respectively.
WriteTransactions(context.Context, *enode.Node, []*types.Transaction)
WriteTransactions(context.Context, *enode.Node, []*types.Transaction, time.Time)

// WritePeers will write the connected peers to the database.
WritePeers(context.Context, []*p2p.Peer)
WritePeers(context.Context, []*p2p.Peer, time.Time)

// HasBlock will return whether the block is in the database. If the database
// client has not been initialized this will always return true.
Expand Down
83 changes: 38 additions & 45 deletions p2p/database/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,23 +150,23 @@ func NewDatastore(ctx context.Context, opts DatastoreOptions) Database {
}

// WriteBlock writes the block and the block event to datastore.
func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *types.Block, td *big.Int) {
func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *types.Block, td *big.Int, tfs time.Time) {
if d.client == nil {
return
}

if d.ShouldWriteBlockEvents() {
d.jobs <- struct{}{}
go func() {
d.writeEvent(peer, BlockEventsKind, block.Hash(), BlocksKind)
d.writeEvent(peer, BlockEventsKind, block.Hash(), BlocksKind, tfs)
<-d.jobs
}()
}

if d.ShouldWriteBlocks() {
d.jobs <- struct{}{}
go func() {
d.writeBlock(ctx, block, td)
d.writeBlock(ctx, block, td, tfs)
<-d.jobs
}()
}
Expand All @@ -176,15 +176,15 @@ func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *typ
// write block events because headers will only be sent to the sensor when
// requested. The block events will be written when the hash is received
// instead.
func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Header) {
func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Header, tfs time.Time) {
if d.client == nil || !d.ShouldWriteBlocks() {
return
}

for _, h := range headers {
d.jobs <- struct{}{}
go func(header *types.Header) {
d.writeBlockHeader(ctx, header)
d.writeBlockHeader(ctx, header, tfs)
<-d.jobs
}(h)
}
Expand All @@ -195,41 +195,41 @@ func (d *Datastore) WriteBlockHeaders(ctx context.Context, headers []*types.Head
// requested. The block events will be written when the hash is received
// instead. It will write the uncles and transactions to datastore if they
// don't already exist.
func (d *Datastore) WriteBlockBody(ctx context.Context, body *eth.BlockBody, hash common.Hash) {
func (d *Datastore) WriteBlockBody(ctx context.Context, body *eth.BlockBody, hash common.Hash, tfs time.Time) {
if d.client == nil || !d.ShouldWriteBlocks() {
return
}

d.jobs <- struct{}{}
go func() {
d.writeBlockBody(ctx, body, hash)
d.writeBlockBody(ctx, body, hash, tfs)
<-d.jobs
}()
}

// WriteBlockHashes will write the block events to datastore.
func (d *Datastore) WriteBlockHashes(ctx context.Context, peer *enode.Node, hashes []common.Hash) {
func (d *Datastore) WriteBlockHashes(ctx context.Context, peer *enode.Node, hashes []common.Hash, tfs time.Time) {
if d.client == nil || !d.ShouldWriteBlockEvents() || len(hashes) == 0 {
return
}

d.jobs <- struct{}{}
go func() {
d.writeEvents(ctx, peer, BlockEventsKind, hashes, BlocksKind)
d.writeEvents(ctx, peer, BlockEventsKind, hashes, BlocksKind, tfs)
<-d.jobs
}()
}

// WriteTransactions will write the transactions and transaction events to datastore.
func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs []*types.Transaction) {
func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs []*types.Transaction, tfs time.Time) {
if d.client == nil {
return
}

if d.ShouldWriteTransactions() {
d.jobs <- struct{}{}
go func() {
d.writeTransactions(ctx, txs)
d.writeTransactions(ctx, txs, tfs)
<-d.jobs
}()
}
Expand All @@ -242,14 +242,14 @@ func (d *Datastore) WriteTransactions(ctx context.Context, peer *enode.Node, txs

d.jobs <- struct{}{}
go func() {
d.writeEvents(ctx, peer, TransactionEventsKind, hashes, TransactionsKind)
d.writeEvents(ctx, peer, TransactionEventsKind, hashes, TransactionsKind, tfs)
<-d.jobs
}()
}
}

// WritePeers writes the connected peers to datastore.
func (d *Datastore) WritePeers(ctx context.Context, peers []*p2p.Peer) {
func (d *Datastore) WritePeers(ctx context.Context, peers []*p2p.Peer, tls time.Time) {
if d.client == nil || !d.ShouldWritePeers() {
return
}
Expand All @@ -259,7 +259,6 @@ func (d *Datastore) WritePeers(ctx context.Context, peers []*p2p.Peer) {

keys := make([]*datastore.Key, 0, len(peers))
dsPeers := make([]*DatastorePeer, 0, len(peers))
now := time.Now()

for _, peer := range peers {
keys = append(keys, datastore.NameKey(PeersKind, peer.ID().String(), nil))
Expand All @@ -268,8 +267,8 @@ func (d *Datastore) WritePeers(ctx context.Context, peers []*p2p.Peer) {
Caps: peer.Info().Caps,
URL: peer.Node().URLv4(),
LastSeenBy: d.sensorID,
TimeLastSeen: now,
TTL: now.Add(d.ttl),
TimeLastSeen: tls,
TTL: tls.Add(d.ttl),
})
}

Expand Down Expand Up @@ -320,9 +319,7 @@ func (d *Datastore) HasBlock(ctx context.Context, hash common.Hash) bool {

// newDatastoreHeader creates a DatastoreHeader from a types.Header. Some
// values are converted into strings to prevent a loss of precision.
func (d *Datastore) newDatastoreHeader(header *types.Header) *DatastoreHeader {
now := time.Now()

func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time) *DatastoreHeader {
return &DatastoreHeader{
ParentHash: datastore.NameKey(BlocksKind, header.ParentHash.Hex(), nil),
UncleHash: header.UncleHash.Hex(),
Expand All @@ -340,14 +337,14 @@ func (d *Datastore) newDatastoreHeader(header *types.Header) *DatastoreHeader {
MixDigest: header.MixDigest.String(),
Nonce: fmt.Sprint(header.Nonce.Uint64()),
BaseFee: header.BaseFee.String(),
TimeFirstSeen: now,
TTL: now.Add(d.ttl),
TimeFirstSeen: tfs,
TTL: tfs.Add(d.ttl),
}
}

// newDatastoreTransaction creates a DatastoreTransaction from a types.Transaction. Some
// values are converted into strings to prevent a loss of precision.
func (d *Datastore) newDatastoreTransaction(tx *types.Transaction) *DatastoreTransaction {
func (d *Datastore) newDatastoreTransaction(tx *types.Transaction, tfs time.Time) *DatastoreTransaction {
v, r, s := tx.RawSignatureValues()
var from, to string

Expand All @@ -360,8 +357,6 @@ func (d *Datastore) newDatastoreTransaction(tx *types.Transaction) *DatastoreTra
to = tx.To().Hex()
}

now := time.Now()

return &DatastoreTransaction{
Data: tx.Data(),
From: from,
Expand All @@ -376,13 +371,13 @@ func (d *Datastore) newDatastoreTransaction(tx *types.Transaction) *DatastoreTra
R: r.String(),
S: s.String(),
Time: tx.Time(),
TimeFirstSeen: now,
TTL: now.Add(d.ttl),
TimeFirstSeen: tfs,
TTL: tfs.Add(d.ttl),
Type: int16(tx.Type()),
}
}

func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.Int) {
func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.Int, tfs time.Time) {
key := datastore.NameKey(BlocksKind, block.Hash().Hex(), nil)

_, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
Expand All @@ -395,7 +390,7 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.

if dsBlock.DatastoreHeader == nil {
shouldWrite = true
dsBlock.DatastoreHeader = d.newDatastoreHeader(block.Header())
dsBlock.DatastoreHeader = d.newDatastoreHeader(block.Header(), tfs)
}

if len(dsBlock.TotalDifficulty) == 0 {
Expand All @@ -406,7 +401,7 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.
if dsBlock.Transactions == nil && len(block.Transactions()) > 0 {
shouldWrite = true
if d.shouldWriteTransactions {
d.writeTransactions(ctx, block.Transactions())
d.writeTransactions(ctx, block.Transactions(), tfs)
}

dsBlock.Transactions = make([]*datastore.Key, 0, len(block.Transactions()))
Expand All @@ -419,7 +414,7 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.
shouldWrite = true
dsBlock.Uncles = make([]*datastore.Key, 0, len(block.Uncles()))
for _, uncle := range block.Uncles() {
d.writeBlockHeader(ctx, uncle)
d.writeBlockHeader(ctx, uncle, tfs)
dsBlock.Uncles = append(dsBlock.Uncles, datastore.NameKey(BlocksKind, uncle.Hash().Hex(), nil))
}
}
Expand All @@ -439,16 +434,15 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.

// writeEvent writes either a block or transaction event to datastore depending
// on the provided eventKind and hashKind.
func (d *Datastore) writeEvent(peer *enode.Node, eventKind string, hash common.Hash, hashKind string) {
func (d *Datastore) writeEvent(peer *enode.Node, eventKind string, hash common.Hash, hashKind string, tfs time.Time) {
key := datastore.IncompleteKey(eventKind, nil)
now := time.Now()

event := DatastoreEvent{
SensorId: d.sensorID,
PeerId: peer.URLv4(),
Hash: datastore.NameKey(hashKind, hash.Hex(), nil),
Time: now,
TTL: now.Add(d.ttl),
Time: tfs,
TTL: tfs.Add(d.ttl),
}
if _, err := d.client.Put(context.Background(), key, &event); err != nil {
log.Error().Err(err).Msgf("Failed to write to %v", eventKind)
Expand All @@ -458,10 +452,9 @@ func (d *Datastore) writeEvent(peer *enode.Node, eventKind string, hash common.H
// writeEvents writes either block or transaction events to datastore depending
// on the provided eventKind and hashKind. This is similar to writeEvent but
// batches the request.
func (d *Datastore) writeEvents(ctx context.Context, peer *enode.Node, eventKind string, hashes []common.Hash, hashKind string) {
func (d *Datastore) writeEvents(ctx context.Context, peer *enode.Node, eventKind string, hashes []common.Hash, hashKind string, tfs time.Time) {
keys := make([]*datastore.Key, 0, len(hashes))
events := make([]*DatastoreEvent, 0, len(hashes))
now := time.Now()

for _, hash := range hashes {
keys = append(keys, datastore.IncompleteKey(eventKind, nil))
Expand All @@ -470,8 +463,8 @@ func (d *Datastore) writeEvents(ctx context.Context, peer *enode.Node, eventKind
SensorId: d.sensorID,
PeerId: peer.URLv4(),
Hash: datastore.NameKey(hashKind, hash.Hex(), nil),
Time: now,
TTL: now.Add(d.ttl),
Time: tfs,
TTL: tfs.Add(d.ttl),
}
events = append(events, &event)
}
Expand All @@ -483,7 +476,7 @@ func (d *Datastore) writeEvents(ctx context.Context, peer *enode.Node, eventKind

// writeBlockHeader will write the block header to datastore if it doesn't
// exist.
func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header) {
func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, tfs time.Time) {
key := datastore.NameKey(BlocksKind, header.Hash().Hex(), nil)

_, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
Expand All @@ -492,7 +485,7 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header)
return nil
}

block.DatastoreHeader = d.newDatastoreHeader(header)
block.DatastoreHeader = d.newDatastoreHeader(header, tfs)
_, err := tx.Put(key, &block)
return err
}, datastore.MaxAttempts(MaxAttempts))
Expand All @@ -502,7 +495,7 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header)
}
}

func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, hash common.Hash) {
func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, hash common.Hash, tfs time.Time) {
key := datastore.NameKey(BlocksKind, hash.Hex(), nil)

_, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
Expand All @@ -516,7 +509,7 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has
if block.Transactions == nil && len(body.Transactions) > 0 {
shouldWrite = true
if d.shouldWriteTransactions {
d.writeTransactions(ctx, body.Transactions)
d.writeTransactions(ctx, body.Transactions, tfs)
}

block.Transactions = make([]*datastore.Key, 0, len(body.Transactions))
Expand All @@ -529,7 +522,7 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has
shouldWrite = true
block.Uncles = make([]*datastore.Key, 0, len(body.Uncles))
for _, uncle := range body.Uncles {
d.writeBlockHeader(ctx, uncle)
d.writeBlockHeader(ctx, uncle, tfs)
block.Uncles = append(block.Uncles, datastore.NameKey(BlocksKind, uncle.Hash().Hex(), nil))
}
}
Expand All @@ -549,13 +542,13 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has

// writeTransactions will write the transactions to datastore and return the
// transaction hashes.
func (d *Datastore) writeTransactions(ctx context.Context, txs []*types.Transaction) {
func (d *Datastore) writeTransactions(ctx context.Context, txs []*types.Transaction, tfs time.Time) {
keys := make([]*datastore.Key, 0, len(txs))
transactions := make([]*DatastoreTransaction, 0, len(txs))

for _, tx := range txs {
keys = append(keys, datastore.NameKey(TransactionsKind, tx.Hash().Hex(), nil))
transactions = append(transactions, d.newDatastoreTransaction(tx))
transactions = append(transactions, d.newDatastoreTransaction(tx, tfs))
}

if _, err := d.client.PutMulti(ctx, keys, transactions); err != nil {
Expand Down
Loading

0 comments on commit efea2e2

Please sign in to comment.