Skip to content

Commit

Permalink
Require Send for T in Queue instead of Copy (#75)
Browse files Browse the repository at this point in the history
By requiring items to be `Copy` we reduce the queue to only work with
primitives. What is more appropriate here is to require that the item
is `Send` and use move semantics instead of copy.
  • Loading branch information
eivindbergem authored Oct 16, 2024
1 parent 953b580 commit 21b753c
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 58 deletions.
55 changes: 31 additions & 24 deletions freertos-rust/src/patterns/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ pub trait ReplyableMessage {
#[derive(Copy, Clone)]
pub struct InputMessage<I>
where
I: Copy,
I: Copy + Send,
{
val: I,
reply_to_client_id: Option<usize>,
}

impl<I> InputMessage<I>
where
I: Copy,
I: Copy + Send,
{
pub fn request(val: I) -> Self {
InputMessage {
Expand All @@ -46,7 +46,7 @@ where

impl<I> ReplyableMessage for InputMessage<I>
where
I: Copy,
I: Copy + Send,
{
fn reply_to_client_id(&self) -> Option<usize> {
self.reply_to_client_id
Expand All @@ -55,17 +55,17 @@ where

pub struct Processor<I, O>
where
I: ReplyableMessage + Copy,
O: Copy,
I: ReplyableMessage + Copy + Send,
O: Copy + Send,
{
queue: Arc<Queue<I>>,
inner: Arc<Mutex<ProcessorInner<O>>>,
}

impl<I, O> Processor<I, O>
where
I: ReplyableMessage + Copy,
O: Copy,
I: ReplyableMessage + Copy + Send,
O: Copy + Send,
{
pub fn new(queue_size: usize) -> Result<Self, FreeRtosError> {
let p = ProcessorInner {
Expand Down Expand Up @@ -141,7 +141,10 @@ where
.flat_map(|ref x| x.1.upgrade().into_iter())
.find(|x| x.id == client_id)
{
client.receive_queue.send(reply, max_wait)?;
client
.receive_queue
.send(reply, max_wait)
.map_err(|err| err.error())?;
return Ok(true);
}
}
Expand All @@ -152,8 +155,8 @@ where

impl<I, O> Processor<InputMessage<I>, O>
where
I: Copy,
O: Copy,
I: Copy + Send,
O: Copy + Send,
{
pub fn reply_val<D: DurationTicks>(
&self,
Expand All @@ -167,15 +170,15 @@ where

struct ProcessorInner<O>
where
O: Copy,
O: Copy + Send,
{
clients: Vec<(usize, Weak<ClientWithReplyQueue<O>>)>,
next_client_id: usize,
}

impl<O> ProcessorInner<O>
where
O: Copy,
O: Copy + Send,
{
fn remove_client_reply(&mut self, client: &ClientWithReplyQueue<O>) {
self.clients.retain(|ref x| x.0 != client.id)
Expand All @@ -184,22 +187,24 @@ where

pub struct ProcessorClient<I, C>
where
I: ReplyableMessage + Copy,
I: ReplyableMessage + Copy + Send,
{
processor_queue: Weak<Queue<I>>,
client_reply: C,
}

impl<I, O> ProcessorClient<I, O>
where
I: ReplyableMessage + Copy,
I: ReplyableMessage + Copy + Send,
{
pub fn send<D: DurationTicks>(&self, message: I, max_wait: D) -> Result<(), FreeRtosError> {
let processor_queue = self
.processor_queue
.upgrade()
.ok_or(FreeRtosError::ProcessorHasShutDown)?;
processor_queue.send(message, max_wait)?;
processor_queue
.send(message, max_wait)
.map_err(|err| err.error())?;
Ok(())
}

Expand All @@ -212,13 +217,15 @@ where
.processor_queue
.upgrade()
.ok_or(FreeRtosError::ProcessorHasShutDown)?;
processor_queue.send_from_isr(context, message)
processor_queue
.send_from_isr(context, message)
.map_err(|err| err.error())
}
}

impl<I> ProcessorClient<InputMessage<I>, ()>
where
I: Copy,
I: Copy + Send,
{
pub fn send_val<D: DurationTicks>(&self, val: I, max_wait: D) -> Result<(), FreeRtosError> {
self.send(InputMessage::request(val), max_wait)
Expand All @@ -235,8 +242,8 @@ where

impl<I, O> ProcessorClient<I, SharedClientWithReplyQueue<O>>
where
I: ReplyableMessage + Copy,
O: Copy,
I: ReplyableMessage + Copy + Send,
O: Copy + Send,
{
pub fn call<D: DurationTicks>(&self, message: I, max_wait: D) -> Result<O, FreeRtosError> {
self.send(message, max_wait)?;
Expand All @@ -250,8 +257,8 @@ where

impl<I, O> ProcessorClient<InputMessage<I>, SharedClientWithReplyQueue<O>>
where
I: Copy,
O: Copy,
I: Copy + Send,
O: Copy + Send,
{
pub fn send_val<D: DurationTicks>(&self, val: I, max_wait: D) -> Result<(), FreeRtosError> {
self.send(InputMessage::request(val), max_wait)
Expand All @@ -268,7 +275,7 @@ where

impl<I, C> Clone for ProcessorClient<I, C>
where
I: ReplyableMessage + Copy,
I: ReplyableMessage + Copy + Send,
C: Clone,
{
fn clone(&self) -> Self {
Expand All @@ -281,7 +288,7 @@ where

pub struct ClientWithReplyQueue<O>
where
O: Copy,
O: Copy + Send,
{
id: usize,
processor_inner: Arc<Mutex<ProcessorInner<O>>>,
Expand All @@ -290,7 +297,7 @@ where

impl<O> Drop for ClientWithReplyQueue<O>
where
O: Copy,
O: Copy + Send,
{
fn drop(&mut self) {
if let Ok(mut p) = self.processor_inner.lock(Duration::ms(1000)) {
Expand Down
18 changes: 9 additions & 9 deletions freertos-rust/src/patterns/pub_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ use crate::queue::*;
use crate::units::*;

/// A pub-sub queue. An item sent to the publisher is sent to every subscriber.
pub struct QueuePublisher<T: Sized + Copy> {
pub struct QueuePublisher<T: Sized + Copy + Send> {
inner: Arc<Mutex<PublisherInner<T>>>,
}

/// A subscribtion to the publisher.
pub struct QueueSubscriber<T: Sized + Copy> {
pub struct QueueSubscriber<T: Sized + Copy + Send> {
inner: Arc<SubscriberInner<T>>,
}

impl<T: Sized + Copy> QueuePublisher<T> {
impl<T: Sized + Copy + Send> QueuePublisher<T> {
/// Create a new publisher
pub fn new() -> Result<QueuePublisher<T>, FreeRtosError> {
let inner = PublisherInner {
Expand Down Expand Up @@ -69,41 +69,41 @@ impl<T: Sized + Copy> QueuePublisher<T> {
}
}

impl<T: Sized + Copy> Clone for QueuePublisher<T> {
impl<T: Sized + Copy + Send> Clone for QueuePublisher<T> {
fn clone(&self) -> Self {
QueuePublisher {
inner: self.inner.clone(),
}
}
}

impl<T: Sized + Copy> Drop for QueueSubscriber<T> {
impl<T: Sized + Copy + Send> Drop for QueueSubscriber<T> {
fn drop(&mut self) {
if let Ok(mut l) = self.inner.publisher.lock(Duration::infinite()) {
l.unsubscribe(&self.inner);
}
}
}

impl<T: Sized + Copy> QueueSubscriber<T> {
impl<T: Sized + Copy + Send> QueueSubscriber<T> {
/// Wait for an item to be posted from the publisher.
pub fn receive<D: DurationTicks>(&self, max_wait: D) -> Result<T, FreeRtosError> {
self.inner.queue.receive(max_wait)
}
}

struct PublisherInner<T: Sized + Copy> {
struct PublisherInner<T: Sized + Copy + Send> {
subscribers: Vec<Arc<SubscriberInner<T>>>,
queue_next_id: usize,
}

impl<T: Sized + Copy> PublisherInner<T> {
impl<T: Sized + Copy + Send> PublisherInner<T> {
fn unsubscribe(&mut self, subscriber: &SubscriberInner<T>) {
self.subscribers.retain(|ref x| x.id != subscriber.id);
}
}

struct SubscriberInner<T: Sized + Copy> {
struct SubscriberInner<T: Sized + Copy + Send> {
id: usize,
queue: Queue<T>,
publisher: Arc<Mutex<PublisherInner<T>>>,
Expand Down
72 changes: 47 additions & 25 deletions freertos-rust/src/queue.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,39 @@
use mem::ManuallyDrop;
use mem::MaybeUninit;

use crate::base::*;
use crate::isr::*;
use crate::prelude::v1::*;
use crate::shim::*;
use crate::units::*;

unsafe impl<T: Sized + Copy> Send for Queue<T> {}
unsafe impl<T: Sized + Copy> Sync for Queue<T> {}
unsafe impl<T: Sized + Send> Send for Queue<T> {}
unsafe impl<T: Sized + Send> Sync for Queue<T> {}

#[derive(Debug)]
pub struct SendError<T> {
err: FreeRtosError,
item: T,
}

impl<T> SendError<T> {
pub fn error(&self) -> FreeRtosError {
self.err
}

pub fn into_item(self) -> T {
self.item
}
}

/// A queue with a finite size. The items are owned by the queue and are
/// copied.
/// A queue with a finite size.
#[derive(Debug)]
pub struct Queue<T: Sized + Copy> {
pub struct Queue<T: Sized + Send> {
queue: FreeRtosQueueHandle,
item_type: PhantomData<T>,
}

impl<T: Sized + Copy> Queue<T> {
impl<T: Sized + Send> Queue<T> {
pub fn new(max_size: usize) -> Result<Queue<T>, FreeRtosError> {
let item_size = mem::size_of::<T>();

Expand Down Expand Up @@ -49,15 +67,16 @@ impl<T: Sized + Copy> Queue<T> {
}

/// Send an item to the end of the queue. Wait for the queue to have empty space for it.
pub fn send<D: DurationTicks>(&self, item: T, max_wait: D) -> Result<(), FreeRtosError> {
pub fn send<D: DurationTicks>(&self, item: T, max_wait: D) -> Result<(), SendError<T>> {
let item = ManuallyDrop::new(item);
let ptr = &item as *const _ as FreeRtosVoidPtr;

unsafe {
if freertos_rs_queue_send(
self.queue,
&item as *const _ as FreeRtosVoidPtr,
max_wait.to_ticks(),
) != 0
{
Err(FreeRtosError::QueueSendTimeout)
if freertos_rs_queue_send(self.queue, ptr, max_wait.to_ticks()) != 0 {
Err(SendError {
err: FreeRtosError::QueueSendTimeout,
item: ManuallyDrop::into_inner(item),
})
} else {
Ok(())
}
Expand All @@ -69,15 +88,16 @@ impl<T: Sized + Copy> Queue<T> {
&self,
context: &mut InterruptContext,
item: T,
) -> Result<(), FreeRtosError> {
) -> Result<(), SendError<T>> {
let item = ManuallyDrop::new(item);
let ptr = &item as *const _ as FreeRtosVoidPtr;

unsafe {
if freertos_rs_queue_send_isr(
self.queue,
&item as *const _ as FreeRtosVoidPtr,
context.get_task_field_mut(),
) != 0
{
Err(FreeRtosError::QueueFull)
if freertos_rs_queue_send_isr(self.queue, ptr, context.get_task_field_mut()) != 0 {
Err(SendError {
err: FreeRtosError::QueueFull,
item: ManuallyDrop::into_inner(item),
})
} else {
Ok(())
}
Expand All @@ -87,14 +107,16 @@ impl<T: Sized + Copy> Queue<T> {
/// Wait for an item to be available on the queue.
pub fn receive<D: DurationTicks>(&self, max_wait: D) -> Result<T, FreeRtosError> {
unsafe {
let mut buff = mem::zeroed::<T>();
// Use `MaybeUninit` to avoid calling drop on
// uninitialized struct in case of timeout
let mut buff = MaybeUninit::uninit();
let r = freertos_rs_queue_receive(
self.queue,
&mut buff as *mut _ as FreeRtosMutVoidPtr,
max_wait.to_ticks(),
);
if r == 0 {
return Ok(buff);
return Ok(buff.assume_init());
} else {
return Err(FreeRtosError::QueueReceiveTimeout);
}
Expand All @@ -107,7 +129,7 @@ impl<T: Sized + Copy> Queue<T> {
}
}

impl<T: Sized + Copy> Drop for Queue<T> {
impl<T: Sized + Send> Drop for Queue<T> {
fn drop(&mut self) {
unsafe {
freertos_rs_queue_delete(self.queue);
Expand Down

0 comments on commit 21b753c

Please sign in to comment.