Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(profiling): add support to ingest non-sampled profiles #2374

Closed
wants to merge 10 commits into from
5 changes: 5 additions & 0 deletions relay-dynamic-config/src/feature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ pub enum Feature {
/// Extract spans from transactions and convert them to standalone spans.
#[serde(rename = "projects:extract-standalone-spans")]
ExtractStandaloneSpans,
/// Enable processing and extracting data from non dinamycally sampled profiles.
/// Only some data for slowest function aggregation will be used. The profile
/// itself won't be stored on GCS.
#[serde(rename = "organizations:profiling-ingest-unsampled-profiles")]
IngestUnsampledProfiles,

/// Deprecated, still forwarded for older downstream Relays.
#[serde(rename = "organizations:transaction-name-mark-scrubbed-as-sanitized")]
Expand Down
8 changes: 8 additions & 0 deletions relay-general/src/types/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ impl Value {
}
}

/// Returns the bool if this value is a bool, otherwise `None`.
pub fn as_bool(&self) -> Option<bool> {
match self {
Value::Bool(bool) => Some(*bool),
_ => None,
}
}

/// Constructs a `Value` from a `serde_json::Value` object.
fn from_json(value: serde_json::Value) -> Option<Self> {
Some(match value {
Expand Down
20 changes: 17 additions & 3 deletions relay-server/src/actors/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2346,14 +2346,28 @@ impl EnvelopeProcessorService {

/// Apply the dynamic sampling decision from `compute_sampling_decision`.
fn sample_envelope(&self, state: &mut ProcessEnvelopeState) -> Result<(), ProcessingError> {
let project_state = &state.project_state;
match std::mem::take(&mut state.sampling_result) {
// We assume that sampling is only supposed to work on transactions.
SamplingResult::Drop(rule_ids)
if state.event_type() == Some(EventType::Transaction) =>
{
state
.managed_envelope
.reject(Outcome::FilteredSampling(rule_ids.clone()));
let unsampled_profiles_enabled =
project_state.has_feature(Feature::IngestUnsampledProfiles);

let outcome = Outcome::FilteredSampling(rule_ids.clone());
state.managed_envelope.retain_items(|item| {
if unsampled_profiles_enabled && item.ty() == &ItemType::Profile {
item.set_header("sampled", false);
viglia marked this conversation as resolved.
Show resolved Hide resolved
ItemAction::Keep
} else {
ItemAction::Drop(outcome.clone())
}
});
if state.managed_envelope.envelope().is_empty() {
// Just for bookkeeping.
state.managed_envelope.reject(outcome);
}

Err(ProcessingError::Sampled(rule_ids))
}
Expand Down
6 changes: 6 additions & 0 deletions relay-server/src/actors/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use once_cell::sync::OnceCell;
use relay_common::{ProjectId, UnixTimestamp, Uuid};
use relay_config::Config;
use relay_general::protocol::{self, EventId, SessionAggregates, SessionStatus, SessionUpdate};
use relay_general::types::Value;
use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message};
use relay_metrics::{Bucket, BucketValue, MetricNamespace, MetricResourceIdentifier};
use relay_quotas::Scoping;
Expand Down Expand Up @@ -627,6 +628,10 @@ impl StoreService {
project_id,
key_id,
received: UnixTimestamp::from_instant(start_time).as_secs(),
sampled: item
.get_header("sampled")
.and_then(Value::as_bool)
.unwrap_or(true),
payload: item.payload(),
};
self.produce(
Expand Down Expand Up @@ -1013,6 +1018,7 @@ struct ProfileKafkaMessage {
project_id: ProjectId,
key_id: Option<u64>,
received: u64,
sampled: bool,
payload: Bytes,
}

Expand Down