diff --git a/CHANGELOG.md b/CHANGELOG.md index c5ae708..79df509 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ - Add more metadata fields to Chunk Kafka message. ([#518](https://github.com/getsentry/vroom/pull/518)) - Ingest Android profile chunks. ([#521](https://github.com/getsentry/vroom/pull/521)) - Handle profile chunks in regressed endpoint. ([#527](https://github.com/getsentry/vroom/pull/527)) +- Add support for android chunks. ([#540](https://github.com/getsentry/vroom/pull/540)) **Bug Fixes**: @@ -118,9 +119,11 @@ - Add utility to merge a list of android chunks and generate a speedscope result ([#531](https://github.com/getsentry/vroom/pull/531)) - Remove unused legacy flamegraph code path. ([#533](https://github.com/getsentry/vroom/pull/533)) - Remove generic metrics ingestion ([#534](https://github.com/getsentry/vroom/pull/534)) +- Add android chunk calltree implementation and signature definition to the chunk interface ([#536](https://github.com/getsentry/vroom/pull/536)) - Update sentry-go dependency to v0.29.1 ([#535](https://github.com/getsentry/vroom/pull/535)) - Lower number of concurrent reads ([#537](https://github.com/getsentry/vroom/pull/537)) - Remove unused metrics summary kafka writer ([#538](https://github.com/getsentry/vroom/pull/538)) +- Remove unused chunks flamegraph endpoint ([#541](https://github.com/getsentry/vroom/pull/541)) - Pass readjob result as pointer ([#542](https://github.com/getsentry/vroom/pull/542)) ## 23.12.0 diff --git a/cmd/vroom/chunk.go b/cmd/vroom/chunk.go index b47386c..e79c6e7 100644 --- a/cmd/vroom/chunk.go +++ b/cmd/vroom/chunk.go @@ -57,14 +57,14 @@ func (env *environment) postChunk(w http.ResponseWriter, r *http.Request) { var c chunk.Chunk switch p.Platform { case platform.Android: - c = new(chunk.AndroidChunk) + c = chunk.New(new(chunk.AndroidChunk)) default: - c = new(chunk.SampleChunk) + c = chunk.New(new(chunk.SampleChunk)) } s = sentry.StartSpan(ctx, "json.unmarshal") s.Description = "Unmarshal profile" - err = json.Unmarshal(body, c) + err = json.Unmarshal(body, &c) s.Finish() if err != nil { if hub != nil { @@ -138,59 +138,52 @@ func (env *environment) postChunk(w http.ResponseWriter, r *http.Request) { } s.Finish() - options := c.GetOptions() - sc, ok := c.(*chunk.SampleChunk) - - // Metrics extraction is only supported for sample chunks right now. - // TODO: support metrics extraction for Android chunks. - if options.ProjectDSN != "" && c.GetPlatform() != platform.Android && ok { - // nb.: here we don't have a specific thread ID, so we're going to ingest - // functions metrics from all the thread. - // That's ok as this data is not supposed to be transaction/span scoped, - // plus, we'll only retain application frames, so much of the system functions - // chaff will be dropped. - s = sentry.StartSpan(ctx, "processing") - callTrees, err := sc.CallTrees(nil) - s.Finish() - if err != nil { - hub.CaptureException(err) - w.WriteHeader(http.StatusInternalServerError) - return - } - s = sentry.StartSpan(ctx, "processing") - s.Description = "Extract functions" - functions := metrics.ExtractFunctionsFromCallTrees(callTrees) - functions = metrics.CapAndFilterFunctions(functions, maxUniqueFunctionsPerProfile, true) - s.Finish() + // nb.: here we don't have a specific thread ID, so we're going to ingest + // functions metrics from all the thread. + // That's ok as this data is not supposed to be transaction/span scoped, + // plus, we'll only retain application frames, so much of the system functions + // chaff will be dropped. + s = sentry.StartSpan(ctx, "processing") + callTrees, err := c.CallTrees(nil) + s.Finish() + if err != nil { + hub.CaptureException(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + s = sentry.StartSpan(ctx, "processing") + s.Description = "Extract functions" + functions := metrics.ExtractFunctionsFromCallTrees(callTrees) + functions = metrics.CapAndFilterFunctions(functions, maxUniqueFunctionsPerProfile, true) + s.Finish() - // This block writes into the functions dataset - s = sentry.StartSpan(ctx, "json.marshal") - s.Description = "Marshal functions Kafka message" - b, err := json.Marshal(buildChunkFunctionsKafkaMessage(sc, functions)) - s.Finish() - if err != nil { - if hub != nil { - hub.CaptureException(err) - } - w.WriteHeader(http.StatusInternalServerError) - return + // This block writes into the functions dataset + s = sentry.StartSpan(ctx, "json.marshal") + s.Description = "Marshal functions Kafka message" + b, err = json.Marshal(buildChunkFunctionsKafkaMessage(&c, functions)) + s.Finish() + if err != nil { + if hub != nil { + hub.CaptureException(err) } - s = sentry.StartSpan(ctx, "processing") - s.Description = "Send functions to Kafka" - err = env.profilingWriter.WriteMessages(ctx, kafka.Message{ - Topic: env.config.CallTreesKafkaTopic, - Value: b, + w.WriteHeader(http.StatusInternalServerError) + return + } + s = sentry.StartSpan(ctx, "processing") + s.Description = "Send functions to Kafka" + err = env.profilingWriter.WriteMessages(ctx, kafka.Message{ + Topic: env.config.CallTreesKafkaTopic, + Value: b, + }) + s.Finish() + if hub != nil { + hub.Scope().SetContext("Call functions payload", map[string]interface{}{ + "Size": len(b), }) - s.Finish() + } + if err != nil { if hub != nil { - hub.Scope().SetContext("Call functions payload", map[string]interface{}{ - "Size": len(b), - }) - } - if err != nil { - if hub != nil { - hub.CaptureException(err) - } + hub.CaptureException(err) } } @@ -268,7 +261,7 @@ func (env *environment) postProfileFromChunkIDs(w http.ResponseWriter, r *http.R } } - chunks := make([]chunk.SampleChunk, 0, len(requestBody.ChunkIDs)) + chunks := make([]chunk.Chunk, 0, len(requestBody.ChunkIDs)) // read the output of each tasks for i := 0; i < len(requestBody.ChunkIDs); i++ { res := <-results @@ -312,26 +305,82 @@ func (env *environment) postProfileFromChunkIDs(w http.ResponseWriter, r *http.R s = sentry.StartSpan(ctx, "chunks.merge") s.Description = "Merge profile chunks into a single one" - chunk, err := chunk.MergeSampleChunks(chunks, requestBody.Start, requestBody.End) - s.Finish() - if err != nil { - hub.CaptureException(err) - w.WriteHeader(http.StatusInternalServerError) + if len(chunks) == 0 { + w.WriteHeader(http.StatusBadRequest) return } + var resp []byte + // Here we check what type of chunks we're dealing with, + // since Android chunks and Sample chunks return completely + // different types (Chunk vs Speedscope), hence we can't hide + // the implementation behind an interface. + // + // We check the first chunk type, and use that to assert the + // type of all the elements in the slice and then call the + // appropriate utility. + switch chunks[0].Chunk().(type) { + case *chunk.SampleChunk: + sampleChunks := make([]chunk.SampleChunk, 0, len(chunks)) + for _, c := range chunks { + sc, ok := c.Chunk().(*chunk.SampleChunk) + if !ok { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprint(w, "error: mix of sampled and android chunks") + return + } + sampleChunks = append(sampleChunks, *sc) + } + mergedChunk, err := chunk.MergeSampleChunks(sampleChunks, requestBody.Start, requestBody.End) + s.Finish() + if err != nil { + hub.CaptureException(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + s = sentry.StartSpan(ctx, "json.marshal") + resp, err = json.Marshal(postProfileFromChunkIDsResponse{Chunk: mergedChunk}) + s.Finish() + if err != nil { + hub.CaptureException(err) + w.WriteHeader(http.StatusInternalServerError) + return + } - s = sentry.StartSpan(ctx, "json.marshal") - defer s.Finish() - b, err := json.Marshal(postProfileFromChunkIDsResponse{Chunk: chunk}) - if err != nil { - hub.CaptureException(err) - w.WriteHeader(http.StatusInternalServerError) + case *chunk.AndroidChunk: + androidChunks := make([]chunk.AndroidChunk, 0, len(chunks)) + for _, c := range chunks { + ac, ok := c.Chunk().(*chunk.AndroidChunk) + if !ok { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprint(w, "error: mix of android and sample chunks") + return + } + androidChunks = append(androidChunks, *ac) + } + sp, err := chunk.SpeedscopeFromAndroidChunks(androidChunks, requestBody.Start, requestBody.End) + s.Finish() + if err != nil { + hub.CaptureException(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + s = sentry.StartSpan(ctx, "json.marshal") + resp, err = json.Marshal(sp) + s.Finish() + if err != nil { + hub.CaptureException(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + default: + // Should never happen. + w.WriteHeader(http.StatusBadRequest) return } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) - _, _ = w.Write(b) + _, _ = w.Write(resp) } type ( diff --git a/cmd/vroom/chunk_test.go b/cmd/vroom/chunk_test.go index 2ef6051..e995699 100644 --- a/cmd/vroom/chunk_test.go +++ b/cmd/vroom/chunk_test.go @@ -60,6 +60,7 @@ func TestPostAndReadSampleChunk(t *testing.T) { Release: "1.2", OrganizationID: 1, ProjectID: 1, + Version: "2", Profile: chunk.SampleData{ Frames: []frame.Frame{ { @@ -125,7 +126,7 @@ func TestPostAndReadSampleChunk(t *testing.T) { // read the chunk with UnmarshalCompressed and make sure that we can unmarshal // the data into the Chunk struct and that it matches the original - var c chunk.SampleChunk + var c chunk.Chunk err = storageutil.UnmarshalCompressed( context.Background(), test.blobBucket, @@ -135,7 +136,7 @@ func TestPostAndReadSampleChunk(t *testing.T) { if err != nil { t.Fatal(err) } - if diff := testutil.Diff(chunkData, c); diff != "" { + if diff := testutil.Diff(&chunkData, c.Chunk()); diff != "" { t.Fatalf("Result mismatch: got - want +\n%s", diff) } }) diff --git a/cmd/vroom/flamegraph.go b/cmd/vroom/flamegraph.go index 73d4a14..590ad22 100644 --- a/cmd/vroom/flamegraph.go +++ b/cmd/vroom/flamegraph.go @@ -13,81 +13,6 @@ import ( "github.com/getsentry/vroom/internal/utils" ) -type postFlamegraphFromChunksMetadataBody struct { - ChunksMetadata []flamegraph.ChunkMetadata `json:"chunks_metadata"` -} - -func (env *environment) postFlamegraphFromChunksMetadata(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - hub := sentry.GetHubFromContext(ctx) - ps := httprouter.ParamsFromContext(ctx) - rawOrganizationID := ps.ByName("organization_id") - organizationID, err := strconv.ParseUint(rawOrganizationID, 10, 64) - if err != nil { - if hub != nil { - hub.CaptureException(err) - } - w.WriteHeader(http.StatusBadRequest) - return - } - - hub.Scope().SetTag("organization_id", rawOrganizationID) - - rawProjectID := ps.ByName("project_id") - projectID, err := strconv.ParseUint(rawProjectID, 10, 64) - if err != nil { - sentry.CaptureException(err) - w.WriteHeader(http.StatusBadRequest) - return - } - if hub != nil { - hub.Scope().SetTag("project_id", rawProjectID) - } - - var body postFlamegraphFromChunksMetadataBody - s := sentry.StartSpan(ctx, "processing") - s.Description = "Decoding data" - err = json.NewDecoder(r.Body).Decode(&body) - s.Finish() - if err != nil { - if hub != nil { - hub.CaptureException(err) - } - w.WriteHeader(http.StatusBadRequest) - return - } - - s = sentry.StartSpan(ctx, "processing") - speedscope, err := flamegraph.GetFlamegraphFromChunks(ctx, organizationID, projectID, env.storage, body.ChunksMetadata, readJobs) - s.Finish() - if err != nil { - if hub != nil { - hub.CaptureException(err) - } - w.WriteHeader(http.StatusInternalServerError) - return - } - - if hub != nil { - hub.Scope().SetTag("requested_chunks", strconv.Itoa(len(body.ChunksMetadata))) - } - - s = sentry.StartSpan(ctx, "json.marshal") - defer s.Finish() - b, err := json.Marshal(speedscope) - if err != nil { - if hub != nil { - hub.CaptureException(err) - } - w.WriteHeader(http.StatusInternalServerError) - return - } - - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusOK) - _, _ = w.Write(b) -} - type ( postFlamegraphBody struct { Transaction []utils.TransactionProfileCandidate `json:"transaction"` diff --git a/cmd/vroom/kafka.go b/cmd/vroom/kafka.go index 5e6625e..0f4c6fa 100644 --- a/cmd/vroom/kafka.go +++ b/cmd/vroom/kafka.go @@ -76,16 +76,16 @@ func buildFunctionsKafkaMessage(p profile.Profile, functions []nodetree.CallTree // Metrics extraction is only supported for sample chunks right now. // TODO: support metrics extraction for Android chunks. -func buildChunkFunctionsKafkaMessage(c *chunk.SampleChunk, functions []nodetree.CallTreeFunction) FunctionsKafkaMessage { +func buildChunkFunctionsKafkaMessage(c *chunk.Chunk, functions []nodetree.CallTreeFunction) FunctionsKafkaMessage { return FunctionsKafkaMessage{ - Environment: c.Environment, + Environment: c.GetEnvironment(), Functions: functions, - ID: c.ProfilerID, - Platform: c.Platform, - ProjectID: c.ProjectID, - Received: int64(c.Received), - Release: c.Release, - RetentionDays: c.RetentionDays, + ID: c.GetProfilerID(), + Platform: c.GetPlatform(), + ProjectID: c.GetProjectID(), + Received: int64(c.GetReceived()), + Release: c.GetRelease(), + RetentionDays: c.GetRetentionDays(), Timestamp: int64(c.StartTimestamp()), StartTimestamp: c.StartTimestamp(), EndTimestamp: c.EndTimestamp(), diff --git a/cmd/vroom/main.go b/cmd/vroom/main.go index 5127054..81748a3 100644 --- a/cmd/vroom/main.go +++ b/cmd/vroom/main.go @@ -120,11 +120,6 @@ func (e *environment) newRouter() (*httprouter.Router, error) { "/organizations/:organization_id/projects/:project_id/raw_profiles/:profile_id", e.getRawProfile, }, - { - http.MethodPost, - "/organizations/:organization_id/projects/:project_id/chunks-flamegraph", - e.postFlamegraphFromChunksMetadata, - }, { http.MethodPost, "/organizations/:organization_id/projects/:project_id/chunks", diff --git a/internal/chunk/android.go b/internal/chunk/android.go index a2da10b..a001311 100644 --- a/internal/chunk/android.go +++ b/internal/chunk/android.go @@ -2,6 +2,7 @@ package chunk import ( "encoding/json" + "strconv" "time" "github.com/getsentry/vroom/internal/clientsdk" @@ -53,8 +54,14 @@ func (c AndroidChunk) DurationMS() uint64 { return uint64(time.Duration(c.DurationNS).Milliseconds()) } -func (c AndroidChunk) CallTrees() map[uint64][]*nodetree.Node { - return c.Profile.CallTrees() +func (c AndroidChunk) CallTrees(_ *string) (map[string][]*nodetree.Node, error) { + callTrees := c.Profile.CallTrees() + stringThreadCallTrees := make(map[string][]*nodetree.Node) + for tid, callTree := range callTrees { + threadID := strconv.FormatUint(tid, 10) + stringThreadCallTrees[threadID] = callTree + } + return stringThreadCallTrees, nil } func (c AndroidChunk) SDKName() string { diff --git a/internal/chunk/chunk.go b/internal/chunk/chunk.go index cc715c8..b5013b2 100644 --- a/internal/chunk/chunk.go +++ b/internal/chunk/chunk.go @@ -1,15 +1,17 @@ package chunk import ( + "encoding/json" "fmt" "github.com/getsentry/vroom/internal/frame" + "github.com/getsentry/vroom/internal/nodetree" "github.com/getsentry/vroom/internal/platform" "github.com/getsentry/vroom/internal/utils" ) type ( - Chunk interface { + chunkInterface interface { GetEnvironment() string GetID() string GetOrganizationID() uint64 @@ -21,6 +23,7 @@ type ( GetRetentionDays() int GetOptions() utils.Options GetFrameWithFingerprint(uint32) (frame.Frame, error) + CallTrees(activeThreadID *string) (map[string][]*nodetree.Node, error) DurationMS() uint64 EndTimestamp() float64 @@ -31,8 +34,45 @@ type ( Normalize() } + + Chunk struct { + chunk chunkInterface + } ) +func New(c chunkInterface) Chunk { + return Chunk{ + chunk: c, + } +} + +type version struct { + Version string `json:"version"` +} + +func (c *Chunk) UnmarshalJSON(b []byte) error { + var v version + err := json.Unmarshal(b, &v) + if err != nil { + return err + } + switch v.Version { + case "": + c.chunk = new(AndroidChunk) + default: + c.chunk = new(SampleChunk) + } + return json.Unmarshal(b, &c.chunk) +} + +func (c Chunk) MarshalJSON() ([]byte, error) { + return json.Marshal(c.chunk) +} + +func (c Chunk) Chunk() chunkInterface { + return c.chunk +} + func StoragePath(OrganizationID uint64, ProjectID uint64, ProfilerID string, ID string) string { return fmt.Sprintf( "%d/%d/%s/%s", @@ -42,3 +82,74 @@ func StoragePath(OrganizationID uint64, ProjectID uint64, ProfilerID string, ID ID, ) } + +func (c Chunk) GetEnvironment() string { + return c.chunk.GetEnvironment() +} + +func (c Chunk) GetID() string { + return c.chunk.GetID() +} + +func (c Chunk) GetOrganizationID() uint64 { + return c.chunk.GetOrganizationID() +} + +func (c Chunk) GetPlatform() platform.Platform { + return c.chunk.GetPlatform() +} + +func (c Chunk) GetProfilerID() string { + return c.chunk.GetProfilerID() +} + +func (c Chunk) GetProjectID() uint64 { + return c.chunk.GetProjectID() +} + +func (c Chunk) GetReceived() float64 { + return c.chunk.GetReceived() +} + +func (c Chunk) GetRelease() string { + return c.chunk.GetRelease() +} + +func (c Chunk) GetRetentionDays() int { + return c.chunk.GetRetentionDays() +} + +func (c Chunk) GetOptions() utils.Options { + return c.chunk.GetOptions() +} + +func (c Chunk) GetFrameWithFingerprint(f uint32) (frame.Frame, error) { + return c.chunk.GetFrameWithFingerprint(f) +} + +func (c Chunk) CallTrees(activeThreadID *string) (map[string][]*nodetree.Node, error) { + return c.chunk.CallTrees(activeThreadID) +} + +func (c Chunk) DurationMS() uint64 { + return c.chunk.DurationMS() +} +func (c Chunk) EndTimestamp() float64 { + return c.chunk.EndTimestamp() +} +func (c Chunk) SDKName() string { + return c.chunk.SDKName() +} +func (c Chunk) SDKVersion() string { + return c.chunk.SDKVersion() +} +func (c Chunk) StartTimestamp() float64 { + return c.chunk.StartTimestamp() +} +func (c Chunk) StoragePath() string { + return c.chunk.StoragePath() +} + +func (c *Chunk) Normalize() { + c.chunk.Normalize() +} diff --git a/internal/chunk/sample_readjob.go b/internal/chunk/sample_readjob.go index 5079310..3f646cf 100644 --- a/internal/chunk/sample_readjob.go +++ b/internal/chunk/sample_readjob.go @@ -25,7 +25,7 @@ type ( ReadJobResult struct { Err error - Chunk *SampleChunk + Chunk *Chunk TransactionID string ThreadID *string Start uint64 @@ -34,7 +34,7 @@ type ( ) func (job ReadJob) Read() { - var chunk SampleChunk + var chunk Chunk err := storageutil.UnmarshalCompressed( job.Ctx, diff --git a/internal/flamegraph/flamegraph.go b/internal/flamegraph/flamegraph.go index 7faf757..b1fa268 100644 --- a/internal/flamegraph/flamegraph.go +++ b/internal/flamegraph/flamegraph.go @@ -7,7 +7,6 @@ import ( "encoding/hex" "errors" "fmt" - "strconv" "github.com/getsentry/sentry-go" "github.com/getsentry/vroom/internal/chunk" @@ -27,12 +26,6 @@ type ( } CallTrees map[uint64][]*nodetree.Node - - ChunkMetadata struct { - ProfilerID string `json:"profiler_id"` - ChunkID string `json:"chunk_id"` - SpanIntervals []utils.Interval `json:"span_intervals,omitempty"` - } ) var ( @@ -380,89 +373,6 @@ func (f *flamegraph) getProfilesIndices(profiles map[utils.ExampleMetadata]struc return indices } -func GetFlamegraphFromChunks( - ctx context.Context, - organizationID uint64, - projectID uint64, - storage *blob.Bucket, - chunksMetadata []ChunkMetadata, - jobs chan storageutil.ReadJob) (speedscope.Output, error) { - hub := sentry.GetHubFromContext(ctx) - results := make(chan storageutil.ReadJobResult, len(chunksMetadata)) - defer close(results) - - chunkIDToMetadata := make(map[string]ChunkMetadata) - for _, chunkMetadata := range chunksMetadata { - chunkIDToMetadata[chunkMetadata.ChunkID] = chunkMetadata - jobs <- chunk.ReadJob{ - Ctx: ctx, - ProfilerID: chunkMetadata.ProfilerID, - ChunkID: chunkMetadata.ChunkID, - OrganizationID: organizationID, - ProjectID: projectID, - Storage: storage, - Result: results, - } - } - - var flamegraphTree []*nodetree.Node - countChunksAggregated := 0 - // read the output of each tasks - for i := 0; i < len(chunksMetadata); i++ { - res := <-results - result, ok := res.(chunk.ReadJobResult) - if !ok { - continue - } - if result.Err != nil { - if errors.Is(result.Err, storageutil.ErrObjectNotFound) { - continue - } - if errors.Is(result.Err, context.DeadlineExceeded) { - return speedscope.Output{}, result.Err - } - if hub != nil { - hub.CaptureException(result.Err) - } - continue - } - cm := chunkIDToMetadata[result.Chunk.ID] - for _, interval := range cm.SpanIntervals { - callTrees, err := result.Chunk.CallTrees(&interval.ActiveThreadID) - if err != nil { - if hub != nil { - hub.CaptureException(err) - } - continue - } - intervals := []utils.Interval{interval} - - annotate := annotateWithProfileExample( - utils.NewExampleFromProfilerChunk( - result.Chunk.ProjectID, - result.Chunk.ProfilerID, - result.Chunk.ID, - result.TransactionID, - result.ThreadID, - result.Start, - result.End, - ), - ) - for _, callTree := range callTrees { - slicedTree := sliceCallTree(&callTree, &intervals) - addCallTreeToFlamegraph(&flamegraphTree, slicedTree, annotate) - } - } - countChunksAggregated++ - } - - sp := toSpeedscope(ctx, flamegraphTree, 1000, projectID) - if hub != nil { - hub.Scope().SetTag("processed_chunks", strconv.Itoa(countChunksAggregated)) - } - return sp, nil -} - func GetFlamegraphFromCandidates( ctx context.Context, storage *blob.Bucket, diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 77f1ffa..3ad4d60 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -261,9 +261,9 @@ func (ma *Aggregator) GetMetricsFromCandidates( } resultMetadata = utils.NewExampleFromProfilerChunk( - result.Chunk.ProjectID, - result.Chunk.ProfilerID, - result.Chunk.ID, + result.Chunk.GetProjectID(), + result.Chunk.GetProfilerID(), + result.Chunk.GetID(), result.TransactionID, result.ThreadID, result.Start,