diff --git a/README.md b/README.md index 76612ee..3c1f88c 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,11 @@ cross build --release --target aarch64-unknown-linux-gnu --features kscale_pro RUST_LOG=debug cargo run --features stub ``` +You can specify logging levels for individual modules by adding `module_name=log_level` to the `RUST_LOG` environment variable. For example: +```bash +RUST_LOG=debug,krec=warn cargo run --features stub +``` + ## Contributing - Use `cargo fmt --all` to format the code. - Use `cargo clippy` to check for lint errors. diff --git a/kos_core/src/services/krec_logger.rs b/kos_core/src/services/krec_logger.rs index 2c5e4e2..db18979 100644 --- a/kos_core/src/services/krec_logger.rs +++ b/kos_core/src/services/krec_logger.rs @@ -8,10 +8,27 @@ use krec::{ }; use prost::Message; use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS}; +use serde::Deserialize; use std::path::Path; use std::sync::Arc; use tokio::sync::Mutex; +#[derive(Deserialize, Debug)] +struct ActuatorCommandData { + frame_number: u64, + video_timestamp: u64, + inference_step: u64, + data: Vec, +} + +#[derive(Deserialize, Debug)] +struct ActuatorCommandItem { + actuator_id: u32, + position: Option, + velocity: Option, + torque: Option, +} + pub struct TelemetryLogger { krec: Arc>, _mqtt_client: AsyncClient, @@ -50,22 +67,21 @@ impl TelemetryLogger { let current_inference_step = Arc::new(Mutex::new(0)); let current_frame = Arc::new(Mutex::new(KRecFrame::default())); - // Subscribe to relevant topics mqtt_client .subscribe( - format!("robots/{}/imu/values", robot_name), + format!("robots/{}-{}/imu/values", robot_name, robot_serial), QoS::AtLeastOnce, ) .await?; mqtt_client .subscribe( - format!("robots/{}/actuator/state", robot_name), + format!("robots/{}-{}/actuator/state", robot_name, robot_serial), QoS::AtLeastOnce, ) .await?; mqtt_client .subscribe( - format!("robots/{}/actuator/command", robot_name), + format!("robots/{}-{}/actuator/command", robot_name, robot_serial), QoS::AtLeastOnce, ) .await?; @@ -121,6 +137,8 @@ impl TelemetryLogger { }, quaternion: None, }); + } else { + tracing::error!("Failed to decode ImuValuesResponse {:?}", payload); } } else if topic.contains("/imu/quaternion") { if let Ok(quat) = QuaternionResponse::decode(payload.as_ref()) { @@ -136,6 +154,8 @@ impl TelemetryLogger { w: quat.w, }); } + } else { + tracing::error!("Failed to decode QuaternionResponse {:?}", payload); } } else if topic.contains("/actuator/state") { if let Ok(state) = ActuatorStateResponse::decode(payload.as_ref()) { @@ -149,16 +169,26 @@ impl TelemetryLogger { voltage: state.voltage, current: state.current, }); + } else { + tracing::error!("Failed to decode ActuatorStateResponse {:?}", payload); } } else if topic.contains("/actuator/command") { - if let Ok(command) = ActuatorCommand::decode(payload.as_ref()) { - frame.inference_step = *current_step.lock().await + 1; - frame.actuator_commands.push(ActuatorCommand { - actuator_id: command.actuator_id, - position: command.position as f32, - velocity: command.velocity as f32, - torque: command.torque as f32, - }); + match serde_json::from_slice::(&payload) { + Ok(command_data) => { + frame.inference_step = command_data.inference_step; + for item in command_data.data { + frame.actuator_commands.push(ActuatorCommand { + actuator_id: item.actuator_id, + position: item.position.unwrap_or_default() as f32, + velocity: item.velocity.unwrap_or_default() as f32, + torque: item.torque.unwrap_or_default() as f32, + }); + } + tracing::debug!("Parsed actuator command: {:?}", frame); + } + Err(e) => { + tracing::error!("Failed to parse actuator command JSON: {:?}", e); + } } } @@ -166,19 +196,15 @@ impl TelemetryLogger { let mut current = current_step.lock().await; if frame.inference_step > *current { // Add frame to KRec - if let Ok(mut krec) = krec_clone.try_lock() { - krec.add_frame(frame.clone()); - - // Save every 500 frames - if krec.frames.len() % 500 == 0 { - if let Err(e) = krec.save(&output_path) { - tracing::warn!("Failed to save KRec file: {}", e); - } else { - tracing::debug!( - "Saved {} frames to KRec file", - krec.frames.len() - ); - } + let mut krec = krec_clone.lock().await; + krec.add_frame(frame.clone()); + + // Save every 500 frames + if krec.frames.len() % 500 == 0 { + if let Err(e) = krec.save(&output_path) { + tracing::warn!("Failed to save KRec file: {}", e); + } else { + tracing::debug!("Saved {} frames to KRec file", krec.frames.len()); } } // Reset frame for next step diff --git a/platforms/kbot/src/process_manager.rs b/platforms/kbot/src/process_manager.rs index 5269fd6..15751ef 100644 --- a/platforms/kbot/src/process_manager.rs +++ b/platforms/kbot/src/process_manager.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +use chrono::Local; use eyre::{eyre, Result, WrapErr}; use gstreamer as gst; use gstreamer::prelude::*; @@ -13,7 +14,6 @@ use std::env; use std::path::PathBuf; use tokio::sync::Mutex; use uuid::Uuid; -use chrono::Local; pub struct KBotProcessManager { kclip_uuid: Mutex>, diff --git a/pykos/pykos/client.py b/pykos/pykos/client.py index a047764..6f6db09 100644 --- a/pykos/pykos/client.py +++ b/pykos/pykos/client.py @@ -6,6 +6,7 @@ from pykos.services.imu import IMUServiceClient from pykos.services.process_manager import ProcessManagerServiceClient + class KOS: """KOS client. diff --git a/pykos/pykos/services/actuator.py b/pykos/pykos/services/actuator.py index 1b2dbaa..b25c4ae 100644 --- a/pykos/pykos/services/actuator.py +++ b/pykos/pykos/services/actuator.py @@ -90,7 +90,7 @@ def configure_actuator(self, actuator_id: int, **kwargs: Dict[str, Any]) -> comm request = actuator_pb2.ConfigureActuatorRequest(**config) return self.stub.ConfigureActuator(request) - def get_actuators_state(self, actuator_ids: List[int] = None) -> List[common_pb2.ActionResult]: + def get_actuators_state(self, actuator_ids: Optional[List[int]] = None) -> List[common_pb2.ActionResult]: """Get the state of multiple actuators. Args: diff --git a/pykos/pykos/services/process_manager.py b/pykos/pykos/services/process_manager.py index 84aac36..2e09151 100644 --- a/pykos/pykos/services/process_manager.py +++ b/pykos/pykos/services/process_manager.py @@ -1,4 +1,5 @@ -"""Process manager service client""" +"""Process manager service client.""" + from typing import Optional, Tuple import grpc diff --git a/pykos/setup.py b/pykos/setup.py index 9c20b21..e43bf66 100644 --- a/pykos/setup.py +++ b/pykos/setup.py @@ -37,7 +37,7 @@ install_requires=requirements, tests_require=requirements_dev, extras_require={"dev": requirements_dev}, - packages=["pykos"], + packages=["pykos", "kos"], package_data={ "pykos": ["py.typed"], },