Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ref: use chunk interface instead of specific implementations #540

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
- Add more metadata fields to Chunk Kafka message. ([#518](https://github.com/getsentry/vroom/pull/518))
- Ingest Android profile chunks. ([#521](https://github.com/getsentry/vroom/pull/521))
- Handle profile chunks in regressed endpoint. ([#527](https://github.com/getsentry/vroom/pull/527))
- Add support for android chunks. ([#540](https://github.com/getsentry/vroom/pull/540))

**Bug Fixes**:

Expand Down
179 changes: 114 additions & 65 deletions cmd/vroom/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ func (env *environment) postChunk(w http.ResponseWriter, r *http.Request) {
var c chunk.Chunk
switch p.Platform {
case platform.Android:
c = new(chunk.AndroidChunk)
c = chunk.New(new(chunk.AndroidChunk))
default:
c = new(chunk.SampleChunk)
c = chunk.New(new(chunk.SampleChunk))
}

s = sentry.StartSpan(ctx, "json.unmarshal")
s.Description = "Unmarshal profile"
err = json.Unmarshal(body, c)
err = json.Unmarshal(body, &c)
s.Finish()
if err != nil {
if hub != nil {
Expand Down Expand Up @@ -138,59 +138,52 @@ func (env *environment) postChunk(w http.ResponseWriter, r *http.Request) {
}
s.Finish()

options := c.GetOptions()
sc, ok := c.(*chunk.SampleChunk)

// Metrics extraction is only supported for sample chunks right now.
// TODO: support metrics extraction for Android chunks.
if options.ProjectDSN != "" && c.GetPlatform() != platform.Android && ok {
// nb.: here we don't have a specific thread ID, so we're going to ingest
// functions metrics from all the thread.
// That's ok as this data is not supposed to be transaction/span scoped,
// plus, we'll only retain application frames, so much of the system functions
// chaff will be dropped.
s = sentry.StartSpan(ctx, "processing")
callTrees, err := sc.CallTrees(nil)
s.Finish()
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
s = sentry.StartSpan(ctx, "processing")
s.Description = "Extract functions"
functions := metrics.ExtractFunctionsFromCallTrees(callTrees)
functions = metrics.CapAndFilterFunctions(functions, maxUniqueFunctionsPerProfile, true)
s.Finish()
// nb.: here we don't have a specific thread ID, so we're going to ingest
// functions metrics from all the thread.
// That's ok as this data is not supposed to be transaction/span scoped,
// plus, we'll only retain application frames, so much of the system functions
// chaff will be dropped.
s = sentry.StartSpan(ctx, "processing")
callTrees, err := c.CallTrees(nil)
s.Finish()
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
s = sentry.StartSpan(ctx, "processing")
s.Description = "Extract functions"
functions := metrics.ExtractFunctionsFromCallTrees(callTrees)
functions = metrics.CapAndFilterFunctions(functions, maxUniqueFunctionsPerProfile, true)
s.Finish()

// This block writes into the functions dataset
s = sentry.StartSpan(ctx, "json.marshal")
s.Description = "Marshal functions Kafka message"
b, err := json.Marshal(buildChunkFunctionsKafkaMessage(sc, functions))
s.Finish()
if err != nil {
if hub != nil {
hub.CaptureException(err)
}
w.WriteHeader(http.StatusInternalServerError)
return
// This block writes into the functions dataset
s = sentry.StartSpan(ctx, "json.marshal")
s.Description = "Marshal functions Kafka message"
b, err = json.Marshal(buildChunkFunctionsKafkaMessage(&c, functions))
s.Finish()
if err != nil {
if hub != nil {
hub.CaptureException(err)
}
s = sentry.StartSpan(ctx, "processing")
s.Description = "Send functions to Kafka"
err = env.profilingWriter.WriteMessages(ctx, kafka.Message{
Topic: env.config.CallTreesKafkaTopic,
Value: b,
w.WriteHeader(http.StatusInternalServerError)
return
}
s = sentry.StartSpan(ctx, "processing")
s.Description = "Send functions to Kafka"
err = env.profilingWriter.WriteMessages(ctx, kafka.Message{
Topic: env.config.CallTreesKafkaTopic,
Value: b,
})
s.Finish()
if hub != nil {
hub.Scope().SetContext("Call functions payload", map[string]interface{}{
"Size": len(b),
})
s.Finish()
}
if err != nil {
if hub != nil {
hub.Scope().SetContext("Call functions payload", map[string]interface{}{
"Size": len(b),
})
}
if err != nil {
if hub != nil {
hub.CaptureException(err)
}
hub.CaptureException(err)
}
}

Expand Down Expand Up @@ -268,7 +261,7 @@ func (env *environment) postProfileFromChunkIDs(w http.ResponseWriter, r *http.R
}
}

chunks := make([]chunk.SampleChunk, 0, len(requestBody.ChunkIDs))
chunks := make([]chunk.Chunk, 0, len(requestBody.ChunkIDs))
// read the output of each tasks
for i := 0; i < len(requestBody.ChunkIDs); i++ {
res := <-results
Expand Down Expand Up @@ -312,26 +305,82 @@ func (env *environment) postProfileFromChunkIDs(w http.ResponseWriter, r *http.R

s = sentry.StartSpan(ctx, "chunks.merge")
s.Description = "Merge profile chunks into a single one"
chunk, err := chunk.MergeSampleChunks(chunks, requestBody.Start, requestBody.End)
s.Finish()
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
if len(chunks) == 0 {
w.WriteHeader(http.StatusBadRequest)
return
}
var resp []byte
// Here we check what type of chunks we're dealing with,
// since Android chunks and Sample chunks return completely
// different types (Chunk vs Speedscope), hence we can't hide
// the implementation behind an interface.
//
// We check the first chunk type, and use that to assert the
// type of all the elements in the slice and then call the
// appropriate utility.
switch chunks[0].Chunk().(type) {
case *chunk.SampleChunk:
sampleChunks := make([]chunk.SampleChunk, 0, len(chunks))
for _, c := range chunks {
sc, ok := c.Chunk().(*chunk.SampleChunk)
if !ok {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, "error: mix of sampled and android chunks")
return
}
sampleChunks = append(sampleChunks, *sc)
}
mergedChunk, err := chunk.MergeSampleChunks(sampleChunks, requestBody.Start, requestBody.End)
s.Finish()
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
s = sentry.StartSpan(ctx, "json.marshal")
resp, err = json.Marshal(postProfileFromChunkIDsResponse{Chunk: mergedChunk})
s.Finish()
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
return
}

s = sentry.StartSpan(ctx, "json.marshal")
defer s.Finish()
b, err := json.Marshal(postProfileFromChunkIDsResponse{Chunk: chunk})
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
case *chunk.AndroidChunk:
androidChunks := make([]chunk.AndroidChunk, 0, len(chunks))
for _, c := range chunks {
ac, ok := c.Chunk().(*chunk.AndroidChunk)
if !ok {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, "error: mix of android and sample chunks")
return
}
androidChunks = append(androidChunks, *ac)
}
sp, err := chunk.SpeedscopeFromAndroidChunks(androidChunks, requestBody.Start, requestBody.End)
s.Finish()
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
s = sentry.StartSpan(ctx, "json.marshal")
resp, err = json.Marshal(sp)
s.Finish()
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
default:
// Should never happen.
w.WriteHeader(http.StatusBadRequest)
return
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(b)
_, _ = w.Write(resp)
}

type (
Expand Down
5 changes: 3 additions & 2 deletions cmd/vroom/chunk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func TestPostAndReadSampleChunk(t *testing.T) {
Release: "1.2",
OrganizationID: 1,
ProjectID: 1,
Version: "2",
Profile: chunk.SampleData{
Frames: []frame.Frame{
{
Expand Down Expand Up @@ -125,7 +126,7 @@ func TestPostAndReadSampleChunk(t *testing.T) {

// read the chunk with UnmarshalCompressed and make sure that we can unmarshal
// the data into the Chunk struct and that it matches the original
var c chunk.SampleChunk
var c chunk.Chunk
err = storageutil.UnmarshalCompressed(
context.Background(),
test.blobBucket,
Expand All @@ -135,7 +136,7 @@ func TestPostAndReadSampleChunk(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if diff := testutil.Diff(chunkData, c); diff != "" {
if diff := testutil.Diff(&chunkData, c.Chunk()); diff != "" {
t.Fatalf("Result mismatch: got - want +\n%s", diff)
}
})
Expand Down
16 changes: 8 additions & 8 deletions cmd/vroom/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,16 +76,16 @@ func buildFunctionsKafkaMessage(p profile.Profile, functions []nodetree.CallTree

// Metrics extraction is only supported for sample chunks right now.
// TODO: support metrics extraction for Android chunks.
func buildChunkFunctionsKafkaMessage(c *chunk.SampleChunk, functions []nodetree.CallTreeFunction) FunctionsKafkaMessage {
func buildChunkFunctionsKafkaMessage(c *chunk.Chunk, functions []nodetree.CallTreeFunction) FunctionsKafkaMessage {
return FunctionsKafkaMessage{
Environment: c.Environment,
Environment: c.GetEnvironment(),
Functions: functions,
ID: c.ProfilerID,
Platform: c.Platform,
ProjectID: c.ProjectID,
Received: int64(c.Received),
Release: c.Release,
RetentionDays: c.RetentionDays,
ID: c.GetProfilerID(),
Platform: c.GetPlatform(),
ProjectID: c.GetProjectID(),
Received: int64(c.GetReceived()),
Release: c.GetRelease(),
RetentionDays: c.GetRetentionDays(),
Timestamp: int64(c.StartTimestamp()),
StartTimestamp: c.StartTimestamp(),
EndTimestamp: c.EndTimestamp(),
Expand Down
Loading
Loading