Skip to content

Commit

Permalink
Overhaul block device interfaces
Browse files Browse the repository at this point in the history
This reshapes how block devices (frontends) interact with their attached
backends.  Rather than associating a backend with a device when it is
created, backend and device are attached after the fact, when each has
had an opportunity to be initialized.

Through its attachment handle, the device is able to notify the backend
of new requests, but also query sizing parameters (previously specified
during device construction), pause and unpause request retrieval, and
(in the future) perform other tasks such as cache mode alteration.

The backend has a corresponding attachment handle through which it
retrieves pending requests from the device -- behavior which is largely
unchanged from the original structure.

Rather than store device-specific information required to issue request
completions to the guest, the device emulation is expecting to use a
`Tracking` structure which will store the completion data to be
retrieved using an ID injected into the Request, which is passed back
with its result when processed by the backend.  This tracking structure
also implements several generic USDT probes for tracing block events,
rather than requiring the use of per-device probes.
  • Loading branch information
pfmooney committed Oct 12, 2023
1 parent ed584e8 commit 2dc742c
Show file tree
Hide file tree
Showing 19 changed files with 1,207 additions and 1,270 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ lazy_static = "1.4"
libc = "0.2"
mockall = "0.11"
num_enum = "0.5.11"
pin-project-lite = "0.2.13"
proc-macro2 = "1.0"
progenitor = { git = "https://github.com/oxidecomputer/progenitor", branch = "main" }
quote = "1.0"
Expand Down
10 changes: 4 additions & 6 deletions bin/propolis-server/src/lib/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,16 @@ pub use propolis_server_config::*;
pub fn create_backend_for_block(
config: &Config,
name: &str,
log: slog::Logger,
_log: slog::Logger,
) -> Result<(Arc<dyn block::Backend>, inventory::ChildRegister), ParseError> {
let entry = config.block_devs.get(name).ok_or_else(|| {
ParseError::KeyNotFound(name.to_string(), "block_dev".to_string())
})?;
blockdev_backend(entry, log)
blockdev_backend(entry)
}

fn blockdev_backend(
dev: &BlockDevice,
log: slog::Logger,
) -> Result<(Arc<dyn block::Backend>, inventory::ChildRegister), ParseError> {
let opts = propolis::block::BackendOpts {
block_size: dev.opts.block_size,
Expand All @@ -51,9 +50,8 @@ fn blockdev_backend(
})?;

let nworkers = NonZeroUsize::new(8).unwrap();
let be = propolis::block::FileBackend::create(
path, opts, nworkers, log,
)?;
let be =
propolis::block::FileBackend::create(path, opts, nworkers)?;
let child =
inventory::ChildRegister::new(&be, Some(path.to_string()));

Expand Down
13 changes: 5 additions & 8 deletions bin/propolis-server/src/lib/initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,6 @@ impl<'a> MachineInitializer<'a> {
..Default::default()
},
nworkers,
self.log.new(
slog::o!("component" => format!("file-{}", spec.path)),
),
)?;

let child =
Expand All @@ -363,13 +360,15 @@ impl<'a> MachineInitializer<'a> {
info!(self.log, "Creating in-memory disk backend";
"len" => bytes.len());

let nworkers = NonZeroUsize::new(8).unwrap();
let be = propolis::block::InMemoryBackend::create(
bytes,
propolis::block::BackendOpts {
block_size: Some(512),
read_only: Some(spec.readonly),
..Default::default()
},
nworkers,
)?;

let child = inventory::ChildRegister::new(
Expand Down Expand Up @@ -447,28 +446,26 @@ impl<'a> MachineInitializer<'a> {
)
})?;

