Skip to content

Commit

Permalink
ref: use chunk interface instead of specific implementations (#540)
Browse files Browse the repository at this point in the history
* ingest metrics regardless of the platform

* rename the chunk interface to chunkInterface and define a Chunk type
that contains a chunk field of type chunkInterface.

As with Profile, this lets us unmarshal into the right concrete type.

* refactor code to use chunk interface

* remove pointers from getter

* specify error message if we receive a list of chunks with mixed android
and sample chunks

* return a 400 response if we were passed an empty list of chunk IDs

* update changelog
  • Loading branch information
viglia authored Dec 4, 2024
1 parent aa0a4c7 commit dc73a44
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 81 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
- Add more metadata fields to Chunk Kafka message. ([#518](https://github.com/getsentry/vroom/pull/518))
- Ingest Android profile chunks. ([#521](https://github.com/getsentry/vroom/pull/521))
- Handle profile chunks in regressed endpoint. ([#527](https://github.com/getsentry/vroom/pull/527))
- Add support for android chunks. ([#540](https://github.com/getsentry/vroom/pull/540))

**Bug Fixes**:

Expand Down
179 changes: 114 additions & 65 deletions cmd/vroom/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ func (env *environment) postChunk(w http.ResponseWriter, r *http.Request) {
var c chunk.Chunk
switch p.Platform {
case platform.Android:
c = new(chunk.AndroidChunk)
c = chunk.New(new(chunk.AndroidChunk))
default:
c = new(chunk.SampleChunk)
c = chunk.New(new(chunk.SampleChunk))
}

s = sentry.StartSpan(ctx, "json.unmarshal")
s.Description = "Unmarshal profile"
err = json.Unmarshal(body, c)
err = json.Unmarshal(body, &c)
s.Finish()
if err != nil {
if hub != nil {
Expand Down Expand Up @@ -138,59 +138,52 @@ func (env *environment) postChunk(w http.ResponseWriter, r *http.Request) {
}
s.Finish()

options := c.GetOptions()
sc, ok := c.(*chunk.SampleChunk)

// Metrics extraction is only supported for sample chunks right now.
// TODO: support metrics extraction for Android chunks.
if options.ProjectDSN != "" && c.GetPlatform() != platform.Android && ok {
// nb.: here we don't have a specific thread ID, so we're going to ingest
// functions metrics from all the thread.
// That's ok as this data is not supposed to be transaction/span scoped,
// plus, we'll only retain application frames, so much of the system functions
// chaff will be dropped.
s = sentry.StartSpan(ctx, "processing")
callTrees, err := sc.CallTrees(nil)
s.Finish()
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
s = sentry.StartSpan(ctx, "processing")
s.Description = "Extract functions"
functions := metrics.ExtractFunctionsFromCallTrees(callTrees)
functions = metrics.CapAndFilterFunctions(functions, maxUniqueFunctionsPerProfile, true)
s.Finish()
// nb.: here we don't have a specific thread ID, so we're going to ingest
// function metrics from all the threads.
// That's ok as this data is not supposed to be transaction/span scoped,
// plus, we'll only retain application frames, so much of the system functions
// chaff will be dropped.
s = sentry.StartSpan(ctx, "processing")
callTrees, err := c.CallTrees(nil)
s.Finish()
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
s = sentry.StartSpan(ctx, "processing")
s.Description = "Extract functions"
functions := metrics.ExtractFunctionsFromCallTrees(callTrees)
functions = metrics.CapAndFilterFunctions(functions, maxUniqueFunctionsPerProfile, true)
s.Finish()

// This block writes into the functions dataset
s = sentry.StartSpan(ctx, "json.marshal")
s.Description = "Marshal functions Kafka message"
b, err := json.Marshal(buildChunkFunctionsKafkaMessage(sc, functions))
s.Finish()
if err != nil {
if hub != nil {
hub.CaptureException(err)
}
w.WriteHeader(http.StatusInternalServerError)
return
// This block writes into the functions dataset
s = sentry.StartSpan(ctx, "json.marshal")
s.Description = "Marshal functions Kafka message"
b, err = json.Marshal(buildChunkFunctionsKafkaMessage(&c, functions))
s.Finish()
if err != nil {
if hub != nil {
hub.CaptureException(err)
}
s = sentry.StartSpan(ctx, "processing")
s.Description = "Send functions to Kafka"
err = env.profilingWriter.WriteMessages(ctx, kafka.Message{
Topic: env.config.CallTreesKafkaTopic,
Value: b,
w.WriteHeader(http.StatusInternalServerError)
return
}
s = sentry.StartSpan(ctx, "processing")
s.Description = "Send functions to Kafka"
err = env.profilingWriter.WriteMessages(ctx, kafka.Message{
Topic: env.config.CallTreesKafkaTopic,
Value: b,
})
s.Finish()
if hub != nil {
hub.Scope().SetContext("Call functions payload", map[string]interface{}{
"Size": len(b),
})
s.Finish()
}
if err != nil {
if hub != nil {
hub.Scope().SetContext("Call functions payload", map[string]interface{}{
"Size": len(b),
})
}
if err != nil {
if hub != nil {
hub.CaptureException(err)
}
hub.CaptureException(err)
}
}

Expand Down Expand Up @@ -268,7 +261,7 @@ func (env *environment) postProfileFromChunkIDs(w http.ResponseWriter, r *http.R
}
}

chunks := make([]chunk.SampleChunk, 0, len(requestBody.ChunkIDs))
chunks := make([]chunk.Chunk, 0, len(requestBody.ChunkIDs))
// read the output of each tasks
for i := 0; i < len(requestBody.ChunkIDs); i++ {
res := <-results
Expand Down Expand Up @@ -312,26 +305,82 @@ func (env *environment) postProfileFromChunkIDs(w http.ResponseWriter, r *http.R

s = sentry.StartSpan(ctx, "chunks.merge")
s.Description = "Merge profile chunks into a single one"
chunk, err := chunk.MergeSampleChunks(chunks, requestBody.Start, requestBody.End)
s.Finish()
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
if len(chunks) == 0 {
w.WriteHeader(http.StatusBadRequest)
return
}
var resp []byte
// Here we check what type of chunks we're dealing with:
// Sample chunks and Android chunks produce completely
// different results (Chunk vs Speedscope), so we can't hide
// the implementation behind an interface.
//
// We check the first chunk type, and use that to assert the
// type of all the elements in the slice and then call the
// appropriate utility.
switch chunks[0].Chunk().(type) {
case *chunk.SampleChunk:
sampleChunks := make([]chunk.SampleChunk, 0, len(chunks))
for _, c := range chunks {
sc, ok := c.Chunk().(*chunk.SampleChunk)
if !ok {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, "error: mix of sampled and android chunks")
return
}
sampleChunks = append(sampleChunks, *sc)
}
mergedChunk, err := chunk.MergeSampleChunks(sampleChunks, requestBody.Start, requestBody.End)
s.Finish()
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
s = sentry.StartSpan(ctx, "json.marshal")
resp, err = json.Marshal(postProfileFromChunkIDsResponse{Chunk: mergedChunk})
s.Finish()
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
return
}

s = sentry.StartSpan(ctx, "json.marshal")
defer s.Finish()
b, err := json.Marshal(postProfileFromChunkIDsResponse{Chunk: chunk})
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
case *chunk.AndroidChunk:
androidChunks := make([]chunk.AndroidChunk, 0, len(chunks))
for _, c := range chunks {
ac, ok := c.Chunk().(*chunk.AndroidChunk)
if !ok {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, "error: mix of android and sample chunks")
return
}
androidChunks = append(androidChunks, *ac)
}
sp, err := chunk.SpeedscopeFromAndroidChunks(androidChunks, requestBody.Start, requestBody.End)
s.Finish()
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
s = sentry.StartSpan(ctx, "json.marshal")
resp, err = json.Marshal(sp)
s.Finish()
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
default:
// Should never happen.
w.WriteHeader(http.StatusBadRequest)
return
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(b)
_, _ = w.Write(resp)
}

type (
Expand Down
5 changes: 3 additions & 2 deletions cmd/vroom/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func TestPostAndReadSampleChunk(t *testing.T) {
Release: "1.2",
OrganizationID: 1,
ProjectID: 1,
Version: "2",
Profile: chunk.SampleData{
Frames: []frame.Frame{
{
Expand Down Expand Up @@ -125,7 +126,7 @@ func TestPostAndReadSampleChunk(t *testing.T) {

// read the chunk with UnmarshalCompressed and make sure that we can unmarshal
// the data into the Chunk struct and that it matches the original
var c chunk.SampleChunk
var c chunk.Chunk
err = storageutil.UnmarshalCompressed(
context.Background(),
test.blobBucket,
Expand All @@ -135,7 +136,7 @@ func TestPostAndReadSampleChunk(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if diff := testutil.Diff(chunkData, c); diff != "" {
if diff := testutil.Diff(&chunkData, c.Chunk()); diff != "" {
t.Fatalf("Result mismatch: got - want +\n%s", diff)
}
})
Expand Down
16 changes: 8 additions & 8 deletions cmd/vroom/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,16 @@ func buildFunctionsKafkaMessage(p profile.Profile, functions []nodetree.CallTree

// buildChunkFunctionsKafkaMessage builds the functions Kafka message for a
// chunk, reading its metadata through the Chunk getters.
func buildChunkFunctionsKafkaMessage(c *chunk.SampleChunk, functions []nodetree.CallTreeFunction) FunctionsKafkaMessage {
func buildChunkFunctionsKafkaMessage(c *chunk.Chunk, functions []nodetree.CallTreeFunction) FunctionsKafkaMessage {
return FunctionsKafkaMessage{
Environment: c.Environment,
Environment: c.GetEnvironment(),
Functions: functions,
ID: c.ProfilerID,
Platform: c.Platform,
ProjectID: c.ProjectID,
Received: int64(c.Received),
Release: c.Release,
RetentionDays: c.RetentionDays,
ID: c.GetProfilerID(),
Platform: c.GetPlatform(),
ProjectID: c.GetProjectID(),
Received: int64(c.GetReceived()),
Release: c.GetRelease(),
RetentionDays: c.GetRetentionDays(),
Timestamp: int64(c.StartTimestamp()),
StartTimestamp: c.StartTimestamp(),
EndTimestamp: c.EndTimestamp(),
Expand Down
Loading

0 comments on commit dc73a44

Please sign in to comment.