From 78ee981d763db52046cbf35ad95c04274ae14704 Mon Sep 17 00:00:00 2001
From: Eliza Weisman
Date: Fri, 25 Oct 2024 10:22:13 -0700
Subject: [PATCH] [sled-agent] Don't block InstanceManager on full MPSCs (#6913)

Sled-agent's `InstanceManager` task is responsible for managing the table of
all instances presently running on the sled. When the sled-agent receives a
request relating to an individual instance on the sled, it's sent to the
`InstanceManager` over a `tokio::sync::mpsc` channel, and is then dispatched
by the `InstanceManager` to the `InstanceRunner` task responsible for that
individual instance by sending it over a *second* `tokio::sync::mpsc`
channel. This is where things start to get interesting.[^1]

`tokio::sync::mpsc` is a *bounded* channel: there is a maximum number of
messages which may be queued by a given MPSC channel at any given time. The
`mpsc::Sender::send` method is an `async fn`, and if the channel is at
capacity, that method will _wait_ until there is once again space in the
channel to send the message being sent. Presently, `mpsc::Sender::send` is
called by the `InstanceManager`'s main run loop when dispatching a request
to an individual instance. As you may have already started to piece
together, this means that if a given `InstanceRunner` task is not able to
process requests fast enough to drain its channel, the entire
`InstanceManager` loop will wait when dispatching a request to that instance
until the queue has been drained. This means that if one instance's runner
task has gotten stuck on something, like waiting for a Crucible flush that
will never complete (as seen in #6911), that instance will prevent requests
from being dispatched to *any other instance* managed by the sled-agent.
This is quite unfortunate!

This commit fixes this behavior by changing the functions that send requests
to an individual instance's task to instead *shed load* when that instance's
request queue is full. We now use the `mpsc::Sender::try_send` method,
rather than `mpsc::Sender::send`, which does not wait and instead
immediately returns an error when the channel is full. This allows the
`InstanceManager` to return an error to the client indicating the channel is
full, and move on to processing requests to other instances which may not be
stuck. Thus, a single stuck instance can no longer block requests from being
dispatched to other, perfectly fine instances.

The error returned when the channel is at capacity is converted to an HTTP
503 Service Unavailable error by the API. This indicates to the client that
their request to that instance could not be processed at this time, but that
it may be processed successfully in the future.[^2] Now, we can shed load
while allowing clients to retry later, which seems much better than the
present situation.

[^1]: In the sense of "may you live in interesting times", naturally.

[^2]: I also considered returning 429 Too Many Requests here, but my
    understanding is that that status code is supposed to indicate that too
    many requests have been received from *that specific client*. In this
    case, we haven't hit a per-client rate limit; we're just overloaded by
    requests more broadly, so it's not that particular client's fault.
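For illustration only, here is a minimal, self-contained sketch of the
load-shedding pattern described above. The names used here (`Request`,
`Busy`, `dispatch`) are stand-ins invented for the example, not the actual
sled-agent types, and the example assumes the `tokio` crate with its full
feature set:

```rust
use tokio::sync::{mpsc, oneshot};

/// A stand-in for `InstanceRequest`: each request carries a oneshot channel
/// on which the reply (or a "try again later" error) is sent.
enum Request {
    CurrentState { tx: oneshot::Sender<Result<String, Busy>> },
}

/// A stand-in for `Error::FailedSendChannelFull`.
#[derive(Debug)]
struct Busy;

/// Forward a request to one instance's queue without ever awaiting, so the
/// caller (think: the manager's run loop) can never be blocked by one slow
/// instance. If the queue is full (or closed), `try_send` hands the rejected
/// request back, and we use its reply channel to tell the client.
fn dispatch(queue: &mpsc::Sender<Request>, req: Request) {
    if let Err(err) = queue.try_send(req) {
        let req = match err {
            mpsc::error::TrySendError::Full(req)
            | mpsc::error::TrySendError::Closed(req) => req,
        };
        match req {
            Request::CurrentState { tx } => {
                // The client sees an immediate error (eventually an HTTP 503)
                // instead of waiting behind a stuck instance.
                let _ = tx.send(Err(Busy));
            }
        }
    }
}

#[tokio::main]
async fn main() {
    // Capacity 1, and a receiver we never drain: a stand-in for an instance
    // task wedged on a request that will never complete.
    let (queue, _stuck_rx) = mpsc::channel(1);

    let (tx1, _rx1) = oneshot::channel();
    dispatch(&queue, Request::CurrentState { tx: tx1 }); // fills the queue

    let (tx2, rx2) = oneshot::channel();
    dispatch(&queue, Request::CurrentState { tx: tx2 }); // shed immediately

    // The second client gets a prompt "busy" error rather than hanging.
    assert!(rx2.await.unwrap().is_err());
}
```

Because `try_send` returns the rejected request, the dispatcher can still
reach the client's reply channel and report the failure instead of silently
dropping it; this is the same property the `fail_try_send` helper below
relies on.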
---
 sled-agent/src/instance.rs         | 194 ++++++++++++++++++++---------
 sled-agent/src/instance_manager.rs |  69 +++++-----
 sled-agent/src/sled_agent.rs       |  10 ++
 3 files changed, 181 insertions(+), 92 deletions(-)

diff --git a/sled-agent/src/instance.rs b/sled-agent/src/instance.rs
index 8a4c5cf669..b47aeb6508 100644
--- a/sled-agent/src/instance.rs
+++ b/sled-agent/src/instance.rs
@@ -119,6 +119,12 @@ pub enum Error {
     #[error("Failed to send request to Instance: Channel closed")]
     FailedSendChannelClosed,

+    #[error(
+        "Failed to send request to Instance: channel at capacity \
+         ({QUEUE_SIZE})"
+    )]
+    FailedSendChannelFull,
+
     #[error(
         "Failed to send request from Instance Runner: Client Channel closed"
     )]
@@ -217,10 +223,10 @@ enum InstanceRequest {
         tx: oneshot::Sender>,
     },
     GetFilesystemPool {
-        tx: oneshot::Sender>,
+        tx: oneshot::Sender, ManagerError>>,
     },
     CurrentState {
-        tx: oneshot::Sender,
+        tx: oneshot::Sender>,
     },
     PutState {
         state: VmmStateRequested,
@@ -248,6 +254,58 @@ enum InstanceRequest {
     },
 }

+impl InstanceRequest {
+    /// Handle an error returned by [`mpsc::Sender::try_send`] when attempting
+    /// to send a request to the instance.
+    ///
+    /// This is a bit complex: the returned [`mpsc::error::TrySendError`] will
+    /// contain the [`InstanceRequest`] we were trying to send, and thus the
+    /// [`oneshot::Sender`] for that request's response. This function handles
+    /// the `TrySendError` by determining whether the channel has closed or is
+    /// full, constructing the relevant [`Error`], extracting the response
+    /// oneshot channel from the request, and then sending the error back over
+    /// that channel.
+    ///
+    /// If sending the error back to the client fails, this function returns
+    /// an error, so that the fact that the client gave up can be logged;
+    /// otherwise, it returns `Ok(())`.
+    fn fail_try_send(
+        err: mpsc::error::TrySendError,
+    ) -> Result<(), Error> {
+        let (error, this) = match err {
+            mpsc::error::TrySendError::Closed(this) => {
+                (Error::FailedSendChannelClosed, this)
+            }
+            mpsc::error::TrySendError::Full(this) => {
+                (Error::FailedSendChannelFull, this)
+            }
+        };
+
+        match this {
+            Self::RequestZoneBundle { tx } => tx
+                .send(Err(BundleError::FailedSend(anyhow!(error))))
+                .map_err(|_| Error::FailedSendClientClosed),
+            Self::GetFilesystemPool { tx } => tx
+                .send(Err(error.into()))
+                .map_err(|_| Error::FailedSendClientClosed),
+            Self::CurrentState { tx } => tx
+                .send(Err(error.into()))
+                .map_err(|_| Error::FailedSendClientClosed),
+            Self::PutState { tx, .. } => tx
+                .send(Err(error.into()))
+                .map_err(|_| Error::FailedSendClientClosed),
+            Self::Terminate { tx, .. } => tx
+                .send(Err(error.into()))
+                .map_err(|_| Error::FailedSendClientClosed),
+            Self::IssueSnapshotRequest { tx, .. }
+            | Self::AddExternalIp { tx, .. }
+            | Self::DeleteExternalIp { tx, .. }
+            | Self::RefreshExternalIps { tx } => tx
+                .send(Err(error.into()))
+                .map_err(|_| Error::FailedSendClientClosed),
+        }
+    }
+}
+
 // A small task which tracks the state of the instance, by constantly querying
 // the state of Propolis for updates.
 //
@@ -488,11 +546,11 @@ impl InstanceRunner {
                         .map_err(|_| Error::FailedSendClientClosed)
                 },
                 Some(GetFilesystemPool { tx } ) => {
-                    tx.send(self.get_filesystem_zpool())
+                    tx.send(Ok(self.get_filesystem_zpool()))
                         .map_err(|_| Error::FailedSendClientClosed)
                 },
                 Some(CurrentState{ tx }) => {
-                    tx.send(self.current_state())
+                    tx.send(Ok(self.current_state()))
                         .map_err(|_| Error::FailedSendClientClosed)
                 },
                 Some(PutState{ state, tx }) => {
@@ -562,9 +620,9 @@ impl InstanceRunner {
             RequestZoneBundle { tx } => tx
                 .send(Err(BundleError::InstanceTerminating))
                 .map_err(|_| ()),
-            GetFilesystemPool { tx } => tx.send(None).map_err(|_| ()),
+            GetFilesystemPool { tx } => tx.send(Ok(None)).map_err(|_| ()),
             CurrentState { tx } => {
-                tx.send(self.current_state()).map_err(|_| ())
+                tx.send(Ok(self.current_state())).map_err(|_| ())
             }
             PutState { tx, .. } => {
                 tx.send(Err(Error::Terminating.into())).map_err(|_| ())
@@ -1092,13 +1150,48 @@ fn propolis_error_code(
 }

 /// Describes a single Propolis server that incarnates a specific instance.
+#[derive(Clone)]
 pub struct Instance {
     id: InstanceUuid,

+    /// Request channel for communicating with the instance task.
+    ///
+    /// # Extremely Serious Warning
+    ///
+    /// This channel is used by the `InstanceManager` task to communicate to
+    /// the instance task corresponding to each instance on this sled. Note
+    /// that all of the methods on this type which send [`InstanceRequest`]s
+    /// over this channel use [`mpsc::Sender::try_send`], which fails if the
+    /// channel is at capacity, and *not* [`mpsc::Sender::send`], which is an
+    /// async method that *waits* until capacity is available. THIS IS VERY
+    /// IMPORTANT.
+    ///
+    /// This is because the `InstanceManager` task will call these methods in
+    /// its request-processing loop as it receives requests from clients, in
+    /// order to forward the request to the relevant instance. If the
+    /// instance's channel has filled up because the instance is currently
+    /// processing a slow request, `await`ing a call to [`mpsc::Sender::send`]
+    /// will block the `InstanceManager`'s main loop from proceeding until the
+    /// instance task has finished what it's doing and drained the next
+    /// request from the channel. Critically, this means that requests to
+    /// *other, unrelated instances* on this sled would have to wait until
+    /// this instance has finished what it's doing. That means a single
+    /// deadlocked instance task, which is waiting for something that never
+    /// completes, can render *all* instances on this sled inaccessible.
+    ///
+    /// Therefore, any time we send requests to the `Instance` over this
+    /// channel from code that's called in the `InstanceManager`'s run loop,
+    /// we MUST use [`mpsc::Sender::try_send`] rather than
+    /// [`mpsc::Sender::send`]. Should the channel be at capacity, we return
+    /// an [`Error::FailedSendChannelFull`], which eventually becomes a 503
+    /// Service Unavailable error when returned to the client. It is
+    /// acceptable to call [`mpsc::Sender::send`] on this channel ONLY from
+    /// code which runs exclusively in tasks that are not blocking the
+    /// `InstanceManager`'s run loop.
     tx: mpsc::Sender,

+    /// This is reference-counted so that the `Instance` struct may be cloned.
     #[allow(dead_code)]
-    runner_handle: tokio::task::JoinHandle<()>,
+    runner_handle: Arc>,
 }

 #[derive(Debug)]
@@ -1250,7 +1343,7 @@ impl Instance {
         let runner_handle =
             tokio::task::spawn(async move { runner.run().await });

-        Ok(Instance { id, tx, runner_handle })
+        Ok(Instance { id, tx, runner_handle: Arc::new(runner_handle) })
     }

     pub fn id(&self) -> InstanceUuid {
@@ -1258,35 +1351,31 @@ impl Instance {
     }

     /// Create bundle from an instance zone.
-    pub async fn request_zone_bundle(
+    pub fn request_zone_bundle(
         &self,
         tx: oneshot::Sender>,
-    ) -> Result<(), BundleError> {
+    ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::RequestZoneBundle { tx })
-            .await
-            .map_err(|err| BundleError::FailedSend(anyhow!(err)))?;
-        Ok(())
+            .try_send(InstanceRequest::RequestZoneBundle { tx })
+            .or_else(InstanceRequest::fail_try_send)
     }

-    pub async fn get_filesystem_zpool(
+    pub fn get_filesystem_zpool(
         &self,
-    ) -> Result, Error> {
-        let (tx, rx) = oneshot::channel();
+        tx: oneshot::Sender, ManagerError>>,
+    ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::GetFilesystemPool { tx })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(rx.await?)
+            .try_send(InstanceRequest::GetFilesystemPool { tx })
+            .or_else(InstanceRequest::fail_try_send)
     }

-    pub async fn current_state(&self) -> Result {
-        let (tx, rx) = oneshot::channel();
+    pub fn current_state(
+        &self,
+        tx: oneshot::Sender>,
+    ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::CurrentState { tx })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(rx.await?)
+            .try_send(InstanceRequest::CurrentState { tx })
+            .or_else(InstanceRequest::fail_try_send)
     }

     /// Attempts to update the current state of the instance by launching a
@@ -1300,84 +1389,72 @@ impl Instance {
     /// instance begins to stop when Propolis has just begun to handle a prior
     /// request to reboot, the instance's state may proceed from Stopping to
     /// Rebooting to Running to Stopping to Stopped.
-    pub async fn put_state(
+    pub fn put_state(
         &self,
         tx: oneshot::Sender>,
         state: VmmStateRequested,
     ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::PutState { state, tx })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(())
+            .try_send(InstanceRequest::PutState { state, tx })
+            .or_else(InstanceRequest::fail_try_send)
     }

     /// Rudely terminates this instance's Propolis (if it has one) and
     /// immediately transitions the instance to the Destroyed state.
-    pub async fn terminate(
+    pub fn terminate(
         &self,
         tx: oneshot::Sender>,
         mark_failed: bool,
     ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::Terminate { mark_failed, tx })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(())
+            .try_send(InstanceRequest::Terminate { mark_failed, tx })
+            .or_else(InstanceRequest::fail_try_send)
     }

-    pub async fn issue_snapshot_request(
+    pub fn issue_snapshot_request(
        &self,
        tx: oneshot::Sender>,
        disk_id: Uuid,
        snapshot_id: Uuid,
     ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::IssueSnapshotRequest {
+            .try_send(InstanceRequest::IssueSnapshotRequest {
                 disk_id,
                 snapshot_id,
                 tx,
             })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(())
+            .or_else(InstanceRequest::fail_try_send)
     }

-    pub async fn add_external_ip(
+    pub fn add_external_ip(
         &self,
         tx: oneshot::Sender>,
         ip: &InstanceExternalIpBody,
     ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::AddExternalIp { ip: *ip, tx })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(())
+            .try_send(InstanceRequest::AddExternalIp { ip: *ip, tx })
+            .or_else(InstanceRequest::fail_try_send)
     }

-    pub async fn delete_external_ip(
+    pub fn delete_external_ip(
         &self,
         tx: oneshot::Sender>,
         ip: &InstanceExternalIpBody,
     ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::DeleteExternalIp { ip: *ip, tx })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(())
+            .try_send(InstanceRequest::DeleteExternalIp { ip: *ip, tx })
+            .or_else(InstanceRequest::fail_try_send)
     }

     /// Reinstalls an instance's set of external IPs within OPTE, using
     /// up-to-date IP<->IGW mappings. This will not disrupt existing flows.
-    pub async fn refresh_external_ips(
+    pub fn refresh_external_ips(
         &self,
         tx: oneshot::Sender>,
     ) -> Result<(), Error> {
         self.tx
-            .send(InstanceRequest::RefreshExternalIps { tx })
-            .await
-            .map_err(|_| Error::FailedSendChannelClosed)?;
-        Ok(())
+            .try_send(InstanceRequest::RefreshExternalIps { tx })
+            .or_else(InstanceRequest::fail_try_send)
     }
 }

@@ -2104,7 +2181,6 @@ mod tests {
         // pretending we're InstanceManager::ensure_state, start our "instance"
         // (backed by fakes and propolis_mock_server)
         inst.put_state(put_tx, VmmStateRequested::Running)
-            .await
            .expect("failed to send Instance::put_state");

         // even though we ignore this result at instance creation time in
@@ -2198,7 +2274,6 @@ mod tests {
         // pretending we're InstanceManager::ensure_state, try in vain to start
         // our "instance", but no propolis server is running
         inst.put_state(put_tx, VmmStateRequested::Running)
-            .await
            .expect("failed to send Instance::put_state");

         let timeout_fut = timeout(TIMEOUT_DURATION, put_rx);
@@ -2305,7 +2380,6 @@ mod tests {
         // pretending we're InstanceManager::ensure_state, try in vain to start
         // our "instance", but the zone never finishes installing
         inst.put_state(put_tx, VmmStateRequested::Running)
-            .await
            .expect("failed to send Instance::put_state");

         // Timeout our future waiting for the instance-state-change at 1s. This
diff --git a/sled-agent/src/instance_manager.rs b/sled-agent/src/instance_manager.rs
index af09def0c1..34fb8e493d 100644
--- a/sled-agent/src/instance_manager.rs
+++ b/sled-agent/src/instance_manager.rs
@@ -465,25 +465,25 @@ impl InstanceManagerRunner {
                     tx.send(self.ensure_registered(propolis_id, instance, *sled_identifiers).await).map_err(|_| Error::FailedSendClientClosed)
                 },
                 Some(EnsureUnregistered { propolis_id, tx }) => {
-                    self.ensure_unregistered(tx, propolis_id).await
+                    self.ensure_unregistered(tx, propolis_id)
                 },
                 Some(EnsureState { propolis_id, target, tx }) => {
-                    self.ensure_state(tx, propolis_id, target).await
+                    self.ensure_state(tx, propolis_id, target)
                 },
                 Some(IssueDiskSnapshot { propolis_id, disk_id, snapshot_id, tx }) => {
-                    self.issue_disk_snapshot_request(tx, propolis_id, disk_id, snapshot_id).await
+                    self.issue_disk_snapshot_request(tx, propolis_id, disk_id, snapshot_id)
                 },
                 Some(CreateZoneBundle { name, tx }) => {
-                    self.create_zone_bundle(tx, &name).await.map_err(Error::from)
+                    self.create_zone_bundle(tx, &name).map_err(Error::from)
                 },
                 Some(AddExternalIp { propolis_id, ip, tx }) => {
-                    self.add_external_ip(tx, propolis_id, &ip).await
+                    self.add_external_ip(tx, propolis_id, &ip)
                 },
                 Some(DeleteExternalIp { propolis_id, ip, tx }) => {
-                    self.delete_external_ip(tx, propolis_id, &ip).await
+                    self.delete_external_ip(tx, propolis_id, &ip)
                 },
                 Some(RefreshExternalIps { tx }) => {
-                    self.refresh_external_ips(tx).await
+                    self.refresh_external_ips(tx)
                 }
                 Some(GetState { propolis_id, tx }) => {
                     // TODO(eliza): it could potentially be nice to
@@ -491,7 +491,7 @@ impl InstanceManagerRunner {
                     // than having to force `GetState` requests to
                     // serialize with the requests that actually update
                     // the state...
-                    self.get_instance_state(tx, propolis_id).await
+                    self.get_instance_state(tx, propolis_id)
                 },
                 Some(OnlyUseDisks { disks, tx } ) => {
                     self.use_only_these_disks(disks).await;
@@ -631,14 +631,15 @@ impl InstanceManagerRunner {
                 &self.jobs.get(&propolis_id).unwrap()
             }
         };
-
-        Ok(instance.current_state().await?)
+        let (tx, rx) = oneshot::channel();
+        instance.current_state(tx)?;
+        rx.await?
     }

     /// Idempotently ensures this VM is not registered with this instance
     /// manager. If this Propolis job is registered and has a running zone, the
     /// zone is rudely terminated.
-    async fn ensure_unregistered(
+    fn ensure_unregistered(
         &mut self,
         tx: oneshot::Sender>,
         propolis_id: PropolisUuid,
@@ -653,13 +654,13 @@ impl InstanceManagerRunner {
         // Otherwise, we pipeline the request, and send it to the instance,
         // where it can receive an appropriate response.
         let mark_failed = false;
-        instance.terminate(tx, mark_failed).await?;
+        instance.terminate(tx, mark_failed)?;
         Ok(())
     }

     /// Idempotently attempts to drive the supplied Propolis into the supplied
     /// runtime state.
-    async fn ensure_state(
+    fn ensure_state(
         &mut self,
         tx: oneshot::Sender>,
         propolis_id: PropolisUuid,
@@ -670,11 +671,11 @@ impl InstanceManagerRunner {
                 .map_err(|_| Error::FailedSendClientClosed)?;
             return Ok(());
         };
-        instance.put_state(tx, target).await?;
+        instance.put_state(tx, target)?;
         Ok(())
     }

-    async fn issue_disk_snapshot_request(
+    fn issue_disk_snapshot_request(
         &self,
         tx: oneshot::Sender>,
         propolis_id: PropolisUuid,
@@ -686,12 +687,11 @@ impl InstanceManagerRunner {

         instance
             .issue_snapshot_request(tx, disk_id, snapshot_id)
-            .await
             .map_err(Error::from)
     }

     /// Create a zone bundle from a named instance zone, if it exists.
-    async fn create_zone_bundle(
+    fn create_zone_bundle(
         &self,
         tx: oneshot::Sender>,
         name: &str,
@@ -711,10 +711,13 @@ impl InstanceManagerRunner {
         let Some(instance) = self.jobs.get(&vmm_id) else {
             return Err(BundleError::NoSuchZone { name: name.to_string() });
         };
-        instance.request_zone_bundle(tx).await
+        instance
+            .request_zone_bundle(tx)
+            .map_err(|e| BundleError::FailedSend(anyhow!(e)))?;
+        Ok(())
     }

-    async fn add_external_ip(
+    fn add_external_ip(
         &self,
         tx: oneshot::Sender>,
         propolis_id: PropolisUuid,
@@ -723,11 +726,11 @@ impl InstanceManagerRunner {
         let Some(instance) = self.get_propolis(propolis_id) else {
             return Err(Error::NoSuchVmm(propolis_id));
         };
-        instance.add_external_ip(tx, ip).await?;
+        instance.add_external_ip(tx, ip)?;
         Ok(())
     }

-    async fn delete_external_ip(
+    fn delete_external_ip(
         &self,
         tx: oneshot::Sender>,
         propolis_id: PropolisUuid,
@@ -737,18 +740,18 @@ impl InstanceManagerRunner {
             return Err(Error::NoSuchVmm(propolis_id));
         };

-        instance.delete_external_ip(tx, ip).await?;
+        instance.delete_external_ip(tx, ip)?;
         Ok(())
     }

-    async fn refresh_external_ips(
+    fn refresh_external_ips(
         &self,
         tx: oneshot::Sender>,
     ) -> Result<(), Error> {
         let mut channels = vec![];
         for (_, instance) in &self.jobs {
             let (tx, rx_new) = oneshot::channel();
-            instance.refresh_external_ips(tx).await?;
+            instance.refresh_external_ips(tx)?;
             channels.push(rx_new);
         }

@@ -766,7 +769,7 @@ impl InstanceManagerRunner {
         Ok(())
     }

-    async fn get_instance_state(
+    fn get_instance_state(
         &self,
         tx: oneshot::Sender>,
         propolis_id: PropolisUuid,
@@ -776,9 +779,7 @@ impl InstanceManagerRunner {
                 .send(Err(Error::NoSuchVmm(propolis_id)))
                 .map_err(|_| Error::FailedSendClientClosed);
         };
-
-        let state = instance.current_state().await?;
-        tx.send(Ok(state)).map_err(|_| Error::FailedSendClientClosed)?;
+        instance.current_state(tx)?;
         Ok(())
     }

@@ -804,9 +805,13 @@ impl InstanceManagerRunner {
         for (id, instance) in self.jobs.iter() {
             // If we can read the filesystem pool, consider it. Otherwise, move
             // on, to prevent blocking the cleanup of other instances.
-            let Ok(Some(filesystem_pool)) =
-                instance.get_filesystem_zpool().await
-            else {
+            // TODO(eliza): clone each instance and spawn a task to handle it,
+            // so that a single misbehaving instance cannot block the instance
+            // manager's run loop...
+            let (tx, rx) = oneshot::channel();
+            // This will fail if the tx has been dropped, which we just...don't do.
+            let _ = instance.get_filesystem_zpool(tx);
+            let Ok(Ok(Some(filesystem_pool))) = rx.await else {
                 info!(self.log, "use_only_these_disks: Cannot read filesystem pool"; "instance_id" => ?id);
                 continue;
             };
@@ -820,7 +825,7 @@ impl InstanceManagerRunner {
             if let Some(instance) = self.jobs.remove(&id) {
                 let (tx, rx) = oneshot::channel();
                 let mark_failed = true;
-                if let Err(e) = instance.terminate(tx, mark_failed).await {
+                if let Err(e) = instance.terminate(tx, mark_failed) {
                     warn!(self.log, "use_only_these_disks: Failed to request instance removal"; "err" => ?e);
                     continue;
                 }
diff --git a/sled-agent/src/sled_agent.rs b/sled-agent/src/sled_agent.rs
index 4a4be08f76..8a5b15aaaf 100644
--- a/sled-agent/src/sled_agent.rs
+++ b/sled-agent/src/sled_agent.rs
@@ -186,11 +186,21 @@ impl From for omicron_common::api::external::Error {
 impl From for dropshot::HttpError {
     fn from(err: Error) -> Self {
         const NO_SUCH_INSTANCE: &str = "NO_SUCH_INSTANCE";
+        const INSTANCE_CHANNEL_FULL: &str = "INSTANCE_CHANNEL_FULL";
         match err {
             Error::Instance(crate::instance_manager::Error::Instance(
                 instance_error,
             )) => {
                 match instance_error {
+                    // The instance's request channel is full, so it cannot
+                    // currently process this request. Shed load, but indicate
+                    // to the client that it can try again later.
+                    err @ crate::instance::Error::FailedSendChannelFull => {
+                        HttpError::for_unavail(
+                            Some(INSTANCE_CHANNEL_FULL.to_string()),
+                            err.to_string(),
+                        )
+                    }
                     crate::instance::Error::Propolis(propolis_error) => {
                         // Work around dropshot#693: HttpError::for_status
                         // only accepts client errors and asserts on server