Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(service): Add notion of State #4188

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,18 +307,16 @@ fn queue_envelope(
envelope.scope(scoping);

match state.envelope_buffer() {
Some(buffer) => {
if !buffer.has_capacity() {
Some((buffer_state, buffer)) => {
if !buffer_state.has_capacity() {
return Err(BadStoreRequest::QueueFailed);
}

// NOTE: This assumes that a `prefetch` has already been scheduled for both the
// envelope's projects. See `handle_check_envelope`.
relay_log::trace!("Pushing envelope to V2 buffer");

buffer
.addr()
.send(EnvelopeBuffer::Push(envelope.into_envelope()));
buffer.send(EnvelopeBuffer::Push(envelope.into_envelope()));
}
None => {
relay_log::trace!("Sending envelope to project cache for V1 buffer");
Expand Down
50 changes: 25 additions & 25 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;
use std::time::Duration;

use crate::metrics::{MetricOutcomes, MetricStats};
use crate::services::buffer::{self, EnvelopeBufferService, ObservableEnvelopeBuffer};
use crate::services::buffer::{self, EnvelopeBuffer, EnvelopeBufferService, EnvelopeBufferState};
use crate::services::cogs::{CogsService, CogsServiceRecorder};
use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
use crate::services::health_check::{HealthCheck, HealthCheckService};
Expand All @@ -31,7 +31,7 @@ use relay_redis::redis::Script;
#[cfg(feature = "processing")]
use relay_redis::{PooledClient, RedisScripts};
use relay_redis::{RedisError, RedisPool, RedisPools};
use relay_system::{channel, Addr, Service};
use relay_system::{Addr, Service};
use tokio::runtime::Runtime;
use tokio::sync::mpsc;

Expand Down Expand Up @@ -64,7 +64,7 @@ pub struct Registry {
pub global_config: Addr<GlobalConfigManager>,
pub project_cache: Addr<ProjectCache>,
pub upstream_relay: Addr<UpstreamRelay>,
pub envelope_buffer: Option<ObservableEnvelopeBuffer>,
pub envelope_buffer: Option<(EnvelopeBufferState, Addr<EnvelopeBuffer>)>,
}

impl fmt::Debug for Registry {
Expand Down Expand Up @@ -156,8 +156,8 @@ pub struct ServiceState {
impl ServiceState {
/// Starts all services and returns addresses to all of them.
pub fn start(config: Arc<Config>) -> Result<Self> {
let upstream_relay = UpstreamRelayService::new(config.clone()).start();
let test_store = TestStoreService::new(config.clone()).start();
let (_, upstream_relay) = UpstreamRelayService::new(config.clone()).start();
let (_, test_store) = TestStoreService::new(config.clone()).start();

let redis_pools = config
.redis()
Expand All @@ -181,32 +181,31 @@ impl ServiceState {

// Create an address for the `EnvelopeProcessor`, which can be injected into the
// other services.
let (processor, processor_rx) = channel(EnvelopeProcessorService::name());
let outcome_producer = OutcomeProducerService::create(
let (processor, processor_rx) = EnvelopeProcessorService::channel();
let (_, outcome_producer) = OutcomeProducerService::create(
config.clone(),
upstream_relay.clone(),
processor.clone(),
)?
.start();
let outcome_aggregator = OutcomeAggregator::new(&config, outcome_producer.clone()).start();
let (_, outcome_aggregator) =
OutcomeAggregator::new(&config, outcome_producer.clone()).start();

let (global_config, global_config_rx) =
GlobalConfigService::new(config.clone(), upstream_relay.clone());
let global_config_handle = global_config.handle();
// The global config service must start before dependant services are
// started. Messages like subscription requests to the global config
// service fail if the service is not running.
let global_config = global_config.start();
let (global_config_handle, global_config) = global_config.start();

let (project_cache, project_cache_rx) = channel(ProjectCacheService::name());
let (project_cache, project_cache_rx) = ProjectCacheService::channel();

let aggregator = RouterService::new(
let (aggregator_handle, aggregator) = RouterService::new(
config.default_aggregator_config().clone(),
config.secondary_aggregator_configs().clone(),
Some(project_cache.clone().recipient()),
);
let aggregator_handle = aggregator.handle();
let aggregator = aggregator.start();
)
.start();

let metric_stats = MetricStats::new(
config.clone(),
Expand All @@ -227,12 +226,12 @@ impl ServiceState {
outcome_aggregator.clone(),
metric_outcomes.clone(),
)
.map(|s| s.start())
.map(|s| s.start().1)
})
.transpose()?;

let cogs = CogsService::new(&config);
let cogs = Cogs::new(CogsServiceRecorder::new(&config, cogs.start()));
let cogs = Cogs::new(CogsServiceRecorder::new(&config, cogs.start().1));

EnvelopeProcessorService::new(
create_processor_pool(&config)?,
Expand All @@ -252,7 +251,7 @@ impl ServiceState {
},
metric_outcomes.clone(),
)
.spawn_handler(processor_rx);
.start_with_receiver(processor_rx);

let (envelopes_tx, envelopes_rx) = mpsc::channel(config.spool_max_backpressure_envelopes());
let envelope_buffer = EnvelopeBufferService::new(
Expand All @@ -266,11 +265,11 @@ impl ServiceState {
test_store: test_store.clone(),
},
)
.map(|b| b.start_observable());
.map(|e| e.start());

// Keep all the services in one context.
let project_cache_services = Services {
envelope_buffer: envelope_buffer.as_ref().map(ObservableEnvelopeBuffer::addr),
envelope_buffer: envelope_buffer.as_ref().map(|(_, a)| a.clone()),
aggregator: aggregator.clone(),
envelope_processor: processor.clone(),
outcome_aggregator: outcome_aggregator.clone(),
Expand All @@ -289,14 +288,14 @@ impl ServiceState {
.as_ref()
.map(|pools| pools.project_configs.clone()),
)
.spawn_handler(project_cache_rx);
.start_with_receiver(project_cache_rx);

let health_check = HealthCheckService::new(
let (_, health_check) = HealthCheckService::new(
config.clone(),
MemoryChecker::new(memory_stat.clone(), config.clone()),
aggregator_handle,
upstream_relay.clone(),
envelope_buffer.clone(),
envelope_buffer.as_ref().map(|(s, _)| s.clone()),
)
.start();

Expand All @@ -308,7 +307,8 @@ impl ServiceState {
)
.start();

let relay_cache = RelayCacheService::new(config.clone(), upstream_relay.clone()).start();
let (_, relay_cache) =
RelayCacheService::new(config.clone(), upstream_relay.clone()).start();

let registry = Registry {
aggregator,
Expand Down Expand Up @@ -348,7 +348,7 @@ impl ServiceState {
}

/// Returns the V2 envelope buffer, if present.
pub fn envelope_buffer(&self) -> Option<&ObservableEnvelopeBuffer> {
pub fn envelope_buffer(&self) -> Option<&(EnvelopeBufferState, Addr<EnvelopeBuffer>)> {
self.inner.registry.envelope_buffer.as_ref()
}

Expand Down
63 changes: 27 additions & 36 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ use std::time::Duration;

use relay_base_schema::project::ProjectKey;
use relay_config::Config;
use relay_system::{Addr, FromMessage, Interface, NoResponse, Receiver, Service};
use relay_system::{Controller, Shutdown};
use relay_system::Shutdown;
use relay_system::{
Addr, FromMessage, Interface, NoResponse, Receiver, Service, ShutdownHandle, State,
};
use tokio::sync::mpsc::Permit;
use tokio::sync::{mpsc, watch};
use tokio::time::{timeout, Instant};
Expand Down Expand Up @@ -70,30 +72,22 @@ impl FromMessage<Self> for EnvelopeBuffer {
}
}

/// Contains the services [`Addr`] and a watch channel to observe its state.
///
/// This allows outside observers to check the capacity without having to send a message.
///
// NOTE: This pattern of combining an Addr with some observable state could be generalized into
// `Service` itself.
/// Public state of the [`EnvelopeBufferService`] which allows to check if the service has capacity
/// to accept new [`Envelope`]s.
#[derive(Debug, Clone)]
pub struct ObservableEnvelopeBuffer {
addr: Addr<EnvelopeBuffer>,
pub struct EnvelopeBufferState {
has_capacity: Arc<AtomicBool>,
}

impl ObservableEnvelopeBuffer {
/// Returns the address of the buffer service.
pub fn addr(&self) -> Addr<EnvelopeBuffer> {
self.addr.clone()
}

impl EnvelopeBufferState {
/// Returns `true` if the buffer has the capacity to accept more elements.
pub fn has_capacity(&self) -> bool {
self.has_capacity.load(Ordering::Relaxed)
}
}

impl State for EnvelopeBufferState {}

/// Services that the buffer service communicates with.
#[derive(Clone)]
pub struct Services {
Expand Down Expand Up @@ -143,15 +137,6 @@ impl EnvelopeBufferService {
})
}

/// Returns both the [`Addr`] to this service, and a reference to the capacity flag.
pub fn start_observable(self) -> ObservableEnvelopeBuffer {
let has_capacity = self.has_capacity.clone();
ObservableEnvelopeBuffer {
addr: self.start(),
has_capacity,
}
}

/// Wait for the configured amount of time and make sure the project cache is ready to receive.
async fn ready_to_pop(
&mut self,
Expand Down Expand Up @@ -373,7 +358,15 @@ impl EnvelopeBufferService {
impl Service for EnvelopeBufferService {
type Interface = EnvelopeBuffer;

fn spawn_handler(mut self, mut rx: Receiver<Self::Interface>) {
type PublicState = EnvelopeBufferState;

fn pre_spawn(&self) -> Self::PublicState {
EnvelopeBufferState {
has_capacity: self.has_capacity.clone(),
}
}

fn spawn_handler(mut self, mut rx: Receiver<Self::Interface>, mut shutdown: ShutdownHandle) {
let config = self.config.clone();
let memory_checker = MemoryChecker::new(self.memory_stat.clone(), config.clone());
let mut global_config_rx = self.global_config_rx.clone();
Expand All @@ -397,8 +390,6 @@ impl Service for EnvelopeBufferService {
};
buffer.initialize().await;

let mut shutdown = Controller::shutdown_handle();

relay_log::info!("EnvelopeBufferService: starting");
loop {
let used_capacity = self.services.envelopes_tx.max_capacity()
Expand Down Expand Up @@ -543,15 +534,15 @@ mod tests {

service.has_capacity.store(false, Ordering::Relaxed);

let ObservableEnvelopeBuffer { addr, has_capacity } = service.start_observable();
assert!(!has_capacity.load(Ordering::Relaxed));
let (state, addr) = service.start();
assert!(!state.has_capacity.load(Ordering::Relaxed));

let some_project_key = ProjectKey::parse("a94ae32be2584e0bbd7a4cbb95971fee").unwrap();
addr.send(EnvelopeBuffer::Ready(some_project_key));

tokio::time::advance(Duration::from_millis(100)).await;

assert!(has_capacity.load(Ordering::Relaxed));
assert!(state.has_capacity.load(Ordering::Relaxed));
}

#[tokio::test]
Expand All @@ -566,7 +557,7 @@ mod tests {
outcome_aggregator_rx: _outcome_aggregator_rx,
} = envelope_buffer_service(None, global_config::Status::Pending);

let addr = service.start();
let (_, addr) = service.start();

let envelope = new_envelope(false, "foo");
let project_key = envelope.meta().public_key();
Expand Down Expand Up @@ -613,7 +604,7 @@ mod tests {
global_config::Status::Ready(Arc::new(GlobalConfig::default())),
);

let addr = service.start();
let (_, addr) = service.start();

let envelope = new_envelope(false, "foo");
let project_key = envelope.meta().public_key();
Expand Down Expand Up @@ -649,7 +640,7 @@ mod tests {
);

let config = service.config.clone();
let addr = service.start();
let (_, addr) = service.start();

let mut envelope = new_envelope(false, "foo");
envelope
Expand Down Expand Up @@ -682,7 +673,7 @@ mod tests {
global_config::Status::Ready(Arc::new(GlobalConfig::default())),
);

let addr = service.start();
let (_, addr) = service.start();

let envelope = new_envelope(false, "foo");
let project_key = envelope.meta().public_key();
Expand Down Expand Up @@ -730,7 +721,7 @@ mod tests {
global_config::Status::Ready(Arc::new(GlobalConfig::default())),
);

let addr = service.start();
let (_, addr) = service.start();

let envelope = new_envelope(false, "foo");
let project_key = envelope.meta().public_key();
Expand Down
12 changes: 10 additions & 2 deletions relay-server/src/services/cogs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicBool, Ordering};

use relay_cogs::{CogsMeasurement, CogsRecorder, ResourceId};
use relay_config::Config;
use relay_system::{Addr, FromMessage, Interface, Service};
use relay_system::{Addr, FromMessage, Interface, Service, ShutdownHandle};

use crate::statsd::RelayCounters;

Expand Down Expand Up @@ -54,7 +54,15 @@ impl CogsService {
impl Service for CogsService {
type Interface = CogsReport;

fn spawn_handler(mut self, mut rx: relay_system::Receiver<Self::Interface>) {
type PublicState = ();

fn pre_spawn(&self) -> Self::PublicState {}

fn spawn_handler(
mut self,
mut rx: relay_system::Receiver<Self::Interface>,
_shutdown: ShutdownHandle,
) {
tokio::spawn(async move {
while let Some(message) = rx.recv().await {
self.handle_report(message);
Expand Down
Loading
Loading