From 6a456f04b68b3e19c66cf55937e6cf2be4cada19 Mon Sep 17 00:00:00 2001 From: Robert Burke Date: Wed, 11 Oct 2023 08:42:58 -0700 Subject: [PATCH] Add user counters and msec metrics to prism UI. (#28929) Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com> --- .../beam/runners/prism/internal/web/web.go | 40 ++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/sdks/go/pkg/beam/runners/prism/internal/web/web.go b/sdks/go/pkg/beam/runners/prism/internal/web/web.go index 7bfbe19a910b..b7afad35aeee 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/web/web.go +++ b/sdks/go/pkg/beam/runners/prism/internal/web/web.go @@ -188,6 +188,10 @@ func (h *jobDetailsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { trs := pipeResp.GetPipeline().GetComponents().GetTransforms() col2T, topo := preprocessTransforms(trs) + counters := toTransformMap(results.AllMetrics().Counters()) + distributions := toTransformMap(results.AllMetrics().Distributions()) + msecs := toTransformMap(results.AllMetrics().Msecs()) + data.Transforms = make([]pTransform, 0, len(trs)) for _, id := range topo { pt := trs[id] @@ -224,6 +228,29 @@ func (h *jobDetailsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { strMets = append(strMets, outMets...) } + var msecMets []string + // TODO: Figure out where uniquename or id is being used in prism. It should be all global transform ID to faciliate lookups. + for _, msec := range msecs[pt.GetUniqueName()] { + msecMets = append(msecMets, fmt.Sprintf("\n- %+v", msec.Result())) + } + if len(msecMets) > 0 { + strMets = append(strMets, "Profiling metrics") + strMets = append(strMets, msecMets...) + } + + var userMetrics []string + for _, ctr := range counters[pt.GetUniqueName()] { + userMetrics = append(userMetrics, fmt.Sprintf("\n- %s.%s: %v", ctr.Namespace(), ctr.Name(), ctr.Result())) + } + for _, dist := range distributions[pt.GetUniqueName()] { + userMetrics = append(userMetrics, fmt.Sprintf("\n- %s.%s: %+v", dist.Namespace(), dist.Name(), dist.Result())) + } + + if len(userMetrics) > 0 { + strMets = append(strMets, "User metrics") + strMets = append(strMets, userMetrics...) + } + data.Transforms = append(data.Transforms, pTransform{ ID: id, Transform: pt, @@ -234,6 +261,14 @@ func (h *jobDetailsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { renderPage(jobPage, &data, w) } +func toTransformMap[E interface{ Transform() string }](mets []E) map[string][]E { + ret := map[string][]E{} + for _, met := range mets { + ret[met.Transform()] = append(ret[met.Transform()], met) + } + return ret +} + type pcolParent struct { L string T *pipepb.PTransform @@ -244,7 +279,10 @@ type pcolParent struct { func preprocessTransforms(trs map[string]*pipepb.PTransform) (map[string]pcolParent, []string) { ret := map[string]pcolParent{} var leaves []string - for id, t := range trs { + keys := maps.Keys(trs) + sort.Strings(keys) + for _, id := range keys { + t := trs[id] // Skip composites at this time. if len(t.GetSubtransforms()) > 0 { continue