From 1bf4e64fd9120666e16563acebda2379112e3d6c Mon Sep 17 00:00:00 2001 From: Ryan Smith Date: Sat, 14 Oct 2023 12:00:04 -0700 Subject: [PATCH 01/10] e2pg: rebuild dashboard --- abi2/abi2.go | 6 +- cmd/e2pg/dashboard.go | 175 ---------------------- cmd/e2pg/main.go | 34 ++--- e2pg/e2pg.go | 327 +++++++++++++++++++++++++++++------------- e2pg/e2pg_test.go | 4 + e2pg/migrations.go | 10 ++ e2pg/schema.sql | 8 +- e2pg/web/index.html | 153 ++++++++++++++++++++ e2pg/web/web.go | 138 ++++++++++++++++++ eth/types.go | 4 + 10 files changed, 566 insertions(+), 293 deletions(-) delete mode 100644 cmd/e2pg/dashboard.go create mode 100644 e2pg/web/index.html create mode 100644 e2pg/web/web.go diff --git a/abi2/abi2.go b/abi2/abi2.go index 3a1d9c52..bd87e173 100644 --- a/abi2/abi2.go +++ b/abi2/abi2.go @@ -553,6 +553,7 @@ type coldef struct { // Implements the [e2pg.Integration] interface type Integration struct { + name string Event Event Table Table Columns []string @@ -585,8 +586,9 @@ type Integration struct { // For example: // // {"name": "my_column", "type": "db_type", "filter_op": "contains", "filter_arg": ["0x000"]} -func New(ev Event, bd []BlockData, table Table) (Integration, error) { +func New(name string, ev Event, bd []BlockData, table Table) (Integration, error) { ig := Integration{ + name: name, Event: ev, Table: table, numIndexed: ev.numIndexed(), @@ -636,6 +638,8 @@ func col(t Table, name string) (Column, error) { return Column{}, fmt.Errorf("table %q doesn't contain column %q", t.Name, name) } +func (ig Integration) Name() string { return ig.name } + func (ig Integration) Events(context.Context) [][]byte { return [][]byte{} } func (ig Integration) Delete(context.Context, wpg.Conn, uint64) error { return nil } diff --git a/cmd/e2pg/dashboard.go b/cmd/e2pg/dashboard.go deleted file mode 100644 index b14bd1fc..00000000 --- a/cmd/e2pg/dashboard.go +++ /dev/null @@ -1,175 +0,0 @@ -package main - -import ( - "encoding/json" - "fmt" - "log/slog" - "net/http" - "sync" - "text/template" - - "github.com/indexsupply/x/e2pg" -) - -type dashHandler struct { - mgr *e2pg.Manager - clientsMutex sync.Mutex - clients map[string]chan e2pg.StatusSnapshot -} - -func newDashHandler(mgr *e2pg.Manager, snaps <-chan e2pg.StatusSnapshot) *dashHandler { - dh := &dashHandler{ - mgr: mgr, - clients: make(map[string]chan e2pg.StatusSnapshot), - } - go func() { - for { - snap := <-snaps - for _, c := range dh.clients { - c <- snap - } - } - }() - return dh -} - -func (dh *dashHandler) Updates(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "text/event-stream") - w.Header().Set("Cache-Control", "no-cache") - w.Header().Set("Connection", "keep-alive") - - slog.InfoContext(r.Context(), "start sse", "c", r.RemoteAddr, "n", len(dh.clients)) - c := make(chan e2pg.StatusSnapshot) - dh.clientsMutex.Lock() - dh.clients[r.RemoteAddr] = c - dh.clientsMutex.Unlock() - defer func() { - dh.clientsMutex.Lock() - delete(dh.clients, r.RemoteAddr) - dh.clientsMutex.Unlock() - close(c) - slog.InfoContext(r.Context(), "stop sse", "c", r.RemoteAddr, "n", len(dh.clients)) - }() - - for { - var snap e2pg.StatusSnapshot - select { - case snap = <-c: - case <-r.Context().Done(): // disconnect - return - } - sjson, err := json.Marshal(snap) - if err != nil { - slog.ErrorContext(r.Context(), "json error", "e", err) - } - fmt.Fprintf(w, "data: %s\n\n", sjson) - flusher, ok := w.(http.Flusher) - if !ok { - http.Error(w, "Streaming unsupported", http.StatusInternalServerError) - return - } - flusher.Flush() - } -} - 
-func (dh *dashHandler) Index(w http.ResponseWriter, r *http.Request) { - const dashboardHTML = ` - - - - E2PG - - - -

E2PG

-
- - - - - - - - - - - - - - - {{ range $id, $status := . -}} - {{ $name := $status.Name -}} - - - - - - - - - - - {{ end -}} - -
Task Chain Block Hash Blocks Events Latency Error
{{ $name }} {{ $id }} {{ $status.Num }} {{ $status.Hash }} {{ $status.BlockCount }} {{ $status.EventCount }} {{ $status.TotalLatencyP50 }} {{ $status.Error }}
-
- - - - ` - tmpl, err := template.New("dashboard").Parse(dashboardHTML) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } - snaps := make(map[uint64]e2pg.StatusSnapshot) - for _, task := range dh.mgr.Tasks() { - s := task.Status() - snaps[s.ChainID] = s - } - err = tmpl.Execute(w, snaps) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } -} diff --git a/cmd/e2pg/main.go b/cmd/e2pg/main.go index f380d9a5..16a44a63 100644 --- a/cmd/e2pg/main.go +++ b/cmd/e2pg/main.go @@ -16,6 +16,7 @@ import ( "time" "github.com/indexsupply/x/e2pg" + "github.com/indexsupply/x/e2pg/web" "github.com/indexsupply/x/pgmig" "github.com/indexsupply/x/wctx" "github.com/indexsupply/x/wos" @@ -43,7 +44,6 @@ func main() { version bool ) flag.StringVar(&cfile, "config", "", "task config file") - flag.BoolVar(&skipMigrate, "skip-migrate", false, "do not run db migrations on startup") flag.StringVar(&listen, "l", ":8546", "dashboard server listen address") flag.BoolVar(¬x, "notx", false, "disable pg tx") @@ -77,36 +77,37 @@ func main() { os.Exit(0) } - var conf e2pg.Config + var ( + conf e2pg.Config + pgurl string + ) switch { case cfile == "": - fmt.Printf("missing config file\n") - os.Exit(1) + pgurl = os.Getenv("DATABASE_URL") case cfile != "": f, err := os.Open(cfile) check(err) check(json.NewDecoder(f).Decode(&conf)) + pgurl = wos.Getenv(conf.PGURL) } if !skipMigrate { - migdb, err := pgxpool.New(ctx, wos.Getenv(conf.PGURL)) + migdb, err := pgxpool.New(ctx, pgurl) check(err) check(pgmig.Migrate(migdb, e2pg.Migrations)) migdb.Close() } - pg, err := pgxpool.New(ctx, wos.Getenv(conf.PGURL)) + pg, err := pgxpool.New(ctx, pgurl) check(err) - var ( - pbuf bytes.Buffer - snaps = make(chan e2pg.StatusSnapshot) - tskmgr = e2pg.NewManager(pg, snaps, conf) - dh = newDashHandler(tskmgr, snaps) + pbuf bytes.Buffer + mgr = e2pg.NewManager(pg, conf) + wh = web.New(mgr, &conf, pg) ) mux := http.NewServeMux() - mux.HandleFunc("/", dh.Index) - mux.HandleFunc("/updates", dh.Updates) + mux.HandleFunc("/", wh.Index) + mux.HandleFunc("/task-updates", wh.Updates) mux.HandleFunc("/debug/pprof/", npprof.Index) mux.HandleFunc("/debug/pprof/cmdline", npprof.Cmdline) mux.HandleFunc("/debug/pprof/profile", npprof.Profile) @@ -120,15 +121,16 @@ func main() { if profile == "cpu" { check(pprof.StartCPUProfile(&pbuf)) } - tskmgr.Run() + + go mgr.Run() + switch profile { case "cpu": pprof.StopCPUProfile() - select {} case "heap": check(pprof.Lookup("heap").WriteTo(&pbuf, 0)) - select {} } + select {} } func log(v bool, h http.Handler) http.Handler { diff --git a/e2pg/e2pg.go b/e2pg/e2pg.go index 7029f921..f3a35d48 100644 --- a/e2pg/e2pg.go +++ b/e2pg/e2pg.go @@ -10,7 +10,6 @@ import ( "log/slog" "strings" "sync" - "sync/atomic" "time" "github.com/indexsupply/x/abi2" @@ -21,11 +20,10 @@ import ( "github.com/indexsupply/x/wctx" "github.com/indexsupply/x/wpg" - "github.com/bmizerany/perks/quantile" "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" "golang.org/x/sync/errgroup" - "golang.org/x/text/message" ) //go:embed schema.sql @@ -39,6 +37,7 @@ type Source interface { } type Destination interface { + Name() string Insert(context.Context, wpg.Conn, []eth.Block) (int64, error) Delete(context.Context, wpg.Conn, uint64) error Events(context.Context) [][]byte @@ -100,7 +99,10 @@ func WithConcurrency(workers, batch uint64) Option { func WithDestinations(dests ...Destination) Option { return func(t *Task) { - var filter [][]byte + 
var ( + filter [][]byte + dstat = make(map[string]Dstat) + ) for i := range dests { e := dests[i].Events(t.ctx) // if one integration has no filter @@ -110,9 +112,11 @@ func WithDestinations(dests ...Destination) Option { break } filter = append(filter, e...) + dstat[dests[i].Name()] = Dstat{} } t.dests = dests t.filter = filter + t.dstat = dstat } } @@ -123,18 +127,19 @@ func NewTask(opts ...Option) *Task { buffs: make([]geth.Buffer, 1), batchSize: 1, workers: 1, - stat: status{ - tlat: quantile.NewTargeted(0.50, 0.90, 0.99), - glat: quantile.NewTargeted(0.50, 0.90, 0.99), - pglat: quantile.NewTargeted(0.50, 0.90, 0.99), - }, } for _, opt := range opts { opt(t) } + slog.InfoContext(t.ctx, "starting task", "dest-count", len(t.dests)) return t } +type Dstat struct { + NRows int64 + Latency jsonDuration +} + type Task struct { Name string ctx context.Context @@ -146,58 +151,33 @@ type Task struct { dests []Destination start, stop uint64 + dstatMut sync.Mutex + dstat map[string]Dstat + filter [][]byte batch []eth.Block buffs []geth.Buffer batchSize uint64 workers uint64 - stat status -} - -type status struct { - ehash, ihash []byte - enum, inum uint64 - tlat, glat, pglat *quantile.Stream - - reset time.Time - err error - blocks, events int64 -} - -type StatusSnapshot struct { - ID string `json:"id"` - Name string `json:"name"` - ChainID uint64 `json:"chainID"` - EthHash string `json:"eth_hash"` - EthNum string `json:"eth_num"` - Hash string `json:"hash"` - Num string `json:"num"` - EventCount string `json:"event_count"` - BlockCount string `json:"block_count"` - TotalLatencyP50 string `json:"total_latency_p50"` - TotalLatencyP95 string `json:"total_latency_p95"` - TotalLatencyP99 string `json:"total_latency_p99"` - Error string `json:"error"` -} - -func (task *Task) Status() StatusSnapshot { - printer := message.NewPrinter(message.MatchLanguage("en")) - snap := StatusSnapshot{} - snap.ID = task.id - snap.Name = task.Name - snap.ChainID = wctx.ChainID(task.ctx) - snap.EthHash = fmt.Sprintf("%.4x", task.stat.ehash) - snap.EthNum = fmt.Sprintf("%d", task.stat.enum) - snap.Hash = fmt.Sprintf("%.4x", task.stat.ihash) - snap.Num = printer.Sprintf("%d", task.stat.inum) - snap.BlockCount = fmt.Sprintf("%d", atomic.SwapInt64(&task.stat.blocks, 0)) - snap.EventCount = fmt.Sprintf("%d", atomic.SwapInt64(&task.stat.events, 0)) - snap.TotalLatencyP50 = fmt.Sprintf("%s", time.Duration(task.stat.tlat.Query(0.50)).Round(time.Millisecond)) - snap.TotalLatencyP95 = fmt.Sprintf("%.2f", task.stat.tlat.Query(0.95)) - snap.TotalLatencyP99 = fmt.Sprintf("%.2f", task.stat.tlat.Query(0.99)) - snap.Error = fmt.Sprintf("%v", task.stat.err) - task.stat.tlat.Reset() - return snap +} + +func (t *Task) dstatJSON() []byte { + b, err := json.Marshal(t.dstat) + if err != nil { + slog.ErrorContext(t.ctx, "encoding dstat", err) + return nil + } + return b +} + +func (t *Task) dstatw(name string, n int64, d time.Duration) { + t.dstatMut.Lock() + defer t.dstatMut.Unlock() + + s := t.dstat[name] + s.NRows = n + s.Latency = jsonDuration(d) + t.dstat[name] = s } func (task *Task) Insert(n uint64, h []byte) error { @@ -243,24 +223,19 @@ func (task *Task) Setup() error { return task.Insert(gethNum-1, h) } -func (task *Task) Run1(snaps chan<- StatusSnapshot, notx bool) { +func (task *Task) Run1(updates chan<- string, notx bool) { switch err := task.Converge(notx); { - case err == nil: - go func() { - snap := task.Status() - slog.InfoContext(task.ctx, "", "n", snap.Num, "h", snap.Hash) - select { - case snaps <- snap: - default: - } - 
}() case errors.Is(err, ErrDone): return case errors.Is(err, ErrNothingNew): time.Sleep(time.Second) - default: + case err != nil: time.Sleep(time.Second) slog.ErrorContext(task.ctx, "error", err) + default: + go func() { + updates <- task.id + }() } } @@ -330,7 +305,7 @@ func (task *Task) Converge(notx bool) error { task.batch[i].SetNum(localNum + i + 1) task.buffs[i].Number = task.batch[i].Num() } - switch err := task.writeIndex(localHash, pg, delta); { + switch nrows, err := task.loadinsert(localHash, pg, delta); { case errors.Is(err, ErrReorg): reorgs++ slog.ErrorContext(task.ctx, "reorg", "n", localNum, "h", fmt.Sprintf("%.4x", localHash)) @@ -346,14 +321,42 @@ func (task *Task) Converge(notx bool) error { } case err != nil: err = errors.Join(rollback(), err) - task.stat.err = err return err default: - task.stat.enum = gethNum - task.stat.ehash = gethHash - task.stat.blocks += int64(delta) - task.stat.tlat.Insert(float64(time.Since(start))) - return commit() + var last = task.batch[delta-1] + const uq = ` + insert into e2pg.task ( + id, + number, + hash, + src_num, + src_hash, + nblocks, + nrows, + latency, + dstat + ) + values ($1, $2, $3, $4, $5, $6, $7, $8, $9) + ` + _, err := pg.Exec(task.ctx, uq, + task.id, + last.Num(), + last.Hash(), + gethNum, + gethHash, + delta, + nrows, + time.Since(start), + task.dstatJSON(), + ) + if err != nil { + return fmt.Errorf("updating task table: %w", err) + } + if err := commit(); err != nil { + return fmt.Errorf("commit converge tx: %w", err) + } + slog.InfoContext(task.ctx, "converge", "n", last.Num()) + return nil } } return errors.Join(ErrReorg, rollback()) @@ -369,8 +372,9 @@ func (task *Task) Converge(notx bool) error { // // The reading of block data and indexing of integrations happens concurrently // with the number of go routines controlled by c. 
-func (task *Task) writeIndex(localHash []byte, pg wpg.Conn, delta uint64) error {
+func (task *Task) loadinsert(localHash []byte, pg wpg.Conn, delta uint64) (int64, error) {
 	var (
+		nrows int64
 		eg1   = errgroup.Group{}
 		wsize = task.batchSize / task.workers
 	)
@@ -385,13 +389,13 @@ func (task *Task) writeIndex(localHash []byte, pg wpg.Conn, delta uint64) error
 		eg1.Go(func() error { return task.src.LoadBlocks(task.filter, bfs, blks) })
 	}
 	if err := eg1.Wait(); err != nil {
-		return err
+		return 0, err
 	}
 	if len(task.batch[0].Header.Parent) != 32 {
-		return fmt.Errorf("corrupt parent: %x\n", task.batch[0].Header.Parent)
+		return 0, fmt.Errorf("corrupt parent: %x\n", task.batch[0].Header.Parent)
 	}
 	if !bytes.Equal(localHash, task.batch[0].Header.Parent) {
-		return ErrReorg
+		return 0, ErrReorg
 	}
 	var eg2 errgroup.Group
 	for i := uint64(0); i < task.workers && i*wsize < delta; i++ {
@@ -403,29 +407,134 @@ func (task *Task) writeIndex(localHash []byte, pg wpg.Conn, delta uint64) error
 		}
 		eg2.Go(func() error {
			var eg3 errgroup.Group
-			for _, dest := range task.dests {
-				dest := dest
+			for j := range task.dests {
+				j := j
 				eg3.Go(func() error {
-					count, err := dest.Insert(task.ctx, pg, blks)
-					atomic.AddInt64(&task.stat.events, count)
+					t0 := time.Now()
+					count, err := task.dests[j].Insert(task.ctx, pg, blks)
+					task.dstatw(task.dests[j].Name(), count, time.Since(t0))
+					nrows += count
 					return err
 				})
 			}
 			return eg3.Wait()
 		})
 	}
-	if err := eg2.Wait(); err != nil {
-		return fmt.Errorf("writing indexed data: %w", err)
+	return nrows, eg2.Wait()
+}
+
+type jsonDuration time.Duration
+
+func (d *jsonDuration) ScanInterval(i pgtype.Interval) error {
+	*d = jsonDuration(i.Microseconds * 1000)
+	return nil
+}
+
+func (d *jsonDuration) UnmarshalJSON(data []byte) error {
+	if len(data) < 2 {
+		return fmt.Errorf("jsonDuration must be at least 2 bytes")
 	}
-	var last = task.batch[delta-1]
-	const uq = "insert into e2pg.task (id, number, hash) values ($1, $2, $3)"
-	_, err := pg.Exec(context.Background(), uq, task.id, last.Num(), last.Hash())
+	data = data[1 : len(data)-1] // remove quotes
+	dur, err := time.ParseDuration(string(data))
+	*d = jsonDuration(dur)
+	return err
+}
+
+func (d jsonDuration) MarshalJSON() ([]byte, error) {
+	return []byte(fmt.Sprintf(`"%s"`, d.String())), nil
+}
+
+func (d jsonDuration) String() string {
+	return time.Duration(d).Round(time.Millisecond).String()
+}
+
+type TaskUpdate struct {
+	ID      string
+	Num     uint64
+	Hash    eth.Bytes
+	SrcNum  uint64
+	SrcHash eth.Bytes
+	NBlocks uint64
+	NRows   uint64
+	Latency jsonDuration
+	Dstat   map[string]Dstat
+}
+
+func TaskUpdate1(ctx context.Context, pg wpg.Conn, id string) (TaskUpdate, error) {
+	const q = `
+		select
+			id,
+			number,
+			hash,
+			coalesce(src_num, 0),
+			coalesce(src_hash, '\x00'),
+			coalesce(nblocks, 0),
+			coalesce(nrows, 0),
+			coalesce(latency, '0')::interval,
+			coalesce(dstat, '{}')
+		from e2pg.task
+		where id = $1
+		order by number desc
+		limit 1;
+	`
+	var tu = TaskUpdate{}
+	err := pg.QueryRow(ctx, q, id).Scan(
+		&tu.ID,
+		&tu.Num,
+		&tu.Hash,
+		&tu.SrcNum,
+		&tu.SrcHash,
+		&tu.NBlocks,
+		&tu.NRows,
+		&tu.Latency,
+		&tu.Dstat,
+	)
+	if err != nil {
+		return tu, fmt.Errorf("querying for task updates: %w", err)
+	}
+	return tu, nil
+}
+
+func TaskUpdates(ctx context.Context, pg wpg.Conn) ([]TaskUpdate, error) {
+	const q = `
+		select distinct on (id)
+			id,
+			number,
+			hash,
+			coalesce(src_num, 0),
+			coalesce(src_hash, '\x00'),
+			coalesce(nblocks, 0),
+			coalesce(nrows, 0),
+			coalesce(latency, '0')::interval,
+			coalesce(dstat, '{}')
+		from
e2pg.task + order by id, number desc; + ` + rows, err := pg.Query(ctx, q) if err != nil { - return fmt.Errorf("updating task table: %w", err) + return nil, fmt.Errorf("querying for task updates: %w", err) } - task.stat.inum = last.Num() - task.stat.ihash = last.Hash() - return nil + defer rows.Close() + var tus []TaskUpdate + for rows.Next() { + var tu TaskUpdate + err := rows.Scan( + &tu.ID, + &tu.Num, + &tu.Hash, + &tu.SrcNum, + &tu.SrcHash, + &tu.NBlocks, + &tu.NRows, + &tu.Latency, + &tu.Dstat, + ) + if err != nil { + return nil, fmt.Errorf("scanning task update: %w", err) + } + tus = append(tus, tu) + } + return tus, nil } var compiled = map[string]Destination{} @@ -435,16 +544,16 @@ var compiled = map[string]Destination{} type Manager struct { running sync.Mutex restart chan struct{} - snaps chan<- StatusSnapshot tasks []*Task + updates chan string pgp *pgxpool.Pool conf Config } -func NewManager(pgp *pgxpool.Pool, snaps chan<- StatusSnapshot, conf Config) *Manager { +func NewManager(pgp *pgxpool.Pool, conf Config) *Manager { return &Manager{ restart: make(chan struct{}), - snaps: snaps, + updates: make(chan string), pgp: pgp, conf: conf, } @@ -455,6 +564,10 @@ func (tm *Manager) Tasks() []*Task { return tm.tasks } +func (tm *Manager) Updates() string { + return <-tm.updates +} + func (tm *Manager) runTask(t *Task) error { if err := t.Setup(); err != nil { return fmt.Errorf("setting up task: %w", err) @@ -465,7 +578,7 @@ func (tm *Manager) runTask(t *Task) error { slog.Info("restart-task", "name", t.Name) return nil default: - t.Run1(tm.snaps, false) + t.Run1(tm.updates, false) } } } @@ -568,7 +681,7 @@ func getDest(pgp *pgxpool.Pool, ig Integration) (Destination, error) { } return cig, nil default: - aig, err := abi2.New(ig.Event, ig.Block, ig.Table) + aig, err := abi2.New(ig.Name, ig.Event, ig.Block, ig.Table) if err != nil { return nil, fmt.Errorf("building abi integration: %w", err) } @@ -591,10 +704,10 @@ func getSource(sc SourceConfig) (Source, error) { } } -func Integrations(ctx context.Context, pgp *pgxpool.Pool) ([]Integration, error) { +func Integrations(ctx context.Context, pg wpg.Conn) ([]Integration, error) { var res []Integration const q = `select conf from e2pg.integrations` - rows, err := pgp.Query(ctx, q) + rows, err := pg.Query(ctx, q) if err != nil { return nil, fmt.Errorf("querying integrations: %w", err) } @@ -669,8 +782,8 @@ func (conf Config) Valid(intg Integration) error { return nil } -func (conf Config) AllIntegrations(ctx context.Context, pgp *pgxpool.Pool) ([]Integration, error) { - res, err := Integrations(ctx, pgp) +func (conf Config) AllIntegrations(ctx context.Context, pg wpg.Conn) ([]Integration, error) { + res, err := Integrations(ctx, pg) if err != nil { return nil, fmt.Errorf("loading db integrations: %w", err) } @@ -680,6 +793,20 @@ func (conf Config) AllIntegrations(ctx context.Context, pgp *pgxpool.Pool) ([]In return res, nil } +func (conf Config) IntegrationsBySource(ctx context.Context, pg wpg.Conn) (map[string][]Integration, error) { + igs, err := conf.AllIntegrations(ctx, pg) + if err != nil { + return nil, fmt.Errorf("querying all integrations: %w", err) + } + res := make(map[string][]Integration) + for _, ig := range igs { + for _, sc := range ig.SourceConfigs { + res[sc.Name] = append(res[sc.Name], ig) + } + } + return res, nil +} + func (conf Config) AllSourceConfigs(ctx context.Context, pgp *pgxpool.Pool) ([]SourceConfig, error) { res, err := SourceConfigs(ctx, pgp) if err != nil { diff --git a/e2pg/e2pg_test.go b/e2pg/e2pg_test.go 
index 2a4ad3a1..064264ef 100644 --- a/e2pg/e2pg_test.go +++ b/e2pg/e2pg_test.go @@ -78,6 +78,10 @@ func (dest *testDestination) Events(_ context.Context) [][]byte { return nil } +func (dest *testDestination) Name() string { + return "test" +} + type testGeth struct { blocks []eth.Block } diff --git a/e2pg/migrations.go b/e2pg/migrations.go index 6a1e272e..e2f2c406 100644 --- a/e2pg/migrations.go +++ b/e2pg/migrations.go @@ -23,4 +23,14 @@ var Migrations = map[int]pgmig.Migration{ create unique index on e2pg.sources(name); `, }, + 10: pgmig.Migration{ + SQL: ` + alter table e2pg.task add column src_hash bytea; + alter table e2pg.task add column src_num numeric; + alter table e2pg.task add column nblocks numeric; + alter table e2pg.task add column nrows numeric; + alter table e2pg.task add column latency interval; + alter table e2pg.task add column dstat jsonb; + `, + }, } diff --git a/e2pg/schema.sql b/e2pg/schema.sql index d29b63bf..7721a061 100644 --- a/e2pg/schema.sql +++ b/e2pg/schema.sql @@ -47,7 +47,13 @@ CREATE TABLE e2pg.task ( id text NOT NULL, number bigint, hash bytea, - insert_at timestamp with time zone DEFAULT now() + insert_at timestamp with time zone DEFAULT now(), + src_hash bytea, + src_num numeric, + nblocks numeric, + nrows numeric, + latency interval, + dstat jsonb ); diff --git a/e2pg/web/index.html b/e2pg/web/index.html new file mode 100644 index 00000000..975bbbee --- /dev/null +++ b/e2pg/web/index.html @@ -0,0 +1,153 @@ + + + + E2PG + + + +
+

E2PG

+
+ {{ range $tu := .TaskUpdates -}} +
+
+
+ {{ (index $.ChainsByTaskID $tu.ID).Name }} +
+
+ {{ $tu.NRows }} +
+
+ {{ $tu.Latency }} +
+
+ {{(printf "0x%x" $tu.Hash)}} +
+
+ {{ $tu.Num }} +
+
+ {{ range $name, $stat := $tu.Dstat -}} +
+
+ {{ $name }} +
+
+ {{ $stat.NRows }} +
+
+ {{ $stat.Latency }} +
+
+ {{ end -}} +
+ {{ end -}} + + + diff --git a/e2pg/web/web.go b/e2pg/web/web.go new file mode 100644 index 00000000..89b882d3 --- /dev/null +++ b/e2pg/web/web.go @@ -0,0 +1,138 @@ +package web + +import ( + _ "embed" + "encoding/json" + "fmt" + "html/template" + "log/slog" + "net/http" + "os" + "sync" + + "github.com/indexsupply/x/e2pg" + + "github.com/jackc/pgx/v5/pgxpool" +) + +type Handler struct { + pgp *pgxpool.Pool + mgr *e2pg.Manager + conf *e2pg.Config + + clientsMutex sync.Mutex + clients map[string]chan string +} + +func New(mgr *e2pg.Manager, conf *e2pg.Config, pgp *pgxpool.Pool) *Handler { + h := &Handler{ + pgp: pgp, + mgr: mgr, + conf: conf, + clients: make(map[string]chan string), + } + go func() { + for { + tid := mgr.Updates() + for _, c := range h.clients { + c <- tid + } + } + }() + return h +} + +func read(file string) string { + b, _ := os.ReadFile(fmt.Sprintf("./e2pg/web/%s.html", file)) + return string(b) +} + +type IndexView struct { + Igs map[string][]e2pg.Integration + TaskUpdates []e2pg.TaskUpdate + ChainsByTaskID map[string]e2pg.SourceConfig +} + +func (h *Handler) Index(w http.ResponseWriter, r *http.Request) { + var ( + ctx = r.Context() + view = IndexView{} + err error + ) + view.TaskUpdates, err = e2pg.TaskUpdates(ctx, h.pgp) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + view.Igs, err = h.conf.IntegrationsBySource(ctx, h.pgp) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + sources, err := h.conf.AllSourceConfigs(ctx, h.pgp) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + view.ChainsByTaskID = map[string]e2pg.SourceConfig{} + for i := range sources { + k := fmt.Sprintf("%d-main", sources[i].ChainID) + view.ChainsByTaskID[k] = sources[i] + } + + tmpl, err := template.New("index").Parse(read("index")) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + err = tmpl.Execute(w, view) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } +} + +func (h *Handler) Updates(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + slog.InfoContext(r.Context(), "start sse", "c", r.RemoteAddr, "n", len(h.clients)) + c := make(chan string) + h.clientsMutex.Lock() + h.clients[r.RemoteAddr] = c + h.clientsMutex.Unlock() + defer func() { + h.clientsMutex.Lock() + delete(h.clients, r.RemoteAddr) + h.clientsMutex.Unlock() + close(c) + slog.InfoContext(r.Context(), "stop sse", "c", r.RemoteAddr, "n", len(h.clients)) + }() + + for { + var tid string + select { + case tid = <-c: + case <-r.Context().Done(): // disconnect + return + } + tu, err := e2pg.TaskUpdate1(r.Context(), h.pgp, tid) + if err != nil { + slog.ErrorContext(r.Context(), "json error", "e", err) + return + } + sjson, err := json.Marshal(tu) + if err != nil { + slog.ErrorContext(r.Context(), "json error", "e", err) + return + } + fmt.Fprintf(w, "data: %s\n\n", sjson) + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "Streaming unsupported", http.StatusInternalServerError) + return + } + flusher.Flush() + } +} diff --git a/eth/types.go b/eth/types.go index 50c7ed78..5188cd6f 100644 --- a/eth/types.go +++ b/eth/types.go @@ -98,6 +98,10 @@ func (hb *Bytes) UnmarshalJSON(data []byte) error { return err } +func (hb Bytes) MarshalJSON() ([]byte, error) { + return []byte(`"` + 
fmt.Sprintf("0x%x", hb) + `"`), nil +} + func (hb *Bytes) Write(p []byte) (int, error) { if len(*hb) < len(p) { *hb = append(*hb, make([]byte, len(p)-len(*hb))...) From 6bd96470af35cb070274c0bf7940e9a3c2f0a688 Mon Sep 17 00:00:00 2001 From: Ryan Smith Date: Sat, 14 Oct 2023 12:10:11 -0700 Subject: [PATCH 02/10] go mod tidy --- go.mod | 3 +-- go.sum | 2 -- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 5d4f58a4..16241e66 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,6 @@ go 1.21 require ( blake.io/pqx v0.2.1 github.com/aws/aws-sdk-go v1.44.285 - github.com/bmizerany/perks v0.0.0-20230307044200-03f9df79da1e github.com/btcsuite/btcd/btcec/v2 v2.3.2 github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 github.com/golang/snappy v0.0.4 @@ -15,7 +14,6 @@ require ( golang.org/x/crypto v0.6.0 golang.org/x/net v0.6.0 golang.org/x/sync v0.1.0 - golang.org/x/text v0.7.0 golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 kr.dev/diff v0.3.0 ) @@ -31,5 +29,6 @@ require ( github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect golang.org/x/sys v0.5.0 // indirect + golang.org/x/text v0.7.0 // indirect kr.dev/errorfmt v0.1.1 // indirect ) diff --git a/go.sum b/go.sum index ba87bbe4..12d5c76c 100644 --- a/go.sum +++ b/go.sum @@ -2,8 +2,6 @@ blake.io/pqx v0.2.1 h1:Qz3yyNmPIFCyRS9HLnxtQNIL809ZC13aWvpeiXU3oS8= blake.io/pqx v0.2.1/go.mod h1:hcG2tklM4QIxdfL+laWGAmtIDVgPKkWtxGG/t7umOfA= github.com/aws/aws-sdk-go v1.44.285 h1:rgoWYl+NdmKzRgoi/fZLEtGXOjCkcWIa5jPH02Uahdo= github.com/aws/aws-sdk-go v1.44.285/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= -github.com/bmizerany/perks v0.0.0-20230307044200-03f9df79da1e h1:mWOqoK5jV13ChKf/aF3plwQ96laasTJgZi4f1aSOu+M= -github.com/bmizerany/perks v0.0.0-20230307044200-03f9df79da1e/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q= github.com/btcsuite/btcd/btcec/v2 v2.3.2 h1:5n0X6hX0Zk+6omWcihdYvdAlGf2DfasC0GMf7DClJ3U= github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U= From abb0dcc184cba12ac46e1baf027dde9009b2ba4d Mon Sep 17 00:00:00 2001 From: Ryan Smith Date: Sat, 14 Oct 2023 12:11:18 -0700 Subject: [PATCH 03/10] remove unused code --- e2pg/e2pg.go | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/e2pg/e2pg.go b/e2pg/e2pg.go index f3a35d48..80b62c89 100644 --- a/e2pg/e2pg.go +++ b/e2pg/e2pg.go @@ -559,11 +559,6 @@ func NewManager(pgp *pgxpool.Pool, conf Config) *Manager { } } -// TODO(r): remove once old dashboard is gone -func (tm *Manager) Tasks() []*Task { - return tm.tasks -} - func (tm *Manager) Updates() string { return <-tm.updates } @@ -600,16 +595,15 @@ func (tm *Manager) Run() error { tm.running.Lock() defer tm.running.Unlock() tm.restart = make(chan struct{}) - var err error - tm.tasks, err = loadTasks(context.Background(), tm.pgp, tm.conf) + tasks, err := loadTasks(context.Background(), tm.pgp, tm.conf) if err != nil { return fmt.Errorf("loading tasks: %w", err) } var eg errgroup.Group - for i := range tm.tasks { + for i := range tasks { i := i eg.Go(func() error { - return tm.runTask(tm.tasks[i]) + return tm.runTask(tasks[i]) }) } return eg.Wait() From c875f3acc28826b737304f173bd5a66ca2e548cb Mon Sep 17 00:00:00 2001 From: Ryan Smith Date: Sat, 14 Oct 2023 12:31:41 -0700 Subject: [PATCH 04/10] simplify task update scanning --- e2pg/e2pg.go | 108 
+++++++++++++++------------------------------ e2pg/migrations.go | 1 + 2 files changed, 36 insertions(+), 73 deletions(-) diff --git a/e2pg/e2pg.go b/e2pg/e2pg.go index 80b62c89..30b99652 100644 --- a/e2pg/e2pg.go +++ b/e2pg/e2pg.go @@ -181,13 +181,13 @@ func (t *Task) dstatw(name string, n int64, d time.Duration) { } func (task *Task) Insert(n uint64, h []byte) error { - const q = `insert into e2pg.task (id, number, hash) values ($1, $2, $3)` + const q = `insert into e2pg.task (id, num, hash) values ($1, $2, $3)` _, err := task.pgp.Exec(context.Background(), q, task.id, n, h) return err } func (task *Task) Latest() (uint64, []byte, error) { - const q = `SELECT number, hash FROM e2pg.task WHERE id = $1 ORDER BY number DESC LIMIT 1` + const q = `SELECT num, hash FROM e2pg.task WHERE id = $1 ORDER BY num DESC LIMIT 1` var n, h = uint64(0), []byte{} err := task.pgp.QueryRow(context.Background(), q, task.id).Scan(&n, &h) if errors.Is(err, pgx.ErrNoRows) { @@ -275,7 +275,7 @@ func (task *Task) Converge(notx bool) error { } for reorgs := 0; reorgs <= 10; { localNum, localHash := uint64(0), []byte{} - const q = `SELECT number, hash FROM e2pg.task WHERE id = $1 ORDER BY number DESC LIMIT 1` + const q = `SELECT num, hash FROM e2pg.task WHERE id = $1 ORDER BY num DESC LIMIT 1` err := pg.QueryRow(task.ctx, q, task.id).Scan(&localNum, &localHash) if err != nil && !errors.Is(err, pgx.ErrNoRows) { return fmt.Errorf("getting latest from task: %w", err) @@ -309,7 +309,7 @@ func (task *Task) Converge(notx bool) error { case errors.Is(err, ErrReorg): reorgs++ slog.ErrorContext(task.ctx, "reorg", "n", localNum, "h", fmt.Sprintf("%.4x", localHash)) - const dq = "delete from e2pg.task where id = $1 AND number >= $2" + const dq = "delete from e2pg.task where id = $1 AND num >= $2" _, err := pg.Exec(task.ctx, dq, task.id, localNum) if err != nil { return fmt.Errorf("deleting block from task table: %w", err) @@ -327,7 +327,7 @@ func (task *Task) Converge(notx bool) error { const uq = ` insert into e2pg.task ( id, - number, + num, hash, src_num, src_hash, @@ -449,92 +449,54 @@ func (d jsonDuration) String() string { } type TaskUpdate struct { - ID string - Num uint64 - Hash eth.Bytes - SrcNum uint64 - SrcHash eth.Bytes - NBlocks uint64 - NRows uint64 - Latency jsonDuration - Dstat map[string]Dstat + ID string `db:"id"` + Num uint64 `db:"num"` + Hash eth.Bytes `db:"hash"` + SrcNum uint64 `db:"src_num"` + SrcHash eth.Bytes `db:"src_hash"` + NBlocks uint64 `db:"nblocks"` + NRows uint64 `db:"nrows"` + Latency jsonDuration `db:"latency"` + Dstat map[string]Dstat `db:"dstat"` } func TaskUpdate1(ctx context.Context, pg wpg.Conn, id string) (TaskUpdate, error) { const q = ` select id, - number, + num, hash, - coalesce(src_num, 0), - coalesce(src_hash, '\x00'), - coalesce(nblocks, 0), - coalesce(nrows, 0), - coalesce(latency, '0')::interval, - coalesce(dstat, '{}') + coalesce(src_num, 0) src_num, + coalesce(src_hash, '\x00') src_hash, + coalesce(nblocks, 0) nblocks, + coalesce(nrows, 0) nrows, + coalesce(latency, '0')::interval latency, + coalesce(dstat, '{}') dstat from e2pg.task where id = $1 - order by number desc + order by num desc limit 1; ` - var tu = TaskUpdate{} - err := pg.QueryRow(ctx, q, id).Scan( - &tu.ID, - &tu.Num, - &tu.Hash, - &tu.SrcNum, - &tu.SrcHash, - &tu.NBlocks, - &tu.NRows, - &tu.Latency, - &tu.Dstat, - ) - if err != nil { - return tu, fmt.Errorf("querying for task updates: %w", err) - } - return tu, nil + row, _ := pg.Query(ctx, q, id) + return pgx.CollectOneRow(row, 
pgx.RowToStructByName[TaskUpdate])
 }
 
 func TaskUpdates(ctx context.Context, pg wpg.Conn) ([]TaskUpdate, error) {
-	const q = `
+	rows, _ := pg.Query(ctx, `
 		select distinct on (id)
 			id,
-			number,
+			num,
 			hash,
-			coalesce(src_num, 0),
-			coalesce(src_hash, '\x00'),
-			coalesce(nblocks, 0),
-			coalesce(nrows, 0),
-			coalesce(latency, '0')::interval,
-			coalesce(dstat, '{}')
+			coalesce(src_num, 0) src_num,
+			coalesce(src_hash, '\x00') src_hash,
+			coalesce(nblocks, 0) nblocks,
+			coalesce(nrows, 0) nrows,
+			coalesce(latency, '0')::interval latency,
+			coalesce(dstat, '{}') dstat
 		from e2pg.task
-		order by id, number desc;
-	`
-	rows, err := pg.Query(ctx, q)
-	if err != nil {
-		return nil, fmt.Errorf("querying for task updates: %w", err)
-	}
-	defer rows.Close()
-	var tus []TaskUpdate
-	for rows.Next() {
-		var tu TaskUpdate
-		err := rows.Scan(
-			&tu.ID,
-			&tu.Num,
-			&tu.Hash,
-			&tu.SrcNum,
-			&tu.SrcHash,
-			&tu.NBlocks,
-			&tu.NRows,
-			&tu.Latency,
-			&tu.Dstat,
-		)
-		if err != nil {
-			return nil, fmt.Errorf("scanning task update: %w", err)
-		}
-		tus = append(tus, tu)
-	}
-	return tus, nil
+		order by id, num desc;
+	`)
+	return pgx.CollectRows(rows, pgx.RowToStructByName[TaskUpdate])
 }
 
 var compiled = map[string]Destination{}
diff --git a/e2pg/migrations.go b/e2pg/migrations.go
index e2f2c406..01b9cbde 100644
--- a/e2pg/migrations.go
+++ b/e2pg/migrations.go
@@ -25,6 +25,7 @@ var Migrations = map[int]pgmig.Migration{
 	},
 	10: pgmig.Migration{
 		SQL: `
+			alter table e2pg.task rename column number to num;
 			alter table e2pg.task add column src_hash bytea;
 			alter table e2pg.task add column src_num numeric;
 			alter table e2pg.task add column nblocks numeric;

From c875f3acc28826b737304f173bd5a66ca2e548cb Mon Sep 17 00:00:00 2001
From: Ryan Smith
Date: Sat, 14 Oct 2023 12:33:43 -0700
Subject: [PATCH 05/10] don't back up on updates

---
 e2pg/e2pg.go | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/e2pg/e2pg.go b/e2pg/e2pg.go
index 30b99652..8160224e 100644
--- a/e2pg/e2pg.go
+++ b/e2pg/e2pg.go
@@ -234,7 +234,12 @@ func (task *Task) Run1(updates chan<- string, notx bool) {
 		slog.ErrorContext(task.ctx, "error", err)
 	default:
 		go func() {
-			updates <- task.id
+			// try our best to deliver the update
+			// but don't stack up work
+			select {
+			case updates <- task.id:
+			default:
+			}
 		}()
 	}
 }

From aca657f3d81be47b0838d23104104fffd3cf22db Mon Sep 17 00:00:00 2001
From: Ryan Smith
Date: Sat, 14 Oct 2023 12:36:22 -0700
Subject: [PATCH 06/10] schema and json

---
 e2pg/e2pg.go    | 11 +----------
 e2pg/schema.sql |  4 ++--
 2 files changed, 3 insertions(+), 12 deletions(-)

diff --git a/e2pg/e2pg.go b/e2pg/e2pg.go
index 8160224e..22d95bdc 100644
--- a/e2pg/e2pg.go
+++ b/e2pg/e2pg.go
@@ -161,15 +161,6 @@ type Task struct {
 	workers   uint64
 }
 
-func (t *Task) dstatJSON() []byte {
-	b, err := json.Marshal(t.dstat)
-	if err != nil {
-		slog.ErrorContext(t.ctx, "encoding dstat", err)
-		return nil
-	}
-	return b
-}
-
 func (t *Task) dstatw(name string, n int64, d time.Duration) {
 	t.dstatMut.Lock()
 	defer t.dstatMut.Unlock()
@@ -352,7 +343,7 @@ func (task *Task) Converge(notx bool) error {
 			delta,
 			nrows,
 			time.Since(start),
-			task.dstatJSON(),
+			task.dstat,
 		)
 		if err != nil {
 			return fmt.Errorf("updating task table: %w", err)
 		}
diff --git a/e2pg/schema.sql b/e2pg/schema.sql
index 7721a061..556552fa 100644
--- a/e2pg/schema.sql
+++ b/e2pg/schema.sql
@@ -45,7 +45,7 @@ CREATE TABLE e2pg.sources (
 
 CREATE TABLE e2pg.task (
     id text NOT NULL,
-    number bigint,
+    num bigint,
     hash bytea,
     insert_at timestamp with time zone DEFAULT now(),
    src_hash bytea,
@@
-71,7 +71,7 @@ CREATE UNIQUE INDEX sources_name_idx ON e2pg.sources USING btree (name); -CREATE INDEX task_id_number_idx ON e2pg.task USING btree (id, number DESC); +CREATE INDEX task_id_number_idx ON e2pg.task USING btree (id, num DESC); From e8107ee145c08f26425c27148648a5904aff5787 Mon Sep 17 00:00:00 2001 From: Ryan Smith Date: Sat, 14 Oct 2023 12:48:42 -0700 Subject: [PATCH 07/10] fix html for many dest --- e2pg/web/index.html | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/e2pg/web/index.html b/e2pg/web/index.html index 975bbbee..fecb45a2 100644 --- a/e2pg/web/index.html +++ b/e2pg/web/index.html @@ -18,6 +18,9 @@ background-color: lightyellow; transition: background-color 0.5s ease-out; } + .task { + margin-bottom: 20px; + } .chain { width: 100%; display: flex; @@ -30,7 +33,7 @@ margin-bottom: 20px; } .Name { - width: 20%; + width: 25%; text-transform: capitalize; } .chain .Num { @@ -63,7 +66,7 @@ padding-left: 5px; } .Dest { - font-size: large; + font-size: medium; width: 100%; display: flex; flex-direction: row; @@ -114,12 +117,7 @@

E2PG
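
Usage sketch (not part of the patch series): a minimal Go client for the /task-updates SSE endpoint wired up in PATCH 01. It assumes the dashboard is listening on localhost:8546 (the default of the -l flag) and that TaskUpdate marshals under its Go field names, since the struct declares no json tags; Hash and Latency arrive as strings via eth.Bytes.MarshalJSON and jsonDuration.MarshalJSON.

package main

import (
	"bufio"
	"bytes"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
)

// taskUpdate mirrors the JSON shape of e2pg.TaskUpdate.
// Fields match by name since TaskUpdate has no json tags.
type taskUpdate struct {
	ID      string
	Num     uint64
	Hash    string // "0x..." via eth.Bytes.MarshalJSON
	NBlocks uint64
	NRows   uint64
	Latency string // e.g. "250ms" via jsonDuration.MarshalJSON
}

func main() {
	// assumed listen address; see the -l flag in cmd/e2pg/main.go
	resp, err := http.Get("http://localhost:8546/task-updates")
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()
	sc := bufio.NewScanner(resp.Body)
	for sc.Scan() {
		// web.Handler.Updates writes each frame as: "data: <json>\n\n"
		line := sc.Bytes()
		if !bytes.HasPrefix(line, []byte("data: ")) {
			continue // skip the blank line between frames
		}
		var tu taskUpdate
		if err := json.Unmarshal(bytes.TrimPrefix(line, []byte("data: ")), &tu); err != nil {
			log.Printf("decode: %v", err)
			continue
		}
		fmt.Printf("task=%s num=%d hash=%s rows=%d latency=%s\n",
			tu.ID, tu.Num, tu.Hash, tu.NRows, tu.Latency)
	}
	if err := sc.Err(); err != nil {
		log.Fatal(err)
	}
}

Note that the select/default added in PATCH 05 means updates may be dropped when the manager's channel is busy, so a client should treat the stream as best-effort and reconcile against the index page (or e2pg.TaskUpdates) when it reconnects.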