Skip to content

Commit

Permalink
e2pg/web: refactor. query and encdoe updates json once for all clients
Browse files Browse the repository at this point in the history
  • Loading branch information
ryandotsmith committed Nov 3, 2023
1 parent bc7dc01 commit a590f2f
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 26 deletions.
4 changes: 4 additions & 0 deletions cmd/e2pg/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ func main() {
check(pprof.StartCPUProfile(&pbuf))
}

go func() {
check(wh.PushUpdates())
}()

go func() {
for {
check(e2pg.PruneIntg(ctx, pg))
Expand Down
51 changes: 25 additions & 26 deletions e2pg/web/web.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package web

import (
"context"
_ "embed"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -40,28 +41,38 @@ type Handler struct {
conf *e2pg.Config

clientsMutex sync.Mutex
clients map[string]chan uint64
clients map[string]chan []byte

templates map[string]*template.Template
}

func New(mgr *e2pg.Manager, conf *e2pg.Config, pgp *pgxpool.Pool) *Handler {
h := &Handler{
return &Handler{
pgp: pgp,
mgr: mgr,
conf: conf,
clients: make(map[string]chan uint64),
clients: make(map[string]chan []byte),
templates: make(map[string]*template.Template),
}
go func() {
for {
tid := mgr.Updates()
}

func (h *Handler) PushUpdates() error {
ctx := context.Background()
for ; ; h.mgr.Updates() {
tus, err := e2pg.TaskUpdates(ctx, h.pgp)
if err != nil {
return fmt.Errorf("querying task updates: %w", err)
}
for _, update := range tus {
j, err := json.Marshal(update)
if err != nil {
return fmt.Errorf("marshaling task update: %w", err)
}
for _, c := range h.clients {
c <- tid
c <- j
}
}
}()
return h
}
}

func (h *Handler) template(name string) (*template.Template, error) {
Expand Down Expand Up @@ -219,7 +230,7 @@ func (h *Handler) Updates(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Connection", "keep-alive")

slog.InfoContext(r.Context(), "start sse", "c", r.RemoteAddr, "n", len(h.clients))
c := make(chan uint64)
c := make(chan []byte)
h.clientsMutex.Lock()
h.clients[r.RemoteAddr] = c
h.clientsMutex.Unlock()
Expand All @@ -233,28 +244,16 @@ func (h *Handler) Updates(w http.ResponseWriter, r *http.Request) {

for {
select {
case <-c:
case <-r.Context().Done(): // disconnect
return
}
tus, err := e2pg.TaskUpdates(r.Context(), h.pgp)
if err != nil {
slog.ErrorContext(r.Context(), "json error", "e", err)
return
}
for i := range tus {
sjson, err := json.Marshal(tus[i])
if err != nil {
slog.ErrorContext(r.Context(), "json error", "e", err)
return
}
fmt.Fprintf(w, "data: %s\n\n", sjson)
case j := <-c:
fmt.Fprintf(w, "data: %s\n\n", j)
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "Streaming unsupported", http.StatusInternalServerError)
return
}
flusher.Flush()
case <-r.Context().Done(): // disconnect
return
}
}
}
Expand Down

0 comments on commit a590f2f

Please sign in to comment.