Skip to content

Commit

Permalink
e2pg/web: use intg table for task/integration updates
Browse files Browse the repository at this point in the history
This commit also shows the progress value for a task / integration.
  • Loading branch information
ryandotsmith committed Oct 31, 2023
1 parent d09bd5c commit b4c4c3c
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 46 deletions.
82 changes: 80 additions & 2 deletions e2pg/e2pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,10 +711,18 @@ func (d jsonDuration) MarshalJSON() ([]byte, error) {
}

func (d jsonDuration) String() string {
return time.Duration(d).Round(time.Millisecond).String()
switch td := time.Duration(d); {
case td < 10*time.Millisecond:
return td.Truncate(10 * time.Millisecond).String()
case td < 100*time.Millisecond:
return td.Truncate(100 * time.Millisecond).String()
default:
return td.Round(time.Second).String()
}
}

type TaskUpdate struct {
DOMID string `db:"-"`
SrcName string `db:"src_name"`
Backfill bool `db:"backfill"`
Num uint64 `db:"num"`
Expand Down Expand Up @@ -752,7 +760,76 @@ func TaskUpdates(ctx context.Context, pg wpg.Conn) ([]TaskUpdate, error) {
and e2pg.task.backfill = f.backfill
and e2pg.task.num = f.num;
`)
return pgx.CollectRows(rows, pgx.RowToStructByName[TaskUpdate])
tus, err := pgx.CollectRows(rows, pgx.RowToStructByName[TaskUpdate])
if err != nil {
return nil, fmt.Errorf("querying for task updates: %w", err)
}
for i := range tus {
switch {
case tus[i].Backfill:
tus[i].DOMID = fmt.Sprintf("%s-backfill", tus[i].SrcName)
default:
tus[i].DOMID = fmt.Sprintf("%s-main", tus[i].SrcName)
}
}
return tus, nil
}

type IntgUpdate struct {
DOMID string `db:"-"`
Name string `db:"name"`
SrcName string `db:"src_name"`
Backfill bool `db:"backfill"`
Num uint64 `db:"num"`
Stop uint64 `db:"stop"`
NRows uint64 `db:"nrows"`
Latency jsonDuration `db:"latency"`
}

func (iu IntgUpdate) TaskID() string {
switch {
case iu.Backfill:
return fmt.Sprintf("%s-backfill", iu.SrcName)
default:
return fmt.Sprintf("%s-main", iu.SrcName)
}
}

func IntgUpdates(ctx context.Context, pg wpg.Conn) ([]IntgUpdate, error) {
rows, _ := pg.Query(ctx, `
with f as (
select name, src_name, backfill, max(num) num
from e2pg.intg
group by 1, 2, 3
) select
f.name,
f.src_name,
f.backfill,
f.num,
stop,
coalesce(nrows, 0) nrows,
coalesce(latency, '0')::interval latency
from f
left join e2pg.intg latest
on latest.name = f.name
and latest.src_name = f.src_name
and latest.backfill = f.backfill
and latest.num = f.num
`)
ius, err := pgx.CollectRows(rows, pgx.RowToStructByName[IntgUpdate])
if err != nil {
return nil, fmt.Errorf("querying for intg updates: %w", err)
}
for i := range ius {
switch {
case ius[i].Backfill:
ius[i].DOMID = fmt.Sprintf("%s-backfill-%s", ius[i].SrcName, ius[i].Name)
default:
ius[i].DOMID = fmt.Sprintf("%s-main-%s", ius[i].SrcName, ius[i].Name)
}
}
return ius, nil
}

var compiled = map[string]Destination{}
Expand Down Expand Up @@ -790,6 +867,7 @@ func (tm *Manager) runTask(t *Task) {
default:
switch err := t.Converge(false); {
case errors.Is(err, ErrDone):
slog.InfoContext(t.ctx, "done")
return
case errors.Is(err, ErrNothingNew):
time.Sleep(time.Second)
Expand Down
83 changes: 50 additions & 33 deletions e2pg/web/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,25 @@
margin-bottom: 10px;
color: dimgray;
}
.taskHeader .NRows, .taskHeader .Latency , .taskHeader .Hash, .taskHeader .Num {
.taskHeader .Progress, .taskHeader .NRows, .taskHeader .Latency , .taskHeader .Hash, .taskHeader .Num {
font-family: system-ui;
text-transform: capitalize;
}
.backfill {
margin: 30px 0 0 0;
}
.backfill h2 {
letter-spacing: 90%;
font-size: large;
font-weight: normal;
font-style: italic;
color: dimgray;
margin: 0 0 5px 0;
margin: 0 0 10px 0;
}
.backfill .Name {
font-size: medium;
}
.backfill .NRows, .backfill .Latency , .backfill .Hash, .backfill .Num {
.backfill .Progress, .backfill .NRows, .backfill .Latency , .backfill .Hash, .backfill .Num {
font-family: monospace;
font-size: medium;
}
Expand Down Expand Up @@ -79,7 +82,7 @@
text-transform: capitalize;
}
.NRows {
width: 10%;
width: 5%;
text-transform: lowercase;
text-align: left;
font-family: monospace;
Expand All @@ -95,16 +98,21 @@
text-transform: lowercase;
text-align: left;
font-family: monospace;
padding-left: 20px;
}
.Hash {
width: 24%;
width: 20%;
text-transform: lowercase;
text-align: left;
font-family: monospace;
overflow: hidden;
text-overflow: ellipsis;
}
.Progress {
width: 5%;
text-transform: lowercase;
text-align: right;
font-family: monospace;
}
</style>
</head>
<body>
Expand All @@ -122,25 +130,30 @@ <h1>E2PG</h1>
<div class="Latency"><span>Latency</span></div>
<div class="Hash"><span>Hash</span></div>
<div class="Num"><span>Number</span></div>
<div class="Progress"><span>Sync</span></div>
{{ end -}}
</div>
<div class="tasks">
{{ range $sc := .SourceConfigs -}}
{{ with $tu := (index $.TaskUpdates $sc.Name) -}}
<div class="task" id="{{ $sc.Name }}">
{{ with $tu := (index $.TaskUpdates (printf "%s-main" $sc.Name)) -}}
<div class="task" id="{{ $tu.DOMID }}">
<div class="source">
<div class="Name">{{ $sc.Name }}</div>
<div class="NRows">{{ $tu.NRows }}</div>
<div class="Latency">{{ $tu.Latency }}</div>
<div class="Hash">{{(printf "0x%x" $tu.Hash)}}</div>
<div class="Num addComma">{{ $tu.Num }}</div>
<div class="Progress">-</div>
</div>
<div class="destinations" style="display:none;">
{{ range $name, $stat := $tu.Dstat -}}
<div class="destination" id="{{ $tu.SrcName }}-{{$name}}">
<div class="Name">{{ $name }}</div>
<div class="NRows">{{ $stat.NRows }}</div>
<div class="Latency">{{ $stat.Latency }}</div>
{{ range $iu := (index $.IntgUpdates $tu.DOMID) -}}
<div class="destination" id="{{ $iu.DOMID }}">
<div class="Name">{{ $iu.Name }}</div>
<div class="NRows">{{ $iu.NRows }}</div>
<div class="Latency">{{ $iu.Latency }}</div>
<div class="Hash"></div>
<div class="Num"></div>
<div class="Progress"></div>
</div>
{{ end -}}
</div>
Expand All @@ -152,21 +165,25 @@ <h1>E2PG</h1>
<h2>Backfill Tasks</h2>
<div class="backfill-tasks">
{{ range $sc := .SourceConfigs -}}
{{ with $tu := (index $.TaskUpdatesBF $sc.Name) -}}
<div class="task" id="{{ printf "%s-backfill" $sc.Name }}">
{{ with $tu := (index $.TaskUpdates (printf "%s-backfill" $sc.Name)) -}}
<div class="task" id="{{ $tu.DOMID }}">
<div class="source">
<div class="Name">{{ $sc.Name }}</div>
<div class="NRows">{{ $tu.NRows }}</div>
<div class="Latency">{{ $tu.Latency }}</div>
<div class="Hash">{{(printf "0x%x" $tu.Hash)}}</div>
<div class="Num addComma">{{ $tu.Num }}</div>
<div class="Progress">-</div>
</div>
<div class="destinations" style="display:none;">
{{ range $name, $stat := $tu.Dstat -}}
<div class="destination" id="{{ $tu.SrcName }}-{{$name}}">
<div class="Name">{{ $name }}</div>
<div class="NRows">{{ $stat.NRows }}</div>
<div class="Latency">{{ $stat.Latency }}</div>
{{ range $iu := (index $.IntgUpdates $tu.DOMID) -}}
<div class="destination" id="{{ $iu.DOMID }}">
<div class="Name">{{ $iu.Name }}</div>
<div class="NRows">{{ $iu.NRows }}</div>
<div class="Latency">{{ $iu.Latency }}</div>
<div class="Hash"></div>
<div class="Num addComma">{{ $iu.Num }}</div>
<div class="Progress"></div>
</div>
{{ end -}}
</div>
Expand All @@ -177,11 +194,15 @@ <h2>Backfill Tasks</h2>
</div>
</body>
<script>
function tuid(tu) {
if (tu.Backfill) {
return tu.SrcName + "-backfill";
function progress(start, stop) {
if (stop == 0) {
return "100%";
}
const p = (start / stop) * 100;
if (p == 100) {
return "100%";
} else {
return tu.SrcName;
return `${p.toFixed(2)}%`;
}
};
function comma(s) {
Expand All @@ -208,15 +229,11 @@ <h2>Backfill Tasks</h2>
let updates = new EventSource("/task-updates");
updates.onmessage = function(event) {
const tu = JSON.parse(event.data);
update(tuid(tu), "Num", comma(tu.Num));
update(tuid(tu), "Hash", tu.Hash);
update(tuid(tu), "Latency", tu.Latency);
update(tuid(tu), "NRows", tu.NRows);
for (const [k1, v1] of Object.entries(tu.Dstat)) {
for (const [k2, v2] of Object.entries(v1)) {
update(`${tuid(tu)}-${k1}`, k2, v2);
}
}
update(tu.DOMID, "Num", comma(tu.Num));
update(tu.DOMID, "Hash", tu.Hash);
update(tu.DOMID, "Latency", tu.Latency);
update(tu.DOMID, "NRows", tu.NRows);
update(tu.DOMID, "Progress", progress(tu.Num, tu.Stop));
};
});
</script>
Expand Down
27 changes: 16 additions & 11 deletions e2pg/web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (h *Handler) AddIntegration(w http.ResponseWriter, r *http.Request) {

type IndexView struct {
TaskUpdates map[string]e2pg.TaskUpdate
TaskUpdatesBF map[string]e2pg.TaskUpdate
IntgUpdates map[string][]e2pg.IntgUpdate
SourceConfigs []e2pg.SourceConfig
}

Expand All @@ -171,24 +171,29 @@ func (h *Handler) Index(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
view.TaskUpdates = make(map[string]e2pg.TaskUpdate)
for _, tu := range tus {
if !tu.Backfill {
view.TaskUpdates[tu.SrcName] = tu
}
ius, err := e2pg.IntgUpdates(ctx, h.pgp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
view.TaskUpdatesBF = make(map[string]e2pg.TaskUpdate)

view.IntgUpdates = make(map[string][]e2pg.IntgUpdate)
for _, iu := range ius {
view.IntgUpdates[iu.TaskID()] = append(
view.IntgUpdates[iu.TaskID()],
iu,
)
}
view.TaskUpdates = make(map[string]e2pg.TaskUpdate)
for _, tu := range tus {
if tu.Backfill {
view.TaskUpdatesBF[tu.SrcName] = tu
}
view.TaskUpdates[tu.DOMID] = tu
}

view.SourceConfigs, err = h.conf.AllSourceConfigs(ctx, h.pgp)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

t, err := h.template("index")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
Expand Down

0 comments on commit b4c4c3c

Please sign in to comment.