From 9ca69e1da697767bf39ae5fe6f77646a34328f4d Mon Sep 17 00:00:00 2001 From: Maanas Date: Wed, 3 Apr 2024 12:48:44 -0400 Subject: [PATCH] Maanas Commit ROs2 --- .../mil_pneumatic_actuator/CMakeLists.txt | 23 +++-- .../launch/example.launch | 94 +++++++++---------- .../mil_pneumatic_actuator/board.py | 36 ++++++- .../mil_pneumatic_actuator/simulated_board.py | 20 ++-- .../nodes/pneumatic_actuator_node | 94 ++++++++----------- .../mil_pneumatic_actuator/package.xml | 22 +++-- 6 files changed, 150 insertions(+), 139 deletions(-) diff --git a/mil_common/drivers/mil_pneumatic_actuator/CMakeLists.txt b/mil_common/drivers/mil_pneumatic_actuator/CMakeLists.txt index 9c50fe661..0e0094708 100644 --- a/mil_common/drivers/mil_pneumatic_actuator/CMakeLists.txt +++ b/mil_common/drivers/mil_pneumatic_actuator/CMakeLists.txt @@ -1,18 +1,17 @@ -cmake_minimum_required(VERSION 3.0.2) +cmake_minimum_required(VERSION 3.5) project(mil_pneumatic_actuator) -find_package(catkin REQUIRED COMPONENTS - message_generation - rospy -) +# Find dependencies +find_package(ament_cmake REQUIRED) +find_package(rosidl_default_generators REQUIRED) -catkin_python_setup() +# Set C++ standard +set(CMAKE_CXX_STANDARD 14) -add_service_files( - FILES - SetValve.srv +# Generate ROS interfaces +rosidl_generate_interfaces(${PROJECT_NAME} + "srv/SetValve.srv" ) -generate_messages() - -catkin_package(CATKIN_DEPENDS message_runtime) +ament_export_dependencies(rosidl_default_runtime) +ament_package() diff --git a/mil_common/drivers/mil_pneumatic_actuator/launch/example.launch b/mil_common/drivers/mil_pneumatic_actuator/launch/example.launch index 4ef0bf41f..cf54a47b1 100644 --- a/mil_common/drivers/mil_pneumatic_actuator/launch/example.launch +++ b/mil_common/drivers/mil_pneumatic_actuator/launch/example.launch @@ -1,52 +1,44 @@ - - - - - - # Stores information about each actuator. - # Each actuator can be up to 2 physical valves (such as an extend/retract for a piston) - # Actuators can be of type 'set' (open / close atomically) or 'pulse' (open for a short time, then close) +from launch import LaunchDescription +from launch_ros.actions import Node +from launch.actions import SetParameter - # Simplest configuration, sets up a 'set' actuator where true opens id 7 and false closes id 7 - my_actuator: 7 - - # Example of a pulse actuator, will open 1 for 1 second then close - torpedo1: - type: 'pulse' - ports: - open_port: - id: 1 - default: 0 - close_port: - id: -1 - default: 0 - pulse_time: 1 - - # Example of a pulse actuator with 2 valves. When pulse, 3 opens and 4 closes, then both switch after 1 second - dropper: - type: 'pulse' - ports: - open_port: - # Drops - id: 3 - default: 0 - close_port: - # Reloads - id: 4 - default: 1 - pulse_time: 1 - - # Example of a 'set 'actuator with 2 valves. When set true, 6 closes and 5 opens. When false, 6 opens and 5 closes. - gripper: - type: 'set' - ports: - open_port: - id: 6 - default: 1 - close_port: - id: 5 - default: 0 - pulse_time: 1 - - - +def generate_launch_description(): + return LaunchDescription([ + SetParameter(name='/is_simulation', value='True'), + Node( + package='mil_pneumatic_actuator', + executable='pneumatic_actuator_node', + name='actuator_driver', + output='screen', + parameters=[{ + 'port': '/dev/serial/by-id/usb-MIL_Data_Merge_Board_Ports_5_to_8_DMBP58-if00-port0', + 'actuators': { + 'my_actuator': 7, + 'torpedo1': { + 'type': 'pulse', + 'ports': { + 'open_port': {'id': 1, 'default': 0}, + 'close_port': {'id': -1, 'default': 0} + }, + 'pulse_time': 1 + }, + 'dropper': { + 'type': 'pulse', + 'ports': { + 'open_port': {'id': 3, 'default': 0}, + 'close_port': {'id': 4, 'default': 1} + }, + 'pulse_time': 1 + }, + 'gripper': { + 'type': 'set', + 'ports': { + 'open_port': {'id': 6, 'default': 1}, + 'close_port': {'id': 5, 'default': 0} + }, + 'pulse_time': 1 + } + } + }] + ) + ]) diff --git a/mil_common/drivers/mil_pneumatic_actuator/mil_pneumatic_actuator/board.py b/mil_common/drivers/mil_pneumatic_actuator/mil_pneumatic_actuator/board.py index 0a9217691..07927f873 100644 --- a/mil_common/drivers/mil_pneumatic_actuator/mil_pneumatic_actuator/board.py +++ b/mil_common/drivers/mil_pneumatic_actuator/mil_pneumatic_actuator/board.py @@ -1,5 +1,5 @@ #!/usr/bin/env python3 -import threading +import threading #ros2 from typing import Optional import serial @@ -10,6 +10,8 @@ lock = threading.Lock() +import rclpy +from rclpy.node import Node class PnuematicActuatorDriverError(Exception): """ @@ -65,7 +67,7 @@ def __init__(self): super().__init__(message) -class PnuematicActuatorDriver: +class PnuematicActuatorDriver(Node): """ Allows high level ROS code to interface with Daniel's pneumatics board. @@ -77,7 +79,6 @@ class PnuematicActuatorDriver: design documentation. """ - # TODO: Add a function to try and reconnect to the serial port if we lose connection. def __init__(self, port: str, baud: int = 9600, simulated: bool = False): """ @@ -87,6 +88,8 @@ def __init__(self, port: str, baud: int = 9600, simulated: bool = False): simulated (bool): Whether to use a simulated actuator board class or an interface to the physical board. """ + super().__init__('pneumatic_actuator_driver') + if simulated: self.ser = SimulatedPnuematicActuatorBoard() else: @@ -205,6 +208,17 @@ def get_port(self, port: int) -> int: """ Reads the data at a specific port. + Args: + port (int): The port to read from. + + Raises: + PnuematicActuatorDriverResponseError: The expected response from the board + was not received. + PnuematicActuatorDriverChecksumError: The checksum expected and the checksum + received were not the same def get_port(self, port: int) -> int: + """ + Reads the data at a specific port. + Args: port (int): The port to read from. @@ -236,3 +250,19 @@ def ping(self) -> int: int: The response from the board. """ return self._send_request(Constants.PING_REQUEST, Constants.PING_RESPONSE) + +def main(args=None): + rclpy.init(args=args) + + node = PnuematicActuatorDriver(port='/dev/ttyUSB0', baud=9600, simulated=False) + + try: + rclpy.spin(node) + except KeyboardInterrupt: + pass + finally: + node.destroy_node() + rclpy.shutdown() + +if __name__ == '__main__': + main() diff --git a/mil_common/drivers/mil_pneumatic_actuator/mil_pneumatic_actuator/simulated_board.py b/mil_common/drivers/mil_pneumatic_actuator/mil_pneumatic_actuator/simulated_board.py index 14b8f7650..1595e0aba 100644 --- a/mil_common/drivers/mil_pneumatic_actuator/mil_pneumatic_actuator/simulated_board.py +++ b/mil_common/drivers/mil_pneumatic_actuator/mil_pneumatic_actuator/simulated_board.py @@ -1,17 +1,19 @@ #!/usr/bin/env python3 -import rospy +import rclpy +from rclpy.node import Node from mil_misc_tools.serial_tools import SimulatedSerial from .constants import Constants -class SimulatedPnuematicActuatorBoard(SimulatedSerial): +class SimulatedPnuematicActuatorBoard(SimulatedSerial, Node): """ A simulation of the pneumatic actuator board's serial protocol """ def __init__(self, *args, **kwargs): - super().__init__() + Node.__init__(self, 'simulated_pneumatic_actuator_board') + SimulatedSerial.__init__(self) def write(self, data: bytes): """ @@ -20,16 +22,16 @@ def write(self, data: bytes): request = Constants.deserialize_packet(data) request = request[0] if request == Constants.PING_REQUEST: - # rospy.loginfo("Ping received") + # self.get_logger().info("Ping received") byte = Constants.PING_RESPONSE - elif request > 0x20 and request < 0x30: - rospy.loginfo(f"Open port {request - 0x20}") + elif 0x20 < request < 0x30: + self.get_logger().info(f"Open port {request - 0x20}") byte = Constants.OPEN_RESPONSE - elif request > 0x30 and request < 0x40: - rospy.loginfo(f"Close port {request - 0x30}") + elif 0x30 < request < 0x40: + self.get_logger().info(f"Close port {request - 0x30}") byte = Constants.CLOSE_RESPONSE else: - rospy.loginfo("Default") + self.get_logger().info("Default") byte = 0x00 self.buffer += Constants.serialize_packet(byte) return len(data) diff --git a/mil_common/drivers/mil_pneumatic_actuator/nodes/pneumatic_actuator_node b/mil_common/drivers/mil_pneumatic_actuator/nodes/pneumatic_actuator_node index 920fb5a92..5d9e1a062 100755 --- a/mil_common/drivers/mil_pneumatic_actuator/nodes/pneumatic_actuator_node +++ b/mil_common/drivers/mil_pneumatic_actuator/nodes/pneumatic_actuator_node @@ -1,8 +1,9 @@ #!/usr/bin/env python3 import threading -import rospy -from mil_pneumatic_actuator import PnuematicActuatorDriver, PnuematicActuatorDriverError +import rclpy +from rclpy.node import Node +from mil_pneumatic_actuator import PneumaticActuatorDriver, PneumaticActuatorDriverError from mil_pneumatic_actuator.srv import SetValve from serial import SerialException from std_srvs.srv import Trigger @@ -58,55 +59,44 @@ class Actuator: return cls("set", valve_id, 0, -1, 0, 0.0) -class PnuematicActuatorNode: +class PneumaticActuatorNode(Node): """ - Allows high level ros code to interface with Daniel's pneumatics board. + Allows high level ROS code to interface with the pneumatics board. """ - # TODO: Add a function to try and reconnect to the serial port if we lose connection. - # TODO: publish info to /diagnostics def __init__(self): - # Connect to board of serial or simulated serial - self.baud_rate = rospy.get_param("~baud_rate", 9600) - self.port = rospy.get_param("~port") - self.is_simulation = rospy.get_param("/is_simulation", False) + super().__init__('pneumatic_actuator_driver') + self.baud_rate = self.get_parameter_or('baud_rate', 9600).value + self.port = self.get_parameter_or('port', '/dev/ttyUSB0').value + self.is_simulation = self.get_parameter_or('is_simulation', False).value self.connected = False while not self.connected: self.reconnect_and_ping() if not self.connected: - rospy.sleep(1) + rclpy.sleep(1) - # Parse actuator config from parameters. see example.launch for format - actuators = rospy.get_param("~actuators") + actuators = self.get_parameter_or('actuators', {}).value self.actuators = {} for a in actuators: self.actuators[a] = Actuator.from_dict(actuators[a]) - # Reset all actuators to default self.reset() - rospy.Service("~actuate", SetValve, self.got_service_request) - rospy.Service("~reset", Trigger, self.reset) + self.create_service(SetValve, '~actuate', self.got_service_request) + self.create_service(Trigger, '~reset', self.reset) - # Setup timer to ping every 5 second - self.timer = rospy.Timer(rospy.Duration(3.0), self.reconnect_and_ping) + self.timer = self.create_timer(3.0, self.reconnect_and_ping) def reset(self, *args): - """ - Reset all valves to default state - """ for actuator in self.actuators.values(): self.set_port(actuator.open_id, actuator.open_default) self.set_port(actuator.close_id, actuator.close_default) - rospy.loginfo("Valves set to default state") + self.get_logger().info("Valves set to default state") return {"success": True} def set_raw_valve(self, srv): - """ - Set a valve open/close by raw id - """ - rospy.loginfo( + self.get_logger().info( "Setting valve {} to {}".format( srv.actuator, "open" if srv.opened else "closed", @@ -116,70 +106,55 @@ class PnuematicActuatorNode: return {"success": True} def set_port(self, port, state): - """ - Open/Close a valve (by raw id), absorbing errors into logging - """ if port == -1: return False try: self.driver.set_port(port, state) - except (PnuematicActuatorDriverError, SerialException) as e: - rospy.logerr(f"Error interfacing with actuator board: {e}") + except (PneumaticActuatorDriverError, SerialException) as e: + self.get_logger().error(f"Error interfacing with actuator board: {e}") return False return True def reconnect_and_ping(self, *args): - """ - Try to ping board, reconnecting if needed - """ - # If board is disconnected, try to connect / reconnect if not self.connected: try: - self.driver = PnuematicActuatorDriver( + self.driver = PneumaticActuatorDriver( self.port, baud=self.baud_rate, simulated=self.is_simulation, ) except SerialException as e: - rospy.logwarn(e) + self.get_logger().warn(e) return - # Try to ping try: self.driver.ping() - except (PnuematicActuatorDriverError, SerialException) as e: - rospy.logwarn(f"Could not ping actuator board: {e}") + except (PneumaticActuatorDriverError, SerialException) as e: + self.get_logger().warn(f"Could not ping actuator board: {e}") if self.connected: - rospy.logerr("Board not responding to pings. Disconnected.") + self.get_logger().error("Board not responding to pings. Disconnected.") self.connected = False return - # If made it this far, you are not connected if not self.connected: - rospy.loginfo("Board connected!") + self.get_logger().info("Board connected!") self.connected = True def got_service_request(self, srv): - """ - Close/Open/Pulse the requested actuator when a service request is received - """ if not self.connected: return {"success": False, "message": "board not connected"} - # See if requested actuator is just the raw ID try: int(srv.actuator) return self.set_raw_valve(srv) except ValueError: pass - # Otherwise, make sure actuator was described in parameters if srv.actuator not in self.actuators: return {"success": False, "message": "actuator not registered"} actuator = self.actuators[srv.actuator] - # If actuator is pulse type, open it and setup timer to close after pulse time if actuator.type == "pulse": - rospy.loginfo(f"Pulsing {srv.actuator} for {actuator.pulse_time}s") + self.get_logger().info(f"Pulsing {srv.actuator} for {actuator.pulse_time}s") self.set_port(actuator.open_id, not actuator.open_default) self.set_port(actuator.close_id, not actuator.close_default) @@ -187,22 +162,27 @@ class PnuematicActuatorNode: self.set_port(actuator.open_id, actuator.open_default) self.set_port(actuator.close_id, actuator.close_default) - rospy.Timer(rospy.Duration(actuator.pulse_time), close, oneshot=True) + self.create_timer(actuator.pulse_time, close, oneshot=True) - # If actuator is set type, open/close as requested elif actuator.type == "set": if srv.opened: - rospy.loginfo(f"Opening {srv.actuator}") + self.get_logger().info(f"Opening {srv.actuator}") self.set_port(actuator.open_id, True) self.set_port(actuator.close_id, False) else: - rospy.loginfo(f"Closing {srv.actuator}") + self.get_logger().info(f"Closing {srv.actuator}") self.set_port(actuator.open_id, False) self.set_port(actuator.close_id, True) return {"success": True} +def main(args=None): + rclpy.init(args=args) + node = PneumaticActuatorNode() + rclpy.spin(node) + node.destroy_node() + rclpy.shutdown() + + if __name__ == "__main__": - rospy.init_node("actuator_driver") - PnuematicActuatorNode() - rospy.spin() + main() diff --git a/mil_common/drivers/mil_pneumatic_actuator/package.xml b/mil_common/drivers/mil_pneumatic_actuator/package.xml index 8efeb8d01..a9dbc8acf 100644 --- a/mil_common/drivers/mil_pneumatic_actuator/package.xml +++ b/mil_common/drivers/mil_pneumatic_actuator/package.xml @@ -1,13 +1,21 @@ - + + mil_pneumatic_actuator 0.0.0 - ROS interface to the MIL pneumatic acutator board + ROS interface to the MIL pneumatic actuator board Matthew Langford MIL - catkin - message_generation - rospy - message_runtime - rospy + + ament_cmake + rosidl_default_generators + rosidl_default_runtime + rclpy + rclpy + + + ament_cmake +