Skip to content

Commit

Permalink
Add user counters and msec metrics to prism UI. (#28929)
Browse files Browse the repository at this point in the history
Co-authored-by: lostluck <[email protected]>
  • Loading branch information
lostluck and lostluck authored Oct 11, 2023
1 parent 9cdcb03 commit 6a456f0
Showing 1 changed file with 39 additions and 1 deletion.
40 changes: 39 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,10 @@ func (h *jobDetailsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
trs := pipeResp.GetPipeline().GetComponents().GetTransforms()
col2T, topo := preprocessTransforms(trs)

counters := toTransformMap(results.AllMetrics().Counters())
distributions := toTransformMap(results.AllMetrics().Distributions())
msecs := toTransformMap(results.AllMetrics().Msecs())

data.Transforms = make([]pTransform, 0, len(trs))
for _, id := range topo {
pt := trs[id]
Expand Down Expand Up @@ -224,6 +228,29 @@ func (h *jobDetailsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
strMets = append(strMets, outMets...)
}

var msecMets []string
// TODO: Figure out where uniquename or id is being used in prism. It should be all global transform ID to faciliate lookups.
for _, msec := range msecs[pt.GetUniqueName()] {
msecMets = append(msecMets, fmt.Sprintf("\n- %+v", msec.Result()))
}
if len(msecMets) > 0 {
strMets = append(strMets, "Profiling metrics")
strMets = append(strMets, msecMets...)
}

var userMetrics []string
for _, ctr := range counters[pt.GetUniqueName()] {
userMetrics = append(userMetrics, fmt.Sprintf("\n- %s.%s: %v", ctr.Namespace(), ctr.Name(), ctr.Result()))
}
for _, dist := range distributions[pt.GetUniqueName()] {
userMetrics = append(userMetrics, fmt.Sprintf("\n- %s.%s: %+v", dist.Namespace(), dist.Name(), dist.Result()))
}

if len(userMetrics) > 0 {
strMets = append(strMets, "User metrics")
strMets = append(strMets, userMetrics...)
}

data.Transforms = append(data.Transforms, pTransform{
ID: id,
Transform: pt,
Expand All @@ -234,6 +261,14 @@ func (h *jobDetailsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
renderPage(jobPage, &data, w)
}

func toTransformMap[E interface{ Transform() string }](mets []E) map[string][]E {
ret := map[string][]E{}
for _, met := range mets {
ret[met.Transform()] = append(ret[met.Transform()], met)
}
return ret
}

type pcolParent struct {
L string
T *pipepb.PTransform
Expand All @@ -244,7 +279,10 @@ type pcolParent struct {
func preprocessTransforms(trs map[string]*pipepb.PTransform) (map[string]pcolParent, []string) {
ret := map[string]pcolParent{}
var leaves []string
for id, t := range trs {
keys := maps.Keys(trs)
sort.Strings(keys)
for _, id := range keys {
t := trs[id]
// Skip composites at this time.
if len(t.GetSubtransforms()) > 0 {
continue
Expand Down

0 comments on commit 6a456f0

Please sign in to comment.