Skip to content

Commit

Permalink
Fixed all ruff errors.
Browse files Browse the repository at this point in the history
  • Loading branch information
senthurayyappan committed Nov 14, 2024
1 parent 195da86 commit a2945a7
Show file tree
Hide file tree
Showing 28 changed files with 210 additions and 136 deletions.
3 changes: 2 additions & 1 deletion examples/basic_motion.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ def make_periodic_trajectory(period, minimum, maximum):
ankle.set_output_position(ankle_setpoint)

print(
f"Ankle Desired {ankle_setpoint:+.2f} rad, Ankle Actual {ankle.output_position:+.2f} rad, Knee Desired {knee_setpoint:+.2f} rad, Ankle Desired {knee.output_position:+.2f} rad",
f"Ankle Desired {ankle_setpoint:+.2f} rad, Ankle Actual {ankle.output_position:+.2f} rad, \
Knee Desired {knee_setpoint:+.2f} rad, Ankle Desired {knee.output_position:+.2f} rad",
end="\r",
)

Expand Down
5 changes: 4 additions & 1 deletion examples/fsm_walking_compiled_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,10 @@

# Test print to ensure external library call works
print(
f"Current time in state {outputs.current_state}: {outputs.time_in_current_state:.2f} seconds, Knee Eq {outputs.knee_impedance.eq_angle:.2f}, Ankle Eq {outputs.ankle_impedance.eq_angle:.2f}, Fz {loadcell.fz:.2f}",
f"Current time in state {outputs.current_state}: {outputs.time_in_current_state:.2f} seconds, \
Knee Eq {outputs.knee_impedance.eq_angle:.2f}, \
Ankle Eq {outputs.ankle_impedance.eq_angle:.2f}, \
Fz {loadcell.fz:.2f}",
end="\r",
)

