-
Notifications
You must be signed in to change notification settings - Fork 1
/
nebuladb.go
127 lines (105 loc) · 2.71 KB
/
nebuladb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package ants
import (
"context"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/libp2p/go-libp2p/core/peer"
)
// NebulaProvider is implemented by sources that can report the peer IDs
// observed during a recent Nebula crawl.
type NebulaProvider interface {
	// GetLatestPeerIds returns the peer IDs taken from a recent crawl.
	GetLatestPeerIds(ctx context.Context) ([]peer.ID, error)
}
// NebulaDB is a NebulaProvider that queries a Nebula PostgreSQL database
// (through a pgx connection pool) for the peers seen in a recent crawl.
type NebulaDB struct {
	// ConnString is the connection string passed to pgxpool.New by Open.
	ConnString string
	// CrawlInterval is the look-back window GetLatestPeerIds uses when
	// searching for a recent crawl and its visits.
	CrawlInterval time.Duration

	// connPool is nil until Open succeeds and is reset to nil by Close.
	connPool *pgxpool.Pool
}

// Compile-time check that *NebulaDB satisfies NebulaProvider.
var _ NebulaProvider = (*NebulaDB)(nil)
// NewNebulaDB returns a NebulaDB configured with the given connection string
// and crawl interval. It does not open a pool; that happens in Open, or on
// demand inside GetLatestPeerIds.
func NewNebulaDB(connString string, crawlInterval time.Duration) *NebulaDB {
	db := new(NebulaDB)
	db.ConnString = connString
	db.CrawlInterval = crawlInterval
	return db
}
// Open creates the pgx connection pool from db.ConnString and stores it on db.
// If a pool already exists, Open does nothing and returns nil. A failure from
// pgxpool.New is logged at warn level and returned unchanged.
//
// NOTE(review): pgxpool.New may not dial the server before returning, so a nil
// error here might only mean the connection string was accepted — confirm
// against the pgx version in use.
func (db *NebulaDB) Open(ctx context.Context) error {
	if db.connPool != nil {
		// Already open: reuse the existing pool.
		return nil
	}

	pool, err := pgxpool.New(ctx, db.ConnString)
	if err == nil {
		logger.Debug("opened connection to Nebula DB")
		db.connPool = pool
		return nil
	}

	logger.Warn("unable to open connection to Nebula DB: ", err)
	return err
}
// Close releases the connection pool, if one is open, and clears the handle so
// that a later Open creates a fresh pool. Calling Close on a NebulaDB that was
// never opened, or that is already closed, is a no-op.
func (db *NebulaDB) Close() {
	if db.connPool == nil {
		// (*pgxpool.Pool).Close dereferences its receiver, so calling it on a
		// nil pool would panic.
		return
	}
	db.connPool.Close()
	db.connPool = nil
}
// GetLatestPeerIds returns the IDs of peers that were successfully visited
// during a recent Nebula crawl.
//
// Steps:
//   - Select the crawl with the earliest started_at that is still later than
//     now minus CrawlInterval.
//   - Load the multihash of every peer visited in that crawl with no connect
//     error.
//   - Decode each multihash into a peer.ID. Rows that cannot be scanned or
//     decoded are skipped, with a debug log.
//
// If the pool is not open yet, it is opened for the duration of this call and
// closed again before returning. An error is returned in any of these cases:
//   - the pool cannot be opened;
//   - the crawl lookup fails, including when no crawl matches;
//   - the peers query fails;
//   - reading the result set fails part-way through.
func (db *NebulaDB) GetLatestPeerIds(ctx context.Context) ([]peer.ID, error) {
	// Open a connection if it's not already open, and only keep it for this call.
	if db.connPool == nil {
		if err := db.Open(ctx); err != nil {
			return nil, err
		}
		defer db.Close()
	}

	logger.Debug("getting last crawl from Nebula DB")
	crawlIDQuery := `
SELECT c.id
FROM crawls c
WHERE c.started_at > $1
ORDER BY c.started_at ASC
LIMIT 1
`
	crawlIntervalAgo := time.Now().Add(-db.CrawlInterval)
	var crawlID uint64
	if err := db.connPool.QueryRow(ctx, crawlIDQuery, crawlIntervalAgo).Scan(&crawlID); err != nil {
		logger.Warn("unable to get last crawl from Nebula DB: ", err)
		return nil, err
	}

	peersQuery := `
SELECT p.multi_hash
FROM visits v
JOIN peers p ON p.id = v.peer_id
WHERE v.visit_started_at >= $1
AND v.crawl_id = $2
AND v.connect_error IS NULL
`
	// NOTE(review): the visit_started_at lower bound looks intended to narrow
	// the visits scan (e.g. partition pruning) — confirm against the schema.
	beforeLastCrawlStarted := crawlIntervalAgo.Add(-db.CrawlInterval)
	rows, err := db.connPool.Query(ctx, peersQuery, beforeLastCrawlStarted, crawlID)
	if err != nil {
		logger.Warn("unable to get peers from Nebula DB: ", err)
		return nil, err
	}
	// Always release the underlying connection, even on an early return.
	defer rows.Close()

	var peerIDs []peer.ID
	for rows.Next() {
		var multiHash string
		if err := rows.Scan(&multiHash); err != nil {
			logger.Debugf("skipping Nebula peer row that could not be scanned: %v", err)
			continue
		}
		peerID, err := peer.Decode(multiHash)
		if err != nil {
			logger.Debugf("skipping invalid Nebula peer multihash %q: %v", multiHash, err)
			continue
		}
		peerIDs = append(peerIDs, peerID)
	}
	// Next returns false both at the end of the result set and on failure.
	// Without this check, a mid-stream error would be reported as a truncated
	// success.
	if err := rows.Err(); err != nil {
		logger.Warn("unable to read peers from Nebula DB: ", err)
		return nil, err
	}

	logger.Debugf("found %d peers during the last Nebula crawl", len(peerIDs))
	return peerIDs, nil
}
// NebulaServiceProvider is a placeholder NebulaProvider meant to obtain peers
// from a Nebula service instead of the database. It is not implemented yet.
type NebulaServiceProvider struct {
	// address is currently neither set nor read in this file.
	address string
}

// Compile-time check that *NebulaServiceProvider satisfies NebulaProvider.
var _ NebulaProvider = (*NebulaServiceProvider)(nil)
// NewNebulaServiceProvider returns a zero-valued NebulaServiceProvider. No
// service client is created yet, and no address is configured.
func NewNebulaServiceProvider() *NebulaServiceProvider {
	// TODO: init gRPC service client
	provider := new(NebulaServiceProvider)
	return provider
}
// GetLatestPeerIds is a stub: it always returns a nil slice and a nil error,
// so callers currently observe zero peers rather than a failure.
func (n NebulaServiceProvider) GetLatestPeerIds(ctx context.Context) ([]peer.ID, error) {
	// TODO: request peers from Nebula service
	var noPeers []peer.ID
	return noPeers, nil
}