Skip to content

Commit

Permalink
Merge pull request #10 from kscalelabs/fix-telemetry
Browse files Browse the repository at this point in the history
Fix telemetry and logging
  • Loading branch information
hatomist authored Nov 24, 2024
2 parents 80cf1b9 + 7e37304 commit 934e21c
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 29 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ cross build --release --target aarch64-unknown-linux-gnu --features kscale_pro
RUST_LOG=debug cargo run --features stub
```

You can specify logging levels for individual modules by adding `module_name=log_level` to the `RUST_LOG` environment variable. For example:
```bash
RUST_LOG=debug,krec=warn cargo run --features stub
```

## Contributing
- Use `cargo fmt --all` to format the code.
- Use `cargo clippy` to check for lint errors.
Expand Down
76 changes: 51 additions & 25 deletions kos_core/src/services/krec_logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,27 @@ use krec::{
};
use prost::Message;
use rumqttc::{AsyncClient, Event, MqttOptions, Packet, QoS};
use serde::Deserialize;
use std::path::Path;
use std::sync::Arc;
use tokio::sync::Mutex;

#[derive(Deserialize, Debug)]
struct ActuatorCommandData {
frame_number: u64,
video_timestamp: u64,
inference_step: u64,
data: Vec<ActuatorCommandItem>,
}

#[derive(Deserialize, Debug)]
struct ActuatorCommandItem {
actuator_id: u32,
position: Option<f64>,
velocity: Option<f64>,
torque: Option<f64>,
}

pub struct TelemetryLogger {
krec: Arc<Mutex<KRec>>,
_mqtt_client: AsyncClient,
Expand Down Expand Up @@ -50,22 +67,21 @@ impl TelemetryLogger {
let current_inference_step = Arc::new(Mutex::new(0));
let current_frame = Arc::new(Mutex::new(KRecFrame::default()));

// Subscribe to relevant topics
mqtt_client
.subscribe(
format!("robots/{}/imu/values", robot_name),
format!("robots/{}-{}/imu/values", robot_name, robot_serial),
QoS::AtLeastOnce,
)
.await?;
mqtt_client
.subscribe(
format!("robots/{}/actuator/state", robot_name),
format!("robots/{}-{}/actuator/state", robot_name, robot_serial),
QoS::AtLeastOnce,
)
.await?;
mqtt_client
.subscribe(
format!("robots/{}/actuator/command", robot_name),
format!("robots/{}-{}/actuator/command", robot_name, robot_serial),
QoS::AtLeastOnce,
)
.await?;
Expand Down Expand Up @@ -121,6 +137,8 @@ impl TelemetryLogger {
},
quaternion: None,
});
} else {
tracing::error!("Failed to decode ImuValuesResponse {:?}", payload);
}
} else if topic.contains("/imu/quaternion") {
if let Ok(quat) = QuaternionResponse::decode(payload.as_ref()) {
Expand All @@ -136,6 +154,8 @@ impl TelemetryLogger {
w: quat.w,
});
}
} else {
tracing::error!("Failed to decode QuaternionResponse {:?}", payload);
}
} else if topic.contains("/actuator/state") {
if let Ok(state) = ActuatorStateResponse::decode(payload.as_ref()) {
Expand All @@ -149,36 +169,42 @@ impl TelemetryLogger {
voltage: state.voltage,
current: state.current,
});
} else {
tracing::error!("Failed to decode ActuatorStateResponse {:?}", payload);
}
} else if topic.contains("/actuator/command") {
if let Ok(command) = ActuatorCommand::decode(payload.as_ref()) {
frame.inference_step = *current_step.lock().await + 1;
frame.actuator_commands.push(ActuatorCommand {
actuator_id: command.actuator_id,
position: command.position as f32,
velocity: command.velocity as f32,
torque: command.torque as f32,
});
match serde_json::from_slice::<ActuatorCommandData>(&payload) {
Ok(command_data) => {
frame.inference_step = command_data.inference_step;
for item in command_data.data {
frame.actuator_commands.push(ActuatorCommand {
actuator_id: item.actuator_id,
position: item.position.unwrap_or_default() as f32,
velocity: item.velocity.unwrap_or_default() as f32,
torque: item.torque.unwrap_or_default() as f32,
});
}
tracing::debug!("Parsed actuator command: {:?}", frame);
}
Err(e) => {
tracing::error!("Failed to parse actuator command JSON: {:?}", e);
}
}
}

// Check if inference step has increased
let mut current = current_step.lock().await;
if frame.inference_step > *current {
// Add frame to KRec
if let Ok(mut krec) = krec_clone.try_lock() {
krec.add_frame(frame.clone());

// Save every 500 frames
if krec.frames.len() % 500 == 0 {
if let Err(e) = krec.save(&output_path) {
tracing::warn!("Failed to save KRec file: {}", e);
} else {
tracing::debug!(
"Saved {} frames to KRec file",
krec.frames.len()
);
}
let mut krec = krec_clone.lock().await;
krec.add_frame(frame.clone());

// Save every 500 frames
if krec.frames.len() % 500 == 0 {
if let Err(e) = krec.save(&output_path) {
tracing::warn!("Failed to save KRec file: {}", e);
} else {
tracing::debug!("Saved {} frames to KRec file", krec.frames.len());
}
}
// Reset frame for next step
Expand Down
2 changes: 1 addition & 1 deletion platforms/kbot/src/process_manager.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use async_trait::async_trait;
use chrono::Local;
use eyre::{eyre, Result, WrapErr};
use gstreamer as gst;
use gstreamer::prelude::*;
Expand All @@ -13,7 +14,6 @@ use std::env;
use std::path::PathBuf;
use tokio::sync::Mutex;
use uuid::Uuid;
use chrono::Local;

pub struct KBotProcessManager {
kclip_uuid: Mutex<Option<String>>,
Expand Down
1 change: 1 addition & 0 deletions pykos/pykos/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from pykos.services.imu import IMUServiceClient
from pykos.services.process_manager import ProcessManagerServiceClient


class KOS:
"""KOS client.
Expand Down
2 changes: 1 addition & 1 deletion pykos/pykos/services/actuator.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def configure_actuator(self, actuator_id: int, **kwargs: Dict[str, Any]) -> comm
request = actuator_pb2.ConfigureActuatorRequest(**config)
return self.stub.ConfigureActuator(request)

def get_actuators_state(self, actuator_ids: List[int] = None) -> List[common_pb2.ActionResult]:
def get_actuators_state(self, actuator_ids: Optional[List[int]] = None) -> List[common_pb2.ActionResult]:
"""Get the state of multiple actuators.
Args:
Expand Down
3 changes: 2 additions & 1 deletion pykos/pykos/services/process_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Process manager service client"""
"""Process manager service client."""

from typing import Optional, Tuple

import grpc
Expand Down
2 changes: 1 addition & 1 deletion pykos/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
install_requires=requirements,
tests_require=requirements_dev,
extras_require={"dev": requirements_dev},
packages=["pykos"],
packages=["pykos", "kos"],
package_data={
"pykos": ["py.typed"],
},
Expand Down

0 comments on commit 934e21c

Please sign in to comment.