Skip to content

Commit

Permalink
Allow users to set various cartesian path service parameters (#58)
Browse files Browse the repository at this point in the history
* Added ability to have cartesian planning calls avoid collisions

* Fix bad merge

* Allow users to set cartesian jump threshold

* Allow users to set cartesian max step

* Pass frame_id into the cartesian planning service

* Add ability to specify a required fraction for a cartesian plan to succeed

* Allowed users to get end effector name and base link name

* Add revolute and prismatic jump thresholds

* Formatting

* Added test cases
  • Loading branch information
amalnanavati authored Apr 4, 2024
1 parent fe26066 commit 3f87e3b
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 8 deletions.
39 changes: 36 additions & 3 deletions examples/ex_pose_goal.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ def main():
# Declare parameters for position and orientation
node.declare_parameter("position", [0.5, 0.0, 0.25])
node.declare_parameter("quat_xyzw", [1.0, 0.0, 0.0, 0.0])
node.declare_parameter("cartesian", False)
node.declare_parameter("synchronous", True)
# If non-positive, don't cancel. Only used if synchronous is False
node.declare_parameter("cancel_after_secs", 0.0)
# Planner ID
node.declare_parameter("planner_id", "RRTConnectkConfigDefault")
# Declare parameters for cartesian planning
node.declare_parameter("cartesian", False)
node.declare_parameter("cartesian_max_step", 0.0025)
node.declare_parameter("cartesian_fraction_threshold", 0.0)
node.declare_parameter("cartesian_jump_threshold", 0.0)
node.declare_parameter("cartesian_avoid_collisions", False)

# Create callback group that allows execution of callbacks in parallel without restrictions
callback_group = ReentrantCallbackGroup()
Expand Down Expand Up @@ -62,17 +67,45 @@ def main():
# Get parameters
position = node.get_parameter("position").get_parameter_value().double_array_value
quat_xyzw = node.get_parameter("quat_xyzw").get_parameter_value().double_array_value
cartesian = node.get_parameter("cartesian").get_parameter_value().bool_value
synchronous = node.get_parameter("synchronous").get_parameter_value().bool_value
cancel_after_secs = (
node.get_parameter("cancel_after_secs").get_parameter_value().double_value
)
cartesian = node.get_parameter("cartesian").get_parameter_value().bool_value
cartesian_max_step = (
node.get_parameter("cartesian_max_step").get_parameter_value().double_value
)
cartesian_fraction_threshold = (
node.get_parameter("cartesian_fraction_threshold")
.get_parameter_value()
.double_value
)
cartesian_jump_threshold = (
node.get_parameter("cartesian_jump_threshold")
.get_parameter_value()
.double_value
)
cartesian_avoid_collisions = (
node.get_parameter("cartesian_avoid_collisions")
.get_parameter_value()
.bool_value
)

# Set parameters for cartesian planning
moveit2.cartesian_avoid_collisions = cartesian_avoid_collisions
moveit2.cartesian_jump_threshold = cartesian_jump_threshold

# Move to pose
node.get_logger().info(
f"Moving to {{position: {list(position)}, quat_xyzw: {list(quat_xyzw)}}}"
)
moveit2.move_to_pose(position=position, quat_xyzw=quat_xyzw, cartesian=cartesian)
moveit2.move_to_pose(
position=position,
quat_xyzw=quat_xyzw,
cartesian=cartesian,
cartesian_max_step=cartesian_max_step,
cartesian_fraction_threshold=cartesian_fraction_threshold,
)
if synchronous:
# Note: the same functionality can be achieved by setting
# `synchronous:=false` and `cancel_after_secs` to a negative value.
Expand Down
79 changes: 74 additions & 5 deletions pymoveit2/moveit2.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,8 @@ def move_to_pose(
weight_position: float = 1.0,
cartesian: bool = False,
weight_orientation: float = 1.0,
cartesian_max_step: float = 0.0025,
cartesian_fraction_threshold: float = 0.0,
):
"""
Plan and execute motion based on previously set goals. Optional arguments can be
Expand Down Expand Up @@ -428,6 +430,8 @@ def move_to_pose(
weight_position=weight_position,
weight_orientation=weight_orientation,
cartesian=cartesian,
max_step=cartesian_max_step,
cartesian_fraction_threshold=cartesian_fraction_threshold,
)
)

Expand Down Expand Up @@ -500,12 +504,18 @@ def plan(
weight_joint_position: float = 1.0,
start_joint_state: Optional[Union[JointState, List[float]]] = None,
cartesian: bool = False,
max_step: float = 0.0025,
cartesian_fraction_threshold: float = 0.0,
) -> Optional[JointTrajectory]:
"""
Call plan_async and wait on future
"""
future = self.plan_async(
**{key: value for key, value in locals().items() if key != "self"}
**{
key: value
for key, value in locals().items()
if key not in ["self", "cartesian_fraction_threshold"]
}
)

if future is None:
Expand All @@ -516,7 +526,11 @@ def plan(
while not future.done():
rate.sleep()

return self.get_trajectory(future, cartesian=cartesian)
return self.get_trajectory(
future,
cartesian=cartesian,
cartesian_fraction_threshold=cartesian_fraction_threshold,
)

def plan_async(
self,
Expand All @@ -537,6 +551,7 @@ def plan_async(
weight_joint_position: float = 1.0,
start_joint_state: Optional[Union[JointState, List[float]]] = None,
cartesian: bool = False,
max_step: float = 0.0025,
) -> Optional[Future]:
"""
Plan motion based on previously set goals. Optional arguments can be passed in to
Expand Down Expand Up @@ -634,9 +649,10 @@ def plan_async(
# Plan trajectory asynchronously by service call
if cartesian:
future = self._plan_cartesian_path(
max_step=max_step,
frame_id=pose_stamped.header.frame_id
if pose_stamped is not None
else frame_id
else frame_id,
)
else:
# Use service
Expand All @@ -649,11 +665,17 @@ def plan_async(
return future

def get_trajectory(
self, future: Future, cartesian: bool = False
self,
future: Future,
cartesian: bool = False,
cartesian_fraction_threshold: float = 0.0,
) -> Optional[JointTrajectory]:
"""
Takes in a future returned by plan_async and returns the trajectory if the future is done
and planning was successful, else None.
For cartesian plans, the plan is rejected if the fraction of the path that was completed is
less than `cartesian_fraction_threshold`.
"""
if not future.done():
self._node.get_logger().warn(
Expand All @@ -666,7 +688,14 @@ def get_trajectory(
# Cartesian
if cartesian:
if MoveItErrorCodes.SUCCESS == res.error_code.val:
return res.solution.joint_trajectory
if res.fraction >= cartesian_fraction_threshold:
return res.solution.joint_trajectory
else:
self._node.get_logger().warn(
f"Planning failed! Cartesian planner completed {res.fraction} "
f"of the trajectory, less than the threshold {cartesian_fraction_threshold}."
)
return None
else:
self._node.get_logger().warn(
f"Planning failed! Error code: {res.error_code.val}."
Expand Down Expand Up @@ -2130,6 +2159,14 @@ def __init_compute_ik(self):
def follow_joint_trajectory_action_client(self) -> str:
return self.__follow_joint_trajectory_action_client

@property
def end_effector_name(self) -> str:
return self.__end_effector_name

@property
def base_link_name(self) -> str:
return self.__base_link_name

@property
def joint_names(self) -> List[str]:
return self.__joint_names
Expand Down Expand Up @@ -2177,6 +2214,38 @@ def allowed_planning_time(self) -> float:
def allowed_planning_time(self, value: float):
self.__move_action_goal.request.allowed_planning_time = value

@property
def cartesian_avoid_collisions(self) -> bool:
return self.__cartesian_path_request.request.avoid_collisions

@cartesian_avoid_collisions.setter
def cartesian_avoid_collisions(self, value: bool):
self.__cartesian_path_request.avoid_collisions = value

@property
def cartesian_jump_threshold(self) -> float:
return self.__cartesian_path_request.request.jump_threshold

@cartesian_jump_threshold.setter
def cartesian_jump_threshold(self, value: float):
self.__cartesian_path_request.jump_threshold = value

@property
def cartesian_prismatic_jump_threshold(self) -> float:
return self.__cartesian_path_request.request.prismatic_jump_threshold

@cartesian_prismatic_jump_threshold.setter
def cartesian_prismatic_jump_threshold(self, value: float):
self.__cartesian_path_request.prismatic_jump_threshold = value

@property
def cartesian_revolute_jump_threshold(self) -> float:
return self.__cartesian_path_request.request.revolute_jump_threshold

@cartesian_revolute_jump_threshold.setter
def cartesian_revolute_jump_threshold(self, value: float):
self.__cartesian_path_request.revolute_jump_threshold = value

@property
def pipeline_id(self) -> int:
return self.__move_action_goal.request.pipeline_id
Expand Down

0 comments on commit 3f87e3b

Please sign in to comment.