diff --git a/examples/ex_pose_goal.py b/examples/ex_pose_goal.py index 401fd19..fa701ad 100755 --- a/examples/ex_pose_goal.py +++ b/examples/ex_pose_goal.py @@ -25,12 +25,17 @@ def main(): # Declare parameters for position and orientation node.declare_parameter("position", [0.5, 0.0, 0.25]) node.declare_parameter("quat_xyzw", [1.0, 0.0, 0.0, 0.0]) - node.declare_parameter("cartesian", False) node.declare_parameter("synchronous", True) # If non-positive, don't cancel. Only used if synchronous is False node.declare_parameter("cancel_after_secs", 0.0) # Planner ID node.declare_parameter("planner_id", "RRTConnectkConfigDefault") + # Declare parameters for cartesian planning + node.declare_parameter("cartesian", False) + node.declare_parameter("cartesian_max_step", 0.0025) + node.declare_parameter("cartesian_fraction_threshold", 0.0) + node.declare_parameter("cartesian_jump_threshold", 0.0) + node.declare_parameter("cartesian_avoid_collisions", False) # Create callback group that allows execution of callbacks in parallel without restrictions callback_group = ReentrantCallbackGroup() @@ -62,17 +67,45 @@ def main(): # Get parameters position = node.get_parameter("position").get_parameter_value().double_array_value quat_xyzw = node.get_parameter("quat_xyzw").get_parameter_value().double_array_value - cartesian = node.get_parameter("cartesian").get_parameter_value().bool_value synchronous = node.get_parameter("synchronous").get_parameter_value().bool_value cancel_after_secs = ( node.get_parameter("cancel_after_secs").get_parameter_value().double_value ) + cartesian = node.get_parameter("cartesian").get_parameter_value().bool_value + cartesian_max_step = ( + node.get_parameter("cartesian_max_step").get_parameter_value().double_value + ) + cartesian_fraction_threshold = ( + node.get_parameter("cartesian_fraction_threshold") + .get_parameter_value() + .double_value + ) + cartesian_jump_threshold = ( + node.get_parameter("cartesian_jump_threshold") + .get_parameter_value() + .double_value + ) + cartesian_avoid_collisions = ( + node.get_parameter("cartesian_avoid_collisions") + .get_parameter_value() + .bool_value + ) + + # Set parameters for cartesian planning + moveit2.cartesian_avoid_collisions = cartesian_avoid_collisions + moveit2.cartesian_jump_threshold = cartesian_jump_threshold # Move to pose node.get_logger().info( f"Moving to {{position: {list(position)}, quat_xyzw: {list(quat_xyzw)}}}" ) - moveit2.move_to_pose(position=position, quat_xyzw=quat_xyzw, cartesian=cartesian) + moveit2.move_to_pose( + position=position, + quat_xyzw=quat_xyzw, + cartesian=cartesian, + cartesian_max_step=cartesian_max_step, + cartesian_fraction_threshold=cartesian_fraction_threshold, + ) if synchronous: # Note: the same functionality can be achieved by setting # `synchronous:=false` and `cancel_after_secs` to a negative value. diff --git a/pymoveit2/moveit2.py b/pymoveit2/moveit2.py index 45db2e5..b3918be 100644 --- a/pymoveit2/moveit2.py +++ b/pymoveit2/moveit2.py @@ -344,6 +344,8 @@ def move_to_pose( weight_position: float = 1.0, cartesian: bool = False, weight_orientation: float = 1.0, + cartesian_max_step: float = 0.0025, + cartesian_fraction_threshold: float = 0.0, ): """ Plan and execute motion based on previously set goals. Optional arguments can be @@ -428,6 +430,8 @@ def move_to_pose( weight_position=weight_position, weight_orientation=weight_orientation, cartesian=cartesian, + max_step=cartesian_max_step, + cartesian_fraction_threshold=cartesian_fraction_threshold, ) ) @@ -500,12 +504,18 @@ def plan( weight_joint_position: float = 1.0, start_joint_state: Optional[Union[JointState, List[float]]] = None, cartesian: bool = False, + max_step: float = 0.0025, + cartesian_fraction_threshold: float = 0.0, ) -> Optional[JointTrajectory]: """ Call plan_async and wait on future """ future = self.plan_async( - **{key: value for key, value in locals().items() if key != "self"} + **{ + key: value + for key, value in locals().items() + if key not in ["self", "cartesian_fraction_threshold"] + } ) if future is None: @@ -516,7 +526,11 @@ def plan( while not future.done(): rate.sleep() - return self.get_trajectory(future, cartesian=cartesian) + return self.get_trajectory( + future, + cartesian=cartesian, + cartesian_fraction_threshold=cartesian_fraction_threshold, + ) def plan_async( self, @@ -537,6 +551,7 @@ def plan_async( weight_joint_position: float = 1.0, start_joint_state: Optional[Union[JointState, List[float]]] = None, cartesian: bool = False, + max_step: float = 0.0025, ) -> Optional[Future]: """ Plan motion based on previously set goals. Optional arguments can be passed in to @@ -634,9 +649,10 @@ def plan_async( # Plan trajectory asynchronously by service call if cartesian: future = self._plan_cartesian_path( + max_step=max_step, frame_id=pose_stamped.header.frame_id if pose_stamped is not None - else frame_id + else frame_id, ) else: # Use service @@ -649,11 +665,17 @@ def plan_async( return future def get_trajectory( - self, future: Future, cartesian: bool = False + self, + future: Future, + cartesian: bool = False, + cartesian_fraction_threshold: float = 0.0, ) -> Optional[JointTrajectory]: """ Takes in a future returned by plan_async and returns the trajectory if the future is done and planning was successful, else None. + + For cartesian plans, the plan is rejected if the fraction of the path that was completed is + less than `cartesian_fraction_threshold`. """ if not future.done(): self._node.get_logger().warn( @@ -666,7 +688,14 @@ def get_trajectory( # Cartesian if cartesian: if MoveItErrorCodes.SUCCESS == res.error_code.val: - return res.solution.joint_trajectory + if res.fraction >= cartesian_fraction_threshold: + return res.solution.joint_trajectory + else: + self._node.get_logger().warn( + f"Planning failed! Cartesian planner completed {res.fraction} " + f"of the trajectory, less than the threshold {cartesian_fraction_threshold}." + ) + return None else: self._node.get_logger().warn( f"Planning failed! Error code: {res.error_code.val}." @@ -2130,6 +2159,14 @@ def __init_compute_ik(self): def follow_joint_trajectory_action_client(self) -> str: return self.__follow_joint_trajectory_action_client + @property + def end_effector_name(self) -> str: + return self.__end_effector_name + + @property + def base_link_name(self) -> str: + return self.__base_link_name + @property def joint_names(self) -> List[str]: return self.__joint_names @@ -2177,6 +2214,38 @@ def allowed_planning_time(self) -> float: def allowed_planning_time(self, value: float): self.__move_action_goal.request.allowed_planning_time = value + @property + def cartesian_avoid_collisions(self) -> bool: + return self.__cartesian_path_request.request.avoid_collisions + + @cartesian_avoid_collisions.setter + def cartesian_avoid_collisions(self, value: bool): + self.__cartesian_path_request.avoid_collisions = value + + @property + def cartesian_jump_threshold(self) -> float: + return self.__cartesian_path_request.request.jump_threshold + + @cartesian_jump_threshold.setter + def cartesian_jump_threshold(self, value: float): + self.__cartesian_path_request.jump_threshold = value + + @property + def cartesian_prismatic_jump_threshold(self) -> float: + return self.__cartesian_path_request.request.prismatic_jump_threshold + + @cartesian_prismatic_jump_threshold.setter + def cartesian_prismatic_jump_threshold(self, value: float): + self.__cartesian_path_request.prismatic_jump_threshold = value + + @property + def cartesian_revolute_jump_threshold(self) -> float: + return self.__cartesian_path_request.request.revolute_jump_threshold + + @cartesian_revolute_jump_threshold.setter + def cartesian_revolute_jump_threshold(self, value: float): + self.__cartesian_path_request.revolute_jump_threshold = value + @property def pipeline_id(self) -> int: return self.__move_action_goal.request.pipeline_id