stop writing profile examples to metrics_summary
viglia committed Oct 7, 2024
1 parent 16074e6 commit c83b763
Showing 2 changed files with 1 addition and 70 deletions.
55 changes: 0 additions & 55 deletions cmd/vroom/kafka.go
@@ -2,21 +2,13 @@ package main
 
 import (
 	"context"
-	"encoding/json"
-	"fmt"
-	"strings"
-
-	"github.com/google/uuid"
-
 	"github.com/getsentry/sentry-go"
 	"github.com/getsentry/vroom/internal/nodetree"
 	"github.com/getsentry/vroom/internal/platform"
 	"github.com/getsentry/vroom/internal/profile"
 	"github.com/segmentio/kafka-go"
 )
 
-const profilesFunctionMri = "d:profiles/function.duration@millisecond"
-
 type (
 	// FunctionsKafkaMessage represents the struct we send to Kafka to insert functions in ClickHouse.
 	FunctionsKafkaMessage struct {
@@ -127,53 +119,6 @@ func buildProfileKafkaMessage(p profile.Profile) ProfileKafkaMessage {
 	}
 }
 
-func generateMetricSummariesKafkaMessageBatch(p *profile.Profile, metrics []sentry.Metric, metricsSummary []MetricSummary) ([]kafka.Message, error) {
-	if len(metrics) != len(metricsSummary) {
-		return nil, fmt.Errorf("len(metrics): %d - len(metrics_summary): %d", len(metrics), len(metricsSummary))
-	}
-	messages := make([]kafka.Message, 0, len(metrics))
-	for i, metric := range metrics {
-		// add profile_id to the metrics_summary tags
-		tags := metric.GetTags()
-		tags["profile_id"] = p.ID()
-		ms := MetricsSummaryKafkaMessage{
-			Count: metricsSummary[i].Count,
-			DurationMs: uint32(p.TransactionMetadata().TransactionEnd.UnixMilli() - p.TransactionMetadata().TransactionStart.UnixMilli()),
-			EndTimestamp: float64(p.TransactionMetadata().TransactionEnd.Unix()),
-			Max: metricsSummary[i].Max,
-			Min: metricsSummary[i].Min,
-			Sum: metricsSummary[i].Sum,
-			Mri: profilesFunctionMri,
-			ProjectID: p.ProjectID(),
-			Received: p.Received().Unix(),
-			RetentionDays: p.RetentionDays(),
-			Tags: tags,
-			TraceID: p.Transaction().TraceID,
-			// currently we need to set this to a randomly generated span_id because
-			// the metrics_summaries dataset is defined with a ReplacingMergeTree engine
-			// and given its ORDER BY definition we would not be able to store samples
-			// with the same span_id.
-			// see: https://github.com/getsentry/snuba/blob/master/snuba/snuba_migrations/metrics_summaries/0001_metrics_summaries_create_table.py#L44-L45
-			//
-			// That's ok for our use case as we currently don't need span_id for profile functions,
-			// but, once we recreate the table and get rid of the ReplacingMergeTree,
-			// we can set it back to p.Transaction().SegmentID for the sake of consistency.
-			SpanID: strings.Replace(uuid.New().String(), "-", "", -1)[16:],
-			IsSegment: true,
-			SegmentID: p.Transaction().SegmentID,
-		}
-		b, err := json.Marshal(ms)
-		if err != nil {
-			return nil, err
-		}
-		msg := kafka.Message{
-			Value: b,
-		}
-		messages = append(messages, msg)
-	}
-	return messages, nil
-}
-
 type KafkaWriter interface {
 	WriteMessages(ctx context.Context, msgs ...kafka.Message) error
 	Close() error
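The span_id workaround in the removed function boils down to taking the last 16 hex characters of a freshly generated UUID, so that every metrics_summary row carries a distinct span_id and is not collapsed by the ReplacingMergeTree engine's deduplication. A minimal standalone sketch of that derivation, for illustration only (not part of this commit):

package main

import (
	"fmt"
	"strings"

	"github.com/google/uuid"
)

func main() {
	// uuid.New().String() yields 32 hex digits separated by 4 hyphens; stripping
	// the hyphens and keeping the last 16 characters leaves a random 8-byte span
	// ID in hex, which is what the removed code stored in SpanID.
	spanID := strings.Replace(uuid.New().String(), "-", "", -1)[16:]
	fmt.Println(spanID, len(spanID)) // e.g. "a1b2c3d4e5f60718" 16
}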
16 changes: 1 addition & 15 deletions cmd/vroom/profile.go
@@ -186,7 +186,7 @@ func (env *environment) postProfile(w http.ResponseWriter, r *http.Request) {
 		s.Description = "Extract metrics from functions"
 		// Cap and filter out system frames.
 		functionsMetricPlatform := metrics.CapAndFilterFunctions(functions, maxUniqueFunctionsPerProfile, true)
-		metrics, metricsSummary := extractMetricsFromFunctions(&p, functionsMetricPlatform)
+		metrics, _ := extractMetricsFromFunctions(&p, functionsMetricPlatform)
 		s.Finish()
 
 		if len(metrics) > 0 {
@@ -195,20 +195,6 @@ func (env *environment) postProfile(w http.ResponseWriter, r *http.Request) {
 			sendMetrics(ctx, p.GetOptions().ProjectDSN, metrics, env.metricsClient)
 			s.Finish()
 		}
-
-		// Only send a profile sample to the metrics_summary if it's an indexed profile
-		if p.IsSampled() && len(metrics) > 0 {
-			kafkaMessages, err := generateMetricSummariesKafkaMessageBatch(&p, metrics, metricsSummary)
-			if err != nil {
-				hub.CaptureException(err)
-				w.WriteHeader(http.StatusInternalServerError)
-				return
-			}
-			err = env.metricSummaryWriter.WriteMessages(ctx, kafkaMessages...)
-			if err != nil {
-				hub.CaptureException(err)
-			}
-		}
 	}
 }
 
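The removed call site above wrote the generated batch through env.metricSummaryWriter; the KafkaWriter interface shown in kafka.go describes exactly that write/close contract, and kafka-go's *kafka.Writer satisfies it. A minimal sketch of such a write path, with a placeholder broker address, topic name, and payload, none of which come from this commit:

package main

import (
	"context"
	"encoding/json"
	"log"

	"github.com/segmentio/kafka-go"
)

// KafkaWriter mirrors the interface in cmd/vroom/kafka.go; *kafka.Writer satisfies it.
type KafkaWriter interface {
	WriteMessages(ctx context.Context, msgs ...kafka.Message) error
	Close() error
}

func main() {
	// Placeholder broker and topic, for illustration only.
	var w KafkaWriter = &kafka.Writer{
		Addr:  kafka.TCP("localhost:9092"),
		Topic: "metrics-summaries",
	}
	defer w.Close()

	// A trimmed-down stand-in for the MetricsSummaryKafkaMessage payload.
	b, err := json.Marshal(map[string]any{
		"mri":   "d:profiles/function.duration@millisecond",
		"count": 1,
	})
	if err != nil {
		log.Fatal(err)
	}
	if err := w.WriteMessages(context.Background(), kafka.Message{Value: b}); err != nil {
		log.Fatal(err)
	}
}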