let be_info = backend.info();
match device_interface {
DeviceInterface::Virtio => {
let vioblk = virtio::PciVirtioBlock::new(0x100, be_info);
let vioblk = virtio::PciVirtioBlock::new(0x100);
let id =
self.inv.register_instance(&vioblk, bdf.to_string())?;
let _ = self.inv.register_child(child, id).unwrap();
backend.attach(vioblk.clone())?;
block::attach(backend, vioblk.clone());
chipset.device().pci_attach(bdf, vioblk);
}
DeviceInterface::Nvme => {
let nvme = nvme::PciNvme::create(
name.to_string(),
be_info,
self.log.new(
slog::o!("component" => format!("nvme-{}", name)),
),
);
let id =
self.inv.register_instance(&nvme, bdf.to_string())?;
let _ = self.inv.register_child(child, id).unwrap();
backend.attach(nvme.clone())?;
block::attach(backend, nvme.clone());
chipset.device().pci_attach(bdf, nvme);
}
};
Expand Down
1 change: 1 addition & 0 deletions bin/propolis-standalone/src/cidata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ pub(crate) fn build_cidata_be(
read_only: Some(true),
..Default::default()
},
std::num::NonZeroUsize::new(8).unwrap(),
)
.context("could not create block backend")
}
1 change: 0 additions & 1 deletion bin/propolis-standalone/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ pub fn block_backend(
parsed.workers.unwrap_or(DEFAULT_WORKER_COUNT),
)
.unwrap(),
log.clone(),
)
.unwrap();

Expand Down
10 changes: 4 additions & 6 deletions bin/propolis-standalone/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -854,12 +854,11 @@ fn setup_instance(
let (backend, creg) = config::block_backend(&config, dev, log);
let bdf = bdf.unwrap();

let info = backend.info();
let vioblk = hw::virtio::PciVirtioBlock::new(0x100, info);
let vioblk = hw::virtio::PciVirtioBlock::new(0x100);
let id = inv.register_instance(&vioblk, bdf.to_string())?;
let _be_id = inv.register_child(creg, id)?;

backend.attach(vioblk.clone() as Arc<dyn block::Device>)?;
block::attach(backend, vioblk.clone());

chipset.pci_attach(bdf, vioblk);
}
Expand All @@ -884,14 +883,13 @@ fn setup_instance(
.as_str()
.unwrap()
.to_string();
let info = backend.info();
let log = log.new(slog::o!("dev" => format!("nvme-{}", name)));
let nvme = hw::nvme::PciNvme::create(dev_serial, info, log);
let nvme = hw::nvme::PciNvme::create(dev_serial, log);

let id = inv.register_instance(&nvme, bdf.to_string())?;
let _be_id = inv.register_child(creg, id)?;

backend.attach(nvme.clone())?;
block::attach(backend, nvme.clone());

chipset.pci_attach(bdf, nvme);
}
Expand Down
1 change: 1 addition & 0 deletions lib/propolis/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ propolis_types.workspace = true
usdt = { workspace = true, features = ["asm"] }
tokio = { workspace = true, features = ["full"] }
futures.workspace = true
pin-project-lite.workspace = true
anyhow.workspace = true
rfb.workspace = true
slog.workspace = true
Expand Down
224 changes: 224 additions & 0 deletions lib/propolis/src/block/backend.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! Mechanisms required to implement a block backend
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Condvar, Mutex, Weak};
use std::task::{Context, Poll};

use crate::accessors::MemAccessor;
use crate::block::{device, Device, Request};

use pin_project_lite::pin_project;
use tokio::sync::{futures::Notified, Notify};

/// Reason why next request is unavailable from associated device
pub enum ReqError {
/// No request is pending from the device
NonePending,
/// Processing of requests from the device is paused
Paused,
/// Backend is not attached to any device
Detached,
/// Backend is halting workers
Halted,
}

pub(super) struct AttachState {
sibling: Weak<Mutex<Option<device::AttachInner>>>,
device: Arc<dyn Device>,
acc_mem: MemAccessor,
dev_is_paused: bool,
backend_is_halted: bool,
}
impl AttachState {
fn next_req(&self) -> Result<Request, ReqError> {
if self.backend_is_halted {
// The backend being halted is the most pressing status to consider,
// so it must be checked first
Err(ReqError::Halted)
} else if self.dev_is_paused {
// Do not allow the backend to pull any requests while the device is
// in the paused state
Err(ReqError::Paused)
} else {
self.device.next().ok_or(ReqError::NonePending)
}
}
pub(super) fn new(
dev_attach: &device::Attachment,
device: &Arc<dyn Device>,
) -> Self {
Self {
sibling: Arc::downgrade(&dev_attach.0),
device: device.clone(),
acc_mem: device.accessor_mem(),
dev_is_paused: false,
backend_is_halted: false,
}
}
pub(super) fn set_paused(&mut self, is_paused: bool) {
self.dev_is_paused = is_paused;
}
pub(super) fn same_as_sibling(
&self,
other: &Arc<Mutex<Option<device::AttachInner>>>,
) -> bool {
self.sibling.ptr_eq(&Arc::downgrade(other))
}
}
pub(super) struct AttachInner {
pub(super) state: Mutex<Option<AttachState>>,
req_notifier: Notify,
cv: Condvar,
}
impl AttachInner {
fn new() -> Self {
Self {
state: Mutex::new(None),
req_notifier: Notify::new(),
cv: Condvar::new(),
}
}
}

