diff --git a/relay-server/src/endpoints/common.rs b/relay-server/src/endpoints/common.rs index 5a124b84b7..9402195a0d 100644 --- a/relay-server/src/endpoints/common.rs +++ b/relay-server/src/endpoints/common.rs @@ -307,8 +307,8 @@ fn queue_envelope( envelope.scope(scoping); match state.envelope_buffer() { - Some(buffer) => { - if !buffer.has_capacity() { + Some((buffer_state, buffer)) => { + if !buffer_state.has_capacity() { return Err(BadStoreRequest::QueueFailed); } @@ -316,9 +316,7 @@ fn queue_envelope( // envelope's projects. See `handle_check_envelope`. relay_log::trace!("Pushing envelope to V2 buffer"); - buffer - .addr() - .send(EnvelopeBuffer::Push(envelope.into_envelope())); + buffer.send(EnvelopeBuffer::Push(envelope.into_envelope())); } None => { relay_log::trace!("Sending envelope to project cache for V1 buffer"); diff --git a/relay-server/src/service.rs b/relay-server/src/service.rs index d7e954fe86..2224e413cb 100644 --- a/relay-server/src/service.rs +++ b/relay-server/src/service.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use std::time::Duration; use crate::metrics::{MetricOutcomes, MetricStats}; -use crate::services::buffer::{self, EnvelopeBufferService, ObservableEnvelopeBuffer}; +use crate::services::buffer::{self, EnvelopeBuffer, EnvelopeBufferService, EnvelopeBufferState}; use crate::services::cogs::{CogsService, CogsServiceRecorder}; use crate::services::global_config::{GlobalConfigManager, GlobalConfigService}; use crate::services::health_check::{HealthCheck, HealthCheckService}; @@ -31,7 +31,7 @@ use relay_redis::redis::Script; #[cfg(feature = "processing")] use relay_redis::{PooledClient, RedisScripts}; use relay_redis::{RedisError, RedisPool, RedisPools}; -use relay_system::{channel, Addr, Service}; +use relay_system::{Addr, Service}; use tokio::runtime::Runtime; use tokio::sync::mpsc; @@ -64,7 +64,7 @@ pub struct Registry { pub global_config: Addr, pub project_cache: Addr, pub upstream_relay: Addr, - pub envelope_buffer: Option, + pub envelope_buffer: Option<(EnvelopeBufferState, Addr)>, } impl fmt::Debug for Registry { @@ -156,8 +156,8 @@ pub struct ServiceState { impl ServiceState { /// Starts all services and returns addresses to all of them. pub fn start(config: Arc) -> Result { - let upstream_relay = UpstreamRelayService::new(config.clone()).start(); - let test_store = TestStoreService::new(config.clone()).start(); + let (_, upstream_relay) = UpstreamRelayService::new(config.clone()).start(); + let (_, test_store) = TestStoreService::new(config.clone()).start(); let redis_pools = config .redis() @@ -181,32 +181,31 @@ impl ServiceState { // Create an address for the `EnvelopeProcessor`, which can be injected into the // other services. - let (processor, processor_rx) = channel(EnvelopeProcessorService::name()); - let outcome_producer = OutcomeProducerService::create( + let (processor, processor_rx) = EnvelopeProcessorService::channel(); + let (_, outcome_producer) = OutcomeProducerService::create( config.clone(), upstream_relay.clone(), processor.clone(), )? .start(); - let outcome_aggregator = OutcomeAggregator::new(&config, outcome_producer.clone()).start(); + let (_, outcome_aggregator) = + OutcomeAggregator::new(&config, outcome_producer.clone()).start(); let (global_config, global_config_rx) = GlobalConfigService::new(config.clone(), upstream_relay.clone()); - let global_config_handle = global_config.handle(); // The global config service must start before dependant services are // started. Messages like subscription requests to the global config // service fail if the service is not running. - let global_config = global_config.start(); + let (global_config_handle, global_config) = global_config.start(); - let (project_cache, project_cache_rx) = channel(ProjectCacheService::name()); + let (project_cache, project_cache_rx) = ProjectCacheService::channel(); - let aggregator = RouterService::new( + let (aggregator_handle, aggregator) = RouterService::new( config.default_aggregator_config().clone(), config.secondary_aggregator_configs().clone(), Some(project_cache.clone().recipient()), - ); - let aggregator_handle = aggregator.handle(); - let aggregator = aggregator.start(); + ) + .start(); let metric_stats = MetricStats::new( config.clone(), @@ -227,12 +226,12 @@ impl ServiceState { outcome_aggregator.clone(), metric_outcomes.clone(), ) - .map(|s| s.start()) + .map(|s| s.start().1) }) .transpose()?; let cogs = CogsService::new(&config); - let cogs = Cogs::new(CogsServiceRecorder::new(&config, cogs.start())); + let cogs = Cogs::new(CogsServiceRecorder::new(&config, cogs.start().1)); EnvelopeProcessorService::new( create_processor_pool(&config)?, @@ -252,7 +251,7 @@ impl ServiceState { }, metric_outcomes.clone(), ) - .spawn_handler(processor_rx); + .start_with_receiver(processor_rx); let (envelopes_tx, envelopes_rx) = mpsc::channel(config.spool_max_backpressure_envelopes()); let envelope_buffer = EnvelopeBufferService::new( @@ -266,11 +265,11 @@ impl ServiceState { test_store: test_store.clone(), }, ) - .map(|b| b.start_observable()); + .map(|e| e.start()); // Keep all the services in one context. let project_cache_services = Services { - envelope_buffer: envelope_buffer.as_ref().map(ObservableEnvelopeBuffer::addr), + envelope_buffer: envelope_buffer.as_ref().map(|(_, a)| a.clone()), aggregator: aggregator.clone(), envelope_processor: processor.clone(), outcome_aggregator: outcome_aggregator.clone(), @@ -289,14 +288,14 @@ impl ServiceState { .as_ref() .map(|pools| pools.project_configs.clone()), ) - .spawn_handler(project_cache_rx); + .start_with_receiver(project_cache_rx); - let health_check = HealthCheckService::new( + let (_, health_check) = HealthCheckService::new( config.clone(), MemoryChecker::new(memory_stat.clone(), config.clone()), aggregator_handle, upstream_relay.clone(), - envelope_buffer.clone(), + envelope_buffer.as_ref().map(|(s, _)| s.clone()), ) .start(); @@ -308,7 +307,8 @@ impl ServiceState { ) .start(); - let relay_cache = RelayCacheService::new(config.clone(), upstream_relay.clone()).start(); + let (_, relay_cache) = + RelayCacheService::new(config.clone(), upstream_relay.clone()).start(); let registry = Registry { aggregator, @@ -348,7 +348,7 @@ impl ServiceState { } /// Returns the V2 envelope buffer, if present. - pub fn envelope_buffer(&self) -> Option<&ObservableEnvelopeBuffer> { + pub fn envelope_buffer(&self) -> Option<&(EnvelopeBufferState, Addr)> { self.inner.registry.envelope_buffer.as_ref() } diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index 3bcc0c5fea..670d60d352 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -8,8 +8,10 @@ use std::time::Duration; use relay_base_schema::project::ProjectKey; use relay_config::Config; -use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service}; -use relay_system::{Controller, Shutdown}; +use relay_system::Shutdown; +use relay_system::{ + Addr, FromMessage, Interface, NoResponse, Receiver, Service, ShutdownHandle, State, +}; use tokio::sync::mpsc::Permit; use tokio::sync::{mpsc, watch}; use tokio::time::{timeout, Instant}; @@ -70,30 +72,22 @@ impl FromMessage for EnvelopeBuffer { } } -/// Contains the services [`Addr`] and a watch channel to observe its state. -/// -/// This allows outside observers to check the capacity without having to send a message. -/// -// NOTE: This pattern of combining an Addr with some observable state could be generalized into -// `Service` itself. +/// Public state of the [`EnvelopeBufferService`] which allows to check if the service has capacity +/// to accept new [`Envelope`]s. #[derive(Debug, Clone)] -pub struct ObservableEnvelopeBuffer { - addr: Addr, +pub struct EnvelopeBufferState { has_capacity: Arc, } -impl ObservableEnvelopeBuffer { - /// Returns the address of the buffer service. - pub fn addr(&self) -> Addr { - self.addr.clone() - } - +impl EnvelopeBufferState { /// Returns `true` if the buffer has the capacity to accept more elements. pub fn has_capacity(&self) -> bool { self.has_capacity.load(Ordering::Relaxed) } } +impl State for EnvelopeBufferState {} + /// Services that the buffer service communicates with. #[derive(Clone)] pub struct Services { @@ -143,15 +137,6 @@ impl EnvelopeBufferService { }) } - /// Returns both the [`Addr`] to this service, and a reference to the capacity flag. - pub fn start_observable(self) -> ObservableEnvelopeBuffer { - let has_capacity = self.has_capacity.clone(); - ObservableEnvelopeBuffer { - addr: self.start(), - has_capacity, - } - } - /// Wait for the configured amount of time and make sure the project cache is ready to receive. async fn ready_to_pop( &mut self, @@ -373,7 +358,15 @@ impl EnvelopeBufferService { impl Service for EnvelopeBufferService { type Interface = EnvelopeBuffer; - fn spawn_handler(mut self, mut rx: Receiver) { + type PublicState = EnvelopeBufferState; + + fn pre_spawn(&self) -> Self::PublicState { + EnvelopeBufferState { + has_capacity: self.has_capacity.clone(), + } + } + + fn spawn_handler(mut self, mut rx: Receiver, mut shutdown: ShutdownHandle) { let config = self.config.clone(); let memory_checker = MemoryChecker::new(self.memory_stat.clone(), config.clone()); let mut global_config_rx = self.global_config_rx.clone(); @@ -397,8 +390,6 @@ impl Service for EnvelopeBufferService { }; buffer.initialize().await; - let mut shutdown = Controller::shutdown_handle(); - relay_log::info!("EnvelopeBufferService: starting"); loop { let used_capacity = self.services.envelopes_tx.max_capacity() @@ -543,15 +534,15 @@ mod tests { service.has_capacity.store(false, Ordering::Relaxed); - let ObservableEnvelopeBuffer { addr, has_capacity } = service.start_observable(); - assert!(!has_capacity.load(Ordering::Relaxed)); + let (state, addr) = service.start(); + assert!(!state.has_capacity.load(Ordering::Relaxed)); let some_project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap(); addr.send(EnvelopeBuffer::Ready(some_project_key)); tokio::time::advance(Duration::from_millis(100)).await; - assert!(has_capacity.load(Ordering::Relaxed)); + assert!(state.has_capacity.load(Ordering::Relaxed)); } #[tokio::test] @@ -566,7 +557,7 @@ mod tests { outcome_aggregator_rx: _outcome_aggregator_rx, } = envelope_buffer_service(None, global_config::Status::Pending); - let addr = service.start(); + let (_, addr) = service.start(); let envelope = new_envelope(false, "foo"); let project_key = envelope.meta().public_key(); @@ -613,7 +604,7 @@ mod tests { global_config::Status::Ready(Arc::new(GlobalConfig::default())), ); - let addr = service.start(); + let (_, addr) = service.start(); let envelope = new_envelope(false, "foo"); let project_key = envelope.meta().public_key(); @@ -649,7 +640,7 @@ mod tests { ); let config = service.config.clone(); - let addr = service.start(); + let (_, addr) = service.start(); let mut envelope = new_envelope(false, "foo"); envelope @@ -682,7 +673,7 @@ mod tests { global_config::Status::Ready(Arc::new(GlobalConfig::default())), ); - let addr = service.start(); + let (_, addr) = service.start(); let envelope = new_envelope(false, "foo"); let project_key = envelope.meta().public_key(); @@ -730,7 +721,7 @@ mod tests { global_config::Status::Ready(Arc::new(GlobalConfig::default())), ); - let addr = service.start(); + let (_, addr) = service.start(); let envelope = new_envelope(false, "foo"); let project_key = envelope.meta().public_key(); diff --git a/relay-server/src/services/cogs.rs b/relay-server/src/services/cogs.rs index 19daf7516d..75bcd5cc68 100644 --- a/relay-server/src/services/cogs.rs +++ b/relay-server/src/services/cogs.rs @@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use relay_cogs::{CogsMeasurement, CogsRecorder, ResourceId}; use relay_config::Config; -use relay_system::{Addr, FromMessage, Interface, Service}; +use relay_system::{Addr, FromMessage, Interface, Service, ShutdownHandle}; use crate::statsd::RelayCounters; @@ -54,7 +54,15 @@ impl CogsService { impl Service for CogsService { type Interface = CogsReport; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + type PublicState = (); + + fn pre_spawn(&self) -> Self::PublicState {} + + fn spawn_handler( + mut self, + mut rx: relay_system::Receiver, + _shutdown: ShutdownHandle, + ) { tokio::spawn(async move { while let Some(message) = rx.recv().await { self.handle_report(message); diff --git a/relay-server/src/services/global_config.rs b/relay-server/src/services/global_config.rs index 7e401b626f..ebec5f3f10 100644 --- a/relay-server/src/services/global_config.rs +++ b/relay-server/src/services/global_config.rs @@ -18,7 +18,7 @@ use relay_config::Config; use relay_config::RelayMode; use relay_dynamic_config::GlobalConfig; use relay_statsd::metric; -use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Service}; +use relay_system::{Addr, AsyncResponse, FromMessage, Interface, Service, ShutdownHandle, State}; use reqwest::Method; use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch}; @@ -172,6 +172,8 @@ impl GlobalConfigHandle { } } +impl State for GlobalConfigHandle {} + impl fmt::Debug for GlobalConfigHandle { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_tuple("GlobalConfigHandle") @@ -227,14 +229,6 @@ impl GlobalConfigService { ) } - /// Creates a [`GlobalConfigHandle`] which can be used to retrieve the current state - /// of the global config at any time. - pub fn handle(&self) -> GlobalConfigHandle { - GlobalConfigHandle { - watch: self.global_config_watch.subscribe(), - } - } - /// Handles messages from external services. fn handle_message(&mut self, message: GlobalConfigManager) { match message { @@ -338,10 +332,20 @@ impl GlobalConfigService { impl Service for GlobalConfigService { type Interface = GlobalConfigManager; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { - let mut shutdown_handle = Controller::shutdown_handle(); + type PublicState = GlobalConfigHandle; + fn pre_spawn(&self) -> Self::PublicState { + GlobalConfigHandle { + watch: self.global_config_watch.subscribe(), + } + } + + fn spawn_handler( + mut self, + mut rx: relay_system::Receiver, + mut shutdown: ShutdownHandle, + ) { + tokio::spawn(async move { relay_log::info!("global config service starting"); if self.config.relay_mode() == RelayMode::Managed { relay_log::info!("requesting global config from upstream"); @@ -378,7 +382,7 @@ impl Service for GlobalConfigService { () = &mut self.fetch_handle => self.request_global_config(), Some(result) = self.internal_rx.recv() => self.handle_result(result), Some(message) = rx.recv() => self.handle_message(message), - _ = shutdown_handle.notified() => self.handle_shutdown(), + _ = shutdown.notified() => self.handle_shutdown(), else => break, } @@ -419,7 +423,7 @@ mod tests { config.regenerate_credentials(false).unwrap(); let fetch_interval = config.global_config_fetch_interval(); - let service = GlobalConfigService::new(Arc::new(config), upstream) + let (_, service) = GlobalConfigService::new(Arc::new(config), upstream) .0 .start(); @@ -450,7 +454,7 @@ mod tests { config.regenerate_credentials(false).unwrap(); let fetch_interval = config.global_config_fetch_interval(); - let service = GlobalConfigService::new(Arc::new(config), upstream) + let (_, service) = GlobalConfigService::new(Arc::new(config), upstream) .0 .start(); service.send(Get).await.unwrap(); @@ -477,7 +481,7 @@ mod tests { let fetch_interval = config.global_config_fetch_interval(); - let service = GlobalConfigService::new(Arc::new(config), upstream) + let (_, service) = GlobalConfigService::new(Arc::new(config), upstream) .0 .start(); service.send(Get).await.unwrap(); diff --git a/relay-server/src/services/health_check.rs b/relay-server/src/services/health_check.rs index 867ddc3330..ba69f91c3b 100644 --- a/relay-server/src/services/health_check.rs +++ b/relay-server/src/services/health_check.rs @@ -1,12 +1,12 @@ use std::sync::Arc; use relay_config::Config; -use relay_system::{Addr, AsyncResponse, Controller, FromMessage, Interface, Sender, Service}; +use relay_system::{Addr, AsyncResponse, FromMessage, Interface, Sender, Service, ShutdownHandle}; use std::future::Future; use tokio::sync::watch; use tokio::time::{timeout, Instant}; -use crate::services::buffer::ObservableEnvelopeBuffer; +use crate::services::buffer::EnvelopeBufferState; use crate::services::metrics::RouterHandle; use crate::services::upstream::{IsAuthenticated, UpstreamRelay}; use crate::statsd::RelayTimers; @@ -86,7 +86,7 @@ pub struct HealthCheckService { memory_checker: MemoryChecker, aggregator: RouterHandle, upstream_relay: Addr, - envelope_buffer: Option, // make non-optional once V1 has been removed + envelope_buffer: Option, // make non-optional once V1 has been removed } impl HealthCheckService { @@ -98,7 +98,7 @@ impl HealthCheckService { memory_checker: MemoryChecker, aggregator: RouterHandle, upstream_relay: Addr, - envelope_buffer: Option, + envelope_buffer: Option, ) -> Self { Self { config, @@ -193,15 +193,21 @@ impl HealthCheckService { impl Service for HealthCheckService { type Interface = HealthCheck; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + type PublicState = (); + + fn pre_spawn(&self) -> Self::PublicState {} + + fn spawn_handler( + mut self, + mut rx: relay_system::Receiver, + shutdown: ShutdownHandle, + ) { let (update_tx, update_rx) = watch::channel(StatusUpdate::new(Status::Unhealthy)); let check_interval = self.config.health_refresh_interval(); // Add 10% buffer to the internal timeouts to avoid race conditions. let status_timeout = (check_interval + self.config.health_probe_timeout()).mul_f64(1.1); tokio::spawn(async move { - let shutdown = Controller::shutdown_handle(); - while shutdown.get().is_none() { let _ = update_tx.send(StatusUpdate::new(relay_statsd::metric!( timer(RelayTimers::HealthCheckDuration), diff --git a/relay-server/src/services/metrics/aggregator.rs b/relay-server/src/services/metrics/aggregator.rs index 707d0eec72..d099f37973 100644 --- a/relay-server/src/services/metrics/aggregator.rs +++ b/relay-server/src/services/metrics/aggregator.rs @@ -7,7 +7,9 @@ use relay_base_schema::project::ProjectKey; use relay_config::AggregatorServiceConfig; use relay_metrics::aggregator::AggregateMetricsError; use relay_metrics::{aggregator, Bucket, UnixTimestamp}; -use relay_system::{Controller, FromMessage, Interface, NoResponse, Recipient, Service, Shutdown}; +use relay_system::{ + FromMessage, Interface, NoResponse, Recipient, Service, Shutdown, ShutdownHandle, +}; use crate::statsd::{RelayCounters, RelayHistograms, RelayTimers}; @@ -246,10 +248,17 @@ impl AggregatorService { impl Service for AggregatorService { type Interface = Aggregator; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + type PublicState = (); + + fn pre_spawn(&self) -> Self::PublicState {} + + fn spawn_handler( + mut self, + mut rx: relay_system::Receiver, + mut shutdown: ShutdownHandle, + ) { tokio::spawn(async move { let mut ticker = tokio::time::interval(Duration::from_millis(self.flush_interval_ms)); - let mut shutdown = Controller::shutdown_handle(); // Note that currently this loop never exits and will run till the tokio runtime shuts // down. This is about to change with the refactoring for the shutdown process. @@ -361,7 +370,15 @@ mod tests { impl Service for TestReceiver { type Interface = TestInterface; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + type PublicState = (); + + fn pre_spawn(&self) -> Self::PublicState {} + + fn spawn_handler( + self, + mut rx: relay_system::Receiver, + _shutdown: ShutdownHandle, + ) { tokio::spawn(async move { while let Some(message) = rx.recv().await { let buckets = message.0.buckets; @@ -392,7 +409,7 @@ mod tests { tokio::time::pause(); let receiver = TestReceiver::default(); - let recipient = receiver.clone().start().recipient(); + let recipient = receiver.clone().start().1.recipient(); let config = AggregatorServiceConfig { aggregator: AggregatorConfig { @@ -402,7 +419,7 @@ mod tests { }, ..Default::default() }; - let aggregator = AggregatorService::new(config, Some(recipient)).start(); + let (_, aggregator) = AggregatorService::new(config, Some(recipient)).start(); let mut bucket = some_bucket(); bucket.timestamp = UnixTimestamp::now(); diff --git a/relay-server/src/services/metrics/router.rs b/relay-server/src/services/metrics/router.rs index 522eb729f1..cc91c08e12 100644 --- a/relay-server/src/services/metrics/router.rs +++ b/relay-server/src/services/metrics/router.rs @@ -4,7 +4,7 @@ use relay_config::aggregator::Condition; use relay_config::{AggregatorServiceConfig, ScopedAggregatorConfig}; use relay_metrics::MetricNamespace; -use relay_system::{Addr, NoResponse, Recipient, Service}; +use relay_system::{Addr, NoResponse, Recipient, Service, ShutdownHandle, State}; use crate::services::metrics::{ Aggregator, AggregatorHandle, AggregatorService, FlushBuckets, MergeBuckets, @@ -39,8 +39,14 @@ impl RouterService { let default = AggregatorService::new(default_config, receiver); Self { default, secondary } } +} + +impl Service for RouterService { + type Interface = Aggregator; + + type PublicState = RouterHandle; - pub fn handle(&self) -> RouterHandle { + fn pre_spawn(&self) -> Self::PublicState { let mut handles = vec![self.default.handle()]; for (aggregator, _) in &self.secondary { handles.push(aggregator.handle()); @@ -48,12 +54,12 @@ impl RouterService { RouterHandle(handles) } -} -impl Service for RouterService { - type Interface = Aggregator; - - fn spawn_handler(self, mut rx: relay_system::Receiver) { + fn spawn_handler( + self, + mut rx: relay_system::Receiver, + _shutdown: ShutdownHandle, + ) { tokio::spawn(async move { let mut router = StartedRouter::start(self); relay_log::info!("metrics router started"); @@ -94,12 +100,12 @@ impl StartedRouter { .filter(|&namespace| condition.matches(Some(namespace))) .collect(); - (aggregator.start(), namespaces) + (aggregator.start().1, namespaces) }) .collect(); Self { - default: default.start(), + default: default.start().1, secondary, } } @@ -156,3 +162,5 @@ impl RouterHandle { self.0.iter().all(|ah| ah.can_accept_metrics()) } } + +impl State for RouterHandle {} diff --git a/relay-server/src/services/outcome.rs b/relay-server/src/services/outcome.rs index 35527452b3..bba5a7d00f 100644 --- a/relay-server/src/services/outcome.rs +++ b/relay-server/src/services/outcome.rs @@ -27,7 +27,7 @@ use relay_quotas::{DataCategory, ReasonCode, Scoping}; use relay_sampling::config::RuleId; use relay_sampling::evaluation::MatchedRuleIds; use relay_statsd::metric; -use relay_system::{Addr, FromMessage, Interface, NoResponse, Service}; +use relay_system::{Addr, FromMessage, Interface, NoResponse, Service, ShutdownHandle}; use serde::{Deserialize, Serialize}; #[cfg(feature = "processing")] @@ -686,7 +686,15 @@ impl HttpOutcomeProducer { impl Service for HttpOutcomeProducer { type Interface = TrackRawOutcome; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + type PublicState = (); + + fn pre_spawn(&self) -> Self::PublicState {} + + fn spawn_handler( + mut self, + mut rx: relay_system::Receiver, + _shutdown: ShutdownHandle, + ) { tokio::spawn(async move { loop { tokio::select! { @@ -779,7 +787,15 @@ impl ClientReportOutcomeProducer { impl Service for ClientReportOutcomeProducer { type Interface = TrackOutcome; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + type PublicState = (); + + fn pre_spawn(&self) -> Self::PublicState {} + + fn spawn_handler( + mut self, + mut rx: relay_system::Receiver, + _shutdown: ShutdownHandle, + ) { tokio::spawn(async move { loop { tokio::select! { @@ -983,8 +999,8 @@ impl ProducerInner { match self { #[cfg(feature = "processing")] ProducerInner::Kafka(inner) => OutcomeBroker::Kafka(inner), - ProducerInner::Http(inner) => OutcomeBroker::Http(inner.start()), - ProducerInner::ClientReport(inner) => OutcomeBroker::ClientReport(inner.start()), + ProducerInner::Http(inner) => OutcomeBroker::Http(inner.start().1), + ProducerInner::ClientReport(inner) => OutcomeBroker::ClientReport(inner.start().1), ProducerInner::Disabled => OutcomeBroker::Disabled, } } @@ -1038,7 +1054,15 @@ impl OutcomeProducerService { impl Service for OutcomeProducerService { type Interface = OutcomeProducer; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + type PublicState = (); + + fn pre_spawn(&self) -> Self::PublicState {} + + fn spawn_handler( + self, + mut rx: relay_system::Receiver, + _shutdown: ShutdownHandle, + ) { let Self { config, inner } = self; tokio::spawn(async move { diff --git a/relay-server/src/services/outcome_aggregator.rs b/relay-server/src/services/outcome_aggregator.rs index 0a13cbf361..1de4c6f041 100644 --- a/relay-server/src/services/outcome_aggregator.rs +++ b/relay-server/src/services/outcome_aggregator.rs @@ -9,7 +9,7 @@ use relay_common::time::UnixTimestamp; use relay_config::{Config, EmitOutcomes}; use relay_quotas::{DataCategory, Scoping}; use relay_statsd::metric; -use relay_system::{Addr, Controller, Service, Shutdown}; +use relay_system::{Addr, Controller, Service, Shutdown, ShutdownHandle}; use crate::services::outcome::{Outcome, OutcomeProducer, TrackOutcome}; use crate::statsd::RelayTimers; @@ -138,7 +138,15 @@ impl OutcomeAggregator { impl Service for OutcomeAggregator { type Interface = TrackOutcome; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + type PublicState = (); + + fn pre_spawn(&self) -> Self::PublicState {} + + fn spawn_handler( + mut self, + mut rx: relay_system::Receiver, + _shutdown: ShutdownHandle, + ) { tokio::spawn(async move { let mut shutdown = Controller::shutdown_handle(); relay_log::info!("outcome aggregator started"); diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index 640fdb0ee7..d5506d2f03 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -38,7 +38,7 @@ use relay_quotas::{DataCategory, RateLimits, Scoping}; use relay_sampling::config::RuleId; use relay_sampling::evaluation::{ReservoirCounters, ReservoirEvaluator, SamplingDecision}; use relay_statsd::metric; -use relay_system::{Addr, FromMessage, NoResponse, Service}; +use relay_system::{Addr, FromMessage, NoResponse, Service, ShutdownHandle}; use reqwest::header; use smallvec::{smallvec, SmallVec}; @@ -2926,7 +2926,15 @@ impl EnvelopeProcessorService { impl Service for EnvelopeProcessorService { type Interface = EnvelopeProcessor; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + type PublicState = (); + + fn pre_spawn(&self) -> Self::PublicState {} + + fn spawn_handler( + self, + mut rx: relay_system::Receiver, + _shutdown: ShutdownHandle, + ) { tokio::spawn(async move { while let Some(message) = rx.recv().await { let service = self.clone(); diff --git a/relay-server/src/services/projects/cache.rs b/relay-server/src/services/projects/cache.rs index dc29f20bc2..2174edf0ff 100644 --- a/relay-server/src/services/projects/cache.rs +++ b/relay-server/src/services/projects/cache.rs @@ -18,7 +18,7 @@ use relay_metrics::{Bucket, MetricMeta}; use relay_quotas::RateLimits; use relay_redis::RedisPool; use relay_statsd::metric; -use relay_system::{Addr, FromMessage, Interface, Sender, Service}; +use relay_system::{Addr, FromMessage, Interface, Sender, Service, ShutdownHandle}; use tokio::sync::{mpsc, watch}; use tokio::time::Instant; @@ -1214,7 +1214,15 @@ impl ProjectCacheService { impl Service for ProjectCacheService { type Interface = ProjectCache; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + type PublicState = (); + + fn pre_spawn(&self) -> Self::PublicState {} + + fn spawn_handler( + self, + mut rx: relay_system::Receiver, + _shutdown: ShutdownHandle, + ) { let Self { config, memory_checker, @@ -1255,7 +1263,7 @@ impl Service for ProjectCacheService { project_cache, test_store, }; - let buffer = match BufferService::create( + let (_, buffer) = match BufferService::create( memory_checker.clone(), buffer_services, config.clone(), @@ -1442,7 +1450,7 @@ mod tests { project_cache: services.project_cache.clone(), test_store: services.test_store.clone(), }; - let buffer = + let (_, buffer) = match BufferService::create(memory_checker.clone(), buffer_services, config.clone()) .await { diff --git a/relay-server/src/services/projects/source/local.rs b/relay-server/src/services/projects/source/local.rs index 45b12bf95d..2056f6147c 100644 --- a/relay-server/src/services/projects/source/local.rs +++ b/relay-server/src/services/projects/source/local.rs @@ -5,7 +5,9 @@ use std::sync::Arc; use relay_base_schema::project::{ProjectId, ProjectKey}; use relay_config::Config; -use relay_system::{AsyncResponse, FromMessage, Interface, Receiver, Sender, Service}; +use relay_system::{ + AsyncResponse, FromMessage, Interface, Receiver, Sender, Service, ShutdownHandle, +}; use tokio::sync::mpsc; use tokio::time::Instant; @@ -171,7 +173,11 @@ async fn spawn_poll_local_states( impl Service for LocalProjectSourceService { type Interface = LocalProjectSource; - fn spawn_handler(mut self, mut rx: Receiver) { + type PublicState = (); + + fn pre_spawn(&self) -> Self::PublicState {} + + fn spawn_handler(mut self, mut rx: Receiver, _shutdown: ShutdownHandle) { // Use a channel with size 1. If the channel is full because the consumer does not // collect the result, the producer will block, which is acceptable. let (state_tx, mut state_rx) = mpsc::channel(1); diff --git a/relay-server/src/services/projects/source/mod.rs b/relay-server/src/services/projects/source/mod.rs index 5b208ef3ae..c5c5133626 100644 --- a/relay-server/src/services/projects/source/mod.rs +++ b/relay-server/src/services/projects/source/mod.rs @@ -47,8 +47,8 @@ impl ProjectSource { upstream_relay: Addr, _redis: Option, ) -> Self { - let local_source = LocalProjectSourceService::new(config.clone()).start(); - let upstream_source = + let (_, local_source) = LocalProjectSourceService::new(config.clone()).start(); + let (_, upstream_source) = UpstreamProjectSourceService::new(config.clone(), upstream_relay).start(); #[cfg(feature = "processing")] diff --git a/relay-server/src/services/projects/source/upstream.rs b/relay-server/src/services/projects/source/upstream.rs index bcbfd5a279..67d4070edc 100644 --- a/relay-server/src/services/projects/source/upstream.rs +++ b/relay-server/src/services/projects/source/upstream.rs @@ -13,6 +13,7 @@ use relay_dynamic_config::ErrorBoundary; use relay_statsd::metric; use relay_system::{ Addr, BroadcastChannel, BroadcastResponse, BroadcastSender, FromMessage, Interface, Service, + ShutdownHandle, }; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; @@ -640,7 +641,15 @@ impl UpstreamProjectSourceService { impl Service for UpstreamProjectSourceService { type Interface = UpstreamProjectSource; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + type PublicState = (); + + fn pre_spawn(&self) -> Self::PublicState {} + + fn spawn_handler( + mut self, + mut rx: relay_system::Receiver, + _shutdown: ShutdownHandle, + ) { tokio::spawn(async move { relay_log::info!("project upstream cache started"); loop { @@ -693,7 +702,8 @@ mod tests { }}; } - let service = UpstreamProjectSourceService::new(Arc::clone(&config), upstream_addr).start(); + let (_, service) = + UpstreamProjectSourceService::new(Arc::clone(&config), upstream_addr).start(); let mut response1 = service.send(FetchProjectState { project_key, diff --git a/relay-server/src/services/relays.rs b/relay-server/src/services/relays.rs index 922b1c7dda..c491cae9ad 100644 --- a/relay-server/src/services/relays.rs +++ b/relay-server/src/services/relays.rs @@ -7,6 +7,7 @@ use relay_auth::{PublicKey, RelayId}; use relay_config::{Config, RelayInfo}; use relay_system::{ Addr, BroadcastChannel, BroadcastResponse, BroadcastSender, FromMessage, Interface, Service, + ShutdownHandle, }; use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; @@ -334,7 +335,15 @@ impl RelayCacheService { impl Service for RelayCacheService { type Interface = RelayCache; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + type PublicState = (); + + fn pre_spawn(&self) -> Self::PublicState {} + + fn spawn_handler( + mut self, + mut rx: relay_system::Receiver, + _shutdown: ShutdownHandle, + ) { tokio::spawn(async move { relay_log::info!("key cache started"); diff --git a/relay-server/src/services/server.rs b/relay-server/src/services/server.rs index d52fa76148..1ac3c1ff06 100644 --- a/relay-server/src/services/server.rs +++ b/relay-server/src/services/server.rs @@ -10,7 +10,7 @@ use axum_server::accept::Accept; use axum_server::Handle; use hyper_util::rt::TokioTimer; use relay_config::Config; -use relay_system::{Controller, Service, Shutdown}; +use relay_system::{Controller, Service, Shutdown, ShutdownHandle}; use socket2::TcpKeepalive; use tokio::net::{TcpSocket, TcpStream}; use tower::ServiceBuilder; @@ -227,7 +227,15 @@ impl HttpServer { impl Service for HttpServer { type Interface = (); - fn spawn_handler(self, _rx: relay_system::Receiver) { + type PublicState = (); + + fn pre_spawn(&self) -> Self::PublicState {} + + fn spawn_handler( + self, + _rx: relay_system::Receiver, + _shutdown: ShutdownHandle, + ) { let Self { config, service, diff --git a/relay-server/src/services/spooler/mod.rs b/relay-server/src/services/spooler/mod.rs index 54c2530550..a57a3ad014 100644 --- a/relay-server/src/services/spooler/mod.rs +++ b/relay-server/src/services/spooler/mod.rs @@ -40,7 +40,7 @@ use hashbrown::HashSet; use relay_base_schema::project::{ParseProjectKeyError, ProjectKey}; use relay_config::Config; use relay_statsd::metric; -use relay_system::{Addr, Controller, FromMessage, Interface, Sender, Service}; +use relay_system::{Addr, FromMessage, Interface, Sender, Service, ShutdownHandle}; use smallvec::{smallvec, SmallVec}; use sqlx::migrate::MigrateError; use sqlx::sqlite::{ @@ -1272,10 +1272,16 @@ impl BufferService { impl Service for BufferService { type Interface = Buffer; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { - tokio::spawn(async move { - let mut shutdown = Controller::shutdown_handle(); + type PublicState = (); + + fn pre_spawn(&self) -> Self::PublicState {} + fn spawn_handler( + mut self, + mut rx: relay_system::Receiver, + mut shutdown: ShutdownHandle, + ) { + tokio::spawn(async move { loop { tokio::select! { biased; @@ -1401,7 +1407,7 @@ mod tests { let service = BufferService::create(memory_checker, services(), config) .await .unwrap(); - let addr = service.start(); + let (_, addr) = service.start(); let (tx, mut rx) = mpsc::unbounded_channel(); // Test cases: @@ -1622,7 +1628,7 @@ mod tests { let buffer = BufferService::create(memory_checker, services, config) .await .unwrap(); - let addr = buffer.start(); + let (_, addr) = buffer.start(); addr.send(RestoreIndex); // Give some time to process the message tokio::time::sleep(Duration::from_millis(500)).await; @@ -1667,7 +1673,7 @@ mod tests { let buffer = BufferService::create(memory_checker, services, config) .await .unwrap(); - let addr = buffer.start(); + let (_, addr) = buffer.start(); let mut keys = HashSet::new(); for _ in 1..=300 { diff --git a/relay-server/src/services/stats.rs b/relay-server/src/services/stats.rs index a7b5adc66e..c5a2b4dc27 100644 --- a/relay-server/src/services/stats.rs +++ b/relay-server/src/services/stats.rs @@ -4,7 +4,7 @@ use relay_config::{Config, RelayMode}; #[cfg(feature = "processing")] use relay_redis::{RedisPool, RedisPools}; use relay_statsd::metric; -use relay_system::{Addr, Service}; +use relay_system::{Addr, Service, ShutdownHandle}; use tokio::time::interval; use crate::services::upstream::{IsNetworkOutage, UpstreamRelay}; @@ -136,7 +136,15 @@ impl RelayStats { impl Service for RelayStats { type Interface = (); - fn spawn_handler(self, _rx: relay_system::Receiver) { + type PublicState = (); + + fn pre_spawn(&self) -> Self::PublicState {} + + fn spawn_handler( + self, + _rx: relay_system::Receiver, + _shutdown: ShutdownHandle, + ) { let Some(mut ticker) = self.config.metrics_periodic_interval().map(interval) else { return; }; diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 1af723ed68..8468e8c9be 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -22,7 +22,7 @@ use relay_metrics::{ }; use relay_quotas::Scoping; use relay_statsd::metric; -use relay_system::{Addr, FromMessage, Interface, NoResponse, Service}; +use relay_system::{Addr, FromMessage, Interface, NoResponse, Service, ShutdownHandle}; use serde::{Deserialize, Serialize}; use serde_json::value::RawValue; use serde_json::Deserializer; @@ -1044,7 +1044,15 @@ impl StoreService { impl Service for StoreService { type Interface = Store; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + type PublicState = (); + + fn pre_spawn(&self) -> Self::PublicState {} + + fn spawn_handler( + self, + mut rx: relay_system::Receiver, + _shutdown: ShutdownHandle, + ) { let this = Arc::new(self); tokio::spawn(async move { diff --git a/relay-server/src/services/test_store.rs b/relay-server/src/services/test_store.rs index c61621fc0c..bc77559b91 100644 --- a/relay-server/src/services/test_store.rs +++ b/relay-server/src/services/test_store.rs @@ -3,14 +3,14 @@ use std::sync::Arc; use relay_config::{Config, RelayMode}; use relay_event_schema::protocol::EventId; -use relay_system::{AsyncResponse, FromMessage, NoResponse, Sender}; +use relay_system::{AsyncResponse, FromMessage, NoResponse, Sender, Service, ShutdownHandle}; use crate::envelope::Envelope; use crate::services::outcome::Outcome; use crate::services::processor::Processed; use crate::utils::TypedEnvelope; -/// Either a captured envelope or an error that occured during processing. +/// Either a captured envelope or an error that occurred during processing. pub type CapturedEnvelope = Result, String>; /// Inserts an envelope or failure into internal captures. @@ -26,7 +26,7 @@ pub struct Capture { impl Capture { /// Returns `true` if Relay is in capture mode. /// - /// The `Capture` message can still be sent and and will be ignored. This function is purely for + /// The `Capture` message can still be sent and will be ignored. This function is purely for /// optimization purposes. pub fn should_capture(config: &Config) -> bool { matches!(config.relay_mode(), RelayMode::Capture) @@ -131,10 +131,18 @@ impl TestStoreService { } } -impl relay_system::Service for TestStoreService { +impl Service for TestStoreService { type Interface = TestStore; - fn spawn_handler(mut self, mut rx: relay_system::Receiver) { + type PublicState = (); + + fn pre_spawn(&self) -> Self::PublicState {} + + fn spawn_handler( + mut self, + mut rx: relay_system::Receiver, + _shutdown: ShutdownHandle, + ) { tokio::spawn(async move { while let Some(message) = rx.recv().await { self.handle_message(message); diff --git a/relay-server/src/services/upstream.rs b/relay-server/src/services/upstream.rs index e44861a122..29a6dce048 100644 --- a/relay-server/src/services/upstream.rs +++ b/relay-server/src/services/upstream.rs @@ -22,6 +22,7 @@ use relay_quotas::{ }; use relay_system::{ AsyncResponse, FromMessage, Interface, MessageResponse, NoResponse, Sender, Service, + ShutdownHandle, }; use reqwest::header; pub use reqwest::Method; @@ -1498,7 +1499,15 @@ impl UpstreamRelayService { impl Service for UpstreamRelayService { type Interface = UpstreamRelay; - fn spawn_handler(self, mut rx: relay_system::Receiver) { + type PublicState = (); + + fn pre_spawn(&self) -> Self::PublicState {} + + fn spawn_handler( + self, + mut rx: relay_system::Receiver, + _shutdown: ShutdownHandle, + ) { let Self { config } = self; let client = SharedClient::build(config.clone()); diff --git a/relay-system/src/controller.rs b/relay-system/src/controller.rs index 60c5c829a1..7bf0ddb692 100644 --- a/relay-system/src/controller.rs +++ b/relay-system/src/controller.rs @@ -121,7 +121,7 @@ impl ShutdownHandle { /// ### Example /// /// ``` -/// use relay_system::{Controller, Service, Shutdown, ShutdownMode}; +/// use relay_system::{Controller, Service, Shutdown, ShutdownMode, ShutdownHandle}; /// use std::time::Duration; /// /// struct MyService; @@ -129,10 +129,13 @@ impl ShutdownHandle { /// impl Service for MyService { /// type Interface = (); /// -/// fn spawn_handler(self, mut rx: relay_system::Receiver) { -/// tokio::spawn(async move { -/// let mut shutdown = Controller::shutdown_handle(); +/// type PublicState = (); /// +/// fn pre_spawn(&self) -> Self::PublicState { } +/// +/// fn spawn_handler(self, mut rx: relay_system::Receiver, mut shutdown: ShutdownHandle) { +/// use relay_system::ShutdownHandle; +/// tokio::spawn(async move { /// loop { /// tokio::select! { /// shutdown = shutdown.notified() => break, // Handle shutdown here diff --git a/relay-system/src/service.rs b/relay-system/src/service.rs index cd25c02222..b4f92c2867 100644 --- a/relay-system/src/service.rs +++ b/relay-system/src/service.rs @@ -7,14 +7,14 @@ use std::sync::Arc; use std::task::{Context, Poll}; use std::time::{Duration, Instant}; +use crate::statsd::{SystemCounters, SystemGauges}; +use crate::{Controller, ShutdownHandle}; use futures::future::Shared; use futures::FutureExt; use tokio::runtime::Runtime; use tokio::sync::{mpsc, oneshot}; use tokio::time::MissedTickBehavior; -use crate::statsd::{SystemCounters, SystemGauges}; - /// Interval for recording backlog metrics on service channels. const BACKLOG_INTERVAL: Duration = Duration::from_secs(1); @@ -899,6 +899,11 @@ pub fn channel(name: &'static str) -> (Addr, Receiver) { (addr, receiver) } +/// A state interface for [services](`Service`). +pub trait State {} + +impl State for () {} + /// An asynchronous unit responding to messages. /// /// Services receive messages conforming to some [`Interface`] through an [`Addr`] and handle them @@ -917,7 +922,7 @@ pub fn channel(name: &'static str) -> (Addr, Receiver) { /// synchronous, so that this needs to spawn at least one task internally: /// /// ```no_run -/// use relay_system::{FromMessage, Interface, NoResponse, Receiver, Service}; +/// use relay_system::{FromMessage, Interface, NoResponse, Receiver, Service, ShutdownHandle}; /// /// struct MyMessage; /// @@ -936,7 +941,11 @@ pub fn channel(name: &'static str) -> (Addr, Receiver) { /// impl Service for MyService { /// type Interface = MyMessage; /// -/// fn spawn_handler(self, mut rx: Receiver) { +/// type PublicState = (); +/// +/// fn pre_spawn(&self) -> Self::PublicState { } +/// +/// fn spawn_handler(self, mut rx: Receiver, shutdown: ShutdownHandle) { /// tokio::spawn(async move { /// while let Some(message) = rx.recv().await { /// // handle the message @@ -1002,21 +1011,47 @@ pub trait Service: Sized { /// can be handled by this service. type Interface: Interface; + /// The public state of this service. + /// + /// The state has to be generated in the `pre_spawn` method given the `self` reference. The goal + /// of having shared state is to avoid querying for state via messages with responses which can + /// be too slow for some use cases. + type PublicState: State; + + /// Called before the task is spawned. + /// + /// This method is meant to run all initialization code that is necessary for generating the + /// [`Self::PublicState`]. + fn pre_spawn(&self) -> Self::PublicState; + /// Spawns a task to handle service messages. /// /// Receives an inbound channel for all messages sent through the service's [`Addr`]. Note /// that this function is synchronous, so that this needs to spawn a task internally. - fn spawn_handler(self, rx: Receiver); + fn spawn_handler(self, rx: Receiver, shutdown: ShutdownHandle); + + /// Starts this service with the provided [`Receiver`]. + /// + /// This method should be used only when the `start` method can't be used because of some + /// cyclic dependencies between services. + fn start_with_receiver(self, rx: Receiver) -> Self::PublicState { + let shutdown = Controller::shutdown_handle(); + let public_state = self.pre_spawn(); + self.spawn_handler(rx, shutdown); + + public_state + } /// Starts the service in the current runtime and returns an address for it. - fn start(self) -> Addr { + fn start(self) -> (Self::PublicState, Addr) { let (addr, rx) = channel(Self::name()); - self.spawn_handler(rx); - addr + let public_state = self.start_with_receiver(rx); + + (public_state, addr) } /// Starts the service in the given runtime and returns an address for it. - fn start_in(self, runtime: &Runtime) -> Addr { + fn start_in(self, runtime: &Runtime) -> (Self::PublicState, Addr) { let _guard = runtime.enter(); self.start() } @@ -1028,11 +1063,17 @@ pub trait Service: Sized { fn name() -> &'static str { std::any::type_name::() } + + /// Generates the [`Addr`] and [`Receiver`] channels that are used for communicating + fn channel() -> (Addr, Receiver) { + channel(Self::name()) + } } #[cfg(test)] mod tests { use super::*; + use tokio::sync::RwLock; struct MockMessage; @@ -1046,14 +1087,36 @@ mod tests { } } - struct MockService; + struct MockService { + state: MockPublicState, + } + + impl MockService { + fn new() -> Self { + Self { + state: MockPublicState::default(), + } + } + } + + #[derive(Clone, Default)] + struct MockPublicState(Arc>); + + impl State for MockPublicState {} impl Service for MockService { type Interface = MockMessage; - fn spawn_handler(self, mut rx: Receiver) { + type PublicState = MockPublicState; + + fn pre_spawn(&self) -> Self::PublicState { + self.state.clone() + } + + fn spawn_handler(self, mut rx: Receiver, _shutdown: ShutdownHandle) { tokio::spawn(async move { while rx.recv().await.is_some() { + *self.state.0.write().await += 1; tokio::time::sleep(BACKLOG_INTERVAL * 2).await; } }); @@ -1082,7 +1145,7 @@ mod tests { tokio::time::pause(); // Mock service takes 2 * BACKLOG_INTERVAL for every message - let addr = MockService.start(); + let (state, addr) = MockService::new().start(); // Advance the timer by a tiny offset to trigger the first metric emission. let captures = relay_statsd::with_capturing_test_client(|| { @@ -1123,5 +1186,11 @@ mod tests { "service.back_pressure:0|g|#service:mock", // 6 * INTERVAL ] ); + + // We assert that the state changed. The reason for why we don't want to assert a specific + // number is that we do not have any synchronization points in this test. Technically also + // this assertion could be flaky, but it's highly unlikely no increment during the test's + // execution. + assert_ne!(*state.0.blocking_read(), 0); } }