diff --git a/abi2/abi2.go b/abi2/abi2.go
index 3a1d9c52..bd87e173 100644
--- a/abi2/abi2.go
+++ b/abi2/abi2.go
@@ -553,6 +553,7 @@ type coldef struct {
// Implements the [e2pg.Integration] interface
type Integration struct {
+ name string
Event Event
Table Table
Columns []string
@@ -585,8 +586,9 @@ type Integration struct {
// For example:
//
// {"name": "my_column", "type": "db_type", "filter_op": "contains", "filter_arg": ["0x000"]}
-func New(ev Event, bd []BlockData, table Table) (Integration, error) {
+func New(name string, ev Event, bd []BlockData, table Table) (Integration, error) {
ig := Integration{
+ name: name,
Event: ev,
Table: table,
numIndexed: ev.numIndexed(),
@@ -636,6 +638,8 @@ func col(t Table, name string) (Column, error) {
return Column{}, fmt.Errorf("table %q doesn't contain column %q", t.Name, name)
}
+func (ig Integration) Name() string { return ig.name }
+
func (ig Integration) Events(context.Context) [][]byte { return [][]byte{} }
func (ig Integration) Delete(context.Context, wpg.Conn, uint64) error { return nil }
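Reviewer note: the new name argument threads through to the Destination.Name method added in e2pg below, which keys the per-integration stats stored on the task table. A hedged sketch of the updated call site, with ev, bd, and table standing in for values decoded from an integration config:

    // Sketch only — ev, bd, and table would be decoded from config in real use.
    var (
        ev    abi2.Event
        bd    []abi2.BlockData
        table abi2.Table
    )
    ig, err := abi2.New("transfers", ev, bd, table)
    if err != nil {
        return err
    }
    fmt.Println(ig.Name()) // "transfers" — used as the dstat key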
diff --git a/cmd/e2pg/dashboard.go b/cmd/e2pg/dashboard.go
deleted file mode 100644
index b14bd1fc..00000000
--- a/cmd/e2pg/dashboard.go
+++ /dev/null
@@ -1,175 +0,0 @@
-package main
-
-import (
- "encoding/json"
- "fmt"
- "log/slog"
- "net/http"
- "sync"
- "text/template"
-
- "github.com/indexsupply/x/e2pg"
-)
-
-type dashHandler struct {
- mgr *e2pg.Manager
- clientsMutex sync.Mutex
- clients map[string]chan e2pg.StatusSnapshot
-}
-
-func newDashHandler(mgr *e2pg.Manager, snaps <-chan e2pg.StatusSnapshot) *dashHandler {
- dh := &dashHandler{
- mgr: mgr,
- clients: make(map[string]chan e2pg.StatusSnapshot),
- }
- go func() {
- for {
- snap := <-snaps
- for _, c := range dh.clients {
- c <- snap
- }
- }
- }()
- return dh
-}
-
-func (dh *dashHandler) Updates(w http.ResponseWriter, r *http.Request) {
- w.Header().Set("Content-Type", "text/event-stream")
- w.Header().Set("Cache-Control", "no-cache")
- w.Header().Set("Connection", "keep-alive")
-
- slog.InfoContext(r.Context(), "start sse", "c", r.RemoteAddr, "n", len(dh.clients))
- c := make(chan e2pg.StatusSnapshot)
- dh.clientsMutex.Lock()
- dh.clients[r.RemoteAddr] = c
- dh.clientsMutex.Unlock()
- defer func() {
- dh.clientsMutex.Lock()
- delete(dh.clients, r.RemoteAddr)
- dh.clientsMutex.Unlock()
- close(c)
- slog.InfoContext(r.Context(), "stop sse", "c", r.RemoteAddr, "n", len(dh.clients))
- }()
-
- for {
- var snap e2pg.StatusSnapshot
- select {
- case snap = <-c:
- case <-r.Context().Done(): // disconnect
- return
- }
- sjson, err := json.Marshal(snap)
- if err != nil {
- slog.ErrorContext(r.Context(), "json error", "e", err)
- }
- fmt.Fprintf(w, "data: %s\n\n", sjson)
- flusher, ok := w.(http.Flusher)
- if !ok {
- http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
- return
- }
- flusher.Flush()
- }
-}
-
-func (dh *dashHandler) Index(w http.ResponseWriter, r *http.Request) {
- const dashboardHTML = `
-
-
-
- E2PG
-
-
-
- E2PG
-
-
-
-
- Task |
- Chain |
- Block |
- Hash |
- Blocks |
- Events |
- Latency |
- Error |
-
-
-
- {{ range $id, $status := . -}}
- {{ $name := $status.Name -}}
-
- {{ $name }} |
- {{ $id }} |
- {{ $status.Num }} |
- {{ $status.Hash }} |
- {{ $status.BlockCount }} |
- {{ $status.EventCount }} |
- {{ $status.TotalLatencyP50 }} |
- {{ $status.Error }} |
-
- {{ end -}}
-
-
-
-
-
-
- `
- tmpl, err := template.New("dashboard").Parse(dashboardHTML)
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
- snaps := make(map[uint64]e2pg.StatusSnapshot)
- for _, task := range dh.mgr.Tasks() {
- s := task.Status()
- snaps[s.ChainID] = s
- }
- err = tmpl.Execute(w, snaps)
- if err != nil {
- http.Error(w, err.Error(), http.StatusInternalServerError)
- return
- }
-}
diff --git a/cmd/e2pg/main.go b/cmd/e2pg/main.go
index f380d9a5..16a44a63 100644
--- a/cmd/e2pg/main.go
+++ b/cmd/e2pg/main.go
@@ -16,6 +16,7 @@ import (
"time"
"github.com/indexsupply/x/e2pg"
+ "github.com/indexsupply/x/e2pg/web"
"github.com/indexsupply/x/pgmig"
"github.com/indexsupply/x/wctx"
"github.com/indexsupply/x/wos"
@@ -43,7 +44,6 @@ func main() {
version bool
)
flag.StringVar(&cfile, "config", "", "task config file")
-
flag.BoolVar(&skipMigrate, "skip-migrate", false, "do not run db migrations on startup")
flag.StringVar(&listen, "l", ":8546", "dashboard server listen address")
flag.BoolVar(¬x, "notx", false, "disable pg tx")
@@ -77,36 +77,37 @@ func main() {
os.Exit(0)
}
- var conf e2pg.Config
+ var (
+ conf e2pg.Config
+ pgurl string
+ )
switch {
case cfile == "":
- fmt.Printf("missing config file\n")
- os.Exit(1)
+ pgurl = os.Getenv("DATABASE_URL")
case cfile != "":
f, err := os.Open(cfile)
check(err)
check(json.NewDecoder(f).Decode(&conf))
+ pgurl = wos.Getenv(conf.PGURL)
}
if !skipMigrate {
- migdb, err := pgxpool.New(ctx, wos.Getenv(conf.PGURL))
+ migdb, err := pgxpool.New(ctx, pgurl)
check(err)
check(pgmig.Migrate(migdb, e2pg.Migrations))
migdb.Close()
}
- pg, err := pgxpool.New(ctx, wos.Getenv(conf.PGURL))
+ pg, err := pgxpool.New(ctx, pgurl)
check(err)
-
var (
- pbuf bytes.Buffer
- snaps = make(chan e2pg.StatusSnapshot)
- tskmgr = e2pg.NewManager(pg, snaps, conf)
- dh = newDashHandler(tskmgr, snaps)
+ pbuf bytes.Buffer
+ mgr = e2pg.NewManager(pg, conf)
+ wh = web.New(mgr, &conf, pg)
)
mux := http.NewServeMux()
- mux.HandleFunc("/", dh.Index)
- mux.HandleFunc("/updates", dh.Updates)
+ mux.HandleFunc("/", wh.Index)
+ mux.HandleFunc("/task-updates", wh.Updates)
mux.HandleFunc("/debug/pprof/", npprof.Index)
mux.HandleFunc("/debug/pprof/cmdline", npprof.Cmdline)
mux.HandleFunc("/debug/pprof/profile", npprof.Profile)
@@ -120,15 +121,16 @@ func main() {
if profile == "cpu" {
check(pprof.StartCPUProfile(&pbuf))
}
- tskmgr.Run()
+
+ go mgr.Run()
+
switch profile {
case "cpu":
pprof.StopCPUProfile()
- select {}
case "heap":
check(pprof.Lookup("heap").WriteTo(&pbuf, 0))
- select {}
}
+ select {}
}
func log(v bool, h http.Handler) http.Handler {
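With the config-file requirement relaxed, the Postgres URL now resolves from one of two places. A condensed sketch of the logic above (assuming wos.Getenv expands environment references found in the config value):

    func resolvePGURL(cfile string, conf e2pg.Config) string {
        if cfile == "" {
            return os.Getenv("DATABASE_URL") // no -config flag: env var only
        }
        return wos.Getenv(conf.PGURL) // config value, with env expansion
    }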
diff --git a/e2pg/e2pg.go b/e2pg/e2pg.go
index 7029f921..22d95bdc 100644
--- a/e2pg/e2pg.go
+++ b/e2pg/e2pg.go
@@ -10,7 +10,7 @@ import (
"log/slog"
"strings"
"sync"
 	"sync/atomic"
"time"
"github.com/indexsupply/x/abi2"
@@ -21,11 +20,10 @@ import (
"github.com/indexsupply/x/wctx"
"github.com/indexsupply/x/wpg"
- "github.com/bmizerany/perks/quantile"
"github.com/jackc/pgx/v5"
+ "github.com/jackc/pgx/v5/pgtype"
"github.com/jackc/pgx/v5/pgxpool"
"golang.org/x/sync/errgroup"
- "golang.org/x/text/message"
)
//go:embed schema.sql
@@ -39,6 +37,7 @@ type Source interface {
}
type Destination interface {
+ Name() string
Insert(context.Context, wpg.Conn, []eth.Block) (int64, error)
Delete(context.Context, wpg.Conn, uint64) error
Events(context.Context) [][]byte
@@ -100,7 +99,10 @@ func WithConcurrency(workers, batch uint64) Option {
func WithDestinations(dests ...Destination) Option {
return func(t *Task) {
- var filter [][]byte
+ var (
+ filter [][]byte
+ dstat = make(map[string]Dstat)
+ )
for i := range dests {
e := dests[i].Events(t.ctx)
// if one integration has no filter
@@ -110,9 +112,11 @@ func WithDestinations(dests ...Destination) Option {
break
}
filter = append(filter, e...)
+ dstat[dests[i].Name()] = Dstat{}
}
t.dests = dests
t.filter = filter
+ t.dstat = dstat
}
}
@@ -123,18 +127,19 @@ func NewTask(opts ...Option) *Task {
buffs: make([]geth.Buffer, 1),
batchSize: 1,
workers: 1,
- stat: status{
- tlat: quantile.NewTargeted(0.50, 0.90, 0.99),
- glat: quantile.NewTargeted(0.50, 0.90, 0.99),
- pglat: quantile.NewTargeted(0.50, 0.90, 0.99),
- },
}
for _, opt := range opts {
opt(t)
}
+ slog.InfoContext(t.ctx, "starting task", "dest-count", len(t.dests))
return t
}
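+// Dstat records the row count and insert latency that one destination
+// reported for the most recent batch.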
+type Dstat struct {
+ NRows int64
+ Latency jsonDuration
+}
+
type Task struct {
Name string
ctx context.Context
@@ -146,68 +151,34 @@ type Task struct {
dests []Destination
start, stop uint64
+ dstatMut sync.Mutex
+ dstat map[string]Dstat
+
filter [][]byte
batch []eth.Block
buffs []geth.Buffer
batchSize uint64
workers uint64
- stat status
-}
-
-type status struct {
- ehash, ihash []byte
- enum, inum uint64
- tlat, glat, pglat *quantile.Stream
-
- reset time.Time
- err error
- blocks, events int64
-}
-
-type StatusSnapshot struct {
- ID string `json:"id"`
- Name string `json:"name"`
- ChainID uint64 `json:"chainID"`
- EthHash string `json:"eth_hash"`
- EthNum string `json:"eth_num"`
- Hash string `json:"hash"`
- Num string `json:"num"`
- EventCount string `json:"event_count"`
- BlockCount string `json:"block_count"`
- TotalLatencyP50 string `json:"total_latency_p50"`
- TotalLatencyP95 string `json:"total_latency_p95"`
- TotalLatencyP99 string `json:"total_latency_p99"`
- Error string `json:"error"`
-}
-
-func (task *Task) Status() StatusSnapshot {
- printer := message.NewPrinter(message.MatchLanguage("en"))
- snap := StatusSnapshot{}
- snap.ID = task.id
- snap.Name = task.Name
- snap.ChainID = wctx.ChainID(task.ctx)
- snap.EthHash = fmt.Sprintf("%.4x", task.stat.ehash)
- snap.EthNum = fmt.Sprintf("%d", task.stat.enum)
- snap.Hash = fmt.Sprintf("%.4x", task.stat.ihash)
- snap.Num = printer.Sprintf("%d", task.stat.inum)
- snap.BlockCount = fmt.Sprintf("%d", atomic.SwapInt64(&task.stat.blocks, 0))
- snap.EventCount = fmt.Sprintf("%d", atomic.SwapInt64(&task.stat.events, 0))
- snap.TotalLatencyP50 = fmt.Sprintf("%s", time.Duration(task.stat.tlat.Query(0.50)).Round(time.Millisecond))
- snap.TotalLatencyP95 = fmt.Sprintf("%.2f", task.stat.tlat.Query(0.95))
- snap.TotalLatencyP99 = fmt.Sprintf("%.2f", task.stat.tlat.Query(0.99))
- snap.Error = fmt.Sprintf("%v", task.stat.err)
- task.stat.tlat.Reset()
- return snap
+}
+
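+// dstatw records the row count and insert latency for one named destination.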
+func (t *Task) dstatw(name string, n int64, d time.Duration) {
+ t.dstatMut.Lock()
+ defer t.dstatMut.Unlock()
+
+ s := t.dstat[name]
+ s.NRows = n
+ s.Latency = jsonDuration(d)
+ t.dstat[name] = s
}
func (task *Task) Insert(n uint64, h []byte) error {
- const q = `insert into e2pg.task (id, number, hash) values ($1, $2, $3)`
+ const q = `insert into e2pg.task (id, num, hash) values ($1, $2, $3)`
_, err := task.pgp.Exec(context.Background(), q, task.id, n, h)
return err
}
func (task *Task) Latest() (uint64, []byte, error) {
- const q = `SELECT number, hash FROM e2pg.task WHERE id = $1 ORDER BY number DESC LIMIT 1`
+ const q = `SELECT num, hash FROM e2pg.task WHERE id = $1 ORDER BY num DESC LIMIT 1`
var n, h = uint64(0), []byte{}
err := task.pgp.QueryRow(context.Background(), q, task.id).Scan(&n, &h)
if errors.Is(err, pgx.ErrNoRows) {
@@ -243,24 +214,24 @@ func (task *Task) Setup() error {
return task.Insert(gethNum-1, h)
}
-func (task *Task) Run1(snaps chan<- StatusSnapshot, notx bool) {
+func (task *Task) Run1(updates chan<- string, notx bool) {
switch err := task.Converge(notx); {
- case err == nil:
- go func() {
- snap := task.Status()
- slog.InfoContext(task.ctx, "", "n", snap.Num, "h", snap.Hash)
- select {
- case snaps <- snap:
- default:
- }
- }()
case errors.Is(err, ErrDone):
return
case errors.Is(err, ErrNothingNew):
time.Sleep(time.Second)
- default:
+ case err != nil:
time.Sleep(time.Second)
-		slog.ErrorContext(task.ctx, "error", err)
+		slog.ErrorContext(task.ctx, "converge", "error", err)
+ default:
+ go func() {
+			// try our best to deliver the update,
+			// but don't stack up work
+ select {
+ case updates <- task.id:
+ default:
+ }
+ }()
}
}
@@ -300,7 +271,7 @@ func (task *Task) Converge(notx bool) error {
}
for reorgs := 0; reorgs <= 10; {
localNum, localHash := uint64(0), []byte{}
- const q = `SELECT number, hash FROM e2pg.task WHERE id = $1 ORDER BY number DESC LIMIT 1`
+ const q = `SELECT num, hash FROM e2pg.task WHERE id = $1 ORDER BY num DESC LIMIT 1`
err := pg.QueryRow(task.ctx, q, task.id).Scan(&localNum, &localHash)
if err != nil && !errors.Is(err, pgx.ErrNoRows) {
return fmt.Errorf("getting latest from task: %w", err)
@@ -330,11 +301,11 @@ func (task *Task) Converge(notx bool) error {
task.batch[i].SetNum(localNum + i + 1)
task.buffs[i].Number = task.batch[i].Num()
}
- switch err := task.writeIndex(localHash, pg, delta); {
+ switch nrows, err := task.loadinsert(localHash, pg, delta); {
case errors.Is(err, ErrReorg):
reorgs++
slog.ErrorContext(task.ctx, "reorg", "n", localNum, "h", fmt.Sprintf("%.4x", localHash))
- const dq = "delete from e2pg.task where id = $1 AND number >= $2"
+ const dq = "delete from e2pg.task where id = $1 AND num >= $2"
_, err := pg.Exec(task.ctx, dq, task.id, localNum)
if err != nil {
return fmt.Errorf("deleting block from task table: %w", err)
@@ -346,14 +317,42 @@ func (task *Task) Converge(notx bool) error {
}
case err != nil:
err = errors.Join(rollback(), err)
- task.stat.err = err
return err
default:
- task.stat.enum = gethNum
- task.stat.ehash = gethHash
- task.stat.blocks += int64(delta)
- task.stat.tlat.Insert(float64(time.Since(start)))
- return commit()
+ var last = task.batch[delta-1]
+ const uq = `
+ insert into e2pg.task (
+ id,
+ num,
+ hash,
+ src_num,
+ src_hash,
+ nblocks,
+ nrows,
+ latency,
+ dstat
+ )
+ values ($1, $2, $3, $4, $5, $6, $7, $8, $9)
+ `
+ _, err := pg.Exec(task.ctx, uq,
+ task.id,
+ last.Num(),
+ last.Hash(),
+ gethNum,
+ gethHash,
+ delta,
+ nrows,
+ time.Since(start),
+ task.dstat,
+ )
+ if err != nil {
+ return fmt.Errorf("updating task table: %w", err)
+ }
+ if err := commit(); err != nil {
+ return fmt.Errorf("commit converge tx: %w", err)
+ }
+ slog.InfoContext(task.ctx, "converge", "n", last.Num())
+ return nil
}
}
return errors.Join(ErrReorg, rollback())
@@ -369,8 +368,9 @@ func (task *Task) Converge(notx bool) error {
//
// The reading of block data and indexing of integrations happens concurrently
// with the number of go routines controlled by c.
-func (task *Task) writeIndex(localHash []byte, pg wpg.Conn, delta uint64) error {
+func (task *Task) loadinsert(localHash []byte, pg wpg.Conn, delta uint64) (int64, error) {
var (
+ nrows int64
eg1 = errgroup.Group{}
wsize = task.batchSize / task.workers
)
@@ -385,13 +385,13 @@ func (task *Task) writeIndex(localHash []byte, pg wpg.Conn, delta uint64) error
eg1.Go(func() error { return task.src.LoadBlocks(task.filter, bfs, blks) })
}
if err := eg1.Wait(); err != nil {
- return err
+ return 0, err
}
if len(task.batch[0].Header.Parent) != 32 {
- return fmt.Errorf("corrupt parent: %x\n", task.batch[0].Header.Parent)
+		return 0, fmt.Errorf("corrupt parent: %x", task.batch[0].Header.Parent)
}
if !bytes.Equal(localHash, task.batch[0].Header.Parent) {
- return ErrReorg
+ return 0, ErrReorg
}
var eg2 errgroup.Group
for i := uint64(0); i < task.workers && i*wsize < delta; i++ {
@@ -403,31 +403,98 @@ func (task *Task) writeIndex(localHash []byte, pg wpg.Conn, delta uint64) error
}
eg2.Go(func() error {
var eg3 errgroup.Group
- for _, dest := range task.dests {
- dest := dest
+ for j := range task.dests {
+ j := j
eg3.Go(func() error {
- count, err := dest.Insert(task.ctx, pg, blks)
- atomic.AddInt64(&task.stat.events, count)
+ t0 := time.Now()
+ count, err := task.dests[j].Insert(task.ctx, pg, blks)
+ task.dstatw(task.dests[j].Name(), count, time.Since(t0))
+				atomic.AddInt64(&nrows, count) // insert goroutines run concurrently
return err
})
}
return eg3.Wait()
})
}
- if err := eg2.Wait(); err != nil {
- return fmt.Errorf("writing indexed data: %w", err)
- }
- var last = task.batch[delta-1]
- const uq = "insert into e2pg.task (id, number, hash) values ($1, $2, $3)"
- _, err := pg.Exec(context.Background(), uq, task.id, last.Num(), last.Hash())
- if err != nil {
- return fmt.Errorf("updating task table: %w", err)
- }
- task.stat.inum = last.Num()
- task.stat.ihash = last.Hash()
+	if err := eg2.Wait(); err != nil {
+		return 0, fmt.Errorf("writing indexed data: %w", err)
+	}
+	return nrows, nil
+}
+
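+// jsonDuration marshals a duration as a human-readable string (e.g. "1.5s")
+// and scans from a Postgres interval.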
+type jsonDuration time.Duration
+
+func (d *jsonDuration) ScanInterval(i pgtype.Interval) error {
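+	// task latencies are well under a day, so i.Days and i.Months are ignored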
+ *d = jsonDuration(i.Microseconds * 1000)
return nil
}
+func (d *jsonDuration) UnmarshalJSON(data []byte) error {
+ if len(data) < 2 {
+		return fmt.Errorf("jsonDuration must be at least 2 bytes")
+ }
+ data = data[1 : len(data)-1] // remove quotes
+ dur, err := time.ParseDuration(string(data))
+ *d = jsonDuration(dur)
+ return err
+}
+
+func (d jsonDuration) MarshalJSON() ([]byte, error) {
+ return []byte(fmt.Sprintf(`"%s"`, d.String())), nil
+}
+
+func (d jsonDuration) String() string {
+ return time.Duration(d).Round(time.Millisecond).String()
+}
+
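+// TaskUpdate is the latest e2pg.task row for a task,
+// as rendered by the dashboard and its SSE stream.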
+type TaskUpdate struct {
+ ID string `db:"id"`
+ Num uint64 `db:"num"`
+ Hash eth.Bytes `db:"hash"`
+ SrcNum uint64 `db:"src_num"`
+ SrcHash eth.Bytes `db:"src_hash"`
+ NBlocks uint64 `db:"nblocks"`
+ NRows uint64 `db:"nrows"`
+ Latency jsonDuration `db:"latency"`
+ Dstat map[string]Dstat `db:"dstat"`
+}
+
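+// TaskUpdate1 returns the most recent update for the task identified by id.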
+func TaskUpdate1(ctx context.Context, pg wpg.Conn, id string) (TaskUpdate, error) {
+ const q = `
+ select
+ id,
+ num,
+ hash,
+ coalesce(src_num, 0) src_num,
+ coalesce(src_hash, '\x00') src_hash,
+ coalesce(nblocks, 0) nblocks,
+ coalesce(nrows, 0) nrows,
+ coalesce(latency, '0')::interval latency,
+ coalesce(dstat, '{}') dstat
+ from e2pg.task
+ where id = $1
+ order by num desc
+ limit 1;
+ `
+ row, _ := pg.Query(ctx, q, id)
+ return pgx.CollectOneRow(row, pgx.RowToStructByName[TaskUpdate])
+}
+
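+// TaskUpdates returns the most recent update for every task.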
+func TaskUpdates(ctx context.Context, pg wpg.Conn) ([]TaskUpdate, error) {
+ rows, _ := pg.Query(ctx, `
+ select distinct on (id)
+ id,
+ num,
+ hash,
+ coalesce(src_num, 0) src_num,
+ coalesce(src_hash, '\x00') src_hash,
+ coalesce(nblocks, 0) nblocks,
+ coalesce(nrows, 0) nrows,
+ coalesce(latency, '0')::interval latency,
+ coalesce(dstat, '{}') dstat
+ from e2pg.task
+ order by id, num desc;
+ `)
+ return pgx.CollectRows(rows, pgx.RowToStructByName[TaskUpdate])
+}
+
var compiled = map[string]Destination{}
// Loads, Starts, and provides method for Restarting tasks
@@ -435,24 +502,23 @@ var compiled = map[string]Destination{}
type Manager struct {
running sync.Mutex
restart chan struct{}
- snaps chan<- StatusSnapshot
tasks []*Task
+ updates chan string
pgp *pgxpool.Pool
conf Config
}
-func NewManager(pgp *pgxpool.Pool, snaps chan<- StatusSnapshot, conf Config) *Manager {
+func NewManager(pgp *pgxpool.Pool, conf Config) *Manager {
return &Manager{
restart: make(chan struct{}),
- snaps: snaps,
+ updates: make(chan string),
pgp: pgp,
conf: conf,
}
}
-// TODO(r): remove once old dashboard is gone
-func (tm *Manager) Tasks() []*Task {
- return tm.tasks
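+// Updates blocks until a task converges a batch and returns that task's id.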
+func (tm *Manager) Updates() string {
+ return <-tm.updates
}
func (tm *Manager) runTask(t *Task) error {
@@ -465,7 +531,7 @@ func (tm *Manager) runTask(t *Task) error {
slog.Info("restart-task", "name", t.Name)
return nil
default:
- t.Run1(tm.snaps, false)
+ t.Run1(tm.updates, false)
}
}
}
@@ -487,16 +553,15 @@ func (tm *Manager) Run() error {
tm.running.Lock()
defer tm.running.Unlock()
tm.restart = make(chan struct{})
- var err error
- tm.tasks, err = loadTasks(context.Background(), tm.pgp, tm.conf)
+ tasks, err := loadTasks(context.Background(), tm.pgp, tm.conf)
if err != nil {
return fmt.Errorf("loading tasks: %w", err)
}
var eg errgroup.Group
- for i := range tm.tasks {
+ for i := range tasks {
i := i
eg.Go(func() error {
- return tm.runTask(tm.tasks[i])
+ return tm.runTask(tasks[i])
})
}
return eg.Wait()
@@ -568,7 +633,7 @@ func getDest(pgp *pgxpool.Pool, ig Integration) (Destination, error) {
}
return cig, nil
default:
- aig, err := abi2.New(ig.Event, ig.Block, ig.Table)
+ aig, err := abi2.New(ig.Name, ig.Event, ig.Block, ig.Table)
if err != nil {
return nil, fmt.Errorf("building abi integration: %w", err)
}
@@ -591,10 +656,10 @@ func getSource(sc SourceConfig) (Source, error) {
}
}
-func Integrations(ctx context.Context, pgp *pgxpool.Pool) ([]Integration, error) {
+func Integrations(ctx context.Context, pg wpg.Conn) ([]Integration, error) {
var res []Integration
const q = `select conf from e2pg.integrations`
- rows, err := pgp.Query(ctx, q)
+ rows, err := pg.Query(ctx, q)
if err != nil {
return nil, fmt.Errorf("querying integrations: %w", err)
}
@@ -669,8 +734,8 @@ func (conf Config) Valid(intg Integration) error {
return nil
}
-func (conf Config) AllIntegrations(ctx context.Context, pgp *pgxpool.Pool) ([]Integration, error) {
- res, err := Integrations(ctx, pgp)
+func (conf Config) AllIntegrations(ctx context.Context, pg wpg.Conn) ([]Integration, error) {
+ res, err := Integrations(ctx, pg)
if err != nil {
return nil, fmt.Errorf("loading db integrations: %w", err)
}
@@ -680,6 +745,20 @@ func (conf Config) AllIntegrations(ctx context.Context, pgp *pgxpool.Pool) ([]In
return res, nil
}
+func (conf Config) IntegrationsBySource(ctx context.Context, pg wpg.Conn) (map[string][]Integration, error) {
+ igs, err := conf.AllIntegrations(ctx, pg)
+ if err != nil {
+ return nil, fmt.Errorf("querying all integrations: %w", err)
+ }
+ res := make(map[string][]Integration)
+ for _, ig := range igs {
+ for _, sc := range ig.SourceConfigs {
+ res[sc.Name] = append(res[sc.Name], ig)
+ }
+ }
+ return res, nil
+}
+
func (conf Config) AllSourceConfigs(ctx context.Context, pgp *pgxpool.Pool) ([]SourceConfig, error) {
res, err := SourceConfigs(ctx, pgp)
if err != nil {
diff --git a/e2pg/e2pg_test.go b/e2pg/e2pg_test.go
index 2a4ad3a1..064264ef 100644
--- a/e2pg/e2pg_test.go
+++ b/e2pg/e2pg_test.go
@@ -78,6 +78,10 @@ func (dest *testDestination) Events(_ context.Context) [][]byte {
return nil
}
+func (dest *testDestination) Name() string {
+ return "test"
+}
+
type testGeth struct {
blocks []eth.Block
}
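With Name added to the Destination interface, a compile-time assertion keeps test doubles honest; a one-line sketch that could accompany the test (assuming the file is in package e2pg):

    // Fails to compile if *testDestination stops satisfying Destination.
    var _ Destination = (*testDestination)(nil)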
diff --git a/e2pg/migrations.go b/e2pg/migrations.go
index 6a1e272e..01b9cbde 100644
--- a/e2pg/migrations.go
+++ b/e2pg/migrations.go
@@ -23,4 +23,15 @@ var Migrations = map[int]pgmig.Migration{
create unique index on e2pg.sources(name);
`,
},
+ 10: pgmig.Migration{
+ SQL: `
+ alter table e2pg.task rename column number to num;
+ alter table e2pg.task add column src_hash bytea;
+ alter table e2pg.task add column src_num numeric;
+ alter table e2pg.task add column nblocks numeric;
+ alter table e2pg.task add column nrows numeric;
+ alter table e2pg.task add column latency interval;
+ alter table e2pg.task add column dstat jsonb;
+ `,
+ },
}
diff --git a/e2pg/schema.sql b/e2pg/schema.sql
index d29b63bf..556552fa 100644
--- a/e2pg/schema.sql
+++ b/e2pg/schema.sql
@@ -45,9 +45,15 @@ CREATE TABLE e2pg.sources (
CREATE TABLE e2pg.task (
id text NOT NULL,
- number bigint,
+ num bigint,
hash bytea,
- insert_at timestamp with time zone DEFAULT now()
+ insert_at timestamp with time zone DEFAULT now(),
+ src_hash bytea,
+ src_num numeric,
+ nblocks numeric,
+ nrows numeric,
+ latency interval,
+ dstat jsonb
);
@@ -65,7 +71,7 @@ CREATE UNIQUE INDEX sources_name_idx ON e2pg.sources USING btree (name);
-CREATE INDEX task_id_number_idx ON e2pg.task USING btree (id, number DESC);
+CREATE INDEX task_id_number_idx ON e2pg.task USING btree (id, num DESC);
diff --git a/e2pg/web/index.html b/e2pg/web/index.html
new file mode 100644
index 00000000..5d7e826a
--- /dev/null
+++ b/e2pg/web/index.html
@@ -0,0 +1,151 @@
+
+
+
+ E2PG
+
+
+
+
+ {{ range $tu := .TaskUpdates -}}
+
+
+
+ {{ (index $.ChainsByTaskID $tu.ID).Name }}
+
+
+ {{ $tu.NRows }}
+
+
+ {{ $tu.Latency }}
+
+
+ {{(printf "0x%x" $tu.Hash)}}
+
+
+ {{ $tu.Num }}
+
+
+ {{ range $name, $stat := $tu.Dstat -}}
+
+
+ {{ $name }}
+
+
+ {{ $stat.NRows }}
+
+
+ {{ $stat.Latency }}
+
+
+ {{ end -}}
+
+ {{ end -}}
+
+
+
diff --git a/e2pg/web/web.go b/e2pg/web/web.go
new file mode 100644
index 00000000..89b882d3
--- /dev/null
+++ b/e2pg/web/web.go
@@ -0,0 +1,138 @@
+package web
+
+import (
+ _ "embed"
+ "encoding/json"
+ "fmt"
+ "html/template"
+ "log/slog"
+ "net/http"
+ "os"
+ "sync"
+
+ "github.com/indexsupply/x/e2pg"
+
+ "github.com/jackc/pgx/v5/pgxpool"
+)
+
+type Handler struct {
+ pgp *pgxpool.Pool
+ mgr *e2pg.Manager
+ conf *e2pg.Config
+
+ clientsMutex sync.Mutex
+ clients map[string]chan string
+}
+
+func New(mgr *e2pg.Manager, conf *e2pg.Config, pgp *pgxpool.Pool) *Handler {
+ h := &Handler{
+ pgp: pgp,
+ mgr: mgr,
+ conf: conf,
+ clients: make(map[string]chan string),
+ }
+ go func() {
+ for {
+ tid := mgr.Updates()
+			h.clientsMutex.Lock()
+			for _, c := range h.clients {
+				select {
+				case c <- tid: // deliver when the client is ready
+				default: // drop rather than block on a slow client
+				}
+			}
+			h.clientsMutex.Unlock()
+ }
+ }()
+ return h
+}
+
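+// read re-reads the template from disk on each call so that
+// HTML edits show up without recompiling.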
+func read(file string) string {
+ b, _ := os.ReadFile(fmt.Sprintf("./e2pg/web/%s.html", file))
+ return string(b)
+}
+
+type IndexView struct {
+ Igs map[string][]e2pg.Integration
+ TaskUpdates []e2pg.TaskUpdate
+ ChainsByTaskID map[string]e2pg.SourceConfig
+}
+
+func (h *Handler) Index(w http.ResponseWriter, r *http.Request) {
+ var (
+ ctx = r.Context()
+ view = IndexView{}
+ err error
+ )
+ view.TaskUpdates, err = e2pg.TaskUpdates(ctx, h.pgp)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ view.Igs, err = h.conf.IntegrationsBySource(ctx, h.pgp)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ sources, err := h.conf.AllSourceConfigs(ctx, h.pgp)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ view.ChainsByTaskID = map[string]e2pg.SourceConfig{}
+ for i := range sources {
+ k := fmt.Sprintf("%d-main", sources[i].ChainID)
+ view.ChainsByTaskID[k] = sources[i]
+ }
+
+ tmpl, err := template.New("index").Parse(read("index"))
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+ err = tmpl.Execute(w, view)
+ if err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ return
+ }
+}
+
+func (h *Handler) Updates(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Content-Type", "text/event-stream")
+ w.Header().Set("Cache-Control", "no-cache")
+ w.Header().Set("Connection", "keep-alive")
+
+ slog.InfoContext(r.Context(), "start sse", "c", r.RemoteAddr, "n", len(h.clients))
+ c := make(chan string)
+ h.clientsMutex.Lock()
+ h.clients[r.RemoteAddr] = c
+ h.clientsMutex.Unlock()
+ defer func() {
+ h.clientsMutex.Lock()
+ delete(h.clients, r.RemoteAddr)
+ h.clientsMutex.Unlock()
+ close(c)
+ slog.InfoContext(r.Context(), "stop sse", "c", r.RemoteAddr, "n", len(h.clients))
+ }()
+
+ for {
+ var tid string
+ select {
+ case tid = <-c:
+ case <-r.Context().Done(): // disconnect
+ return
+ }
+ tu, err := e2pg.TaskUpdate1(r.Context(), h.pgp, tid)
+ if err != nil {
+		slog.ErrorContext(r.Context(), "task update query error", "e", err)
+ return
+ }
+ sjson, err := json.Marshal(tu)
+ if err != nil {
+ slog.ErrorContext(r.Context(), "json error", "e", err)
+ return
+ }
+ fmt.Fprintf(w, "data: %s\n\n", sjson)
+ flusher, ok := w.(http.Flusher)
+ if !ok {
+ http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
+ return
+ }
+ flusher.Flush()
+ }
+}
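For manual testing, the /task-updates stream is plain SSE; a minimal Go consumer (sketch; assumes the default -l :8546 listen address):

    package main

    import (
        "bufio"
        "fmt"
        "net/http"
        "strings"
    )

    func main() {
        resp, err := http.Get("http://localhost:8546/task-updates")
        if err != nil {
            panic(err)
        }
        defer resp.Body.Close()
        sc := bufio.NewScanner(resp.Body)
        for sc.Scan() { // each SSE frame is a "data: {...}" line plus a blank line
            if line := sc.Text(); strings.HasPrefix(line, "data: ") {
                fmt.Println(strings.TrimPrefix(line, "data: "))
            }
        }
    }

curl -N against the same URL shows the same frames.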
diff --git a/eth/types.go b/eth/types.go
index 50c7ed78..5188cd6f 100644
--- a/eth/types.go
+++ b/eth/types.go
@@ -98,6 +98,10 @@ func (hb *Bytes) UnmarshalJSON(data []byte) error {
return err
}
+func (hb Bytes) MarshalJSON() ([]byte, error) {
+ return []byte(`"` + fmt.Sprintf("0x%x", hb) + `"`), nil
+}
+
func (hb *Bytes) Write(p []byte) (int, error) {
if len(*hb) < len(p) {
*hb = append(*hb, make([]byte, len(p)-len(*hb))...)
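MarshalJSON makes Bytes symmetric with its 0x-hex UnmarshalJSON, so task hashes render as 0x-prefixed strings in the dashboard JSON. A behavior sketch with a local mirror of the type:

    package main

    import (
        "encoding/json"
        "fmt"
    )

    type Bytes []byte

    func (hb Bytes) MarshalJSON() ([]byte, error) {
        return []byte(`"` + fmt.Sprintf("0x%x", hb) + `"`), nil
    }

    func main() {
        b, _ := json.Marshal(Bytes{0xde, 0xad, 0xbe, 0xef})
        fmt.Println(string(b)) // "0xdeadbeef"
    }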
diff --git a/go.mod b/go.mod
index 5d4f58a4..16241e66 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,6 @@ go 1.21
require (
blake.io/pqx v0.2.1
github.com/aws/aws-sdk-go v1.44.285
- github.com/bmizerany/perks v0.0.0-20230307044200-03f9df79da1e
github.com/btcsuite/btcd/btcec/v2 v2.3.2
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0
github.com/golang/snappy v0.0.4
@@ -15,7 +14,6 @@ require (
golang.org/x/crypto v0.6.0
golang.org/x/net v0.6.0
golang.org/x/sync v0.1.0
- golang.org/x/text v0.7.0
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2
kr.dev/diff v0.3.0
)
@@ -31,5 +29,6 @@ require (
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect
golang.org/x/sys v0.5.0 // indirect
+ golang.org/x/text v0.7.0 // indirect
kr.dev/errorfmt v0.1.1 // indirect
)
diff --git a/go.sum b/go.sum
index ba87bbe4..12d5c76c 100644
--- a/go.sum
+++ b/go.sum
@@ -2,8 +2,6 @@ blake.io/pqx v0.2.1 h1:Qz3yyNmPIFCyRS9HLnxtQNIL809ZC13aWvpeiXU3oS8=
blake.io/pqx v0.2.1/go.mod h1:hcG2tklM4QIxdfL+laWGAmtIDVgPKkWtxGG/t7umOfA=
github.com/aws/aws-sdk-go v1.44.285 h1:rgoWYl+NdmKzRgoi/fZLEtGXOjCkcWIa5jPH02Uahdo=
github.com/aws/aws-sdk-go v1.44.285/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
-github.com/bmizerany/perks v0.0.0-20230307044200-03f9df79da1e h1:mWOqoK5jV13ChKf/aF3plwQ96laasTJgZi4f1aSOu+M=
-github.com/bmizerany/perks v0.0.0-20230307044200-03f9df79da1e/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
github.com/btcsuite/btcd/btcec/v2 v2.3.2 h1:5n0X6hX0Zk+6omWcihdYvdAlGf2DfasC0GMf7DClJ3U=
github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U=