From df13cc736d02df9128d61b8105e6983f54d6144c Mon Sep 17 00:00:00 2001 From: Benjamin Naecker Date: Thu, 21 Nov 2024 17:05:27 -0800 Subject: [PATCH] Spawn a task in `oximeter` to actually do the collection (#7097) `oximeter` currently starts up a task for each producer. When their collection interval expires, that task directly makes an HTTP request to the producer to collect its data, _in-line_ in the same task. This can cause problems, especially if there are concurrent updates to the producer's address information. The main `oximeter` HTTP server will send a message to that task in that case, asking it to update its producer information, but that task might be off in lalaland making an HTTP request. This is particularly bad since that HTTP request may never complete -- the producer's address might be updated, so the old one is defunct, possibly forever! This commit addresses this by spawning a new task to actually run the collection itself. This keeps the first-level task alive and responsive, such that it can abort any previously-spawned collections if there are updates. We _also_ replace any existing task if a collection was explicitly requested, but we ignore existing collections (rather than spawning a new one) if the timer expires while one is already running. This resolves test flakes described in #6901. --- nexus/tests/integration_tests/disks.rs | 15 +- nexus/tests/integration_tests/instances.rs | 6 +- nexus/tests/integration_tests/metrics.rs | 36 +- oximeter/collector/src/agent.rs | 516 ++++++++++++++++++--- oximeter/collector/src/lib.rs | 10 +- oximeter/collector/src/self_stats.rs | 13 + 6 files changed, 518 insertions(+), 78 deletions(-) diff --git a/nexus/tests/integration_tests/disks.rs b/nexus/tests/integration_tests/disks.rs index 939fd61249..8bfe665c17 100644 --- a/nexus/tests/integration_tests/disks.rs +++ b/nexus/tests/integration_tests/disks.rs @@ -1827,7 +1827,10 @@ async fn test_disk_metrics(cptestctx: &ControlPlaneTestContext) { .await; assert!(measurements.items.is_empty()); - oximeter.force_collect().await; + oximeter + .try_force_collect() + .await + .expect("Could not force oximeter collection"); assert_eq!( get_latest_silo_metric( cptestctx, @@ -1841,7 +1844,10 @@ async fn test_disk_metrics(cptestctx: &ControlPlaneTestContext) { // Create an instance, attach the disk to it. create_instance_with_disk(client).await; wait_for_producer(&cptestctx.oximeter, disk.id()).await; - oximeter.force_collect().await; + oximeter + .try_force_collect() + .await + .expect("Could not force oximeter collection"); for metric in &ALL_METRICS { let measurements = query_for_metrics(client, &metric_url(metric)).await; @@ -1878,7 +1884,10 @@ async fn test_disk_metrics_paginated(cptestctx: &ControlPlaneTestContext) { wait_for_producer(&cptestctx.oximeter, disk.id()).await; let oximeter = &cptestctx.oximeter; - oximeter.force_collect().await; + oximeter + .try_force_collect() + .await + .expect("Could not force oximeter collection"); for metric in &ALL_METRICS { let collection_url = format!( "/v1/disks/{}/metrics/{}?project={}", diff --git a/nexus/tests/integration_tests/instances.rs b/nexus/tests/integration_tests/instances.rs index be124e14b8..2949ea4560 100644 --- a/nexus/tests/integration_tests/instances.rs +++ b/nexus/tests/integration_tests/instances.rs @@ -1620,7 +1620,11 @@ async fn assert_metrics( cpus: i64, ram: i64, ) { - cptestctx.oximeter.force_collect().await; + cptestctx + .oximeter + .try_force_collect() + .await + .expect("Could not force oximeter collection"); for id in &[None, Some(project_id)] { assert_eq!( diff --git a/nexus/tests/integration_tests/metrics.rs b/nexus/tests/integration_tests/metrics.rs index 6281326216..4bac62f9d8 100644 --- a/nexus/tests/integration_tests/metrics.rs +++ b/nexus/tests/integration_tests/metrics.rs @@ -107,7 +107,11 @@ async fn assert_system_metrics( cpus: i64, ram: i64, ) { - cptestctx.oximeter.force_collect().await; + cptestctx + .oximeter + .try_force_collect() + .await + .expect("Could not force oximeter collection"); assert_eq!( get_latest_system_metric( cptestctx, @@ -134,7 +138,11 @@ async fn assert_silo_metrics( cpus: i64, ram: i64, ) { - cptestctx.oximeter.force_collect().await; + cptestctx + .oximeter + .try_force_collect() + .await + .expect("Could not force oximeter collection"); assert_eq!( get_latest_silo_metric( cptestctx, @@ -270,7 +278,11 @@ async fn test_timeseries_schema_list( // Nexus's HTTP latency distribution. This is defined in Nexus itself, and // should always exist after we've registered as a producer and start // producing data. Force a collection to ensure that happens. - cptestctx.oximeter.force_collect().await; + cptestctx + .oximeter + .try_force_collect() + .await + .expect("Could not force oximeter collection"); let client = &cptestctx.external_client; let url = "/v1/system/timeseries/schemas"; let schema = @@ -289,7 +301,11 @@ pub async fn timeseries_query( query: impl ToString, ) -> Vec { // first, make sure the latest timeseries have been collected. - cptestctx.oximeter.force_collect().await; + cptestctx + .oximeter + .try_force_collect() + .await + .expect("Could not force oximeter collection"); // okay, do the query let body = nexus_types::external_api::params::TimeseriesQuery { @@ -365,7 +381,11 @@ async fn test_instance_watcher_metrics( .await; // Make sure that the latest metrics have been collected. - oximeter.force_collect().await; + cptestctx + .oximeter + .try_force_collect() + .await + .expect("Could not force oximeter collection"); }; #[track_caller] @@ -688,7 +708,11 @@ async fn test_mgs_metrics( query: &str, expected: &HashMap, ) -> anyhow::Result<()> { - cptestctx.oximeter.force_collect().await; + cptestctx + .oximeter + .try_force_collect() + .await + .expect("Could not force oximeter collection"); let table = timeseries_query(&cptestctx, &query) .await .into_iter() diff --git a/oximeter/collector/src/agent.rs b/oximeter/collector/src/agent.rs index 8e14428d33..8572f7f508 100644 --- a/oximeter/collector/src/agent.rs +++ b/oximeter/collector/src/agent.rs @@ -40,14 +40,43 @@ use std::sync::Arc; use std::sync::Mutex as StdMutex; use std::time::Duration; use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; use tokio::sync::oneshot; +use tokio::sync::watch; use tokio::sync::Mutex; use tokio::sync::MutexGuard; use tokio::task::JoinHandle; use tokio::time::interval; use uuid::Uuid; -type CollectionToken = oneshot::Sender<()>; +/// A token used to force a collection. +/// +/// If the collection is successfully completed, `Ok(())` will be sent back on the +/// contained oneshot channel. Note that that "successful" means the actual +/// request completed, _not_ that results were successfully collected. I.e., it +/// means "this attempt is done". +/// +/// If the collection could not be queued because there are too many outstanding +/// force collection attempts, an `Err(ForcedCollectionQueueFull)` is returned. +type CollectionToken = oneshot::Sender>; + +/// Error returned when a forced collection fails. +#[derive(Clone, Copy, Debug)] +pub enum ForcedCollectionError { + /// The internal queue of requests is full. + QueueFull, + /// We failed to send the request because the channel was closed. + Closed, +} + +/// Timeout on any single collection from a producer. +const COLLECTION_TIMEOUT: Duration = Duration::from_secs(30); + +/// The number of forced collections queued before we start to deny them. +const N_QUEUED_FORCED_COLLECTIONS: usize = 1; + +/// The number of timer-based collections queued before we start to deny them. +const N_QUEUED_TIMER_COLLECTIONS: usize = 1; // Messages for controlling a collection task #[derive(Debug)] @@ -69,19 +98,18 @@ enum CollectionMessage { }, } +/// Run a single collection from the producer. async fn perform_collection( - log: &Logger, - self_target: &mut self_stats::CollectionTaskStats, - client: &reqwest::Client, - producer: &ProducerEndpoint, - outbox: &mpsc::Sender<(Option, ProducerResults)>, - token: Option, -) { + log: Logger, + client: reqwest::Client, + producer: ProducerEndpoint, +) -> Result { debug!(log, "collecting from producer"); let res = client - .get(format!("http://{}/{}", producer.address, producer.id,)) + .get(format!("http://{}/{}", producer.address, producer.id)) .send() .await; + trace!(log, "sent collection request to producer"); match res { Ok(res) => { if res.status().is_success() { @@ -92,8 +120,7 @@ async fn perform_collection( "collected results from producer"; "n_results" => results.len() ); - self_target.collections.datum.increment(); - outbox.send((token, results)).await.unwrap(); + Ok(results) } Err(e) => { warn!( @@ -101,12 +128,7 @@ async fn perform_collection( "failed to collect results from producer"; "error" => ?e, ); - self_target - .failures_for_reason( - self_stats::FailureReason::Deserialization, - ) - .datum - .increment() + Err(self_stats::FailureReason::Deserialization) } } } else { @@ -115,12 +137,7 @@ async fn perform_collection( "failed to receive metric results from producer"; "status_code" => res.status().as_u16(), ); - self_target - .failures_for_reason(self_stats::FailureReason::Other( - res.status(), - )) - .datum - .increment() + Err(self_stats::FailureReason::Other(res.status())) } } Err(e) => { @@ -129,10 +146,128 @@ async fn perform_collection( "failed to send collection request to producer"; "error" => ?e ); - self_target - .failures_for_reason(self_stats::FailureReason::Unreachable) - .datum - .increment() + Err(self_stats::FailureReason::Unreachable) + } + } +} + +// The type of one collection task run to completion. +// +// An `Err(_)` means we failed to collect, and contains the reason so that we +// can bump the self-stat counter accordingly. +type CollectionResult = Result; + +// The type of one response message sent from the collection task. +type CollectionResponse = (Option, CollectionResult); + +/// Task that actually performs collections from the producer. +async fn inner_collection_loop( + log: Logger, + mut producer_info_rx: watch::Receiver, + mut forced_collection_rx: mpsc::Receiver, + mut timer_collection_rx: mpsc::Receiver<()>, + result_tx: mpsc::Sender, +) { + let client = reqwest::Client::builder() + .timeout(COLLECTION_TIMEOUT) + .build() + // Safety: `build()` only fails if TLS couldn't be initialized or the + // system DNS configuration could not be loaded. + .unwrap(); + loop { + // Wait for notification that we have a collection to perform, from + // either the forced- or timer-collection queue. + trace!(log, "top of inner collection loop, waiting for next request",); + let maybe_token = tokio::select! { + maybe_request = forced_collection_rx.recv() => { + let Some(request) = maybe_request else { + debug!( + log, + "forced collection request queue closed, exiting" + ); + return; + }; + Some(request) + } + maybe_request = timer_collection_rx.recv() => { + if maybe_request.is_none() { + debug!( + log, + "timer collection request queue closed, exiting" + ); + return; + }; + None + } + }; + + // Make a future to represent the actual collection. + let mut collection_fut = Box::pin(perform_collection( + log.clone(), + client.clone(), + *producer_info_rx.borrow_and_update(), + )); + + // Wait for that collection to complete or fail, or for an update to the + // producer's information. In the latter case, recreate the future for + // the collection itself with the new producer information. + let collection_result = 'collection: loop { + tokio::select! { + biased; + + maybe_update = producer_info_rx.changed() => { + match maybe_update { + Ok(_) => { + let update = *producer_info_rx.borrow_and_update(); + debug!( + log, + "received producer info update with an outstanding \ + collection running, cancelling it and recreating \ + with the new info"; + "new_info" => ?&update, + ); + collection_fut = Box::pin(perform_collection( + log.new(o!("address" => update.address)), + client.clone(), + update, + )); + continue 'collection; + } + Err(e) => { + error!( + log, + "failed to receive on producer update \ + watch channel, exiting"; + "error" => ?e, + ); + return; + } + } + } + + collection_result = &mut collection_fut => { + // NOTE: This break here is intentional. We cannot just call + // `result_tx.send()` in this loop, because that moves out + // of `maybe_token`, which isn't Copy. Break the loop, and + // then send it after we know we've completed the + // collection. + break 'collection collection_result; + } + } + }; + + // Now that the collection has completed, send on the results, along + // with any collection token we may have gotten with the request. + match result_tx.send((maybe_token, collection_result)).await { + Ok(_) => trace!(log, "forwarded results to main collection loop"), + Err(_) => { + error!( + log, + "failed to forward results to \ + collection loop, channel is closed, exiting", + ); + return; + } } } } @@ -143,15 +278,13 @@ async fn perform_collection( // endlessly, and collects metrics from the assigned producer on a timeout. The assigned agent can // also send a `CollectionMessage`, for example to update the collection interval. This is not // currently used, but will likely be exposed via control plane interfaces in the future. -async fn collection_task( - orig_log: Logger, +async fn collection_loop( + log: Logger, collector: self_stats::OximeterCollector, - mut producer: ProducerEndpoint, + producer: ProducerEndpoint, mut inbox: mpsc::Receiver, outbox: mpsc::Sender<(Option, ProducerResults)>, ) { - let mut log = orig_log.new(o!("address" => producer.address)); - let client = reqwest::Client::new(); let mut collection_timer = interval(producer.interval); debug!( log, @@ -164,21 +297,91 @@ async fn collection_task( let mut self_collection_timer = interval(self_stats::COLLECTION_INTERVAL); self_collection_timer.tick().await; + // Spawn a task to run the actual collections. + // + // This is so that we can possibly interrupt and restart collections that + // are in-progress when we get an update to the producer's information. In + // that case, the collection is likely doomed, since the producer has moved + // and won't be available at the address the collection started with. This + // lets us restart that collection with the new information. + let (producer_info_tx, producer_info_rx) = watch::channel(producer); + let (forced_collection_tx, forced_collection_rx) = + mpsc::channel(N_QUEUED_FORCED_COLLECTIONS); + let (timer_collection_tx, timer_collection_rx) = + mpsc::channel(N_QUEUED_TIMER_COLLECTIONS); + let (result_tx, mut result_rx) = mpsc::channel(1); + tokio::task::spawn(inner_collection_loop( + log.clone(), + producer_info_rx, + forced_collection_rx, + timer_collection_rx, + result_tx, + )); + loop { tokio::select! { message = inbox.recv() => { match message { None => { - debug!(log, "collection task inbox closed, shutting down"); + debug!( + log, + "collection task inbox closed, shutting down" + ); return; } Some(CollectionMessage::Shutdown) => { - debug!(log, "collection task received shutdown request"); + debug!( + log, + "collection task received shutdown request" + ); return; }, Some(CollectionMessage::Collect(token)) => { - debug!(log, "collection task received explicit request to collect"); - perform_collection(&log, &mut stats, &client, &producer, &outbox, Some(token)).await; + debug!( + log, + "collection task received explicit request to collect" + ); + match forced_collection_tx.try_send(token) { + Ok(_) => trace!( + log, "forwarded explicit request to collection task" + ), + Err(e) => { + match e { + TrySendError::Closed(tok) => { + debug!( + log, + "collection task forced collection \ + queue is closed. Attempting to \ + notify caller and exiting.", + ); + let _ = tok.send(Err(ForcedCollectionError::Closed)); + return; + } + TrySendError::Full(tok) => { + error!( + log, + "collection task forced collection \ + queue is full! This should never \ + happen, and probably indicates \ + a bug in your test code, such as \ + calling `force_collection()` many \ + times" + ); + if tok + .send(Err(ForcedCollectionError::QueueFull)) + .is_err() + { + warn!( + log, + "failed to notify caller of \ + force_collection(), oneshot is \ + closed" + ); + } + } + } + } + } }, Some(CollectionMessage::Update(new_info)) => { // If the collection interval is shorter than the @@ -187,21 +390,37 @@ async fn collection_task( // do the update if the information has changed. This // should also be guarded against by the main agent, but // we're being cautious here. - if producer == new_info { + let updated_producer_info = |info: &mut ProducerEndpoint| { + if new_info == *info { + false + } else { + *info = new_info; + true + } + }; + if !producer_info_tx.send_if_modified(updated_producer_info) { + trace!( + log, + "collection task received update with \ + identical producer information, no \ + updates will be sent to the collection task" + ); continue; } - producer = new_info; + + // We have an actual update to the producer information. + // + // Rebuild our timer to reflect the possibly-new + // interval. The collection task has already been + // notified above. debug!( log, "collection task received request to update \ its producer information"; - "interval" => ?producer.interval, - "address" => producer.address, + "interval" => ?new_info.interval, + "address" => new_info.address, ); - - // Update the logger with the new information as well. - log = orig_log.new(o!("address" => producer.address)); - collection_timer = interval(producer.interval); + collection_timer = interval(new_info.interval); collection_timer.tick().await; // completes immediately } #[cfg(test)] @@ -224,12 +443,71 @@ async fn collection_task( } } } + maybe_result = result_rx.recv() => { + let Some((maybe_token, result)) = maybe_result else { + error!( + log, + "channel for receiving results from collection task \ + is closed, exiting", + ); + return; + }; + match result { + Ok(results) => { + stats.collections.datum.increment(); + if outbox.send((maybe_token, results)).await.is_err() { + error!( + log, + "failed to send results to outbox, channel is \ + closed, exiting", + ); + return; + } + } + Err(reason) => stats.failures_for_reason(reason).datum.increment(), + } + } _ = self_collection_timer.tick() => { - debug!(log, "reporting oximeter self-collection statistics"); + debug!( + log, + "reporting oximeter self-collection statistics" + ); outbox.send((None, stats.sample())).await.unwrap(); } _ = collection_timer.tick() => { - perform_collection(&log, &mut stats, &client, &producer, &outbox, None).await; + match timer_collection_tx.try_send(()) { + Ok(_) => { + debug!( + log, + "sent timer-based collection request to \ + the collection task" + ); + } + Err(TrySendError::Closed(_)) => { + error!( + log, + "timer-based collection request queue is \ + closed, exiting" + ); + return; + } + Err(TrySendError::Full(_)) => { + error!( + log, + "timer-based collection request queue is \ + full! This may indicate that the producer \ + has a sampling interval that is too fast \ + for the amount of data it generates"; + "interval" => ?producer_info_tx.borrow().interval, + ); + stats + .failures_for_reason( + self_stats::FailureReason::CollectionsInProgress + ) + .datum + .increment() + } + } } } } @@ -364,7 +642,7 @@ async fn results_sink( } if let Some(token) = collection_token { - let _ = token.send(()); + let _ = token.send(Ok(())); } } } @@ -610,7 +888,7 @@ impl OximeterAgent { let info_clone = info; let target = self.collection_target; let task = tokio::spawn(async move { - collection_task(log, target, info_clone, rx, q).await; + collection_loop(log, target, info_clone, rx, q).await; }); value.insert((info, CollectionTask { inbox: tx, task })); } @@ -650,9 +928,16 @@ impl OximeterAgent { /// Forces a collection from all producers. /// - /// Returns once all those values have been inserted into Clickhouse, + /// Returns once all those values have been inserted into ClickHouse, /// or an error occurs trying to perform the collection. - pub async fn force_collection(&self) { + /// + /// NOTE: This collection is best effort, as the name implies. It's possible + /// that we lose track of requests internally, in cases where there are + /// many concurrent calls. Callers should strive to avoid this, since it + /// rarely makes sense to do that. + pub async fn try_force_collection( + &self, + ) -> Result<(), ForcedCollectionError> { let mut collection_oneshots = vec![]; let collection_tasks = self.collection_tasks.lock().await; for (_id, (_endpoint, task)) in collection_tasks.iter() { @@ -660,7 +945,7 @@ impl OximeterAgent { // Scrape from each producer, into oximeter... task.inbox.send(CollectionMessage::Collect(tx)).await.unwrap(); // ... and keep track of the token that indicates once the metric - // has made it into Clickhouse. + // has made it into ClickHouse. collection_oneshots.push(rx); } drop(collection_tasks); @@ -670,7 +955,20 @@ impl OximeterAgent { // // NOTE: This can either mean that the collection completed // successfully, or an error occurred in the collection pathway. - futures::future::join_all(collection_oneshots).await; + // + // We use `join_all` to ensure that all futures are run, rather than + // bailing on the first error. We extract the first error we received, + // or map an actual `RecvError` to `Closed`, since it does really mean + // the other side hung up without sending. + let results = futures::future::join_all(collection_oneshots).await; + for result in results.into_iter() { + match result { + Ok(Ok(_)) => {} + Ok(Err(e)) => return Err(e), + Err(_) => return Err(ForcedCollectionError::Closed), + } + } + Ok(()) } /// List existing producers. @@ -897,11 +1195,20 @@ mod tests { use super::OximeterAgent; use super::ProducerEndpoint; use crate::self_stats::FailureReason; + use dropshot::HttpError; + use dropshot::HttpResponseOk; + use dropshot::Path; + use dropshot::RequestContext; + use dropshot::ServerBuilder; use omicron_common::api::internal::nexus::ProducerKind; use omicron_test_utils::dev::test_setup_log; + use oximeter::types::ProducerResults; use std::net::Ipv6Addr; use std::net::SocketAddr; use std::net::SocketAddrV6; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + use std::sync::Arc; use std::time::Duration; use tokio::sync::oneshot; use tokio::time::Instant; @@ -925,6 +1232,63 @@ mod tests { COLLECTION_INTERVAL.as_millis() as u64 * N_COLLECTIONS, ); + #[derive( + Clone, + Copy, + Debug, + schemars::JsonSchema, + serde::Deserialize, + serde::Serialize, + )] + struct IdPath { + id: Uuid, + } + + /// Simplified API for a producer, implemented for tests below. + #[dropshot::api_description] + trait ProducerApi { + type Context; + + #[endpoint { + method = GET, + path = "/{id}", + }] + async fn collect( + request_context: RequestContext, + path: Path, + ) -> Result, HttpError>; + } + + /// A producer that always responds successfully with no samples. + struct EmptyProducer; + + impl ProducerApi for EmptyProducer { + type Context = Arc; + + async fn collect( + request_context: RequestContext, + _: Path, + ) -> Result, HttpError> { + request_context.context().fetch_add(1, Ordering::SeqCst); + Ok(HttpResponseOk(vec![])) + } + } + + /// A producer that always responds with a 500. + struct DedProducer; + + impl ProducerApi for DedProducer { + type Context = Arc; + + async fn collect( + request_context: RequestContext, + _: Path, + ) -> Result, HttpError> { + request_context.context().fetch_add(1, Ordering::SeqCst); + Err(HttpError::for_internal_error(String::from("i'm ded"))) + } + } + // Test that we count successful collections from a target correctly. #[tokio::test] async fn test_self_stat_collection_count() { @@ -943,17 +1307,21 @@ mod tests { .unwrap(); // Spawn the mock server that always reports empty statistics. - let server = httpmock::MockServer::start(); - let mock_ok = server.mock(|when, then| { - when.any_request(); - then.status(reqwest::StatusCode::OK).body("[]"); - }); + let collection_count = Arc::new(AtomicUsize::new(0)); + let server = ServerBuilder::new( + producer_api_mod::api_description::().unwrap(), + collection_count.clone(), + log.new(slog::o!("component" => "dropshot")), + ) + .config(Default::default()) + .start() + .expect("failed to spawn empty dropshot server"); // Register the dummy producer. let endpoint = ProducerEndpoint { id: Uuid::new_v4(), kind: ProducerKind::Service, - address: *server.address(), + address: server.local_addr(), interval: COLLECTION_INTERVAL, }; collector @@ -991,7 +1359,13 @@ mod tests { let count = stats.collections.datum.value() as usize; assert!(count != 0); - mock_ok.assert_calls(count); + assert_eq!( + count, + collection_count.load(Ordering::SeqCst), + "number of collections reported by the collection \ + task differs from the number reported by the empty \ + producer server itself" + ); assert!(stats.failed_collections.is_empty()); logctx.cleanup_successful(); } @@ -1083,17 +1457,21 @@ mod tests { .unwrap(); // Spawn the mock server that always responds with a server error - let server = httpmock::MockServer::start(); - let mock_fail = server.mock(|when, then| { - when.any_request(); - then.status(500).body("im ded"); - }); + let collection_count = Arc::new(AtomicUsize::new(0)); + let server = ServerBuilder::new( + producer_api_mod::api_description::().unwrap(), + collection_count.clone(), + log.new(slog::o!("component" => "dropshot")), + ) + .config(Default::default()) + .start() + .expect("failed to spawn empty dropshot server"); // Register the rather flaky producer. let endpoint = ProducerEndpoint { id: Uuid::new_v4(), kind: ProducerKind::Service, - address: *server.address(), + address: server.local_addr(), interval: COLLECTION_INTERVAL, }; collector @@ -1138,7 +1516,13 @@ mod tests { assert_eq!(stats.collections.datum.value(), 0); assert!(count != 0); - mock_fail.assert_calls(count); + assert_eq!( + count, + collection_count.load(Ordering::SeqCst), + "number of collections reported by the collection \ + task differs from the number reported by the always-ded \ + producer server itself" + ); assert_eq!(stats.failed_collections.len(), 1); logctx.cleanup_successful(); } diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index 623ba3f294..cc0ef92c13 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -6,6 +6,7 @@ // Copyright 2023 Oxide Computer Company +pub use agent::ForcedCollectionError; use dropshot::ConfigDropshot; use dropshot::ConfigLogging; use dropshot::HttpError; @@ -455,8 +456,13 @@ impl Oximeter { /// /// This is particularly useful during tests, which would prefer to /// avoid waiting until a collection interval completes. - pub async fn force_collect(&self) { - self.server.app_private().force_collection().await + /// + /// NOTE: As the name implies, this is best effort. It can fail if there are + /// already outstanding calls to force a collection. It rarely makes sense + /// to have multiple concurrent calls here, so that should not impact most + /// callers. + pub async fn try_force_collect(&self) -> Result<(), ForcedCollectionError> { + self.server.app_private().try_force_collection().await } /// List producers. diff --git a/oximeter/collector/src/self_stats.rs b/oximeter/collector/src/self_stats.rs index b2272117da..2ab7b201e5 100644 --- a/oximeter/collector/src/self_stats.rs +++ b/oximeter/collector/src/self_stats.rs @@ -33,6 +33,12 @@ pub enum FailureReason { Unreachable, /// Error during deserialization. Deserialization, + /// The collection interval has expired while an outstanding collection is + /// already in progress. + /// + /// This may indicate that the producer's collection interval is too short + /// for the amount of data it generates, and the collector cannot keep up. + CollectionsInProgress, /// Some other reason, which includes the status code. Other(StatusCode), } @@ -42,6 +48,9 @@ impl std::fmt::Display for FailureReason { match self { Self::Unreachable => f.write_str(Self::UNREACHABLE), Self::Deserialization => f.write_str(Self::DESERIALIZATION), + Self::CollectionsInProgress => { + f.write_str(Self::COLLECTIONS_IN_PROGRESS) + } Self::Other(c) => write!(f, "{}", c.as_u16()), } } @@ -50,11 +59,15 @@ impl std::fmt::Display for FailureReason { impl FailureReason { const UNREACHABLE: &'static str = "unreachable"; const DESERIALIZATION: &'static str = "deserialization"; + const COLLECTIONS_IN_PROGRESS: &'static str = "collections in progress"; fn as_string(&self) -> Cow<'static, str> { match self { Self::Unreachable => Cow::Borrowed(Self::UNREACHABLE), Self::Deserialization => Cow::Borrowed(Self::DESERIALIZATION), + Self::CollectionsInProgress => { + Cow::Borrowed(Self::COLLECTIONS_IN_PROGRESS) + } Self::Other(c) => Cow::Owned(c.as_u16().to_string()), } }