From dc73a446529cfb457424e989f26e219667094e57 Mon Sep 17 00:00:00 2001 From: viglia Date: Wed, 4 Dec 2024 14:14:50 +0100 Subject: [PATCH] ref: use chunk interface instead of specific implementations (#540) * ingest metrics regardless of the platform * rename the chunk interface to chunkInterface and define a Chunk type that contains a chunk field of type chunkInterface. This, like for Profile, will let us unmarshal to the right type. * refactor code to use chunk interface * remove pointers from getter * specify error message if we receive a list of chunks with mixed android and sample chunks * return a 400 response if we were passed an empty list of chunk IDs * update changelog --- CHANGELOG.md | 1 + cmd/vroom/chunk.go | 179 ++++++++++++++++++++----------- cmd/vroom/chunk_test.go | 5 +- cmd/vroom/kafka.go | 16 +-- internal/chunk/chunk.go | 111 ++++++++++++++++++- internal/chunk/sample_readjob.go | 4 +- internal/metrics/metrics.go | 6 +- 7 files changed, 241 insertions(+), 81 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8396cd2..e27611e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -28,6 +28,7 @@ - Add more metadata fields to Chunk Kafka message. ([#518](https://github.com/getsentry/vroom/pull/518)) - Ingest Android profile chunks. ([#521](https://github.com/getsentry/vroom/pull/521)) - Handle profile chunks in regressed endpoint. ([#527](https://github.com/getsentry/vroom/pull/527)) +- Add support for android chunks. ([#540](https://github.com/getsentry/vroom/pull/540)) **Bug Fixes**: diff --git a/cmd/vroom/chunk.go b/cmd/vroom/chunk.go index 0839d2d..b272c43 100644 --- a/cmd/vroom/chunk.go +++ b/cmd/vroom/chunk.go @@ -57,14 +57,14 @@ func (env *environment) postChunk(w http.ResponseWriter, r *http.Request) { var c chunk.Chunk switch p.Platform { case platform.Android: - c = new(chunk.AndroidChunk) + c = chunk.New(new(chunk.AndroidChunk)) default: - c = new(chunk.SampleChunk) + c = chunk.New(new(chunk.SampleChunk)) } s = sentry.StartSpan(ctx, "json.unmarshal") s.Description = "Unmarshal profile" - err = json.Unmarshal(body, c) + err = json.Unmarshal(body, &c) s.Finish() if err != nil { if hub != nil { @@ -138,59 +138,52 @@ func (env *environment) postChunk(w http.ResponseWriter, r *http.Request) { } s.Finish() - options := c.GetOptions() - sc, ok := c.(*chunk.SampleChunk) - - // Metrics extraction is only supported for sample chunks right now. - // TODO: support metrics extraction for Android chunks. - if options.ProjectDSN != "" && c.GetPlatform() != platform.Android && ok { - // nb.: here we don't have a specific thread ID, so we're going to ingest - // functions metrics from all the thread. - // That's ok as this data is not supposed to be transaction/span scoped, - // plus, we'll only retain application frames, so much of the system functions - // chaff will be dropped. - s = sentry.StartSpan(ctx, "processing") - callTrees, err := sc.CallTrees(nil) - s.Finish() - if err != nil { - hub.CaptureException(err) - w.WriteHeader(http.StatusInternalServerError) - return - } - s = sentry.StartSpan(ctx, "processing") - s.Description = "Extract functions" - functions := metrics.ExtractFunctionsFromCallTrees(callTrees) - functions = metrics.CapAndFilterFunctions(functions, maxUniqueFunctionsPerProfile, true) - s.Finish() + // nb.: here we don't have a specific thread ID, so we're going to ingest + // functions metrics from all the thread. + // That's ok as this data is not supposed to be transaction/span scoped, + // plus, we'll only retain application frames, so much of the system functions + // chaff will be dropped. + s = sentry.StartSpan(ctx, "processing") + callTrees, err := c.CallTrees(nil) + s.Finish() + if err != nil { + hub.CaptureException(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + s = sentry.StartSpan(ctx, "processing") + s.Description = "Extract functions" + functions := metrics.ExtractFunctionsFromCallTrees(callTrees) + functions = metrics.CapAndFilterFunctions(functions, maxUniqueFunctionsPerProfile, true) + s.Finish() - // This block writes into the functions dataset - s = sentry.StartSpan(ctx, "json.marshal") - s.Description = "Marshal functions Kafka message" - b, err := json.Marshal(buildChunkFunctionsKafkaMessage(sc, functions)) - s.Finish() - if err != nil { - if hub != nil { - hub.CaptureException(err) - } - w.WriteHeader(http.StatusInternalServerError) - return + // This block writes into the functions dataset + s = sentry.StartSpan(ctx, "json.marshal") + s.Description = "Marshal functions Kafka message" + b, err = json.Marshal(buildChunkFunctionsKafkaMessage(&c, functions)) + s.Finish() + if err != nil { + if hub != nil { + hub.CaptureException(err) } - s = sentry.StartSpan(ctx, "processing") - s.Description = "Send functions to Kafka" - err = env.profilingWriter.WriteMessages(ctx, kafka.Message{ - Topic: env.config.CallTreesKafkaTopic, - Value: b, + w.WriteHeader(http.StatusInternalServerError) + return + } + s = sentry.StartSpan(ctx, "processing") + s.Description = "Send functions to Kafka" + err = env.profilingWriter.WriteMessages(ctx, kafka.Message{ + Topic: env.config.CallTreesKafkaTopic, + Value: b, + }) + s.Finish() + if hub != nil { + hub.Scope().SetContext("Call functions payload", map[string]interface{}{ + "Size": len(b), }) - s.Finish() + } + if err != nil { if hub != nil { - hub.Scope().SetContext("Call functions payload", map[string]interface{}{ - "Size": len(b), - }) - } - if err != nil { - if hub != nil { - hub.CaptureException(err) - } + hub.CaptureException(err) } } @@ -268,7 +261,7 @@ func (env *environment) postProfileFromChunkIDs(w http.ResponseWriter, r *http.R } } - chunks := make([]chunk.SampleChunk, 0, len(requestBody.ChunkIDs)) + chunks := make([]chunk.Chunk, 0, len(requestBody.ChunkIDs)) // read the output of each tasks for i := 0; i < len(requestBody.ChunkIDs); i++ { res := <-results @@ -312,26 +305,82 @@ func (env *environment) postProfileFromChunkIDs(w http.ResponseWriter, r *http.R s = sentry.StartSpan(ctx, "chunks.merge") s.Description = "Merge profile chunks into a single one" - chunk, err := chunk.MergeSampleChunks(chunks, requestBody.Start, requestBody.End) - s.Finish() - if err != nil { - hub.CaptureException(err) - w.WriteHeader(http.StatusInternalServerError) + if len(chunks) == 0 { + w.WriteHeader(http.StatusBadRequest) return } + var resp []byte + // Here we check what type of chunks we're dealing with, + // since Android chunks and Sample chunks return completely + // different types (Chunk vs Speedscope), hence we can't hide + // the implementation behind an interface. + // + // We check the first chunk type, and use that to assert the + // type of all the elements in the slice and then call the + // appropriate utility. + switch chunks[0].Chunk().(type) { + case *chunk.SampleChunk: + sampleChunks := make([]chunk.SampleChunk, 0, len(chunks)) + for _, c := range chunks { + sc, ok := c.Chunk().(*chunk.SampleChunk) + if !ok { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprint(w, "error: mix of sampled and android chunks") + return + } + sampleChunks = append(sampleChunks, *sc) + } + mergedChunk, err := chunk.MergeSampleChunks(sampleChunks, requestBody.Start, requestBody.End) + s.Finish() + if err != nil { + hub.CaptureException(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + s = sentry.StartSpan(ctx, "json.marshal") + resp, err = json.Marshal(postProfileFromChunkIDsResponse{Chunk: mergedChunk}) + s.Finish() + if err != nil { + hub.CaptureException(err) + w.WriteHeader(http.StatusInternalServerError) + return + } - s = sentry.StartSpan(ctx, "json.marshal") - defer s.Finish() - b, err := json.Marshal(postProfileFromChunkIDsResponse{Chunk: chunk}) - if err != nil { - hub.CaptureException(err) - w.WriteHeader(http.StatusInternalServerError) + case *chunk.AndroidChunk: + androidChunks := make([]chunk.AndroidChunk, 0, len(chunks)) + for _, c := range chunks { + ac, ok := c.Chunk().(*chunk.AndroidChunk) + if !ok { + w.WriteHeader(http.StatusBadRequest) + fmt.Fprint(w, "error: mix of android and sample chunks") + return + } + androidChunks = append(androidChunks, *ac) + } + sp, err := chunk.SpeedscopeFromAndroidChunks(androidChunks, requestBody.Start, requestBody.End) + s.Finish() + if err != nil { + hub.CaptureException(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + s = sentry.StartSpan(ctx, "json.marshal") + resp, err = json.Marshal(sp) + s.Finish() + if err != nil { + hub.CaptureException(err) + w.WriteHeader(http.StatusInternalServerError) + return + } + default: + // Should never happen. + w.WriteHeader(http.StatusBadRequest) return } w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) - _, _ = w.Write(b) + _, _ = w.Write(resp) } type ( diff --git a/cmd/vroom/chunk_test.go b/cmd/vroom/chunk_test.go index 2ef6051..e995699 100644 --- a/cmd/vroom/chunk_test.go +++ b/cmd/vroom/chunk_test.go @@ -60,6 +60,7 @@ func TestPostAndReadSampleChunk(t *testing.T) { Release: "1.2", OrganizationID: 1, ProjectID: 1, + Version: "2", Profile: chunk.SampleData{ Frames: []frame.Frame{ { @@ -125,7 +126,7 @@ func TestPostAndReadSampleChunk(t *testing.T) { // read the chunk with UnmarshalCompressed and make sure that we can unmarshal // the data into the Chunk struct and that it matches the original - var c chunk.SampleChunk + var c chunk.Chunk err = storageutil.UnmarshalCompressed( context.Background(), test.blobBucket, @@ -135,7 +136,7 @@ func TestPostAndReadSampleChunk(t *testing.T) { if err != nil { t.Fatal(err) } - if diff := testutil.Diff(chunkData, c); diff != "" { + if diff := testutil.Diff(&chunkData, c.Chunk()); diff != "" { t.Fatalf("Result mismatch: got - want +\n%s", diff) } }) diff --git a/cmd/vroom/kafka.go b/cmd/vroom/kafka.go index 5e6625e..0f4c6fa 100644 --- a/cmd/vroom/kafka.go +++ b/cmd/vroom/kafka.go @@ -76,16 +76,16 @@ func buildFunctionsKafkaMessage(p profile.Profile, functions []nodetree.CallTree // Metrics extraction is only supported for sample chunks right now. // TODO: support metrics extraction for Android chunks. -func buildChunkFunctionsKafkaMessage(c *chunk.SampleChunk, functions []nodetree.CallTreeFunction) FunctionsKafkaMessage { +func buildChunkFunctionsKafkaMessage(c *chunk.Chunk, functions []nodetree.CallTreeFunction) FunctionsKafkaMessage { return FunctionsKafkaMessage{ - Environment: c.Environment, + Environment: c.GetEnvironment(), Functions: functions, - ID: c.ProfilerID, - Platform: c.Platform, - ProjectID: c.ProjectID, - Received: int64(c.Received), - Release: c.Release, - RetentionDays: c.RetentionDays, + ID: c.GetProfilerID(), + Platform: c.GetPlatform(), + ProjectID: c.GetProjectID(), + Received: int64(c.GetReceived()), + Release: c.GetRelease(), + RetentionDays: c.GetRetentionDays(), Timestamp: int64(c.StartTimestamp()), StartTimestamp: c.StartTimestamp(), EndTimestamp: c.EndTimestamp(), diff --git a/internal/chunk/chunk.go b/internal/chunk/chunk.go index 196fcd6..b5013b2 100644 --- a/internal/chunk/chunk.go +++ b/internal/chunk/chunk.go @@ -1,6 +1,7 @@ package chunk import ( + "encoding/json" "fmt" "github.com/getsentry/vroom/internal/frame" @@ -10,7 +11,7 @@ import ( ) type ( - Chunk interface { + chunkInterface interface { GetEnvironment() string GetID() string GetOrganizationID() uint64 @@ -33,8 +34,45 @@ type ( Normalize() } + + Chunk struct { + chunk chunkInterface + } ) +func New(c chunkInterface) Chunk { + return Chunk{ + chunk: c, + } +} + +type version struct { + Version string `json:"version"` +} + +func (c *Chunk) UnmarshalJSON(b []byte) error { + var v version + err := json.Unmarshal(b, &v) + if err != nil { + return err + } + switch v.Version { + case "": + c.chunk = new(AndroidChunk) + default: + c.chunk = new(SampleChunk) + } + return json.Unmarshal(b, &c.chunk) +} + +func (c Chunk) MarshalJSON() ([]byte, error) { + return json.Marshal(c.chunk) +} + +func (c Chunk) Chunk() chunkInterface { + return c.chunk +} + func StoragePath(OrganizationID uint64, ProjectID uint64, ProfilerID string, ID string) string { return fmt.Sprintf( "%d/%d/%s/%s", @@ -44,3 +82,74 @@ func StoragePath(OrganizationID uint64, ProjectID uint64, ProfilerID string, ID ID, ) } + +func (c Chunk) GetEnvironment() string { + return c.chunk.GetEnvironment() +} + +func (c Chunk) GetID() string { + return c.chunk.GetID() +} + +func (c Chunk) GetOrganizationID() uint64 { + return c.chunk.GetOrganizationID() +} + +func (c Chunk) GetPlatform() platform.Platform { + return c.chunk.GetPlatform() +} + +func (c Chunk) GetProfilerID() string { + return c.chunk.GetProfilerID() +} + +func (c Chunk) GetProjectID() uint64 { + return c.chunk.GetProjectID() +} + +func (c Chunk) GetReceived() float64 { + return c.chunk.GetReceived() +} + +func (c Chunk) GetRelease() string { + return c.chunk.GetRelease() +} + +func (c Chunk) GetRetentionDays() int { + return c.chunk.GetRetentionDays() +} + +func (c Chunk) GetOptions() utils.Options { + return c.chunk.GetOptions() +} + +func (c Chunk) GetFrameWithFingerprint(f uint32) (frame.Frame, error) { + return c.chunk.GetFrameWithFingerprint(f) +} + +func (c Chunk) CallTrees(activeThreadID *string) (map[string][]*nodetree.Node, error) { + return c.chunk.CallTrees(activeThreadID) +} + +func (c Chunk) DurationMS() uint64 { + return c.chunk.DurationMS() +} +func (c Chunk) EndTimestamp() float64 { + return c.chunk.EndTimestamp() +} +func (c Chunk) SDKName() string { + return c.chunk.SDKName() +} +func (c Chunk) SDKVersion() string { + return c.chunk.SDKVersion() +} +func (c Chunk) StartTimestamp() float64 { + return c.chunk.StartTimestamp() +} +func (c Chunk) StoragePath() string { + return c.chunk.StoragePath() +} + +func (c *Chunk) Normalize() { + c.chunk.Normalize() +} diff --git a/internal/chunk/sample_readjob.go b/internal/chunk/sample_readjob.go index db68224..0774949 100644 --- a/internal/chunk/sample_readjob.go +++ b/internal/chunk/sample_readjob.go @@ -25,7 +25,7 @@ type ( ReadJobResult struct { Err error - Chunk SampleChunk + Chunk Chunk TransactionID string ThreadID *string Start uint64 @@ -34,7 +34,7 @@ type ( ) func (job ReadJob) Read() { - var chunk SampleChunk + var chunk Chunk err := storageutil.UnmarshalCompressed( job.Ctx, diff --git a/internal/metrics/metrics.go b/internal/metrics/metrics.go index 77f1ffa..3ad4d60 100644 --- a/internal/metrics/metrics.go +++ b/internal/metrics/metrics.go @@ -261,9 +261,9 @@ func (ma *Aggregator) GetMetricsFromCandidates( } resultMetadata = utils.NewExampleFromProfilerChunk( - result.Chunk.ProjectID, - result.Chunk.ProfilerID, - result.Chunk.ID, + result.Chunk.GetProjectID(), + result.Chunk.GetProfilerID(), + result.Chunk.GetID(), result.TransactionID, result.ThreadID, result.Start,