diff --git a/.gitignore b/.gitignore index 7ddd7f5..37e353c 100644 --- a/.gitignore +++ b/.gitignore @@ -31,3 +31,7 @@ build/ dist/ *.so out*/ + +# Dev +*.patch +robstridev2/ diff --git a/MANIFEST.in b/MANIFEST.in index b3c6553..8b6054f 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -1 +1 @@ -recursive-include kos_protos/ *.py *.txt py.typed MANIFEST.in py.typed Makefile +recursive-include kos_protos/ *.py *.txt py.typed MANIFEST.in py.typed Makefile pykos/Makefile diff --git a/README.md b/README.md index 82f4cab..c9b7009 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,12 @@ Reference the existing platforms / features in [platforms](platforms). You essentially create another package (Cargo.toml, lib.rs, etc) with the necessary actuator and imu implementations according to the specifications in [kos-core](kos-core/src/services) + +To save trace logs to a file, pass the `--log` flag: +```bash +cargo run --features stub -- --log + + ## Contributing - Use `cargo fmt --all` to format the code. - Use `cargo clippy` to check for lint errors. diff --git a/daemon/Cargo.toml b/daemon/Cargo.toml index eaad24f..fd6d01c 100644 --- a/daemon/Cargo.toml +++ b/daemon/Cargo.toml @@ -20,6 +20,11 @@ tower = "0.5" gstreamer = "0.23" gstreamer-base = "0.23" glib = "0.17" +tracing-appender = "0.2" +clap = { version = "4.0", features = ["derive"] } +chrono = "0.4" +directories = "5.0" +flate2 = "1.0" kos-sim = { version = "0.1.0", path = "../platforms/sim", optional = true } kos-stub = { version = "0.1.0", path = "../platforms/stub", optional = true } diff --git a/daemon/src/file_logging.rs b/daemon/src/file_logging.rs new file mode 100644 index 0000000..20c018c --- /dev/null +++ b/daemon/src/file_logging.rs @@ -0,0 +1,176 @@ +use chrono::Local; +use directories::BaseDirs; +use eyre::Result; +use flate2::write::GzEncoder; +use flate2::Compression; +use std::fs::File; +use std::io::{self, BufWriter, Write}; +use std::path::PathBuf; +use tracing::{info, error}; +use tracing_subscriber::prelude::*; +use tracing_subscriber::{filter::EnvFilter, Layer}; + +pub struct CompressedWriter { + encoder: Option>>, + path: PathBuf, +} + +impl CompressedWriter { + pub fn new(path: impl AsRef) -> io::Result { + let file = File::create(path.as_ref())?; + let buffered = BufWriter::new(file); + Ok(Self { + encoder: Some(GzEncoder::new(buffered, Compression::new(6))), + path: path.as_ref().to_path_buf(), + }) + } + + fn sync(&mut self) -> io::Result<()> { + if let Some(encoder) = &mut self.encoder { + encoder.flush()?; + let buf_writer = encoder.get_mut(); + buf_writer.flush()?; + let file = buf_writer.get_mut(); + file.sync_all()?; + } + Ok(()) + } + + pub fn finalize(&mut self) -> io::Result<()> { + if let Some(encoder) = self.encoder.take() { + info!("Finalizing compressed log {}", self.path.display()); + // Finish the compression and get the BufWriter back + let mut buf_writer = encoder.finish()?; + // Flush the buffer + buf_writer.flush()?; + // Sync to disk + buf_writer.get_mut().sync_all()?; + info!("Successfully finalized log {}", self.path.display()); + } + Ok(()) + } +} + +impl Write for CompressedWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + if let Some(encoder) = &mut self.encoder { + match encoder.write(buf) { + Ok(size) => { + if size > 0 && buf.contains(&b'\n') { + self.sync()?; + } + Ok(size) + } + Err(e) => { + error!( + "Failed to write to compressed log {}: {}", + self.path.display(), + e + ); + Err(e) + } + } + } else { + error!( + "Attempted to write to finalized log {}", + self.path.display() + ); + Err(io::Error::new( + io::ErrorKind::Other, + "Writer has been finalized", + )) + } + } + + fn flush(&mut self) -> io::Result<()> { + self.sync() + } +} + +impl Drop for CompressedWriter { + fn drop(&mut self) { + if let Err(e) = self.finalize() { + error!( + "Failed to finalize compressed log {}: {}", + self.path.display(), + e + ); + } + } +} + +pub fn setup_logging( + enable_file_logging: bool, + log_level: &str, +) -> Result> { + let level_filter = match log_level.to_lowercase().as_str() { + "trace" => tracing_subscriber::filter::LevelFilter::TRACE, + "debug" => tracing_subscriber::filter::LevelFilter::DEBUG, + "info" => tracing_subscriber::filter::LevelFilter::INFO, + "warn" => tracing_subscriber::filter::LevelFilter::WARN, + "error" => tracing_subscriber::filter::LevelFilter::ERROR, + _ => { + eprintln!("Invalid log level '{}', defaulting to 'info'", log_level); + tracing_subscriber::filter::LevelFilter::INFO + } + }; + + let subscriber = tracing_subscriber::registry(); + + // Update stdout layer to use the specified level + let stdout_layer = tracing_subscriber::fmt::layer() + .with_writer(std::io::stdout) + .with_filter( + EnvFilter::from_default_env() + .add_directive(format!("kos={}", log_level).parse().unwrap()) + .add_directive("h2=error".parse().unwrap()) + .add_directive("grpc=error".parse().unwrap()) + .add_directive("rumqttc=error".parse().unwrap()) + .add_directive("kos_core::telemetry=error".parse().unwrap()) + .add_directive("polling=error".parse().unwrap()) + .add_directive("async_io=error".parse().unwrap()) + .add_directive("krec=error".parse().unwrap()), + ); + + let subscriber = subscriber.with(stdout_layer); + + if enable_file_logging { + let log_dir = if let Some(base_dirs) = BaseDirs::new() { + base_dirs.data_local_dir().join("kos").join("logs") + } else { + PathBuf::from("~/.local/share/kos/logs") + }; + + std::fs::create_dir_all(&log_dir)?; + + let timestamp = Local::now().format("%Y%m%d_%H%M%S"); + let final_name = format!("kos-daemon_{}.log.gz", timestamp); + let log_path = log_dir.join(&final_name); + + let compressed_writer = CompressedWriter::new(&log_path)?; + let (non_blocking, guard) = tracing_appender::non_blocking(compressed_writer); + + let file_layer = tracing_subscriber::fmt::layer() + .with_thread_ids(true) + .with_target(true) + .with_file(true) + .with_line_number(true) + .with_writer(non_blocking) + .with_filter(level_filter); + + subscriber.with(file_layer).init(); + Ok(Some(guard)) + } else { + subscriber.init(); + Ok(None) + } +} + +pub fn cleanup_logging(guard: Option) { + if let Some(guard) = guard { + // Ensure we flush any pending writes before dropping + drop(guard); + // Give a small amount of time for the worker to finish + std::thread::sleep(std::time::Duration::from_millis(100)); + } +} diff --git a/daemon/src/main.rs b/daemon/src/main.rs index 34fd48e..814ab75 100644 --- a/daemon/src/main.rs +++ b/daemon/src/main.rs @@ -2,6 +2,7 @@ // This will run the gRPC server and, if applicable, a runtime loop // (e.g., actuator polling, loaded model inference). +use clap::Parser; use eyre::Result; use kos_core::google_proto::longrunning::operations_server::OperationsServer; use kos_core::services::OperationsServiceImpl; @@ -10,10 +11,13 @@ use kos_core::Platform; use kos_core::ServiceEnum; use std::collections::HashMap; use std::sync::Arc; +use tokio::signal; use tokio::sync::Mutex; use tonic::transport::Server; use tracing::{debug, error, info}; use tracing_subscriber::filter::EnvFilter; +use tracing_subscriber::prelude::*; +use tracing_subscriber::Layer; #[cfg(not(any(feature = "kos-sim", feature = "kos-zeroth-01", feature = "kos-kbot")))] use kos_stub::StubPlatform as PlatformImpl; @@ -27,6 +31,21 @@ use kos_zeroth_01::Zeroth01Platform as PlatformImpl; #[cfg(feature = "kos-kbot")] use kos_kbot::KbotPlatform as PlatformImpl; +mod file_logging; +use file_logging::{setup_logging, cleanup_logging}; + +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Enable file logging + #[arg(long, default_value_t = false)] + log: bool, + + /// Log level (trace, debug, info, warn, error) + #[arg(long, default_value = "info")] + log_level: String, +} + fn add_service_to_router( router: tonic::transport::server::Router, service: ServiceEnum, @@ -46,7 +65,7 @@ async fn run_server( let addr = "0.0.0.0:50051".parse()?; let mut server_builder = Server::builder(); - let services = platform.create_services(operations_service.clone())?; + let services = platform.create_services(operations_service.clone()).await?; let operations_service = OperationsServer::new(operations_service); @@ -63,24 +82,53 @@ async fn run_server( Ok(()) } +struct DaemonState { + _guard: Option, + platform: PlatformImpl, +} + #[tokio::main] async fn main() -> Result<()> { - // logging - tracing_subscriber::fmt() - .with_env_filter( + let args = Args::parse(); + + // tracing + let subscriber = tracing_subscriber::registry(); + + // Always add stdout layer + let stdout_layer = tracing_subscriber::fmt::layer() + .with_writer(std::io::stdout) + .with_filter( EnvFilter::from_default_env() .add_directive("h2=error".parse().unwrap()) .add_directive("grpc=error".parse().unwrap()) .add_directive("rumqttc=error".parse().unwrap()) - .add_directive("kos_core::telemetry=error".parse().unwrap()), - ) - .init(); + .add_directive("kos_core::telemetry=error".parse().unwrap()) + .add_directive("polling=error".parse().unwrap()) + .add_directive("async_io=error".parse().unwrap()) + .add_directive("krec=error".parse().unwrap()), + ); + + let _subscriber = subscriber.with(stdout_layer); + + let guard = setup_logging(args.log, &args.log_level)?; - let mut platform = PlatformImpl::new(); + let mut state = DaemonState { + _guard: guard, + platform: PlatformImpl::new(), + }; + + // Setup signal handler + let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel(); + + tokio::spawn(async move { + if let Ok(()) = signal::ctrl_c().await { + let _ = shutdown_tx.send(()); + } + }); // telemetry Telemetry::initialize( - format!("{}-{}", platform.name(), platform.serial()).as_str(), + format!("{}-{}", state.platform.name(), state.platform.serial()).as_str(), "localhost", 1883, ) @@ -89,11 +137,19 @@ async fn main() -> Result<()> { let operations_store = Arc::new(Mutex::new(HashMap::new())); let operations_service = Arc::new(OperationsServiceImpl::new(operations_store)); - platform.initialize(operations_service.clone())?; - - if let Err(e) = run_server(&platform, operations_service).await { - error!("Server error: {:?}", e); - std::process::exit(1); + state.platform.initialize(operations_service.clone())?; + + tokio::select! { + res = run_server(&state.platform, operations_service) => { + if let Err(e) = res { + error!("Server error: {:?}", e); + std::process::exit(1); + } + } + _ = shutdown_rx => { + info!("Received shutdown signal, cleaning up..."); + cleanup_logging(state._guard.take()); + } } Ok(()) diff --git a/kos_core/Cargo.toml b/kos_core/Cargo.toml index 5ff0b5e..665615a 100644 --- a/kos_core/Cargo.toml +++ b/kos_core/Cargo.toml @@ -26,7 +26,7 @@ eyre = "0.6" hyper = "0.14" tracing = "0.1" lazy_static = "1.4" -krec = "0.1" +krec = "0.2" [build-dependencies] tonic-build = "0.12" diff --git a/kos_core/proto/kos/actuator.proto b/kos_core/proto/kos/actuator.proto index 4b55763..e56cea0 100644 --- a/kos_core/proto/kos/actuator.proto +++ b/kos_core/proto/kos/actuator.proto @@ -59,6 +59,7 @@ message ConfigureActuatorRequest { optional float protection_time = 7; // Protection time in seconds optional bool torque_enabled = 8; // Torque enabled flag optional uint32 new_actuator_id = 9; // New actuator ID + optional bool zero_position = 10; // Instant zero position } // Request message for CalibrateActuator. diff --git a/kos_core/src/lib.rs b/kos_core/src/lib.rs index 275b4b9..3b36bb2 100644 --- a/kos_core/src/lib.rs +++ b/kos_core/src/lib.rs @@ -11,12 +11,15 @@ pub mod telemetry_types; pub use grpc_interface::google as google_proto; pub use grpc_interface::kos as kos_proto; +use async_trait::async_trait; use hal::actuator_service_server::ActuatorServiceServer; use hal::imu_service_server::ImuServiceServer; use hal::process_manager_service_server::ProcessManagerServiceServer; use services::OperationsServiceImpl; use services::{ActuatorServiceImpl, IMUServiceImpl, ProcessManagerServiceImpl}; use std::fmt::Debug; +use std::future::Future; +use std::pin::Pin; use std::sync::Arc; impl Debug for ActuatorServiceImpl { @@ -42,14 +45,15 @@ pub enum ServiceEnum { ProcessManager(ProcessManagerServiceServer), } -pub trait Platform { +#[async_trait] +pub trait Platform: Send + Sync { fn name(&self) -> &'static str; fn serial(&self) -> String; fn initialize(&mut self, operations_service: Arc) -> eyre::Result<()>; - fn create_services( - &self, + fn create_services<'a>( + &'a self, operations_service: Arc, - ) -> eyre::Result>; + ) -> Pin>> + Send + 'a>>; fn shutdown(&mut self) -> eyre::Result<()>; } diff --git a/kos_core/src/services/krec_logger.rs b/kos_core/src/services/krec_logger.rs index db18979..017133c 100644 --- a/kos_core/src/services/krec_logger.rs +++ b/kos_core/src/services/krec_logger.rs @@ -1,7 +1,4 @@ -use crate::kos_proto::{ - actuator::ActuatorStateResponse, - imu::{ImuValuesResponse, QuaternionResponse}, -}; +use crate::kos_proto::imu::{ImuValuesResponse, QuaternionResponse}; use eyre::Result; use krec::{ ActuatorCommand, ActuatorState, ImuQuaternion, ImuValues, KRec, KRecFrame, KRecHeader, Vec3, @@ -29,6 +26,27 @@ struct ActuatorCommandItem { torque: Option, } +#[derive(Deserialize, Debug)] +struct ActuatorStateData { + actuator_id: u32, + online: bool, + position: Option, + velocity: Option, + torque: Option, + temperature: Option, + voltage: Option, + current: Option, +} + +#[derive(Deserialize, Debug)] +#[allow(unused)] +struct ActuatorStateList { + frame_number: u64, + video_timestamp: u64, + inference_step: u64, + data: Vec, +} + pub struct TelemetryLogger { krec: Arc>, _mqtt_client: AsyncClient, @@ -158,24 +176,37 @@ impl TelemetryLogger { tracing::error!("Failed to decode QuaternionResponse {:?}", payload); } } else if topic.contains("/actuator/state") { - if let Ok(state) = ActuatorStateResponse::decode(payload.as_ref()) { - frame.actuator_states.push(ActuatorState { - actuator_id: state.actuator_id, - online: state.online, - position: state.position, - velocity: state.velocity, - torque: state.torque, - temperature: state.temperature, - voltage: state.voltage, - current: state.current, - }); - } else { - tracing::error!("Failed to decode ActuatorStateResponse {:?}", payload); + match serde_json::from_slice::(payload) { + Ok(state_list) => { + for state in state_list.data { + frame.actuator_states.push(ActuatorState { + actuator_id: state.actuator_id, + online: state.online, + position: state.position, + velocity: state.velocity, + torque: state.torque, + temperature: state.temperature, + voltage: state.voltage, + current: state.current, + }); + } + } + Err(e) => { + tracing::error!("Failed to parse actuator state JSON: {:?}", e); + } } } else if topic.contains("/actuator/command") { - match serde_json::from_slice::(&payload) { + match serde_json::from_slice::(payload) { Ok(command_data) => { frame.inference_step = command_data.inference_step; + frame.video_timestamp = command_data.video_timestamp; + frame.video_frame_number = command_data.frame_number; + frame.real_timestamp = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() + as u64; + for item in command_data.data { frame.actuator_commands.push(ActuatorCommand { actuator_id: item.actuator_id, @@ -184,7 +215,6 @@ impl TelemetryLogger { torque: item.torque.unwrap_or_default() as f32, }); } - tracing::debug!("Parsed actuator command: {:?}", frame); } Err(e) => { tracing::error!("Failed to parse actuator command JSON: {:?}", e); @@ -197,6 +227,7 @@ impl TelemetryLogger { if frame.inference_step > *current { // Add frame to KRec let mut krec = krec_clone.lock().await; + krec.add_frame(frame.clone()); // Save every 500 frames diff --git a/platforms/kbot/Cargo.toml b/platforms/kbot/Cargo.toml index da9b71d..12b6e3f 100644 --- a/platforms/kbot/Cargo.toml +++ b/platforms/kbot/Cargo.toml @@ -10,10 +10,10 @@ description = "KOS platform for KBot" [dependencies] kos_core = { version = "0.1.1", path = "../../kos_core" } eyre = "0.6" -krec = "0.1" +krec = "0.2" tracing = "0.1" async-trait = "0.1" -robstride = "0.2.9" +robstridev2 = "0.3" gstreamer = "0.20" gstreamer-app = "0.20" gstreamer-video = "0.20" @@ -21,4 +21,4 @@ uuid = "1" tokio = { version = "1", features = ["full"] } [target.'cfg(target_os = "linux")'.dependencies] imu = ">=0.1.6" -chrono = "0.4" \ No newline at end of file +chrono = "0.4" diff --git a/platforms/kbot/src/actuator.rs b/platforms/kbot/src/actuator.rs index 7ed66a2..c5857a3 100644 --- a/platforms/kbot/src/actuator.rs +++ b/platforms/kbot/src/actuator.rs @@ -1,6 +1,6 @@ use crate::{Arc, Operation, OperationsServiceImpl}; use async_trait::async_trait; -use eyre::{Result, WrapErr}; +use eyre::Result; use kos_core::{ hal::{ActionResponse, Actuator, ActuatorCommand, CalibrateActuatorRequest}, kos_proto::{ @@ -9,42 +9,72 @@ use kos_core::{ common::{ActionResult, ErrorCode}, }, }; -use std::collections::HashMap; -use robstride::{MotorType, MotorsSupervisor}; +use robstridev2::{CH341Transport, ControlConfig, SocketCanTransport, Supervisor, TransportType}; +use std::time::Duration; +use tokio::sync::Mutex; pub struct KBotActuator { - motors_supervisor: MotorsSupervisor, + supervisor: Arc>, } impl KBotActuator { - pub fn new( + pub async fn new( _operations_service: Arc, - port: &str, - motor_infos: HashMap, - verbose: Option, - max_update_rate: Option, - zero_on_init: Option, + ports: Vec<&str>, + actuator_timeout: Duration, + polling_interval: Duration, + actuators_config: &[(u8, robstridev2::ActuatorConfiguration)], ) -> Result { - let motor_infos_u8 = motor_infos - .into_iter() - .map(|(k, v)| { - let id = - u8::try_from(k).wrap_err_with(|| format!("Motor ID {} too large for u8", k))?; - Ok((id, v)) - }) - .collect::>>()?; - - let motors_supervisor = MotorsSupervisor::new( - port, - &motor_infos_u8, - verbose.unwrap_or(false), - max_update_rate.unwrap_or(100000) as f64, - zero_on_init.unwrap_or(false), - ) - .map_err(|e| eyre::eyre!("Failed to create motors supervisor: {}", e))?; - - Ok(KBotActuator { motors_supervisor }) + let mut supervisor = Supervisor::new(actuator_timeout)?; + let mut found_motors = vec![false; actuators_config.len()]; + + for port in ports.clone() { + if port.starts_with("/dev/tty") { + let serial = CH341Transport::new(port.to_string()).await?; + supervisor + .add_transport(format!("{}", port), TransportType::CH341(serial)) + .await?; + } else if port.starts_with("can") { + let can = SocketCanTransport::new(port.to_string()).await?; + supervisor + .add_transport(format!("{}", port), TransportType::SocketCAN(can)) + .await?; + } else { + return Err(eyre::eyre!("Invalid port: {}", port)); + } + } + + let mut supervisor_runner = supervisor.clone_controller(); + let _supervisor_handle = tokio::spawn(async move { + if let Err(e) = supervisor_runner.run(polling_interval).await { + tracing::error!("Supervisor task failed: {}", e); + } + }); + + for port in ports.clone() { + let discovered_ids = supervisor.scan_bus(0xFD, port, actuators_config).await?; + + for (idx, (motor_id, _)) in actuators_config.iter().enumerate() { + if discovered_ids.contains(motor_id) { + found_motors[idx] = true; + } + } + } + + for (idx, (motor_id, _)) in actuators_config.iter().enumerate() { + if !found_motors[idx] { + tracing::warn!( + "Configured motor not found - ID: {}, Type: {:?}", + motor_id, + actuators_config[idx].1.actuator_type + ); + } + } + + Ok(KBotActuator { + supervisor: Arc::new(Mutex::new(supervisor)), + }) } } @@ -53,47 +83,28 @@ impl Actuator for KBotActuator { async fn command_actuators(&self, commands: Vec) -> Result> { let mut results = vec![]; for command in commands { - let mut motor_result = vec![]; - if let Some(position) = command.position { - let result = self - .motors_supervisor - .set_position(command.actuator_id as u8, position.to_radians() as f32); - motor_result.push(result); - } - if let Some(velocity) = command.velocity { - let result = self - .motors_supervisor - .set_velocity(command.actuator_id as u8, velocity as f32); - motor_result.push(result); - } - if let Some(torque) = command.torque { - let result = self - .motors_supervisor - .set_torque(command.actuator_id as u8, torque as f32); - motor_result.push(result); - } + let motor_id = command.actuator_id as u8; + let mut supervisor = self.supervisor.lock().await; + let result = supervisor + .command( + motor_id, + command + .position + .map(|p| p.to_radians() as f32) + .unwrap_or(0.0), + command + .velocity + .map(|v| v.to_radians() as f32) + .unwrap_or(0.0), + command.torque.map(|t| t as f32).unwrap_or(0.0), + ) + .await; - let success = motor_result.iter().all(|r| r.is_ok()); - let error = if !success { - Some(KosError { - code: if motor_result - .iter() - .any(|r| matches!(r, Err(e) if e.kind() == std::io::ErrorKind::NotFound)) - { - ErrorCode::InvalidArgument as i32 - } else { - ErrorCode::HardwareFailure as i32 - }, - message: motor_result - .iter() - .filter_map(|r| r.as_ref().err()) - .map(|e| e.to_string()) - .collect::>() - .join("; "), - }) - } else { - None - }; + let success = result.is_ok(); + let error = result.err().map(|e| KosError { + code: ErrorCode::HardwareFailure as i32, + message: e.to_string(), + }); results.push(ActionResult { actuator_id: command.actuator_id, @@ -106,41 +117,43 @@ impl Actuator for KBotActuator { async fn configure_actuator(&self, config: ConfigureActuatorRequest) -> Result { let motor_id = config.actuator_id as u8; - let mut results = vec![]; - // Configure KP if provided - if let Some(kp) = config.kp { - let result = self.motors_supervisor.set_kp(motor_id, kp as f32); - results.push(result); + let control_config = ControlConfig { + kp: config.kp.unwrap_or(0.0) as f32, + kd: config.kd.unwrap_or(0.0) as f32, + max_torque: Some(config.max_torque.unwrap_or(2.0) as f32), + max_velocity: Some(5.0), + max_current: Some(10.0), + }; + + let mut supervisor = self.supervisor.lock().await; + let result = supervisor.configure(motor_id, control_config).await; + + if let Some(torque_enabled) = config.torque_enabled { + if torque_enabled { + supervisor.enable(motor_id).await?; + } else { + supervisor.disable(motor_id, true).await?; + } } - // Configure KD if provided - if let Some(kd) = config.kd { - let result = self.motors_supervisor.set_kd(motor_id, kd as f32); - results.push(result); + if let Some(zero_position) = config.zero_position { + if zero_position { + supervisor.zero(motor_id).await?; + } } - let success = results.iter().all(|r| r.is_ok()); - let error = if !success { - Some(kos_core::kos_proto::common::Error { - code: if results - .iter() - .any(|r| matches!(r, Err(e) if e.kind() == std::io::ErrorKind::NotFound)) - { - ErrorCode::InvalidArgument as i32 - } else { - ErrorCode::HardwareFailure as i32 - }, - message: results - .iter() - .filter_map(|r| r.as_ref().err()) - .map(|e| e.to_string()) - .collect::>() - .join("; "), - }) - } else { - None - }; + if let Some(new_actuator_id) = config.new_actuator_id { + supervisor + .change_id(motor_id, new_actuator_id as u8) + .await?; + } + + let success = result.is_ok(); + let error = result.err().map(|e| KosError { + code: ErrorCode::HardwareFailure as i32, + message: e.to_string(), + }); Ok(ActionResponse { success, error }) } @@ -151,21 +164,24 @@ impl Actuator for KBotActuator { async fn get_actuators_state( &self, - _actuator_ids: Vec, + actuator_ids: Vec, ) -> Result> { - let feedback = self.motors_supervisor.get_latest_feedback(); - Ok(feedback - .iter() - .map(|(id, state)| ActuatorStateResponse { - actuator_id: u32::from(*id), - online: matches!(state.mode, robstride::MotorMode::Motor), - position: Some(state.position.to_degrees() as f64), - velocity: Some(state.velocity as f64), - torque: Some(state.torque as f64), - temperature: None, - voltage: None, - current: None, - }) - .collect()) + let mut responses = vec![]; + for id in actuator_ids { + let supervisor = self.supervisor.lock().await; + if let Ok(Some((feedback, ts))) = supervisor.get_feedback(id as u8).await { + responses.push(ActuatorStateResponse { + actuator_id: id, + online: ts.elapsed().unwrap_or(Duration::from_secs(1)) < Duration::from_secs(1), + position: Some(feedback.angle.to_degrees() as f64), + velocity: Some(feedback.velocity.to_degrees() as f64), + torque: Some(feedback.torque as f64), + temperature: Some(feedback.temperature as f64), + voltage: None, + current: None, + }); + } + } + Ok(responses) } } diff --git a/platforms/kbot/src/lib.rs b/platforms/kbot/src/lib.rs index e5a07ea..cb988ea 100644 --- a/platforms/kbot/src/lib.rs +++ b/platforms/kbot/src/lib.rs @@ -5,12 +5,14 @@ mod process_manager; mod hexmove; pub use actuator::*; +pub use robstridev2::{ActuatorConfiguration, ActuatorType}; #[cfg(target_os = "linux")] pub use hexmove::*; pub use process_manager::*; -use eyre::{Result, WrapErr}; +use async_trait::async_trait; +use eyre::WrapErr; use kos_core::hal::Operation; use kos_core::kos_proto::actuator::actuator_service_server::ActuatorServiceServer; use kos_core::kos_proto::process_manager::process_manager_service_server::ProcessManagerServiceServer; @@ -18,9 +20,10 @@ use kos_core::{ services::{ActuatorServiceImpl, OperationsServiceImpl, ProcessManagerServiceImpl}, Platform, ServiceEnum, }; -use robstride::MotorType; -use std::collections::HashMap; +use std::future::Future; +use std::pin::Pin; use std::sync::Arc; +use std::time::Duration; pub struct KbotPlatform {} @@ -36,6 +39,7 @@ impl Default for KbotPlatform { } } +#[async_trait] impl Platform for KbotPlatform { fn name(&self) -> &'static str { "KBot" @@ -51,70 +55,248 @@ impl Platform for KbotPlatform { Ok(()) } - fn create_services( - &self, + fn create_services<'a>( + &'a self, operations_service: Arc, - ) -> Result> { - if cfg!(target_os = "linux") { - // Create the process manager first and handle any errors - let process_manager = KBotProcessManager::new(self.name().to_string(), self.serial()) - .wrap_err("Failed to initialize GStreamer process manager")?; - - Ok(vec![ - // ServiceEnum::Imu( - // kos_core::kos_proto::imu::imu_service_server::ImuServiceServer::new( - // kos_core::services::IMUServiceImpl::new(Arc::new( - // KBotIMU::new(operations_service.clone(), "can0", 1, 1) - // .wrap_err("Failed to create IMU")?, - // )), - // ), - // ), - // TODO: fix config definition - ServiceEnum::Actuator(ActuatorServiceServer::new(ActuatorServiceImpl::new( - Arc::new( - KBotActuator::new( - operations_service, - "/dev/ttyCH341USB1", - HashMap::from([ - (1, MotorType::Type03), - (2, MotorType::Type03), - (3, MotorType::Type01), - (4, MotorType::Type01), - (5, MotorType::Type01), - ]), - None, - None, - None, - ) - .wrap_err("Failed to create actuator")?, - ), - ))), - ServiceEnum::ProcessManager(ProcessManagerServiceServer::new( - ProcessManagerServiceImpl::new(Arc::new(process_manager)), - )), - ]) - } else { - Ok(vec![ServiceEnum::Actuator(ActuatorServiceServer::new( - ActuatorServiceImpl::new(Arc::new( - KBotActuator::new( - operations_service, - "/dev/ttyCH341USB0", - HashMap::from([ - (1, MotorType::Type04), - (2, MotorType::Type04), - (3, MotorType::Type04), - (4, MotorType::Type04), - (5, MotorType::Type04), - (6, MotorType::Type01), - ]), - None, - None, - None, - ) - .wrap_err("Failed to create actuator")?, - )), - ))]) - } + ) -> Pin>> + Send + 'a>> { + Box::pin(async move { + if cfg!(target_os = "linux") { + tracing::debug!("Initializing KBot services for Linux"); + + let process_manager = + KBotProcessManager::new(self.name().to_string(), self.serial()) + .wrap_err("Failed to initialize GStreamer process manager")?; + + let actuator = KBotActuator::new( + operations_service, + vec![ + // "/dev/ttyCH341USB0", + // "/dev/ttyCH341USB1", + // "/dev/ttyCH341USB2", + // "/dev/ttyCH341USB3", + // "can0", + "can1", "can2", + ], + Duration::from_secs(1), + // Duration::from_nanos(3_333_333), + Duration::from_millis(7), + &[ + // Left Arm + ( + 11, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride03, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + ( + 12, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride03, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + ( + 13, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride02, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + ( + 14, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride02, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + ( + 15, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride02, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + ( + 16, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride00, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + // Right Arm + ( + 21, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride03, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + ( + 22, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride03, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + ( + 23, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride02, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + ( + 24, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride02, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + ( + 25, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride02, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + ( + 26, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride00, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + // Left Leg + ( + 31, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride04, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + ( + 32, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride03, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + ( + 33, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride03, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + ( + 34, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride04, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + ( + 35, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride02, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + // Right Leg + ( + 41, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride04, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + ( + 42, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride03, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + ( + 43, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride03, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + ( + 44, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride04, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + ( + 45, + ActuatorConfiguration { + actuator_type: ActuatorType::RobStride02, + max_angle_change: Some(4.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + ), + ], + ) + .await + .wrap_err("Failed to create actuator")?; + + Ok(vec![ + ServiceEnum::Actuator(ActuatorServiceServer::new(ActuatorServiceImpl::new( + Arc::new(actuator), + ))), + ServiceEnum::ProcessManager(ProcessManagerServiceServer::new( + ProcessManagerServiceImpl::new(Arc::new(process_manager)), + )), + ]) + } else { + let actuator = KBotActuator::new( + operations_service, + vec!["can0"], + Duration::from_secs(1), + Duration::from_nanos(3_333_333), + &[( + 1, + robstridev2::ActuatorConfiguration { + actuator_type: ActuatorType::RobStride04, + max_angle_change: Some(2.0f32.to_radians()), + max_velocity: Some(10.0f32.to_radians()), + }, + )], + ) + .await + .wrap_err("Failed to create actuator")?; + + Ok(vec![ServiceEnum::Actuator(ActuatorServiceServer::new( + ActuatorServiceImpl::new(Arc::new(actuator)), + ))]) + } + }) } fn shutdown(&mut self) -> eyre::Result<()> { diff --git a/platforms/stub/src/lib.rs b/platforms/stub/src/lib.rs index 321e89a..371f0b7 100644 --- a/platforms/stub/src/lib.rs +++ b/platforms/stub/src/lib.rs @@ -5,15 +5,11 @@ pub use actuator::*; pub use imu::*; pub use process_manager::*; -use eyre::Result; +use async_trait::async_trait; use kos_core::hal::Operation; -use kos_core::kos_proto::{ - actuator::actuator_service_server::ActuatorServiceServer, - imu::imu_service_server::ImuServiceServer, - process_manager::process_manager_service_server::ProcessManagerServiceServer, -}; -use kos_core::services::{ActuatorServiceImpl, IMUServiceImpl, ProcessManagerServiceImpl}; use kos_core::{services::OperationsServiceImpl, Platform, ServiceEnum}; +use std::future::Future; +use std::pin::Pin; use std::sync::Arc; pub struct StubPlatform {} @@ -30,6 +26,7 @@ impl Default for StubPlatform { } } +#[async_trait] impl Platform for StubPlatform { fn name(&self) -> &'static str { "Stub" @@ -44,22 +41,13 @@ impl Platform for StubPlatform { Ok(()) } - fn create_services( - &self, - operations_service: Arc, - ) -> Result> { - // Add available services here - Ok(vec![ - ServiceEnum::Imu(ImuServiceServer::new(IMUServiceImpl::new(Arc::new( - StubIMU::new(operations_service.clone()), - )))), - ServiceEnum::Actuator(ActuatorServiceServer::new(ActuatorServiceImpl::new( - Arc::new(StubActuator::new(operations_service.clone())), - ))), - ServiceEnum::ProcessManager(ProcessManagerServiceServer::new( - ProcessManagerServiceImpl::new(Arc::new(StubProcessManager::new())), - )), - ]) + fn create_services<'a>( + &'a self, + _operations_service: Arc, + ) -> Pin>> + Send + 'a>> { + Box::pin(async move { + Ok(vec![]) // or whatever the stub implementation should return + }) } fn shutdown(&mut self) -> eyre::Result<()> { diff --git a/platforms/stub/src/process_manager.rs b/platforms/stub/src/process_manager.rs index 5354966..13959cc 100644 --- a/platforms/stub/src/process_manager.rs +++ b/platforms/stub/src/process_manager.rs @@ -9,6 +9,12 @@ pub struct StubProcessManager { kclip_uuid: Mutex>, } +impl Default for StubProcessManager { + fn default() -> Self { + Self::new() + } +} + impl StubProcessManager { pub fn new() -> Self { StubProcessManager { diff --git a/pykos/Makefile b/pykos/Makefile index f47ff64..de2fe3a 100644 --- a/pykos/Makefile +++ b/pykos/Makefile @@ -38,6 +38,18 @@ generate-proto: mv kos/* kos_protos/ rm -rf kos touch kos_protos/__init__.py + + # Fix imports in all generated files + case "$$(uname)" in \ + Darwin) \ + find kos_protos -type f -name "*.py" -exec sed -i '' 's/from kos/from kos_protos/g' {} + && \ + find kos_protos -type f -name "*.py" -exec sed -i '' 's/import kos/import kos_protos/g' {} + \ + ;; \ + *) \ + find kos_protos -type f -name "*.py" -exec sed -i 's/from kos/from kos_protos/g' {} + && \ + find kos_protos -type f -name "*.py" -exec sed -i 's/import kos/import kos_protos/g' {} + \ + ;; \ + esac .PHONY: generate-proto diff --git a/pykos/pykos/__init__.py b/pykos/pykos/__init__.py index 47a9c1d..0381452 100644 --- a/pykos/pykos/__init__.py +++ b/pykos/pykos/__init__.py @@ -1,3 +1,5 @@ -__version__ = "0.1.2" +__version__ = "0.1.6" from pykos.client import KOS + +from . import services diff --git a/pykos/pykos/services/__init__.py b/pykos/pykos/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/pykos/pykos/services/actuator.py b/pykos/pykos/services/actuator.py index 5c836fc..ed4be3a 100644 --- a/pykos/pykos/services/actuator.py +++ b/pykos/pykos/services/actuator.py @@ -5,8 +5,9 @@ import grpc from google.longrunning import operations_pb2, operations_pb2_grpc from google.protobuf.any_pb2 import Any as AnyPb2 -from kos import actuator_pb2, actuator_pb2_grpc, common_pb2 -from kos.actuator_pb2 import CalibrateActuatorMetadata + +from kos_protos import actuator_pb2, actuator_pb2_grpc, common_pb2 +from kos_protos.actuator_pb2 import CalibrateActuatorMetadata class CalibrationStatus: diff --git a/pykos/pykos/services/imu.py b/pykos/pykos/services/imu.py index acccd28..1af283d 100644 --- a/pykos/pykos/services/imu.py +++ b/pykos/pykos/services/imu.py @@ -2,7 +2,8 @@ import grpc from google.protobuf.empty_pb2 import Empty -from kos import imu_pb2, imu_pb2_grpc + +from kos_protos import imu_pb2, imu_pb2_grpc class ImuValues: diff --git a/pykos/pykos/services/process_manager.py b/pykos/pykos/services/process_manager.py index 91710c8..c0a35c6 100644 --- a/pykos/pykos/services/process_manager.py +++ b/pykos/pykos/services/process_manager.py @@ -4,23 +4,28 @@ import grpc from google.protobuf.empty_pb2 import Empty -from kos import process_manager_pb2_grpc -from kos.common_pb2 import Error -from kos.process_manager_pb2 import KClipStartRequest + +from kos_protos import process_manager_pb2_grpc +from kos_protos.common_pb2 import Error +from kos_protos.process_manager_pb2 import KClipStartRequest class ProcessManagerServiceClient: def __init__(self, channel: grpc.Channel) -> None: self.stub = process_manager_pb2_grpc.ProcessManagerServiceStub(channel) - def start_kclip(self, request: KClipStartRequest) -> Tuple[Optional[str], Optional[Error]]: + def start_kclip(self, action: str) -> Tuple[Optional[str], Optional[Error]]: """Start KClip recording. + Args: + action: The action string for the KClip request + Returns: Tuple containing: - clip_uuid (str): UUID of the started clip, if successful - error (Error): Error details if the operation failed """ + request = KClipStartRequest(action=action) response = self.stub.StartKClip(request) return response.clip_uuid, response.error if response.HasField("error") else None diff --git a/pykos/pyproject.toml b/pykos/pyproject.toml index 4c8a3ef..4a79cbf 100644 --- a/pykos/pyproject.toml +++ b/pykos/pyproject.toml @@ -42,10 +42,13 @@ module = [ "google.longrunning.*", "kos.*", "pykos.*", - "kos.actuator_pb2_grpc", - "kos.imu_pb2_grpc", - "kos.actuator_pb2", - "kos.imu_pb2" + "kos_protos.actuator_pb2_grpc", + "kos_protos.process_manager_pb2_grpc", + "kos_protos.imu_pb2_grpc", + "kos_protos.common_pb2", + "kos_protos.actuator_pb2", + "kos_protos.process_manager_pb2", + "kos_protos.imu_pb2" ] ignore_missing_imports = true follow_imports = "skip" @@ -62,10 +65,9 @@ target-version = "py310" [tool.ruff.lint] -select = ["ANN", "D", "E", "F", "I", "N", "PGH", "PLC", "PLE", "PLR", "PLW", "W"] +select = ["ANN", "D", "E", "F", "G", "I", "N", "PGH", "PLC", "PLE", "PLR", "PLW", "TID", "W"] ignore = [ - "ANN101", "ANN102", "D101", "D102", "D103", "D104", "D105", "D106", "D107", "N812", "N817", "PLR0911", "PLR0912", "PLR0913", "PLR0915", "PLR2004", @@ -80,7 +82,7 @@ dummy-variable-rgx = "^(_+|(_+[a-zA-Z0-9_]*[a-zA-Z0-9]+?))$" [tool.ruff.lint.isort] -known-first-party = ["pykos", "tests"] +known-first-party = ["pykos", "kos_protos", "tests"] combine-as-imports = true [tool.ruff.lint.pydocstyle] diff --git a/pykos/setup.py b/pykos/setup.py index fa69d74..e25caba 100644 --- a/pykos/setup.py +++ b/pykos/setup.py @@ -68,7 +68,7 @@ def run(self) -> None: python_requires=">=3.8", install_requires=requirements, extras_require={"dev": requirements_dev}, - packages=["pykos", "kos_protos"], + packages=["pykos", "pykos.services", "kos_protos"], package_data={ "pykos": ["py.typed"], "kos_protos": ["py.typed"],