Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mcu-util: blocking call to can-rs in async #85

Merged
merged 10 commits into from
May 7, 2024
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mcu-util/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ async-trait = "0.1.77"
can-rs = { path = "../can", features = ["isotp"] }
clap = { workspace = true, features = ["derive"] }
crc32fast = "1.3.2"
eyre.workspace = true
color-eyre.workspace = true
image = "0.24.8"
orb-messages.workspace = true
prost = "0.12.3"
Expand Down
24 changes: 13 additions & 11 deletions mcu-util/src/logging.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,24 @@
use eyre::Result;
use color_eyre::eyre::Result;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::Layer;
use tracing_subscriber::{EnvFilter, Layer};

/// Initialize the logger
pub fn init() -> Result<()> {
let filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy();

fn try_init_stdout_logger(loglevel: tracing::Level) -> Result<()> {
let stdout_log = tracing_subscriber::fmt::layer()
.compact()
.with_writer(std::io::stdout)
.with_filter(LevelFilter::from_level(loglevel));

tracing_subscriber::registry().with(stdout_log).try_init()?;
.with_filter(filter);

Ok(())
}
let registry = tracing_subscriber::registry();
#[cfg(tokio_unstable)]
let registry = registry.with(console_subscriber::spawn());
registry.with(stdout_log).try_init()?;

/// Initialize the logger
pub fn init(loglevel: tracing::Level) -> Result<()> {
try_init_stdout_logger(loglevel)?;
Ok(())
}
12 changes: 4 additions & 8 deletions mcu-util/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
extern crate core;
use crate::orb::Orb;
use clap::Parser;
use eyre::{Context, Result};
use color_eyre::eyre::{Context, Result};
use std::io::Write;
use std::path::PathBuf;
use std::time::Duration;
Expand Down Expand Up @@ -122,11 +122,6 @@ enum SecureElement {
PowerCycle,
}

#[cfg(debug_assertions)]
const LOG_LEVEL: tracing::Level = tracing::Level::DEBUG;
#[cfg(not(debug_assertions))]
const LOG_LEVEL: tracing::Level = tracing::Level::INFO;

async fn execute(args: Args) -> Result<()> {
let mut orb = Orb::new().await?;

Expand Down Expand Up @@ -190,7 +185,8 @@ async fn execute(args: Args) -> Result<()> {

#[tokio::main]
async fn main() -> Result<()> {
logging::init(LOG_LEVEL)?;
color_eyre::install()?;
logging::init()?;

let args = Args::parse();

Expand All @@ -199,7 +195,7 @@ async fn main() -> Result<()> {
}

if let Err(e) = execute(args).await {
error!("{}", e);
error!("{:#?}", e);
std::process::exit(-1);
} else {
std::process::exit(0);
Expand Down
70 changes: 29 additions & 41 deletions mcu-util/src/messaging/can/canfd.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,23 @@
use std::process;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::mpsc;

use async_trait::async_trait;
use eyre::{eyre, Context, Result};
use can_rs::filter::Filter;
use can_rs::stream::FrameStream;
use can_rs::{Frame, Id, CANFD_DATA_LEN};
use color_eyre::eyre::{eyre, Context, Result};
use orb_messages::CommonAckError;
use prost::Message;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::{mpsc, Arc};
use tokio::time::Duration;
use tracing::debug;

use can_rs::filter::Filter;
use can_rs::stream::FrameStream;
use can_rs::{Frame, Id};

use crate::messaging::Device::{JetsonFromMain, JetsonFromSecurity, Main, Security};
use crate::messaging::{
handle_main_mcu_message, handle_sec_mcu_message, Device, McuPayload,
create_ack, handle_main_mcu_message, handle_sec_mcu_message, Device, McuPayload,
MessagingInterface,
};

pub struct CanRawMessaging {
stream: FrameStream<64>,
stream: FrameStream<CANFD_DATA_LEN>,
ack_num_lsb: AtomicU16,
ack_queue: mpsc::Receiver<(CommonAckError, u32)>,
can_node: Device,
Expand All @@ -35,7 +32,7 @@ impl CanRawMessaging {
new_message_queue: mpsc::Sender<McuPayload>,
) -> Result<Self> {
// open socket
let stream = FrameStream::<64>::build()
let stream = FrameStream::<CANFD_DATA_LEN>::build()
.nonblocking(false)
.filters(vec![
Filter {
Expand All @@ -47,13 +44,14 @@ impl CanRawMessaging {
mask: 0xff,
},
])
.bind(bus.as_str().parse().unwrap())?;
.bind(bus.as_str().parse().unwrap())
.wrap_err("Failed to bind CAN stream")?;

let (ack_tx, ack_rx) = mpsc::channel();

// spawn CAN receiver
let stream_copy = stream.try_clone()?;
tokio::task::spawn(can_rx(stream_copy, can_node, ack_tx, new_message_queue));
tokio::task::spawn_blocking(move || {
can_rx(stream_copy, can_node, ack_tx, new_message_queue)
});

Ok(Self {
stream,
Expand All @@ -78,13 +76,14 @@ impl CanRawMessaging {
}
}

#[allow(dead_code)]
async fn send_wait_ack(&mut self, frame: &Frame<64>) -> Result<CommonAckError> {
self.stream.send(frame, 0)?;
async fn send_wait_ack(
&mut self,
frame: Arc<Frame<CANFD_DATA_LEN>>,
) -> Result<CommonAckError> {
let stream = self.stream.try_clone()?;
tokio::task::spawn_blocking(move || stream.send(&frame, 0)).await??;

// put some randomness into ack number to prevent collision with other processes
let expected_ack_number =
process::id() << 16 | self.ack_num_lsb.load(Ordering::Relaxed) as u32;
let expected_ack_number = create_ack(self.ack_num_lsb.load(Ordering::SeqCst));
self.ack_num_lsb.fetch_add(1, Ordering::Relaxed);

self.wait_ack(expected_ack_number).await
Expand All @@ -94,15 +93,14 @@ impl CanRawMessaging {
/// Receive CAN frames
/// - relay acks to `ack_tx`
/// - relay new McuMessage to `new_message_queue`
#[allow(dead_code)]
async fn can_rx(
stream: FrameStream<64>,
fn can_rx(
stream: FrameStream<CANFD_DATA_LEN>,
remote_node: Device,
ack_tx: mpsc::Sender<(CommonAckError, u32)>,
new_message_queue: mpsc::Sender<McuPayload>,
) -> Result<()> {
loop {
let mut frame: Frame<64> = Frame::empty();
let mut frame: Frame<CANFD_DATA_LEN> = Frame::empty();
loop {
match stream.recv(&mut frame, 0) {
Ok(_) => {
Expand Down Expand Up @@ -146,12 +144,8 @@ async fn can_rx(
#[async_trait]
impl MessagingInterface for CanRawMessaging {
/// Send payload into McuMessage
#[allow(dead_code)]
async fn send(&mut self, payload: McuPayload) -> Result<CommonAckError> {
// snowflake ack ID to avoid collisions:
// prefix ack number with process ID
let ack_number =
process::id() << 16 | self.ack_num_lsb.load(Ordering::Relaxed) as u32;
let ack_number = create_ack(self.ack_num_lsb.load(Ordering::SeqCst));

let bytes = match self.can_node {
Main => {
Expand Down Expand Up @@ -201,26 +195,20 @@ impl MessagingInterface for CanRawMessaging {
};

if let Some(bytes) = bytes {
let mut buf: [u8; 64] = [0u8; 64];
let mut buf = [0u8; CANFD_DATA_LEN];
buf[..bytes.len()].copy_from_slice(bytes.as_slice());

let node_addr = self.can_node as u32;
let frame = Frame {
id: Id::Extended(node_addr),
len: 64,
flags: 0x0F,
len: bytes.len() as u8,
flags: can_rs::CANFD_BRS_FLAG | can_rs::CANFD_FDF_FLAG,
data: buf,
};

self.send_wait_ack(&frame).await
self.send_wait_ack(Arc::new(frame)).await
} else {
Err(eyre!("Failed to encode payload"))
}
}
}

impl Drop for CanRawMessaging {
fn drop(&mut self) {
// TODO
}
}
60 changes: 31 additions & 29 deletions mcu-util/src/messaging/can/isotp.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
use async_trait::async_trait;
use color_eyre::eyre::{eyre, Context, Result};
use orb_messages::CommonAckError;
use prost::Message;
use std::io::{Read, Write};
use std::process;
use std::sync::atomic::{AtomicU16, Ordering};
use std::sync::mpsc;

use async_trait::async_trait;
use eyre::{eyre, Context, Result};
use orb_messages::CommonAckError;
use prost::Message;
use tokio::time::Duration;
use tracing::{debug, error};

use can_rs::isotp::addr::CanIsotpAddr;
use can_rs::isotp::stream::IsotpStream;
use can_rs::Id;
use can_rs::{Id, CAN_DATA_LEN};

use crate::messaging::{
handle_main_mcu_message, handle_sec_mcu_message, McuPayload, MessagingInterface,
Expand All @@ -31,7 +30,6 @@ const CAN_ADDR_IS_DEST: u32 = 1 << 9;
/// for bidirectional communication, addresses are comprised of source and destination digit
/// along with some flags.
#[derive(Clone, Copy, PartialEq, Debug)]
#[allow(dead_code)]
pub enum IsoTpNodeIdentifier {
MainMcu = 0x1,
SecurityMcu = 0x2,
Expand Down Expand Up @@ -69,7 +67,7 @@ impl From<u8> for IsoTpNodeIdentifier {
}

pub struct CanIsoTpMessaging {
tx_stream: IsotpStream<8>,
stream: IsotpStream<CAN_DATA_LEN>,
ack_num_lsb: AtomicU16,
ack_queue: mpsc::Receiver<(CommonAckError, u32)>,
}
Expand Down Expand Up @@ -103,23 +101,27 @@ impl CanIsoTpMessaging {
let (tx_stdid_src, tx_stdid_dst) = create_pair(local, remote)?;
debug!("Sending on 0x{:x}->0x{:x}", tx_stdid_src, tx_stdid_dst);

let (ack_tx, ack_rx) = mpsc::channel();

// open TX stream
let tx_isotp_stream = IsotpStream::<8>::build().bind(
CanIsotpAddr::new(
bus.as_str(),
Id::Standard(tx_stdid_dst),
Id::Standard(tx_stdid_src),
let tx_isotp_stream = IsotpStream::<CAN_DATA_LEN>::build()
.bind(
CanIsotpAddr::new(
bus.as_str(),
Id::Standard(tx_stdid_dst),
Id::Standard(tx_stdid_src),
)
.expect("Unable to build IsoTpStream"),
)
.expect("Unable to build IsoTpStream"),
)?;
.wrap_err("Failed to bind CAN ISO-TP stream")?;

let (ack_tx, ack_rx) = mpsc::channel();

// spawn CAN receiver
tokio::task::spawn(can_rx(bus, remote, local, ack_tx, new_message_queue));
tokio::task::spawn_blocking(move || {
can_rx(bus, remote, local, ack_tx, new_message_queue)
});

Ok(CanIsoTpMessaging {
tx_stream: tx_isotp_stream,
stream: tx_isotp_stream,
ack_num_lsb: AtomicU16::new(0),
ack_queue: ack_rx,
})
Expand All @@ -140,13 +142,14 @@ impl CanIsoTpMessaging {
}
}

async fn send_wait_ack(&mut self, frame: &[u8]) -> Result<CommonAckError> {
match self.tx_stream.write(frame) {
Ok(_) => {}
Err(err) => {
error!("Error writing stream: {}", err);
async fn send_wait_ack(&mut self, frame: Vec<u8>) -> Result<CommonAckError> {
let mut stream = self.stream.try_clone()?;
tokio::task::spawn_blocking(move || {
if let Err(e) = stream.write(frame.as_slice()) {
error!("Error writing stream: {}", e);
}
}
})
.await?;

let expected_ack_number =
process::id() << 16 | self.ack_num_lsb.load(Ordering::Relaxed) as u32;
Expand All @@ -159,7 +162,7 @@ impl CanIsoTpMessaging {
/// Receive CAN frames
/// - relay acks to `ack_tx`
/// - relay new McuMessage to `new_message_queue`
async fn can_rx(
fn can_rx(
bus: String,
remote: IsoTpNodeIdentifier,
local: IsoTpNodeIdentifier,
Expand All @@ -170,7 +173,7 @@ async fn can_rx(
let (rx_stdid_src, rx_stdid_dest) = create_pair(remote, local)?;
debug!("Listening on 0x{:x}->0x{:x}", rx_stdid_src, rx_stdid_dest);

let mut rx_isotp_stream = IsotpStream::<8>::build().bind(
let mut rx_isotp_stream = IsotpStream::<CAN_DATA_LEN>::build().bind(
CanIsotpAddr::new(
bus.as_str(),
Id::Standard(rx_stdid_src),
Expand Down Expand Up @@ -219,7 +222,6 @@ async fn can_rx(
impl MessagingInterface for CanIsoTpMessaging {
/// Send payload into McuMessage
/// One could decide to only listen for ISO-TP message so allow dead code for `send` method
#[allow(dead_code)]
async fn send(&mut self, payload: McuPayload) -> Result<CommonAckError> {
let ack_number =
process::id() << 16 | self.ack_num_lsb.load(Ordering::Relaxed) as u32;
Expand Down Expand Up @@ -256,6 +258,6 @@ impl MessagingInterface for CanIsoTpMessaging {
_ => return Err(eyre!("Invalid payload")),
};

self.send_wait_ack(bytes.as_slice()).await
self.send_wait_ack(bytes).await
}
}
Loading
Loading