diff --git a/examples/laser/laser_ffi.rs b/examples/laser/laser_ffi.rs index 538077c98..a3cccc3e8 100644 --- a/examples/laser/laser_ffi.rs +++ b/examples/laser/laser_ffi.rs @@ -74,7 +74,11 @@ fn main() { // Synchronous DAC detection. println!("Detecting DAC..."); let mut dac = std::mem::MaybeUninit::::uninit(); - let res = nannou_laser::ffi::detect_dac(&mut api, dac.as_mut_ptr()); + let res = nannou_laser::ffi::detect_dac( + &mut api, + dac.as_mut_ptr(), + nannou_laser::ffi::DacVariant::DacVariantEtherdream, + ); if res as u32 != 0 { let err_cstr = std::ffi::CStr::from_ptr(nannou_laser::ffi::api_last_error(&api)); eprintln!("failed to detect DAC: {:?}", err_cstr); diff --git a/examples/laser/laser_frame_stream_gui.rs b/examples/laser/laser_frame_stream_gui.rs index f8e11f043..1afbf4c29 100644 --- a/examples/laser/laser_frame_stream_gui.rs +++ b/examples/laser/laser_frame_stream_gui.rs @@ -135,9 +135,29 @@ fn model(app: &App) -> Model { let laser_api2 = laser_api.clone(); std::thread::spawn(move || { let mut detected = std::collections::HashSet::new(); + + // detect Helios DACs first since they can't be detected while simultaneously sending data to them + for res in laser_api2.detect_dacs(laser::DacVariant::DacVariantHelios) { + if let laser::DetectDacs::Helios { previous_dac } = res { + if !detected.insert(laser::DetectedDac::from(previous_dac).id()) { + break; + } + } + } + for detected_helios in &detected { + if let laser::dac_manager::Id::Helios { id } = *detected_helios { + let dac: laser::helios_dac::NativeHeliosDacParams = id.into(); + println!("{:#?}", dac); + if dac_tx.send(dac.into()).is_err() { + break; + } + } + } + + // for Etherdream DAC for res in laser_api2 - .detect_dacs() - .expect("failed to start detecting DACs") + .detect_dacs(laser::DacVariant::DacVariantEtherdream) + .expect("failed to start detecting Etherdream DACs") { let dac = res.expect("error occurred during DAC detection"); if detected.insert(dac.id()) { diff --git a/guide/src/changelog.md b/guide/src/changelog.md index 79f4b6fb8..7cd5fa43b 100644 --- a/guide/src/changelog.md +++ b/guide/src/changelog.md @@ -15,6 +15,7 @@ back to the origins. - Move `nannou_conrod` and `nannou_timeline` into a new repository: https://github.com/nannou-org/nannou_conrod. Both crates are deprecated in favour of `nannou_egui`. +- Added support for the Helios DAC to nannou_laser --- diff --git a/nannou_laser/Cargo.toml b/nannou_laser/Cargo.toml index 553a0e65d..9202eeca5 100644 --- a/nannou_laser/Cargo.toml +++ b/nannou_laser/Cargo.toml @@ -1,10 +1,10 @@ [package] name = "nannou_laser" version ="0.18.0" -authors = ["mitchmindtree "] +authors = ["mitchmindtree , Waseem G "] description = "A cross-platform laser DAC detection and streaming API." edition = "2018" -keywords = ["laser", "dac", "stream", "frame", "ether-dream"] +keywords = ["laser", "dac", "stream", "frame", "ether-dream", "helios"] license = "MIT OR Apache-2.0" repository = "https://github.com/nannou-org/nannou_laser.git" homepage = "https://github.com/nannou-org/nannou_laser" @@ -15,6 +15,7 @@ crate-type = ["rlib", "staticlib", "cdylib"] [dependencies] ether-dream = "~0.2.5" +helios-dac = { git = "https://github.com/seem-less/helios-dac-rs.git", branch="nannou-integration", default-features = false, features = ["native"] } ilda-idtf = { version = "0.1", optional = true } lasy = "0.4.1" thiserror = "1" diff --git a/nannou_laser/README.md b/nannou_laser/README.md index f131e87aa..fa54b1106 100644 --- a/nannou_laser/README.md +++ b/nannou_laser/README.md @@ -33,10 +33,21 @@ streams.* ## Supported Protocols -Currently, **nannou_laser** only supports the open source [Ether Dream -DAC](https://ether-dream.com/) protocol. The plan is to progressively add -support for more protocols as they are needed by ourselves and users throughout -the lifetime of the project. +Currently, **nannou_laser** supports the [Ether Dream](https://ether-dream.com/) and [Helios](https://bitlasers.com/helios-laser-dac/) open-source DAC protocols. + +When creating a new Frame/Raw Stream the type of DAC to be detected can be specified using the `Builder::dac_variant()` method. If this is not specified the Ether dream variant is selected by default. + +``` +let _laser_api = laser::Api::new(); +let laser_stream = _laser_api + .new_raw_stream(laser_model, laser) + .dac_variant(laser::DacVariant::DacVariantHelios) + .build() + .unwrap(); +``` +If you have issues starting a laser stream using the Helios DAC in linux, ensure you have [setup your udev rules](https://github.com/Grix/helios_dac/blob/master/docs/udev_rules_for_linux.md). + +The plan is to progressively add support for more protocols as they are needed by ourselves and users throughout the lifetime of the project. ## License diff --git a/nannou_laser/src/dac.rs b/nannou_laser/src/dac.rs deleted file mode 100644 index 72ee4c6e9..000000000 --- a/nannou_laser/src/dac.rs +++ /dev/null @@ -1,194 +0,0 @@ -//! Items related to DACs and DAC detection. - -use std::io; -use std::sync::atomic::{self, AtomicBool}; -use std::sync::mpsc; -use std::sync::Arc; -use std::time::Duration; - -/// Callback functions that may be passed to the `detect_dacs_async` function. -pub trait DetectedDacCallback: FnMut(io::Result) {} -impl DetectedDacCallback for F where F: FnMut(io::Result) {} - -/// A persistent, unique identifier associated with a DAC (like a MAC address). -/// -/// It should be possible to use this to uniquely identify the same DAC on different occasions. -#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] -pub enum Id { - EtherDream { mac_address: [u8; 6] }, -} - -/// An available DAC detected on the system. -#[derive(Clone, Debug)] -pub enum DetectedDac { - /// An ether dream laser DAC discovered via the ether dream protocol broadcast message. - EtherDream { - broadcast: ether_dream::protocol::DacBroadcast, - source_addr: std::net::SocketAddr, - }, -} - -/// An iterator yielding laser DACs available on the system as they are discovered. -pub struct DetectDacs { - pub(crate) dac_broadcasts: ether_dream::RecvDacBroadcasts, -} - -/// Messages that driver forward the DAC detector thread. -enum DetectorThreadMsg { - /// A message indicating to stop detection and close the thread immediately. - Close, - /// A message emitted from a timer to step forward and check for DACs again. - Tick, -} - -/// A handle to a non-blocking DAC detection thread. -pub struct DetectDacsAsync { - msg_tx: mpsc::Sender, - thread: Option>, -} - -impl DetectedDac { - /// The maximum point rate allowed by the DAC. - pub fn max_point_hz(&self) -> u32 { - match self { - DetectedDac::EtherDream { ref broadcast, .. } => broadcast.max_point_rate as _, - } - } - - /// The number of points that can be stored within the buffer. - pub fn buffer_capacity(&self) -> u32 { - match self { - DetectedDac::EtherDream { ref broadcast, .. } => broadcast.buffer_capacity as _, - } - } - - /// A persistent, unique identifier associated with the DAC (like a MAC address). - /// - /// It should be possible to use this to uniquely identify the same DAC on different occasions. - pub fn id(&self) -> Id { - match self { - DetectedDac::EtherDream { ref broadcast, .. } => Id::EtherDream { - mac_address: broadcast.mac_address, - }, - } - } -} - -impl DetectDacs { - /// Specify a duration for the detection to wait before timing out. - pub fn set_timeout(&self, duration: Option) -> io::Result<()> { - self.dac_broadcasts.set_timeout(duration) - } - - /// Specify whether or not retrieving the next DAC should block. - pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { - self.dac_broadcasts.set_nonblocking(nonblocking) - } -} - -impl DetectDacsAsync { - /// Close the DAC detection thread. - pub fn close(mut self) { - self.close_inner() - } - - /// Private close implementation shared between `Drop` and `close`. - fn close_inner(&mut self) { - if let Some(thread) = self.thread.take() { - if self.msg_tx.send(DetectorThreadMsg::Close).is_ok() { - thread.join().ok(); - } - } - } -} - -impl Iterator for DetectDacs { - type Item = io::Result; - fn next(&mut self) -> Option { - let res = self.dac_broadcasts.next()?; - match res { - Err(err) => Some(Err(err)), - Ok((broadcast, source_addr)) => Some(Ok(DetectedDac::EtherDream { - broadcast, - source_addr, - })), - } - } -} - -impl Drop for DetectDacsAsync { - fn drop(&mut self) { - self.close_inner(); - } -} - -/// An iterator yielding DACs available on the system as they are discovered. -pub(crate) fn detect_dacs() -> io::Result { - let dac_broadcasts = ether_dream::recv_dac_broadcasts()?; - Ok(DetectDacs { dac_broadcasts }) -} - -/// Spawn a thread for DAC detection. -/// -/// Calls the given `callback` with broadcasts as they are received. -pub(crate) fn detect_dacs_async( - timeout: Option, - callback: F, -) -> io::Result -where - F: 'static + DetectedDacCallback + Send, -{ - detect_dacs_async_inner(timeout, Box::new(callback) as Box<_>) -} - -/// Inner implementation of `detect_dacs_async` removing static dispatch indirection. -fn detect_dacs_async_inner( - timeout: Option, - mut callback: Box, -) -> io::Result { - let mut detect_dacs = detect_dacs()?; - detect_dacs.set_nonblocking(true)?; - let (msg_tx, msg_rx) = mpsc::channel(); - let msg_tx2 = msg_tx.clone(); - let thread = std::thread::Builder::new() - .name("nannou_laser-dac-detection".to_string()) - .spawn(move || { - // For closing the timer thread. - let is_closed = Arc::new(AtomicBool::new(false)); - - // Start the timer. - let is_closed2 = is_closed.clone(); - std::thread::spawn(move || { - let tick_interval = timeout.unwrap_or(std::time::Duration::from_secs(1)); - while !is_closed2.load(atomic::Ordering::Relaxed) { - std::thread::sleep(tick_interval); - if msg_tx2.send(DetectorThreadMsg::Tick).is_err() { - break; - } - } - }); - - // Loop until we receive a close. - 'msgs: for msg in msg_rx { - if let DetectorThreadMsg::Close = msg { - is_closed.store(true, atomic::Ordering::Relaxed); - break; - } - while let Some(res) = detect_dacs.next() { - if let Err(ref e) = res { - match e.kind() { - io::ErrorKind::TimedOut | io::ErrorKind::WouldBlock => continue 'msgs, - _ => (), - } - } - callback(res); - } - } - }) - .expect("failed to spawn DAC detection thread"); - - Ok(DetectDacsAsync { - msg_tx, - thread: Some(thread), - }) -} diff --git a/nannou_laser/src/dac_manager.rs b/nannou_laser/src/dac_manager.rs new file mode 100644 index 000000000..388997aaf --- /dev/null +++ b/nannou_laser/src/dac_manager.rs @@ -0,0 +1,163 @@ +//! DAC detection items shared among all supported DACs + +use std::io; +use thiserror::Error; + +pub type Result = std::result::Result; + +/// Iterators yielding laser DACs available on the system as they are discovered. +pub enum DetectDacs { + EtherDream { + dac_broadcasts: ether_dream::RecvDacBroadcasts, + }, + Helios { + previous_dac: helios_dac::NativeHeliosDacParams, + }, +} + +impl DetectDacs { + /// Specify a duration for the detection to wait before timing out. + pub fn set_timeout(&self, duration: Option) -> io::Result<()> { + match self { + DetectDacs::EtherDream { dac_broadcasts } => dac_broadcasts.set_timeout(duration), + // Helios DAC does not require this function + DetectDacs::Helios { .. } => Ok(()), + } + } + + /// Specify whether or not retrieving the next DAC should block. + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + match self { + DetectDacs::EtherDream { dac_broadcasts } => { + dac_broadcasts.set_nonblocking(nonblocking) + } + // Helios DAC does not require this function + DetectDacs::Helios { .. } => Ok(()), + } + } +} + +impl Iterator for DetectDacs { + type Item = Result; + fn next(&mut self) -> Option { + match self { + DetectDacs::EtherDream { dac_broadcasts } => { + let res = dac_broadcasts.next()?; + match res { + Err(err) => Some(Err(err.into())), + Ok((broadcast, source_addr)) => Some(Ok(DetectedDac::EtherDream { + broadcast, + source_addr, + })), + } + } + DetectDacs::Helios { previous_dac } => { + match helios_dac::NativeHeliosDacController::new() { + Ok(res) => { + match res.list_devices() { + Ok(dacs) => { + let current_position = + dacs.iter().position(|(id, ..)| *id == previous_dac.id); + if let Some(pos) = current_position { + if let Some((new_id, ..)) = dacs.get(pos + 1) { + Some(Ok(DetectedDac::Helios { + dac: (*new_id).into(), + })) + } else { + // Reached end of list of detected Helios DACs + // Return the first DAC in list: + Some(Ok(DetectedDac::Helios { + dac: dacs.get(0)?.0.into(), + })) + } + } else { + Some(Err( + helios_dac::NativeHeliosError::InvalidDeviceResult.into() + )) + } + } + Err(e) => Some(Err(e.into())), + } + } + Err(e) => Some(Err(e.into())), + } + } + } + } +} +/// An available DAC detected on the system. +#[derive(Clone, Debug)] +pub enum DetectedDac { + /// An ether dream laser DAC discovered via the ether dream protocol broadcast message. + EtherDream { + broadcast: ether_dream::protocol::DacBroadcast, + source_addr: std::net::SocketAddr, + }, + Helios { + dac: helios_dac::NativeHeliosDacParams, + }, +} + +impl From for DetectedDac { + fn from(dac: helios_dac::NativeHeliosDacParams) -> Self { + DetectedDac::Helios { dac } + } +} + +impl DetectedDac { + /// The maximum point rate allowed by the DAC. + pub fn max_point_hz(&self) -> u32 { + match self { + DetectedDac::EtherDream { ref broadcast, .. } => broadcast.max_point_rate as _, + DetectedDac::Helios { ref dac } => dac.max_point_rate as _, + } + } + + /// The number of points that can be stored within the buffer. + pub fn buffer_capacity(&self) -> u32 { + match self { + DetectedDac::EtherDream { ref broadcast, .. } => broadcast.buffer_capacity as _, + DetectedDac::Helios { ref dac } => dac.buffer_capacity as _, + } + } + + /// A persistent, unique identifier associated with the DAC (like a MAC address). + /// + /// It should be possible to use this to uniquely identify the same DAC on different occasions. + pub fn id(&self) -> Id { + match self { + DetectedDac::EtherDream { ref broadcast, .. } => Id::EtherDream { + mac_address: broadcast.mac_address, + }, + DetectedDac::Helios { ref dac } => Id::Helios { id: dac.id }, + } + } +} +/// A persistent, unique identifier associated with a DAC (like a MAC address for Etherdream). +/// +/// It should be possible to use this to uniquely identify the same DAC on different occasions. +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub enum Id { + EtherDream { mac_address: [u8; 6] }, + Helios { id: u32 }, +} + +#[derive(Clone, Debug, Copy)] +pub enum DacVariant { + DacVariantEtherdream, + DacVariantHelios, +} + +impl Default for DacVariant { + fn default() -> Self { + DacVariant::DacVariantEtherdream + } +} + +#[derive(Error, Debug)] +pub enum DetectedDacError { + #[error("Helios_dac error: {0}")] + HeliosDacError(#[from] helios_dac::NativeHeliosError), + #[error("EtherDream DAC IO detection error: {0}")] + IoError(#[from] io::Error), +} diff --git a/nannou_laser/src/dac_manager_etherdream.rs b/nannou_laser/src/dac_manager_etherdream.rs new file mode 100644 index 000000000..a485ad2d9 --- /dev/null +++ b/nannou_laser/src/dac_manager_etherdream.rs @@ -0,0 +1,245 @@ +//! Items related specifically to the EtherDream DAC. + +use std::io; +use std::sync::atomic::{self, AtomicBool}; +use std::sync::mpsc; +use std::sync::Arc; +use std::time::Duration; +use thiserror::Error; + +use crate::dac_manager::{DetectDacs, DetectedDac}; +use crate::util::{clamp, map_range}; +use crate::{DetectedDacError, RawPoint}; +/// Callback functions that may be passed to the `detect_dacs_async` function. +pub trait DetectedDacCallback: FnMut(io::Result) {} +impl DetectedDacCallback for F where F: FnMut(io::Result) {} + +/// Messages that driver forward the DAC detector thread. +enum DetectorThreadMsg { + /// A message indicating to stop detection and close the thread immediately. + Close, + /// A message emitted from a timer to step forward and check for DACs again. + Tick, +} + +/// A handle to a non-blocking DAC detection thread. +pub struct DetectEtherDreamDacsAsync { + msg_tx: mpsc::Sender, + thread: Option>, +} + +impl DetectEtherDreamDacsAsync { + /// Close the DAC detection thread. + pub fn close(mut self) { + self.close_inner() + } + + /// Private close implementation shared between `Drop` and `close`. + fn close_inner(&mut self) { + if let Some(thread) = self.thread.take() { + if self.msg_tx.send(DetectorThreadMsg::Close).is_ok() { + thread.join().ok(); + } + } + } +} + +impl Drop for DetectEtherDreamDacsAsync { + fn drop(&mut self) { + self.close_inner(); + } +} + +/// An iterator yielding DACs available on the system as they are discovered. +pub(crate) fn detect_dacs() -> io::Result { + let dac_broadcasts = ether_dream::recv_dac_broadcasts()?; + Ok(DetectDacs::EtherDream { dac_broadcasts }) +} + +/// Spawn a thread for DAC detection. +/// +/// Calls the given `callback` with broadcasts as they are received. +pub(crate) fn detect_dacs_async( + timeout: Option, + callback: F, +) -> io::Result +where + F: 'static + DetectedDacCallback + Send, +{ + detect_dacs_async_inner(timeout, Box::new(callback) as Box<_>) +} + +/// Inner implementation of `detect_dacs_async` removing static dispatch indirection. +fn detect_dacs_async_inner( + timeout: Option, + mut callback: Box, +) -> io::Result { + let mut detect_dacs = detect_dacs()?; + detect_dacs.set_nonblocking(true)?; + let (msg_tx, msg_rx) = mpsc::channel(); + let msg_tx2 = msg_tx.clone(); + let thread = std::thread::Builder::new() + .name("nannou_laser-dac-detection".to_string()) + .spawn(move || { + // For closing the timer thread. + let is_closed = Arc::new(AtomicBool::new(false)); + + // Start the timer. + let is_closed2 = is_closed.clone(); + std::thread::spawn(move || { + let tick_interval = timeout.unwrap_or(std::time::Duration::from_secs(1)); + while !is_closed2.load(atomic::Ordering::Relaxed) { + std::thread::sleep(tick_interval); + if msg_tx2.send(DetectorThreadMsg::Tick).is_err() { + break; + } + } + }); + + // Loop until we receive a close. + 'msgs: for msg in msg_rx { + if let DetectorThreadMsg::Close = msg { + is_closed.store(true, atomic::Ordering::Relaxed); + break; + } + while let Some(res) = detect_dacs.next() { + if let Err(ref e) = res { + match e { + DetectedDacError::IoError(err) => { + if let io::ErrorKind::TimedOut | io::ErrorKind::WouldBlock = + err.kind() + { + continue 'msgs; + } + } + _ => (), + } + } + let cb = res.map_err(|e| { + if let DetectedDacError::IoError(err) = e { + io::Error::from(err) + } else { + unreachable!("The detect_dacs enum variant here should be 'EtherDream'") + } + }); + callback(cb); + } + } + }) + .expect("failed to spawn DAC detection thread"); + + Ok(DetectEtherDreamDacsAsync { + msg_tx, + thread: Some(thread), + }) +} + +// The number of remaining points in the DAC. +pub fn dac_remaining_buffer_capacity(dac: ðer_dream::dac::Dac) -> u16 { + dac.buffer_capacity - 1 - dac.status.buffer_fullness +} + +// Determine the number of points needed to fill the DAC. +pub fn points_to_generate(dac: ðer_dream::dac::Dac, latency_points: u16) -> u16 { + let remaining_capacity = dac_remaining_buffer_capacity(dac); + let n = if dac.status.buffer_fullness < latency_points { + latency_points - dac.status.buffer_fullness + } else { + 0 + }; + std::cmp::min(n, remaining_capacity) +} + +// Constructor for a centered, blank ether dream DAC point. +pub fn centered_blank() -> ether_dream::protocol::DacPoint { + ether_dream::protocol::DacPoint { + control: 0, + x: 0, + y: 0, + r: 0, + g: 0, + b: 0, + i: 0, + u1: 0, + u2: 0, + } +} + +// Convert a `lase::point::Position` type to an `i16` representation compatible with ether dream. +fn position_to_ether_dream_position([px, py]: crate::point::Position) -> [i16; 2] { + let min = std::i16::MIN; + let max = std::i16::MAX; + let x = map_range(clamp(px, -1.0, 1.0), -1.0, 1.0, min as f64, max as f64) as i16; + let y = map_range(clamp(py, -1.0, 1.0), -1.0, 1.0, min as f64, max as f64) as i16; + [x, y] +} + +// Convert a `lase::point::Rgb` type to an `u16` representation compatible with ether dream. +fn color_to_ether_dream_color([pr, pg, pb]: crate::point::Rgb) -> [u16; 3] { + let r = (clamp(pr, 0.0, 1.0) * std::u16::MAX as f32) as u16; + let g = (clamp(pg, 0.0, 1.0) * std::u16::MAX as f32) as u16; + let b = (clamp(pb, 0.0, 1.0) * std::u16::MAX as f32) as u16; + [r, g, b] +} + +// Convert the laser point to an ether dream DAC point. +pub fn point_to_ether_dream_point(p: RawPoint) -> ether_dream::protocol::DacPoint { + let [x, y] = position_to_ether_dream_position(p.position); + let [r, g, b] = color_to_ether_dream_color(p.color); + let (control, i, u1, u2) = (0, 0, 0, 0); + ether_dream::protocol::DacPoint { + control, + x, + y, + r, + g, + b, + i, + u1, + u2, + } +} + +/// Errors that may occur while creating a node crate. +#[derive(Debug, Error)] +pub enum EtherDreamStreamError { + #[error("laser DAC detection failed: {err}")] + FailedToDetectDacs { + #[source] + err: io::Error, + /// The number of DAC detection attempts so far. + attempts: u32, + }, + #[error("failed to connect the DAC stream (attempt {attempts}): {err}")] + FailedToConnectStream { + #[source] + err: ether_dream::dac::stream::CommunicationError, + /// The number of connection attempts so far. + attempts: u32, + }, + #[error("failed to prepare the DAC stream: {err}")] + FailedToPrepareStream { + #[source] + err: ether_dream::dac::stream::CommunicationError, + }, + #[error("failed to begin the DAC stream: {err}")] + FailedToBeginStream { + #[source] + err: ether_dream::dac::stream::CommunicationError, + }, + #[error("failed to submit data over the DAC stream: {err}")] + FailedToSubmitData { + #[source] + err: ether_dream::dac::stream::CommunicationError, + }, + #[error("failed to submit point rate change over the DAC stream: {err}")] + FailedToSubmitPointRate { + #[source] + err: ether_dream::dac::stream::CommunicationError, + }, + #[error("failed to submit stop command to the DAC stream: {err}")] + FailedToStopStream { + #[source] + err: ether_dream::dac::stream::CommunicationError, + }, +} diff --git a/nannou_laser/src/dac_manager_helios.rs b/nannou_laser/src/dac_manager_helios.rs new file mode 100644 index 000000000..868ad527c --- /dev/null +++ b/nannou_laser/src/dac_manager_helios.rs @@ -0,0 +1,83 @@ +//! Items related specifically to the Helios DAC. +use helios_dac::{NativeHeliosDacController, NativeHeliosError}; +use thiserror::Error; + +use crate::{ + dac_manager::{DetectDacs, DetectedDacError, Result}, + util::{clamp, map_range}, + RawPoint, +}; + +/// An iterator yielding Helios DACs available on the system as they are discovered. +pub(crate) fn detect_dacs() -> Result { + let helios_controller = NativeHeliosDacController::new().map_err(DetectedDacError::from)?; + let devices = helios_controller + .list_devices() + .map_err(DetectedDacError::from)?; + if !devices.is_empty() { + Ok(DetectDacs::Helios { + previous_dac: devices.get(0).unwrap().0.into(), + }) + } else { + println!("No Helios DACs found"); + Err(NativeHeliosError::InvalidDeviceResult.into()) + } +} + +pub fn position_to_helios_coordinate([px, py]: crate::point::Position) -> helios_dac::Coordinate { + let min = 0; + let max = 0xFFF; + helios_dac::Coordinate { + x: map_range(clamp(px, -1.0, 1.0), -1.0, 1.0, min as f64, max as f64) as u16, + y: map_range(clamp(py, -1.0, 1.0), -1.0, 1.0, min as f64, max as f64) as u16, + } +} + +pub fn color_to_helios_color([pr, pg, pb]: crate::point::Rgb) -> helios_dac::Color { + helios_dac::Color { + r: (clamp(pr, 0.0, 1.0) * u8::MAX as f32) as u8, + g: (clamp(pg, 0.0, 1.0) * u8::MAX as f32) as u8, + b: (clamp(pb, 0.0, 1.0) * u8::MAX as f32) as u8, + } +} + +pub fn point_to_helios_point(p: RawPoint) -> helios_dac::Point { + helios_dac::Point { + coordinate: position_to_helios_coordinate(p.position), + color: color_to_helios_color(p.color), + intensity: 0xFF, // TODO: enable user to change intensity of point + } +} + +/// Errors that may occur while creating a node crate. +#[derive(Debug, Error)] +pub enum HeliosStreamError { + #[error("Failed to create USB context before detecting DACs: {err}")] + FailedToCreateUSBContext { + #[source] + err: NativeHeliosError, + attempts: u32, + }, + #[error("Laser DAC detection failed (attempt {attempts}): {err}")] + FailedToDetectDacs { + #[source] + err: NativeHeliosError, + /// The number of DAC detection attempts so far. + attempts: u32, + }, + #[error("{err}")] + InvalidDeviceResult { + #[source] + err: NativeHeliosError, + }, + #[error("Error when writing frame: {err}")] + FailedToWriteFrame { + #[source] + err: NativeHeliosError, + }, + #[error("Failed to submit stop command to the DAC: {err}")] + FailedToStopStream { + #[source] + err: NativeHeliosError, + }, +} diff --git a/nannou_laser/src/ffi.rs b/nannou_laser/src/ffi.rs index f059ced3b..c6afdf9bf 100644 --- a/nannou_laser/src/ffi.rs +++ b/nannou_laser/src/ffi.rs @@ -36,6 +36,7 @@ pub struct DetectedDac { #[derive(Clone, Copy)] pub union DetectedDacKind { pub ether_dream: DacEtherDream, + pub helios: DacHelios, } /// An Ether Dream DAC that was detected on the network. @@ -46,6 +47,18 @@ pub struct DacEtherDream { pub source_addr: SocketAddr, } +#[repr(C)] +#[derive(Clone, Copy, Debug)] +pub struct DacHelios { + pub dac: helios_dac::NativeHeliosDacParams, +} + +#[repr(C)] +#[derive(Clone, Debug, Copy)] +pub enum DacVariant { + DacVariantEtherdream, + DacVariantHelios, +} /// A set of stream configuration parameters applied to the initialisation of both `Raw` and /// `Frame` streams. #[repr(C)] @@ -135,6 +148,12 @@ pub enum StreamErrorKind { EtherDreamFailedToSubmitData, EtherDreamFailedToSubmitPointRate, EtherDreamFailedToStopStream, + HeliosDeviceNotOpened, + HeliosUsbError, + HeliosInvalidDeviceResult, + HeliosUtf8Error, + HeliosChannelSendError, + HeliosInvalidWriteFrameFlag, } #[repr(C)] @@ -215,7 +234,7 @@ struct ApiInner { } struct DetectDacsAsyncInner { - _inner: crate::DetectDacsAsync, + _inner: crate::DetectEtherDreamDacsAsync, dacs: Arc>>, last_error: Arc>>, } @@ -307,9 +326,13 @@ pub unsafe extern "C" fn available_dacs( /// Block the current thread until a new DAC is detected and return it. #[no_mangle] -pub unsafe extern "C" fn detect_dac(api: *mut Api, detected_dac: *mut DetectedDac) -> Result { +pub unsafe extern "C" fn detect_dac( + api: *mut Api, + detected_dac: *mut DetectedDac, + variant: DacVariant, +) -> Result { let api: &mut ApiInner = &mut (*(*api).inner); - let mut iter = match api.inner.detect_dacs() { + let mut iter = match api.inner.detect_dacs(dac_variant_from_ffi(variant)) { Err(err) => { api.last_error = Some(err_to_cstring(&err)); return Result::DetectDacFailed; @@ -1105,6 +1128,11 @@ fn detected_dac_to_ffi(dac: crate::DetectedDac) -> DetectedDac { let kind = DetectedDacKind { ether_dream }; DetectedDac { kind } } + crate::DetectedDac::Helios { dac } => { + let helios = DacHelios { dac }; + let kind = DetectedDacKind { helios }; + DetectedDac { kind } + } } } @@ -1119,9 +1147,17 @@ fn detected_dac_from_ffi(ffi_dac: DetectedDac) -> crate::DetectedDac { } } +fn dac_variant_from_ffi(variant: DacVariant) -> crate::DacVariant { + match variant { + DacVariant::DacVariantEtherdream => crate::DacVariant::DacVariantEtherdream, + DacVariant::DacVariantHelios => crate::DacVariant::DacVariantHelios, + } +} + fn stream_error_to_kind(err: &crate::StreamError) -> StreamErrorKind { - use crate::stream::raw::EtherDreamStreamError; - match *err { + use crate::dac_manager_etherdream::EtherDreamStreamError; + use crate::dac_manager_helios::HeliosStreamError; + match &*err { crate::StreamError::EtherDreamStream { ref err } => match *err { EtherDreamStreamError::FailedToDetectDacs { .. } => { StreamErrorKind::EtherDreamFailedToDetectDacs @@ -1145,16 +1181,34 @@ fn stream_error_to_kind(err: &crate::StreamError) -> StreamErrorKind { StreamErrorKind::EtherDreamFailedToStopStream } }, + crate::StreamError::HeliosStream { err } => match err { + HeliosStreamError::FailedToCreateUSBContext { .. } => StreamErrorKind::HeliosUsbError, + HeliosStreamError::FailedToDetectDacs { .. } => StreamErrorKind::HeliosUsbError, + HeliosStreamError::InvalidDeviceResult { .. } => { + StreamErrorKind::HeliosInvalidDeviceResult + } + HeliosStreamError::FailedToWriteFrame { .. } => StreamErrorKind::HeliosUsbError, + HeliosStreamError::FailedToStopStream { err } => match err { + helios_dac::NativeHeliosError::UsbError(_) => StreamErrorKind::HeliosUsbError, + _ => StreamErrorKind::HeliosDeviceNotOpened, + }, + }, } } fn stream_error_to_attempts(err: &crate::StreamError) -> u32 { - use crate::stream::raw::EtherDreamStreamError; + use crate::dac_manager_etherdream::EtherDreamStreamError; + use crate::dac_manager_helios::HeliosStreamError; match *err { crate::StreamError::EtherDreamStream { ref err } => match *err { EtherDreamStreamError::FailedToDetectDacs { attempts, .. } | EtherDreamStreamError::FailedToConnectStream { attempts, .. } => attempts, _ => 0, }, + crate::StreamError::HeliosStream { ref err } => match *err { + HeliosStreamError::FailedToCreateUSBContext { attempts, .. } + | HeliosStreamError::FailedToDetectDacs { attempts, .. } => attempts, + _ => 0, + }, } } diff --git a/nannou_laser/src/lib.rs b/nannou_laser/src/lib.rs index a17461490..05885830e 100644 --- a/nannou_laser/src/lib.rs +++ b/nannou_laser/src/lib.rs @@ -1,8 +1,11 @@ //! A cross-platform laser DAC detection and streaming API. pub extern crate ether_dream; +pub extern crate helios_dac; -pub mod dac; +pub mod dac_manager; +pub mod dac_manager_etherdream; +pub mod dac_manager_helios; #[cfg(feature = "ffi")] pub mod ffi; #[cfg(feature = "ilda-idtf")] @@ -11,7 +14,8 @@ pub mod point; pub mod stream; pub mod util; -pub use dac::{DetectDacs, DetectDacsAsync, DetectedDac, DetectedDacCallback, Id as DacId}; +pub use dac_manager::{DacVariant, DetectDacs, DetectedDac, DetectedDacError, Id as DacId, Result}; +pub use dac_manager_etherdream::{DetectEtherDreamDacsAsync, DetectedDacCallback}; pub use point::{Point, RawPoint}; pub use stream::frame::Frame; pub use stream::frame::Stream as FrameStream; @@ -42,23 +46,20 @@ impl Api { } } - /// An iterator yielding laser DACs available on the system as they are discovered. + /// An iterator yielding laser DACs available on the system per supported DAC variant. /// - /// Currently, the only laser protocol supported is the ether dream protocol. Thus, this - /// enumerates ether dream DACs that are discovered on the LAN. - /// - /// **Note** that the produced iterator will iterate forever and never terminate unless + /// **Note** that the DetectDacs::EtherDream iterator will iterate forever and never terminate unless /// `set_timeout` is called on the returned `DetectDacs` instance. - pub fn detect_dacs(&self) -> io::Result { - self.inner.detect_dacs() + pub fn detect_dacs(&self, variant: DacVariant) -> Result { + self.inner.detect_dacs(variant) } /// Block and wait until the DAC with the given `Id` is detected. - pub fn detect_dac(&self, id: DacId) -> io::Result { - self.inner.detect_dac(id) + pub fn detect_dac(&self, id: DacId, variant: DacVariant) -> Result { + self.inner.detect_dac(id, variant) } - /// Spawn a thread for DAC detection. + /// Spawn a thread for DAC detection. Currently only implemented for the Etherdream DAC /// /// Calls the given `callback` with broadcasts as they are received. /// @@ -67,7 +68,7 @@ impl Api { &self, timeout: Option, callback: F, - ) -> io::Result + ) -> io::Result where F: 'static + DetectedDacCallback + Send, { @@ -122,19 +123,25 @@ impl Api { model, render, stream_error, + is_frame: false, } } } impl Inner { /// See the `Api::detect_dacs` docs. - pub(crate) fn detect_dacs(&self) -> io::Result { - dac::detect_dacs() + pub(crate) fn detect_dacs(&self, variant: DacVariant) -> Result { + match variant { + DacVariant::DacVariantEtherdream => { + dac_manager_etherdream::detect_dacs().map_err(DetectedDacError::from) + } + DacVariant::DacVariantHelios => dac_manager_helios::detect_dacs(), + } } /// Block and wait until the DAC with the given `Id` is detected. - pub(crate) fn detect_dac(&self, id: DacId) -> io::Result { - for res in self.detect_dacs()? { + pub(crate) fn detect_dac(&self, id: DacId, variant: DacVariant) -> Result { + for res in self.detect_dacs(variant)? { let dac = res?; if dac.id() == id { return Ok(dac); @@ -148,11 +155,11 @@ impl Inner { &self, timeout: Option, callback: F, - ) -> io::Result + ) -> io::Result where F: 'static + DetectedDacCallback + Send, { - dac::detect_dacs_async(timeout, callback) + dac_manager_etherdream::detect_dacs_async(timeout, callback) } } diff --git a/nannou_laser/src/stream/frame.rs b/nannou_laser/src/stream/frame.rs index ec461d43b..6b69abef1 100644 --- a/nannou_laser/src/stream/frame.rs +++ b/nannou_laser/src/stream/frame.rs @@ -1,5 +1,5 @@ -use crate::stream; use crate::stream::raw::{self, Buffer, StreamError}; +use crate::{stream, DacVariant}; use crate::{Point, RawPoint}; use std::io; use std::ops::{Deref, DerefMut}; @@ -162,6 +162,14 @@ impl Builder { /// The DAC with which the stream should be established. pub fn detected_dac(mut self, dac: crate::DetectedDac) -> Self { self.builder.dac = Some(dac); + self.builder.dac_variant = if let Some(ref dac) = self.builder.dac { + match dac { + crate::DetectedDac::EtherDream { .. } => Some(DacVariant::DacVariantEtherdream), + crate::DetectedDac::Helios { .. } => Some(DacVariant::DacVariantHelios), + } + } else { + None + }; self } @@ -331,6 +339,16 @@ impl Builder { } } + /// DAC variant specified by user to be used for this stream. + /// + /// If none is specified DacVariant::DacVariantEtherdream will be used. + /// + /// DAC detection will only be attempted for this DAC variant. + pub fn dac_variant(mut self, variant: DacVariant) -> Self { + self.builder.dac_variant = Some(variant); + self + } + /// Build the stream with the specified parameters. /// /// **Note:** If no `dac` was specified, this will method will block until a DAC is detected. @@ -401,6 +419,7 @@ impl Builder { model, render: raw_render, stream_error, + is_frame: true, }; let raw_stream = raw_builder.build()?; let stream = Stream { diff --git a/nannou_laser/src/stream/mod.rs b/nannou_laser/src/stream/mod.rs index a43b8df4d..450e2830e 100644 --- a/nannou_laser/src/stream/mod.rs +++ b/nannou_laser/src/stream/mod.rs @@ -31,6 +31,10 @@ pub struct Builder { /// /// If this value is `None`, no timeout will be applied and the stream will wait forever. pub tcp_timeout: Option, + /// If Builder::dac is not specified, the stream will attempt to detect this DAC variant + /// + /// By default this value is 'DacVariant::DacVariantEtherdream' + pub dac_variant: Option, } /// Given a DAC point rate and a desired frame rate, determine how many points to generate per diff --git a/nannou_laser/src/stream/raw.rs b/nannou_laser/src/stream/raw.rs index cef5fc3a0..74c29f09e 100644 --- a/nannou_laser/src/stream/raw.rs +++ b/nannou_laser/src/stream/raw.rs @@ -1,6 +1,11 @@ -use crate::util::{clamp, map_range}; +use crate::dac_manager_etherdream::{ + centered_blank, dac_remaining_buffer_capacity, point_to_ether_dream_point, points_to_generate, + EtherDreamStreamError, +}; +use crate::dac_manager_helios::{point_to_helios_point, HeliosStreamError}; use crate::Inner as ApiInner; -use crate::{DetectedDac, RawPoint}; +use crate::{DacVariant, DetectedDac, DetectedDacError, RawPoint}; +use helios_dac::{DeviceStatus, NativeHeliosDacController, NativeHeliosError}; use std::io; use std::ops::{Deref, DerefMut}; use std::sync::atomic::{self, AtomicBool}; @@ -70,6 +75,7 @@ pub struct Builder> { pub model: M, pub render: F, pub stream_error: E, + pub is_frame: bool, } /// The default stream error function type expected if none are specified. @@ -91,49 +97,10 @@ pub enum StreamError { #[from] err: EtherDreamStreamError, }, -} - -/// Errors that may occur while creating a node crate. -#[derive(Debug, Error)] -pub enum EtherDreamStreamError { - #[error("laser DAC detection failed: {err}")] - FailedToDetectDacs { - #[source] - err: io::Error, - /// The number of DAC detection attempts so far. - attempts: u32, - }, - #[error("failed to connect the DAC stream (attempt {attempts}): {err}")] - FailedToConnectStream { - #[source] - err: ether_dream::dac::stream::CommunicationError, - /// The number of connection attempts so far. - attempts: u32, - }, - #[error("failed to prepare the DAC stream: {err}")] - FailedToPrepareStream { - #[source] - err: ether_dream::dac::stream::CommunicationError, - }, - #[error("failed to begin the DAC stream: {err}")] - FailedToBeginStream { - #[source] - err: ether_dream::dac::stream::CommunicationError, - }, - #[error("failed to submit data over the DAC stream: {err}")] - FailedToSubmitData { - #[source] - err: ether_dream::dac::stream::CommunicationError, - }, - #[error("failed to submit point rate change over the DAC stream: {err}")] - FailedToSubmitPointRate { - #[source] - err: ether_dream::dac::stream::CommunicationError, - }, - #[error("failed to submit stop command to the DAC stream: {err}")] - FailedToStopStream { - #[source] - err: ether_dream::dac::stream::CommunicationError, + #[error("a Helios DAC stream error occurred: {err}")] + HeliosStream { + #[from] + err: HeliosStreamError, }, } @@ -320,6 +287,14 @@ impl Builder { /// detected on the system. pub fn detected_dac(mut self, dac: DetectedDac) -> Self { self.builder.dac = Some(dac); + self.builder.dac_variant = if let Some(ref dac) = self.builder.dac { + match dac { + crate::DetectedDac::EtherDream { .. } => Some(DacVariant::DacVariantEtherdream), + crate::DetectedDac::Helios { .. } => Some(DacVariant::DacVariantHelios), + } + } else { + None + }; self } @@ -360,6 +335,7 @@ impl Builder { builder, model, render, + is_frame, .. } = self; Builder { @@ -368,9 +344,20 @@ impl Builder { model, render, stream_error, + is_frame, } } + /// DAC variant specified by user to be used for this stream. + /// + /// If none is specified DacVariant::DacVariantEtherdream will be used. + /// + /// DAC detection will only be attempted for this DAC variant. + pub fn dac_variant(mut self, variant: DacVariant) -> Self { + self.builder.dac_variant = Some(variant); + self + } + /// Build the stream with the specified parameters. /// /// **Note:** If no `dac` was specified, this will method will block until a DAC is detected. @@ -387,6 +374,7 @@ impl Builder { model, render, stream_error, + is_frame, } = self; // Prepare the model for sharing between the laser thread and stream handle. @@ -417,6 +405,9 @@ impl Builder { // The TCP timeout duration. let tcp_timeout = builder.tcp_timeout; + // The DAC variant that will be attempted to be detected + let dac_variant = builder.dac_variant.unwrap_or_default(); + // A flag for tracking whether or not the stream has been closed. let is_closed = Arc::new(AtomicBool::new(false)); let is_closed2 = is_closed.clone(); @@ -436,6 +427,8 @@ impl Builder { &s_rx, &m_rx, &is_closed2, + dac_variant, + is_frame, ); is_closed2.store(true, atomic::Ordering::Relaxed); res @@ -501,6 +494,8 @@ fn run_laser_stream( state_update_rx: &mpsc::Receiver, model_update_rx: &mpsc::Receiver>, is_closed: &AtomicBool, + dac_variant: DacVariant, + is_frame: bool, ) -> Result<(), StreamError> where F: RenderFn, @@ -529,31 +524,42 @@ where if let Some(ref mut dac) = maybe_dac { let dac_id = dac.id(); detect_attempts += 1; - *dac = match api_inner.detect_dac(dac_id) { - Ok(dac) => { - detect_attempts = 0; - dac - } - Err(err) => { - let attempts = detect_attempts; - let err = EtherDreamStreamError::FailedToDetectDacs { err, attempts }; - let err = StreamError::from(err); - let mut guard = lock_or_return_err!(model, err); - let mut model = guard.take().unwrap(); - let mut action = StreamErrorAction::default(); - stream_error(&mut model, &err, &mut action); - *guard = Some(model); - match action { - StreamErrorAction::CloseThread => return Err(err), - StreamErrorAction::ReattemptConnect => continue, - StreamErrorAction::RedetectDac { timeout } => { - redetect_dac = true; - detect_timeout = timeout; - continue; + *dac = + match api_inner.detect_dac(dac_id, dac_variant) { + Ok(dac) => { + detect_attempts = 0; + dac + } + Err(err) => { + let attempts = detect_attempts; + let err = + match err { + DetectedDacError::IoError(err) => StreamError::from( + EtherDreamStreamError::FailedToDetectDacs { err, attempts }, + ), + DetectedDacError::HeliosDacError(err) => { + StreamError::from(HeliosStreamError::FailedToDetectDacs { + err, + attempts, + }) + } + }; + let mut guard = lock_or_return_err!(model, err); + let mut model = guard.take().unwrap(); + let mut action = StreamErrorAction::default(); + stream_error(&mut model, &err, &mut action); + *guard = Some(model); + match action { + StreamErrorAction::CloseThread => return Err(err), + StreamErrorAction::ReattemptConnect => continue, + StreamErrorAction::RedetectDac { timeout } => { + redetect_dac = true; + detect_timeout = timeout; + continue; + } } } - } - }; + }; } } @@ -563,19 +569,40 @@ where None => { detect_attempts += 1; let attempts = detect_attempts; - let detect_err = &|err| EtherDreamStreamError::FailedToDetectDacs { err, attempts }; + let detect_err = &|err| match err { + DetectedDacError::IoError(err) => { + StreamError::from(EtherDreamStreamError::FailedToDetectDacs { + err, + attempts, + }) + } + DetectedDacError::HeliosDacError(err) => { + StreamError::from(HeliosStreamError::FailedToDetectDacs { err, attempts }) + } + }; + let set_timeout_error = &|err: io::Error| match err.kind() { + io::ErrorKind::Other => { + StreamError::from(HeliosStreamError::InvalidDeviceResult { + err: NativeHeliosError::InvalidDeviceResult, + }) + } + _ => StreamError::from(EtherDreamStreamError::FailedToDetectDacs { + err, + attempts, + }), + }; match api_inner - .detect_dacs() + .detect_dacs(dac_variant) .map_err(detect_err) .and_then(|detect_dacs| { detect_dacs .set_timeout(detect_timeout) - .map_err(detect_err)?; + .map_err(set_timeout_error)?; Ok(detect_dacs) }) .and_then(|mut dacs| { dacs.next() - .expect("ether dream DAC detection iterator should never return `None`") + .expect("DAC detection iterator should never return `None`") .map_err(detect_err) }) { Ok(dac) => { @@ -583,7 +610,6 @@ where dac } Err(err) => { - let err = StreamError::from(err); let mut guard = lock_or_return_err!(model, err); let mut model = guard.take().unwrap(); let mut action = StreamErrorAction::default(); @@ -603,17 +629,34 @@ where }; // Connect and run the laser stream. - match run_laser_stream_tcp_loop( - &dac, - tcp_timeout, - &state, - &model, - &render, - &state_update_rx, - &model_update_rx, - &is_closed, - &mut connect_attempts, - ) { + let res = match dac { + DetectedDac::EtherDream { + broadcast: _, + source_addr: _, + } => run_laser_stream_tcp_loop( + &dac, + tcp_timeout, + state, + model, + &render, + state_update_rx, + model_update_rx, + is_closed, + &mut connect_attempts, + ), + DetectedDac::Helios { dac: _ } => run_laser_stream_usb_loop( + &dac, + state, + model, + &render, + state_update_rx, + model_update_rx, + is_closed, + &mut connect_attempts, + is_frame, + ), + }; + match res { Ok(()) => break, Err(err) => { let mut guard = lock_or_return_err!(model, err); @@ -636,6 +679,106 @@ where Ok(()) } +// Attempts to connect to the DAC via TCP and enters the stream loop. +fn run_laser_stream_usb_loop( + dac: &DetectedDac, + state: &Arc>, + model: &Arc>>, + render: F, + state_update_rx: &mpsc::Receiver, + model_update_rx: &mpsc::Receiver>, + is_closed: &AtomicBool, + connection_attempts: &mut u32, + is_frame: bool, +) -> Result<(), StreamError> +where + F: RenderFn, +{ + // Retrieve the DAC iterator from the USB connected DAC. + let dac_params = if let DetectedDac::Helios { dac: params } = dac { + params + } else { + unreachable!( + "'run_laser_stream_usb_loop()' is only meant to run if the DAC variant is Helios" + ) + }; + + // A buffer for collecting model updates. + let mut pending_model_updates: Vec> = Vec::new(); + let helios_device = NativeHeliosDacController::new() + .map_err(|err| HeliosStreamError::FailedToCreateUSBContext { + err, + attempts: *connection_attempts, + })? + .get_device(dac_params.id) + .map_err(|err| HeliosStreamError::FailedToDetectDacs { + err, + attempts: *connection_attempts, + })? + .open() + .map_err(|err| HeliosStreamError::FailedToDetectDacs { + err, + attempts: *connection_attempts, + })?; + + *connection_attempts = 0; + + while !is_closed.load(atomic::Ordering::Relaxed) { + let (state, ..) = get_stream_updates( + &mut pending_model_updates, + model_update_rx, + model, + state, + state_update_rx, + dac, + ); + + // Clamp the point hz by the DAC's maximum point rate. + let point_hz = std::cmp::min(state.point_hz, dac.max_point_hz()); + + // Clamp the latency by the DAC's buffer capacity. + let latency_points = std::cmp::min(state.latency_points, dac.buffer_capacity()); + // Determine how many points the DAC can currently receive. + let n_points = if let DeviceStatus::Ready = helios_device + .status() + .map_err(|err| HeliosStreamError::FailedToWriteFrame { err })? + { + // The Helios DAC is either ready or not, which indicates whether it can recieve a new frame. + // Because of this, the latency points determines the number of points that can be sent + latency_points as usize + } else { + // Continue the loop until the DAC is ready in order to get the latest frame updates. + continue; + }; + + // The buffer that the user will write to. TODO: Re-use this points buffer. + let mut buffer = Buffer { + point_hz, + latency_points: latency_points as _, + points: vec![RawPoint::centered_blank(); n_points].into_boxed_slice(), + }; + + // Request the points from the user. + apply_point_updates_to_buffer(model, &render, &mut buffer); + helios_device + .write_frame(helios_dac::Frame::new_with_flags( + buffer.point_hz, + buffer.iter().cloned().map(point_to_helios_point).collect(), + if is_frame { + helios_dac::WriteFrameFlags::empty() + } else { + helios_dac::WriteFrameFlags::SINGLE_MODE + }, + )) + .map_err(|err| HeliosStreamError::FailedToWriteFrame { err })? + } + helios_device + .stop() + .map_err(|err| HeliosStreamError::FailedToStopStream { err })?; + helios_device.close(); + Ok(()) +} + // Attempts to connect to the DAC via TCP and enters the stream loop. fn run_laser_stream_tcp_loop( dac: &DetectedDac, @@ -651,12 +794,17 @@ fn run_laser_stream_tcp_loop( where F: RenderFn, { - // Currently only ether dream is supported, so retrieve the broadcast and addr. - let (broadcast, src_addr) = match dac { - DetectedDac::EtherDream { - broadcast, - source_addr, - } => (broadcast, source_addr), + // Retrieve the broadcast and addr from the TCP supported DAC. + let (broadcast, src_addr) = if let DetectedDac::EtherDream { + broadcast, + source_addr, + } = dac + { + (broadcast, source_addr) + } else { + unreachable!( + "'run_laser_stream_tcp_loop()' will only run if the DAC connection type is TCP" + ) }; // A buffer for collecting model updates. @@ -714,33 +862,14 @@ where let mut ether_dream_points = vec![]; while !is_closed.load(atomic::Ordering::Relaxed) { - // Collect any pending updates. - pending_model_updates.extend(model_update_rx.try_iter()); - // If there are some updates available, take the lock and apply them. - if !pending_model_updates.is_empty() { - if let Ok(mut guard) = model.lock() { - let mut model = guard.take().unwrap(); - for mut update in pending_model_updates.drain(..) { - update(&mut model); - } - *guard = Some(model); - } - } - - // Check for updates and retrieve a copy of the state. - let (state, prev_point_hz) = { - let mut state = state.lock().expect("failed to acquare raw state lock"); - - // Keep track of whether or not the `point_hz` as changed. - let prev_point_hz = std::cmp::min(state.point_hz, dac.max_point_hz()); - - // Apply updates. - for mut state_update in state_update_rx.try_iter() { - (*state_update)(&mut state); - } - - (state.clone(), prev_point_hz) - }; + let (state, prev_point_hz) = get_stream_updates( + &mut pending_model_updates, + model_update_rx, + model, + state, + state_update_rx, + dac, + ); // Clamp the point hz by the DAC's maximum point rate. let point_hz = std::cmp::min(state.point_hz, dac.max_point_hz()); @@ -768,11 +897,7 @@ where }; // Request the points from the user. - if let Ok(mut guard) = model.lock() { - let mut m = guard.take().unwrap(); - render(&mut m, &mut buffer); - *guard = Some(m); - } + apply_point_updates_to_buffer(model, &render, &mut buffer); // Retrieve the points. ether_dream_points.extend(buffer.iter().cloned().map(point_to_ether_dream_point)); @@ -799,72 +924,55 @@ where Ok(()) } -// The number of remaining points in the DAC. -fn dac_remaining_buffer_capacity(dac: ðer_dream::dac::Dac) -> u16 { - dac.buffer_capacity - 1 - dac.status.buffer_fullness -} +fn get_stream_updates( + pending_model_updates: &mut Vec>, + model_update_rx: &mpsc::Receiver>, + model: &Arc>>, + state: &Arc>, + state_update_rx: &mpsc::Receiver, + dac: &DetectedDac, +) -> (State, u32) { + // Collect any pending updates. + pending_model_updates.extend(model_update_rx.try_iter()); + // If there are some updates available, take the lock and apply them. + if !pending_model_updates.is_empty() { + if let Ok(mut guard) = model.lock() { + let mut model = guard.take().unwrap(); + for mut update in pending_model_updates.drain(..) { + update(&mut model); + } + *guard = Some(model); + } + } -// Determine the number of points needed to fill the DAC. -fn points_to_generate(dac: ðer_dream::dac::Dac, latency_points: u16) -> u16 { - let remaining_capacity = dac_remaining_buffer_capacity(dac); - let n = if dac.status.buffer_fullness < latency_points { - latency_points - dac.status.buffer_fullness - } else { - 0 - }; - std::cmp::min(n, remaining_capacity) -} + // Check for updates and retrieve a copy of the state. + let mut state = state.lock().expect("failed to acquare raw state lock"); -// Constructor for a centered, blank ether dream DAC point. -fn centered_blank() -> ether_dream::protocol::DacPoint { - ether_dream::protocol::DacPoint { - control: 0, - x: 0, - y: 0, - r: 0, - g: 0, - b: 0, - i: 0, - u1: 0, - u2: 0, - } -} + // Keep track of whether or not the `point_hz` as changed. + let prev_point_hz = std::cmp::min(state.point_hz, dac.max_point_hz()); -// Convert a `lase::point::Position` type to an `i16` representation compatible with ether dream. -fn position_to_ether_dream_position([px, py]: crate::point::Position) -> [i16; 2] { - let min = std::i16::MIN; - let max = std::i16::MAX; - let x = map_range(clamp(px, -1.0, 1.0), -1.0, 1.0, min as f64, max as f64) as i16; - let y = map_range(clamp(py, -1.0, 1.0), -1.0, 1.0, min as f64, max as f64) as i16; - [x, y] -} + // Apply updates. + for mut state_update in state_update_rx.try_iter() { + (*state_update)(&mut state); + } -// Convert a `lase::point::Rgb` type to an `u16` representation compatible with ether dream. -fn color_to_ether_dream_color([pr, pg, pb]: crate::point::Rgb) -> [u16; 3] { - let r = (clamp(pr, 0.0, 1.0) * std::u16::MAX as f32) as u16; - let g = (clamp(pg, 0.0, 1.0) * std::u16::MAX as f32) as u16; - let b = (clamp(pb, 0.0, 1.0) * std::u16::MAX as f32) as u16; - [r, g, b] + (state.clone(), prev_point_hz) } -// Convert the laser point to an ether dream DAC point. -fn point_to_ether_dream_point(p: RawPoint) -> ether_dream::protocol::DacPoint { - let [x, y] = position_to_ether_dream_position(p.position); - let [r, g, b] = color_to_ether_dream_color(p.color); - let (control, i, u1, u2) = (0, 0, 0, 0); - ether_dream::protocol::DacPoint { - control, - x, - y, - r, - g, - b, - i, - u1, - u2, +fn apply_point_updates_to_buffer( + model: &Arc>>, + render: &F, + buffer: &mut Buffer, +) where + F: RenderFn, +{ + // Request the points from the user. + if let Ok(mut guard) = model.lock() { + let mut m = guard.take().unwrap(); + render(&mut m, buffer); + *guard = Some(m); } } - /// The default function used for the `stream_error` function if none is specified. /// /// If an error occurs while the TCP stream is running, an attempt will be made to re-establish a @@ -884,26 +992,37 @@ pub fn default_stream_error_fn( let timeout = Some(Duration::from_secs(2)); StreamErrorAction::RedetectDac { timeout } } - let ether_dream_err = match *err { - StreamError::EtherDreamStream { ref err } => err, - }; - *action = match *ether_dream_err { - EtherDreamStreamError::FailedToDetectDacs { attempts, .. } if attempts < 3 => { - redetect_dac_action() - } - EtherDreamStreamError::FailedToConnectStream { attempts, .. } if attempts < 3 => { - std::thread::sleep(std::time::Duration::from_millis(16)); - StreamErrorAction::ReattemptConnect - } - EtherDreamStreamError::FailedToConnectStream { attempts, .. } if attempts == 3 => { - redetect_dac_action() - } - EtherDreamStreamError::FailedToPrepareStream { .. } - | EtherDreamStreamError::FailedToBeginStream { .. } - | EtherDreamStreamError::FailedToSubmitData { .. } - | EtherDreamStreamError::FailedToSubmitPointRate { .. } => { - StreamErrorAction::ReattemptConnect - } - _ => StreamErrorAction::CloseThread, + *action = match err { + StreamError::HeliosStream { ref err } => match *err { + HeliosStreamError::FailedToDetectDacs { attempts, .. } + | HeliosStreamError::FailedToCreateUSBContext { attempts, .. } + if attempts < 3 => + { + redetect_dac_action() + } + HeliosStreamError::FailedToStopStream { .. } + | HeliosStreamError::FailedToWriteFrame { .. } + | HeliosStreamError::InvalidDeviceResult { .. } => StreamErrorAction::ReattemptConnect, + _ => StreamErrorAction::CloseThread, + }, + StreamError::EtherDreamStream { ref err } => match *err { + EtherDreamStreamError::FailedToDetectDacs { attempts, .. } if attempts < 3 => { + redetect_dac_action() + } + EtherDreamStreamError::FailedToConnectStream { attempts, .. } if attempts < 3 => { + std::thread::sleep(std::time::Duration::from_millis(16)); + StreamErrorAction::ReattemptConnect + } + EtherDreamStreamError::FailedToConnectStream { attempts, .. } if attempts == 3 => { + redetect_dac_action() + } + EtherDreamStreamError::FailedToPrepareStream { .. } + | EtherDreamStreamError::FailedToBeginStream { .. } + | EtherDreamStreamError::FailedToSubmitData { .. } + | EtherDreamStreamError::FailedToSubmitPointRate { .. } => { + StreamErrorAction::ReattemptConnect + } + _ => StreamErrorAction::CloseThread, + }, }; }