diff --git a/relay-dynamic-config/src/feature.rs b/relay-dynamic-config/src/feature.rs index bd13dbf914..ff95c78fb0 100644 --- a/relay-dynamic-config/src/feature.rs +++ b/relay-dynamic-config/src/feature.rs @@ -42,6 +42,11 @@ pub enum Feature { /// Enable the Relay cardinality limiter. #[serde(rename = "organizations:relay-cardinality-limiter")] CardinalityLimiter, + /// Enable processing and extracting data from non-dynamically sampled profiles. + /// Only some data for slowest function aggregation will be used. The profile + /// itself won't be stored on GCS. + #[serde(rename = "organizations:profiling-ingest-unsampled-profiles")] + IngestUnsampledProfiles, /// Deprecated, still forwarded for older downstream Relays. #[serde(rename = "organizations:transaction-name-mark-scrubbed-as-sanitized")] diff --git a/relay-dynamic-config/src/global.rs b/relay-dynamic-config/src/global.rs index 982e984284..b2b2ad9bd0 100644 --- a/relay-dynamic-config/src/global.rs +++ b/relay-dynamic-config/src/global.rs @@ -50,6 +50,48 @@ pub struct Options { // #[serde(default, rename = "relay.some-option.name")] // pub some_option: Vec, // ``` + /// org IDs for which we'll allow using profiles dropped due to DS for function metrics. + /// This is only intended to be used initially to limit the feature to sentry org. + /// Once we start to gradually roll out to other orgs this option can be deprecated. + #[serde( + default, + rename = "profiling.profile_metrics.unsampled_profiles.allowed_org_ids" + )] + pub profile_metrics_allowed_org_ids: Vec, + + /// org IDs for which we want to avoid using the unsampled profiles for function metrics. 
+ /// This will let us selectively disable the behaviour for entire orgs that may have an + /// extremely high volume increase + #[serde( + default, + rename = "profiling.profile_metrics.unsampled_profiles.excluded_org_ids" + )] + pub profile_metrics_excluded_orgs_ids: Vec, + + /// project IDs for which we want to avoid using the unsampled profiles for function metrics. + /// This will let us selectively disable the behaviour for projects that may have an extremely + /// high volume increase + #[serde( + default, + rename = "profiling.profile_metrics.unsampled_profiles.excluded_project_ids" + )] + pub profile_metrics_excluded_project_ids: Vec, + + /// list of platform names for which we allow using unsampled profiles for the purpose + /// of improving profile (function) metrics + #[serde( + default, + rename = "profiling.profile_metrics.unsampled_profiles.platforms" + )] + pub profile_metrics_allowed_platforms: Vec, + + /// sample rate for tuning the amount of unsampled profiles that we "let through" + #[serde( + default, + rename = "profiling.profile_metrics.unsampled_profiles.sample_rate" + )] + pub profile_metrics_sample_rate: f32, + /// All other unknown options. #[serde(flatten)] other: HashMap, diff --git a/relay-protocol/src/value.rs b/relay-protocol/src/value.rs index be774eddfc..1506014820 100644 --- a/relay-protocol/src/value.rs +++ b/relay-protocol/src/value.rs @@ -85,6 +85,14 @@ impl Value { } } + /// Returns the bool if this value is a bool, otherwise `None`. + pub fn as_bool(&self) -> Option { + match self { + Value::Bool(bool) => Some(*bool), + _ => None, + } + } + /// Constructs a `Value` from a `serde_json::Value` object. 
fn from_json(value: serde_json::Value) -> Option { Some(match value { diff --git a/relay-server/src/actors/processor/dynamic_sampling.rs b/relay-server/src/actors/processor/dynamic_sampling.rs index 2700438a6f..13830be624 100644 --- a/relay-server/src/actors/processor/dynamic_sampling.rs +++ b/relay-server/src/actors/processor/dynamic_sampling.rs @@ -5,7 +5,7 @@ use std::ops::ControlFlow; use chrono::Utc; use relay_base_schema::events::EventType; use relay_config::Config; -use relay_dynamic_config::ErrorBoundary; +use relay_dynamic_config::{ErrorBoundary, Feature}; use relay_event_schema::protocol::{Contexts, Event, TraceContext}; use relay_protocol::{Annotated, Empty}; use relay_sampling::config::{RuleType, SamplingMode}; @@ -14,7 +14,8 @@ use relay_sampling::{DynamicSamplingContext, SamplingConfig}; use crate::actors::outcome::Outcome; use crate::actors::processor::{ProcessEnvelopeState, ProcessingError}; -use crate::utils::{self, SamplingResult}; +use crate::envelope::ItemType; +use crate::utils::{self, ItemAction, SamplingResult}; /// Ensures there is a valid dynamic sampling context and corresponding project state. /// @@ -97,10 +98,29 @@ pub fn run(state: &mut ProcessEnvelopeState, config: &Config) { /// Apply the dynamic sampling decision from `compute_sampling_decision`. pub fn sample_envelope(state: &mut ProcessEnvelopeState) -> Result<(), ProcessingError> { + let project_state = &state.project_state; if let SamplingResult::Match(sampling_match) = std::mem::take(&mut state.sampling_result) { // We assume that sampling is only supposed to work on transactions. 
if state.event_type() == Some(EventType::Transaction) && sampling_match.should_drop() { + let unsampled_profiles_enabled = + project_state.has_feature(Feature::IngestUnsampledProfiles); + let matched_rules = sampling_match.into_matched_rules(); + let outcome = Outcome::FilteredSampling(matched_rules.clone()); + state.managed_envelope.retain_items(|item| { + if unsampled_profiles_enabled && item.ty() == &ItemType::Profile { + item.set_header("sampled", false); + ItemAction::Keep + } else { + ItemAction::Drop(outcome.clone()) + } + }); + state.managed_envelope.update_freeze_event(); + if state.managed_envelope.envelope().is_empty() { + // Call reject to make sure that outcomes are generated for the transaction event, + // which has already been removed from the envelope for processing. + state.managed_envelope.reject(outcome); + } state .managed_envelope diff --git a/relay-server/src/actors/store.rs b/relay-server/src/actors/store.rs index 1231db9011..7b5c615917 100644 --- a/relay-server/src/actors/store.rs +++ b/relay-server/src/actors/store.rs @@ -13,8 +13,10 @@ use relay_config::Config; use relay_event_schema::protocol::{ self, EventId, SessionAggregates, SessionStatus, SessionUpdate, }; + use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message}; use relay_metrics::{Bucket, BucketValue, MetricNamespace, MetricResourceIdentifier}; +use relay_protocol::Value; use relay_quotas::Scoping; use relay_statsd::metric; use relay_system::{AsyncResponse, FromMessage, Interface, Sender, Service}; @@ -625,6 +627,10 @@ impl StoreService { project_id, key_id, received: UnixTimestamp::from_instant(start_time).as_secs(), + sampled: item + .get_header("sampled") + .and_then(Value::as_bool) + .unwrap_or(true), payload: item.payload(), }; self.produce( @@ -1021,6 +1027,7 @@ struct ProfileKafkaMessage { project_id: ProjectId, key_id: Option, received: u64, + sampled: bool, payload: Bytes, } diff --git a/relay-server/src/utils/managed_envelope.rs 
b/relay-server/src/utils/managed_envelope.rs index b41cc438e9..f37c7e11c9 100644 --- a/relay-server/src/utils/managed_envelope.rs +++ b/relay-server/src/utils/managed_envelope.rs @@ -184,6 +184,21 @@ impl ManagedEnvelope { self } + /// Update the context with envelope information, leaving event information untouched. + /// + /// This is useful when the event has already been removed from the envelope for processing, + /// but we still want outcomes to be generated for the event. + pub fn update_freeze_event(&mut self) -> &mut Self { + let event_category = self.context.summary.event_category; + let event_metrics_extracted = self.context.summary.event_metrics_extracted; + + self.context.summary = EnvelopeSummary::compute(self.envelope()); + + self.context.summary.event_category = event_category; + self.context.summary.event_metrics_extracted = event_metrics_extracted; + self + } + /// Retains or drops items based on the [`ItemAction`]. /// /// diff --git a/tests/integration/test_dynamic_sampling.py b/tests/integration/test_dynamic_sampling.py index 76fbd8593b..7a3875054b 100644 --- a/tests/integration/test_dynamic_sampling.py +++ b/tests/integration/test_dynamic_sampling.py @@ -1,3 +1,4 @@ +from datetime import datetime import uuid import json @@ -589,3 +590,185 @@ def test_relay_chain( envelope = mini_sentry.captured_events.get(timeout=1) envelope.get_transaction_event() + + +def test_relay_chain_keep_unsampled_profile( + mini_sentry, + relay, +): + + # Create an envelope with a profile: + def make_envelope(public_key): + trace_uuid = "414e119d37694a32869f9d81b76a0bbb" + transaction_uuid = "414e119d37694a32869f9d81b76a0baa" + + envelope, trace_id, event_id = _create_transaction_envelope( + public_key, + trace_id=trace_uuid, + event_id=transaction_uuid, + ) + te = envelope.get_transaction_event() + profile_payload = get_profile_payload(te) + envelope.add_item( + Item( + payload=PayloadRef(bytes=json.dumps(profile_payload).encode()), + type="profile", + ) + ) + return 
envelope + + project_id = 42 + relay = relay(relay(mini_sentry)) + config = mini_sentry.add_basic_project_config(project_id) + config["config"]["transactionMetrics"] = {"version": 1} + config["config"]["features"] = ["organizations:profiling-ingest-unsampled-profiles"] + + public_key = config["publicKeys"][0]["publicKey"] + SAMPLE_RATE = 0.0 + _add_sampling_config(config, sample_rate=SAMPLE_RATE, rule_type="transaction") + + envelope = make_envelope(public_key) + relay.send_envelope(project_id, envelope) + envelope = mini_sentry.captured_events.get(timeout=1) + + print(f"Envelope: {envelope}") + # profiles = list( + # filter(lambda item: item.data_category == "profile", envelope.items) + # ) + # print(f"Profiles: {json.loads(profiles[0].payload.bytes.decode('utf-8'))}") + + +def get_profile_payload(transaction): + return { + "debug_meta": {"images": []}, + "device": { + "architecture": "x86_64", + "classification": "", + "locale": "", + "manufacturer": "", + "model": "", + }, + "environment": "prod", + "event_id": "429c1ffa194f41f5b6a6650929744177", + "os": {"build_number": "", "name": "Linux", "version": "5.15.107+"}, + "organization_id": 1, + "platform": "python", + "project_id": 1, + "received": transaction["start_timestamp"], + "release": "backend@c2a460502d5e5e785525d59479d665aa04320a6b", + "retention_days": 90, + "runtime": {"name": "CPython", "version": "3.8.18"}, + "timestamp": datetime.fromtimestamp(transaction["start_timestamp"]).isoformat() + + "Z", + "profile": { + "frames": [ + { + "data": {}, + "filename": "concurrent/futures/thread.py", + "function": "_worker", + "in_app": False, + "lineno": 78, + "module": "concurrent.futures.thread", + "abs_path": "/usr/local/lib/python3.8/concurrent/futures/thread.py", + }, + { + "data": {}, + "filename": "threading.py", + "function": "Thread.run", + "in_app": False, + "lineno": 870, + "module": "threading", + "abs_path": "/usr/local/lib/python3.8/threading.py", + }, + { + "data": {}, + "filename": 
"sentry_sdk/integrations/threading.py", + "function": "run", + "in_app": False, + "lineno": 70, + "module": "sentry_sdk.integrations.threading", + "abs_path": "/usr/local/lib/python3.8/site-packages/sentry_sdk/integrations/threading.py", + }, + { + "data": {}, + "filename": "threading.py", + "function": "Thread._bootstrap_inner", + "in_app": False, + "lineno": 932, + "module": "threading", + "abs_path": "/usr/local/lib/python3.8/threading.py", + }, + { + "data": {}, + "filename": "threading.py", + "function": "Thread._bootstrap", + "in_app": False, + "lineno": 890, + "module": "threading", + "abs_path": "/usr/local/lib/python3.8/threading.py", + }, + ], + "queue_metadata": None, + "samples": [ + { + "elapsed_since_start_ns": 2668948, + "stack_id": 0, + "thread_id": 140510151571200, + }, + { + "elapsed_since_start_ns": 2668948, + "stack_id": 0, + "thread_id": 140510673217280, + }, + { + "elapsed_since_start_ns": 2668948, + "stack_id": 0, + "thread_id": 140510673217280, + }, + ], + "stacks": [[0, 1, 2, 3, 4]], + "thread_metadata": { + "140510151571200": {"name": "ThreadPoolExecutor-1_4"}, + "140510673217280": {"name": "ThreadPoolExecutor-1_3"}, + "140510710716160": {"name": "Thread-19"}, + "140510719108864": {"name": "Thread-18"}, + "140510727501568": {"name": "ThreadPoolExecutor-1_2"}, + "140511074039552": {"name": "ThreadPoolExecutor-1_1"}, + "140511082432256": {"name": "ThreadPoolExecutor-1_0"}, + "140511090824960": {"name": "Thread-17"}, + "140511099217664": {"name": "raven-sentry.BackgroundWorker"}, + "140511117047552": {"name": "raven-sentry.BackgroundWorker"}, + "140511574738688": {"name": "sentry.profiler.ThreadScheduler"}, + "140511583131392": {"name": "sentry.monitor"}, + "140512539440896": {"name": "uWSGIWorker6Core1"}, + "140512620431104": {"name": "Thread-1"}, + "140514768926528": {"name": "uWSGIWorker6Core0"}, + }, + }, + "transaction": { + "active_thread_id": 140512539440896, + "id": transaction["event_id"], + "name": 
"/api/0/organizations/{organization_slug}/broadcasts/", + "trace_id": transaction["contexts"]["trace"]["trace_id"], + }, + "transaction_metadata": { + "environment": "prod", + "http.method": "GET", + "release": "backend@c2a460502d5e5e785525d59479d665aa04320a6b", + "transaction": "/api/0/organizations/{organization_slug}/broadcasts/", + "transaction.end": datetime.fromtimestamp( + transaction["timestamp"] + ).isoformat(), + "transaction.op": "http.server", + "transaction.start": datetime.fromtimestamp( + transaction["start_timestamp"] + ).isoformat(), + "transaction.status": "ok", + }, + "transaction_tags": { + "http.status_code": "200", + "organization": "4506315662819328", + "organization.slug": "uta-inc", + }, + "version": "1", + }