From a590f2f2f4350d53cc78ec187b1c90c8e8f9fa93 Mon Sep 17 00:00:00 2001 From: Ryan Smith Date: Fri, 3 Nov 2023 10:39:05 -0700 Subject: [PATCH] e2pg/web: refactor. query and encdoe updates json once for all clients --- cmd/e2pg/main.go | 4 ++++ e2pg/web/web.go | 51 ++++++++++++++++++++++++------------------------ 2 files changed, 29 insertions(+), 26 deletions(-) diff --git a/cmd/e2pg/main.go b/cmd/e2pg/main.go index a29bad4f..01ae9d05 100644 --- a/cmd/e2pg/main.go +++ b/cmd/e2pg/main.go @@ -119,6 +119,10 @@ func main() { check(pprof.StartCPUProfile(&pbuf)) } + go func() { + check(wh.PushUpdates()) + }() + go func() { for { check(e2pg.PruneIntg(ctx, pg)) diff --git a/e2pg/web/web.go b/e2pg/web/web.go index ae29ac09..1cf0da0b 100644 --- a/e2pg/web/web.go +++ b/e2pg/web/web.go @@ -1,6 +1,7 @@ package web import ( + "context" _ "embed" "encoding/json" "fmt" @@ -40,28 +41,38 @@ type Handler struct { conf *e2pg.Config clientsMutex sync.Mutex - clients map[string]chan uint64 + clients map[string]chan []byte templates map[string]*template.Template } func New(mgr *e2pg.Manager, conf *e2pg.Config, pgp *pgxpool.Pool) *Handler { - h := &Handler{ + return &Handler{ pgp: pgp, mgr: mgr, conf: conf, - clients: make(map[string]chan uint64), + clients: make(map[string]chan []byte), templates: make(map[string]*template.Template), } - go func() { - for { - tid := mgr.Updates() +} + +func (h *Handler) PushUpdates() error { + ctx := context.Background() + for ; ; h.mgr.Updates() { + tus, err := e2pg.TaskUpdates(ctx, h.pgp) + if err != nil { + return fmt.Errorf("querying task updates: %w", err) + } + for _, update := range tus { + j, err := json.Marshal(update) + if err != nil { + return fmt.Errorf("marshaling task update: %w", err) + } for _, c := range h.clients { - c <- tid + c <- j } } - }() - return h + } } func (h *Handler) template(name string) (*template.Template, error) { @@ -219,7 +230,7 @@ func (h *Handler) Updates(w http.ResponseWriter, r *http.Request) { w.Header().Set("Connection", "keep-alive") slog.InfoContext(r.Context(), "start sse", "c", r.RemoteAddr, "n", len(h.clients)) - c := make(chan uint64) + c := make(chan []byte) h.clientsMutex.Lock() h.clients[r.RemoteAddr] = c h.clientsMutex.Unlock() @@ -233,28 +244,16 @@ func (h *Handler) Updates(w http.ResponseWriter, r *http.Request) { for { select { - case <-c: - case <-r.Context().Done(): // disconnect - return - } - tus, err := e2pg.TaskUpdates(r.Context(), h.pgp) - if err != nil { - slog.ErrorContext(r.Context(), "json error", "e", err) - return - } - for i := range tus { - sjson, err := json.Marshal(tus[i]) - if err != nil { - slog.ErrorContext(r.Context(), "json error", "e", err) - return - } - fmt.Fprintf(w, "data: %s\n\n", sjson) + case j := <-c: + fmt.Fprintf(w, "data: %s\n\n", j) flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "Streaming unsupported", http.StatusInternalServerError) return } flusher.Flush() + case <-r.Context().Done(): // disconnect + return } } }