Skip to content

Commit

Permalink
e2pg: new task manager. move e2pg/config to e2pg
Browse files Browse the repository at this point in the history
The config package was a hack to workaround import cycles. The prior few
commits have been to undo the import cycle fiasco.

The new task manager provides a way to restart tasks. This will be used
when we have an interface for adding/removing tasks without restarting
the process.

The task manager also loads tasks from the newly created database
tables. The config file can be used alongside the database and the
database config will be overwritten by the config file in the case of
conflicts.
  • Loading branch information
ryandotsmith committed Oct 12, 2023
1 parent 227d53d commit 822099b
Show file tree
Hide file tree
Showing 12 changed files with 441 additions and 282 deletions.
8 changes: 4 additions & 4 deletions cmd/e2pg/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (
)

type dashHandler struct {
tasks []*e2pg.Task
mgr *e2pg.Manager
clientsMutex sync.Mutex
clients map[string]chan e2pg.StatusSnapshot
}

func newDashHandler(tasks []*e2pg.Task, snaps <-chan e2pg.StatusSnapshot) *dashHandler {
func newDashHandler(mgr *e2pg.Manager, snaps <-chan e2pg.StatusSnapshot) *dashHandler {
dh := &dashHandler{
tasks: tasks,
mgr: mgr,
clients: make(map[string]chan e2pg.StatusSnapshot),
}
go func() {
Expand Down Expand Up @@ -163,7 +163,7 @@ func (dh *dashHandler) Index(w http.ResponseWriter, r *http.Request) {
return
}
snaps := make(map[uint64]e2pg.StatusSnapshot)
for _, task := range dh.tasks {
for _, task := range dh.mgr.Tasks() {
s := task.Status()
snaps[s.ChainID] = s
}
Expand Down
24 changes: 9 additions & 15 deletions cmd/e2pg/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@ import (
"time"

"github.com/indexsupply/x/e2pg"
"github.com/indexsupply/x/e2pg/config"
"github.com/indexsupply/x/pgmig"
"github.com/indexsupply/x/wctx"
"github.com/indexsupply/x/wos"
"github.com/indexsupply/x/wslog"

"github.com/jackc/pgx/v5/pgxpool"
"golang.org/x/sync/errgroup"
)

func check(err error) {
Expand Down Expand Up @@ -78,7 +77,7 @@ func main() {
os.Exit(0)
}

var conf config.Config
var conf e2pg.Config
switch {
case cfile == "":
fmt.Printf("missing config file\n")
Expand All @@ -90,19 +89,20 @@ func main() {
}

if !skipMigrate {
migdb, err := pgxpool.New(ctx, config.Env(conf.PGURL))
migdb, err := pgxpool.New(ctx, wos.Getenv(conf.PGURL))
check(err)
check(pgmig.Migrate(migdb, e2pg.Migrations))
migdb.Close()
}

tasks, err := config.NewTasks(conf)
pg, err := pgxpool.New(ctx, wos.Getenv(conf.PGURL))
check(err)

var (
pbuf bytes.Buffer
snaps = make(chan e2pg.StatusSnapshot)
dh = newDashHandler(tasks, snaps)
pbuf bytes.Buffer
snaps = make(chan e2pg.StatusSnapshot)
tskmgr = e2pg.NewManager(pg, snaps, conf)
dh = newDashHandler(tskmgr, snaps)
)
mux := http.NewServeMux()
mux.HandleFunc("/", dh.Index)
Expand All @@ -120,13 +120,7 @@ func main() {
if profile == "cpu" {
check(pprof.StartCPUProfile(&pbuf))
}
var eg errgroup.Group
for i := range tasks {
i := i
check(tasks[i].Setup())
eg.Go(func() error { tasks[i].Run(snaps, notx); return nil })
}
eg.Wait()
tskmgr.Run()
switch profile {
case "cpu":
pprof.StopCPUProfile()
Expand Down
151 changes: 0 additions & 151 deletions e2pg/config/config.go

This file was deleted.

82 changes: 0 additions & 82 deletions e2pg/config/testhelper/helper.go

This file was deleted.

Loading

0 comments on commit 822099b

Please sign in to comment.