From abff83d8e63259a0c2865f5c3792a84f86d4142d Mon Sep 17 00:00:00 2001 From: Denys Bezmenov Date: Wed, 4 Dec 2024 14:38:59 -0800 Subject: [PATCH] krec fixes --- daemon/src/main.rs | 3 +- kos_core/src/services/krec_logger.rs | 52 ++++++++++++++++++------- pykos/pykos/services/process_manager.py | 6 ++- 3 files changed, 45 insertions(+), 16 deletions(-) diff --git a/daemon/src/main.rs b/daemon/src/main.rs index 0513a30..7ead2ef 100644 --- a/daemon/src/main.rs +++ b/daemon/src/main.rs @@ -74,7 +74,8 @@ async fn main() -> Result<()> { .add_directive("rumqttc=error".parse().unwrap()) .add_directive("kos_core::telemetry=error".parse().unwrap()) .add_directive("polling=error".parse().unwrap()) - .add_directive("async_io=error".parse().unwrap()), + .add_directive("async_io=error".parse().unwrap()) + .add_directive("krec=error".parse().unwrap()), ) .init(); diff --git a/kos_core/src/services/krec_logger.rs b/kos_core/src/services/krec_logger.rs index 0956cd0..c7eca70 100644 --- a/kos_core/src/services/krec_logger.rs +++ b/kos_core/src/services/krec_logger.rs @@ -29,6 +29,26 @@ struct ActuatorCommandItem { torque: Option, } +#[derive(Deserialize, Debug)] +struct ActuatorStateData { + actuator_id: u32, + online: bool, + position: Option, + velocity: Option, + torque: Option, + temperature: Option, + voltage: Option, + current: Option, +} + +#[derive(Deserialize, Debug)] +struct ActuatorStateList { + frame_number: u64, + video_timestamp: u64, + inference_step: u64, + data: Vec, +} + pub struct TelemetryLogger { krec: Arc>, _mqtt_client: AsyncClient, @@ -158,19 +178,24 @@ impl TelemetryLogger { tracing::error!("Failed to decode QuaternionResponse {:?}", payload); } } else if topic.contains("/actuator/state") { - if let Ok(state) = ActuatorStateResponse::decode(payload.as_ref()) { - frame.actuator_states.push(ActuatorState { - actuator_id: state.actuator_id, - online: state.online, - position: state.position, - velocity: state.velocity, - torque: state.torque, - temperature: state.temperature, - voltage: state.voltage, - current: state.current, - }); - } else { - tracing::error!("Failed to decode ActuatorStateResponse {:?}", payload); + match serde_json::from_slice::(&payload) { + Ok(state_list) => { + for state in state_list.data { + frame.actuator_states.push(ActuatorState { + actuator_id: state.actuator_id, + online: state.online, + position: state.position, + velocity: state.velocity, + torque: state.torque, + temperature: state.temperature, + voltage: state.voltage, + current: state.current, + }); + } + } + Err(e) => { + tracing::error!("Failed to parse actuator state JSON: {:?}", e); + } } } else if topic.contains("/actuator/command") { match serde_json::from_slice::(&payload) { @@ -187,7 +212,6 @@ impl TelemetryLogger { torque: item.torque.unwrap_or_default() as f32, }); } - tracing::debug!("Parsed actuator command: {:?}", frame); } Err(e) => { tracing::error!("Failed to parse actuator command JSON: {:?}", e); diff --git a/pykos/pykos/services/process_manager.py b/pykos/pykos/services/process_manager.py index d83f9b4..c0a35c6 100644 --- a/pykos/pykos/services/process_manager.py +++ b/pykos/pykos/services/process_manager.py @@ -14,14 +14,18 @@ class ProcessManagerServiceClient: def __init__(self, channel: grpc.Channel) -> None: self.stub = process_manager_pb2_grpc.ProcessManagerServiceStub(channel) - def start_kclip(self, request: KClipStartRequest) -> Tuple[Optional[str], Optional[Error]]: + def start_kclip(self, action: str) -> Tuple[Optional[str], Optional[Error]]: """Start KClip recording. + Args: + action: The action string for the KClip request + Returns: Tuple containing: - clip_uuid (str): UUID of the started clip, if successful - error (Error): Error details if the operation failed """ + request = KClipStartRequest(action=action) response = self.stub.StartKClip(request) return response.clip_uuid, response.error if response.HasField("error") else None