diff --git a/CHANGELOG.md b/CHANGELOG.md index 42a3206..0d9fd6d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -110,6 +110,7 @@ - Move calltree generation into readjob ([#514](https://github.com/getsentry/vroom/pull/514)) - Stop writing profile examples to metrics_summary ([#519](https://github.com/getsentry/vroom/pull/519)) - Update materialized_version for profile functions metrics ([#522](https://github.com/getsentry/vroom/pull/522)) +- Support writing functions metrics we extract from chunks into the functions dataset ([#524](https://github.com/getsentry/vroom/pull/524)) - Keep top N samples in flamegraph. ([#526](https://github.com/getsentry/vroom/pull/526)) ## 23.12.0 diff --git a/cmd/vroom/chunk.go b/cmd/vroom/chunk.go index ccdbeea..3c90d4c 100644 --- a/cmd/vroom/chunk.go +++ b/cmd/vroom/chunk.go @@ -157,13 +157,44 @@ func (env *environment) postChunk(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) return } - s = sentry.StartSpan(ctx, "processing") s.Description = "Extract functions" functions := metrics.ExtractFunctionsFromCallTrees(callTrees) functions = metrics.CapAndFilterFunctions(functions, maxUniqueFunctionsPerProfile, true) s.Finish() + // This block writes into the functions dataset + s = sentry.StartSpan(ctx, "json.marshal") + s.Description = "Marshal functions Kafka message" + b, err := json.Marshal(buildChunkFunctionsKafkaMessage(sc, functions)) + s.Finish() + if err != nil { + if hub != nil { + hub.CaptureException(err) + } + w.WriteHeader(http.StatusInternalServerError) + return + } + s = sentry.StartSpan(ctx, "processing") + s.Description = "Send functions to Kafka" + err = env.profilingWriter.WriteMessages(ctx, kafka.Message{ + Topic: env.config.CallTreesKafkaTopic, + Value: b, + }) + s.Finish() + if hub != nil { + hub.Scope().SetContext("Call functions payload", map[string]interface{}{ + "Size": len(b), + }) + } + if err != nil { + if hub != nil { + hub.CaptureException(err) + } + } + + // this block is writing into the generic metrics dataset + // TODO: remove once we fully move to functions dataset s = sentry.StartSpan(ctx, "processing") s.Description = "Extract metrics from functions" metrics := extractMetricsFromSampleChunkFunctions(sc, functions) diff --git a/cmd/vroom/kafka.go b/cmd/vroom/kafka.go index 84c4531..d0d1157 100644 --- a/cmd/vroom/kafka.go +++ b/cmd/vroom/kafka.go @@ -3,6 +3,7 @@ package main import ( "context" + "github.com/getsentry/vroom/internal/chunk" "github.com/getsentry/vroom/internal/nodetree" "github.com/getsentry/vroom/internal/platform" "github.com/getsentry/vroom/internal/profile" @@ -22,6 +23,9 @@ type ( RetentionDays int `json:"retention_days"` Timestamp int64 `json:"timestamp"` TransactionName string `json:"transaction_name"` + StartTimestamp float64 `json:"start_timestamp,omitempty"` + EndTimestamp float64 `json:"end_timestamp,omitempty"` + ProfilingType string `json:"profiling_type,omitempty"` MaterializationVersion uint8 `json:"materialization_version"` } @@ -90,6 +94,26 @@ func buildFunctionsKafkaMessage(p profile.Profile, functions []nodetree.CallTree } } +// Metrics extraction is only supported for sample chunks right now. +// TODO: support metrics extraction for Android chunks. +func buildChunkFunctionsKafkaMessage(c *chunk.SampleChunk, functions []nodetree.CallTreeFunction) FunctionsKafkaMessage { + return FunctionsKafkaMessage{ + Environment: c.Environment, + Functions: functions, + ID: c.ID, + Platform: c.Platform, + ProjectID: c.ProjectID, + Received: int64(c.Received), + Release: c.Release, + RetentionDays: c.RetentionDays, + Timestamp: int64(c.StartTimestamp()), + StartTimestamp: c.StartTimestamp(), + EndTimestamp: c.EndTimestamp(), + ProfilingType: "continuous", + MaterializationVersion: 1, + } +} + func buildProfileKafkaMessage(p profile.Profile) ProfileKafkaMessage { t := p.Transaction() m := p.Metadata() diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index fe3bd3c..77f1ffa 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -5,6 +5,7 @@ import ( "errors" "math" "sort" + "strconv" "github.com/getsentry/sentry-go" "github.com/getsentry/vroom/internal/chunk" @@ -117,7 +118,7 @@ func ExtractFunctionsFromCallTreesForThread( functions := make(map[uint32]nodetree.CallTreeFunction, 0) for _, callTree := range callTreesForThread { - callTree.CollectFunctions(functions) + callTree.CollectFunctions(functions, "") } return mergeAndSortFunctions(functions) @@ -127,10 +128,15 @@ func ExtractFunctionsFromCallTrees[T comparable]( callTrees map[T][]*nodetree.Node, ) []nodetree.CallTreeFunction { functions := make(map[uint32]nodetree.CallTreeFunction, 0) - - for _, callTreesForThread := range callTrees { + for tid, callTreesForThread := range callTrees { + threadID := "" + if t, ok := any(tid).(string); ok { + threadID = t + } else if t, ok := any(tid).(uint64); ok { + threadID = strconv.FormatUint(t, 10) + } for _, callTree := range callTreesForThread { - callTree.CollectFunctions(functions) + callTree.CollectFunctions(functions, threadID) } } diff --git a/internal/nodetree/nodetree.go b/internal/nodetree/nodetree.go index 23cfdcd..3d3f3e1 100644 --- a/internal/nodetree/nodetree.go +++ b/internal/nodetree/nodetree.go @@ -105,6 +105,8 @@ type CallTreeFunction struct { SelfTimesNS []uint64 `json:"self_times_ns"` SumSelfTimeNS uint64 `json:"-"` SampleCount int `json:"-"` + ThreadID string `json:"thread_id"` + MaxDuration uint64 `json:"-"` } // `CollectionFunctions` walks the node tree, collects any function with a non zero @@ -127,13 +129,14 @@ type CallTreeFunction struct { // 100ms - 30ms = 70ms. func (n *Node) CollectFunctions( results map[uint32]CallTreeFunction, + threadID string, ) (uint64, uint64) { var childrenApplicationDurationNS uint64 var childrenSystemDurationNS uint64 // determine the amount of time spent in application vs system functions in the children for _, child := range n.Children { - applicationDurationNS, systemDurationNS := child.CollectFunctions(results) + applicationDurationNS, systemDurationNS := child.CollectFunctions(results, threadID) childrenApplicationDurationNS += applicationDurationNS childrenSystemDurationNS += systemDurationNS } @@ -185,11 +188,19 @@ func (n *Node) CollectFunctions( SelfTimesNS: []uint64{selfTimeNS}, SumSelfTimeNS: selfTimeNS, SampleCount: n.SampleCount, + ThreadID: threadID, + MaxDuration: selfTimeNS, } } else { function.SelfTimesNS = append(function.SelfTimesNS, selfTimeNS) function.SumSelfTimeNS += selfTimeNS function.SampleCount += n.SampleCount + if selfTimeNS > function.MaxDuration { + function.MaxDuration = selfTimeNS + if threadID != function.ThreadID { + function.ThreadID = threadID + } + } results[fingerprint] = function } } diff --git a/internal/nodetree/nodetree_test.go b/internal/nodetree/nodetree_test.go index 706bd81..4c98134 100644 --- a/internal/nodetree/nodetree_test.go +++ b/internal/nodetree/nodetree_test.go @@ -42,6 +42,7 @@ func TestNodeTreeCollectFunctions(t *testing.T) { Package: "foo", SelfTimesNS: []uint64{10}, SumSelfTimeNS: 10, + MaxDuration: 10, }, }, }, @@ -64,6 +65,7 @@ func TestNodeTreeCollectFunctions(t *testing.T) { Package: "foo", SelfTimesNS: []uint64{10}, SumSelfTimeNS: 10, + MaxDuration: 10, }, }, }, @@ -96,6 +98,7 @@ func TestNodeTreeCollectFunctions(t *testing.T) { Package: "foo", SelfTimesNS: []uint64{10}, SumSelfTimeNS: 10, + MaxDuration: 10, }, fingerprintBar: { Fingerprint: fingerprintBar, @@ -104,6 +107,7 @@ func TestNodeTreeCollectFunctions(t *testing.T) { Package: "bar", SelfTimesNS: []uint64{10}, SumSelfTimeNS: 10, + MaxDuration: 10, }, }, }, @@ -156,6 +160,7 @@ func TestNodeTreeCollectFunctions(t *testing.T) { Package: "foo", SelfTimesNS: []uint64{10}, SumSelfTimeNS: 10, + MaxDuration: 10, }, fingerprintBaz: { Fingerprint: fingerprintBaz, @@ -164,11 +169,12 @@ func TestNodeTreeCollectFunctions(t *testing.T) { Package: "baz", SelfTimesNS: []uint64{10}, SumSelfTimeNS: 10, + MaxDuration: 10, }, }, }, { - name: "mutitple occurrences of same functions", + name: "multitple occurrences of same functions", platform: platform.Python, node: Node{ DurationNS: 40, @@ -260,6 +266,7 @@ func TestNodeTreeCollectFunctions(t *testing.T) { Package: "foo", SelfTimesNS: []uint64{10, 20}, SumSelfTimeNS: 30, + MaxDuration: 20, }, fingerprintBaz: { Fingerprint: fingerprintBaz, @@ -268,6 +275,7 @@ func TestNodeTreeCollectFunctions(t *testing.T) { Package: "baz", SelfTimesNS: []uint64{10, 20}, SumSelfTimeNS: 30, + MaxDuration: 20, }, fingerprintQux: { Fingerprint: fingerprintQux, @@ -276,6 +284,7 @@ func TestNodeTreeCollectFunctions(t *testing.T) { Package: "qux", SelfTimesNS: []uint64{10}, SumSelfTimeNS: 10, + MaxDuration: 10, }, fingerprintMain: { Fingerprint: fingerprintMain, @@ -284,6 +293,7 @@ func TestNodeTreeCollectFunctions(t *testing.T) { Package: "main", SelfTimesNS: []uint64{10}, SumSelfTimeNS: 10, + MaxDuration: 10, }, }, }, @@ -336,6 +346,7 @@ func TestNodeTreeCollectFunctions(t *testing.T) { InApp: true, SelfTimesNS: []uint64{10}, SumSelfTimeNS: 10, + MaxDuration: 10, }, }, }, @@ -370,6 +381,7 @@ func TestNodeTreeCollectFunctions(t *testing.T) { InApp: true, SelfTimesNS: []uint64{10}, SumSelfTimeNS: 10, + MaxDuration: 10, }, }, }, @@ -392,7 +404,7 @@ func TestNodeTreeCollectFunctions(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { results := make(map[uint32]CallTreeFunction) - tt.node.CollectFunctions(results) + tt.node.CollectFunctions(results, "") if diff := testutil.Diff(results, tt.want); diff != "" { t.Fatalf("Result mismatch: got - want +\n%s", diff) }