diff --git a/Cargo.toml b/Cargo.toml index 5a19408..73714e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ description = "A rust crate for ECU diagnostic servers and communication APIs" license = "MIT" repository = "https://github.com/rnd-ash/ecu_diagnostics" readme = "README.md" -keywords = ["socketcan", "kwp2000", "uds", "j2534", "dpdu"] +keywords = ["socketcan", "kwp2000", "uds", "j2534", "dpdu", "slcan"] exclude = [ "examples/*", "build.rs", @@ -22,9 +22,10 @@ all-features = true targets = ["x86_64-unknown-linux-gnu", "i686-pc-windows-msvc", "x86_64-apple-darwin"] [features] -default = ["passthru", "socketcan"] +default = ["passthru", "socketcan", "slcan"] socketcan = ["dep:socketcan-isotp", "dep:socketcan"] passthru = ["dep:libloading", "dep:shellexpand", "dep:winreg", "dep:serde_json", "dep:j2534_rust"] +slcan = ["dep:serial-rs"] [dependencies] #automotive_diag = { version = "0.1", path = "../automotive_diag" } @@ -36,6 +37,7 @@ log="0.4.16" strum = "0.26.3" strum_macros = "0.26.4" thiserror="1.0.44" +serial-rs = { version = "0.2.1", optional = true } [dev-dependencies] env_logger = "0.11.3" diff --git a/README.md b/README.md index 7bec9b2..fb83395 100644 --- a/README.md +++ b/README.md @@ -104,6 +104,10 @@ for creating Channels for diagnostic servers using the hardware * ISO-TP * CAN +### SLCAN +* ISO-TP +* CAN + ### D-PDU (ISO 22900-2) TBA diff --git a/examples/kwp_slcan.rs b/examples/kwp_slcan.rs new file mode 100644 index 0000000..24afd3d --- /dev/null +++ b/examples/kwp_slcan.rs @@ -0,0 +1,135 @@ +use std::time::Duration; + +use automotive_diag::kwp2000::{KwpSessionType, KwpSessionTypeByte}; +use serial_rs::{FlowControl, SerialPortSettings}; + +use ecu_diagnostics::{ + channel::IsoTPSettings, + dynamic_diag::{ + DiagProtocol, DiagServerAdvancedOptions, DiagServerBasicOptions, DiagServerEmptyLogger, DiagSessionMode, DynamicDiagSession, TimeoutConfig + }, + hardware::Hardware, + kwp2000::Kwp2000Protocol, +}; + +extern crate ecu_diagnostics; +extern crate serial_rs; + +fn ecu_waiting_hook() { + println!("Called hook! ECU is processing our request"); +} + +fn tx_ok_hook(data: &[u8]) { + println!( + "This {} long array was sent to the ECU OK!: {:02X?}", + data.len(), + data + ); +} + +fn print_diag_mode(server: &DynamicDiagSession) { + if let Some(mode) = server.get_current_diag_mode() { + println!( + "ECU is currently in '{}' diagnostic mode (0x{:02X?}). Tester present being sent?: {}", + mode.name, mode.id, mode.tp_require + ); + } +} + + +fn main() { + env_logger::builder() + .format_timestamp(Some(env_logger::TimestampPrecision::Millis)) + .init(); + + let port = serial_rs::new_from_path( + "/dev/cu.usbmodem90379201".into(), + Some( + SerialPortSettings::default() + .baud(2000000) + .read_timeout(Some(5)) + .write_timeout(Some(100)) + .set_flow_control(FlowControl::None) + .set_blocking(true), + ), + ).unwrap(); + + let mut d = ecu_diagnostics::hardware::slcan::device::SlCanDevice::new(port, 1000); + + let isotp = d.create_iso_tp_channel().unwrap(); + + let mut protocol = Kwp2000Protocol::default(); + println!("Diagnostic server is {}!", protocol.get_protocol_name()); + // Register a custom diagnostic session with the protocol (Usually OEM specific) + protocol.register_session_type(DiagSessionMode { + id: 0x93, + tp_require: true, + name: "SuperSecretDiagMode".into(), + }); + + let mut diag_server = ecu_diagnostics::dynamic_diag::DynamicDiagSession::new_over_iso_tp( + protocol, + isotp, + IsoTPSettings { + // ISO-TP layer settings + block_size: 8, + st_min: 20, + extended_addresses: None, + pad_frame: true, + can_speed: 500_000, + can_use_ext_addr: false, + }, + DiagServerBasicOptions { + // Basic server options + send_id: 0x07E1, + recv_id: 0x07E9, + timeout_cfg: TimeoutConfig { + read_timeout_ms: 2500, + write_timeout_ms: 2500, + }, + }, + Some(DiagServerAdvancedOptions { + // Advanced server options + global_tp_id: 0, + tester_present_interval_ms: 2000, + tester_present_require_response: true, + global_session_control: false, + tp_ext_id: None, + command_cooldown_ms: 100, + }), + DiagServerEmptyLogger{} + ) + .unwrap(); + + // This call would work for KWP or UDS, not OBD2 as OBD2 has no form of 'session control' + if let Some(mode) = diag_server.get_current_diag_mode() { + println!( + "ECU is currently in '{}' diagnostic mode (0x{:02X?}). Tester present being sent?: {}", + mode.name, mode.id, mode.tp_require + ); + } + + // Register hook for when ECU responsds with RequestCorrectlyReceivedResponsePending + diag_server.register_waiting_hook(|| ecu_waiting_hook()); + // Register hook for when our requests are sent to the ECU, but we have not got a response. Usually + // this can be used to just let the program know Tx was OK! + diag_server.register_send_complete_hook(|bytes| tx_ok_hook(bytes)); + // Set diag session mode + let res = diag_server.kwp_set_session(KwpSessionType::ExtendedDiagnostics.into()); + println!("Into extended diag mode result: {:?}", res); + // Now check diag session mode, should be extended + print_diag_mode(&diag_server); + let res = diag_server.kwp_set_session(KwpSessionTypeByte::from(0x93)); // Same ID as what we registered at the start + println!("Into special diag mode result: {:?}", res); + print_diag_mode(&diag_server); + println!("Reset result: {:?}", diag_server.kwp_reset_ecu(automotive_diag::kwp2000::ResetType::PowerOnReset)); + print_diag_mode(&diag_server); // ECU should be in standard mode now as the ECU was rebooted + std::thread::sleep(Duration::from_millis(500)); + println!("Read op: {:?}", diag_server.kwp_enable_normal_message_transmission()); + print_diag_mode(&diag_server); // ECU will automatically be put into 0x93 mode + // (Last requested mode as enable_normal_message_transmission cannot be ran in standard mode) + loop { + // TP will be sent in this mode forever + std::thread::sleep(Duration::from_millis(1000)); + } +} diff --git a/examples/slcan_can.rs b/examples/slcan_can.rs new file mode 100644 index 0000000..0a2ce66 --- /dev/null +++ b/examples/slcan_can.rs @@ -0,0 +1,45 @@ +use serial_rs::{FlowControl, SerialPortSettings}; +use ecu_diagnostics::{channel::CanFrame, hardware::Hardware}; + +extern crate ecu_diagnostics; +extern crate serial_rs; + +fn main() { + env_logger::builder() + .format_timestamp(Some(env_logger::TimestampPrecision::Millis)) + .init(); + + let port = serial_rs::new_from_path( + "/dev/cu.usbmodem90379201".into(), + Some( + SerialPortSettings::default() + .baud(2000000) + .read_timeout(Some(1)) + .write_timeout(Some(100)) + .set_flow_control(FlowControl::None) + .set_blocking(true), + ), + ).unwrap(); + + let mut d = ecu_diagnostics::hardware::slcan::device::SlCanDevice::new(port, 1000); + + let mut can = d.create_can_channel().unwrap(); + + can.set_can_cfg(83_333, false).unwrap(); + + can.open().unwrap(); + + let packets = can.read_packets(100, 500).unwrap(); + + for p in packets { + println!("{:02X?}", p); + } + + can.write_packets(vec![CanFrame::new(0x5B4, vec![2, 0x10, 0x92, 0, 0,0,0].as_ref(), false)], 100).unwrap(); + + let packets = can.read_packets(100, 500).unwrap(); + + for p in packets { + println!("{:02X?}", p); + } +} \ No newline at end of file diff --git a/src/hardware/mod.rs b/src/hardware/mod.rs index 901773e..212b689 100644 --- a/src/hardware/mod.rs +++ b/src/hardware/mod.rs @@ -7,13 +7,14 @@ mod dpdu; #[cfg(feature = "passthru")] pub mod passthru; // Not finished at all yet, hide from the crate -#[cfg(feature = "passthru")] -use std::sync::Arc; -use std::{any::{Any, TypeId}, fmt::Debug, sync::{Mutex, PoisonError, RwLock}}; +use std::{any::{Any, TypeId}, fmt::Debug, sync::{Arc, PoisonError, RwLock}}; #[cfg(all(feature="socketcan", target_os="linux"))] pub mod socketcan; +#[cfg(feature = "slcan")] +pub mod slcan; + use crate::channel::{CanChannel, IsoTPChannel}; /// Hardware API result diff --git a/src/hardware/slcan/device.rs b/src/hardware/slcan/device.rs new file mode 100644 index 0000000..d637d4d --- /dev/null +++ b/src/hardware/slcan/device.rs @@ -0,0 +1,283 @@ +//! Implements SLCAN device + +use std::{ + collections::VecDeque, + fmt::{Debug, Formatter, Result as FmtResult}, + io::{Read, Write}, + sync::{atomic::AtomicBool, Arc, Mutex}, + time::Instant +}; + +use serial_rs::SerialPort; + +use crate::{channel::{CanFrame, ChannelError, Packet}, hardware::{HardwareCapabilities, HardwareInfo}}; + +const MAX_PACKET_SIZE: usize = 32; +const HEX: [u8; 16] = *b"0123456789ABCDEF"; + +#[derive(Debug, Clone, thiserror::Error)] +/// Error produced by a communication channel +pub enum SlCanError { + /// IO Error + #[error("IO error")] + IOError(#[from] #[source] Arc), + /// Operation failed + #[error("Operation failed")] + OperationFailed, + /// Unsupported speed + #[error("Unsupported speed")] + UnsupportedSpeed, + /// Read timeout + #[error("Read timeout")] + ReadTimeout, + /// Rx buffer full + #[error("Rx buffer full")] + RxBufferFull, + /// Decoding failed + #[error("Decoding failed")] + DecodingFailed, + /// Not acknowledged + #[error("Not acknowledged")] + NotAcknowledged, +} + +/// SLCAN Result +pub type SlCanResult = Result; + +const SLCAN_CAPABILITIES: HardwareCapabilities = HardwareCapabilities { + iso_tp: true, + can: true, + ip: false, + sae_j1850: false, + kline: false, + kline_kwp: false, + sci: false, +}; + + +/// SLCAN Device +#[derive(Clone)] +pub struct SlCanDevice { + pub(crate) info: HardwareInfo, + port: Arc>>, + pub(crate) canbus_active: Arc, + pub(crate) isotp_active: Arc, + rx_queue: VecDeque, + rx_queue_limit: usize, +} + +impl Debug for SlCanDevice { + fn fmt(&self, f: &mut Formatter) -> FmtResult { + write!(f, "SlCanDevice {}", self.info.name) + } +} +enum ReadWithAckResult { + Ack, + CanFrame(CanFrame) +} + +impl SlCanDevice { + /// Creates a new SLCAN device + pub fn new(port: Box, rx_queue_limit: usize) -> Self { + SlCanDevice { + info: HardwareInfo { + name: "slcan".into(), // TODO: Get version and serial number from device + vendor: None, + capabilities: SLCAN_CAPABILITIES, + device_fw_version: None, + api_version: None, + library_version: None, + library_location: None, + }, + port: Arc::new(Mutex::new(port)), + canbus_active: Arc::new(AtomicBool::new(false)), + isotp_active: Arc::new(AtomicBool::new(false)), + rx_queue: VecDeque::new(), + rx_queue_limit + } + } + + fn read_ack_or_packet(&mut self) -> SlCanResult { + let mut buf_1 = [0; 1]; + let mut buf = Vec::with_capacity(MAX_PACKET_SIZE); + while self.port.lock().unwrap().read(&mut buf_1).map_err(convert_io_error)? == 1 { + let byte = buf_1[0]; + if byte != b'\r' && byte != 0x7 { + if buf.len() == MAX_PACKET_SIZE { + return Err(SlCanError::RxBufferFull); + } + buf.push(byte); + } else { + if buf.len() == 0 { + if buf_1[0] == 0x7 { + return Err(SlCanError::NotAcknowledged); + } else if buf_1[0] == b'\r' { + return Ok(ReadWithAckResult::Ack); + } else { + return Err(SlCanError::OperationFailed); + } + } + if buf.len() < 5 { + return Err(SlCanError::DecodingFailed); + } + match buf[0] { + b't' => { + let id = (hex_to_byte(buf[1])? as u32) << 8 + | (hex_to_byte(buf[2])? as u32) << 4 + | hex_to_byte(buf[3])? as u32; + let dlc = hex_to_byte(buf[4])?; + + if dlc > 8 || buf.len() < (dlc * 2) as usize + 5 { + return Err(SlCanError::DecodingFailed); + } + let mut data = [0u8; 8]; + for i in 0..dlc as usize { + data[i] = hex_to_byte(buf[5 + i * 2])? << 4 + | hex_to_byte(buf[5 + i * 2 + 1])?; + } + return Ok(ReadWithAckResult::CanFrame(CanFrame::new(id, &data, false))); + }, + b'T' => { + let id = (hex_to_byte(buf[1])? as u32) << 28 + | (hex_to_byte(buf[2])? as u32) << 24 + | (hex_to_byte(buf[3])? as u32) << 20 + | (hex_to_byte(buf[4])? as u32) << 16 + | (hex_to_byte(buf[5])? as u32) << 12 + | (hex_to_byte(buf[6])? as u32) << 8 + | (hex_to_byte(buf[7])? as u32) << 4 + | hex_to_byte(buf[8])? as u32; + let dlc = hex_to_byte(buf[9])?; + if dlc > 8 || buf.len() <= (dlc * 2) as usize + 5 { + return Err(SlCanError::DecodingFailed); + } + let mut data = [0u8; 8]; + for i in 0..dlc as usize { + data[i] = hex_to_byte(buf[10 + i * 2 + 1])? << 4 + | hex_to_byte(buf[10 + i * 2])?; + } + return Ok(ReadWithAckResult::CanFrame(CanFrame::new(id, &data, true))); + }, + _ => return Err(SlCanError::DecodingFailed) + } + } + } + Err(SlCanError::ReadTimeout) + } + + fn send_command_with_ack(&mut self, cmd: &[u8]) -> SlCanResult<()> { + self.port + .lock() + .unwrap() + .write(cmd) + .map_err(convert_io_error)?; + // Trying to get ACK, but there can be other packet coming at that point + // That packet is saved in queue and will be read with read later + // Timeout of 1 second must be enough in order to get the ACK among other packets + let instant = Instant::now(); + while instant.elapsed().as_millis() <= 1000 { + match self.read_ack_or_packet()? { + ReadWithAckResult::CanFrame(f) => { + if self.rx_queue.len() >= self.rx_queue_limit { + return Err(SlCanError::RxBufferFull); + } + self.rx_queue.push_back(f) + }, + ReadWithAckResult::Ack => return Ok(()) + } + } + Err(SlCanError::ReadTimeout) + } + + /// Sets can speed and opens SLCAN channel + pub fn open(&mut self, can_speed: u32) -> SlCanResult<()> { + self.send_command_with_ack(get_speed_cmd(can_speed)?.as_ref())?; + self.send_command_with_ack(b"O\r") + } + + /// Closes SLCAN channel + pub fn close(&mut self) -> SlCanResult<()> { + self.send_command_with_ack(b"C\r") + } + + /// Reads can frames from SLCAN device + pub fn read(&mut self) -> SlCanResult { + if let Some(f) = self.rx_queue.pop_front() { + Ok(f) + } else { + match self.read_ack_or_packet()? { + ReadWithAckResult::CanFrame(f) => Ok(f), + ReadWithAckResult::Ack => Err(SlCanError::DecodingFailed), + } + } + } + + /// Sends can frames to SLCAN device + pub fn write(&mut self, frame: CanFrame) -> SlCanResult<()> { + let mut buf = Vec::with_capacity(27); + if frame.is_extended() { + buf.push(b'T'); + let id = frame.get_address().to_be_bytes(); + for i in id { + buf.push(HEX[i as usize >> 4]); + buf.push(HEX[i as usize & 0xF]); + } + } else { + buf.push(b't'); + let id = frame.get_address() & 0xFFF; + buf.push(HEX[id as usize >> 8]); + buf.push(HEX[(id as usize >> 4) & 0xF]); + buf.push(HEX[id as usize & 0xF]); + } + buf.push(HEX[frame.get_data().len() & 0xF]); + for d in frame.get_data() { + buf.push(HEX[*d as usize >> 4]); + buf.push(HEX[*d as usize & 0xF]); + } + buf.push(b'\r'); + self.send_command_with_ack(buf.as_slice()) + } + + /// Clears RX queue + pub fn clear_rx_queue(&mut self) { + self.rx_queue.clear(); + } +} + +fn get_speed_cmd(can_speed: u32) -> SlCanResult<[u8; 3]> { + match can_speed { + 10_000 => Ok(*b"S0\r"), + 20_000 => Ok(*b"S1\r"), + 50_000 => Ok(*b"S2\r"), + 100_000 => Ok(*b"S3\r"), + 125_000 => Ok(*b"S4\r"), + 250_000 => Ok(*b"S5\r"), + 500_000 => Ok(*b"S6\r"), + 800_000 => Ok(*b"S7\r"), + 1_000_000 => Ok(*b"S8\r"), + 83_333 => Ok(*b"S9\r"), // Not supported by original standard + _ => Err(SlCanError::UnsupportedSpeed) + } +} + +fn convert_io_error(error: std::io::Error) -> SlCanError { + SlCanError::IOError(Arc::new(error)) +} + +fn hex_to_byte(hex: u8) -> SlCanResult { + match hex { + b'0'..=b'9' => Ok(hex - b'0'), + b'a'..=b'f' => Ok(hex - b'a' + 10), + b'A'..=b'F' => Ok(hex - b'A' + 10), + _ => Err(SlCanError::DecodingFailed) + } +} + +impl From for ChannelError { + fn from(value: SlCanError) -> Self { + match value { + SlCanError::IOError(err) => ChannelError::IOError(err), + SlCanError::ReadTimeout => ChannelError::ReadTimeout, + _ => ChannelError::Other(value.to_string()), + } + } +} diff --git a/src/hardware/slcan/mod.rs b/src/hardware/slcan/mod.rs new file mode 100644 index 0000000..b8f7397 --- /dev/null +++ b/src/hardware/slcan/mod.rs @@ -0,0 +1,760 @@ +//! SLCAN Module +//! +//! NOTE: This module was not tested with a genuine SLCAN device, therefore is considered EXPERIMENTAL +//! The following sketch was as a base for implementation of SLCAN device on Teensy 4.1: +//! https://github.com/buched/teensy-slcan-flexcan-T4 +//! +//! This module was NOT tested for extended CAN IDs + +use std::{ + borrow::BorrowMut, + fmt::Debug, + sync::{atomic::{AtomicBool, Ordering}, mpsc, Arc, Mutex}, + thread::JoinHandle, + time::{Duration, Instant} +}; + +use device::SlCanDevice; +use crate::channel::{CanChannel, CanFrame, ChannelError, ChannelResult, IsoTPChannel, IsoTPSettings, Packet, PacketChannel, PayloadChannel}; +use super::{Hardware, HardwareInfo, HardwareResult}; + +pub mod device; + + +unsafe impl Sync for SlCanDevice {} +unsafe impl Send for SlCanDevice {} + + +impl Hardware for SlCanDevice { + fn create_iso_tp_channel(&mut self) -> HardwareResult> { + Ok(Box::new(SlCanChannel::new(self.clone())?)) + } + + fn create_can_channel(&mut self) -> HardwareResult> { + Ok(Box::new(SlCanChannel::new(self.clone())?)) + } + + fn read_battery_voltage(&mut self) -> Option { + None + } + + fn read_ignition_voltage(&mut self) -> Option { + None + } + + fn get_info(&self) -> &HardwareInfo { + &self.info + } + + fn is_iso_tp_channel_open(&self) -> bool { + self.isotp_active.load(Ordering::Relaxed) + } + + fn is_can_channel_open(&self) -> bool { + self.canbus_active.load(Ordering::Relaxed) + } + + fn is_connected(&self) -> bool { + // Assuming it is always connected, since port must be opened before creation of Hardware + true + } +} + + +#[derive(Debug, Clone)] +enum ChannelMessage { + ClearRx, + ClearTx, + SendData { ext_id: Option, d: T }, + SetConfig(X), + SetFilter(u32, u32), // Only for ISOTP + Open, + Close, +} + +/// SLCAN ISO-TP +#[allow(missing_debug_implementations)] +pub struct SlCanChannel { + device: SlCanDevice, + + can_rx_queue: mpsc::Receiver, + can_tx_queue: mpsc::Sender>, + can_tx_res_queue: mpsc::Receiver>, + + isotp_rx_queue: mpsc::Receiver<(u32, Vec)>, + isotp_tx_queue: mpsc::Sender), IsoTPSettings>>, + isotp_tx_res_queue: mpsc::Receiver>, + + can_mutex: Mutex<()>, + isotp_mutex: Mutex<()>, + handle: Option>, + running: Arc, +} + +struct IsoTpPayload { + pub data: Vec, + pub curr_size: usize, + pub max_size: usize, + pub cts: bool, + pub pci: u8, + pub max_cpy_size: u8, + pub ext_addr: bool, + pub bs: u8, + pub stmin: u8, +} + +unsafe impl Sync for SlCanChannel {} +unsafe impl Send for SlCanChannel {} + +impl SlCanChannel { + + /// Creates SLCAN Channel + pub fn new(mut dev: SlCanDevice) -> HardwareResult { + let (tx_can_send, rx_can_send) = mpsc::channel::>(); + let (tx_can_send_res, rx_can_send_res) = mpsc::channel::>(); + let (tx_can_recv, rx_can_recv) = mpsc::channel::(); + + + let (tx_isotp_send, rx_isotp_send) = + mpsc::channel::), IsoTPSettings>>(); + let (tx_isotp_send_res, rx_isotp_send_res) = mpsc::channel::>(); + let (tx_isotp_recv, rx_isotp_recv) = mpsc::channel::<(u32, Vec)>(); + + let is_running = Arc::new(AtomicBool::new(true)); + let is_running_t = is_running.clone(); + + let device = dev.clone(); + + let handle = std::thread::spawn(move || { + let mut iso_tp_cfg: Option = None; + let mut can_cfg: Option<(u32, bool)> = None; + let mut iso_tp_filter: Option<(u32, u32)> = None; // Tx, Rx + let mut isotp_rx: Option = None; + let mut isotp_tx: Option = None; + let mut ext_address = false; + let mut last_tx_time = Instant::now(); + let mut tx_frames_sent = 0u32; + let mut rx_frames_received = 0u32; + + let mut is_iso_tp_open = false; + let mut is_can_open = false; + + while is_running_t.load(Ordering::Relaxed) { + if let Ok(can_req) = rx_can_send.try_recv() { + log::debug!("SLCAN CAN Req: {can_req:?}"); + let _res = match can_req { + ChannelMessage::ClearRx => { + dev.clear_rx_queue(); + tx_can_send_res.send(Ok(())) + }, + ChannelMessage::ClearTx => tx_can_send_res.send(Ok(())), + ChannelMessage::SendData { ext_id: _, d } => { + let res: ChannelResult<()> = dev.write(d) + .map_err(|e| e.into()); + tx_can_send_res.send(res) + }, + ChannelMessage::SetConfig((baud, use_ext)) => { + let mut res: ChannelResult<()> = Ok(()); + if let Some(icfg) = iso_tp_cfg { + // Compare against ISOTP config + if icfg.can_speed != baud || icfg.can_use_ext_addr != use_ext { + // Mismatched config! + res = Err(ChannelError::Other( + "CAN and ISO-TP cfg mismatched for channel".into(), + )); + } + } + if res.is_ok() { + can_cfg = Some((baud, use_ext)); + } + tx_can_send_res.send(res) + }, + ChannelMessage::SetFilter(_, _) => todo!(), + ChannelMessage::Open => { + let res: ChannelResult<()>; + if can_cfg.is_none() { + res = Err(ChannelError::ConfigurationError) + } else { + let cfg = can_cfg.unwrap(); + res = dev.open(cfg.0).map_err(|e| e.into()); + } + if res.is_ok() { + is_can_open = true; + } + tx_can_send_res.send(res) + }, + ChannelMessage::Close => { + can_cfg = None; + is_can_open = false; + tx_isotp_send_res.send(dev.close().map_err(|e| e.into())) + }, + }; + } + + if let Ok(isotp_req) = rx_isotp_send.try_recv() { + let _send = match isotp_req { + ChannelMessage::SetConfig(cfg) => { + let mut res: ChannelResult<()> = Ok(()); + if let Some(ccfg) = can_cfg { + // Compare against CAN config + if ccfg.0 != cfg.can_speed || ccfg.1 != cfg.can_use_ext_addr { + // Mismatched config! + res = Err(ChannelError::Other( + "CAN and ISO-TP cfg mismatched for channel".into(), + )); + } + } + if res.is_ok() { + ext_address = cfg.extended_addresses.is_some(); + iso_tp_cfg = Some(cfg); + } + tx_isotp_send_res.send(res) + } + ChannelMessage::Open => { + let res: ChannelResult<()>; + if iso_tp_cfg.is_none() { + res = Err(ChannelError::ConfigurationError) + } else { + let tp_cfg = iso_tp_cfg.unwrap(); + res = dev.open(tp_cfg.can_speed).map_err(|e| e.into()); + if res.is_ok() { + is_iso_tp_open = true; + } + } + tx_isotp_send_res.send(res) + } + ChannelMessage::Close => { + iso_tp_cfg = None; + isotp_rx = None; + isotp_tx = None; + is_iso_tp_open = false; + tx_isotp_send_res.send(dev.close().map_err(|e| e.into())) + } + ChannelMessage::SetFilter(tx, rx) => { + iso_tp_filter = Some((tx, rx)); + tx_isotp_send_res.send(Ok(())) + } + ChannelMessage::ClearRx => { + dev.clear_rx_queue(); + isotp_rx = None; + tx_isotp_send_res.send(Ok(())) + } + ChannelMessage::ClearTx => { + isotp_tx = None; + tx_isotp_send_res.send(Ok(())) + } // Todo clear Tx buffer, + ChannelMessage::SendData { + ext_id: _, + d: (addr, data), + } => { + if iso_tp_cfg.is_none() || iso_tp_filter.is_none() { + tx_isotp_send_res.send(Err(ChannelError::ConfigurationError)) + } else if !is_iso_tp_open { + tx_isotp_send_res.send(Err(ChannelError::InterfaceNotOpen)) + } else { + let cfg = iso_tp_cfg.unwrap(); + // Send + if (ext_address && data.len() < 6) + || (!ext_address && data.len() < 7) + { + let mut df: Vec = Vec::with_capacity(8); + if ext_address { + df.push(cfg.extended_addresses.unwrap().0); + } + df.push(data.len() as u8); + df.extend_from_slice(&data); + if cfg.pad_frame { + df.resize(8, 0xCC); + } + log::debug!("Sending ISO-TP msg as 1 CAN frame {df:02X?}"); + let cf = CanFrame::new(addr, &df, cfg.can_use_ext_addr); + let res: ChannelResult<()> = dev.write(cf) + .map_err(|e| e.into()); + tx_isotp_send_res.send(res) + } else { + if isotp_tx.is_some() { + tx_isotp_send_res.send(Err(ChannelError::BufferFull)) + } else if data.len() > 0xFFF { + tx_isotp_send_res + .send(Err(ChannelError::UnsupportedRequest)) + // Request too big + } else { + let mut df: Vec = Vec::with_capacity(8); + if ext_address { + df.push(cfg.extended_addresses.unwrap().0); + } + df.push((0x10 | ((data.len() as u32) >> 8) & 0x0F) as u8); + df.push(data.len() as u8); + let max_copy = if ext_address { 5 } else { 6 }; + df.extend_from_slice(&data[0..max_copy]); + let cf = CanFrame::new(addr, &df, cfg.can_use_ext_addr); + let res: ChannelResult<()> = dev.write(cf) + .map_err(|e| e.into()); + if res.is_ok() { + isotp_tx = Some(IsoTpPayload { + data: data.clone(), + curr_size: max_copy, + max_size: data.len(), + cts: false, + pci: 0x21, + max_cpy_size: max_copy as u8 + 1, + ext_addr: ext_address, + // These 2 are temp, they are overriden by the ECU when FC comes in + bs: cfg.block_size, + stmin: cfg.st_min, + }); + } + tx_isotp_send_res.send(Ok(())) + } + } + } + } + }; + }; + + if is_iso_tp_open || is_can_open { + let incomming = dev.read().ok(); + if can_cfg.is_some() { + if let Some(p) = incomming { + tx_can_recv.send(p).unwrap(); + } + } + if let (Some(cfg), Some(filter)) = (iso_tp_cfg, iso_tp_filter) { + if let Some(packet) = incomming { + if packet.get_address() == filter.1 { + if ext_address + && packet.get_data()[1] != cfg.extended_addresses.unwrap().1 + { + continue; + } + // IsoTP is some so process the incomming frame! + // check PCI first (Quicker) + let pci_idx = if ext_address { 1 } else { 0 }; + let pci_raw = *packet.get_data().get(pci_idx).unwrap_or(&0xFF); + let pci = pci_raw & 0xF0; + if pci == 0x00 || pci == 0x10 || pci == 0x20 || pci == 0x30 { + let data = packet.get_data(); + log::debug!( + "Incomming ISO-TP frame 0x{:04X?}: {:02X?}", + filter.1, + data + ); + match pci { + 0x00 => { + // Single frame + tx_isotp_recv + .send(( + filter.1, + data[1 + pci_idx + ..1 + pci_idx + pci_raw as usize] + .to_vec(), + )) + .unwrap(); + } + 0x10 => { + if isotp_rx.is_some() { + log::warn!("ISOTP Rx overwriting old payload!"); + } + let size = ((data[pci_idx] as usize & 0x0F) << 8) + | data[1 + pci_idx] as usize; + let mut data_rx = Vec::with_capacity(size); + log::debug!("ISOTP Expecting data payload of {size} bytes, sending FC"); + data_rx.extend_from_slice(&data[pci_idx + 2..]); + isotp_rx = Some(IsoTpPayload { + data: data_rx, + curr_size: 8 - 2 - pci_idx, + max_size: size, + cts: true, + pci: 0x21, + max_cpy_size: if ext_address { 6 } else { 7 }, + ext_addr: ext_address, + bs: cfg.block_size, + stmin: cfg.st_min, + }); + // Send FC + let mut df: Vec = Vec::with_capacity(8); + if ext_address { + df.push(cfg.extended_addresses.unwrap().0); + } + df.extend_from_slice(&[ + 0x30, + cfg.block_size, + cfg.st_min, + ]); + if cfg.pad_frame { + df.resize(8, 0xCC); + } + if let Err(e) = dev.write(CanFrame::new( + filter.0, + &df, + cfg.can_use_ext_addr, + )) { + isotp_rx = None; // Could not send FC + log::error!("Could not send FC to ECU: {e}"); + } + rx_frames_received = 0; + } + 0x20 => { + if let Some(rx) = isotp_rx.borrow_mut() { + let mut max_copy = rx.max_size - rx.data.len(); + if max_copy > rx.max_cpy_size as usize { + max_copy = rx.max_cpy_size as usize; + } + rx_frames_received += 1; + rx.data.extend_from_slice( + &data[1 + pci_idx..1 + pci_idx + max_copy], + ); + if rx.data.len() >= rx.max_size { + // Yay, done! + tx_isotp_recv + .send((filter.1, rx.data.clone())) + .unwrap(); + isotp_rx = None; + continue; + } + // Not done, check if ECU requires a new FC msg + if rx.bs > 0 && rx_frames_received >= rx.bs as u32 { + // Check for new fc required + // Send FC + let mut df: Vec = Vec::with_capacity(8); + if ext_address { + df.push(cfg.extended_addresses.unwrap().0); + } + df.extend_from_slice(&[ + 0x30, + cfg.block_size, + cfg.st_min, + ]); + if cfg.pad_frame { + df.resize(8, 0xCC); + } + if let Err(e) = dev.write(CanFrame::new( + filter.0, + &df, + cfg.can_use_ext_addr, + )) { + isotp_rx = None; // Could not send FC + log::error!("Could not send FC to ECU: {e}"); + } + rx_frames_received = 0; + // Send FC + } + } + } + 0x30 => { + if pci_raw == 0x30 { + if let Some(to_tx) = isotp_tx.as_mut() { + to_tx.cts = true; + to_tx.bs = data[1 + pci_idx]; + to_tx.stmin = data[2 + pci_idx]; + if to_tx.stmin > 127 { + to_tx.stmin = 1; // In microseconds, we don't count that fast, so use 1ms + } + last_tx_time = Instant::now(); + tx_frames_sent = 0; + } + } + } + _ => { + log::warn!("Cannot handle ISO-TP frame {data:02X?}"); + } + } + } + } + } + let mut send_complete = false; + if let Some(tx_payload) = isotp_tx.borrow_mut() { + // Handle Tx data + let mut can_buffer = vec![]; + for _ in 0..8 { + // 8 frames in a batch max - Makes Tx with 0bs faster + if tx_payload.cts + && ((last_tx_time.elapsed().as_millis() + >= tx_payload.stmin.into()) + || tx_payload.stmin == 0) + { + let mut cf_payload = Vec::with_capacity(8); + // Do send + let max_copy = std::cmp::min( + tx_payload.max_size - tx_payload.curr_size, + tx_payload.max_cpy_size as usize, + ); + + if ext_address { + cf_payload.push(cfg.extended_addresses.unwrap().0) + } + cf_payload.push(tx_payload.pci); + cf_payload.extend_from_slice( + &tx_payload.data + [tx_payload.curr_size..tx_payload.curr_size + max_copy], + ); + can_buffer.push(CanFrame::new( + filter.0, + &cf_payload, + cfg.can_use_ext_addr, + )); + if cfg.pad_frame { + cf_payload.resize(8, 0xCC); + } + + if tx_payload.bs != 0 { + tx_frames_sent += 1; + } + tx_payload.pci += 1; + tx_payload.curr_size += max_copy; + if tx_payload.pci == 0x30 { + tx_payload.pci = 0x20; + } + + // Await new FC + last_tx_time = Instant::now(); + if tx_frames_sent as u8 >= tx_payload.bs && tx_payload.bs != 0 { + tx_frames_sent = 0; + tx_payload.cts = false; + break; + } + if tx_payload.bs != 0 { + break; // Delay + } + if tx_payload.curr_size >= tx_payload.max_size { + send_complete = true; + break; + } + } + } + if !can_buffer.is_empty() { + for packet in can_buffer { + if dev.write(packet).is_err() { + send_complete = true; // Destroy! + break; + } + } + } + } + if send_complete { + isotp_tx = None; + log::debug!("ISO-TP Send completed!"); + } + } + } + if iso_tp_cfg.is_none() && can_cfg.is_none() { + std::thread::sleep(Duration::from_millis(10)); + } else { + std::thread::sleep(Duration::from_millis(1)); + } + } + dev.close().unwrap(); + }); + + Ok(Self { + device, + + can_rx_queue: rx_can_recv, + can_tx_queue: tx_can_send, + can_tx_res_queue: rx_can_send_res, + + isotp_rx_queue: rx_isotp_recv, + isotp_tx_queue: tx_isotp_send, + isotp_tx_res_queue: rx_isotp_send_res, + can_mutex: Mutex::new(()), + isotp_mutex: Mutex::new(()), + running: is_running, + handle: Some(handle), + }) + } +} + + + +impl CanChannel for SlCanChannel { + fn set_can_cfg(&mut self, baud: u32, use_extended: bool) -> ChannelResult<()> { + log::debug!("CAN SetCANCfg called"); + let _guard = self.can_mutex.lock()?; + while self.can_tx_res_queue.try_recv().is_ok() {} + self.can_tx_queue + .send(ChannelMessage::SetConfig((baud, use_extended)))?; + // Wait for channels response + self.can_tx_res_queue + .recv_timeout(Duration::from_millis(100))? + } +} + +impl IsoTPChannel for SlCanChannel { + fn set_iso_tp_cfg(&mut self, cfg: IsoTPSettings) -> ChannelResult<()> { + log::debug!("ISO-TP SetIsoTpCfg called"); + let _guard = self.isotp_mutex.lock()?; + while self.isotp_tx_res_queue.try_recv().is_ok() {} + self.isotp_tx_queue.send(ChannelMessage::SetConfig(cfg))?; + // Wait for channels response + self.isotp_tx_res_queue + .recv_timeout(Duration::from_millis(100))? + } +} + +impl PacketChannel for SlCanChannel { + fn open(&mut self) -> ChannelResult<()> { + log::debug!("CAN Open called"); + let _guard = self.can_mutex.lock()?; + while self.can_tx_res_queue.try_recv().is_ok() {} + self.can_tx_queue.send(ChannelMessage::Open)?; + // Wait for channels response + let res = self.can_tx_res_queue + .recv_timeout(Duration::from_millis(100))?; + self.device.canbus_active.store(true, Ordering::Relaxed); + res + } + + fn close(&mut self) -> ChannelResult<()> { + log::debug!("CAN Close called"); + let _guard = self.can_mutex.lock()?; + while self.can_tx_res_queue.try_recv().is_ok() {} + self.can_tx_queue.send(ChannelMessage::Close)?; + // Wait for channels response + let res = self.can_tx_res_queue + .recv_timeout(Duration::from_millis(100))?; + self.device.canbus_active.store(false, Ordering::Relaxed); + res + } + + fn write_packets(&mut self, packets: Vec, timeout_ms: u32) -> ChannelResult<()> { + log::debug!("CAN WritePackets called"); + let _guard = self.can_mutex.lock()?; + for p in packets { + self.can_tx_queue + .send(ChannelMessage::SendData { ext_id: None, d: p })?; + if timeout_ms != 0 { + match self + .can_tx_res_queue + .recv_timeout(Duration::from_millis(timeout_ms as u64)) + { + Ok(m) => m?, + Err(e) => return Err(e.into()), + } + } + } + Ok(()) + } + + fn read_packets(&mut self, max: usize, timeout_ms: u32) -> ChannelResult> { + log::debug!("CAN ReadPackets called"); + let timeout = std::cmp::max(1, timeout_ms); + let mut res = vec![]; + let instant = Instant::now(); + while instant.elapsed().as_millis() <= timeout as u128 { + if let Ok(c) = self.can_rx_queue.try_recv() { + res.push(c) + } + if res.len() >= max { + break; + } + } + Ok(res) + } + + fn clear_rx_buffer(&mut self) -> ChannelResult<()> { + log::debug!("CAN ClearRxBuffer called"); + while self.can_rx_queue.try_recv().is_ok() {} + Ok(self.can_tx_queue.send(ChannelMessage::ClearRx)?) + + } + + fn clear_tx_buffer(&mut self) -> ChannelResult<()> { + log::debug!("CAN ClearTxBuffer called"); + Ok(self.can_tx_queue.send(ChannelMessage::ClearTx)?) + } +} + +impl PayloadChannel for SlCanChannel { + fn open(&mut self) -> ChannelResult<()> { + log::debug!("ISO-TP Open called"); + let _guard = self.isotp_mutex.lock()?; + while self.isotp_tx_res_queue.try_recv().is_ok() {} + self.isotp_tx_queue.send(ChannelMessage::Open)?; + // Wait for channels response + let res = self.isotp_tx_res_queue + .recv_timeout(Duration::from_millis(100))?; + self.device.isotp_active.store(true, Ordering::Relaxed); + res + } + + fn close(&mut self) -> ChannelResult<()> { + log::debug!("ISO-TP Close called"); + let _guard = self.isotp_mutex.lock()?; + while self.isotp_tx_res_queue.try_recv().is_ok() {} + self.isotp_tx_queue.send(ChannelMessage::Close)?; + // Wait for channels response + let res = self.isotp_tx_res_queue + .recv_timeout(Duration::from_millis(100))?; + self.device.isotp_active.store(true, Ordering::Relaxed); + res + } + + fn set_ids(&mut self, send: u32, recv: u32) -> ChannelResult<()> { + log::debug!("ISO-TP SetIDS called"); + let _guard = self.isotp_mutex.lock()?; + while self.isotp_tx_res_queue.try_recv().is_ok() {} + self.isotp_tx_queue + .send(ChannelMessage::SetFilter(send, recv))?; + // Wait for channels response + self.isotp_tx_res_queue + .recv_timeout(Duration::from_millis(100))? + } + + fn read_bytes(&mut self, timeout_ms: u32) -> ChannelResult> { + log::debug!("ISO-TP ReadBytes called"); + let timeout = std::cmp::max(1, timeout_ms); + let instant = Instant::now(); + while instant.elapsed().as_millis() <= timeout as u128 { + if let Ok(c) = self.isotp_rx_queue.try_recv() { + return Ok(c.1); + } + } + Err(ChannelError::BufferEmpty) + } + + fn write_bytes( + &mut self, + addr: u32, + ext_id: Option, + buffer: &[u8], + timeout_ms: u32, + ) -> ChannelResult<()> { + log::debug!("ISO-TP WriteBytes called"); + let _guard = self.isotp_mutex.lock()?; + while self.isotp_tx_res_queue.try_recv().is_ok() {} + self.isotp_tx_queue.send(ChannelMessage::SendData { + ext_id, + d: (addr, buffer.to_vec()), + })?; + if timeout_ms == 0 { + Ok(()) + } else { + // Wait for channels response + self.isotp_tx_res_queue + .recv_timeout(Duration::from_millis(timeout_ms as u64))? + } + } + + fn clear_rx_buffer(&mut self) -> ChannelResult<()> { + log::debug!("ISO-TP ClearRxBuffer called"); + while self.isotp_rx_queue.try_recv().is_ok() {} + self.isotp_tx_queue.send(ChannelMessage::ClearRx)?; + self.isotp_tx_res_queue + .recv_timeout(Duration::from_millis(100))? + } + + fn clear_tx_buffer(&mut self) -> ChannelResult<()> { + log::debug!("ISO-TP ClearTxBuffer called"); + self.isotp_tx_queue.send(ChannelMessage::ClearTx)?; + self.isotp_tx_res_queue + .recv_timeout(Duration::from_millis(100))? + } +} + +impl Drop for SlCanChannel { + fn drop(&mut self) { + log::debug!("Drop called"); + self.running.store(false, Ordering::Relaxed); + self.handle.take().map(|x| x.join()); + } +}