From 915593f5cb5545db03a135cf78a214d4ca527c2c Mon Sep 17 00:00:00 2001 From: Andrew Werner Date: Tue, 11 Jul 2023 11:30:01 -0400 Subject: [PATCH] aya: Implement RingBuf This implements the userspace binding for RingBuf. Instead of streaming the samples as heap buffers, the process_ring function takes a callback to which we pass the event's byte region, roughly following [libbpf]'s API design. This avoids a copy and allows marking the consumer pointer in a timely manner. [libbpf]: https://github.com/libbpf/libbpf/blob/master/src/ringbuf.c Additionally, integration tests are added to demonstrate the usage of the new APIs and to ensure that they work end-to-end. Co-authored-by: William Findlay Co-authored-by: Tatsuyuki Ishi --- Cargo.toml | 4 + aya/src/bpf.rs | 136 +++++- aya/src/maps/mod.rs | 8 +- aya/src/maps/ring_buf.rs | 400 ++++++++++++++++++ bpf/aya-bpf/Cargo.toml | 5 + bpf/aya-bpf/src/lib.rs | 5 + bpf/aya-bpf/src/maps/mod.rs | 2 + bpf/aya-bpf/src/maps/ring_buf.rs | 163 ++++++++ test/integration-ebpf/Cargo.toml | 4 + test/integration-ebpf/src/ring_buf.rs | 59 +++ test/integration-test/Cargo.toml | 7 +- test/integration-test/src/lib.rs | 1 + test/integration-test/src/tests.rs | 1 + test/integration-test/src/tests/ring_buf.rs | 433 ++++++++++++++++++++ xtask/public-api/aya-bpf.txt | 87 ++++ xtask/public-api/aya.txt | 110 +++++ 16 files changed, 1404 insertions(+), 21 deletions(-) create mode 100644 aya/src/maps/ring_buf.rs create mode 100644 bpf/aya-bpf/src/maps/ring_buf.rs create mode 100644 test/integration-ebpf/src/ring_buf.rs create mode 100644 test/integration-test/src/tests/ring_buf.rs diff --git a/Cargo.toml b/Cargo.toml index b534f5678..cfda2f5dd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -62,10 +62,13 @@ bitflags = { version = "2.2.1", default-features = false } bytes = { version = "1", default-features = false } cargo_metadata = { version = "0.18.0", default-features = false } clap = { version = "4", default-features = false } +const-assert = { version = "1.0.1", default-features = false } core-error = { version = "0.0.0", default-features = false } dialoguer = { version = "0.11", default-features = false } diff = { version = "0.1.13", default-features = false } env_logger = { version = "0.10", default-features = false } +epoll = { version = "4.3.3", default-features = false } +futures = { version = "0.3.28", default-features = false } hashbrown = { version = "0.14", default-features = false } indoc = { version = "2.0", default-features = false } integration-ebpf = { path = "test/integration-ebpf", default-features = false } @@ -80,6 +83,7 @@ proc-macro-error = { version = "1.0", default-features = false } proc-macro2 = { version = "1", default-features = false } public-api = { version = "0.32.0", default-features = false } quote = { version = "1", default-features = false } +rand = { version = "0.8", default-features = false } rbpf = { version = "0.2.0", default-features = false } rustdoc-json = { version = "0.8.6", default-features = false } rustup-toolchain = { version = "0.1.5", default-features = false } diff --git a/aya/src/bpf.rs b/aya/src/bpf.rs index 3ee18a3dc..d25457606 100644 --- a/aya/src/bpf.rs +++ b/aya/src/bpf.rs @@ -43,7 +43,7 @@ use crate::{ is_probe_read_kernel_supported, is_prog_id_supported, is_prog_name_supported, retry_with_verifier_logs, }, - util::{bytes_of, bytes_of_slice, possible_cpus, POSSIBLE_CPUS}, + util::{bytes_of, bytes_of_slice, page_size, possible_cpus, POSSIBLE_CPUS}, }; pub(crate) const BPF_OBJ_NAME_LEN: usize = 16; @@ -461,23 +461,23 @@ impl<'a> BpfLoader<'a> { { continue; } - - match max_entries.get(name.as_str()) { - Some(size) => obj.set_max_entries(*size), - None => { - if obj.map_type() == BPF_MAP_TYPE_PERF_EVENT_ARRAY as u32 - && obj.max_entries() == 0 - { - obj.set_max_entries( - possible_cpus() - .map_err(|error| BpfError::FileError { - path: PathBuf::from(POSSIBLE_CPUS), - error, - })? - .len() as u32, - ); - } - } + let num_cpus = || -> Result { + Ok(possible_cpus() + .map_err(|error| BpfError::FileError { + path: PathBuf::from(POSSIBLE_CPUS), + error, + })? + .len() as u32) + }; + let map_type: bpf_map_type = obj.map_type().try_into().map_err(MapError::from)?; + if let Some(max_entries) = max_entries_override( + map_type, + max_entries.get(name.as_str()).copied(), + || obj.max_entries(), + num_cpus, + || page_size() as u32, + )? { + obj.set_max_entries(max_entries) } match obj.map_type().try_into() { Ok(BPF_MAP_TYPE_CPUMAP) => { @@ -716,6 +716,7 @@ fn parse_map(data: (String, MapData)) -> Result<(String, Map), BpfError> { BPF_MAP_TYPE_PERCPU_HASH => Map::PerCpuHashMap(map), BPF_MAP_TYPE_LRU_PERCPU_HASH => Map::PerCpuLruHashMap(map), BPF_MAP_TYPE_PERF_EVENT_ARRAY => Map::PerfEventArray(map), + BPF_MAP_TYPE_RINGBUF => Map::RingBuf(map), BPF_MAP_TYPE_SOCKHASH => Map::SockHash(map), BPF_MAP_TYPE_SOCKMAP => Map::SockMap(map), BPF_MAP_TYPE_BLOOM_FILTER => Map::BloomFilter(map), @@ -736,6 +737,105 @@ fn parse_map(data: (String, MapData)) -> Result<(String, Map), BpfError> { Ok((name, map)) } +/// Computes the value which should be used to override the max_entries value of the map +/// based on the user-provided override and the rules for that map type. +fn max_entries_override( + map_type: bpf_map_type, + user_override: Option, + current_value: impl Fn() -> u32, + num_cpus: impl Fn() -> Result, + page_size: impl Fn() -> u32, +) -> Result, BpfError> { + let max_entries = || user_override.unwrap_or_else(¤t_value); + Ok(match map_type { + BPF_MAP_TYPE_PERF_EVENT_ARRAY if max_entries() == 0 => Some(num_cpus()?), + BPF_MAP_TYPE_RINGBUF => Some(adjust_to_page_size(max_entries(), page_size())) + .filter(|adjusted| *adjusted != max_entries()) + .or(user_override), + _ => user_override, + }) +} + +// Adjusts the byte size of a RingBuf map to match a power-of-two multiple of the page size. +// +// This mirrors the logic used by libbpf. +// See https://github.com/libbpf/libbpf/blob/ec6f716eda43/src/libbpf.c#L2461-L2463 +fn adjust_to_page_size(byte_size: u32, page_size: u32) -> u32 { + // If the byte_size is zero, return zero and let the verifier reject the map + // when it is loaded. This is the behavior of libbpf. + if byte_size == 0 { + return 0; + } + // TODO: Replace with primitive method when int_roundings (https://github.com/rust-lang/rust/issues/88581) + // is stabilized. + fn div_ceil(n: u32, rhs: u32) -> u32 { + let d = n / rhs; + let r = n % rhs; + if r > 0 && rhs > 0 { + d + 1 + } else { + d + } + } + let pages_needed = div_ceil(byte_size, page_size); + page_size * pages_needed.next_power_of_two() +} + +#[cfg(test)] +mod tests { + use crate::generated::bpf_map_type::*; + + const PAGE_SIZE: u32 = 4096; + const NUM_CPUS: u32 = 4; + + #[test] + fn test_adjust_to_page_size() { + use super::adjust_to_page_size; + [ + (0, 0), + (4096, 1), + (4096, 4095), + (4096, 4096), + (8192, 4097), + (8192, 8192), + (16384, 8193), + ] + .into_iter() + .for_each(|(exp, input)| assert_eq!(exp, adjust_to_page_size(input, PAGE_SIZE))) + } + + #[test] + fn test_max_entries_override() { + use super::max_entries_override; + [ + (BPF_MAP_TYPE_RINGBUF, Some(1), 1, Some(PAGE_SIZE)), + (BPF_MAP_TYPE_RINGBUF, None, 1, Some(PAGE_SIZE)), + (BPF_MAP_TYPE_RINGBUF, None, PAGE_SIZE, None), + (BPF_MAP_TYPE_PERF_EVENT_ARRAY, None, 1, None), + (BPF_MAP_TYPE_PERF_EVENT_ARRAY, Some(42), 1, Some(42)), + (BPF_MAP_TYPE_PERF_EVENT_ARRAY, Some(0), 1, Some(NUM_CPUS)), + (BPF_MAP_TYPE_PERF_EVENT_ARRAY, None, 0, Some(NUM_CPUS)), + (BPF_MAP_TYPE_PERF_EVENT_ARRAY, None, 42, None), + (BPF_MAP_TYPE_ARRAY, None, 1, None), + (BPF_MAP_TYPE_ARRAY, Some(2), 1, Some(2)), + ] + .into_iter() + .for_each(|(map_type, user_override, current_value, exp)| { + assert_eq!( + exp, + max_entries_override( + map_type, + user_override, + || { current_value }, + || Ok(NUM_CPUS), + || PAGE_SIZE + ) + .unwrap() + ) + }) + } +} + impl Default for BpfLoader<'_> { fn default() -> Self { BpfLoader::new() diff --git a/aya/src/maps/mod.rs b/aya/src/maps/mod.rs index 43cfc5be1..2dc10aad4 100644 --- a/aya/src/maps/mod.rs +++ b/aya/src/maps/mod.rs @@ -80,6 +80,7 @@ pub mod hash_map; pub mod lpm_trie; pub mod perf; pub mod queue; +pub mod ring_buf; pub mod sock; pub mod stack; pub mod stack_trace; @@ -94,6 +95,7 @@ pub use lpm_trie::LpmTrie; pub use perf::AsyncPerfEventArray; pub use perf::PerfEventArray; pub use queue::Queue; +pub use ring_buf::RingBuf; pub use sock::{SockHash, SockMap}; pub use stack::Stack; pub use stack_trace::StackTraceMap; @@ -279,7 +281,9 @@ pub enum Map { ProgramArray(MapData), /// A [`Queue`] map. Queue(MapData), - /// A [`SockHash`] map. + /// A [`RingBuf`] map + RingBuf(MapData), + /// A [`SockHash`] map SockHash(MapData), /// A [`SockMap`] map. SockMap(MapData), @@ -311,6 +315,7 @@ impl Map { Self::PerfEventArray(map) => map.obj.map_type(), Self::ProgramArray(map) => map.obj.map_type(), Self::Queue(map) => map.obj.map_type(), + Self::RingBuf(map) => map.obj.map_type(), Self::SockHash(map) => map.obj.map_type(), Self::SockMap(map) => map.obj.map_type(), Self::Stack(map) => map.obj.map_type(), @@ -376,6 +381,7 @@ impl_try_from_map!(() { DevMapHash, PerfEventArray, ProgramArray, + RingBuf, SockMap, StackTraceMap, XskMap, diff --git a/aya/src/maps/ring_buf.rs b/aya/src/maps/ring_buf.rs new file mode 100644 index 000000000..66f54fe8e --- /dev/null +++ b/aya/src/maps/ring_buf.rs @@ -0,0 +1,400 @@ +//! A [ring buffer map][ringbuf] that may be used to receive events from eBPF programs. +//! As of Linux 5.8, this is the preferred way to transfer per-event data from eBPF +//! programs to userspace. +//! +//! [ringbuf]: https://www.kernel.org/doc/html/latest/bpf/ringbuf.html + +use std::{ + fmt::Debug, + io, + ops::Deref, + os::fd::{AsFd as _, AsRawFd, BorrowedFd, RawFd}, + ptr, + ptr::NonNull, + sync::atomic::{AtomicU32, AtomicUsize, Ordering}, +}; + +use libc::{c_int, c_void, munmap, off_t, MAP_FAILED, MAP_SHARED, PROT_READ, PROT_WRITE}; + +use crate::{ + generated::{BPF_RINGBUF_BUSY_BIT, BPF_RINGBUF_DISCARD_BIT, BPF_RINGBUF_HDR_SZ}, + maps::{MapData, MapError}, + sys::{mmap, SyscallError}, +}; + +/// A map that can be used to receive events from eBPF programs. +/// +/// This is similar to [`crate::maps::PerfEventArray`], but different in a few ways: +/// * It's shared across all CPUs, which allows a strong ordering between events. +/// * Data notifications are delivered precisely instead of being sampled for every N events; the +/// eBPF program can also control notification delivery if sampling is desired for performance +/// reasons. By default, a notification will be sent if the consumer is caught up at the time of +/// committing. The eBPF program can use the `BPF_RB_NO_WAKEUP` or `BPF_RB_FORCE_WAKEUP` flags to +/// control this behavior. +/// * On the eBPF side, it supports the reverse-commit pattern where the event can be directly +/// written into the ring without copying from a temporary location. +/// * Dropped sample notifications go to the eBPF program as the return value of `reserve`/`output`, +/// and not the userspace reader. This might require extra code to handle, but allows for more +/// flexible schemes to handle dropped samples. +/// +/// To receive events you need to: +/// * Construct [`RingBuf`] using [`RingBuf::try_from`]. +/// * Call [`RingBuf::next`] to poll events from the [`RingBuf`]. +/// +/// To receive async notifications of data availability, you may construct an +/// [`tokio::io::unix::AsyncFd`] from the [`RingBuf`]'s file descriptor and poll it for readiness. +/// +/// # Minimum kernel version +/// +/// The minimum kernel version required to use this feature is 5.8. +#[doc(alias = "BPF_MAP_TYPE_RINGBUF")] +pub struct RingBuf { + map: T, + consumer: ConsumerPos, + producer: ProducerData, +} + +impl> RingBuf { + pub(crate) fn new(map: T) -> Result { + let data: &MapData = map.borrow(); + let page_size = crate::util::page_size(); + let map_fd = data.fd().as_fd(); + let byte_size = data.obj.max_entries(); + let consumer_metadata = ConsumerMetadata::new(map_fd, 0, page_size)?; + let consumer = ConsumerPos::new(consumer_metadata); + let producer = ProducerData::new(map_fd, page_size, page_size, byte_size)?; + Ok(Self { + map, + consumer, + producer, + }) + } +} + +impl RingBuf { + /// Try to take a new entry from the ringbuf. + /// + /// Returns `Some(item)` if the ringbuf is not empty. Returns `None` if the ringbuf is empty, in + /// which case the caller may register for availability notifications through `epoll` or other + /// APIs. Only one RingBufItem may be outstanding at a time. + // + // This is not an implementation of `Iterator` because we need to be able to refer to the + // lifetime of the iterator in the returned `RingBufItem`. If the Iterator::Item leveraged GATs, + // one could imagine an implementation of `Iterator` that would work. GATs are stabilized in + // Rust 1.65, but there's not yet a trait that the community seems to have standardized around. + #[allow(clippy::should_implement_trait)] + pub fn next(&mut self) -> Option> { + let Self { + consumer, producer, .. + } = self; + producer.next(consumer) + } +} + +/// Access to the RawFd can be used to construct an AsyncFd for use with epoll. +impl> AsRawFd for RingBuf { + fn as_raw_fd(&self) -> RawFd { + let Self { + map, + consumer: _, + producer: _, + } = self; + map.borrow().fd().as_fd().as_raw_fd() + } +} + +/// The current outstanding item read from the ringbuf. +pub struct RingBufItem<'a> { + data: &'a [u8], + consumer: &'a mut ConsumerPos, +} + +impl Deref for RingBufItem<'_> { + type Target = [u8]; + + fn deref(&self) -> &Self::Target { + let Self { data, .. } = self; + data + } +} + +impl Drop for RingBufItem<'_> { + fn drop(&mut self) { + let Self { consumer, data } = self; + consumer.consume(data.len()) + } +} + +impl Debug for RingBufItem<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + data, + consumer: ConsumerPos { pos, .. }, + } = self; + // In general Relaxed here is sufficient, for debugging, it certainly is. + f.debug_struct("RingBufItem") + .field("pos", pos) + .field("len", &data.len()) + .finish() + } +} + +struct ConsumerMetadata(MMap); + +impl ConsumerMetadata { + fn new(fd: BorrowedFd<'_>, offset: usize, page_size: usize) -> Result { + Ok(Self(MMap::new( + fd, + page_size, + PROT_READ | PROT_WRITE, + MAP_SHARED, + offset.try_into().unwrap(), + )?)) + } +} + +impl AsRef for ConsumerMetadata { + fn as_ref(&self) -> &AtomicUsize { + let Self(MMap { ptr, .. }) = self; + unsafe { ptr.cast::().as_ref() } + } +} + +struct ConsumerPos { + pos: usize, + metadata: ConsumerMetadata, +} + +impl ConsumerPos { + fn new(metadata: ConsumerMetadata) -> Self { + // Load the initial value of the consumer position. After this point we'll be the only + // thread ever writing to this value, and we'll do it based on our local bookkeeping. We + // just mmap'ed this data, so we can assume that the kernel has flushed the any previous + // consumers writes, so Relaxed would almost certainly work, but because this is + // construction time and is not on a hot path, we'll be paranoid. + let pos = metadata.as_ref().load(Ordering::SeqCst); + Self { pos, metadata } + } + + fn consume(&mut self, len: usize) { + let Self { pos, metadata } = self; + + // TODO: Use primitive method when https://github.com/rust-lang/rust/issues/88581 is stabilized. + fn next_multiple_of(n: usize, multiple: usize) -> usize { + match n % multiple { + 0 => n, + rem => n + (multiple - rem), + } + } + *pos += next_multiple_of(usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap() + len, 8); + + // Write operation needs to be properly ordered with respect to the producer committing new + // data to the ringbuf. The producer uses xchg (SeqCst) to commit new data [1]. The producer + // reads the consumer offset after clearing the busy bit on a new entry [2]. By using SeqCst + // here we ensure that either a subsequent read by the consumer to consume messages will see + // an available message, or the producer in the kernel will see the updated consumer offset + // that is caught up. + // + // [1]: https://github.com/torvalds/linux/blob/2772d7df/kernel/bpf/ringbuf.c#L487-L488 + // [2]: https://github.com/torvalds/linux/blob/2772d7df/kernel/bpf/ringbuf.c#L494 + metadata.as_ref().store(*pos, Ordering::SeqCst); + } +} + +struct ProducerData { + mmap: MMap, + + // Offset in the mmap where the data starts. + data_offset: usize, + + // A cache of the value of the producer position. It is used to avoid re-reading the producer + // position when we know there is more data to consume. + pos_cache: usize, + + // A bitmask which truncates u32 values to the domain of valid offsets in the ringbuf. + mask: u32, +} + +impl ProducerData { + fn new( + fd: BorrowedFd<'_>, + offset: usize, + page_size: usize, + byte_size: u32, + ) -> Result { + // The producer pages have one page of metadata and then the data pages, all mapped + // read-only. Note that the length of the mapping includes the data pages twice as the + // kernel will map them two time consecutively to avoid special handling of entries that + // cross over the end of the ring buffer. + // + // The kernel diagram below shows the layout of the ring buffer. It references "meta pages", + // but we only map exactly one producer meta page read-only. The consumer meta page is mapped + // read-write elsewhere, and is taken into consideration via the offset parameter. + // + // From kernel/bpf/ringbuf.c[0]: + // + // Each data page is mapped twice to allow "virtual" + // continuous read of samples wrapping around the end of ring + // buffer area: + // ------------------------------------------------------ + // | meta pages | real data pages | same data pages | + // ------------------------------------------------------ + // | | 1 2 3 4 5 6 7 8 9 | 1 2 3 4 5 6 7 8 9 | + // ------------------------------------------------------ + // | | TA DA | TA DA | + // ------------------------------------------------------ + // ^^^^^^^ + // | + // Here, no need to worry about special handling of wrapped-around + // data due to double-mapped data pages. This works both in kernel and + // when mmap()'ed in user-space, simplifying both kernel and + // user-space implementations significantly. + // + // [0]: https://github.com/torvalds/linux/blob/3f01e9fe/kernel/bpf/ringbuf.c#L108-L124 + let len = page_size + 2 * usize::try_from(byte_size).unwrap(); + let mmap = MMap::new(fd, len, PROT_READ, MAP_SHARED, offset.try_into().unwrap())?; + + // byte_size is required to be a power of two multiple of page_size (which implicitly is a + // power of 2), so subtracting one will create a bitmask for values less than byte_size. + debug_assert!(byte_size.is_power_of_two()); + let mask = byte_size - 1; + Ok(Self { + mmap, + data_offset: page_size, + pos_cache: 0, + mask, + }) + } + + fn next<'a>(&'a mut self, consumer: &'a mut ConsumerPos) -> Option> { + let Self { + ref mmap, + data_offset, + pos_cache, + mask, + } = self; + let pos = unsafe { mmap.ptr.cast().as_ref() }; + let data_pages = mmap.as_ref().get(*data_offset..).unwrap(); + while data_available(pos, pos_cache, consumer) { + match read_item(data_pages, *mask, consumer) { + Item::Busy => return None, + Item::Discard { len } => consumer.consume(len), + Item::Data(data) => return Some(RingBufItem { data, consumer }), + } + } + return None; + + enum Item<'a> { + Busy, + Discard { len: usize }, + Data(&'a [u8]), + } + + fn data_available( + producer: &AtomicUsize, + cache: &mut usize, + consumer: &ConsumerPos, + ) -> bool { + let ConsumerPos { pos: consumer, .. } = consumer; + if consumer == cache { + // This value is written using Release by the kernel [1], and should be read with + // Acquire to ensure that the prior writes to the entry header are visible. + // + // [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L447-L448 + *cache = producer.load(Ordering::Acquire); + } + + // Note that we don't compare the order of the values because the producer position may + // overflow u32 and wrap around to 0. Instead we just compare equality and assume that + // the consumer position is always logically less than the producer position. + // + // Note also that the kernel, at the time of writing [1], doesn't seem to handle this + // overflow correctly at all, and it's not clear that one can produce events after the + // producer position has wrapped around. + // + // [1]: https://github.com/torvalds/linux/blob/4b810bf0/kernel/bpf/ringbuf.c#L434-L440 + consumer != cache + } + + fn read_item<'data>(data: &'data [u8], mask: u32, pos: &ConsumerPos) -> Item<'data> { + let ConsumerPos { pos, .. } = pos; + let offset = pos & usize::try_from(mask).unwrap(); + let header_ptr = data + .get(offset..offset + core::mem::size_of::()) + .expect("offset out of bounds") + .as_ptr() as *const AtomicU32; + // Pair the kernel's SeqCst write (implies Release) [1] with an Acquire load. This + // ensures data written by the producer will be visible. + // + // [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L488 + let header = unsafe { &*header_ptr }.load(Ordering::SeqCst); + if header & BPF_RINGBUF_BUSY_BIT != 0 { + Item::Busy + } else { + let len = usize::try_from(header & mask).unwrap(); + if header & BPF_RINGBUF_DISCARD_BIT != 0 { + Item::Discard { len } + } else { + let data_offset = offset + usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap(); + let data = data + .get(data_offset..data_offset + len) + .expect("offset out of bounds"); + Item::Data(data) + } + } + } + } +} + +// MMap corresponds to a memory-mapped region. +// +// The data is unmapped in Drop. +struct MMap { + ptr: NonNull, + len: usize, +} + +impl MMap { + fn new( + fd: BorrowedFd<'_>, + len: usize, + prot: c_int, + flags: c_int, + offset: off_t, + ) -> Result { + match unsafe { mmap(ptr::null_mut(), len, prot, flags, fd, offset) } { + MAP_FAILED => Err(MapError::SyscallError(SyscallError { + call: "mmap", + io_error: io::Error::last_os_error(), + })), + ptr => Ok(Self { + ptr: std::ptr::NonNull::new(ptr).ok_or( + // This should never happen, but to be paranoid, and so we never need to talk + // about a null pointer, we check it anyway. + MapError::SyscallError(SyscallError { + call: "mmap", + io_error: io::Error::new( + io::ErrorKind::Other, + "mmap returned null pointer", + ), + }), + )?, + len, + }), + } + } +} + +impl AsRef<[u8]> for MMap { + fn as_ref(&self) -> &[u8] { + let Self { ptr, len } = self; + unsafe { std::slice::from_raw_parts(ptr.as_ptr().cast(), *len) } + } +} + +impl Drop for MMap { + fn drop(&mut self) { + let Self { ptr, len } = *self; + unsafe { munmap(ptr.as_ptr(), len) }; + } +} diff --git a/bpf/aya-bpf/Cargo.toml b/bpf/aya-bpf/Cargo.toml index 0af23767d..903366b2a 100644 --- a/bpf/aya-bpf/Cargo.toml +++ b/bpf/aya-bpf/Cargo.toml @@ -8,6 +8,11 @@ edition = "2021" aya-bpf-cty = { path = "../aya-bpf-cty" } aya-bpf-macros = { path = "../../aya-bpf-macros" } aya-bpf-bindings = { path = "../aya-bpf-bindings" } +const-assert = { workspace = true, optional = true } [build-dependencies] rustversion = { workspace = true } + +[features] +default = [] +const_assert = ["const-assert"] diff --git a/bpf/aya-bpf/src/lib.rs b/bpf/aya-bpf/src/lib.rs index a10833a3b..d482de0c4 100644 --- a/bpf/aya-bpf/src/lib.rs +++ b/bpf/aya-bpf/src/lib.rs @@ -8,6 +8,11 @@ html_logo_url = "https://aya-rs.dev/assets/images/crabby.svg", html_favicon_url = "https://aya-rs.dev/assets/images/crabby.svg" )] +#![cfg_attr( + feature = "const_assert", + allow(incomplete_features), + feature(generic_const_exprs) +)] #![cfg_attr(unstable, feature(never_type))] #![cfg_attr(target_arch = "bpf", feature(asm_experimental_arch))] #![allow(clippy::missing_safety_doc)] diff --git a/bpf/aya-bpf/src/maps/mod.rs b/bpf/aya-bpf/src/maps/mod.rs index b46bf0843..ead24dc38 100644 --- a/bpf/aya-bpf/src/maps/mod.rs +++ b/bpf/aya-bpf/src/maps/mod.rs @@ -13,6 +13,7 @@ pub mod per_cpu_array; pub mod perf; pub mod program_array; pub mod queue; +pub mod ring_buf; pub mod sock_hash; pub mod sock_map; pub mod stack; @@ -27,6 +28,7 @@ pub use per_cpu_array::PerCpuArray; pub use perf::{PerfEventArray, PerfEventByteArray}; pub use program_array::ProgramArray; pub use queue::Queue; +pub use ring_buf::RingBuf; pub use sock_hash::SockHash; pub use sock_map::SockMap; pub use stack::Stack; diff --git a/bpf/aya-bpf/src/maps/ring_buf.rs b/bpf/aya-bpf/src/maps/ring_buf.rs new file mode 100644 index 000000000..83c8d465c --- /dev/null +++ b/bpf/aya-bpf/src/maps/ring_buf.rs @@ -0,0 +1,163 @@ +use core::{ + cell::UnsafeCell, + mem, + mem::MaybeUninit, + ops::{Deref, DerefMut}, +}; + +#[cfg(feature = "const_assert")] +use const_assert::{Assert, IsTrue}; + +use crate::{ + bindings::{bpf_map_def, bpf_map_type::BPF_MAP_TYPE_RINGBUF}, + helpers::{ + bpf_ringbuf_discard, bpf_ringbuf_output, bpf_ringbuf_query, bpf_ringbuf_reserve, + bpf_ringbuf_submit, + }, + maps::PinningType, +}; + +#[repr(transparent)] +pub struct RingBuf { + def: UnsafeCell, +} + +unsafe impl Sync for RingBuf {} + +/// A ring buffer entry, returned from [`RingBuf::reserve`]. +/// +/// You must [`submit`] or [`discard`] this entry before it gets dropped. +/// +/// [`submit`]: RingBufEntry::submit +/// [`discard`]: RingBufEntry::discard +#[must_use = "BPF verifier requires ring buffer entries to be either submitted or discarded"] +pub struct RingBufEntry(&'static mut MaybeUninit); + +impl Deref for RingBufEntry { + type Target = MaybeUninit; + + fn deref(&self) -> &Self::Target { + self.0 + } +} + +impl DerefMut for RingBufEntry { + fn deref_mut(&mut self) -> &mut Self::Target { + self.0 + } +} + +impl RingBufEntry { + /// Discard this ring buffer entry. The entry will be skipped by the userspace reader. + pub fn discard(self, flags: u64) { + unsafe { bpf_ringbuf_discard(self.0.as_mut_ptr() as *mut _, flags) }; + } + + /// Commit this ring buffer entry. The entry will be made visible to the userspace reader. + pub fn submit(self, flags: u64) { + unsafe { bpf_ringbuf_submit(self.0.as_mut_ptr() as *mut _, flags) }; + } +} + +impl RingBuf { + /// Declare a BPF ring buffer. + /// + /// The linux kernel requires that `byte_size` be a power-of-2 multiple of the page size. + /// The loading program may coerce the size when loading the map. + pub const fn with_byte_size(byte_size: u32, flags: u32) -> Self { + Self::new(byte_size, flags, PinningType::None) + } + + /// Declare a pinned BPF ring buffer. + /// + /// The linux kernel requires that `byte_size` be a power-of-2 multiple of the page size. + /// The loading program may coerce the size when loading the map. + pub const fn pinned(byte_size: u32, flags: u32) -> Self { + Self::new(byte_size, flags, PinningType::ByName) + } + + const fn new(byte_size: u32, flags: u32, pinning_type: PinningType) -> Self { + Self { + def: UnsafeCell::new(bpf_map_def { + type_: BPF_MAP_TYPE_RINGBUF, + key_size: 0, + value_size: 0, + max_entries: byte_size, + map_flags: flags, + id: 0, + pinning: pinning_type as u32, + }), + } + } + + /// Reserve memory in the ring buffer that can fit `T`. + /// + /// Returns `None` if the ring buffer is full. + #[cfg(feature = "const_assert")] + pub fn reserve(&self, flags: u64) -> Option> + where + Assert<{ 8 % core::mem::align_of::() == 0 }>: IsTrue, + { + self.reserve_impl(flags) + } + + /// Reserve memory in the ring buffer that can fit `T`. + /// + /// Returns `None` if the ring buffer is full. + /// + /// Note: `T` must be aligned to no more than 8 bytes; it's not possible to fulfill larger + /// alignment requests. If you use this with a `T` that isn't properly aligned, this function will + /// be compiled to a panic and silently make your eBPF program fail to load. + /// See [here](https://github.com/torvalds/linux/blob/3f01e9fed/kernel/bpf/ringbuf.c#L418). + #[cfg(not(feature = "const_assert"))] + pub fn reserve(&self, flags: u64) -> Option> { + assert_eq!(8 % core::mem::align_of::(), 0); + self.reserve_impl(flags) + } + + fn reserve_impl(&self, flags: u64) -> Option> { + let ptr = unsafe { + bpf_ringbuf_reserve(self.def.get() as *mut _, mem::size_of::() as _, flags) + } as *mut MaybeUninit; + unsafe { ptr.as_mut() }.map(|ptr| RingBufEntry(ptr)) + } + + /// Copy `data` to the ring buffer output. + /// + /// Consider using [`reserve`] and [`submit`] if `T` is statically sized and you want to save a + /// copy from either a map buffer or the stack. + /// + /// Unlike [`reserve`], this function can handle dynamically sized types (which is hard to + /// create in eBPF but still possible, e.g. by slicing an array). + /// + /// Note: `T` must be aligned to no more than 8 bytes; it's not possible to fulfill larger + /// alignment requests. If you use this with a `T` that isn't properly aligned, this function will + /// be compiled to a panic and silently make your eBPF program fail to load. + /// See [here](https://github.com/torvalds/linux/blob/3f01e9fed/kernel/bpf/ringbuf.c#L418). + /// + /// [`reserve`]: RingBuf::reserve + /// [`submit`]: RingBufEntry::submit + pub fn output(&self, data: &T, flags: u64) -> Result<(), i64> { + assert_eq!(8 % core::mem::align_of_val(data), 0); + let ret = unsafe { + bpf_ringbuf_output( + self.def.get() as *mut _, + data as *const _ as *mut _, + mem::size_of_val(data) as _, + flags, + ) + }; + if ret < 0 { + Err(ret) + } else { + Ok(()) + } + } + + /// Query various information about the ring buffer. + /// + /// Consult `bpf_ringbuf_query` documentation for a list of allowed flags. + pub fn query(&self, flags: u64) -> u64 { + unsafe { bpf_ringbuf_query(self.def.get() as *mut _, flags) } + } +} diff --git a/test/integration-ebpf/Cargo.toml b/test/integration-ebpf/Cargo.toml index e72e0a87c..d471acf31 100644 --- a/test/integration-ebpf/Cargo.toml +++ b/test/integration-ebpf/Cargo.toml @@ -51,3 +51,7 @@ path = "src/redirect.rs" [[bin]] name = "xdp_sec" path = "src/xdp_sec.rs" + +[[bin]] +name = "ring_buf" +path = "src/ring_buf.rs" diff --git a/test/integration-ebpf/src/ring_buf.rs b/test/integration-ebpf/src/ring_buf.rs new file mode 100644 index 000000000..ff9a21a0f --- /dev/null +++ b/test/integration-ebpf/src/ring_buf.rs @@ -0,0 +1,59 @@ +#![no_std] +#![no_main] + +use aya_bpf::{ + macros::{map, uprobe}, + maps::{PerCpuArray, RingBuf}, + programs::ProbeContext, +}; + +#[map] +static RING_BUF: RingBuf = RingBuf::with_byte_size(0, 0); + +// This structure's definition is duplicated in userspace. +#[repr(C)] +struct Registers { + dropped: u64, + rejected: u64, +} + +// Use a PerCpuArray to store the registers so that we can update the values from multiple CPUs +// without needing synchronization. Atomics exist [1], but aren't exposed. +// +// [1]: https://lwn.net/Articles/838884/ +#[map] +static REGISTERS: PerCpuArray = PerCpuArray::with_max_entries(1, 0); + +#[uprobe] +pub fn ring_buf_test(ctx: ProbeContext) { + let Registers { dropped, rejected } = match REGISTERS.get_ptr_mut(0) { + Some(regs) => unsafe { &mut *regs }, + None => return, + }; + let mut entry = match RING_BUF.reserve::(0) { + Some(entry) => entry, + None => { + *dropped += 1; + return; + } + }; + // Write the first argument to the function back out to RING_BUF if it is even, + // otherwise increment the counter in REJECTED. This exercises discarding data. + let arg: u64 = match ctx.arg(0) { + Some(arg) => arg, + None => return, + }; + if arg % 2 == 0 { + entry.write(arg); + entry.submit(0); + } else { + *rejected += 1; + entry.discard(0); + } +} + +#[cfg(not(test))] +#[panic_handler] +fn panic(_info: &core::panic::PanicInfo) -> ! { + loop {} +} diff --git a/test/integration-test/Cargo.toml b/test/integration-test/Cargo.toml index 4d1a466b6..4db38be61 100644 --- a/test/integration-test/Cargo.toml +++ b/test/integration-test/Cargo.toml @@ -10,13 +10,16 @@ assert_matches = { workspace = true } aya = { workspace = true } aya-log = { workspace = true } aya-obj = { workspace = true } +epoll = { workspace = true } +futures = { workspace = true, features = ["std"] } libc = { workspace = true } log = { workspace = true } netns-rs = { workspace = true } -object = { workspace = true } +object = { workspace = true, features = ["elf", "read_core", "std"] } +rand = { workspace = true, features = ["std", "std_rng"] } rbpf = { workspace = true } test-case = { workspace = true } -tokio = { workspace = true, features = ["macros", "time"] } +tokio = { workspace = true, features = ["macros", "rt-multi-thread", "time"] } [build-dependencies] cargo_metadata = { workspace = true } diff --git a/test/integration-test/src/lib.rs b/test/integration-test/src/lib.rs index 79be5fd39..d47080336 100644 --- a/test/integration-test/src/lib.rs +++ b/test/integration-test/src/lib.rs @@ -21,6 +21,7 @@ pub const BPF_PROBE_READ: &[u8] = include_bytes_aligned!(concat!(env!("OUT_DIR"), "/bpf_probe_read")); pub const REDIRECT: &[u8] = include_bytes_aligned!(concat!(env!("OUT_DIR"), "/redirect")); pub const XDP_SEC: &[u8] = include_bytes_aligned!(concat!(env!("OUT_DIR"), "/xdp_sec")); +pub const RING_BUF: &[u8] = include_bytes_aligned!(concat!(env!("OUT_DIR"), "/ring_buf")); #[cfg(test)] mod tests; diff --git a/test/integration-test/src/tests.rs b/test/integration-test/src/tests.rs index 3a9951800..f37d54bbe 100644 --- a/test/integration-test/src/tests.rs +++ b/test/integration-test/src/tests.rs @@ -5,5 +5,6 @@ mod load; mod log; mod rbpf; mod relocations; +mod ring_buf; mod smoke; mod xdp; diff --git a/test/integration-test/src/tests/ring_buf.rs b/test/integration-test/src/tests/ring_buf.rs new file mode 100644 index 000000000..a7c266964 --- /dev/null +++ b/test/integration-test/src/tests/ring_buf.rs @@ -0,0 +1,433 @@ +use core::panic; +use std::{os::fd::AsRawFd as _, sync::atomic::Ordering, thread}; + +use anyhow::Context as _; +use assert_matches::assert_matches; +use aya::{ + maps::{array::PerCpuArray, ring_buf::RingBuf, MapData}, + programs::UProbe, + Bpf, BpfLoader, Btf, Pod, +}; +use aya_obj::generated::BPF_RINGBUF_HDR_SZ; +use rand::Rng as _; +use tokio::{ + io::unix::AsyncFd, + time::{sleep, Duration}, +}; + +// This structure's definition is duplicated in the probe. +#[repr(C)] +#[derive(Clone, Copy, Debug, Eq, PartialEq, Default)] +struct Registers { + dropped: u64, + rejected: u64, +} + +impl core::ops::Add for Registers { + type Output = Self; + fn add(self, rhs: Self) -> Self::Output { + Self { + dropped: self.dropped + rhs.dropped, + rejected: self.rejected + rhs.rejected, + } + } +} + +impl<'a> std::iter::Sum<&'a Registers> for Registers { + fn sum>(iter: I) -> Self { + iter.fold(Default::default(), |a, b| a + *b) + } +} + +unsafe impl Pod for Registers {} + +/// Generate a variable length vector of u64s. +struct RingBufTest { + _bpf: Bpf, + ring_buf: RingBuf, + regs: PerCpuArray, +} + +// Note that it is important for this test that RING_BUF_MAX_ENTRIES ends up creating a ring buffer +// that is exactly a power-of-two multiple of the page size. The synchronous test will fail if +// that's not the case because the actual size will be rounded up, and fewer entries will be dropped +// than expected. +const RING_BUF_MAX_ENTRIES: usize = 512; + +impl RingBufTest { + fn new() -> Self { + const RING_BUF_BYTE_SIZE: u32 = (RING_BUF_MAX_ENTRIES + * (core::mem::size_of::() + BPF_RINGBUF_HDR_SZ as usize)) + as u32; + + // Use the loader API to control the size of the ring_buf. + let mut bpf = BpfLoader::new() + .btf(Some(Btf::from_sys_fs().unwrap()).as_ref()) + .set_max_entries("RING_BUF", RING_BUF_BYTE_SIZE) + .load(crate::RING_BUF) + .unwrap(); + let ring_buf = bpf.take_map("RING_BUF").unwrap(); + let ring_buf = RingBuf::try_from(ring_buf).unwrap(); + let regs = bpf.take_map("REGISTERS").unwrap(); + let regs = PerCpuArray::<_, Registers>::try_from(regs).unwrap(); + let prog: &mut UProbe = bpf + .program_mut("ring_buf_test") + .unwrap() + .try_into() + .unwrap(); + prog.load().unwrap(); + prog.attach( + Some("ring_buf_trigger_ebpf_program"), + 0, + "/proc/self/exe", + None, + ) + .unwrap(); + + Self { + _bpf: bpf, + ring_buf, + regs, + } + } +} + +struct WithData(RingBufTest, Vec); + +impl WithData { + fn new() -> Self { + Self(RingBufTest::new(), { + let mut rng = rand::thread_rng(); + // Generate more entries than there is space so we can test dropping entries. + let n = rng.gen_range(1..=RING_BUF_MAX_ENTRIES * 2); + std::iter::repeat_with(|| rng.gen()).take(n).collect() + }) + } +} + +#[test] +fn ring_buf() { + let WithData( + RingBufTest { + ref mut ring_buf, + ref regs, + .. + }, + ref data, + ) = WithData::new(); + + // Note that after expected_capacity has been submitted, reserve calls in the probe will fail + // and the probe will give up. + let expected_capacity = RING_BUF_MAX_ENTRIES - 1; + + // Call the function that the uprobe is attached to with randomly generated data. + let mut expected = Vec::new(); + let mut expected_rejected = 0u64; + let mut expected_dropped = 0u64; + for (i, &v) in data.iter().enumerate() { + ring_buf_trigger_ebpf_program(v); + if i >= expected_capacity { + expected_dropped += 1; + } else if v % 2 == 0 { + expected.push(v); + } else { + expected_rejected += 1; + } + } + + let mut seen = Vec::::new(); + while seen.len() < expected.len() { + if let Some(read) = ring_buf.next() { + let read: [u8; 8] = (*read) + .try_into() + .with_context(|| format!("data: {:?}", read.len())) + .unwrap(); + let arg = u64::from_ne_bytes(read); + assert_eq!(arg % 2, 0, "got {arg} from probe"); + seen.push(arg); + } + } + + // Make sure that there is nothing else in the ring_buf. + assert_matches!(ring_buf.next(), None); + + // Ensure that the data that was read matches what was passed, and the rejected count was set + // properly. + assert_eq!(seen, expected); + let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum(); + assert_eq!(dropped, expected_dropped); + assert_eq!(rejected, expected_rejected); +} + +#[no_mangle] +#[inline(never)] +pub extern "C" fn ring_buf_trigger_ebpf_program(arg: u64) { + std::hint::black_box(arg); +} + +// This test differs from the other async test in that it's possible for the producer +// to fill the ring_buf. We just ensure that the number of events we see is sane given +// what the producer sees, and that the logic does not hang. This exercises interleaving +// discards, successful commits, and drops due to the ring_buf being full. +#[tokio::test(flavor = "multi_thread")] +async fn ring_buf_async_with_drops() { + let WithData( + RingBufTest { + ref mut ring_buf, + ref regs, + .. + }, + ref data, + ) = WithData::new(); + + let raw_fd = ring_buf.as_raw_fd(); + let async_fd = AsyncFd::new(raw_fd).unwrap(); + + // Spawn the writer which internally will spawn many parallel writers. + // Construct an AsyncFd from the RingBuf in order to receive readiness notifications. + let mut seen = 0; + let mut process_ring_buf = || { + while let Some(read) = ring_buf.next() { + let read: [u8; 8] = (*read) + .try_into() + .with_context(|| format!("data: {:?}", read.len())) + .unwrap(); + let arg = u64::from_ne_bytes(read); + assert_eq!(arg % 2, 0, "got {arg} from probe"); + seen += 1; + } + }; + use futures::future::{ + select, + Either::{Left, Right}, + }; + let writer = futures::future::try_join_all(data.chunks(8).map(ToOwned::to_owned).map(|v| { + tokio::spawn(async { + for value in v { + ring_buf_trigger_ebpf_program(value); + } + }) + })); + let readable = { + let mut writer = writer; + loop { + let readable = Box::pin(async_fd.readable()); + writer = match select(readable, writer).await { + Left((guard, writer)) => { + let mut guard = guard.unwrap(); + process_ring_buf(); + guard.clear_ready(); + writer + } + Right((writer, readable)) => { + writer.unwrap(); + break readable; + } + } + } + }; + + // If there's more to read, we should receive a readiness notification in a timely manner. + // If we don't then, then assert that there's nothing else to read. Note that it's important + // to wait some time before attempting to read, otherwise we may catch up with the producer + // before epoll has an opportunity to send a notification; our consumer thread can race + // with the kernel epoll check. + let sleep_fut = sleep(Duration::from_millis(10)); + tokio::pin!(sleep_fut); + match select(sleep_fut, readable).await { + Left(((), _)) => { + assert_matches!(ring_buf.next(), None); + } + Right((guard, _)) => { + process_ring_buf(); + guard.unwrap().clear_ready(); + } + } + + let max_dropped: u64 = u64::try_from( + data.len() + .checked_sub(RING_BUF_MAX_ENTRIES - 1) + .unwrap_or_default(), + ) + .unwrap(); + let max_seen = u64::try_from(data.iter().filter(|v| *v % 2 == 0).count()).unwrap(); + let max_rejected = u64::try_from(data.len()).unwrap() - max_seen; + let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum(); + let total = u64::try_from(data.len()).unwrap(); + let min_seen = max_seen.checked_sub(max_dropped).unwrap_or_default(); + let min_rejected = max_rejected.checked_sub(dropped).unwrap_or_default(); + let facts = format!( + "seen={seen}, rejected={rejected}, dropped={dropped}, total={total}, max_seen={max_seen}, \ + max_rejected={max_rejected}, max_dropped={max_dropped}", + ); + assert_eq!(seen + rejected + dropped, total, "{facts}",); + assert!( + (0u64..=max_dropped).contains(&dropped), + "dropped={dropped} not in 0..={max_dropped}; {facts}", + ); + assert!( + (min_rejected..=max_rejected).contains(&rejected), + "rejected={rejected} not in {min_rejected}..={max_rejected}; {facts}", + ); + assert!( + (min_seen..=max_seen).contains(&seen), + "seen={seen} not in {min_seen}..={max_seen}, rejected={rejected}; {facts}", + ); +} + +#[tokio::test(flavor = "multi_thread")] +async fn ring_buf_async_no_drop() { + let WithData( + RingBufTest { + ring_buf, + ref regs, + _bpf, + }, + ref data, + ) = WithData::new(); + + let writer = { + let data = data.to_owned(); + tokio::spawn(async move { + for value in data { + // Sleep a tad so we feel confident that the consumer will keep up + // and no messages will be dropped. + let dur = Duration::from_nanos(rand::thread_rng().gen_range(0..10)); + sleep(dur).await; + ring_buf_trigger_ebpf_program(value); + } + }) + }; + + // Construct an AsyncFd from the RingBuf in order to receive readiness notifications. + let mut async_fd = AsyncFd::new(ring_buf).unwrap(); + // Note that unlike in the synchronous case where all of the entries are written before any of + // them are read, in this case we expect all of the entries to make their way to userspace + // because entries are being consumed as they are produced. + let expected: Vec = data.iter().cloned().filter(|v| *v % 2 == 0).collect(); + let expected_len = expected.len(); + let reader = async move { + let mut seen = Vec::with_capacity(expected_len); + while seen.len() < expected_len { + let mut guard = async_fd.readable_mut().await.unwrap(); + let ring_buf = guard.get_inner_mut(); + while let Some(read) = ring_buf.next() { + let read: [u8; 8] = (*read) + .try_into() + .with_context(|| format!("data: {:?}", read.len())) + .unwrap(); + let arg = u64::from_ne_bytes(read); + seen.push(arg); + } + guard.clear_ready(); + } + (seen, async_fd.into_inner()) + }; + let (writer, (seen, mut ring_buf)) = futures::future::join(writer, reader).await; + writer.unwrap(); + + // Make sure that there is nothing else in the ring_buf. + assert_matches!(ring_buf.next(), None); + + // Ensure that the data that was read matches what was passed. + assert_eq!(&seen, &expected); + let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum(); + assert_eq!(dropped, 0); + assert_eq!(rejected, (data.len() - expected.len()).try_into().unwrap()); +} + +// This test reproduces a bug where the ring buffer would not be notified of new entries if the +// state was not properly synchronized between the producer and consumer. This would result in the +// consumer never being woken up and the test hanging. +#[test] +fn ring_buf_epoll_wakeup() { + let RingBufTest { + mut ring_buf, + _bpf, + regs: _, + } = RingBufTest::new(); + + let epoll_fd = epoll::create(false).unwrap(); + epoll::ctl( + epoll_fd, + epoll::ControlOptions::EPOLL_CTL_ADD, + ring_buf.as_raw_fd(), + // The use of EPOLLET is intentional. Without it, level-triggering would result in + // more notifications, and would mask the underlying bug this test reproduced when + // the synchronization logic in the RingBuf mirrored that of libbpf. Also, tokio's + // AsyncFd always uses this flag (as demonstrated in the subsequent test). + epoll::Event::new(epoll::Events::EPOLLIN | epoll::Events::EPOLLET, 0), + ) + .unwrap(); + let mut epoll_event_buf = [epoll::Event::new(epoll::Events::EPOLLIN, 0); 1]; + let mut total_events: u64 = 0; + let threads = WriterThreads::spawn(); + while total_events < WriterThreads::NUM_MESSAGES { + epoll::wait(epoll_fd, -1, &mut epoll_event_buf).unwrap(); + while let Some(read) = ring_buf.next() { + assert_eq!(read.len(), 8); + total_events += 1; + } + } + threads.join(); +} + +// This test is like the above test but uses tokio and AsyncFd instead of raw epoll. +#[tokio::test] +async fn ring_buf_asyncfd_events() { + let RingBufTest { + ring_buf, + regs: _, + _bpf, + } = RingBufTest::new(); + + let mut async_fd = AsyncFd::new(ring_buf).unwrap(); + let mut total_events = 0; + let threads = WriterThreads::spawn(); + while total_events < WriterThreads::NUM_MESSAGES { + let mut guard = async_fd.readable_mut().await.unwrap(); + let rb = guard.get_inner_mut(); + while let Some(read) = rb.next() { + assert_eq!(read.len(), 8); + total_events += 1; + } + guard.clear_ready(); + } + threads.join(); +} + +// WriterThreads is used by both the epoll and async fd test that +struct WriterThreads { + threads: Vec>, + done: std::sync::Arc, +} + +impl WriterThreads { + // This is enough messages for the test to fail relatively reliably in a hardware accelerated VM + // with 2 vCPUs when the bug was present. + const NUM_MESSAGES: u64 = 20_000; + + fn spawn() -> Self { + let done = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + let threads: Vec<_> = (0..4/* num_threads */) + .map(|_: usize| { + let done = done.clone(); + thread::spawn(move || { + while !done.load(Ordering::Relaxed) { + ring_buf_trigger_ebpf_program(2); + } + }) + }) + .collect(); + Self { threads, done } + } + + fn join(self) { + let Self { threads, done } = self; + done.store(true, Ordering::Relaxed); + threads + .into_iter() + .map(|t| t.join()) + .for_each(|res| res.unwrap()); + } +} diff --git a/xtask/public-api/aya-bpf.txt b/xtask/public-api/aya-bpf.txt index e221d2cca..ba5ea3d8b 100644 --- a/xtask/public-api/aya-bpf.txt +++ b/xtask/public-api/aya-bpf.txt @@ -448,6 +448,65 @@ impl core::borrow::BorrowMut for aya_bpf::maps::queue::Queue where T: c pub fn aya_bpf::maps::queue::Queue::borrow_mut(&mut self) -> &mut T impl core::convert::From for aya_bpf::maps::queue::Queue pub fn aya_bpf::maps::queue::Queue::from(t: T) -> T +pub mod aya_bpf::maps::ring_buf +#[repr(transparent)] pub struct aya_bpf::maps::ring_buf::RingBuf +impl aya_bpf::maps::ring_buf::RingBuf +pub fn aya_bpf::maps::ring_buf::RingBuf::output(&self, data: &T, flags: u64) -> core::result::Result<(), i64> +pub const fn aya_bpf::maps::ring_buf::RingBuf::pinned(byte_size: u32, flags: u32) -> Self +pub fn aya_bpf::maps::ring_buf::RingBuf::query(&self, flags: u64) -> u64 +pub fn aya_bpf::maps::ring_buf::RingBuf::reserve(&self, flags: u64) -> core::option::Option> where const_assert::Assert<{ _ }>: const_assert::IsTrue +pub const fn aya_bpf::maps::ring_buf::RingBuf::with_byte_size(byte_size: u32, flags: u32) -> Self +impl core::marker::Sync for aya_bpf::maps::ring_buf::RingBuf +impl core::marker::Send for aya_bpf::maps::ring_buf::RingBuf +impl core::marker::Unpin for aya_bpf::maps::ring_buf::RingBuf +impl !core::panic::unwind_safe::RefUnwindSafe for aya_bpf::maps::ring_buf::RingBuf +impl core::panic::unwind_safe::UnwindSafe for aya_bpf::maps::ring_buf::RingBuf +impl core::convert::Into for aya_bpf::maps::ring_buf::RingBuf where U: core::convert::From +pub fn aya_bpf::maps::ring_buf::RingBuf::into(self) -> U +impl core::convert::TryFrom for aya_bpf::maps::ring_buf::RingBuf where U: core::convert::Into +pub type aya_bpf::maps::ring_buf::RingBuf::Error = core::convert::Infallible +pub fn aya_bpf::maps::ring_buf::RingBuf::try_from(value: U) -> core::result::Result>::Error> +impl core::convert::TryInto for aya_bpf::maps::ring_buf::RingBuf where U: core::convert::TryFrom +pub type aya_bpf::maps::ring_buf::RingBuf::Error = >::Error +pub fn aya_bpf::maps::ring_buf::RingBuf::try_into(self) -> core::result::Result>::Error> +impl core::any::Any for aya_bpf::maps::ring_buf::RingBuf where T: 'static + core::marker::Sized +pub fn aya_bpf::maps::ring_buf::RingBuf::type_id(&self) -> core::any::TypeId +impl core::borrow::Borrow for aya_bpf::maps::ring_buf::RingBuf where T: core::marker::Sized +pub fn aya_bpf::maps::ring_buf::RingBuf::borrow(&self) -> &T +impl core::borrow::BorrowMut for aya_bpf::maps::ring_buf::RingBuf where T: core::marker::Sized +pub fn aya_bpf::maps::ring_buf::RingBuf::borrow_mut(&mut self) -> &mut T +impl core::convert::From for aya_bpf::maps::ring_buf::RingBuf +pub fn aya_bpf::maps::ring_buf::RingBuf::from(t: T) -> T +pub struct aya_bpf::maps::ring_buf::RingBufEntry(_) +impl aya_bpf::maps::ring_buf::RingBufEntry +pub fn aya_bpf::maps::ring_buf::RingBufEntry::discard(self, flags: u64) +pub fn aya_bpf::maps::ring_buf::RingBufEntry::submit(self, flags: u64) +impl core::ops::deref::Deref for aya_bpf::maps::ring_buf::RingBufEntry +pub type aya_bpf::maps::ring_buf::RingBufEntry::Target = core::mem::maybe_uninit::MaybeUninit +pub fn aya_bpf::maps::ring_buf::RingBufEntry::deref(&self) -> &Self::Target +impl core::ops::deref::DerefMut for aya_bpf::maps::ring_buf::RingBufEntry +pub fn aya_bpf::maps::ring_buf::RingBufEntry::deref_mut(&mut self) -> &mut Self::Target +impl core::marker::Send for aya_bpf::maps::ring_buf::RingBufEntry where T: core::marker::Send +impl core::marker::Sync for aya_bpf::maps::ring_buf::RingBufEntry where T: core::marker::Sync +impl core::marker::Unpin for aya_bpf::maps::ring_buf::RingBufEntry +impl core::panic::unwind_safe::RefUnwindSafe for aya_bpf::maps::ring_buf::RingBufEntry where T: core::panic::unwind_safe::RefUnwindSafe +impl !core::panic::unwind_safe::UnwindSafe for aya_bpf::maps::ring_buf::RingBufEntry +impl core::convert::Into for aya_bpf::maps::ring_buf::RingBufEntry where U: core::convert::From +pub fn aya_bpf::maps::ring_buf::RingBufEntry::into(self) -> U +impl core::convert::TryFrom for aya_bpf::maps::ring_buf::RingBufEntry where U: core::convert::Into +pub type aya_bpf::maps::ring_buf::RingBufEntry::Error = core::convert::Infallible +pub fn aya_bpf::maps::ring_buf::RingBufEntry::try_from(value: U) -> core::result::Result>::Error> +impl core::convert::TryInto for aya_bpf::maps::ring_buf::RingBufEntry where U: core::convert::TryFrom +pub type aya_bpf::maps::ring_buf::RingBufEntry::Error = >::Error +pub fn aya_bpf::maps::ring_buf::RingBufEntry::try_into(self) -> core::result::Result>::Error> +impl core::any::Any for aya_bpf::maps::ring_buf::RingBufEntry where T: 'static + core::marker::Sized +pub fn aya_bpf::maps::ring_buf::RingBufEntry::type_id(&self) -> core::any::TypeId +impl core::borrow::Borrow for aya_bpf::maps::ring_buf::RingBufEntry where T: core::marker::Sized +pub fn aya_bpf::maps::ring_buf::RingBufEntry::borrow(&self) -> &T +impl core::borrow::BorrowMut for aya_bpf::maps::ring_buf::RingBufEntry where T: core::marker::Sized +pub fn aya_bpf::maps::ring_buf::RingBufEntry::borrow_mut(&mut self) -> &mut T +impl core::convert::From for aya_bpf::maps::ring_buf::RingBufEntry +pub fn aya_bpf::maps::ring_buf::RingBufEntry::from(t: T) -> T pub mod aya_bpf::maps::sock_hash #[repr(transparent)] pub struct aya_bpf::maps::sock_hash::SockHash impl aya_bpf::maps::sock_hash::SockHash @@ -1091,6 +1150,34 @@ impl core::borrow::BorrowMut for aya_bpf::maps::queue::Queue where T: c pub fn aya_bpf::maps::queue::Queue::borrow_mut(&mut self) -> &mut T impl core::convert::From for aya_bpf::maps::queue::Queue pub fn aya_bpf::maps::queue::Queue::from(t: T) -> T +#[repr(transparent)] pub struct aya_bpf::maps::RingBuf +impl aya_bpf::maps::ring_buf::RingBuf +pub fn aya_bpf::maps::ring_buf::RingBuf::output(&self, data: &T, flags: u64) -> core::result::Result<(), i64> +pub const fn aya_bpf::maps::ring_buf::RingBuf::pinned(byte_size: u32, flags: u32) -> Self +pub fn aya_bpf::maps::ring_buf::RingBuf::query(&self, flags: u64) -> u64 +pub fn aya_bpf::maps::ring_buf::RingBuf::reserve(&self, flags: u64) -> core::option::Option> where const_assert::Assert<{ _ }>: const_assert::IsTrue +pub const fn aya_bpf::maps::ring_buf::RingBuf::with_byte_size(byte_size: u32, flags: u32) -> Self +impl core::marker::Sync for aya_bpf::maps::ring_buf::RingBuf +impl core::marker::Send for aya_bpf::maps::ring_buf::RingBuf +impl core::marker::Unpin for aya_bpf::maps::ring_buf::RingBuf +impl !core::panic::unwind_safe::RefUnwindSafe for aya_bpf::maps::ring_buf::RingBuf +impl core::panic::unwind_safe::UnwindSafe for aya_bpf::maps::ring_buf::RingBuf +impl core::convert::Into for aya_bpf::maps::ring_buf::RingBuf where U: core::convert::From +pub fn aya_bpf::maps::ring_buf::RingBuf::into(self) -> U +impl core::convert::TryFrom for aya_bpf::maps::ring_buf::RingBuf where U: core::convert::Into +pub type aya_bpf::maps::ring_buf::RingBuf::Error = core::convert::Infallible +pub fn aya_bpf::maps::ring_buf::RingBuf::try_from(value: U) -> core::result::Result>::Error> +impl core::convert::TryInto for aya_bpf::maps::ring_buf::RingBuf where U: core::convert::TryFrom +pub type aya_bpf::maps::ring_buf::RingBuf::Error = >::Error +pub fn aya_bpf::maps::ring_buf::RingBuf::try_into(self) -> core::result::Result>::Error> +impl core::any::Any for aya_bpf::maps::ring_buf::RingBuf where T: 'static + core::marker::Sized +pub fn aya_bpf::maps::ring_buf::RingBuf::type_id(&self) -> core::any::TypeId +impl core::borrow::Borrow for aya_bpf::maps::ring_buf::RingBuf where T: core::marker::Sized +pub fn aya_bpf::maps::ring_buf::RingBuf::borrow(&self) -> &T +impl core::borrow::BorrowMut for aya_bpf::maps::ring_buf::RingBuf where T: core::marker::Sized +pub fn aya_bpf::maps::ring_buf::RingBuf::borrow_mut(&mut self) -> &mut T +impl core::convert::From for aya_bpf::maps::ring_buf::RingBuf +pub fn aya_bpf::maps::ring_buf::RingBuf::from(t: T) -> T #[repr(transparent)] pub struct aya_bpf::maps::SockHash impl aya_bpf::maps::sock_hash::SockHash pub const fn aya_bpf::maps::sock_hash::SockHash::pinned(max_entries: u32, flags: u32) -> aya_bpf::maps::sock_hash::SockHash diff --git a/xtask/public-api/aya.txt b/xtask/public-api/aya.txt index b2915315e..0d6b0f779 100644 --- a/xtask/public-api/aya.txt +++ b/xtask/public-api/aya.txt @@ -556,6 +556,71 @@ impl core::borrow::BorrowMut for aya::maps::queue::Queue where T: co pub fn aya::maps::queue::Queue::borrow_mut(&mut self) -> &mut T impl core::convert::From for aya::maps::queue::Queue pub fn aya::maps::queue::Queue::from(t: T) -> T +pub mod aya::maps::ring_buf +pub struct aya::maps::ring_buf::RingBuf +impl aya::maps::ring_buf::RingBuf +pub fn aya::maps::ring_buf::RingBuf::next(&mut self) -> core::option::Option> +impl core::convert::TryFrom for aya::maps::ring_buf::RingBuf +pub type aya::maps::ring_buf::RingBuf::Error = aya::maps::MapError +pub fn aya::maps::ring_buf::RingBuf::try_from(map: aya::maps::Map) -> core::result::Result +impl<'a> core::convert::TryFrom<&'a aya::maps::Map> for aya::maps::ring_buf::RingBuf<&'a aya::maps::MapData> +pub type aya::maps::ring_buf::RingBuf<&'a aya::maps::MapData>::Error = aya::maps::MapError +pub fn aya::maps::ring_buf::RingBuf<&'a aya::maps::MapData>::try_from(map: &'a aya::maps::Map) -> core::result::Result +impl<'a> core::convert::TryFrom<&'a mut aya::maps::Map> for aya::maps::ring_buf::RingBuf<&'a mut aya::maps::MapData> +pub type aya::maps::ring_buf::RingBuf<&'a mut aya::maps::MapData>::Error = aya::maps::MapError +pub fn aya::maps::ring_buf::RingBuf<&'a mut aya::maps::MapData>::try_from(map: &'a mut aya::maps::Map) -> core::result::Result +impl> std::os::fd::raw::AsRawFd for aya::maps::ring_buf::RingBuf +pub fn aya::maps::ring_buf::RingBuf::as_raw_fd(&self) -> std::os::fd::raw::RawFd +impl !core::marker::Send for aya::maps::ring_buf::RingBuf +impl !core::marker::Sync for aya::maps::ring_buf::RingBuf +impl core::marker::Unpin for aya::maps::ring_buf::RingBuf where T: core::marker::Unpin +impl core::panic::unwind_safe::RefUnwindSafe for aya::maps::ring_buf::RingBuf where T: core::panic::unwind_safe::RefUnwindSafe +impl core::panic::unwind_safe::UnwindSafe for aya::maps::ring_buf::RingBuf where T: core::panic::unwind_safe::UnwindSafe +impl core::convert::Into for aya::maps::ring_buf::RingBuf where U: core::convert::From +pub fn aya::maps::ring_buf::RingBuf::into(self) -> U +impl core::convert::TryFrom for aya::maps::ring_buf::RingBuf where U: core::convert::Into +pub type aya::maps::ring_buf::RingBuf::Error = core::convert::Infallible +pub fn aya::maps::ring_buf::RingBuf::try_from(value: U) -> core::result::Result>::Error> +impl core::convert::TryInto for aya::maps::ring_buf::RingBuf where U: core::convert::TryFrom +pub type aya::maps::ring_buf::RingBuf::Error = >::Error +pub fn aya::maps::ring_buf::RingBuf::try_into(self) -> core::result::Result>::Error> +impl core::any::Any for aya::maps::ring_buf::RingBuf where T: 'static + core::marker::Sized +pub fn aya::maps::ring_buf::RingBuf::type_id(&self) -> core::any::TypeId +impl core::borrow::Borrow for aya::maps::ring_buf::RingBuf where T: core::marker::Sized +pub fn aya::maps::ring_buf::RingBuf::borrow(&self) -> &T +impl core::borrow::BorrowMut for aya::maps::ring_buf::RingBuf where T: core::marker::Sized +pub fn aya::maps::ring_buf::RingBuf::borrow_mut(&mut self) -> &mut T +impl core::convert::From for aya::maps::ring_buf::RingBuf +pub fn aya::maps::ring_buf::RingBuf::from(t: T) -> T +pub struct aya::maps::ring_buf::RingBufItem<'a> +impl core::fmt::Debug for aya::maps::ring_buf::RingBufItem<'_> +pub fn aya::maps::ring_buf::RingBufItem<'_>::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl core::ops::deref::Deref for aya::maps::ring_buf::RingBufItem<'_> +pub type aya::maps::ring_buf::RingBufItem<'_>::Target = [u8] +pub fn aya::maps::ring_buf::RingBufItem<'_>::deref(&self) -> &Self::Target +impl core::ops::drop::Drop for aya::maps::ring_buf::RingBufItem<'_> +pub fn aya::maps::ring_buf::RingBufItem<'_>::drop(&mut self) +impl<'a> !core::marker::Send for aya::maps::ring_buf::RingBufItem<'a> +impl<'a> !core::marker::Sync for aya::maps::ring_buf::RingBufItem<'a> +impl<'a> core::marker::Unpin for aya::maps::ring_buf::RingBufItem<'a> +impl<'a> core::panic::unwind_safe::RefUnwindSafe for aya::maps::ring_buf::RingBufItem<'a> +impl<'a> !core::panic::unwind_safe::UnwindSafe for aya::maps::ring_buf::RingBufItem<'a> +impl core::convert::Into for aya::maps::ring_buf::RingBufItem<'a> where U: core::convert::From +pub fn aya::maps::ring_buf::RingBufItem<'a>::into(self) -> U +impl core::convert::TryFrom for aya::maps::ring_buf::RingBufItem<'a> where U: core::convert::Into +pub type aya::maps::ring_buf::RingBufItem<'a>::Error = core::convert::Infallible +pub fn aya::maps::ring_buf::RingBufItem<'a>::try_from(value: U) -> core::result::Result>::Error> +impl core::convert::TryInto for aya::maps::ring_buf::RingBufItem<'a> where U: core::convert::TryFrom +pub type aya::maps::ring_buf::RingBufItem<'a>::Error = >::Error +pub fn aya::maps::ring_buf::RingBufItem<'a>::try_into(self) -> core::result::Result>::Error> +impl core::any::Any for aya::maps::ring_buf::RingBufItem<'a> where T: 'static + core::marker::Sized +pub fn aya::maps::ring_buf::RingBufItem<'a>::type_id(&self) -> core::any::TypeId +impl core::borrow::Borrow for aya::maps::ring_buf::RingBufItem<'a> where T: core::marker::Sized +pub fn aya::maps::ring_buf::RingBufItem<'a>::borrow(&self) -> &T +impl core::borrow::BorrowMut for aya::maps::ring_buf::RingBufItem<'a> where T: core::marker::Sized +pub fn aya::maps::ring_buf::RingBufItem<'a>::borrow_mut(&mut self) -> &mut T +impl core::convert::From for aya::maps::ring_buf::RingBufItem<'a> +pub fn aya::maps::ring_buf::RingBufItem<'a>::from(t: T) -> T pub mod aya::maps::sock pub struct aya::maps::sock::SockHash impl, K: aya::Pod> aya::maps::SockHash @@ -992,6 +1057,7 @@ pub aya::maps::Map::PerCpuLruHashMap(aya::maps::MapData) pub aya::maps::Map::PerfEventArray(aya::maps::MapData) pub aya::maps::Map::ProgramArray(aya::maps::MapData) pub aya::maps::Map::Queue(aya::maps::MapData) +pub aya::maps::Map::RingBuf(aya::maps::MapData) pub aya::maps::Map::SockHash(aya::maps::MapData) pub aya::maps::Map::SockMap(aya::maps::MapData) pub aya::maps::Map::Stack(aya::maps::MapData) @@ -1022,6 +1088,9 @@ pub fn aya::maps::perf::AsyncPerfEventArray::try_from(map: a impl core::convert::TryFrom for aya::maps::perf::PerfEventArray pub type aya::maps::perf::PerfEventArray::Error = aya::maps::MapError pub fn aya::maps::perf::PerfEventArray::try_from(map: aya::maps::Map) -> core::result::Result +impl core::convert::TryFrom for aya::maps::ring_buf::RingBuf +pub type aya::maps::ring_buf::RingBuf::Error = aya::maps::MapError +pub fn aya::maps::ring_buf::RingBuf::try_from(map: aya::maps::Map) -> core::result::Result impl core::convert::TryFrom for aya::maps::stack_trace::StackTraceMap pub type aya::maps::stack_trace::StackTraceMap::Error = aya::maps::MapError pub fn aya::maps::stack_trace::StackTraceMap::try_from(map: aya::maps::Map) -> core::result::Result @@ -1103,6 +1172,9 @@ pub fn aya::maps::perf::AsyncPerfEventArray<&'a aya::maps::MapData>::try_from(ma impl<'a> core::convert::TryFrom<&'a aya::maps::Map> for aya::maps::perf::PerfEventArray<&'a aya::maps::MapData> pub type aya::maps::perf::PerfEventArray<&'a aya::maps::MapData>::Error = aya::maps::MapError pub fn aya::maps::perf::PerfEventArray<&'a aya::maps::MapData>::try_from(map: &'a aya::maps::Map) -> core::result::Result +impl<'a> core::convert::TryFrom<&'a aya::maps::Map> for aya::maps::ring_buf::RingBuf<&'a aya::maps::MapData> +pub type aya::maps::ring_buf::RingBuf<&'a aya::maps::MapData>::Error = aya::maps::MapError +pub fn aya::maps::ring_buf::RingBuf<&'a aya::maps::MapData>::try_from(map: &'a aya::maps::Map) -> core::result::Result impl<'a> core::convert::TryFrom<&'a aya::maps::Map> for aya::maps::stack_trace::StackTraceMap<&'a aya::maps::MapData> pub type aya::maps::stack_trace::StackTraceMap<&'a aya::maps::MapData>::Error = aya::maps::MapError pub fn aya::maps::stack_trace::StackTraceMap<&'a aya::maps::MapData>::try_from(map: &'a aya::maps::Map) -> core::result::Result @@ -1130,6 +1202,9 @@ pub fn aya::maps::perf::AsyncPerfEventArray<&'a mut aya::maps::MapData>::try_fro impl<'a> core::convert::TryFrom<&'a mut aya::maps::Map> for aya::maps::perf::PerfEventArray<&'a mut aya::maps::MapData> pub type aya::maps::perf::PerfEventArray<&'a mut aya::maps::MapData>::Error = aya::maps::MapError pub fn aya::maps::perf::PerfEventArray<&'a mut aya::maps::MapData>::try_from(map: &'a mut aya::maps::Map) -> core::result::Result +impl<'a> core::convert::TryFrom<&'a mut aya::maps::Map> for aya::maps::ring_buf::RingBuf<&'a mut aya::maps::MapData> +pub type aya::maps::ring_buf::RingBuf<&'a mut aya::maps::MapData>::Error = aya::maps::MapError +pub fn aya::maps::ring_buf::RingBuf<&'a mut aya::maps::MapData>::try_from(map: &'a mut aya::maps::Map) -> core::result::Result impl<'a> core::convert::TryFrom<&'a mut aya::maps::Map> for aya::maps::stack_trace::StackTraceMap<&'a mut aya::maps::MapData> pub type aya::maps::stack_trace::StackTraceMap<&'a mut aya::maps::MapData>::Error = aya::maps::MapError pub fn aya::maps::stack_trace::StackTraceMap<&'a mut aya::maps::MapData>::try_from(map: &'a mut aya::maps::Map) -> core::result::Result @@ -1892,6 +1967,41 @@ impl core::borrow::BorrowMut for aya::maps::queue::Queue where T: co pub fn aya::maps::queue::Queue::borrow_mut(&mut self) -> &mut T impl core::convert::From for aya::maps::queue::Queue pub fn aya::maps::queue::Queue::from(t: T) -> T +pub struct aya::maps::RingBuf +impl aya::maps::ring_buf::RingBuf +pub fn aya::maps::ring_buf::RingBuf::next(&mut self) -> core::option::Option> +impl core::convert::TryFrom for aya::maps::ring_buf::RingBuf +pub type aya::maps::ring_buf::RingBuf::Error = aya::maps::MapError +pub fn aya::maps::ring_buf::RingBuf::try_from(map: aya::maps::Map) -> core::result::Result +impl<'a> core::convert::TryFrom<&'a aya::maps::Map> for aya::maps::ring_buf::RingBuf<&'a aya::maps::MapData> +pub type aya::maps::ring_buf::RingBuf<&'a aya::maps::MapData>::Error = aya::maps::MapError +pub fn aya::maps::ring_buf::RingBuf<&'a aya::maps::MapData>::try_from(map: &'a aya::maps::Map) -> core::result::Result +impl<'a> core::convert::TryFrom<&'a mut aya::maps::Map> for aya::maps::ring_buf::RingBuf<&'a mut aya::maps::MapData> +pub type aya::maps::ring_buf::RingBuf<&'a mut aya::maps::MapData>::Error = aya::maps::MapError +pub fn aya::maps::ring_buf::RingBuf<&'a mut aya::maps::MapData>::try_from(map: &'a mut aya::maps::Map) -> core::result::Result +impl> std::os::fd::raw::AsRawFd for aya::maps::ring_buf::RingBuf +pub fn aya::maps::ring_buf::RingBuf::as_raw_fd(&self) -> std::os::fd::raw::RawFd +impl !core::marker::Send for aya::maps::ring_buf::RingBuf +impl !core::marker::Sync for aya::maps::ring_buf::RingBuf +impl core::marker::Unpin for aya::maps::ring_buf::RingBuf where T: core::marker::Unpin +impl core::panic::unwind_safe::RefUnwindSafe for aya::maps::ring_buf::RingBuf where T: core::panic::unwind_safe::RefUnwindSafe +impl core::panic::unwind_safe::UnwindSafe for aya::maps::ring_buf::RingBuf where T: core::panic::unwind_safe::UnwindSafe +impl core::convert::Into for aya::maps::ring_buf::RingBuf where U: core::convert::From +pub fn aya::maps::ring_buf::RingBuf::into(self) -> U +impl core::convert::TryFrom for aya::maps::ring_buf::RingBuf where U: core::convert::Into +pub type aya::maps::ring_buf::RingBuf::Error = core::convert::Infallible +pub fn aya::maps::ring_buf::RingBuf::try_from(value: U) -> core::result::Result>::Error> +impl core::convert::TryInto for aya::maps::ring_buf::RingBuf where U: core::convert::TryFrom +pub type aya::maps::ring_buf::RingBuf::Error = >::Error +pub fn aya::maps::ring_buf::RingBuf::try_into(self) -> core::result::Result>::Error> +impl core::any::Any for aya::maps::ring_buf::RingBuf where T: 'static + core::marker::Sized +pub fn aya::maps::ring_buf::RingBuf::type_id(&self) -> core::any::TypeId +impl core::borrow::Borrow for aya::maps::ring_buf::RingBuf where T: core::marker::Sized +pub fn aya::maps::ring_buf::RingBuf::borrow(&self) -> &T +impl core::borrow::BorrowMut for aya::maps::ring_buf::RingBuf where T: core::marker::Sized +pub fn aya::maps::ring_buf::RingBuf::borrow_mut(&mut self) -> &mut T +impl core::convert::From for aya::maps::ring_buf::RingBuf +pub fn aya::maps::ring_buf::RingBuf::from(t: T) -> T pub struct aya::maps::SockHash impl, K: aya::Pod> aya::maps::SockHash pub fn aya::maps::SockHash::fd(&self) -> &aya::maps::sock::SockMapFd