Expand Down
6 changes: 4 additions & 2 deletions opensourceleg/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""An open-source software library for numerical computation, data acquisition, and control of lower-limb robotic prostheses."""
"""
An open-source software library for numerical computation, data acquisition,
and control of lower-limb robotic prostheses.
"""

import sys
from importlib import metadata as importlib_metadata


Expand Down
6 changes: 3 additions & 3 deletions opensourceleg/actuators/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
from .base import *
from .decorators import *
from .dephy import *
from .base import * # noqa: F403
from .decorators import * # noqa: F403
from .dephy import * # noqa: F403
40 changes: 26 additions & 14 deletions opensourceleg/actuators/dephy.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ def start(self) -> None:
except OSError:
print("\n")
LOGGER.error(
msg=f"[{self.__repr__()}] Need admin previleges to open the port '{self.port}'. \n\nPlease run the script with 'sudo' command or add the user to the dialout group.\n"
msg=f"[{self.__repr__()}] Need admin previleges to open the port '{self.port}'. \n\n \
Please run the script with 'sudo' command or add the user to the dialout group.\n"
)
os._exit(status=1)

Expand Down Expand Up @@ -213,14 +214,16 @@ def update(self) -> None:
)
if self.case_temperature >= self.max_case_temperature:
LOGGER.error(
msg=f"[{str.upper(self.tag)}] Case thermal limit {self.max_case_temperature} reached. Stopping motor."
msg=f"[{str.upper(self.tag)}] Case thermal limit {self.max_case_temperature} reached. \
Stopping motor."
)
# self.stop()
raise ThermalLimitException()

if self.winding_temperature >= self.max_winding_temperature:
LOGGER.error(
msg=f"[{str.upper(self.tag)}] Winding thermal limit {self.max_winding_temperature} reached. Stopping motor."
msg=f"[{str.upper(self.tag)}] Winding thermal limit {self.max_winding_temperature} reached. \
Stopping motor."
)
raise ThermalLimitException()
# Check for thermal fault, bit 2 of the execute status byte
Expand Down Expand Up @@ -252,8 +255,10 @@ def home(
joint_direction (int): Direction to move the joint during homing. Default is -1.
joint_position_offset (float): Offset in radians to add to the joint position. Default is 0.0.
motor_position_offset (float): Offset in radians to add to the motor position. Default is 0.0.
current_threshold (int): Current threshold in mA to stop homing the joint or actuator. This is used to detect if the actuator or joint has hit a hard stop. Default is 5000 mA.
velocity_threshold (float): Velocity threshold in rad/s to stop homing the joint or actuator. This is also used to detect if the actuator or joint has hit a hard stop. Default is 0.001 rad/s.
current_threshold (int): Current threshold in mA to stop homing the joint or actuator.
This is used to detect if the actuator or joint has hit a hard stop. Default is 5000 mA.
velocity_threshold (float): Velocity threshold in rad/s to stop homing the joint or actuator.
This is also used to detect if the actuator or joint has hit a hard stop. Default is 0.001 rad/s.
"""
is_homing = True
Expand Down Expand Up @@ -305,7 +310,8 @@ def home(
self.set_encoder_map(np.polynomial.polynomial.Polynomial(coef=coefficients))
else:
LOGGER.debug(
msg=f"[{self.__repr__()}] No encoder map found. Please call the make_encoder_map method to create one. The encoder map is used to estimate joint position more accurately."
msg=f"[{self.__repr__()}] No encoder map found. Please call the make_encoder_map method \
to create one. The encoder map is used to estimate joint position more accurately."
)

def make_encoder_map(self, overwrite=False) -> None:
Expand Down Expand Up @@ -345,15 +351,17 @@ def make_encoder_map(self, overwrite=False) -> None:
_output_position_array = []

LOGGER.info(
msg=f"[{self.__repr__()}] Please manually move the {self.tag} joint numerous times through its full range of motion for 10 seconds. \n{input('Press any key when you are ready to start.')}"
msg=f"[{self.__repr__()}] Please manually move the {self.tag} joint numerous times through \
its full range of motion for 10 seconds. \n{input('Press any key when you are ready to start.')}"
)

_start_time: float = time.time()

try:
while time.time() - _start_time < 10:
LOGGER.info(
msg=f"[{self.__repr__()}] Mapping the {self.tag} joint encoder: {10 - time.time() + _start_time} seconds left."
msg=f"[{self.__repr__()}] Mapping the {self.tag} \
joint encoder: {10 - time.time() + _start_time} seconds left."
)
self.update()
_joint_encoder_array.append(self.joint_encoder_counts)
Expand Down Expand Up @@ -876,8 +884,8 @@ def gyroz(self) -> float:
def thermal_scaling_factor(self) -> float:
"""
Scale factor to use in torque control, in [0,1].
If you scale the torque command by this factor, the motor temperature will never exceed max allowable temperature.
For a proof, see paper referenced in thermal model.
If you scale the torque command by this factor, the motor temperature will never
exceed max allowable temperature. For a proof, see paper referenced in thermal model.
"""
return self._thermal_scale

Expand Down Expand Up @@ -1010,7 +1018,8 @@ def start(self) -> None:
except OSError:
print("\n")
LOGGER.error(
msg=f"[{self.__repr__()}] Need admin previleges to open the port '{self.port}'. \n\nPlease run the script with 'sudo' command or add the user to the dialout group.\n"
msg=f"[{self.__repr__()}] Need admin previleges to open the port '{self.port}'. \n\n \
Please run the script with 'sudo' command or add the user to the dialout group.\n"
)
os._exit(status=1)

Expand Down Expand Up @@ -1040,20 +1049,23 @@ def update(self) -> None:
)
if self.case_temperature >= self.max_case_temperature:
LOGGER.error(
msg=f"[{str.upper(self.tag)}] Case thermal limit {self.max_case_temperature} reached. Stopping motor."
msg=f"[{str.upper(self.tag)}] Case thermal limit {self.max_case_temperature} reached. \
Stopping motor."
)
raise ThermalLimitException()

if self.winding_temperature >= self.max_winding_temperature:
LOGGER.error(
msg=f"[{str.upper(self.tag)}] Winding thermal limit {self.max_winding_temperature} reached. Stopping motor."
msg=f"[{str.upper(self.tag)}] Winding thermal limit {self.max_winding_temperature} reached. \
Stopping motor."
)
raise ThermalLimitException()
# Check for thermal fault, bit 2 of the execute status byte

if self._data.status_ex & 0b00000010 == 0b00000010:
LOGGER.error(
msg=f"[{str.upper(self.tag)}] Thermal Fault: Winding temperature: {self.winding_temperature}; Case temperature: {self.case_temperature}."
msg=f"[{str.upper(self.tag)}] Thermal Fault: Winding temperature: {self.winding_temperature}; \
Case temperature: {self.case_temperature}."
)
raise ThermalLimitException("Internal thermal limit tripped.")

Expand Down
17 changes: 12 additions & 5 deletions opensourceleg/actuators/moteus.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import math
import os
from typing import ClassVar, Optional

import numpy as np
from moteus import Command, Controller, Stream
Expand Down Expand Up @@ -73,7 +74,7 @@ class MoteusQueryResolution:
aux1_gpio = mp.IGNORE
aux2_gpio = mp.IGNORE

