Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Require Send for T in Queue instead of Copy #75

Merged
merged 1 commit into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 31 additions & 24 deletions freertos-rust/src/patterns/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ pub trait ReplyableMessage {
#[derive(Copy, Clone)]
pub struct InputMessage<I>
where
I: Copy,
I: Copy + Send,
{
val: I,
reply_to_client_id: Option<usize>,
}

impl<I> InputMessage<I>
where
I: Copy,
I: Copy + Send,
{
pub fn request(val: I) -> Self {
InputMessage {
Expand All @@ -46,7 +46,7 @@ where

impl<I> ReplyableMessage for InputMessage<I>
where
I: Copy,
I: Copy + Send,
{
fn reply_to_client_id(&self) -> Option<usize> {
self.reply_to_client_id
Expand All @@ -55,17 +55,17 @@ where

pub struct Processor<I, O>
where
I: ReplyableMessage + Copy,
O: Copy,
I: ReplyableMessage + Copy + Send,
O: Copy + Send,
{
queue: Arc<Queue<I>>,
inner: Arc<Mutex<ProcessorInner<O>>>,
}

impl<I, O> Processor<I, O>
where
I: ReplyableMessage + Copy,
O: Copy,
I: ReplyableMessage + Copy + Send,
O: Copy + Send,
{
pub fn new(queue_size: usize) -> Result<Self, FreeRtosError> {
let p = ProcessorInner {
Expand Down Expand Up @@ -141,7 +141,10 @@ where
.flat_map(|ref x| x.1.upgrade().into_iter())
.find(|x| x.id == client_id)
{
client.receive_queue.send(reply, max_wait)?;
client
.receive_queue
.send(reply, max_wait)
.map_err(|err| err.error())?;
return Ok(true);
}
}
Expand All @@ -152,8 +155,8 @@ where

impl<I, O> Processor<InputMessage<I>, O>
where
I: Copy,
O: Copy,
I: Copy + Send,
O: Copy + Send,
{
pub fn reply_val<D: DurationTicks>(
&self,
Expand All @@ -167,15 +170,15 @@ where

struct ProcessorInner<O>
where
O: Copy,
O: Copy + Send,
{
clients: Vec<(usize, Weak<ClientWithReplyQueue<O>>)>,
next_client_id: usize,
}

impl<O> ProcessorInner<O>
where
O: Copy,
O: Copy + Send,
{
fn remove_client_reply(&mut self, client: &ClientWithReplyQueue<O>) {
self.clients.retain(|ref x| x.0 != client.id)
Expand All @@ -184,22 +187,24 @@ where

pub struct ProcessorClient<I, C>
where
I: ReplyableMessage + Copy,
I: ReplyableMessage + Copy + Send,
{
processor_queue: Weak<Queue<I>>,
client_reply: C,
}

impl<I, O> ProcessorClient<I, O>
where
I: ReplyableMessage + Copy,
I: ReplyableMessage + Copy + Send,
{
pub fn send<D: DurationTicks>(&self, message: I, max_wait: D) -> Result<(), FreeRtosError> {
let processor_queue = self
.processor_queue
.upgrade()
.ok_or(FreeRtosError::ProcessorHasShutDown)?;
processor_queue.send(message, max_wait)?;
processor_queue
.send(message, max_wait)
.map_err(|err| err.error())?;
Ok(())
}

Expand All @@ -212,13 +217,15 @@ where
.processor_queue
.upgrade()
.ok_or(FreeRtosError::ProcessorHasShutDown)?;
processor_queue.send_from_isr(context, message)
processor_queue
.send_from_isr(context, message)
.map_err(|err| err.error())
}
}

impl<I> ProcessorClient<InputMessage<I>, ()>
where
I: Copy,
I: Copy + Send,
{
pub fn send_val<D: DurationTicks>(&self, val: I, max_wait: D) -> Result<(), FreeRtosError> {
self.send(InputMessage::request(val), max_wait)
Expand All @@ -235,8 +242,8 @@ where

impl<I, O> ProcessorClient<I, SharedClientWithReplyQueue<O>>
where
I: ReplyableMessage + Copy,
O: Copy,
I: ReplyableMessage + Copy + Send,
O: Copy + Send,
{
pub fn call<D: DurationTicks>(&self, message: I, max_wait: D) -> Result<O, FreeRtosError> {
self.send(message, max_wait)?;
Expand All @@ -250,8 +257,8 @@ where

impl<I, O> ProcessorClient<InputMessage<I>, SharedClientWithReplyQueue<O>>
where
I: Copy,
O: Copy,
I: Copy + Send,
O: Copy + Send,
{
pub fn send_val<D: DurationTicks>(&self, val: I, max_wait: D) -> Result<(), FreeRtosError> {
self.send(InputMessage::request(val), max_wait)
Expand All @@ -268,7 +275,7 @@ where

impl<I, C> Clone for ProcessorClient<I, C>
where
I: ReplyableMessage + Copy,
I: ReplyableMessage + Copy + Send,
C: Clone,
{
fn clone(&self) -> Self {
Expand All @@ -281,7 +288,7 @@ where

pub struct ClientWithReplyQueue<O>
where
O: Copy,
O: Copy + Send,
{
id: usize,
processor_inner: Arc<Mutex<ProcessorInner<O>>>,
Expand All @@ -290,7 +297,7 @@ where

impl<O> Drop for ClientWithReplyQueue<O>
where
O: Copy,
O: Copy + Send,
{
fn drop(&mut self) {
if let Ok(mut p) = self.processor_inner.lock(Duration::ms(1000)) {
Expand Down
18 changes: 9 additions & 9 deletions freertos-rust/src/patterns/pub_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ use crate::queue::*;
use crate::units::*;

/// A pub-sub queue. An item sent to the publisher is sent to every subscriber.
pub struct QueuePublisher<T: Sized + Copy> {
pub struct QueuePublisher<T: Sized + Copy + Send> {
inner: Arc<Mutex<PublisherInner<T>>>,
}

/// A subscribtion to the publisher.
pub struct QueueSubscriber<T: Sized + Copy> {
pub struct QueueSubscriber<T: Sized + Copy + Send> {
inner: Arc<SubscriberInner<T>>,
}

impl<T: Sized + Copy> QueuePublisher<T> {
impl<T: Sized + Copy + Send> QueuePublisher<T> {
/// Create a new publisher
pub fn new() -> Result<QueuePublisher<T>, FreeRtosError> {
let inner = PublisherInner {
Expand Down Expand Up @@ -69,41 +69,41 @@ impl<T: Sized + Copy> QueuePublisher<T> {
}
}

impl<T: Sized + Copy> Clone for QueuePublisher<T> {
impl<T: Sized + Copy + Send> Clone for QueuePublisher<T> {
fn clone(&self) -> Self {
QueuePublisher {
inner: self.inner.clone(),
}
}
}

impl<T: Sized + Copy> Drop for QueueSubscriber<T> {
impl<T: Sized + Copy + Send> Drop for QueueSubscriber<T> {
fn drop(&mut self) {
if let Ok(mut l) = self.inner.publisher.lock(Duration::infinite()) {
l.unsubscribe(&self.inner);
}
}
}

impl<T: Sized + Copy> QueueSubscriber<T> {
impl<T: Sized + Copy + Send> QueueSubscriber<T> {
/// Wait for an item to be posted from the publisher.
pub fn receive<D: DurationTicks>(&self, max_wait: D) -> Result<T, FreeRtosError> {
self.inner.queue.receive(max_wait)
}
}

struct PublisherInner<T: Sized + Copy> {
struct PublisherInner<T: Sized + Copy + Send> {
subscribers: Vec<Arc<SubscriberInner<T>>>,
queue_next_id: usize,
}

impl<T: Sized + Copy> PublisherInner<T> {
impl<T: Sized + Copy + Send> PublisherInner<T> {
fn unsubscribe(&mut self, subscriber: &SubscriberInner<T>) {
self.subscribers.retain(|ref x| x.id != subscriber.id);
}
}

struct SubscriberInner<T: Sized + Copy> {
struct SubscriberInner<T: Sized + Copy + Send> {
id: usize,
queue: Queue<T>,
publisher: Arc<Mutex<PublisherInner<T>>>,
Expand Down
72 changes: 47 additions & 25 deletions freertos-rust/src/queue.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,39 @@
use mem::ManuallyDrop;
use mem::MaybeUninit;

use crate::base::*;
use crate::isr::*;
use crate::prelude::v1::*;
use crate::shim::*;
use crate::units::*;

unsafe impl<T: Sized + Copy> Send for Queue<T> {}
unsafe impl<T: Sized + Copy> Sync for Queue<T> {}
unsafe impl<T: Sized + Send> Send for Queue<T> {}
unsafe impl<T: Sized + Send> Sync for Queue<T> {}

#[derive(Debug)]
pub struct SendError<T> {
err: FreeRtosError,
item: T,
}

impl<T> SendError<T> {
pub fn error(&self) -> FreeRtosError {
self.err
}

pub fn into_item(self) -> T {
self.item
}
}

/// A queue with a finite size. The items are owned by the queue and are
/// copied.
/// A queue with a finite size.
#[derive(Debug)]
pub struct Queue<T: Sized + Copy> {
pub struct Queue<T: Sized + Send> {
queue: FreeRtosQueueHandle,
item_type: PhantomData<T>,
}

impl<T: Sized + Copy> Queue<T> {
impl<T: Sized + Send> Queue<T> {
pub fn new(max_size: usize) -> Result<Queue<T>, FreeRtosError> {
let item_size = mem::size_of::<T>();

Expand Down Expand Up @@ -49,15 +67,16 @@ impl<T: Sized + Copy> Queue<T> {
}

/// Send an item to the end of the queue. Wait for the queue to have empty space for it.
pub fn send<D: DurationTicks>(&self, item: T, max_wait: D) -> Result<(), FreeRtosError> {
pub fn send<D: DurationTicks>(&self, item: T, max_wait: D) -> Result<(), SendError<T>> {
let item = ManuallyDrop::new(item);
let ptr = &item as *const _ as FreeRtosVoidPtr;

unsafe {
if freertos_rs_queue_send(
self.queue,
&item as *const _ as FreeRtosVoidPtr,
max_wait.to_ticks(),
) != 0
{
Err(FreeRtosError::QueueSendTimeout)
if freertos_rs_queue_send(self.queue, ptr, max_wait.to_ticks()) != 0 {
Err(SendError {
err: FreeRtosError::QueueSendTimeout,
item: ManuallyDrop::into_inner(item),
})
} else {
Ok(())
}
Expand All @@ -69,15 +88,16 @@ impl<T: Sized + Copy> Queue<T> {
&self,
context: &mut InterruptContext,
item: T,
) -> Result<(), FreeRtosError> {
) -> Result<(), SendError<T>> {
let item = ManuallyDrop::new(item);
let ptr = &item as *const _ as FreeRtosVoidPtr;

unsafe {
if freertos_rs_queue_send_isr(
self.queue,
&item as *const _ as FreeRtosVoidPtr,
context.get_task_field_mut(),
) != 0
{
Err(FreeRtosError::QueueFull)
if freertos_rs_queue_send_isr(self.queue, ptr, context.get_task_field_mut()) != 0 {
Err(SendError {
err: FreeRtosError::QueueFull,
item: ManuallyDrop::into_inner(item),
})
} else {
Ok(())
}
Expand All @@ -87,14 +107,16 @@ impl<T: Sized + Copy> Queue<T> {
/// Wait for an item to be available on the queue.
pub fn receive<D: DurationTicks>(&self, max_wait: D) -> Result<T, FreeRtosError> {
unsafe {
let mut buff = mem::zeroed::<T>();
// Use `MaybeUninit` to avoid calling drop on
// uninitialized struct in case of timeout
let mut buff = MaybeUninit::uninit();
let r = freertos_rs_queue_receive(
self.queue,
&mut buff as *mut _ as FreeRtosMutVoidPtr,
max_wait.to_ticks(),
);
if r == 0 {
return Ok(buff);
return Ok(buff.assume_init());
} else {
return Err(FreeRtosError::QueueReceiveTimeout);
}
Expand All @@ -107,7 +129,7 @@ impl<T: Sized + Copy> Queue<T> {
}
}

impl<T: Sized + Copy> Drop for Queue<T> {
impl<T: Sized + Send> Drop for Queue<T> {
fn drop(&mut self) {
unsafe {
freertos_rs_queue_delete(self.queue);
Expand Down
Loading