diff --git a/Cargo.lock b/Cargo.lock index d0d0dc54..2633cc60 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2822,8 +2822,8 @@ dependencies = [ "async-trait", "can-rs", "clap", + "color-eyre", "crc32fast", - "eyre", "image", "orb-messages", "prost", diff --git a/mcu-util/Cargo.toml b/mcu-util/Cargo.toml index 6ba9f59b..8504d092 100644 --- a/mcu-util/Cargo.toml +++ b/mcu-util/Cargo.toml @@ -16,7 +16,7 @@ async-trait = "0.1.77" can-rs = { path = "../can", features = ["isotp"] } clap = { workspace = true, features = ["derive"] } crc32fast = "1.3.2" -eyre.workspace = true +color-eyre.workspace = true image = "0.24.8" orb-messages.workspace = true prost = "0.12.3" diff --git a/mcu-util/src/logging.rs b/mcu-util/src/logging.rs index 990dde90..12518879 100644 --- a/mcu-util/src/logging.rs +++ b/mcu-util/src/logging.rs @@ -1,22 +1,24 @@ -use eyre::Result; +use color_eyre::eyre::Result; use tracing::level_filters::LevelFilter; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; -use tracing_subscriber::Layer; +use tracing_subscriber::{EnvFilter, Layer}; + +/// Initialize the logger +pub fn init() -> Result<()> { + let filter = EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(); -fn try_init_stdout_logger(loglevel: tracing::Level) -> Result<()> { let stdout_log = tracing_subscriber::fmt::layer() .compact() .with_writer(std::io::stdout) - .with_filter(LevelFilter::from_level(loglevel)); - - tracing_subscriber::registry().with(stdout_log).try_init()?; + .with_filter(filter); - Ok(()) -} + let registry = tracing_subscriber::registry(); + #[cfg(tokio_unstable)] + let registry = registry.with(console_subscriber::spawn()); + registry.with(stdout_log).try_init()?; -/// Initialize the logger -pub fn init(loglevel: tracing::Level) -> Result<()> { - try_init_stdout_logger(loglevel)?; Ok(()) } diff --git a/mcu-util/src/main.rs b/mcu-util/src/main.rs index 143d9a3f..2a444914 100644 --- a/mcu-util/src/main.rs +++ b/mcu-util/src/main.rs @@ -3,7 +3,7 @@ extern crate core; use crate::orb::Orb; use clap::Parser; -use eyre::{Context, Result}; +use color_eyre::eyre::{Context, Result}; use std::io::Write; use std::path::PathBuf; use std::time::Duration; @@ -122,11 +122,6 @@ enum SecureElement { PowerCycle, } -#[cfg(debug_assertions)] -const LOG_LEVEL: tracing::Level = tracing::Level::DEBUG; -#[cfg(not(debug_assertions))] -const LOG_LEVEL: tracing::Level = tracing::Level::INFO; - async fn execute(args: Args) -> Result<()> { let mut orb = Orb::new().await?; @@ -190,7 +185,8 @@ async fn execute(args: Args) -> Result<()> { #[tokio::main] async fn main() -> Result<()> { - logging::init(LOG_LEVEL)?; + color_eyre::install()?; + logging::init()?; let args = Args::parse(); @@ -199,7 +195,7 @@ async fn main() -> Result<()> { } if let Err(e) = execute(args).await { - error!("{}", e); + error!("{:#?}", e); std::process::exit(-1); } else { std::process::exit(0); diff --git a/mcu-util/src/messaging/can/canfd.rs b/mcu-util/src/messaging/can/canfd.rs index eda2aa89..c917dbac 100644 --- a/mcu-util/src/messaging/can/canfd.rs +++ b/mcu-util/src/messaging/can/canfd.rs @@ -1,26 +1,23 @@ -use std::process; -use std::sync::atomic::{AtomicU16, Ordering}; -use std::sync::mpsc; - use async_trait::async_trait; -use eyre::{eyre, Context, Result}; +use can_rs::filter::Filter; +use can_rs::stream::FrameStream; +use can_rs::{Frame, Id, CANFD_DATA_LEN}; +use color_eyre::eyre::{eyre, Context, Result}; use orb_messages::CommonAckError; use prost::Message; +use std::sync::atomic::{AtomicU16, Ordering}; +use std::sync::{mpsc, Arc}; use tokio::time::Duration; use tracing::debug; -use can_rs::filter::Filter; -use can_rs::stream::FrameStream; -use can_rs::{Frame, Id}; - use crate::messaging::Device::{JetsonFromMain, JetsonFromSecurity, Main, Security}; use crate::messaging::{ - handle_main_mcu_message, handle_sec_mcu_message, Device, McuPayload, + create_ack, handle_main_mcu_message, handle_sec_mcu_message, Device, McuPayload, MessagingInterface, }; pub struct CanRawMessaging { - stream: FrameStream<64>, + stream: FrameStream, ack_num_lsb: AtomicU16, ack_queue: mpsc::Receiver<(CommonAckError, u32)>, can_node: Device, @@ -35,7 +32,7 @@ impl CanRawMessaging { new_message_queue: mpsc::Sender, ) -> Result { // open socket - let stream = FrameStream::<64>::build() + let stream = FrameStream::::build() .nonblocking(false) .filters(vec![ Filter { @@ -47,13 +44,14 @@ impl CanRawMessaging { mask: 0xff, }, ]) - .bind(bus.as_str().parse().unwrap())?; + .bind(bus.as_str().parse().unwrap()) + .wrap_err("Failed to bind CAN stream")?; let (ack_tx, ack_rx) = mpsc::channel(); - - // spawn CAN receiver let stream_copy = stream.try_clone()?; - tokio::task::spawn(can_rx(stream_copy, can_node, ack_tx, new_message_queue)); + tokio::task::spawn_blocking(move || { + can_rx(stream_copy, can_node, ack_tx, new_message_queue) + }); Ok(Self { stream, @@ -78,13 +76,14 @@ impl CanRawMessaging { } } - #[allow(dead_code)] - async fn send_wait_ack(&mut self, frame: &Frame<64>) -> Result { - self.stream.send(frame, 0)?; + async fn send_wait_ack( + &mut self, + frame: Arc>, + ) -> Result { + let stream = self.stream.try_clone()?; + tokio::task::spawn_blocking(move || stream.send(&frame, 0)).await??; - // put some randomness into ack number to prevent collision with other processes - let expected_ack_number = - process::id() << 16 | self.ack_num_lsb.load(Ordering::Relaxed) as u32; + let expected_ack_number = create_ack(self.ack_num_lsb.load(Ordering::SeqCst)); self.ack_num_lsb.fetch_add(1, Ordering::Relaxed); self.wait_ack(expected_ack_number).await @@ -94,15 +93,14 @@ impl CanRawMessaging { /// Receive CAN frames /// - relay acks to `ack_tx` /// - relay new McuMessage to `new_message_queue` -#[allow(dead_code)] -async fn can_rx( - stream: FrameStream<64>, +fn can_rx( + stream: FrameStream, remote_node: Device, ack_tx: mpsc::Sender<(CommonAckError, u32)>, new_message_queue: mpsc::Sender, ) -> Result<()> { loop { - let mut frame: Frame<64> = Frame::empty(); + let mut frame: Frame = Frame::empty(); loop { match stream.recv(&mut frame, 0) { Ok(_) => { @@ -146,12 +144,8 @@ async fn can_rx( #[async_trait] impl MessagingInterface for CanRawMessaging { /// Send payload into McuMessage - #[allow(dead_code)] async fn send(&mut self, payload: McuPayload) -> Result { - // snowflake ack ID to avoid collisions: - // prefix ack number with process ID - let ack_number = - process::id() << 16 | self.ack_num_lsb.load(Ordering::Relaxed) as u32; + let ack_number = create_ack(self.ack_num_lsb.load(Ordering::SeqCst)); let bytes = match self.can_node { Main => { @@ -201,26 +195,20 @@ impl MessagingInterface for CanRawMessaging { }; if let Some(bytes) = bytes { - let mut buf: [u8; 64] = [0u8; 64]; + let mut buf = [0u8; CANFD_DATA_LEN]; buf[..bytes.len()].copy_from_slice(bytes.as_slice()); let node_addr = self.can_node as u32; let frame = Frame { id: Id::Extended(node_addr), - len: 64, - flags: 0x0F, + len: bytes.len() as u8, + flags: can_rs::CANFD_BRS_FLAG | can_rs::CANFD_FDF_FLAG, data: buf, }; - self.send_wait_ack(&frame).await + self.send_wait_ack(Arc::new(frame)).await } else { Err(eyre!("Failed to encode payload")) } } } - -impl Drop for CanRawMessaging { - fn drop(&mut self) { - // TODO - } -} diff --git a/mcu-util/src/messaging/can/isotp.rs b/mcu-util/src/messaging/can/isotp.rs index 0eb3bc86..2dba8d52 100644 --- a/mcu-util/src/messaging/can/isotp.rs +++ b/mcu-util/src/messaging/can/isotp.rs @@ -1,18 +1,17 @@ +use async_trait::async_trait; +use color_eyre::eyre::{eyre, Context, Result}; +use orb_messages::CommonAckError; +use prost::Message; use std::io::{Read, Write}; use std::process; use std::sync::atomic::{AtomicU16, Ordering}; use std::sync::mpsc; - -use async_trait::async_trait; -use eyre::{eyre, Context, Result}; -use orb_messages::CommonAckError; -use prost::Message; use tokio::time::Duration; use tracing::{debug, error}; use can_rs::isotp::addr::CanIsotpAddr; use can_rs::isotp::stream::IsotpStream; -use can_rs::Id; +use can_rs::{Id, CAN_DATA_LEN}; use crate::messaging::{ handle_main_mcu_message, handle_sec_mcu_message, McuPayload, MessagingInterface, @@ -31,7 +30,6 @@ const CAN_ADDR_IS_DEST: u32 = 1 << 9; /// for bidirectional communication, addresses are comprised of source and destination digit /// along with some flags. #[derive(Clone, Copy, PartialEq, Debug)] -#[allow(dead_code)] pub enum IsoTpNodeIdentifier { MainMcu = 0x1, SecurityMcu = 0x2, @@ -69,7 +67,7 @@ impl From for IsoTpNodeIdentifier { } pub struct CanIsoTpMessaging { - tx_stream: IsotpStream<8>, + stream: IsotpStream, ack_num_lsb: AtomicU16, ack_queue: mpsc::Receiver<(CommonAckError, u32)>, } @@ -103,23 +101,27 @@ impl CanIsoTpMessaging { let (tx_stdid_src, tx_stdid_dst) = create_pair(local, remote)?; debug!("Sending on 0x{:x}->0x{:x}", tx_stdid_src, tx_stdid_dst); - let (ack_tx, ack_rx) = mpsc::channel(); - // open TX stream - let tx_isotp_stream = IsotpStream::<8>::build().bind( - CanIsotpAddr::new( - bus.as_str(), - Id::Standard(tx_stdid_dst), - Id::Standard(tx_stdid_src), + let tx_isotp_stream = IsotpStream::::build() + .bind( + CanIsotpAddr::new( + bus.as_str(), + Id::Standard(tx_stdid_dst), + Id::Standard(tx_stdid_src), + ) + .expect("Unable to build IsoTpStream"), ) - .expect("Unable to build IsoTpStream"), - )?; + .wrap_err("Failed to bind CAN ISO-TP stream")?; + + let (ack_tx, ack_rx) = mpsc::channel(); // spawn CAN receiver - tokio::task::spawn(can_rx(bus, remote, local, ack_tx, new_message_queue)); + tokio::task::spawn_blocking(move || { + can_rx(bus, remote, local, ack_tx, new_message_queue) + }); Ok(CanIsoTpMessaging { - tx_stream: tx_isotp_stream, + stream: tx_isotp_stream, ack_num_lsb: AtomicU16::new(0), ack_queue: ack_rx, }) @@ -140,13 +142,14 @@ impl CanIsoTpMessaging { } } - async fn send_wait_ack(&mut self, frame: &[u8]) -> Result { - match self.tx_stream.write(frame) { - Ok(_) => {} - Err(err) => { - error!("Error writing stream: {}", err); + async fn send_wait_ack(&mut self, frame: Vec) -> Result { + let mut stream = self.stream.try_clone()?; + tokio::task::spawn_blocking(move || { + if let Err(e) = stream.write(frame.as_slice()) { + error!("Error writing stream: {}", e); } - } + }) + .await?; let expected_ack_number = process::id() << 16 | self.ack_num_lsb.load(Ordering::Relaxed) as u32; @@ -159,7 +162,7 @@ impl CanIsoTpMessaging { /// Receive CAN frames /// - relay acks to `ack_tx` /// - relay new McuMessage to `new_message_queue` -async fn can_rx( +fn can_rx( bus: String, remote: IsoTpNodeIdentifier, local: IsoTpNodeIdentifier, @@ -170,7 +173,7 @@ async fn can_rx( let (rx_stdid_src, rx_stdid_dest) = create_pair(remote, local)?; debug!("Listening on 0x{:x}->0x{:x}", rx_stdid_src, rx_stdid_dest); - let mut rx_isotp_stream = IsotpStream::<8>::build().bind( + let mut rx_isotp_stream = IsotpStream::::build().bind( CanIsotpAddr::new( bus.as_str(), Id::Standard(rx_stdid_src), @@ -219,7 +222,6 @@ async fn can_rx( impl MessagingInterface for CanIsoTpMessaging { /// Send payload into McuMessage /// One could decide to only listen for ISO-TP message so allow dead code for `send` method - #[allow(dead_code)] async fn send(&mut self, payload: McuPayload) -> Result { let ack_number = process::id() << 16 | self.ack_num_lsb.load(Ordering::Relaxed) as u32; @@ -256,6 +258,6 @@ impl MessagingInterface for CanIsoTpMessaging { _ => return Err(eyre!("Invalid payload")), }; - self.send_wait_ack(bytes.as_slice()).await + self.send_wait_ack(bytes).await } } diff --git a/mcu-util/src/messaging/mod.rs b/mcu-util/src/messaging/mod.rs index 18e1ce66..6e78c41f 100644 --- a/mcu-util/src/messaging/mod.rs +++ b/mcu-util/src/messaging/mod.rs @@ -2,7 +2,7 @@ use std::process; use std::sync::mpsc; use async_trait::async_trait; -use eyre::{eyre, Result}; +use color_eyre::eyre::{eyre, Result}; use orb_messages::CommonAckError; use tracing::debug; @@ -19,7 +19,6 @@ pub enum McuPayload { /// CAN(-FD) addressing scheme #[derive(Clone, Copy, PartialEq, Debug)] -#[allow(dead_code)] pub enum Device { Main = 0x01, Security = 0x02, @@ -44,6 +43,22 @@ pub(crate) trait MessagingInterface { async fn send(&mut self, payload: McuPayload) -> Result; } +/// Create a unique ack number +/// - prefix with process ID +/// - suffix with counter +/// this added piece of information in the ack number is not strictly necessary +/// but helps filter out acks that are not for us (e.g. acks for other processes) +#[inline] +fn create_ack(counter: u16) -> u32 { + process::id() << 16 | counter as u32 +} + +/// Check that ack contains the process ID +#[inline] +pub fn is_ack_for_us(ack_number: u32) -> bool { + ack_number >> 16 == process::id() +} + /// handle new main mcu message, reference implementation fn handle_main_mcu_message( message: &orb_messages::mcu_main::McuMessage, @@ -66,9 +81,7 @@ fn handle_main_mcu_message( }, )), } => { - // this added piece of information in the ack number is not strictly necessary - // but helps filter out acks that are not for us (e.g. acks for other processes) - if ack.ack_number >> 16 == process::id() { + if is_ack_for_us(ack.ack_number) { ack_tx.send((CommonAckError::from(ack.error), ack.ack_number))?; } else { debug!("Ignoring ack # 0x{:x?}", ack.ack_number) @@ -116,8 +129,7 @@ fn handle_sec_mcu_message( }, )), } => { - // filter out acks that are not for us (e.g. acks for other processes) - if ack.ack_number >> 16 == process::id() { + if is_ack_for_us(ack.ack_number) { ack_tx.send((CommonAckError::from(ack.error), ack.ack_number))?; } } diff --git a/mcu-util/src/messaging/serial/mod.rs b/mcu-util/src/messaging/serial/mod.rs index 41c9b045..16ec665f 100644 --- a/mcu-util/src/messaging/serial/mod.rs +++ b/mcu-util/src/messaging/serial/mod.rs @@ -1,6 +1,6 @@ use crate::messaging::{Device, McuPayload, MessagingInterface}; use async_trait::async_trait; -use eyre::{eyre, Result}; +use color_eyre::eyre::{eyre, Result}; use orb_messages::CommonAckError; use prost::Message; use std::sync::atomic::{AtomicU16, Ordering}; diff --git a/mcu-util/src/orb/dfu.rs b/mcu-util/src/orb/dfu.rs index ab35bac2..29112fdf 100644 --- a/mcu-util/src/orb/dfu.rs +++ b/mcu-util/src/orb/dfu.rs @@ -1,4 +1,4 @@ -use eyre::{eyre, Result}; +use color_eyre::eyre::{eyre, Result}; use orb_messages::mcu_main as main_messaging; use orb_messages::mcu_sec as sec_messaging; use std::cmp::min; diff --git a/mcu-util/src/orb/main_board.rs b/mcu-util/src/orb/main_board.rs index a01d66fa..a8d640ea 100644 --- a/mcu-util/src/orb/main_board.rs +++ b/mcu-util/src/orb/main_board.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use eyre::{eyre, Result}; +use color_eyre::eyre::{eyre, Context, Result}; use orb_messages::{mcu_main as main_messaging, CommonAckError}; use std::ops::Sub; use std::sync::mpsc; @@ -21,8 +21,9 @@ const REBOOT_DELAY: u32 = 3; pub struct MainBoard { canfd_iface: CanRawMessaging, isotp_iface: CanIsoTpMessaging, + /// Optional serial interface for the main board, if available (ie orb-ui might own it) #[allow(dead_code)] - serial_iface: SerialMessaging, + serial_iface: Option, message_queue_rx: mpsc::Receiver, } @@ -46,16 +47,18 @@ impl MainBoardBuilder { String::from("can0"), Device::Main, self.message_queue_tx.clone(), - )?; + ) + .wrap_err("Failed to create CanRawMessaging for MainBoard")?; let isotp_iface = CanIsoTpMessaging::new( String::from("can0"), IsoTpNodeIdentifier::JetsonApp7, IsoTpNodeIdentifier::MainMcu, self.message_queue_tx.clone(), - )?; + ) + .wrap_err("Failed to create CanIsoTpMessaging for MainBoard")?; - let serial_iface = SerialMessaging::new(Device::Main)?; + let serial_iface = SerialMessaging::new(Device::Main).ok(); // Send a heartbeat to the main mcu to ensure it is alive // & "subscribe" to the main mcu messages: messages to the Jetson @@ -325,8 +328,9 @@ impl MainBoardInfo { /// Fetches `MainBoardInfo` from the main board /// on timeout, returns the info that was fetched so far - async fn build(mut self, main: &mut MainBoard) -> Result { - main.isotp_iface + async fn build(mut self, main_board: &mut MainBoard) -> Result { + main_board + .isotp_iface .send(McuPayload::ToMain( main_messaging::jetson_to_mcu::Payload::ValueGet( main_messaging::ValueGet { @@ -336,7 +340,8 @@ impl MainBoardInfo { ), )) .await?; - main.isotp_iface + main_board + .isotp_iface .send(McuPayload::ToMain( main_messaging::jetson_to_mcu::Payload::ValueGet( main_messaging::ValueGet { @@ -346,7 +351,8 @@ impl MainBoardInfo { ), )) .await?; - main.isotp_iface + main_board + .isotp_iface .send(McuPayload::ToMain( main_messaging::jetson_to_mcu::Payload::ValueGet( main_messaging::ValueGet { @@ -364,7 +370,7 @@ impl MainBoardInfo { }; loop { if let Ok(McuPayload::FromMain(main_mcu_payload)) = - main.message_queue_rx.recv_timeout(timeout) + main_board.message_queue_rx.recv_timeout(timeout) { match main_mcu_payload { main_messaging::mcu_to_jetson::Payload::Versions(v) => { diff --git a/mcu-util/src/orb/mod.rs b/mcu-util/src/orb/mod.rs index a0b526bb..32f51b2e 100644 --- a/mcu-util/src/orb/mod.rs +++ b/mcu-util/src/orb/mod.rs @@ -2,7 +2,7 @@ use std::fmt::{Display, Formatter}; use std::time::Duration; use async_trait::async_trait; -use eyre::Result; +use color_eyre::eyre::Result; use orb_messages::mcu_main as main_messaging; use orb_messages::mcu_sec as sec_messaging; diff --git a/mcu-util/src/orb/security_board.rs b/mcu-util/src/orb/security_board.rs index 16b8430c..76f4d1df 100644 --- a/mcu-util/src/orb/security_board.rs +++ b/mcu-util/src/orb/security_board.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use eyre::{eyre, Result}; +use color_eyre::eyre::{eyre, Context, Result}; use orb_messages::mcu_sec::battery_status::BatteryState; use orb_messages::{mcu_sec as security_messaging, CommonAckError}; use std::ops::Sub; @@ -43,14 +43,16 @@ impl SecurityBoardBuilder { String::from("can0"), Device::Security, self.message_queue_tx.clone(), - )?; + ) + .wrap_err("Failed to create CanRawMessaging for SecurityBoard")?; let isotp_iface = CanIsoTpMessaging::new( String::from("can0"), IsoTpNodeIdentifier::JetsonApp7, IsoTpNodeIdentifier::SecurityMcu, self.message_queue_tx.clone(), - )?; + ) + .wrap_err("Failed to create CanIsoTpMessaging for SecurityBoard")?; // Send a heartbeat to the mcu to ensure it is alive // & "subscribe" to the mcu messages: messages to the Jetson