
Commit

Merge branch 'main' of github.com:getsentry/vroom into txiao/fix/pass-readjob-result-as-pointer
Zylphrex committed Dec 6, 2024
2 parents baca8c1 + dc73a44 commit 305c63b
Showing 11 changed files with 254 additions and 253 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -28,6 +28,7 @@
- Add more metadata fields to Chunk Kafka message. ([#518](https://github.com/getsentry/vroom/pull/518))
- Ingest Android profile chunks. ([#521](https://github.com/getsentry/vroom/pull/521))
- Handle profile chunks in regressed endpoint. ([#527](https://github.com/getsentry/vroom/pull/527))
- Add support for android chunks. ([#540](https://github.com/getsentry/vroom/pull/540))

**Bug Fixes**:

@@ -118,9 +119,11 @@
- Add utility to merge a list of android chunks and generate a speedscope result ([#531](https://github.com/getsentry/vroom/pull/531))
- Remove unused legacy flamegraph code path. ([#533](https://github.com/getsentry/vroom/pull/533))
- Remove generic metrics ingestion ([#534](https://github.com/getsentry/vroom/pull/534))
- Add android chunk calltree implementation and signature definition to the chunk interface ([#536](https://github.com/getsentry/vroom/pull/536))
- Update sentry-go dependency to v0.29.1 ([#535](https://github.com/getsentry/vroom/pull/535))
- Lower number of concurrent reads ([#537](https://github.com/getsentry/vroom/pull/537))
- Remove unused metrics summary kafka writer ([#538](https://github.com/getsentry/vroom/pull/538))
- Remove unused chunks flamegraph endpoint ([#541](https://github.com/getsentry/vroom/pull/541))
- Pass readjob result as pointer ([#542](https://github.com/getsentry/vroom/pull/542))

## 23.12.0
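The "Pass readjob result as pointer" entry ([#542](https://github.com/getsentry/vroom/pull/542)) is the change this merge targets. As a rough, hypothetical illustration of the idea only (the types below are made up and are not vroom's actual read-job code), sending `*result` through the channel means each completed read job hands back a pointer instead of copying the decoded payload:

```go
// Illustrative only: a stand-in for the read-job pattern, showing results
// handed back as pointers (*result) rather than by value. The struct and
// channel here are hypothetical, not the actual vroom ReadJob types.
package main

import (
	"errors"
	"fmt"
)

type result struct {
	payload []byte
	err     error
}

func main() {
	results := make(chan *result, 2)

	go func() {
		// Each job allocates one result and sends a pointer to it,
		// so the (potentially large) payload is never copied again.
		results <- &result{payload: []byte("decoded chunk payload")}
		results <- &result{err: errors.New("object not found")}
		close(results)
	}()

	for res := range results {
		if res.err != nil {
			fmt.Println("read job failed:", res.err)
			continue
		}
		fmt.Printf("read job returned %d bytes\n", len(res.payload))
	}
}
```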
179 changes: 114 additions & 65 deletions cmd/vroom/chunk.go
@@ -57,14 +57,14 @@ func (env *environment) postChunk(w http.ResponseWriter, r *http.Request) {
var c chunk.Chunk
switch p.Platform {
case platform.Android:
c = new(chunk.AndroidChunk)
c = chunk.New(new(chunk.AndroidChunk))
default:
c = new(chunk.SampleChunk)
c = chunk.New(new(chunk.SampleChunk))
}

s = sentry.StartSpan(ctx, "json.unmarshal")
s.Description = "Unmarshal profile"
err = json.Unmarshal(body, c)
err = json.Unmarshal(body, &c)
s.Finish()
if err != nil {
if hub != nil {
@@ -138,59 +138,52 @@ func (env *environment) postChunk(w http.ResponseWriter, r *http.Request) {
}
s.Finish()

options := c.GetOptions()
sc, ok := c.(*chunk.SampleChunk)

// Metrics extraction is only supported for sample chunks right now.
// TODO: support metrics extraction for Android chunks.
if options.ProjectDSN != "" && c.GetPlatform() != platform.Android && ok {
// nb.: here we don't have a specific thread ID, so we're going to ingest
// functions metrics from all the thread.
// That's ok as this data is not supposed to be transaction/span scoped,
// plus, we'll only retain application frames, so much of the system functions
// chaff will be dropped.
s = sentry.StartSpan(ctx, "processing")
callTrees, err := sc.CallTrees(nil)
s.Finish()
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
s = sentry.StartSpan(ctx, "processing")
s.Description = "Extract functions"
functions := metrics.ExtractFunctionsFromCallTrees(callTrees)
functions = metrics.CapAndFilterFunctions(functions, maxUniqueFunctionsPerProfile, true)
s.Finish()
// nb.: here we don't have a specific thread ID, so we're going to ingest
// functions metrics from all the thread.
// That's ok as this data is not supposed to be transaction/span scoped,
// plus, we'll only retain application frames, so much of the system functions
// chaff will be dropped.
s = sentry.StartSpan(ctx, "processing")
callTrees, err := c.CallTrees(nil)
s.Finish()
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
s = sentry.StartSpan(ctx, "processing")
s.Description = "Extract functions"
functions := metrics.ExtractFunctionsFromCallTrees(callTrees)
functions = metrics.CapAndFilterFunctions(functions, maxUniqueFunctionsPerProfile, true)
s.Finish()

// This block writes into the functions dataset
s = sentry.StartSpan(ctx, "json.marshal")
s.Description = "Marshal functions Kafka message"
b, err := json.Marshal(buildChunkFunctionsKafkaMessage(sc, functions))
s.Finish()
if err != nil {
if hub != nil {
hub.CaptureException(err)
}
w.WriteHeader(http.StatusInternalServerError)
return
// This block writes into the functions dataset
s = sentry.StartSpan(ctx, "json.marshal")
s.Description = "Marshal functions Kafka message"
b, err = json.Marshal(buildChunkFunctionsKafkaMessage(&c, functions))
s.Finish()
if err != nil {
if hub != nil {
hub.CaptureException(err)
}
s = sentry.StartSpan(ctx, "processing")
s.Description = "Send functions to Kafka"
err = env.profilingWriter.WriteMessages(ctx, kafka.Message{
Topic: env.config.CallTreesKafkaTopic,
Value: b,
w.WriteHeader(http.StatusInternalServerError)
return
}
s = sentry.StartSpan(ctx, "processing")
s.Description = "Send functions to Kafka"
err = env.profilingWriter.WriteMessages(ctx, kafka.Message{
Topic: env.config.CallTreesKafkaTopic,
Value: b,
})
s.Finish()
if hub != nil {
hub.Scope().SetContext("Call functions payload", map[string]interface{}{
"Size": len(b),
})
s.Finish()
}
if err != nil {
if hub != nil {
hub.Scope().SetContext("Call functions payload", map[string]interface{}{
"Size": len(b),
})
}
if err != nil {
if hub != nil {
hub.CaptureException(err)
}
hub.CaptureException(err)
}
}
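The rewritten handler above assumes a chunk.Chunk wrapper type: it is built with chunk.New(...), decoded with json.Unmarshal(body, &c), and exposes methods such as CallTrees and the Get* accessors. The wrapper's definition is not part of this diff; the following is a minimal, self-contained sketch of that pattern with simplified, made-up types, not the actual vroom implementation:

```go
// Hypothetical sketch of the wrapper pattern the handler relies on.
package main

import (
	"encoding/json"
	"fmt"
)

// platformChunk stands in for the concrete SampleChunk / AndroidChunk types.
type platformChunk interface {
	GetPlatform() string
}

// SampleChunk is a simplified stand-in, not the real vroom type.
type SampleChunk struct {
	Platform string `json:"platform"`
}

func (s *SampleChunk) GetPlatform() string { return s.Platform }

// Chunk wraps a concrete platform chunk so handlers hold a single value type.
type Chunk struct {
	inner platformChunk
}

// New wraps a concrete chunk, mirroring the chunk.New(...) calls in the diff.
func New(c platformChunk) Chunk { return Chunk{inner: c} }

// Chunk exposes the wrapped value for type assertions, as in c.Chunk().(*SampleChunk).
func (c *Chunk) Chunk() platformChunk { return c.inner }

// UnmarshalJSON forwards decoding to the wrapped chunk, which is why the
// handler can call json.Unmarshal(body, &c) on the wrapper itself.
func (c *Chunk) UnmarshalJSON(b []byte) error {
	return json.Unmarshal(b, c.inner)
}

func main() {
	c := New(new(SampleChunk))
	if err := json.Unmarshal([]byte(`{"platform":"cocoa"}`), &c); err != nil {
		panic(err)
	}
	fmt.Println(c.Chunk().GetPlatform()) // cocoa
}
```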

@@ -268,7 +261,7 @@ func (env *environment) postProfileFromChunkIDs(w http.ResponseWriter, r *http.Request) {
}
}

chunks := make([]chunk.SampleChunk, 0, len(requestBody.ChunkIDs))
chunks := make([]chunk.Chunk, 0, len(requestBody.ChunkIDs))
// read the output of each tasks
for i := 0; i < len(requestBody.ChunkIDs); i++ {
res := <-results
@@ -312,26 +305,82 @@ func (env *environment) postProfileFromChunkIDs(w http.ResponseWriter, r *http.Request) {

s = sentry.StartSpan(ctx, "chunks.merge")
s.Description = "Merge profile chunks into a single one"
chunk, err := chunk.MergeSampleChunks(chunks, requestBody.Start, requestBody.End)
s.Finish()
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
if len(chunks) == 0 {
w.WriteHeader(http.StatusBadRequest)
return
}
var resp []byte
// Here we check what type of chunks we're dealing with,
// since Android chunks and Sample chunks return completely
// different types (Chunk vs Speedscope), hence we can't hide
// the implementation behind an interface.
//
// We check the first chunk type, and use that to assert the
// type of all the elements in the slice and then call the
// appropriate utility.
switch chunks[0].Chunk().(type) {
case *chunk.SampleChunk:
sampleChunks := make([]chunk.SampleChunk, 0, len(chunks))
for _, c := range chunks {
sc, ok := c.Chunk().(*chunk.SampleChunk)
if !ok {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, "error: mix of sampled and android chunks")
return
}
sampleChunks = append(sampleChunks, *sc)
}
mergedChunk, err := chunk.MergeSampleChunks(sampleChunks, requestBody.Start, requestBody.End)
s.Finish()
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
s = sentry.StartSpan(ctx, "json.marshal")
resp, err = json.Marshal(postProfileFromChunkIDsResponse{Chunk: mergedChunk})
s.Finish()
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
return
}

s = sentry.StartSpan(ctx, "json.marshal")
defer s.Finish()
b, err := json.Marshal(postProfileFromChunkIDsResponse{Chunk: chunk})
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
case *chunk.AndroidChunk:
androidChunks := make([]chunk.AndroidChunk, 0, len(chunks))
for _, c := range chunks {
ac, ok := c.Chunk().(*chunk.AndroidChunk)
if !ok {
w.WriteHeader(http.StatusBadRequest)
fmt.Fprint(w, "error: mix of android and sample chunks")
return
}
androidChunks = append(androidChunks, *ac)
}
sp, err := chunk.SpeedscopeFromAndroidChunks(androidChunks, requestBody.Start, requestBody.End)
s.Finish()
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
s = sentry.StartSpan(ctx, "json.marshal")
resp, err = json.Marshal(sp)
s.Finish()
if err != nil {
hub.CaptureException(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
default:
// Should never happen.
w.WriteHeader(http.StatusBadRequest)
return
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(b)
_, _ = w.Write(resp)
}

type (
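The comment inside the switch above explains the design constraint: merging sample chunks produces another chunk, while Android chunks produce a Speedscope document, so the merge step cannot hide behind one interface method. Here is a self-contained sketch of that dispatch-on-the-first-element pattern, with placeholder types standing in for SampleChunk and AndroidChunk:

```go
// Sketch of the dispatch used in postProfileFromChunkIDs: peek at the first
// element's concrete type, assert every other element against it, then call
// the matching merge strategy. Types and return values are placeholders.
package main

import (
	"errors"
	"fmt"
)

type sampleChunk struct{ id string }
type androidChunk struct{ id string }

func merge(chunks []any) (string, error) {
	if len(chunks) == 0 {
		return "", errors.New("no chunks to merge")
	}
	switch chunks[0].(type) {
	case *sampleChunk:
		for _, c := range chunks {
			if _, ok := c.(*sampleChunk); !ok {
				return "", errors.New("mix of sample and android chunks")
			}
		}
		return "merged sample chunk", nil // real code: chunk.MergeSampleChunks(...)
	case *androidChunk:
		for _, c := range chunks {
			if _, ok := c.(*androidChunk); !ok {
				return "", errors.New("mix of android and sample chunks")
			}
		}
		return "speedscope document", nil // real code: chunk.SpeedscopeFromAndroidChunks(...)
	default:
		return "", errors.New("unsupported chunk type")
	}
}

func main() {
	out, err := merge([]any{&sampleChunk{id: "a"}, &sampleChunk{id: "b"}})
	fmt.Println(out, err) // merged sample chunk <nil>
}
```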
5 changes: 3 additions & 2 deletions cmd/vroom/chunk_test.go
@@ -60,6 +60,7 @@ func TestPostAndReadSampleChunk(t *testing.T) {
Release: "1.2",
OrganizationID: 1,
ProjectID: 1,
Version: "2",
Profile: chunk.SampleData{
Frames: []frame.Frame{
{
@@ -125,7 +126,7 @@

// read the chunk with UnmarshalCompressed and make sure that we can unmarshal
// the data into the Chunk struct and that it matches the original
var c chunk.SampleChunk
var c chunk.Chunk
err = storageutil.UnmarshalCompressed(
context.Background(),
test.blobBucket,
@@ -135,7 +136,7 @@
if err != nil {
t.Fatal(err)
}
if diff := testutil.Diff(chunkData, c); diff != "" {
if diff := testutil.Diff(&chunkData, c.Chunk()); diff != "" {
t.Fatalf("Result mismatch: got - want +\n%s", diff)
}
})
75 changes: 0 additions & 75 deletions cmd/vroom/flamegraph.go
@@ -13,81 +13,6 @@ import (
"github.com/getsentry/vroom/internal/utils"
)

type postFlamegraphFromChunksMetadataBody struct {
ChunksMetadata []flamegraph.ChunkMetadata `json:"chunks_metadata"`
}

func (env *environment) postFlamegraphFromChunksMetadata(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
hub := sentry.GetHubFromContext(ctx)
ps := httprouter.ParamsFromContext(ctx)
rawOrganizationID := ps.ByName("organization_id")
organizationID, err := strconv.ParseUint(rawOrganizationID, 10, 64)
if err != nil {
if hub != nil {
hub.CaptureException(err)
}
w.WriteHeader(http.StatusBadRequest)
return
}

hub.Scope().SetTag("organization_id", rawOrganizationID)

rawProjectID := ps.ByName("project_id")
projectID, err := strconv.ParseUint(rawProjectID, 10, 64)
if err != nil {
sentry.CaptureException(err)
w.WriteHeader(http.StatusBadRequest)
return
}
if hub != nil {
hub.Scope().SetTag("project_id", rawProjectID)
}

var body postFlamegraphFromChunksMetadataBody
s := sentry.StartSpan(ctx, "processing")
s.Description = "Decoding data"
err = json.NewDecoder(r.Body).Decode(&body)
s.Finish()
if err != nil {
if hub != nil {
hub.CaptureException(err)
}
w.WriteHeader(http.StatusBadRequest)
return
}

s = sentry.StartSpan(ctx, "processing")
speedscope, err := flamegraph.GetFlamegraphFromChunks(ctx, organizationID, projectID, env.storage, body.ChunksMetadata, readJobs)
s.Finish()
if err != nil {
if hub != nil {
hub.CaptureException(err)
}
w.WriteHeader(http.StatusInternalServerError)
return
}

if hub != nil {
hub.Scope().SetTag("requested_chunks", strconv.Itoa(len(body.ChunksMetadata)))
}

s = sentry.StartSpan(ctx, "json.marshal")
defer s.Finish()
b, err := json.Marshal(speedscope)
if err != nil {
if hub != nil {
hub.CaptureException(err)
}
w.WriteHeader(http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(b)
}

type (
postFlamegraphBody struct {
Transaction []utils.TransactionProfileCandidate `json:"transaction"`
16 changes: 8 additions & 8 deletions cmd/vroom/kafka.go
@@ -76,16 +76,16 @@ func buildFunctionsKafkaMessage(p profile.Profile, functions []nodetree.CallTre

// Metrics extraction is only supported for sample chunks right now.
// TODO: support metrics extraction for Android chunks.
func buildChunkFunctionsKafkaMessage(c *chunk.SampleChunk, functions []nodetree.CallTreeFunction) FunctionsKafkaMessage {
func buildChunkFunctionsKafkaMessage(c *chunk.Chunk, functions []nodetree.CallTreeFunction) FunctionsKafkaMessage {
return FunctionsKafkaMessage{
Environment: c.Environment,
Environment: c.GetEnvironment(),
Functions: functions,
ID: c.ProfilerID,
Platform: c.Platform,
ProjectID: c.ProjectID,
Received: int64(c.Received),
Release: c.Release,
RetentionDays: c.RetentionDays,
ID: c.GetProfilerID(),
Platform: c.GetPlatform(),
ProjectID: c.GetProjectID(),
Received: int64(c.GetReceived()),
Release: c.GetRelease(),
RetentionDays: c.GetRetentionDays(),
Timestamp: int64(c.StartTimestamp()),
StartTimestamp: c.StartTimestamp(),
EndTimestamp: c.EndTimestamp(),
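buildChunkFunctionsKafkaMessage now takes the *chunk.Chunk wrapper and reads every metadata field through a Get* accessor instead of touching SampleChunk fields directly, so the same builder can serve any chunk flavor. A simplified, hypothetical sketch of that accessor-driven builder (names and types are illustrative, not vroom's):

```go
// Hypothetical sketch: the builder depends only on accessors the wrapper
// forwards to the wrapped platform chunk.
package main

import "fmt"

// platformChunk stands in for the concrete SampleChunk / AndroidChunk types.
type platformChunk interface {
	GetPlatform() string
	GetProjectID() uint64
}

type androidChunk struct{}

func (androidChunk) GetPlatform() string  { return "android" }
func (androidChunk) GetProjectID() uint64 { return 42 }

// Chunk forwards metadata accessors to whatever platform chunk it wraps.
type Chunk struct{ inner platformChunk }

func (c *Chunk) GetPlatform() string  { return c.inner.GetPlatform() }
func (c *Chunk) GetProjectID() uint64 { return c.inner.GetProjectID() }

// functionsMessage mirrors a couple of FunctionsKafkaMessage fields.
type functionsMessage struct {
	Platform  string
	ProjectID uint64
}

// buildMessage reads metadata only through the wrapper's accessors, as the
// updated buildChunkFunctionsKafkaMessage does.
func buildMessage(c *Chunk) functionsMessage {
	return functionsMessage{
		Platform:  c.GetPlatform(),
		ProjectID: c.GetProjectID(),
	}
}

func main() {
	c := &Chunk{inner: androidChunk{}}
	fmt.Printf("%+v\n", buildMessage(c)) // {Platform:android ProjectID:42}
}
```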
5 changes: 0 additions & 5 deletions cmd/vroom/main.go
@@ -120,11 +120,6 @@ func (e *environment) newRouter() (*httprouter.Router, error) {
"/organizations/:organization_id/projects/:project_id/raw_profiles/:profile_id",
e.getRawProfile,
},
{
http.MethodPost,
"/organizations/:organization_id/projects/:project_id/chunks-flamegraph",
e.postFlamegraphFromChunksMetadata,
},
{
http.MethodPost,
"/organizations/:organization_id/projects/:project_id/chunks",