diff --git a/nexus/tests/integration_tests/disks.rs b/nexus/tests/integration_tests/disks.rs index 939fd61249..8bfe665c17 100644 --- a/nexus/tests/integration_tests/disks.rs +++ b/nexus/tests/integration_tests/disks.rs @@ -1827,7 +1827,10 @@ async fn test_disk_metrics(cptestctx: &ControlPlaneTestContext) { .await; assert!(measurements.items.is_empty()); - oximeter.force_collect().await; + oximeter + .try_force_collect() + .await + .expect("Could not force oximeter collection"); assert_eq!( get_latest_silo_metric( cptestctx, @@ -1841,7 +1844,10 @@ async fn test_disk_metrics(cptestctx: &ControlPlaneTestContext) { // Create an instance, attach the disk to it. create_instance_with_disk(client).await; wait_for_producer(&cptestctx.oximeter, disk.id()).await; - oximeter.force_collect().await; + oximeter + .try_force_collect() + .await + .expect("Could not force oximeter collection"); for metric in &ALL_METRICS { let measurements = query_for_metrics(client, &metric_url(metric)).await; @@ -1878,7 +1884,10 @@ async fn test_disk_metrics_paginated(cptestctx: &ControlPlaneTestContext) { wait_for_producer(&cptestctx.oximeter, disk.id()).await; let oximeter = &cptestctx.oximeter; - oximeter.force_collect().await; + oximeter + .try_force_collect() + .await + .expect("Could not force oximeter collection"); for metric in &ALL_METRICS { let collection_url = format!( "/v1/disks/{}/metrics/{}?project={}", diff --git a/nexus/tests/integration_tests/instances.rs b/nexus/tests/integration_tests/instances.rs index be124e14b8..2949ea4560 100644 --- a/nexus/tests/integration_tests/instances.rs +++ b/nexus/tests/integration_tests/instances.rs @@ -1620,7 +1620,11 @@ async fn assert_metrics( cpus: i64, ram: i64, ) { - cptestctx.oximeter.force_collect().await; + cptestctx + .oximeter + .try_force_collect() + .await + .expect("Could not force oximeter collection"); for id in &[None, Some(project_id)] { assert_eq!( diff --git a/nexus/tests/integration_tests/metrics.rs b/nexus/tests/integration_tests/metrics.rs index 6281326216..4bac62f9d8 100644 --- a/nexus/tests/integration_tests/metrics.rs +++ b/nexus/tests/integration_tests/metrics.rs @@ -107,7 +107,11 @@ async fn assert_system_metrics( cpus: i64, ram: i64, ) { - cptestctx.oximeter.force_collect().await; + cptestctx + .oximeter + .try_force_collect() + .await + .expect("Could not force oximeter collection"); assert_eq!( get_latest_system_metric( cptestctx, @@ -134,7 +138,11 @@ async fn assert_silo_metrics( cpus: i64, ram: i64, ) { - cptestctx.oximeter.force_collect().await; + cptestctx + .oximeter + .try_force_collect() + .await + .expect("Could not force oximeter collection"); assert_eq!( get_latest_silo_metric( cptestctx, @@ -270,7 +278,11 @@ async fn test_timeseries_schema_list( // Nexus's HTTP latency distribution. This is defined in Nexus itself, and // should always exist after we've registered as a producer and start // producing data. Force a collection to ensure that happens. - cptestctx.oximeter.force_collect().await; + cptestctx + .oximeter + .try_force_collect() + .await + .expect("Could not force oximeter collection"); let client = &cptestctx.external_client; let url = "/v1/system/timeseries/schemas"; let schema = @@ -289,7 +301,11 @@ pub async fn timeseries_query( query: impl ToString, ) -> Vec { // first, make sure the latest timeseries have been collected. 
- cptestctx.oximeter.force_collect().await; + cptestctx + .oximeter + .try_force_collect() + .await + .expect("Could not force oximeter collection"); // okay, do the query let body = nexus_types::external_api::params::TimeseriesQuery { @@ -365,7 +381,11 @@ async fn test_instance_watcher_metrics( .await; // Make sure that the latest metrics have been collected. - oximeter.force_collect().await; + cptestctx + .oximeter + .try_force_collect() + .await + .expect("Could not force oximeter collection"); }; #[track_caller] @@ -688,7 +708,11 @@ async fn test_mgs_metrics( query: &str, expected: &HashMap, ) -> anyhow::Result<()> { - cptestctx.oximeter.force_collect().await; + cptestctx + .oximeter + .try_force_collect() + .await + .expect("Could not force oximeter collection"); let table = timeseries_query(&cptestctx, &query) .await .into_iter() diff --git a/oximeter/collector/src/agent.rs b/oximeter/collector/src/agent.rs index 8e14428d33..8572f7f508 100644 --- a/oximeter/collector/src/agent.rs +++ b/oximeter/collector/src/agent.rs @@ -40,14 +40,43 @@ use std::sync::Arc; use std::sync::Mutex as StdMutex; use std::time::Duration; use tokio::sync::mpsc; +use tokio::sync::mpsc::error::TrySendError; use tokio::sync::oneshot; +use tokio::sync::watch; use tokio::sync::Mutex; use tokio::sync::MutexGuard; use tokio::task::JoinHandle; use tokio::time::interval; use uuid::Uuid; -type CollectionToken = oneshot::Sender<()>; +/// A token used to force a collection. +/// +/// If the collection is successfully completed, `Ok(())` will be sent back on the +/// contained oneshot channel. Note that that "successful" means the actual +/// request completed, _not_ that results were successfully collected. I.e., it +/// means "this attempt is done". +/// +/// If the collection could not be queued because there are too many outstanding +/// force collection attempts, an `Err(ForcedCollectionQueueFull)` is returned. +type CollectionToken = oneshot::Sender>; + +/// Error returned when a forced collection fails. +#[derive(Clone, Copy, Debug)] +pub enum ForcedCollectionError { + /// The internal queue of requests is full. + QueueFull, + /// We failed to send the request because the channel was closed. + Closed, +} + +/// Timeout on any single collection from a producer. +const COLLECTION_TIMEOUT: Duration = Duration::from_secs(30); + +/// The number of forced collections queued before we start to deny them. +const N_QUEUED_FORCED_COLLECTIONS: usize = 1; + +/// The number of timer-based collections queued before we start to deny them. +const N_QUEUED_TIMER_COLLECTIONS: usize = 1; // Messages for controlling a collection task #[derive(Debug)] @@ -69,19 +98,18 @@ enum CollectionMessage { }, } +/// Run a single collection from the producer. 
async fn perform_collection( - log: &Logger, - self_target: &mut self_stats::CollectionTaskStats, - client: &reqwest::Client, - producer: &ProducerEndpoint, - outbox: &mpsc::Sender<(Option, ProducerResults)>, - token: Option, -) { + log: Logger, + client: reqwest::Client, + producer: ProducerEndpoint, +) -> Result { debug!(log, "collecting from producer"); let res = client - .get(format!("http://{}/{}", producer.address, producer.id,)) + .get(format!("http://{}/{}", producer.address, producer.id)) .send() .await; + trace!(log, "sent collection request to producer"); match res { Ok(res) => { if res.status().is_success() { @@ -92,8 +120,7 @@ async fn perform_collection( "collected results from producer"; "n_results" => results.len() ); - self_target.collections.datum.increment(); - outbox.send((token, results)).await.unwrap(); + Ok(results) } Err(e) => { warn!( @@ -101,12 +128,7 @@ async fn perform_collection( "failed to collect results from producer"; "error" => ?e, ); - self_target - .failures_for_reason( - self_stats::FailureReason::Deserialization, - ) - .datum - .increment() + Err(self_stats::FailureReason::Deserialization) } } } else { @@ -115,12 +137,7 @@ async fn perform_collection( "failed to receive metric results from producer"; "status_code" => res.status().as_u16(), ); - self_target - .failures_for_reason(self_stats::FailureReason::Other( - res.status(), - )) - .datum - .increment() + Err(self_stats::FailureReason::Other(res.status())) } } Err(e) => { @@ -129,10 +146,128 @@ async fn perform_collection( "failed to send collection request to producer"; "error" => ?e ); - self_target - .failures_for_reason(self_stats::FailureReason::Unreachable) - .datum - .increment() + Err(self_stats::FailureReason::Unreachable) + } + } +} + +// The type of one collection task run to completion. +// +// An `Err(_)` means we failed to collect, and contains the reason so that we +// can bump the self-stat counter accordingly. +type CollectionResult = Result; + +// The type of one response message sent from the collection task. +type CollectionResponse = (Option, CollectionResult); + +/// Task that actually performs collections from the producer. +async fn inner_collection_loop( + log: Logger, + mut producer_info_rx: watch::Receiver, + mut forced_collection_rx: mpsc::Receiver, + mut timer_collection_rx: mpsc::Receiver<()>, + result_tx: mpsc::Sender, +) { + let client = reqwest::Client::builder() + .timeout(COLLECTION_TIMEOUT) + .build() + // Safety: `build()` only fails if TLS couldn't be initialized or the + // system DNS configuration could not be loaded. + .unwrap(); + loop { + // Wait for notification that we have a collection to perform, from + // either the forced- or timer-collection queue. + trace!(log, "top of inner collection loop, waiting for next request",); + let maybe_token = tokio::select! { + maybe_request = forced_collection_rx.recv() => { + let Some(request) = maybe_request else { + debug!( + log, + "forced collection request queue closed, exiting" + ); + return; + }; + Some(request) + } + maybe_request = timer_collection_rx.recv() => { + if maybe_request.is_none() { + debug!( + log, + "timer collection request queue closed, exiting" + ); + return; + }; + None + } + }; + + // Make a future to represent the actual collection. + let mut collection_fut = Box::pin(perform_collection( + log.clone(), + client.clone(), + *producer_info_rx.borrow_and_update(), + )); + + // Wait for that collection to complete or fail, or for an update to the + // producer's information. 
In the latter case, recreate the future for + // the collection itself with the new producer information. + let collection_result = 'collection: loop { + tokio::select! { + biased; + + maybe_update = producer_info_rx.changed() => { + match maybe_update { + Ok(_) => { + let update = *producer_info_rx.borrow_and_update(); + debug!( + log, + "received producer info update with an outstanding \ + collection running, cancelling it and recreating \ + with the new info"; + "new_info" => ?&update, + ); + collection_fut = Box::pin(perform_collection( + log.new(o!("address" => update.address)), + client.clone(), + update, + )); + continue 'collection; + } + Err(e) => { + error!( + log, + "failed to receive on producer update \ + watch channel, exiting"; + "error" => ?e, + ); + return; + } + } + } + + collection_result = &mut collection_fut => { + // NOTE: This break here is intentional. We cannot just call + // `result_tx.send()` in this loop, because that moves out + // of `maybe_token`, which isn't Copy. Break the loop, and + // then send it after we know we've completed the + // collection. + break 'collection collection_result; + } + } + }; + + // Now that the collection has completed, send on the results, along + // with any collection token we may have gotten with the request. + match result_tx.send((maybe_token, collection_result)).await { + Ok(_) => trace!(log, "forwarded results to main collection loop"), + Err(_) => { + error!( + log, + "failed to forward results to \ + collection loop, channel is closed, exiting", + ); + return; + } } } } @@ -143,15 +278,13 @@ async fn perform_collection( // endlessly, and collects metrics from the assigned producer on a timeout. The assigned agent can // also send a `CollectionMessage`, for example to update the collection interval. This is not // currently used, but will likely be exposed via control plane interfaces in the future. -async fn collection_task( - orig_log: Logger, +async fn collection_loop( + log: Logger, collector: self_stats::OximeterCollector, - mut producer: ProducerEndpoint, + producer: ProducerEndpoint, mut inbox: mpsc::Receiver, outbox: mpsc::Sender<(Option, ProducerResults)>, ) { - let mut log = orig_log.new(o!("address" => producer.address)); - let client = reqwest::Client::new(); let mut collection_timer = interval(producer.interval); debug!( log, @@ -164,21 +297,91 @@ async fn collection_task( let mut self_collection_timer = interval(self_stats::COLLECTION_INTERVAL); self_collection_timer.tick().await; + // Spawn a task to run the actual collections. + // + // This is so that we can possibly interrupt and restart collections that + // are in-progress when we get an update to the producer's information. In + // that case, the collection is likely doomed, since the producer has moved + // and won't be available at the address the collection started with. This + // lets us restart that collection with the new information. + let (producer_info_tx, producer_info_rx) = watch::channel(producer); + let (forced_collection_tx, forced_collection_rx) = + mpsc::channel(N_QUEUED_FORCED_COLLECTIONS); + let (timer_collection_tx, timer_collection_rx) = + mpsc::channel(N_QUEUED_TIMER_COLLECTIONS); + let (result_tx, mut result_rx) = mpsc::channel(1); + tokio::task::spawn(inner_collection_loop( + log.clone(), + producer_info_rx, + forced_collection_rx, + timer_collection_rx, + result_tx, + )); + loop { tokio::select! 
{ message = inbox.recv() => { match message { None => { - debug!(log, "collection task inbox closed, shutting down"); + debug!( + log, + "collection task inbox closed, shutting down" + ); return; } Some(CollectionMessage::Shutdown) => { - debug!(log, "collection task received shutdown request"); + debug!( + log, + "collection task received shutdown request" + ); return; }, Some(CollectionMessage::Collect(token)) => { - debug!(log, "collection task received explicit request to collect"); - perform_collection(&log, &mut stats, &client, &producer, &outbox, Some(token)).await; + debug!( + log, + "collection task received explicit request to collect" + ); + match forced_collection_tx.try_send(token) { + Ok(_) => trace!( + log, "forwarded explicit request to collection task" + ), + Err(e) => { + match e { + TrySendError::Closed(tok) => { + debug!( + log, + "collection task forced collection \ + queue is closed. Attempting to \ + notify caller and exiting.", + ); + let _ = tok.send(Err(ForcedCollectionError::Closed)); + return; + } + TrySendError::Full(tok) => { + error!( + log, + "collection task forced collection \ + queue is full! This should never \ + happen, and probably indicates \ + a bug in your test code, such as \ + calling `force_collection()` many \ + times" + ); + if tok + .send(Err(ForcedCollectionError::QueueFull)) + .is_err() + { + warn!( + log, + "failed to notify caller of \ + force_collection(), oneshot is \ + closed" + ); + } + } + } + } + } }, Some(CollectionMessage::Update(new_info)) => { // If the collection interval is shorter than the @@ -187,21 +390,37 @@ async fn collection_task( // do the update if the information has changed. This // should also be guarded against by the main agent, but // we're being cautious here. - if producer == new_info { + let updated_producer_info = |info: &mut ProducerEndpoint| { + if new_info == *info { + false + } else { + *info = new_info; + true + } + }; + if !producer_info_tx.send_if_modified(updated_producer_info) { + trace!( + log, + "collection task received update with \ + identical producer information, no \ + updates will be sent to the collection task" + ); continue; } - producer = new_info; + + // We have an actual update to the producer information. + // + // Rebuild our timer to reflect the possibly-new + // interval. The collection task has already been + // notified above. debug!( log, "collection task received request to update \ its producer information"; - "interval" => ?producer.interval, - "address" => producer.address, + "interval" => ?new_info.interval, + "address" => new_info.address, ); - - // Update the logger with the new information as well. 
- log = orig_log.new(o!("address" => producer.address)); - collection_timer = interval(producer.interval); + collection_timer = interval(new_info.interval); collection_timer.tick().await; // completes immediately } #[cfg(test)] @@ -224,12 +443,71 @@ async fn collection_task( } } } + maybe_result = result_rx.recv() => { + let Some((maybe_token, result)) = maybe_result else { + error!( + log, + "channel for receiving results from collection task \ + is closed, exiting", + ); + return; + }; + match result { + Ok(results) => { + stats.collections.datum.increment(); + if outbox.send((maybe_token, results)).await.is_err() { + error!( + log, + "failed to send results to outbox, channel is \ + closed, exiting", + ); + return; + } + } + Err(reason) => stats.failures_for_reason(reason).datum.increment(), + } + } _ = self_collection_timer.tick() => { - debug!(log, "reporting oximeter self-collection statistics"); + debug!( + log, + "reporting oximeter self-collection statistics" + ); outbox.send((None, stats.sample())).await.unwrap(); } _ = collection_timer.tick() => { - perform_collection(&log, &mut stats, &client, &producer, &outbox, None).await; + match timer_collection_tx.try_send(()) { + Ok(_) => { + debug!( + log, + "sent timer-based collection request to \ + the collection task" + ); + } + Err(TrySendError::Closed(_)) => { + error!( + log, + "timer-based collection request queue is \ + closed, exiting" + ); + return; + } + Err(TrySendError::Full(_)) => { + error!( + log, + "timer-based collection request queue is \ + full! This may indicate that the producer \ + has a sampling interval that is too fast \ + for the amount of data it generates"; + "interval" => ?producer_info_tx.borrow().interval, + ); + stats + .failures_for_reason( + self_stats::FailureReason::CollectionsInProgress + ) + .datum + .increment() + } + } } } } @@ -364,7 +642,7 @@ async fn results_sink( } if let Some(token) = collection_token { - let _ = token.send(()); + let _ = token.send(Ok(())); } } } @@ -610,7 +888,7 @@ impl OximeterAgent { let info_clone = info; let target = self.collection_target; let task = tokio::spawn(async move { - collection_task(log, target, info_clone, rx, q).await; + collection_loop(log, target, info_clone, rx, q).await; }); value.insert((info, CollectionTask { inbox: tx, task })); } @@ -650,9 +928,16 @@ impl OximeterAgent { /// Forces a collection from all producers. /// - /// Returns once all those values have been inserted into Clickhouse, + /// Returns once all those values have been inserted into ClickHouse, /// or an error occurs trying to perform the collection. - pub async fn force_collection(&self) { + /// + /// NOTE: This collection is best effort, as the name implies. It's possible + /// that we lose track of requests internally, in cases where there are + /// many concurrent calls. Callers should strive to avoid this, since it + /// rarely makes sense to do that. + pub async fn try_force_collection( + &self, + ) -> Result<(), ForcedCollectionError> { let mut collection_oneshots = vec![]; let collection_tasks = self.collection_tasks.lock().await; for (_id, (_endpoint, task)) in collection_tasks.iter() { @@ -660,7 +945,7 @@ impl OximeterAgent { // Scrape from each producer, into oximeter... task.inbox.send(CollectionMessage::Collect(tx)).await.unwrap(); // ... and keep track of the token that indicates once the metric - // has made it into Clickhouse. + // has made it into ClickHouse. 
collection_oneshots.push(rx); } drop(collection_tasks); @@ -670,7 +955,20 @@ impl OximeterAgent { // // NOTE: This can either mean that the collection completed // successfully, or an error occurred in the collection pathway. - futures::future::join_all(collection_oneshots).await; + // + // We use `join_all` to ensure that all futures are run, rather than + // bailing on the first error. We extract the first error we received, + // or map an actual `RecvError` to `Closed`, since it does really mean + // the other side hung up without sending. + let results = futures::future::join_all(collection_oneshots).await; + for result in results.into_iter() { + match result { + Ok(Ok(_)) => {} + Ok(Err(e)) => return Err(e), + Err(_) => return Err(ForcedCollectionError::Closed), + } + } + Ok(()) } /// List existing producers. @@ -897,11 +1195,20 @@ mod tests { use super::OximeterAgent; use super::ProducerEndpoint; use crate::self_stats::FailureReason; + use dropshot::HttpError; + use dropshot::HttpResponseOk; + use dropshot::Path; + use dropshot::RequestContext; + use dropshot::ServerBuilder; use omicron_common::api::internal::nexus::ProducerKind; use omicron_test_utils::dev::test_setup_log; + use oximeter::types::ProducerResults; use std::net::Ipv6Addr; use std::net::SocketAddr; use std::net::SocketAddrV6; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering; + use std::sync::Arc; use std::time::Duration; use tokio::sync::oneshot; use tokio::time::Instant; @@ -925,6 +1232,63 @@ mod tests { COLLECTION_INTERVAL.as_millis() as u64 * N_COLLECTIONS, ); + #[derive( + Clone, + Copy, + Debug, + schemars::JsonSchema, + serde::Deserialize, + serde::Serialize, + )] + struct IdPath { + id: Uuid, + } + + /// Simplified API for a producer, implemented for tests below. + #[dropshot::api_description] + trait ProducerApi { + type Context; + + #[endpoint { + method = GET, + path = "/{id}", + }] + async fn collect( + request_context: RequestContext, + path: Path, + ) -> Result, HttpError>; + } + + /// A producer that always responds successfully with no samples. + struct EmptyProducer; + + impl ProducerApi for EmptyProducer { + type Context = Arc; + + async fn collect( + request_context: RequestContext, + _: Path, + ) -> Result, HttpError> { + request_context.context().fetch_add(1, Ordering::SeqCst); + Ok(HttpResponseOk(vec![])) + } + } + + /// A producer that always responds with a 500. + struct DedProducer; + + impl ProducerApi for DedProducer { + type Context = Arc; + + async fn collect( + request_context: RequestContext, + _: Path, + ) -> Result, HttpError> { + request_context.context().fetch_add(1, Ordering::SeqCst); + Err(HttpError::for_internal_error(String::from("i'm ded"))) + } + } + // Test that we count successful collections from a target correctly. #[tokio::test] async fn test_self_stat_collection_count() { @@ -943,17 +1307,21 @@ mod tests { .unwrap(); // Spawn the mock server that always reports empty statistics. - let server = httpmock::MockServer::start(); - let mock_ok = server.mock(|when, then| { - when.any_request(); - then.status(reqwest::StatusCode::OK).body("[]"); - }); + let collection_count = Arc::new(AtomicUsize::new(0)); + let server = ServerBuilder::new( + producer_api_mod::api_description::().unwrap(), + collection_count.clone(), + log.new(slog::o!("component" => "dropshot")), + ) + .config(Default::default()) + .start() + .expect("failed to spawn empty dropshot server"); // Register the dummy producer. 
let endpoint = ProducerEndpoint { id: Uuid::new_v4(), kind: ProducerKind::Service, - address: *server.address(), + address: server.local_addr(), interval: COLLECTION_INTERVAL, }; collector @@ -991,7 +1359,13 @@ mod tests { let count = stats.collections.datum.value() as usize; assert!(count != 0); - mock_ok.assert_calls(count); + assert_eq!( + count, + collection_count.load(Ordering::SeqCst), + "number of collections reported by the collection \ + task differs from the number reported by the empty \ + producer server itself" + ); assert!(stats.failed_collections.is_empty()); logctx.cleanup_successful(); } @@ -1083,17 +1457,21 @@ mod tests { .unwrap(); // Spawn the mock server that always responds with a server error - let server = httpmock::MockServer::start(); - let mock_fail = server.mock(|when, then| { - when.any_request(); - then.status(500).body("im ded"); - }); + let collection_count = Arc::new(AtomicUsize::new(0)); + let server = ServerBuilder::new( + producer_api_mod::api_description::().unwrap(), + collection_count.clone(), + log.new(slog::o!("component" => "dropshot")), + ) + .config(Default::default()) + .start() + .expect("failed to spawn empty dropshot server"); // Register the rather flaky producer. let endpoint = ProducerEndpoint { id: Uuid::new_v4(), kind: ProducerKind::Service, - address: *server.address(), + address: server.local_addr(), interval: COLLECTION_INTERVAL, }; collector @@ -1138,7 +1516,13 @@ mod tests { assert_eq!(stats.collections.datum.value(), 0); assert!(count != 0); - mock_fail.assert_calls(count); + assert_eq!( + count, + collection_count.load(Ordering::SeqCst), + "number of collections reported by the collection \ + task differs from the number reported by the always-ded \ + producer server itself" + ); assert_eq!(stats.failed_collections.len(), 1); logctx.cleanup_successful(); } diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs index 623ba3f294..cc0ef92c13 100644 --- a/oximeter/collector/src/lib.rs +++ b/oximeter/collector/src/lib.rs @@ -6,6 +6,7 @@ // Copyright 2023 Oxide Computer Company +pub use agent::ForcedCollectionError; use dropshot::ConfigDropshot; use dropshot::ConfigLogging; use dropshot::HttpError; @@ -455,8 +456,13 @@ impl Oximeter { /// /// This is particularly useful during tests, which would prefer to /// avoid waiting until a collection interval completes. - pub async fn force_collect(&self) { - self.server.app_private().force_collection().await + /// + /// NOTE: As the name implies, this is best effort. It can fail if there are + /// already outstanding calls to force a collection. It rarely makes sense + /// to have multiple concurrent calls here, so that should not impact most + /// callers. + pub async fn try_force_collect(&self) -> Result<(), ForcedCollectionError> { + self.server.app_private().try_force_collection().await } /// List producers. diff --git a/oximeter/collector/src/self_stats.rs b/oximeter/collector/src/self_stats.rs index b2272117da..2ab7b201e5 100644 --- a/oximeter/collector/src/self_stats.rs +++ b/oximeter/collector/src/self_stats.rs @@ -33,6 +33,12 @@ pub enum FailureReason { Unreachable, /// Error during deserialization. Deserialization, + /// The collection interval has expired while an outstanding collection is + /// already in progress. + /// + /// This may indicate that the producer's collection interval is too short + /// for the amount of data it generates, and the collector cannot keep up. + CollectionsInProgress, /// Some other reason, which includes the status code. 
     Other(StatusCode),
 }
@@ -42,6 +48,9 @@ impl std::fmt::Display for FailureReason {
         match self {
             Self::Unreachable => f.write_str(Self::UNREACHABLE),
             Self::Deserialization => f.write_str(Self::DESERIALIZATION),
+            Self::CollectionsInProgress => {
+                f.write_str(Self::COLLECTIONS_IN_PROGRESS)
+            }
             Self::Other(c) => write!(f, "{}", c.as_u16()),
         }
     }
@@ -50,11 +59,15 @@ impl FailureReason {
     const UNREACHABLE: &'static str = "unreachable";
     const DESERIALIZATION: &'static str = "deserialization";
+    const COLLECTIONS_IN_PROGRESS: &'static str = "collections in progress";
 
     fn as_string(&self) -> Cow<'static, str> {
         match self {
             Self::Unreachable => Cow::Borrowed(Self::UNREACHABLE),
             Self::Deserialization => Cow::Borrowed(Self::DESERIALIZATION),
+            Self::CollectionsInProgress => {
+                Cow::Borrowed(Self::COLLECTIONS_IN_PROGRESS)
+            }
             Self::Other(c) => Cow::Owned(c.as_u16().to_string()),
         }
     }
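Usage note (illustrative, not part of the diff above): after this change, callers must handle the `Result` returned by the renamed `try_force_collect()`. The following is a minimal sketch of how a test helper might drive the new API. It assumes the `oximeter_collector::{Oximeter, ForcedCollectionError}` paths exported in this diff and an `anyhow`-based helper; the helper name `force_one_collection` is made up for illustration.

```rust
use oximeter_collector::ForcedCollectionError;
use oximeter_collector::Oximeter;

/// Hypothetical helper: force a single collection and translate the new
/// error type into an `anyhow` error for test assertions.
async fn force_one_collection(oximeter: &Oximeter) -> anyhow::Result<()> {
    match oximeter.try_force_collect().await {
        // "Ok" means the forced attempt ran to completion, not that any
        // samples were necessarily produced.
        Ok(()) => Ok(()),
        // The small bounded queue of forced collections was already full;
        // tests should avoid issuing many concurrent forced collections.
        Err(ForcedCollectionError::QueueFull) => {
            anyhow::bail!("forced-collection queue is full")
        }
        // The collection task exited and dropped its request channel.
        Err(ForcedCollectionError::Closed) => {
            anyhow::bail!("collection task has shut down")
        }
    }
}
```

In practice the integration tests in this diff simply `.expect(..)` on the result, since a full queue or a closed channel indicates a test bug rather than a recoverable condition.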