/// State held by the backend about the attached (if any) device
pub struct Attachment(pub(super) Arc<AttachInner>);
impl Attachment {
pub fn new() -> Self {
Attachment(Arc::new(AttachInner::new()))
}

/// Attempt to retreive the next [`Request`] from the attached (if any)
/// device.
///
/// Will return an error if:
///
/// - No device is attached
/// - The device is paused
/// - The backend is halted
/// - No requests are queued in the device
pub fn next_req(&self) -> Result<Request, ReqError> {
let guard = self.0.state.lock().unwrap();
let inner = guard.as_ref().ok_or(ReqError::Detached)?;
inner.next_req()
}

/// Block (synchronously) in order to retreive the next [`Request`] from the
/// device. Will return [`None`] if no device is attached, or the backend
/// is halted, otherwise it will block until a request is available.
pub fn block_for_req(&self) -> Option<Request> {
let mut guard = self.0.state.lock().unwrap();
loop {
// bail if not attached
let inner = guard.as_ref()?;
if inner.backend_is_halted {
return None;
}

if let Ok(req) = inner.next_req() {
return Some(req);
}

guard = self.0.cv.wait(guard).unwrap();
}
}

pub fn wait_for_req(&self) -> WaitForReq {
WaitForReq { attachment: self, wait: self.0.req_notifier.notified() }
}

/// Run provided function against [`MemAccessor`] for this backend.
///
/// Intended to provide caller with means of creating/associated child
/// accessors.
pub fn accessor_mem<R>(
&self,
f: impl FnOnce(Option<&MemAccessor>) -> R,
) -> R {
match self.0.state.lock().unwrap().as_ref() {
Some(inner) => f(Some(&inner.acc_mem)),
None => f(None),
}
}

/// Assert halted state on Attachment
pub fn halt(&self) {
if let Some(state) = self.0.state.lock().unwrap().as_mut() {
state.backend_is_halted = true;
}
self.notify();
}

/// Clear halted state from Attachment
pub fn start(&self) {
if let Some(state) = self.0.state.lock().unwrap().as_mut() {
state.backend_is_halted = false;
}
self.notify();
}

/// Detach from the associated (if any) device.
pub fn detach(&self) -> Option<()> {
// lock ordering demands we approach this from the device side
let be_lock = self.0.state.lock().unwrap();
let be_inner = be_lock.as_ref()?;
let dev_inner = be_inner.sibling.upgrade()?;
drop(be_lock);

device::AttachInner::detach(&dev_inner)
}

/// Notify any [blocked](Self::block_for_req()) or
/// [waiting](Self::wait_for_req()) tasks of a state change. This could be
/// a change to the device, to the backend, or simply new request(s)
/// available.
pub fn notify(&self) {
// TODO: avoid thundering herd?
self.0.req_notifier.notify_waiters();
self.0.cv.notify_all();
}
}

pin_project! {
pub struct WaitForReq<'a> {
attachment: &'a Attachment,
#[pin]
wait: Notified<'a>,
}
}
impl Future for WaitForReq<'_> {
type Output = Option<Request>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
match this.attachment.next_req() {
Ok(req) => return Poll::Ready(Some(req)),
Err(ReqError::Detached) | Err(ReqError::Halted) => {
// Let the consumer know that they should bail
return Poll::Ready(None);
}
Err(ReqError::NonePending) | Err(ReqError::Paused) => {
if let Poll::Ready(_) =
Notified::poll(this.wait.as_mut(), cx)
{
// The `Notified` future is fused, so we must "refresh"
// prior to any subsequent attempts to poll it after it
// emits `Ready`
this.wait
.set(this.attachment.0.req_notifier.notified());

// Take another lap if woken by the notifier to check
// for a pending request
continue;
}
return Poll::Pending;
}
}
}
}
}
Loading

0 comments on commit 2dc742c

Please sign in to comment.