From d3dbca3b934d954129ba2ad36a269bc6299f937a Mon Sep 17 00:00:00 2001 From: Eivind Alexander Bergem Date: Fri, 23 Aug 2024 14:56:12 +0200 Subject: [PATCH] Require `Send` for `T` in `Queue` instead of Copy By requiring items to be `Copy` we reduce the queue to only work with primitives. What is more appropriate here is to require that the item is `Send` and use move semantics instead of copy. --- freertos-rust/src/patterns/processor.rs | 55 ++++++++++--------- freertos-rust/src/patterns/pub_sub.rs | 18 +++---- freertos-rust/src/queue.rs | 72 ++++++++++++++++--------- 3 files changed, 87 insertions(+), 58 deletions(-) diff --git a/freertos-rust/src/patterns/processor.rs b/freertos-rust/src/patterns/processor.rs index 99b8d52..3197552 100644 --- a/freertos-rust/src/patterns/processor.rs +++ b/freertos-rust/src/patterns/processor.rs @@ -15,7 +15,7 @@ pub trait ReplyableMessage { #[derive(Copy, Clone)] pub struct InputMessage where - I: Copy, + I: Copy + Send, { val: I, reply_to_client_id: Option, @@ -23,7 +23,7 @@ where impl InputMessage where - I: Copy, + I: Copy + Send, { pub fn request(val: I) -> Self { InputMessage { @@ -46,7 +46,7 @@ where impl ReplyableMessage for InputMessage where - I: Copy, + I: Copy + Send, { fn reply_to_client_id(&self) -> Option { self.reply_to_client_id @@ -55,8 +55,8 @@ where pub struct Processor where - I: ReplyableMessage + Copy, - O: Copy, + I: ReplyableMessage + Copy + Send, + O: Copy + Send, { queue: Arc>, inner: Arc>>, @@ -64,8 +64,8 @@ where impl Processor where - I: ReplyableMessage + Copy, - O: Copy, + I: ReplyableMessage + Copy + Send, + O: Copy + Send, { pub fn new(queue_size: usize) -> Result { let p = ProcessorInner { @@ -141,7 +141,10 @@ where .flat_map(|ref x| x.1.upgrade().into_iter()) .find(|x| x.id == client_id) { - client.receive_queue.send(reply, max_wait)?; + client + .receive_queue + .send(reply, max_wait) + .map_err(|err| err.error())?; return Ok(true); } } @@ -152,8 +155,8 @@ where impl Processor, O> where - I: Copy, - O: Copy, + I: Copy + Send, + O: Copy + Send, { pub fn reply_val( &self, @@ -167,7 +170,7 @@ where struct ProcessorInner where - O: Copy, + O: Copy + Send, { clients: Vec<(usize, Weak>)>, next_client_id: usize, @@ -175,7 +178,7 @@ where impl ProcessorInner where - O: Copy, + O: Copy + Send, { fn remove_client_reply(&mut self, client: &ClientWithReplyQueue) { self.clients.retain(|ref x| x.0 != client.id) @@ -184,7 +187,7 @@ where pub struct ProcessorClient where - I: ReplyableMessage + Copy, + I: ReplyableMessage + Copy + Send, { processor_queue: Weak>, client_reply: C, @@ -192,14 +195,16 @@ where impl ProcessorClient where - I: ReplyableMessage + Copy, + I: ReplyableMessage + Copy + Send, { pub fn send(&self, message: I, max_wait: D) -> Result<(), FreeRtosError> { let processor_queue = self .processor_queue .upgrade() .ok_or(FreeRtosError::ProcessorHasShutDown)?; - processor_queue.send(message, max_wait)?; + processor_queue + .send(message, max_wait) + .map_err(|err| err.error())?; Ok(()) } @@ -212,13 +217,15 @@ where .processor_queue .upgrade() .ok_or(FreeRtosError::ProcessorHasShutDown)?; - processor_queue.send_from_isr(context, message) + processor_queue + .send_from_isr(context, message) + .map_err(|err| err.error()) } } impl ProcessorClient, ()> where - I: Copy, + I: Copy + Send, { pub fn send_val(&self, val: I, max_wait: D) -> Result<(), FreeRtosError> { self.send(InputMessage::request(val), max_wait) @@ -235,8 +242,8 @@ where impl ProcessorClient> where - I: ReplyableMessage + Copy, - O: Copy, + I: ReplyableMessage + Copy + Send, + O: Copy + Send, { pub fn call(&self, message: I, max_wait: D) -> Result { self.send(message, max_wait)?; @@ -250,8 +257,8 @@ where impl ProcessorClient, SharedClientWithReplyQueue> where - I: Copy, - O: Copy, + I: Copy + Send, + O: Copy + Send, { pub fn send_val(&self, val: I, max_wait: D) -> Result<(), FreeRtosError> { self.send(InputMessage::request(val), max_wait) @@ -268,7 +275,7 @@ where impl Clone for ProcessorClient where - I: ReplyableMessage + Copy, + I: ReplyableMessage + Copy + Send, C: Clone, { fn clone(&self) -> Self { @@ -281,7 +288,7 @@ where pub struct ClientWithReplyQueue where - O: Copy, + O: Copy + Send, { id: usize, processor_inner: Arc>>, @@ -290,7 +297,7 @@ where impl Drop for ClientWithReplyQueue where - O: Copy, + O: Copy + Send, { fn drop(&mut self) { if let Ok(mut p) = self.processor_inner.lock(Duration::ms(1000)) { diff --git a/freertos-rust/src/patterns/pub_sub.rs b/freertos-rust/src/patterns/pub_sub.rs index 1691be3..da5a168 100644 --- a/freertos-rust/src/patterns/pub_sub.rs +++ b/freertos-rust/src/patterns/pub_sub.rs @@ -5,16 +5,16 @@ use crate::queue::*; use crate::units::*; /// A pub-sub queue. An item sent to the publisher is sent to every subscriber. -pub struct QueuePublisher { +pub struct QueuePublisher { inner: Arc>>, } /// A subscribtion to the publisher. -pub struct QueueSubscriber { +pub struct QueueSubscriber { inner: Arc>, } -impl QueuePublisher { +impl QueuePublisher { /// Create a new publisher pub fn new() -> Result, FreeRtosError> { let inner = PublisherInner { @@ -69,7 +69,7 @@ impl QueuePublisher { } } -impl Clone for QueuePublisher { +impl Clone for QueuePublisher { fn clone(&self) -> Self { QueuePublisher { inner: self.inner.clone(), @@ -77,7 +77,7 @@ impl Clone for QueuePublisher { } } -impl Drop for QueueSubscriber { +impl Drop for QueueSubscriber { fn drop(&mut self) { if let Ok(mut l) = self.inner.publisher.lock(Duration::infinite()) { l.unsubscribe(&self.inner); @@ -85,25 +85,25 @@ impl Drop for QueueSubscriber { } } -impl QueueSubscriber { +impl QueueSubscriber { /// Wait for an item to be posted from the publisher. pub fn receive(&self, max_wait: D) -> Result { self.inner.queue.receive(max_wait) } } -struct PublisherInner { +struct PublisherInner { subscribers: Vec>>, queue_next_id: usize, } -impl PublisherInner { +impl PublisherInner { fn unsubscribe(&mut self, subscriber: &SubscriberInner) { self.subscribers.retain(|ref x| x.id != subscriber.id); } } -struct SubscriberInner { +struct SubscriberInner { id: usize, queue: Queue, publisher: Arc>>, diff --git a/freertos-rust/src/queue.rs b/freertos-rust/src/queue.rs index a9071ff..f311523 100644 --- a/freertos-rust/src/queue.rs +++ b/freertos-rust/src/queue.rs @@ -1,21 +1,39 @@ +use mem::ManuallyDrop; +use mem::MaybeUninit; + use crate::base::*; use crate::isr::*; use crate::prelude::v1::*; use crate::shim::*; use crate::units::*; -unsafe impl Send for Queue {} -unsafe impl Sync for Queue {} +unsafe impl Send for Queue {} +unsafe impl Sync for Queue {} + +#[derive(Debug)] +pub struct SendError { + err: FreeRtosError, + item: T, +} + +impl SendError { + pub fn error(&self) -> FreeRtosError { + self.err + } + + pub fn into_item(self) -> T { + self.item + } +} -/// A queue with a finite size. The items are owned by the queue and are -/// copied. +/// A queue with a finite size. #[derive(Debug)] -pub struct Queue { +pub struct Queue { queue: FreeRtosQueueHandle, item_type: PhantomData, } -impl Queue { +impl Queue { pub fn new(max_size: usize) -> Result, FreeRtosError> { let item_size = mem::size_of::(); @@ -49,15 +67,16 @@ impl Queue { } /// Send an item to the end of the queue. Wait for the queue to have empty space for it. - pub fn send(&self, item: T, max_wait: D) -> Result<(), FreeRtosError> { + pub fn send(&self, item: T, max_wait: D) -> Result<(), SendError> { + let item = ManuallyDrop::new(item); + let ptr = &item as *const _ as FreeRtosVoidPtr; + unsafe { - if freertos_rs_queue_send( - self.queue, - &item as *const _ as FreeRtosVoidPtr, - max_wait.to_ticks(), - ) != 0 - { - Err(FreeRtosError::QueueSendTimeout) + if freertos_rs_queue_send(self.queue, ptr, max_wait.to_ticks()) != 0 { + Err(SendError { + err: FreeRtosError::QueueSendTimeout, + item: ManuallyDrop::into_inner(item), + }) } else { Ok(()) } @@ -69,15 +88,16 @@ impl Queue { &self, context: &mut InterruptContext, item: T, - ) -> Result<(), FreeRtosError> { + ) -> Result<(), SendError> { + let item = ManuallyDrop::new(item); + let ptr = &item as *const _ as FreeRtosVoidPtr; + unsafe { - if freertos_rs_queue_send_isr( - self.queue, - &item as *const _ as FreeRtosVoidPtr, - context.get_task_field_mut(), - ) != 0 - { - Err(FreeRtosError::QueueFull) + if freertos_rs_queue_send_isr(self.queue, ptr, context.get_task_field_mut()) != 0 { + Err(SendError { + err: FreeRtosError::QueueFull, + item: ManuallyDrop::into_inner(item), + }) } else { Ok(()) } @@ -87,14 +107,16 @@ impl Queue { /// Wait for an item to be available on the queue. pub fn receive(&self, max_wait: D) -> Result { unsafe { - let mut buff = mem::zeroed::(); + // Use `MaybeUninit` to avoid calling drop on + // uninitialized struct in case of timeout + let mut buff = MaybeUninit::uninit(); let r = freertos_rs_queue_receive( self.queue, &mut buff as *mut _ as FreeRtosMutVoidPtr, max_wait.to_ticks(), ); if r == 0 { - return Ok(buff); + return Ok(buff.assume_init()); } else { return Err(FreeRtosError::QueueReceiveTimeout); } @@ -107,7 +129,7 @@ impl Queue { } } -impl Drop for Queue { +impl Drop for Queue { fn drop(&mut self) { unsafe { freertos_rs_queue_delete(self.queue);