_extra = {
_extra: ClassVar = {
MoteusRegister.COMMAND_POSITION: mp.F32,
MoteusRegister.COMMAND_VELOCITY: mp.F32,
MoteusRegister.COMMAND_FEEDFORWARD_TORQUE: mp.F32,
Expand Down Expand Up @@ -166,8 +167,11 @@ def __init__(
gear_ratio: float = 1.0,
frequency: int = 500,
offline: bool = False,
query: MoteusQueryResolution = MoteusQueryResolution(),
query: Optional[MoteusQueryResolution] = None,
) -> None:
if query is None:
query = MoteusQueryResolution()

self._servo_id = servo_id
self._bus_id = bus_id
super().__init__(
Expand Down Expand Up @@ -223,7 +227,8 @@ async def start(self) -> None:
except OSError:
print("\n")
LOGGER.error(
msg=f"[{self.__repr__()}] Need admin previleges to open the port. \n\nPlease run the script with 'sudo' command or add the user to the dialout group.\n"
msg=f"[{self.__repr__()}] Need admin previleges to open the port. \n\n \
Please run the script with 'sudo' command or add the user to the dialout group.\n"
)
os._exit(status=1)

Expand Down Expand Up @@ -262,7 +267,8 @@ async def update(self):

if self.winding_temperature >= self.max_winding_temperature:
LOGGER.error(
msg=f"[{str.upper(self.tag)}] Winding thermal limit {self.max_winding_temperature} reached. Stopping motor."
msg=f"[{str.upper(self.tag)}] Winding thermal limit {self.max_winding_temperature} reached. \
Stopping motor."
)
raise ThermalLimitException()

Expand Down Expand Up @@ -531,7 +537,8 @@ def winding_temperature(self) -> float:
def thermal_scaling_factor(self) -> float:
"""
Scale factor to use in torque control, in [0,1].
If you scale the torque command by this factor, the motor temperature will never exceed max allowable temperature.
If you scale the torque command by this factor, the motor temperature will
never exceed max allowable temperature.
For a proof, see paper referenced in thermal model.
"""
return self._thermal_scale
Expand Down
48 changes: 22 additions & 26 deletions opensourceleg/actuators/tmotor.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,7 @@ def stop(self):
def home(self):
pass

# this method is called by the user to synchronize the current state used by the controller
# with the most recent message recieved
def update(self):
def update(self): # noqa: C901
"""
This method is called by the user to synchronize the current state used by the controller
with the most recent message recieved, as well as to send the current command.
Expand All @@ -213,11 +211,12 @@ def update(self):
# print(self._command_sent)
now = time.time()
if (now - self._last_command_time) < 0.25 and ((now - self._last_update_time) > 0.1):
# print("State update requested but no data recieved from motor. Delay longer after zeroing, decrease frequency, or check connection.")
warnings.warn(
"State update requested but no data from motor. Delay longer after zeroing, decrease frequency, or check connection. "
"State update requested but no data from motor. Delay longer after zeroing, \
decrease frequency, or check connection. "
+ self.device_info_string(),
RuntimeWarning,
stacklevel=2,
)
else:
self._command_sent = False
Expand Down Expand Up @@ -251,8 +250,10 @@ def update(self):
elif (thresh_pos <= old_pos and old_pos <= P_max) and (-P_max <= new_pos and new_pos <= -thresh_pos):
self._times_past_position_limit += 1

# current is basically the same as position, but if you instantly command a switch it can actually change fast enough
# to throw this off, so that is accounted for too. We just put a hard limit on the current to solve current jitter problems.
# current is basically the same as position, but if you instantly
# command a switch it can actually change fast enough
# to throw this off, so that is accounted for too. We just put a hard limit on the current
# to solve current jitter problems.
if (thresh_curr <= new_curr and new_curr <= I_max) and (-I_max <= old_curr and old_curr <= -thresh_curr):
# self._old_current_zone = -1
# if (thresh_curr <= curr_command and curr_command <= I_max):
Expand Down Expand Up @@ -294,11 +295,6 @@ def update(self):

# send current motor command
self._send_command()

# # writing to log file
# if self.csv_file_name is not None:
# self.csv_writer.writerow([self._last_update_time - self._start_time] + [self.LOG_FUNCTIONS[var]() for var in self.log_vars])

self._updated = False

# sends a command to the motor depending on whats controlm mode the motor is in
Expand All @@ -311,8 +307,6 @@ def _send_command(self):
This allows control based on actual q-axis current, rather than estimated torque, which
doesn't account for friction losses.
"""
# if self._control_state == _TMotorManState.FULL_STATE:
# self._canman.MIT_controller(self.ID,self.type, self._command.position, self._command.velocity, self._command.kp, self._command.kd, self.qaxis_current_to_TMotor_current(self._command.current))
if self.mode == CONTROL_MODES.IMPEDANCE:
self._canman.MIT_controller(
self.ID,
Expand Down Expand Up @@ -426,8 +420,18 @@ def set_impedance_gains(self, kp=0, ki=0, K=0.08922, B=0.0038070, ff=0):
B: The damping in Nm/(rad/s)
ff: A dummy argument for backward compatibility with the dephy library.
"""
assert isfinite(K) and MIT_Params[self.type]["Kp_min"] <= K and MIT_Params[self.type]["Kp_max"] >= K
assert isfinite(B) and MIT_Params[self.type]["Kd_min"] <= B and MIT_Params[self.type]["Kd_max"] >= B
if not (isfinite(K) and MIT_Params[self.type]["Kp_min"] <= K and MIT_Params[self.type]["Kp_max"] >= K):
raise ValueError(
f"K must be finite and between \
{MIT_Params[self.type]['Kp_min']} and {MIT_Params[self.type]['Kp_max']}"
)

if not (isfinite(B) and MIT_Params[self.type]["Kd_min"] <= B and MIT_Params[self.type]["Kd_max"] >= B):
raise ValueError(
f"B must be finite and between \
{MIT_Params[self.type]['Kd_min']} and {MIT_Params[self.type]['Kd_max']}"
)

self._command.kp = K
self._command.kd = B
self._command.velocity = 0.0
Expand Down Expand Up @@ -470,18 +474,13 @@ def set_output_position(self, value):
Raises:
RuntimeError: If the position command is outside the range of the motor.
"""
# position commands must be within a certain range :/
# pos = (np.abs(pos) % MIT_Params[self.type]["P_max"])*np.sign(pos) # this doesn't work because it will unwind itself!
# CANNOT Control using impedance mode for angles greater than 12.5 rad!!
if np.abs(value) >= MIT_Params[self.type]["P_max"]:
raise RuntimeError(
"Cannot control using impedance mode for angles with magnitude greater than "
+ str(MIT_Params[self.type]["P_max"])
+ "rad!"
)

# if self._control_state not in [_TMotorManState.IMPEDANCE, _TMotorManState.FULL_STATE]:
# raise RuntimeError("Attempted to send position command without gains for device " + self.device_info_string())
self._command.position = value

def set_output_velocity(self, value):
Expand All @@ -503,8 +502,6 @@ def set_output_velocity(self, value):
+ "rad/s!"
)

# if self._control_state not in [_TMotorManState.SPEED, _TMotorManState.FULL_STATE]:
# raise RuntimeError("Attempted to send speed command without gains for device " + self.device_info_string())
self._command.velocity = value

# used for either current MIT mode to set current
Expand All @@ -518,8 +515,6 @@ def set_motor_current(self, value):
Args:
value: the desired current in amps.
"""
# if self._control_state not in [_TMotorManState.CURRENT, _TMotorManState.FULL_STATE]:
# raise RuntimeError("Attempted to send current command before entering current mode for device " + self.device_info_string())
self._command.current = value

# used for either current or MIT Mode to set current, based on desired torque
Expand Down Expand Up @@ -636,7 +631,8 @@ def check_can_connection(self) -> bool:
"""
if not self._entered:
raise RuntimeError(
"Tried to check_can_connection before entering motor control! Enter control using the __enter__ method, or instantiating the TMotorManager in a with block."
"Tried to check_can_connection before entering motor control! \
Enter control using the __enter__ method, or instantiating the TMotorManager in a with block."
)
Listener = can.BufferedReader()
self._canman.notifier.add_listener(Listener)
Expand Down
10 changes: 5 additions & 5 deletions opensourceleg/collections/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ class Validator(ABC):
def __set_name__(self, owner, name):
self.private_name = f"_{name}"

def __get__(self, object, objtype=None):
return getattr(object, self.private_name)
def __get__(self, instance, objtype=None):
return getattr(instance, self.private_name)

def __set__(self, object, value):
def __set__(self, instance, value):
self.validate(value)
setattr(object, self.private_name, value)
setattr(instance, self.private_name, value)

@abstractmethod
def validate(self, value):
Expand All @@ -24,7 +24,7 @@ def __init__(self, min_value=None, max_value=None) -> None:

def validate(self, value):
if not isinstance(value, (int, float)):
raise ValueError("Value must be an int or float")
raise TypeError("Value must be an int or float")

if self.min_value is not None and value < self.min_value:
raise ValueError(f"Value must be at least {self.min_value}")
Expand Down
Loading

0 comments on commit a2945a7

Please sign in to comment.