Skip to content

Commit

Permalink
Spawn a task in oximeter to actually do the collection (#7097)
Browse files Browse the repository at this point in the history
`oximeter` currently starts up a task for each producer. When their
collection interval expires, that task directly makes an HTTP request to
the producer to collect its data, _in-line_ in the same task. This can
cause problems, especially if there are concurrent updates to the
producer's address information. The main `oximeter` HTTP server will
send a message to that task in that case, asking it to update its
producer information, but that task might be off in lalaland making an
HTTP request. This is particularly bad since that HTTP request may never
complete -- the producer's address might be updated, so the old one is
defunct, possibly forever!

This commit addresses this by spawning a new task to actually run the
collection itself. This keeps the first-level task alive and responsive,
such that it can abort any previously-spawned collections if there are
updates. We _also_ replace any existing task if a collection was
explicitly requested, but we ignore existing collections (rather than
spawning a new one) if the timer expires while one is already running.

This resolves test flakes described in #6901.
  • Loading branch information
bnaecker authored Nov 22, 2024
1 parent 4ee02c0 commit df13cc7
Show file tree
Hide file tree
Showing 6 changed files with 518 additions and 78 deletions.
15 changes: 12 additions & 3 deletions nexus/tests/integration_tests/disks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1827,7 +1827,10 @@ async fn test_disk_metrics(cptestctx: &ControlPlaneTestContext) {
.await;
assert!(measurements.items.is_empty());

oximeter.force_collect().await;
oximeter
.try_force_collect()
.await
.expect("Could not force oximeter collection");
assert_eq!(
get_latest_silo_metric(
cptestctx,
Expand All @@ -1841,7 +1844,10 @@ async fn test_disk_metrics(cptestctx: &ControlPlaneTestContext) {
// Create an instance, attach the disk to it.
create_instance_with_disk(client).await;
wait_for_producer(&cptestctx.oximeter, disk.id()).await;
oximeter.force_collect().await;
oximeter
.try_force_collect()
.await
.expect("Could not force oximeter collection");

for metric in &ALL_METRICS {
let measurements = query_for_metrics(client, &metric_url(metric)).await;
Expand Down Expand Up @@ -1878,7 +1884,10 @@ async fn test_disk_metrics_paginated(cptestctx: &ControlPlaneTestContext) {
wait_for_producer(&cptestctx.oximeter, disk.id()).await;

let oximeter = &cptestctx.oximeter;
oximeter.force_collect().await;
oximeter
.try_force_collect()
.await
.expect("Could not force oximeter collection");
for metric in &ALL_METRICS {
let collection_url = format!(
"/v1/disks/{}/metrics/{}?project={}",
Expand Down
6 changes: 5 additions & 1 deletion nexus/tests/integration_tests/instances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1620,7 +1620,11 @@ async fn assert_metrics(
cpus: i64,
ram: i64,
) {
cptestctx.oximeter.force_collect().await;
cptestctx
.oximeter
.try_force_collect()
.await
.expect("Could not force oximeter collection");

for id in &[None, Some(project_id)] {
assert_eq!(
Expand Down
36 changes: 30 additions & 6 deletions nexus/tests/integration_tests/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,11 @@ async fn assert_system_metrics(
cpus: i64,
ram: i64,
) {
cptestctx.oximeter.force_collect().await;
cptestctx
.oximeter
.try_force_collect()
.await
.expect("Could not force oximeter collection");
assert_eq!(
get_latest_system_metric(
cptestctx,
Expand All @@ -134,7 +138,11 @@ async fn assert_silo_metrics(
cpus: i64,
ram: i64,
) {
cptestctx.oximeter.force_collect().await;
cptestctx
.oximeter
.try_force_collect()
.await
.expect("Could not force oximeter collection");
assert_eq!(
get_latest_silo_metric(
cptestctx,
Expand Down Expand Up @@ -270,7 +278,11 @@ async fn test_timeseries_schema_list(
// Nexus's HTTP latency distribution. This is defined in Nexus itself, and
// should always exist after we've registered as a producer and start
// producing data. Force a collection to ensure that happens.
cptestctx.oximeter.force_collect().await;
cptestctx
.oximeter
.try_force_collect()
.await
.expect("Could not force oximeter collection");
let client = &cptestctx.external_client;
let url = "/v1/system/timeseries/schemas";
let schema =
Expand All @@ -289,7 +301,11 @@ pub async fn timeseries_query(
query: impl ToString,
) -> Vec<oxql_types::Table> {
// first, make sure the latest timeseries have been collected.
cptestctx.oximeter.force_collect().await;
cptestctx
.oximeter
.try_force_collect()
.await
.expect("Could not force oximeter collection");

// okay, do the query
let body = nexus_types::external_api::params::TimeseriesQuery {
Expand Down Expand Up @@ -365,7 +381,11 @@ async fn test_instance_watcher_metrics(
.await;

// Make sure that the latest metrics have been collected.
oximeter.force_collect().await;
cptestctx
.oximeter
.try_force_collect()
.await
.expect("Could not force oximeter collection");
};

#[track_caller]
Expand Down Expand Up @@ -688,7 +708,11 @@ async fn test_mgs_metrics(
query: &str,
expected: &HashMap<String, usize>,
) -> anyhow::Result<()> {
cptestctx.oximeter.force_collect().await;
cptestctx
.oximeter
.try_force_collect()
.await
.expect("Could not force oximeter collection");
let table = timeseries_query(&cptestctx, &query)
.await
.into_iter()
Expand Down
Loading

0 comments on commit df13cc7

Please sign in to comment.