diff --git a/oximeter/collector/src/agent.rs b/oximeter/collector/src/agent.rs
index bcfa3b4f4d..60d74b2c25 100644
--- a/oximeter/collector/src/agent.rs
+++ b/oximeter/collector/src/agent.rs
@@ -231,8 +231,19 @@ async fn collection_task(
 // Struct representing a task for collecting metric data from a single producer
 #[derive(Debug)]
 struct CollectionTask {
-    // Channel used to send messages from the agent to the actual task. The task owns the other
-    // side.
+    // Information about the producer we're collecting from in this task.
+    pub producer: ProducerEndpoint,
+    // The generation number of our _parent_ `ProducerMap` in which this task
+    // was last added or modified.
+    //
+    // Producers can be added to us both as they register with Nexus, and as we
+    // periodically refresh our list from Nexus. These can happen concurrently.
+    // This tracks the generation of our parent map in which we were added, so
+    // that we can be sure not to prune producers that registered while oximeter
+    // refreshed its list.
+    pub generation: u64,
+    // Channel used to send messages from the agent to the actual task. The task
+    // owns the other side.
     pub inbox: mpsc::Sender<CollectionMessage>,
     // Handle to the actual tokio task running the collection loop.
     #[allow(dead_code)]
@@ -362,6 +373,19 @@ async fn results_sink(
     }
 }
 
+// A mapping of all the producers we've been assigned.
+#[derive(Debug)]
+struct ProducerMap {
+    pub generation: u64,
+    pub tasks: BTreeMap<Uuid, CollectionTask>,
+}
+
+impl ProducerMap {
+    fn new() -> Self {
+        Self { generation: 0, tasks: BTreeMap::new() }
+    }
+}
+
 /// The internal agent the oximeter server uses to collect metrics from producers.
 #[derive(Clone, Debug)]
 pub struct OximeterAgent {
@@ -372,9 +396,8 @@ pub struct OximeterAgent {
     collection_target: self_stats::OximeterCollector,
     // Handle to the TX-side of a channel for collecting results from the collection tasks
     result_sender: mpsc::Sender<(Option<CollectionToken>, ProducerResults)>,
-    // The actual tokio tasks running the collection on a timer.
-    collection_tasks:
-        Arc<Mutex<BTreeMap<Uuid, (ProducerEndpoint, CollectionTask)>>>,
+    // The producers we are responsible for collecting from.
+    producers: Arc<Mutex<ProducerMap>>,
     // The interval on which we refresh our list of producers from Nexus
     refresh_interval: Duration,
     // Handle to the task used to periodically refresh the list of producers.
@@ -384,9 +407,16 @@ pub struct OximeterAgent {
 }
 
 impl OximeterAgent {
-    /// Construct a new agent with the given ID and logger.
+    /// Construct a new agent.
+    ///
+    /// `id`: Our own UUID.
+    /// `address`: Our own `oximeter-collector` server's address.
+    /// `refresh_interval`: Interval on which we refresh our producer list from
+    /// Nexus.
+    /// `db_config`: Configuration for inserting into the ClickHouse database.
+    /// `http_resolver`, `native_resolver`: Resolvers for the ClickHouse server.
+    /// `log`: Our logger.
     // TODO(cleanup): Remove this lint when we have only a native resolver.
-    #[allow(clippy::too_many_arguments)]
     pub async fn with_id(
         id: Uuid,
         address: SocketAddrV6,
@@ -395,7 +425,6 @@ impl OximeterAgent {
         http_resolver: BoxedResolver,
         native_resolver: BoxedResolver,
         log: &Logger,
-        replicated: bool,
     ) -> Result<Self, Error> {
         let (result_sender, result_receiver) = mpsc::channel(8);
         let log = log.new(o!(
@@ -430,7 +459,7 @@ impl OximeterAgent {
             debug!(log, "oximeter database does not exist, creating");
             client
                 .initialize_db_with_version(
-                    replicated,
+                    db_config.replicated,
                     oximeter_db::OXIMETER_VERSION,
                 )
                 .await?;
@@ -462,7 +491,7 @@ impl OximeterAgent {
             log,
             collection_target,
             result_sender,
-            collection_tasks: Arc::new(Mutex::new(BTreeMap::new())),
+            producers: Arc::new(Mutex::new(ProducerMap::new())),
             refresh_interval,
             refresh_task: Arc::new(StdMutex::new(None)),
             last_refresh_time: Arc::new(StdMutex::new(None)),
@@ -563,7 +592,7 @@ impl OximeterAgent {
             log,
             collection_target,
             result_sender,
-            collection_tasks: Arc::new(Mutex::new(BTreeMap::new())),
+            producers: Arc::new(Mutex::new(ProducerMap::new())),
             refresh_interval,
             refresh_task: Arc::new(StdMutex::new(None)),
             last_refresh_time,
@@ -575,29 +604,35 @@ impl OximeterAgent {
         &self,
         info: ProducerEndpoint,
     ) -> Result<(), Error> {
-        let mut tasks = self.collection_tasks.lock().await;
-        self.register_producer_locked(&mut tasks, info).await;
+        let mut producers = self.producers.lock().await;
+
+        // First increment the generation number of the entire collection, since
+        // we're modifying it by adding in this new producer.
+        producers.generation += 1;
+        self.register_producer_locked(&mut producers, info).await;
         Ok(())
     }
 
     // Internal implementation that registers a producer, assuming the lock on
     // the map is held.
+    //
+    // NOTE: The caller is required to increment the generation number before
+    // calling this, if needed.
     async fn register_producer_locked(
         &self,
-        tasks: &mut MutexGuard<
-            '_,
-            BTreeMap<Uuid, (ProducerEndpoint, CollectionTask)>,
-        >,
+        producers: &mut MutexGuard<'_, ProducerMap>,
         info: ProducerEndpoint,
     ) {
         let id = info.id;
-        match tasks.entry(id) {
+        let generation = producers.generation;
+        match producers.tasks.entry(id) {
             Entry::Vacant(value) => {
                 debug!(
                     self.log,
                     "registered new metric producer";
                     "producer_id" => id.to_string(),
                     "address" => info.address,
+                    "generation" => generation,
                 );
 
                 // Build channel to control the task and receive results.
@@ -612,25 +647,27 @@ impl OximeterAgent {
                 let task = tokio::spawn(async move {
                     collection_task(log, target, info_clone, rx, q).await;
                 });
-                value.insert((info, CollectionTask { inbox: tx, task }));
+                value.insert(CollectionTask {
+                    inbox: tx,
+                    task,
+                    producer: info,
+                    generation,
+                });
             }
             Entry::Occupied(mut value) => {
                 debug!(
                     self.log,
                     "received request to register existing metric \
                     producer, updating collection information";
-                        "producer_id" => id.to_string(),
-                        "interval" => ?info.interval,
-                        "address" => info.address,
+                    "producer_id" => id.to_string(),
+                    "interval" => ?info.interval,
+                    "address" => info.address,
+                    "generation" => generation,
                 );
-                value.get_mut().0 = info.clone();
-                value
-                    .get()
-                    .1
-                    .inbox
-                    .send(CollectionMessage::Update(info))
-                    .await
-                    .unwrap();
+                let task = value.get_mut();
+                task.producer = info.clone();
+                task.generation = generation;
+                task.inbox.send(CollectionMessage::Update(info)).await.unwrap();
             }
         }
     }
@@ -641,8 +678,8 @@ impl OximeterAgent {
     /// or an error occurs trying to perform the collection.
     pub async fn force_collection(&self) {
         let mut collection_oneshots = vec![];
-        let collection_tasks = self.collection_tasks.lock().await;
-        for (_id, (_endpoint, task)) in collection_tasks.iter() {
+        let producers = self.producers.lock().await;
+        for task in producers.tasks.values() {
             let (tx, rx) = oneshot::channel();
             // Scrape from each producer, into oximeter...
             task.inbox.send(CollectionMessage::Collect(tx)).await.unwrap();
@@ -650,7 +687,7 @@ impl OximeterAgent {
             // has made it into Clickhouse.
             collection_oneshots.push(rx);
         }
-        drop(collection_tasks);
+        drop(producers);
 
         // Only return once all producers finish processing the token we
         // provided.
@@ -671,32 +708,31 @@ impl OximeterAgent {
         } else {
             Bound::Unbounded
         };
-        self.collection_tasks
+        self.producers
             .lock()
             .await
+            .tasks
             .range((start, Bound::Unbounded))
             .take(limit)
-            .map(|(_id, (info, _t))| info.clone())
+            .map(|(_id, task)| task.producer.clone())
             .collect()
     }
 
     /// Delete a producer by ID, stopping its collection task.
     pub async fn delete_producer(&self, id: Uuid) -> Result<(), Error> {
-        let mut tasks = self.collection_tasks.lock().await;
-        self.delete_producer_locked(&mut tasks, id).await
+        let mut producers = self.producers.lock().await;
+        producers.generation += 1;
+        self.delete_producer_locked(&mut producers, id).await
     }
 
     // Internal implementation that deletes a producer, assuming the lock on
     // the map is held.
     async fn delete_producer_locked(
         &self,
-        tasks: &mut MutexGuard<
-            '_,
-            BTreeMap<Uuid, (ProducerEndpoint, CollectionTask)>,
-        >,
+        producers: &mut MutexGuard<'_, ProducerMap>,
         id: Uuid,
     ) -> Result<(), Error> {
-        let Some((_info, task)) = tasks.remove(&id) else {
+        let Some(task) = producers.tasks.remove(&id) else {
             // We have no such producer, so good news, we've removed it!
             return Ok(());
         };
@@ -723,23 +759,61 @@ impl OximeterAgent {
 
     // Ensure that exactly the set of producers is registered with `self`.
     //
+    // The provided generation should be the generation of the collection from
+    // just before we started fetching the updated list of producers.
+    //
     // Errors logged, but not returned, and an attempt to register all producers
     // is made, even if an error is encountered part-way through.
     //
-    // This returns the number of pruned tasks.
+    // This returns the new generation and number of pruned tasks.
     async fn ensure_producers(
         &self,
-        expected_producers: BTreeMap<Uuid, ProducerEndpoint>,
-    ) -> usize {
-        let mut tasks = self.collection_tasks.lock().await;
+        desired_producers: BTreeMap<Uuid, ProducerEndpoint>,
+        generation_at_refresh: u64,
+    ) -> (u64, usize) {
+        let mut producers = self.producers.lock().await;
+        producers.generation += 1;
 
-        // First prune unwanted collection tasks.
+        // Next, prune unwanted collection tasks.
        //
        // This is set of all producers that we currently have, which are not in
-        // the new list from Nexus.
-        let ids_to_prune: Vec<_> = tasks
-            .keys()
-            .filter(|id| !expected_producers.contains_key(id))
+        // the new list from Nexus. Note that we cannot prune any with a newer
+        // generation number, since these were by definition added while we
+        // started to refresh our list of producers from Nexus -- we didn't get
+        // them in the list, but they were registered concurrently.
+        let ids_to_prune: Vec<_> = producers
+            .tasks
+            .iter()
+            .filter_map(|(id, task)| {
+                if desired_producers.contains_key(id) {
+                    trace!(
+                        self.log,
+                        "keeping producer in both current and desired set";
+                        "id" => %id,
+                    );
+                    return None;
+                }
+                if task.generation > generation_at_refresh {
+                    trace!(
+                        self.log,
+                        "keeping producer not in desired set, \
+                        but with newer generation number";
+                        "id" => %id,
+                        "generation_at_refresh" => generation_at_refresh,
+                        "producer_generation" => task.generation,
+                    );
+                    return None;
+                }
+                trace!(
+                    self.log,
+                    "pruning old producer not in map and with \
+                    stale generation number";
+                    "id" => %id,
+                    "generation_at_refresh" => generation_at_refresh,
+                    "producer_generation" => task.generation,
+                );
+                Some(id)
+            })
             .copied()
             .collect();
         let n_pruned = ids_to_prune.len();
@@ -748,31 +822,76 @@
             // exist in the current tasks. That is impossible, because we hold
             // the lock, and we've just computed this as the set that _is_ in
             // the map, and not in the new set from Nexus.
-            self.delete_producer_locked(&mut tasks, id).await.unwrap();
+            self.delete_producer_locked(&mut producers, id).await.unwrap();
         }
 
         // And then ensure everything in the list.
         //
         // This will insert new tasks, and update any that we already know
         // about.
-        for info in expected_producers.into_values() {
-            self.register_producer_locked(&mut tasks, info).await;
+        //
+        // NOTE: It's technically possible for the following to happen:
+        //
+        // - we start refreshing the list, which fetches the first page
+        // - a producer on that list deletes itself
+        // - we complete the list and enter this method, with that now-deleted
+        //   producer
+        //
+        // This is fine, if not ideal. We have no way of knowing at this point
+        // that we've removed the producer previously, so we really can't tell
+        // the difference between brand-new producers and this case. We'll try
+        // to collect from them, which will fail, but we'll remove them from
+        // the list on the next pass through the refresh operation.
+        for info in desired_producers.into_values() {
+            self.register_producer_locked(&mut producers, info).await;
         }
-        n_pruned
+        (producers.generation, n_pruned)
+    }
+
+    // Return the current generation number of the collection of producers.
+    async fn collection_generation(&self) -> u64 {
+        self.producers.lock().await.generation
     }
 }
 
 // A task which periodically updates our list of producers from Nexus.
+//
+// It's important to remember that while we're refreshing this list, producers
+// can still concurrently register with us. We need to take care not to prune
+// those, if we got unlucky with the ordering of those events. For example,
+// suppose:
+//
+// - We start pulling our list of producers, and fetch the first page
+// - A new one registers with Nexus, which happens to be ordered such that it
+//   would fit in that first page we just fetched.
+// - Nexus sends that to us, and we insert it in our map
+// - We complete our refresh, and have a map that does _not_ contain this new
+//   producer. We need to avoid pruning this one.
+//
+// This check is done with generation numbers. We keep a generation on the
+// entire collection of producers, and for each producer, the generation of the
+// collection in which it was added. We then record the collection's generation
+// when we start this refresh, and make sure not to prune any producers with a
+// later one at the end of refresh.
 async fn refresh_producer_list(
     agent: OximeterAgent,
     nexus_pool: Pool,
 ) {
     let mut interval = tokio::time::interval(agent.refresh_interval);
-    loop {
+    'refresh: loop {
         interval.tick().await;
-        info!(agent.log, "refreshing list of producers from Nexus");
-        let client = claim_nexus_with_backoff(&agent.log, &nexus_pool).await;
+
+        // Record the generation number of our collection _before_ we start to
+        // fetch the list of producers. Anything added after this line will
+        // have a later generation number.
+        let generation = agent.collection_generation().await;
+        info!(
+            agent.log,
+            "refreshing list of producers from Nexus";
+            "generation" => generation,
+        );
+
+        let client = claim_nexus_with_backoff(&agent.log, &nexus_pool).await;
         let mut stream = client.cpapi_assigned_producers_list_stream(
             &agent.id,
             // This is a _total_ limit, not a page size, so `None` means "get
@@ -781,7 +900,7 @@ async fn refresh_producer_list(
             Some(IdSortMode::IdAscending),
         );
         let mut expected_producers = BTreeMap::new();
-        loop {
+        'next_producer: loop {
             match stream.try_next().await {
                 Err(e) => {
                     error!(
@@ -789,6 +908,7 @@ async fn refresh_producer_list(
                         "error fetching next assigned producer";
                         "err" => ?e,
                     );
+                    continue 'refresh;
                 }
                 Ok(Some(p)) => {
                     let endpoint = match ProducerEndpoint::try_from(p) {
@@ -800,7 +920,7 @@ async fn refresh_producer_list(
                                 from Nexus, skipping producer";
                                 "err" => e
                             );
-                            continue;
+                            continue 'next_producer;
                         }
                     };
                     let old = expected_producers.insert(endpoint.id, endpoint);
@@ -812,15 +932,18 @@ async fn refresh_producer_list(
                         );
                     }
                 }
-                Ok(None) => break,
+                Ok(None) => break 'next_producer,
             }
         }
         let n_current_tasks = expected_producers.len();
-        let n_pruned_tasks = agent.ensure_producers(expected_producers).await;
+        let (new_generation, n_pruned_tasks) =
+            agent.ensure_producers(expected_producers, generation).await;
         *agent.last_refresh_time.lock().unwrap() = Some(Utc::now());
         info!(
             agent.log,
             "refreshed list of producers from Nexus";
+            "generation_at_refresh" => generation,
+            "new_generation" => new_generation,
             "n_pruned_tasks" => n_pruned_tasks,
             "n_current_tasks" => n_current_tasks,
         );
@@ -939,13 +1062,13 @@ mod tests {
         // Request the statistics from the task itself.
         let (reply_tx, rx) = oneshot::channel();
         collector
-            .collection_tasks
+            .producers
             .lock()
             .await
+            .tasks
             .values()
             .next()
             .unwrap()
-            .1
             .inbox
             .send(CollectionMessage::Statistics { reply_tx })
             .await
@@ -1004,13 +1127,13 @@ mod tests {
         // Request the statistics from the task itself.
         let (reply_tx, rx) = oneshot::channel();
         collector
-            .collection_tasks
+            .producers
             .lock()
             .await
+            .tasks
             .values()
             .next()
             .unwrap()
-            .1
             .inbox
             .send(CollectionMessage::Statistics { reply_tx })
             .await
@@ -1079,13 +1202,13 @@ mod tests {
         // Request the statistics from the task itself.
         let (reply_tx, rx) = oneshot::channel();
         collector
-            .collection_tasks
+            .producers
             .lock()
             .await
+            .tasks
             .values()
             .next()
             .unwrap()
-            .1
             .inbox
             .send(CollectionMessage::Statistics { reply_tx })
             .await
diff --git a/oximeter/collector/src/lib.rs b/oximeter/collector/src/lib.rs
index b2bd191feb..322677e1d3 100644
--- a/oximeter/collector/src/lib.rs
+++ b/oximeter/collector/src/lib.rs
@@ -289,7 +289,6 @@ impl Oximeter {
                     http_resolver,
                     native_resolver,
                     &log,
-                    config.db.replicated,
                 )
                 .await?,
             ))
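
Reviewer note, appended after the patch: the sketch below is a minimal, synchronous model of the generation bookkeeping this change introduces, with the async machinery, the Nexus client, and the logging stripped out. The names (`Producers`, `Task`, `register`, `ensure`) and the `u32` producer IDs are illustrative assumptions, not the crate's API; only the pruning predicate mirrors the diff: an entry is removed when it is absent from the desired set *and* its generation is not newer than the generation snapshotted at the start of the refresh.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Illustrative stand-in for a producer's collection task.
#[derive(Debug)]
struct Task {
    /// Generation of the parent map at which this entry was last added or
    /// updated.
    generation: u64,
}

/// Minimal model of the map bookkeeping: one monotonically increasing
/// generation for the whole collection, bumped on every mutation, plus the
/// per-entry generation recorded at insertion or update time.
#[derive(Debug, Default)]
struct Producers {
    generation: u64,
    tasks: BTreeMap<u32, Task>,
}

impl Producers {
    /// Register (or update) a producer, stamping it with the new generation.
    fn register(&mut self, id: u32) {
        self.generation += 1;
        let generation = self.generation;
        self.tasks.insert(id, Task { generation });
    }

    /// Reconcile against the desired set from Nexus. An entry is pruned only
    /// if it is missing from the desired set *and* its generation is no newer
    /// than the generation snapshotted when the refresh began. Returns the
    /// number of pruned entries.
    fn ensure(
        &mut self,
        desired: &BTreeSet<u32>,
        generation_at_refresh: u64,
    ) -> usize {
        self.generation += 1;
        let to_prune: Vec<u32> = self
            .tasks
            .iter()
            .filter_map(|(id, task)| {
                let keep = desired.contains(id)
                    || task.generation > generation_at_refresh;
                if keep { None } else { Some(*id) }
            })
            .collect();
        for id in &to_prune {
            self.tasks.remove(id);
        }
        to_prune.len()
    }
}

fn main() {
    let mut producers = Producers::default();
    producers.register(1);

    // A refresh begins: snapshot the generation, then "fetch" a list from
    // Nexus that predates producer 2's registration.
    let generation_at_refresh = producers.generation;
    let desired: BTreeSet<u32> = [1].into_iter().collect();

    // Producer 2 registers concurrently, after the snapshot.
    producers.register(2);

    // Reconciliation prunes nothing: producer 1 is in the desired set, and
    // producer 2 carries a generation newer than the snapshot.
    let n_pruned = producers.ensure(&desired, generation_at_refresh);
    assert_eq!(n_pruned, 0);
    assert!(producers.tasks.contains_key(&2));
    println!("pruned {n_pruned}; remaining: {:?}", producers.tasks);
}
```

Running it exercises the race described in the comment above `refresh_producer_list`: a producer registered after the generation snapshot survives reconciliation even though it is missing from the fetched list, and is reconciled on a later refresh instead.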