diff --git a/.gitignore b/.gitignore index 952acc04..f338b501 100755 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ # Local files .env *.DS_Store +isaacgym # Python *.py[oc] __pycache__/ diff --git a/policy_1.pt b/policy_1.pt index 95aa2a63..075e2e12 100644 Binary files a/policy_1.pt and b/policy_1.pt differ diff --git a/sim/algo/ppo/on_policy_runner.py b/sim/algo/ppo/on_policy_runner.py index e3fb6e1c..2b3401b3 100755 --- a/sim/algo/ppo/on_policy_runner.py +++ b/sim/algo/ppo/on_policy_runner.py @@ -96,7 +96,7 @@ def learn(self, num_learning_iterations: int, init_at_random_ep_len: bool = Fals # initialize writer if self.log_dir is not None and self.writer is None: wandb.init( - project="XBot", + project="Quadruped Try", sync_tensorboard=True, name=self.wandb_run_name, config=self.all_cfg, diff --git a/sim/envs/__init__.py b/sim/envs/__init__.py index 16478119..14a108c2 100755 --- a/sim/envs/__init__.py +++ b/sim/envs/__init__.py @@ -14,11 +14,17 @@ from sim.envs.humanoids.g1_env import G1FreeEnv from sim.envs.humanoids.h1_config import H1Cfg, H1CfgPPO from sim.envs.humanoids.h1_env import H1FreeEnv +from sim.envs.humanoids.quadruped_config import QuadrupedCfg, QuadrupedCfgPPO +from sim.envs.humanoids.quadruped_env import QuadrupedFreeEnv from sim.envs.humanoids.stompymicro_config import StompyMicroCfg, StompyMicroCfgPPO from sim.envs.humanoids.stompymicro_env import StompyMicroEnv from sim.envs.humanoids.stompymini_config import MiniCfg, MiniCfgPPO from sim.envs.humanoids.stompymini_env import MiniFreeEnv -from sim.envs.humanoids.stompypro_config import StompyProCfg, StompyProStandingCfg, StompyProCfgPPO +from sim.envs.humanoids.stompypro_config import ( + StompyProCfg, + StompyProCfgPPO, + StompyProStandingCfg, +) from sim.envs.humanoids.stompypro_env import StompyProFreeEnv from sim.envs.humanoids.xbot_config import XBotCfg, XBotCfgPPO from sim.envs.humanoids.xbot_env import XBotLFreeEnv @@ -31,5 +37,7 @@ task_registry.register("dora", DoraFreeEnv, DoraCfg(), DoraCfgPPO()) task_registry.register("h1", H1FreeEnv, H1Cfg(), H1CfgPPO()) task_registry.register("g1", G1FreeEnv, G1Cfg(), G1CfgPPO()) +task_registry.register("xbot", XBotLFreeEnv, XBotCfg(), XBotCfgPPO()) task_registry.register("XBotL_free", XBotLFreeEnv, XBotCfg(), XBotCfgPPO()) task_registry.register("stompymicro", StompyMicroEnv, StompyMicroCfg(), StompyMicroCfgPPO()) +task_registry.register("quadruped", QuadrupedFreeEnv, QuadrupedCfg(), QuadrupedCfgPPO()) diff --git a/sim/envs/base/quadruped_robot.py b/sim/envs/base/quadruped_robot.py new file mode 100755 index 00000000..fda2dc14 --- /dev/null +++ b/sim/envs/base/quadruped_robot.py @@ -0,0 +1,886 @@ +import os +from collections import deque + +import numpy as np + +from sim import ROOT_DIR +from sim.envs.base.base_task import BaseTask +from sim.utils.helpers import class_to_dict +from sim.utils.math import get_euler_xyz_tensor, quat_apply_yaw, wrap_to_pi + +# fmt: off +from isaacgym import gymapi, gymtorch, gymutil # isort: skip +from isaacgym.torch_utils import * # isort: skip +import torch # isort: skip +# fmt: on + + +class LeggedRobot(BaseTask): + def __init__(self, cfg, sim_params, physics_engine, sim_device, headless): + """Parses the provided config file, + calls create_sim() (which creates, simulation, terrain and environments), + initilizes pytorch buffers used during training + + Args: + cfg (Dict): Environment config file + sim_params (gymapi.SimParams): simulation parameters + physics_engine (gymapi.SimType): gymapi.SIM_PHYSX (must be PhysX) + device_type 
(string): 'cuda' or 'cpu' + device_id (int): 0, 1, ... + headless (bool): Run without rendering if True + """ + self.cfg = cfg + self.sim_params = sim_params + self.height_samples = None + self.debug_viz = False + self.init_done = False + self._parse_cfg(self.cfg) + super().__init__(self.cfg, sim_params, physics_engine, sim_device, headless) + if not self.headless: + self.set_camera(self.cfg.viewer.pos, self.cfg.viewer.lookat) + self._init_buffers() + self._prepare_reward_function() + self.init_done = True + + def step(self, actions): + """Apply actions, simulate, call self.post_physics_step() + + Args: + actions (torch.Tensor): Tensor of shape (num_envs, num_actions_per_env) + """ + clip_actions = self.cfg.normalization.clip_actions + self.actions = torch.clip(actions, -clip_actions, clip_actions).to(self.device) + # step physics and render each frame + self.render() + for _ in range(self.cfg.control.decimation): + self.torques = self._compute_torques(self.actions).view(self.torques.shape) + self.gym.set_dof_actuation_force_tensor(self.sim, gymtorch.unwrap_tensor(self.torques)) + + self.gym.simulate(self.sim) + if self.device == "cpu": + self.gym.fetch_results(self.sim, True) + self.gym.refresh_dof_state_tensor(self.sim) + self.post_physics_step() + + # return clipped obs, clipped states (None), rewards, dones and infos + clip_obs = self.cfg.normalization.clip_observations + self.obs_buf = torch.clip(self.obs_buf, -clip_obs, clip_obs) + if self.privileged_obs_buf is not None: + self.privileged_obs_buf = torch.clip(self.privileged_obs_buf, -clip_obs, clip_obs) + return self.obs_buf, self.privileged_obs_buf, self.rew_buf, self.reset_buf, self.extras + + def reset(self): + """Reset all robots""" + self.reset_idx(torch.arange(self.num_envs, device=self.device)) + obs, privileged_obs, _, _, _ = self.step( + torch.zeros(self.num_envs, self.num_actions, device=self.device, requires_grad=False) + ) + return obs, privileged_obs + + def post_physics_step(self): + """Check terminations, compute observations and rewards + calls self._post_physics_step_callback() for common computations + calls self._draw_debug_vis() if needed + """ + self.gym.refresh_actor_root_state_tensor(self.sim) + self.gym.refresh_net_contact_force_tensor(self.sim) + self.gym.refresh_rigid_body_state_tensor(self.sim) + + self.episode_length_buf += 1 + self.common_step_counter += 1 + + # prepare quantities + # TODO(pfb30) - debug this + origin = torch.tensor(self.cfg.init_state.rot, device=self.device).repeat(self.num_envs, 1) + origin = quat_conjugate(origin) + self.base_quat[:] = quat_mul(origin, self.root_states[:, 3:7]) + self.base_euler_xyz = get_euler_xyz_tensor(self.base_quat) + + self.base_lin_vel[:] = quat_rotate_inverse(self.base_quat, self.root_states[:, 7:10]) + self.base_ang_vel[:] = quat_rotate_inverse(self.base_quat, self.root_states[:, 10:13]) + self.projected_gravity[:] = quat_rotate_inverse(self.base_quat, self.gravity_vec) + + # breakpoint() + self._post_physics_step_callback() + + # compute observations, rewards, resets, ... 
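(Aside, not part of the patch.) The TODO(pfb30) block above re-expresses the root orientation relative to the configured init rotation before computing base-frame velocities and the projected gravity; this matters here because the quadruped's nominal orientation is not the identity quaternion. A minimal sketch of the idea, assuming Isaac Gym's torch utilities are available; the euler angles are taken from the comment in the quadruped config purely as an illustration:

```python
# isaacgym must be imported before torch (same constraint the patch handles with isort skips)
from isaacgym.torch_utils import (
    quat_conjugate,
    quat_from_euler_xyz,
    quat_mul,
    quat_rotate_inverse,
)
import torch

num_envs = 4
# Nominal init rotation (x, y, z, w); the angles mirror the quadruped_config.py comment
# and are only an assumption for this sketch.
init_rot = quat_from_euler_xyz(torch.tensor(1.57), torch.tensor(0.0), torch.tensor(-1.57))
init_rot = init_rot.repeat(num_envs, 1)

# Pretend every robot is still exactly at its nominal orientation.
root_quat = init_rot.clone()

# Re-reference: orientation of the base *relative to* the nominal init rotation.
base_quat = quat_mul(quat_conjugate(init_rot), root_quat)

gravity_vec = torch.tensor([[0.0, 0.0, -1.0]]).repeat(num_envs, 1)
projected_gravity = quat_rotate_inverse(base_quat, gravity_vec)
print(projected_gravity[0])  # ~(0, 0, -1): the nominal pose now counts as "upright"
```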
+ self.check_termination() + self.compute_reward() + env_ids = self.reset_buf.nonzero(as_tuple=False).flatten() + self.reset_idx(env_ids) + self.compute_observations() # in some cases a simulation step might be required to refresh some obs (for example body positions) + + self.last_last_actions[:] = torch.clone(self.last_actions[:]) + self.last_actions[:] = self.actions[:] + self.last_dof_vel[:] = self.dof_vel[:] + self.last_root_vel[:] = self.root_states[:, 7:13] + self.last_rigid_state[:] = self.rigid_state[:] + + if self.viewer and self.enable_viewer_sync and self.debug_viz: + self._draw_debug_vis() + + def check_termination(self): + """Check if environments need to be reset""" + self.reset_buf = torch.any( + torch.norm(self.contact_forces[:, self.termination_contact_indices, :], dim=-1) > 1.0, dim=1 + ) + self.time_out_buf = self.episode_length_buf > self.max_episode_length # no terminal reward for time-outs + self.reset_buf |= self.time_out_buf + + def reset_idx(self, env_ids): + """Reset some environments. + Calls self._reset_dofs(env_ids), self._reset_root_states(env_ids), and self._resample_commands(env_ids) + [Optional] calls self._update_terrain_curriculum(env_ids), self.update_command_curriculum(env_ids) and + Logs episode info + Resets some buffers + + Args: + env_ids (list[int]): List of environment ids which must be reset + """ + if len(env_ids) == 0: + return + # update curriculum + if self.cfg.terrain.curriculum: + self._update_terrain_curriculum(env_ids) + # avoid updating command curriculum at each step since the maximum command is common to all envs + if self.cfg.commands.curriculum and (self.common_step_counter % self.max_episode_length == 0): + self.update_command_curriculum(env_ids) + + # reset robot states + self._reset_dofs(env_ids) + + self._reset_root_states(env_ids) + + self._resample_commands(env_ids) + + # reset buffers + self.last_last_actions[env_ids] = 0.0 + self.actions[env_ids] = 0.0 + self.last_actions[env_ids] = 0.0 + self.last_rigid_state[env_ids] = 0.0 + self.last_dof_vel[env_ids] = 0.0 + self.feet_air_time[env_ids] = 0.0 + self.episode_length_buf[env_ids] = 0 + self.reset_buf[env_ids] = 1 + # fill extras + self.extras["episode"] = {} + for key in self.episode_sums.keys(): + self.extras["episode"]["rew_" + key] = ( + torch.mean(self.episode_sums[key][env_ids]) / self.max_episode_length_s + ) + self.episode_sums[key][env_ids] = 0.0 + # log additional curriculum info + if self.cfg.terrain.mesh_type == "trimesh": + self.extras["episode"]["terrain_level"] = torch.mean(self.terrain_levels.float()) + if self.cfg.commands.curriculum: + self.extras["episode"]["max_command_x"] = self.command_ranges["lin_vel_x"][1] + # send timeout info to the algorithm + if self.cfg.env.send_timeouts: + self.extras["time_outs"] = self.time_out_buf + + # fix reset gravity bug + # TODO(pfb30) - debug this + origin = torch.tensor(self.cfg.init_state.rot, device=self.device).repeat(self.num_envs, 1) + origin = quat_conjugate(origin) + self.base_quat[env_ids] = quat_mul(origin[env_ids, :], self.root_states[env_ids, 3:7]) + + self.base_euler_xyz = get_euler_xyz_tensor(self.base_quat) + self.projected_gravity[env_ids] = quat_rotate_inverse(self.base_quat[env_ids], self.gravity_vec[env_ids]) + + def compute_reward(self): + """Compute rewards + Calls each reward function which had a non-zero scale (processed in self._prepare_reward_function()) + adds each terms to the episode sums and to the total reward + """ + self.rew_buf[:] = 0.0 + + for i in range(len(self.reward_functions)): + 
name = self.reward_names[i] + rew = self.reward_functions[i]() * self.reward_scales[name] + self.rew_buf += rew + self.episode_sums[name] += rew + if self.cfg.rewards.only_positive_rewards: + self.rew_buf[:] = torch.clip(self.rew_buf[:], min=0.0) + # add termination reward after clipping + if "termination" in self.reward_scales: + rew = self._reward_termination() * self.reward_scales["termination"] + self.rew_buf += rew + self.episode_sums["termination"] += rew + + def set_camera(self, position, lookat): + """Set camera position and direction""" + cam_pos = gymapi.Vec3(position[0], position[1], position[2]) + cam_target = gymapi.Vec3(lookat[0], lookat[1], lookat[2]) + self.gym.viewer_camera_look_at(self.viewer, None, cam_pos, cam_target) + + # ------------- Callbacks -------------- + def _process_rigid_shape_props(self, props, env_id): + """Callback allowing to store/change/randomize the rigid shape properties of each environment. + Called During environment creation. + Base behavior: randomizes the friction of each environment + + Args: + props (List[gymapi.RigidShapeProperties]): Properties of each shape of the asset + env_id (int): Environment id + + Returns: + [List[gymapi.RigidShapeProperties]]: Modified rigid shape properties + """ + if self.cfg.domain_rand.randomize_friction: + if env_id == 0: + # prepare friction randomization + friction_range = self.cfg.domain_rand.friction_range + num_buckets = 256 + bucket_ids = torch.randint(0, num_buckets, (self.num_envs, 1)) + friction_buckets = torch_rand_float( + friction_range[0], friction_range[1], (num_buckets, 1), device="cpu" + ) + self.friction_coeffs = friction_buckets[bucket_ids] + + for s in range(len(props)): + props[s].friction = self.friction_coeffs[env_id] + return props + + def _process_dof_props(self, props, env_id): + """Callback allowing to store/change/randomize the DOF properties of each environment. + Called During environment creation. 
+ Base behavior: stores position, velocity and torques limits defined in the URDF + + Args: + props (numpy.array): Properties of each DOF of the asset + env_id (int): Environment id + + Returns: + [numpy.array]: Modified DOF properties + """ + if env_id == 0: + self.dof_pos_limits = torch.zeros( + self.num_dof, 2, dtype=torch.float, device=self.device, requires_grad=False + ) + self.dof_vel_limits = torch.zeros(self.num_dof, dtype=torch.float, device=self.device, requires_grad=False) + self.torque_limits = torch.zeros(self.num_dof, dtype=torch.float, device=self.device, requires_grad=False) + for i in range(len(props)): + self.dof_pos_limits[i, 0] = props["lower"][i].item() * self.cfg.safety.pos_limit + self.dof_pos_limits[i, 1] = props["upper"][i].item() * self.cfg.safety.pos_limit + self.dof_vel_limits[i] = props["velocity"][i].item() * self.cfg.safety.vel_limit + self.torque_limits[i] = props["effort"][i].item() * self.cfg.safety.torque_limit + return props + + def _process_rigid_body_props(self, props, env_id): + # randomize base mass + if self.cfg.domain_rand.randomize_base_mass: + rng = self.cfg.domain_rand.added_mass_range + props[0].mass += np.random.uniform(rng[0], rng[1]) + + return props + + def _post_physics_step_callback(self): + """Callback called before computing terminations, rewards, and observations + Default behaviour: Compute ang vel command based on target and heading, compute measured terrain heights and randomly push robots + """ + env_ids = ( + (self.episode_length_buf % int(self.cfg.commands.resampling_time / self.dt) == 0) + .nonzero(as_tuple=False) + .flatten() + ) + self._resample_commands(env_ids) + if self.cfg.commands.heading_command: + forward = quat_apply(self.base_quat, self.forward_vec) + heading = torch.atan2(forward[:, 1], forward[:, 0]) + self.commands[:, 2] = torch.clip(0.5 * wrap_to_pi(self.commands[:, 3] - heading), -1.0, 1.0) + + if self.cfg.terrain.measure_heights: + self.measured_heights = self._get_heights() + + if self.cfg.domain_rand.push_robots and (self.common_step_counter % self.cfg.domain_rand.push_interval == 0): + self._push_robots() + + def _resample_commands(self, env_ids): + """Randommly select commands of some environments + + Args: + env_ids (List[int]): Environments ids for which new commands are needed + """ + self.commands[env_ids, 0] = torch_rand_float( + self.command_ranges["lin_vel_x"][0], + self.command_ranges["lin_vel_x"][1], + (len(env_ids), 1), + device=self.device, + ).squeeze(1) + self.commands[env_ids, 1] = torch_rand_float( + self.command_ranges["lin_vel_y"][0], + self.command_ranges["lin_vel_y"][1], + (len(env_ids), 1), + device=self.device, + ).squeeze(1) + if self.cfg.commands.heading_command: + self.commands[env_ids, 3] = torch_rand_float( + self.command_ranges["heading"][0], + self.command_ranges["heading"][1], + (len(env_ids), 1), + device=self.device, + ).squeeze(1) + else: + self.commands[env_ids, 2] = torch_rand_float( + self.command_ranges["ang_vel_pitch"][0], + self.command_ranges["ang_vel_pitch"][1], + (len(env_ids), 1), + device=self.device, + ).squeeze(1) + + # set small commands to zero + self.commands[env_ids, :2] *= (torch.norm(self.commands[env_ids, :2], dim=1) > 0.2).unsqueeze(1) + + def _compute_torques(self, actions): + """Compute torques from actions. + Actions can be interpreted as position or velocity targets given to a PD controller, or directly as scaled torques. + [NOTE]: torques must have the same dimension as the number of DOFs, even if some DOFs are not actuated. 
+ + Args: + actions (torch.Tensor): Actions + + Returns: + [torch.Tensor]: Torques sent to the simulation + """ + actions_scaled = actions * self.cfg.control.action_scale + p_gains = self.p_gains + d_gains = self.d_gains + torques = p_gains * (actions_scaled + self.default_dof_pos - self.dof_pos) - d_gains * self.dof_vel + + res = torch.clip(torques, -self.torque_limits, self.torque_limits) + + return res + + def _reset_dofs(self, env_ids): + """Resets DOF position and velocities of selected environmments + Positions are randomly selected within 0.5:1.5 x default positions. + Velocities are set to zero. + + Args: + env_ids (List[int]): Environemnt ids + """ + self.dof_pos[env_ids] = self.default_dof_pos + torch_rand_float( + -self.cfg.domain_rand.start_pos_noise, + self.cfg.domain_rand.start_pos_noise, + (len(env_ids), self.num_dof), + device=self.device, + ) + self.dof_vel[env_ids] = 0.0 + + env_ids_int32 = env_ids.to(dtype=torch.int32) + self.gym.set_dof_state_tensor_indexed( + self.sim, gymtorch.unwrap_tensor(self.dof_state), gymtorch.unwrap_tensor(env_ids_int32), len(env_ids_int32) + ) + + def _reset_root_states(self, env_ids): + """Resets ROOT states position and velocities of selected environmments + Sets base position based on the curriculum + Selects randomized base velocities within -0.5:0.5 [m/s, rad/s] + + Args: + env_ids (List[int]): Environemnt ids + """ + # base position + if self.custom_origins: + self.root_states[env_ids] = self.base_init_state + self.root_states[env_ids, :3] += self.env_origins[env_ids] + self.root_states[env_ids, :2] += torch_rand_float( + -1.0, 1.0, (len(env_ids), 2), device=self.device + ) # xy position within 1m of the center + else: + self.root_states[env_ids] = self.base_init_state + self.root_states[env_ids, :3] += self.env_origins[env_ids] + # base velocities + # self.root_states[env_ids, 7:13] = torch_rand_float(-0.05, 0.05, (len(env_ids), 6), device=self.device) # [7:10]: lin vel, [10:13]: ang vel + if self.cfg.asset.fix_base_link: + self.root_states[env_ids, 7:13] = 0 + self.root_states[env_ids, 2] += 1.8 + env_ids_int32 = env_ids.to(dtype=torch.int32) + self.gym.set_actor_root_state_tensor_indexed( + self.sim, + gymtorch.unwrap_tensor(self.root_states), + gymtorch.unwrap_tensor(env_ids_int32), + len(env_ids_int32), + ) + + def _update_terrain_curriculum(self, env_ids): + """Implements the game-inspired curriculum. 
+ + Args: + env_ids (List[int]): ids of environments being reset + """ + # Implement Terrain curriculum + if not self.init_done: + # don't change on initial reset + return + distance = torch.norm(self.root_states[env_ids, :2] - self.env_origins[env_ids, :2], dim=1) + # robots that walked far enough progress to harder terains + move_up = distance > self.terrain.env_length / 2 + # robots that walked less than half of their required distance go to simpler terrains + move_down = ( + distance < torch.norm(self.commands[env_ids, :2], dim=1) * self.max_episode_length_s * 0.5 + ) * ~move_up + self.terrain_levels[env_ids] += 1 * move_up - 1 * move_down + # Robots that solve the last level are sent to a random one + self.terrain_levels[env_ids] = torch.where( + self.terrain_levels[env_ids] >= self.max_terrain_level, + torch.randint_like(self.terrain_levels[env_ids], self.max_terrain_level), + torch.clip(self.terrain_levels[env_ids], 0), + ) # (the minumum level is zero) + self.env_origins[env_ids] = self.terrain_origins[self.terrain_levels[env_ids], self.terrain_types[env_ids]] + + def update_command_curriculum(self, env_ids): + """Implements a curriculum of increasing commands + + Args: + env_ids (List[int]): ids of environments being reset + """ + # If the tracking reward is above 80% of the maximum, increase the range of commands + if ( + torch.mean(self.episode_sums["tracking_lin_vel"][env_ids]) / self.max_episode_length + > 0.8 * self.reward_scales["tracking_lin_vel"] + ): + self.command_ranges["lin_vel_x"][0] = np.clip( + self.command_ranges["lin_vel_x"][0] - 0.5, -self.cfg.commands.max_curriculum, 0.0 + ) + self.command_ranges["lin_vel_x"][1] = np.clip( + self.command_ranges["lin_vel_x"][1] + 0.5, 0.0, self.cfg.commands.max_curriculum + ) + + # ---------------------------------------- + def _init_buffers(self): + """Initialize torch tensors which will contain simulation states and processed quantities""" + # get gym GPU state tensors + actor_root_state = self.gym.acquire_actor_root_state_tensor(self.sim) + dof_state_tensor = self.gym.acquire_dof_state_tensor(self.sim) + net_contact_forces = self.gym.acquire_net_contact_force_tensor(self.sim) + rigid_body_state = self.gym.acquire_rigid_body_state_tensor(self.sim) + + self.gym.refresh_dof_state_tensor(self.sim) + self.gym.refresh_actor_root_state_tensor(self.sim) + self.gym.refresh_net_contact_force_tensor(self.sim) + self.gym.refresh_rigid_body_state_tensor(self.sim) + + # create some wrapper tensors for different slices + self.root_states = gymtorch.wrap_tensor(actor_root_state) + self.dof_state = gymtorch.wrap_tensor(dof_state_tensor) + self.dof_pos = self.dof_state.view(self.num_envs, self.num_dof, 2)[..., 0] + self.dof_vel = self.dof_state.view(self.num_envs, self.num_dof, 2)[..., 1] + # self.base_quat = self.root_states[:, 3:7] + # TODO(pfb30): debug this + origin = torch.tensor(self.cfg.init_state.rot, device=self.device).repeat(self.num_envs, 1) + origin = quat_conjugate(origin) + self.base_quat = quat_mul(origin, self.root_states[:, 3:7]) + self.base_euler_xyz = get_euler_xyz_tensor(self.base_quat) + + self.contact_forces = gymtorch.wrap_tensor(net_contact_forces).view( + self.num_envs, -1, 3 + ) # shape: num_envs, num_bodies, xyz axis + self.rigid_state = gymtorch.wrap_tensor(rigid_body_state).view(self.num_envs, -1, 13) + + # initialize some data used later on + self.common_step_counter = 0 + self.extras = {} + + self.noise_scale_vec = self._get_noise_scale_vec(self.cfg) + self.gravity_vec = to_torch(get_axis_params(-1.0, 
self.up_axis_idx), device=self.device).repeat( + (self.num_envs, 1) + ) + self.forward_vec = to_torch([1.0, 0.0, 0.0], device=self.device).repeat((self.num_envs, 1)) + self.torques = torch.zeros( + self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False + ) + self.p_gains = torch.zeros( + self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False + ) + self.d_gains = torch.zeros( + self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False + ) + self.actions = torch.zeros( + self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False + ) + self.last_actions = torch.zeros( + self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False + ) + self.last_last_actions = torch.zeros( + self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False + ) + self.last_rigid_state = torch.zeros_like(self.rigid_state) + self.last_dof_vel = torch.zeros_like(self.dof_vel) + self.last_root_vel = torch.zeros_like(self.root_states[:, 7:13]) + self.commands = torch.zeros( + self.num_envs, self.cfg.commands.num_commands, dtype=torch.float, device=self.device, requires_grad=False + ) # x vel, y vel, yaw vel, heading + self.commands_scale = torch.tensor( + [self.obs_scales.lin_vel, self.obs_scales.lin_vel, self.obs_scales.ang_vel], + device=self.device, + requires_grad=False, + ) # TODO change this + self.feet_air_time = torch.zeros( + self.num_envs, self.feet_indices.shape[0], dtype=torch.float, device=self.device, requires_grad=False + ) + self.last_contacts = torch.zeros( + self.num_envs, len(self.feet_indices), dtype=torch.bool, device=self.device, requires_grad=False + ) + self.base_lin_vel = quat_rotate_inverse(self.base_quat, self.root_states[:, 7:10]) + self.base_ang_vel = quat_rotate_inverse(self.base_quat, self.root_states[:, 10:13]) + self.projected_gravity = quat_rotate_inverse(self.base_quat, self.gravity_vec) + if self.cfg.terrain.measure_heights: + self.height_points = self._init_height_points() + self.measured_heights = 0 + # joint positions offsets and PD gains + self.default_dof_pos = torch.zeros(self.num_dof, dtype=torch.float, device=self.device, requires_grad=False) + + for i in range(self.num_dofs): + name = self.dof_names[i] + self.default_dof_pos[i] = self.cfg.init_state.default_joint_angles[name] + found = False + + for dof_name in self.cfg.control.stiffness.keys(): + if dof_name in name: + self.p_gains[:, i] = self.cfg.control.stiffness[dof_name] + self.d_gains[:, i] = self.cfg.control.damping[dof_name] + found = True + if not found: + self.p_gains[:, i] = 0.0 + self.d_gains[:, i] = 0.0 + print(f"PD gain of joint {name} were not defined, setting them to zero") + self.rand_push_force = torch.zeros((self.num_envs, 3), dtype=torch.float32, device=self.device) + self.rand_push_torque = torch.zeros((self.num_envs, 3), dtype=torch.float32, device=self.device) + self.default_dof_pos = self.default_dof_pos.unsqueeze(0) + + self.default_joint_pd_target = self.default_dof_pos.clone() + self.obs_history = deque(maxlen=self.cfg.env.frame_stack) + self.critic_history = deque(maxlen=self.cfg.env.c_frame_stack) + for _ in range(self.cfg.env.frame_stack): + self.obs_history.append( + torch.zeros(self.num_envs, self.cfg.env.num_single_obs, dtype=torch.float, device=self.device) + ) + for _ in range(self.cfg.env.c_frame_stack): + self.critic_history.append( + torch.zeros( + self.num_envs, 
self.cfg.env.single_num_privileged_obs, dtype=torch.float, device=self.device + ) + ) + + def _prepare_reward_function(self): + """Prepares a list of reward functions, which will be called to compute the total reward. + Looks for self._reward_, where are names of all non zero reward scales in the cfg. + """ + # remove zero scales + multiply non-zero ones by dt + for key in list(self.reward_scales.keys()): + scale = self.reward_scales[key] + if scale == 0: + self.reward_scales.pop(key) + else: + self.reward_scales[key] *= self.dt + # prepare list of functions + self.reward_functions = [] + self.reward_names = [] + for name, scale in self.reward_scales.items(): + if name == "termination": + continue + self.reward_names.append(name) + name = "_reward_" + name + self.reward_functions.append(getattr(self, name)) + + # reward episode sums + self.episode_sums = { + name: torch.zeros(self.num_envs, dtype=torch.float, device=self.device, requires_grad=False) + for name in self.reward_scales.keys() + } + + def _create_ground_plane(self): + """Adds a ground plane to the simulation, sets friction and restitution based on the cfg.""" + plane_params = gymapi.PlaneParams() + plane_params.normal = gymapi.Vec3(0.0, 0.0, 1.0) + plane_params.static_friction = self.cfg.terrain.static_friction + plane_params.dynamic_friction = self.cfg.terrain.dynamic_friction + plane_params.restitution = self.cfg.terrain.restitution + self.gym.add_ground(self.sim, plane_params) + + def _create_heightfield(self): + """Adds a heightfield terrain to the simulation, sets parameters based on the cfg.""" + hf_params = gymapi.HeightFieldParams() + hf_params.column_scale = self.terrain.cfg.horizontal_scale + hf_params.row_scale = self.terrain.cfg.horizontal_scale + hf_params.vertical_scale = self.terrain.cfg.vertical_scale + hf_params.nbRows = self.terrain.tot_cols + hf_params.nbColumns = self.terrain.tot_rows + hf_params.transform.p.x = -self.terrain.cfg.border_size + hf_params.transform.p.y = -self.terrain.cfg.border_size + hf_params.transform.p.z = 0.0 + hf_params.static_friction = self.cfg.terrain.static_friction + hf_params.dynamic_friction = self.cfg.terrain.dynamic_friction + hf_params.restitution = self.cfg.terrain.restitution + + self.gym.add_heightfield(self.sim, self.terrain.heightsamples, hf_params) + self.height_samples = ( + torch.tensor(self.terrain.heightsamples).view(self.terrain.tot_rows, self.terrain.tot_cols).to(self.device) + ) + + def _create_trimesh(self): + """Adds a triangle mesh terrain to the simulation, sets parameters based on the cfg. + # + """ + tm_params = gymapi.TriangleMeshParams() + tm_params.nb_vertices = self.terrain.vertices.shape[0] + tm_params.nb_triangles = self.terrain.triangles.shape[0] + + tm_params.transform.p.x = -self.terrain.cfg.border_size + tm_params.transform.p.y = -self.terrain.cfg.border_size + tm_params.transform.p.z = 0.0 + tm_params.static_friction = self.cfg.terrain.static_friction + tm_params.dynamic_friction = self.cfg.terrain.dynamic_friction + tm_params.restitution = self.cfg.terrain.restitution + self.gym.add_triangle_mesh( + self.sim, self.terrain.vertices.flatten(order="C"), self.terrain.triangles.flatten(order="C"), tm_params + ) + self.height_samples = ( + torch.tensor(self.terrain.heightsamples).view(self.terrain.tot_rows, self.terrain.tot_cols).to(self.device) + ) + + def _create_envs(self): + """Creates environments: + 1. loads the robot URDF/MJCF asset, + 2. 
For each environment + 2.1 creates the environment, + 2.2 calls DOF and Rigid shape properties callbacks, + 2.3 create actor with these properties and add them to the env + 3. Store indices of different bodies of the robot + """ + asset_path = self.cfg.asset.file.format(ROOT_DIR=ROOT_DIR) + asset_root = os.path.dirname(asset_path) + asset_file = os.path.basename(asset_path) + + asset_options = gymapi.AssetOptions() + asset_options.default_dof_drive_mode = self.cfg.asset.default_dof_drive_mode + asset_options.collapse_fixed_joints = self.cfg.asset.collapse_fixed_joints + asset_options.replace_cylinder_with_capsule = self.cfg.asset.replace_cylinder_with_capsule + asset_options.flip_visual_attachments = self.cfg.asset.flip_visual_attachments + asset_options.fix_base_link = self.cfg.asset.fix_base_link + asset_options.density = self.cfg.asset.density + asset_options.angular_damping = self.cfg.asset.angular_damping + asset_options.linear_damping = self.cfg.asset.linear_damping + asset_options.max_angular_velocity = self.cfg.asset.max_angular_velocity + asset_options.max_linear_velocity = self.cfg.asset.max_linear_velocity + asset_options.armature = self.cfg.asset.armature + asset_options.thickness = self.cfg.asset.thickness + asset_options.disable_gravity = self.cfg.asset.disable_gravity + + robot_asset = self.gym.load_asset(self.sim, asset_root, asset_file, asset_options) + self.num_dof = self.gym.get_asset_dof_count(robot_asset) + self.num_bodies = self.gym.get_asset_rigid_body_count(robot_asset) + dof_props_asset = self.gym.get_asset_dof_properties(robot_asset) + rigid_shape_props_asset = self.gym.get_asset_rigid_shape_properties(robot_asset) + + # save body names from the asset + body_names = self.gym.get_asset_rigid_body_names(robot_asset) + self.dof_names = self.gym.get_asset_dof_names(robot_asset) + self.num_bodies = len(body_names) + self.num_dofs = len(self.dof_names) + feet_names = [s for s in body_names if s in self.cfg.asset.foot_name] + knee_names = [s for s in body_names if s in self.cfg.asset.knee_name] + penalized_contact_names = [] + for name in self.cfg.asset.penalize_contacts_on: + penalized_contact_names.extend([s for s in body_names if name in s]) + termination_contact_names = [] + for name in self.cfg.asset.terminate_after_contacts_on: + termination_contact_names.extend([s for s in body_names if name in s]) + base_init_state_list = ( + self.cfg.init_state.pos + + self.cfg.init_state.rot + + self.cfg.init_state.lin_vel + + self.cfg.init_state.ang_vel + ) + self.base_init_state = to_torch(base_init_state_list, device=self.device, requires_grad=False) + start_pose = gymapi.Transform() + start_pose.p = gymapi.Vec3(*self.base_init_state[:3]) + self._get_env_origins() + env_lower = gymapi.Vec3(0.0, 0.0, 0.0) + env_upper = gymapi.Vec3(0.0, 0.0, 0.0) + self.actor_handles = [] + self.envs = [] + self.env_frictions = torch.zeros(self.num_envs, 1, dtype=torch.float32, device=self.device) + + self.body_mass = torch.zeros(self.num_envs, 1, dtype=torch.float32, device=self.device, requires_grad=False) + + for i in range(self.num_envs): + # create env instance + env_handle = self.gym.create_env(self.sim, env_lower, env_upper, int(np.sqrt(self.num_envs))) + pos = self.env_origins[i].clone() + pos[:2] += torch_rand_float(-1.0, 1.0, (2, 1), device=self.device).squeeze(1) + start_pose.p = gymapi.Vec3(*pos) + + rigid_shape_props = self._process_rigid_shape_props(rigid_shape_props_asset, i) + self.gym.set_asset_rigid_shape_properties(robot_asset, rigid_shape_props) + actor_handle = 
self.gym.create_actor( + env_handle, robot_asset, start_pose, self.cfg.asset.name, i, self.cfg.asset.self_collisions, 0 + ) + dof_props = self._process_dof_props(dof_props_asset, i) + self.gym.set_actor_dof_properties(env_handle, actor_handle, dof_props) + body_props = self.gym.get_actor_rigid_body_properties(env_handle, actor_handle) + body_props = self._process_rigid_body_props(body_props, i) + + self.gym.set_actor_rigid_body_properties(env_handle, actor_handle, body_props, recomputeInertia=False) + self.envs.append(env_handle) + self.actor_handles.append(actor_handle) + + self.feet_indices = torch.zeros(len(feet_names), dtype=torch.long, device=self.device, requires_grad=False) + for i in range(len(feet_names)): + self.feet_indices[i] = self.gym.find_actor_rigid_body_handle( + self.envs[0], self.actor_handles[0], feet_names[i] + ) + print("feet", self.feet_indices) + print(f"Processed body properties for env {i}:") + for j, prop in enumerate(body_props): + print(f" Body {j} mass: {prop.mass}") + # prop.mass = prop.mass * 1e7 + print(f" Body {j} inertia: {prop.inertia.x}, {prop.inertia.y}, {prop.inertia.z}") + + self.knee_indices = torch.zeros(len(knee_names), dtype=torch.long, device=self.device, requires_grad=False) + for i in range(len(knee_names)): + self.knee_indices[i] = self.gym.find_actor_rigid_body_handle( + self.envs[0], self.actor_handles[0], knee_names[i] + ) + + self.penalised_contact_indices = torch.zeros( + len(penalized_contact_names), dtype=torch.long, device=self.device, requires_grad=False + ) + for i in range(len(penalized_contact_names)): + self.penalised_contact_indices[i] = self.gym.find_actor_rigid_body_handle( + self.envs[0], self.actor_handles[0], penalized_contact_names[i] + ) + + self.termination_contact_indices = torch.zeros( + len(termination_contact_names), dtype=torch.long, device=self.device, requires_grad=False + ) + for i in range(len(termination_contact_names)): + self.termination_contact_indices[i] = self.gym.find_actor_rigid_body_handle( + self.envs[0], self.actor_handles[0], termination_contact_names[i] + ) + + def _get_env_origins(self): + """Sets environment origins. On rough terrain the origins are defined by the terrain platforms. + Otherwise create a grid. 
+ """ + if self.cfg.terrain.mesh_type in ["heightfield", "trimesh"]: + self.custom_origins = True + self.env_origins = torch.zeros(self.num_envs, 3, device=self.device, requires_grad=False) + # put robots at the origins defined by the terrain + max_init_level = self.cfg.terrain.max_init_terrain_level + if not self.cfg.terrain.curriculum: + max_init_level = self.cfg.terrain.num_rows - 1 + self.terrain_levels = torch.randint(0, max_init_level + 1, (self.num_envs,), device=self.device) + self.terrain_types = torch.div( + torch.arange(self.num_envs, device=self.device), + (self.num_envs / self.cfg.terrain.num_cols), + rounding_mode="floor", + ).to(torch.long) + self.max_terrain_level = self.cfg.terrain.num_rows + self.terrain_origins = torch.from_numpy(self.terrain.env_origins).to(self.device).to(torch.float) + self.env_origins[:] = self.terrain_origins[self.terrain_levels, self.terrain_types] + else: + self.custom_origins = False + self.env_origins = torch.zeros(self.num_envs, 3, device=self.device, requires_grad=False) + # create a grid of robots + num_cols = np.floor(np.sqrt(self.num_envs)) + num_rows = np.ceil(self.num_envs / num_cols) + xx, yy = torch.meshgrid(torch.arange(num_rows), torch.arange(num_cols)) + spacing = self.cfg.env.env_spacing + self.env_origins[:, 0] = spacing * xx.flatten()[: self.num_envs] + self.env_origins[:, 1] = spacing * yy.flatten()[: self.num_envs] + self.env_origins[:, 2] = 0.0 + + def _parse_cfg(self, cfg): + self.dt = self.cfg.control.decimation * self.sim_params.dt + self.obs_scales = self.cfg.normalization.obs_scales + self.reward_scales = class_to_dict(self.cfg.rewards.scales) + self.command_ranges = class_to_dict(self.cfg.commands.ranges) + if self.cfg.terrain.mesh_type not in ["heightfield", "trimesh"]: + self.cfg.terrain.curriculum = False + self.max_episode_length_s = self.cfg.env.episode_length_s + self.max_episode_length = np.ceil(self.max_episode_length_s / self.dt) + + self.cfg.domain_rand.push_interval = np.ceil(self.cfg.domain_rand.push_interval_s / self.dt) + + def _draw_debug_vis(self): + """Draws visualizations for dubugging (slows down simulation a lot). 
+ Default behaviour: draws height measurement points + """ + # draw height lines + if not self.cfg.terrain.measure_heights: + return + self.gym.clear_lines(self.viewer) + self.gym.refresh_rigid_body_state_tensor(self.sim) + sphere_geom = gymutil.WireframeSphereGeometry(0.02, 4, 4, None, color=(1, 1, 0)) + for i in range(self.num_envs): + base_pos = (self.root_states[i, :3]).cpu().numpy() + heights = self.measured_heights[i].cpu().numpy() + height_points = ( + quat_apply_yaw(self.base_quat[i].repeat(heights.shape[0]), self.height_points[i]).cpu().numpy() + ) + for j in range(heights.shape[0]): + x = height_points[j, 0] + base_pos[0] + y = height_points[j, 1] + base_pos[1] + z = heights[j] + sphere_pose = gymapi.Transform(gymapi.Vec3(x, y, z), r=None) + gymutil.draw_lines(sphere_geom, self.gym, self.viewer, self.envs[i], sphere_pose) + + def _init_height_points(self): + """Returns points at which the height measurments are sampled (in base frame) + + Returns: + [torch.Tensor]: Tensor of shape (num_envs, self.num_height_points, 3) + """ + y = torch.tensor(self.cfg.terrain.measured_points_y, device=self.device, requires_grad=False) + x = torch.tensor(self.cfg.terrain.measured_points_x, device=self.device, requires_grad=False) + grid_x, grid_y = torch.meshgrid(x, y) + + self.num_height_points = grid_x.numel() + points = torch.zeros(self.num_envs, self.num_height_points, 3, device=self.device, requires_grad=False) + points[:, :, 0] = grid_x.flatten() + points[:, :, 1] = grid_y.flatten() + return points + + def _get_heights(self, env_ids=None): + """Samples heights of the terrain at required points around each robot. + The points are offset by the base's position and rotated by the base's yaw + + Args: + env_ids (List[int], optional): Subset of environments for which to return the heights. Defaults to None. + + Raises: + NameError: [description] + + Returns: + [type]: [description] + """ + if self.cfg.terrain.mesh_type == "plane": + return torch.zeros(self.num_envs, self.num_height_points, device=self.device, requires_grad=False) + elif self.cfg.terrain.mesh_type == "none": + raise NameError("Can't measure height with terrain mesh type 'none'") + + if env_ids: + points = quat_apply_yaw( + self.base_quat[env_ids].repeat(1, self.num_height_points), self.height_points[env_ids] + ) + (self.root_states[env_ids, :3]).unsqueeze(1) + else: + points = quat_apply_yaw(self.base_quat.repeat(1, self.num_height_points), self.height_points) + ( + self.root_states[:, :3] + ).unsqueeze(1) + + points += self.terrain.cfg.border_size + points = (points / self.terrain.cfg.horizontal_scale).long() + px = points[:, :, 0].view(-1) + py = points[:, :, 1].view(-1) + px = torch.clip(px, 0, self.height_samples.shape[0] - 2) + py = torch.clip(py, 0, self.height_samples.shape[1] - 2) + + heights1 = self.height_samples[px, py] + heights2 = self.height_samples[px + 1, py] + heightXBotL = self.height_samples[px, py + 1] + heights = torch.min(heights1, heights2) + heights = torch.min(heights, heightXBotL) + + return heights.view(self.num_envs, -1) * self.terrain.cfg.vertical_scale diff --git a/sim/envs/base/quadruped_robot_config.py b/sim/envs/base/quadruped_robot_config.py new file mode 100755 index 00000000..016cca4b --- /dev/null +++ b/sim/envs/base/quadruped_robot_config.py @@ -0,0 +1,261 @@ +# SPDX-FileCopyrightText: Copyright (c) 2021 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-FileCopyrightText: Copyright (c) 2021 ETH Zurich, Nikita Rudin +# SPDX-License-Identifier: BSD-3-Clause +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, this +# list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# 3. Neither the name of the copyright holder nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +# +# Copyright (c) 2024 Beijing RobotEra TECHNOLOGY CO.,LTD. All rights reserved. + +from sim.envs.base.base_config import BaseConfig + + +class LeggedRobotCfg(BaseConfig): + class env: + num_envs = 4096 + num_observations = 235 + num_privileged_obs = None # if not None a priviledge_obs_buf will be returned by step() (critic obs for assymetric training). 
None is returned otherwise + num_actions = 12 + env_spacing = 3.0 # not used with heightfields/trimeshes + send_timeouts = True # send time out information to the algorithm + episode_length_s = 20 # episode length in seconds + + class terrain: + mesh_type = "trimesh" # "heightfield" # none, plane, heightfield or trimesh + horizontal_scale = 0.1 # [m] + vertical_scale = 0.005 # [m] + border_size = 25 # [m] + curriculum = True + static_friction = 1.0 + dynamic_friction = 1.0 + restitution = 0.0 + # rough terrain only: + measure_heights = True + measured_points_x = [ + -0.8, + -0.7, + -0.6, + -0.5, + -0.4, + -0.3, + -0.2, + -0.1, + 0.0, + 0.1, + 0.2, + 0.3, + 0.4, + 0.5, + 0.6, + 0.7, + 0.8, + ] # 1mx1.6m rectangle (without center line) + measured_points_y = [-0.5, -0.4, -0.3, -0.2, -0.1, 0.0, 0.1, 0.2, 0.3, 0.4, 0.5] + selected = False # select a unique terrain type and pass all arguments + terrain_kwargs = None # Dict of arguments for selected terrain + max_init_terrain_level = 5 # starting curriculum state + terrain_length = 8.0 + terrain_width = 8.0 + num_rows = 10 # number of terrain rows (levels) + num_cols = 20 # number of terrain cols (types) + # terrain types: [smooth slope, rough slope, stairs up, stairs down, discrete] + terrain_proportions = [0.1, 0.1, 0.35, 0.25, 0.2] + # trimesh only: + slope_treshold = 0.75 # slopes above this threshold will be corrected to vertical surfaces + + class commands: + curriculum = False + max_curriculum = 1.0 + num_commands = 4 # default: lin_vel_x, lin_vel_y, ang_vel_yaw, heading (in heading mode ang_vel_yaw is recomputed from heading error) + resampling_time = 10.0 # time before command are changed[s] + heading_command = True # if true: compute ang vel command from heading error + + class ranges: + lin_vel_x = [0.5, 10.0] # [-1.0, 1.0] # min max [m/s] + lin_vel_y = [0.3, 10.0] # min max [m/s] + ang_vel_pitch = [-1, 1] # min max [rad/s] + heading = [-3.14, 3.14] + + class init_state: + pos = [0.0, 0.0, 1.0] # x,y,z [m] + rot = [0.0, 0.0, 0.0, 1.0] # x,y,z,w [quat] + lin_vel = [0.0, 0.0, 0.0] # x,y,z [m/s] + ang_vel = [0.0, 0.0, 0.0] # x,y,z [rad/s] + default_joint_angles = {"joint_a": 0.0, "joint_b": 0.0} # target angles when action = 0.0 + + class control: + # PD Drive parameters: + stiffness = {"joint_a": 10.0, "joint_b": 15.0} # [N*m/rad] + damping = {"joint_a": 1.0, "joint_b": 1.5} # [N*m*s/rad] + # action scale: target angle = actionScale * action + defaultAngle + action_scale = 0.5 + # decimation: Number of control action updates @ sim DT per policy DT + decimation = 4 + + class asset: + file = "" + name = "legged_robot" # actor name + foot_name = "None" # name of the feet bodies, used to index body state and contact force tensors + penalize_contacts_on = [] + terminate_after_contacts_on = [] + disable_gravity = False + collapse_fixed_joints = True # merge bodies connected by fixed joints. Specific fixed joints can be kept by adding " <... 
dont_collapse="true"> + fix_base_link = False # fixe the base of the robot + default_dof_drive_mode = 3 # see GymDofDriveModeFlags (0 is none, 1 is pos tgt, 2 is vel tgt, 3 effort) + self_collisions = 0 # 1 to disable, 0 to enable...bitwise filter + replace_cylinder_with_capsule = ( + True # replace collision cylinders with capsules, leads to faster/more stable simulation + ) + flip_visual_attachments = True # Some .obj meshes must be flipped from y-up to z-up + + density = 0.001 + angular_damping = 0.0 + linear_damping = 0.0 + max_angular_velocity = 1000.0 + max_linear_velocity = 1000.0 + armature = 0.0 + thickness = 0.01 + + class domain_rand: + start_pos_noise = 0.1 + randomize_friction = True + friction_range = [0.5, 1.25] + randomize_base_mass = False + added_mass_range = [-1.0, 1.0] + push_robots = True + push_interval_s = 15 + max_push_vel_xy = 1.0 + start_pos_noise = 0.1 + + class rewards: + class scales: + termination = -0.0 + tracking_lin_vel = 1.0 + tracking_ang_vel = 0.5 + lin_vel_z = -2.0 + ang_vel_xy = -0.05 + orientation = -0.0 + torques = -0.00001 + dof_vel = -0.0 + dof_acc = -2.5e-7 + base_height = -0.0 + feet_air_time = 1.0 + collision = -1.0 + feet_stumble = -0.0 + action_rate = -0.0 + stand_still = -0.0 + + only_positive_rewards = ( + True # if true negative total rewards are clipped at zero (avoids early termination problems) + ) + + class normalization: + class obs_scales: + lin_vel = 2.0 + ang_vel = 0.25 + dof_pos = 1.0 + dof_vel = 0.05 + height_measurements = 5.0 + + clip_observations = 100.0 + clip_actions = 100.0 + + class noise: + add_noise = True + noise_level = 1.0 # scales other values + + class noise_scales: + dof_pos = 0.01 + dof_vel = 1.5 + lin_vel = 0.1 + ang_vel = 0.2 + gravity = 0.05 + height_measurements = 0.1 + + # viewer camera: + class viewer: + ref_env = 0 + pos = [10, 0, 6] # [m] + lookat = [11.0, 5, 3.0] # [m] + + class sim: + dt = 0.005 + substeps = 1 + gravity = [0.0, 0.0, -9.81] # [m/s^2] + up_axis = 1 # 0 is y, 1 is z + + class physx: + num_threads = 10 + solver_type = 1 # 0: pgs, 1: tgs + num_position_iterations = 4 + num_velocity_iterations = 0 + contact_offset = 0.01 # [m] + rest_offset = 0.0 # [m] + bounce_threshold_velocity = 0.5 # 0.5 [m/s] + max_depenetration_velocity = 1.0 + max_gpu_contact_pairs = 2**23 # 2**24 -> needed for 8000 envs and more + default_buffer_size_multiplier = 5 + contact_collection = 2 # 0: never, 1: last sub-step, 2: all sub-steps (default=2) + + +class LeggedRobotCfgPPO(BaseConfig): + seed = 1 + runner_class_name = "OnPolicyRunner" + + class policy: + init_noise_std = 1.0 + actor_hidden_dims = [512, 256, 128] + critic_hidden_dims = [512, 256, 128] + + class algorithm: + # training params + value_loss_coef = 1.0 + use_clipped_value_loss = True + clip_param = 0.2 + entropy_coef = 0.01 + num_learning_epochs = 5 + num_mini_batches = 4 # mini batch size = num_envs*nsteps / nminibatches + learning_rate = 1.0e-3 # 5.e-4 + schedule = "adaptive" # could be adaptive, fixed + gamma = 0.99 + lam = 0.95 + desired_kl = 0.01 + max_grad_norm = 1.0 + + class runner: + policy_class_name = "ActorCritic" + algorithm_class_name = "PPO" + num_steps_per_env = 24 # per iteration + max_iterations = 1500 # number of policy updates + + # logging + save_interval = 100 # check for potential saves every this many iterations + experiment_name = "test" + run_name = "" + # load and resume + resume = False + load_run = -1 # -1 = last run + checkpoint = -1 # -1 = last saved model + resume_path = None # updated from load_run and chkpt diff --git 
a/sim/envs/humanoids/quadruped_config.py b/sim/envs/humanoids/quadruped_config.py new file mode 100644 index 00000000..261e33f2 --- /dev/null +++ b/sim/envs/humanoids/quadruped_config.py @@ -0,0 +1,265 @@ +"""Defines the environment configuration for the Getting up task""" + +from sim.env import robot_urdf_path +from sim.envs.base.quadruped_robot_config import ( # type: ignore + LeggedRobotCfg, + LeggedRobotCfgPPO, +) +from sim.resources.quadruped.joints import Robot + +NUM_JOINTS = len(Robot.all_joints()) # 20 + + +class QuadrupedCfg(LeggedRobotCfg): + """Configuration class for the Legs humanoid robot.""" + + class env(LeggedRobotCfg.env): + # change the observation dim + frame_stack = 15 + c_frame_stack = 3 + num_single_obs = 11 + NUM_JOINTS * 3 + num_observations = int(frame_stack * num_single_obs) + single_num_privileged_obs = 25 + NUM_JOINTS * 4 + 4 # The +4 added for quadruped...why? + num_privileged_obs = int(c_frame_stack * single_num_privileged_obs) + num_actions = NUM_JOINTS + num_envs = 4096 + episode_length_s = 24 # episode length in seconds + use_ref_actions = False + + class safety: + # safety factors + pos_limit = 1.0 + vel_limit = 1.0 + torque_limit = 0.85 + + class asset(LeggedRobotCfg.asset): + name = "quadruped" + file = str(robot_urdf_path(name)) + + foot_name = ["Right_Back_Lower", "Left_Back_Lower", "Right_Front_Lower", "Left_Front_Lower"] + knee_name = ["Right_Back_Upper", "Left_Back_Upper", "Right_Front_Upper", "Left_Front_Upper"] + + termination_height = 0.13 # use termination contacts instead + default_feet_height = 0.05 + terminate_after_contacts_on = [ + "Right_Back_Upper", + "Left_Back_Upper", + "Right_Front_Upper", + "Left_Front_Upper", + "Body", + ] + + penalize_contacts_on = [] + self_collisions = 1 # 1 to disable, 0 to enable...bitwise filter + flip_visual_attachments = False + replace_cylinder_with_capsule = False + fix_base_link = False + + class terrain(LeggedRobotCfg.terrain): + mesh_type = "plane" + # mesh_type = 'trimesh' + curriculum = False + # rough terrain only: + measure_heights = False + static_friction = 0.6 + dynamic_friction = 0.6 + terrain_length = 8.0 + terrain_width = 8.0 + num_rows = 10 # number of terrain rows (levels) + num_cols = 10 # number of terrain cols (types) + max_init_terrain_level = 10 # starting curriculum state + # plane; obstacles; uniform; slope_up; slope_down, stair_up, stair_down + terrain_proportions = [0.2, 0.2, 0.4, 0.1, 0.1, 0, 0] + restitution = 0.0 + + class noise: + add_noise = False + noise_level = 0.6 # scales other values + + class noise_scales: + dof_pos = 0.05 + dof_vel = 0.5 + ang_vel = 0.1 + lin_vel = 0.05 + quat = 0.03 + height_measurements = 0.03 # was 0.1 + + class init_state(LeggedRobotCfg.init_state): + pos = [0.0, 0.0, Robot.height] + # setting the right rotation + # quat_from_euler_xyz(torch.tensor(1.57), torch.tensor(0), torch.tensor(-1.57)) + rot = Robot.rotation + + default_joint_angles = {k: 0.0 for k in Robot.all_joints()} + + default_positions = Robot.default_standing() + for joint in default_positions: + default_joint_angles[joint] = default_positions[joint] + + class control(LeggedRobotCfg.control): + # PD Drive parameters: + stiffness = Robot.stiffness() + damping = Robot.damping() + # action scale: target angle = actionScale * action + defaultAngle + action_scale = 0.25 + # decimation: Number of control action updates @ sim DT per policy DT + decimation = 10 # 100hz + + class sim(LeggedRobotCfg.sim): + dt = 0.001 # 1000 Hz + substeps = 1 # 2 + up_axis = 1 # 0 is y, 1 is z + + class 
physx(LeggedRobotCfg.sim.physx): + num_threads = 10 + solver_type = 1 # 0: pgs, 1: tgs + num_position_iterations = 4 + num_velocity_iterations = 1 + contact_offset = 0.01 # [m] + rest_offset = 0.0 # [m] + bounce_threshold_velocity = 0.1 # [m/s] + max_depenetration_velocity = 1.0 + max_gpu_contact_pairs = 2**23 # 2**24 -> needed for 8000 envs and more + default_buffer_size_multiplier = 5 + # 0: never, 1: last sub-step, 2: all sub-steps (default=2) + contact_collection = 2 + + class domain_rand(LeggedRobotCfg.domain_rand): + start_pos_noise = 0.00001 # changed for quad + randomize_friction = True + friction_range = [0.1, 2.0] + + randomize_base_mass = True # changed for quad + added_mass_range = [-0.0, 0.001] + push_robots = True + push_interval_s = 4 + max_push_vel_xy = 0.2 + max_push_ang_vel = 0.4 + dynamic_randomization = 0.05 + + class commands(LeggedRobotCfg.commands): + # Vers: lin_vel_x, lin_vel_y, ang_vel_yaw, heading (in heading mode ang_vel_yaw is recomputed from heading error) + num_commands = 4 + resampling_time = 8.0 # time before command are changed[s] + heading_command = True # if true: compute ang vel command from heading error + + class ranges: + lin_vel_x = [1.5, 5] # min max [m/s] + lin_vel_y = [-0.3, 0.3] # min max [m/s] + ang_vel_yaw = [-0.3, 0.3] # min max [rad/s] + heading = [-3.14, 3.14] + + class rewards: + soft_dof_pos_limit = 0.9 + base_height_target = 0.4 # 0.25 + target_feet_height = 0.05 # m + + # Additions from Unitree for ... + target_joint_pos_scale = 0.17 # rad, for compute_ref_state + cycle_time = 0.4 # sec, for compute_ref_state + tracking_sigma = 4 # 0.25 for unitree (but they used divide by tracking sigma) + max_contact_force = 100 # forces above this value are penalized + + class scales(LeggedRobotCfg.rewards.scales): + dof_pos_limits = -10.0 + + # Override from base class (qaudruped_robot_config) + + # standing---- + # base pos + default_joint_pos = 0.1 # 1 for the new quadruped formula for standing + orientation = 0.25 # 1 for standing + base_height = 0.2 + base_acc = 0.01 + + # energy + # action_smoothness = -0.002 # not used in quadruped + torques = -1e-5 + dof_vel = -5e-4 + dof_acc = -1e-7 + collision = -1.0 + + # walking --- + # gait + feet_air_time = 7 # 2.5 + # contact + feet_contact_forces = -0.01 + # vel tracking , main driver of forward motion!!! 
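(Aside, not part of the patch.) The velocity-tracking scales listed just below weight reward terms whose implementations are not part of this diff. In legged_gym-style codebases these are typically exponential kernels of the squared command error; the `tracking_sigma = 4` comment above notes that Unitree divides by 0.25 instead of multiplying by 4, which with these constants is the same kernel. A hedged sketch under that assumption:

```python
import torch


def reward_tracking_lin_vel(commands: torch.Tensor, base_lin_vel: torch.Tensor,
                            tracking_sigma: float = 4.0) -> torch.Tensor:
    """Exponential tracking kernel: 1.0 for perfect tracking, decaying with squared error.

    commands[:, :2] is the commanded planar velocity, base_lin_vel[:, :2] the measured one,
    both (num_envs, >=2). Illustrative sketch, not the repository's implementation.
    """
    lin_vel_error = torch.sum(torch.square(commands[:, :2] - base_lin_vel[:, :2]), dim=1)
    return torch.exp(-lin_vel_error * tracking_sigma)


# usage (shapes only): rew = reward_tracking_lin_vel(env.commands, env.base_lin_vel)
```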
+ tracking_lin_vel = 3 # 1.0 + tracking_ang_vel = 0.5 # 0.5 + + # only in unitree + lin_vel_z = -2.0 + action_rate = -0.01 + + # in unitree repo (so is in our base file, but not used)--- + # action_rate = -0.0 #used in unitree only + # stand_still = -0.0 #used in unitree only + + # NOT used in unitree---- + # joint_pos = 1.6 + # feet_clearance = 1.6 + # feet_contact_number = 1.2 + # gait + # foot_slip = -0.05 + # feet_distance = 0.2 + # knee_distance = 0.2 + # vel tracking + # vel_mismatch_exp = 0.5 # lin_z; ang x,y + # track_vel_hard = 0.5 + # feet_stumble = -0.0 #not used at all + + only_positive_rewards = ( + True # if true negative total rewards are clipped at zero (avoids early termination problems) + ) + + class normalization: + class obs_scales: + lin_vel = 2.0 + ang_vel = 1.0 + dof_pos = 1.0 + dof_vel = 0.05 + quat = 1.0 + height_measurements = 5.0 + + clip_observations = 18.0 + clip_actions = 18.0 + + class viewer: + ref_env = 0 + pos = [4, -4, 2] + lookat = [0, -2, 0] + + +class QuadrupedCfgPPO(LeggedRobotCfgPPO): + seed = 5 + runner_class_name = "OnPolicyRunner" # DWLOnPolicyRunner + + class policy: + init_noise_std = 0.3 # changed for quad + actor_hidden_dims = [512, 256, 128] + critic_hidden_dims = [768, 256, 128] + + class algorithm(LeggedRobotCfgPPO.algorithm): + entropy_coef = 0.001 + learning_rate = 1e-5 + num_learning_epochs = 2 + gamma = 0.994 + lam = 0.9 + num_mini_batches = 4 + + class runner: + policy_class_name = "ActorCritic" + algorithm_class_name = "PPO" + num_steps_per_env = 60 # per iteration + max_iterations = 5001 # number of policy updates + + # logging + save_interval = 100 # check for potential saves every this many iterations + experiment_name = "Quadruped" + run_name = "" + # load and resume + resume = False + load_run = -1 # -1 = last run + checkpoint = -1 # -1 = last saved model + resume_path = None # updated from load_run and chkpt diff --git a/sim/envs/humanoids/quadruped_env.py b/sim/envs/humanoids/quadruped_env.py new file mode 100644 index 00000000..751666cd --- /dev/null +++ b/sim/envs/humanoids/quadruped_env.py @@ -0,0 +1,503 @@ +# mypy: disable-error-code="valid-newtype" +"""Defines the environment for training the humanoid.""" + +from sim.envs.base.quadruped_robot import LeggedRobot +from sim.resources.quadruped.joints import Robot +from sim.utils.terrain import HumanoidTerrain + +from isaacgym import gymtorch # isort:skip +from isaacgym.torch_utils import * # isort: skip + + +import torch # isort:skip + + +class QuadrupedFreeEnv(LeggedRobot): + """StompyFreeEnv is a class that represents a custom environment for a legged robot. + + Args: + cfg: Configuration object for the legged robot. + sim_params: Parameters for the simulation. + physics_engine: Physics engin e used in the simulation. + sim_device: Device used for the simulation. + headless: Flag indicating whether the simulation should be run in headless mode. + + Attributes: + last_feet_z (float): The z-coordinate of the last feet position. + feet_height (torch.Tensor): Tensor representing the height of the feet. + sim (gymtorch.GymSim): The simulation object. + terrain (HumanoidTerrain): The terrain object. + up_axis_idx (int): The index representing the up axis. + command_input (torch.Tensor): Tensor representing the command input. + privileged_obs_buf (torch.Tensor): Tensor representing the privileged observations buffer. + obs_buf (torch.Tensor): Tensor representing the observations buffer. + obs_history (collections.deque): Deque containing the history of observations. 
+ critic_history (collections.deque): Deque containing the history of critic observations. + + Methods: + _push_robots(): Randomly pushes the robots by setting a randomized base velocity. + _get_phase(): Calculates the phase of the gait cycle. + _get_gait_phase(): Calculates the gait phase. + compute_ref_state(): Computes the reference state. + create_sim(): Creates the simulation, terrain, and environments. + _get_noise_scale_vec(cfg): Sets a vector used to scale the noise added to the observations. + step(actions): Performs a simulation step with the given actions. + compute_observations(): Computes the observations. + reset_idx(env_ids): Resets the environment for the specified environment IDs. + """ + + def __init__(self, cfg, sim_params, physics_engine, sim_device, headless): + super().__init__(cfg, sim_params, physics_engine, sim_device, headless) + self.last_feet_z = self.cfg.asset.default_feet_height + self.feet_height = torch.zeros((self.num_envs, 4), device=self.device) + self.reset_idx(torch.tensor(range(self.num_envs), device=self.device)) + + env_handle = self.envs[0] + actor_handle = self.actor_handles[0] + + self.legs_joints = {} + for name, joint in Robot.legs.left.joints_motors(): + print(name) + joint_handle = self.gym.find_actor_dof_handle(env_handle, actor_handle, joint) + self.legs_joints["left_" + name] = joint_handle + + for name, joint in Robot.legs.right.joints_motors(): + joint_handle = self.gym.find_actor_dof_handle(env_handle, actor_handle, joint) + self.legs_joints["right_" + name] = joint_handle + + # For quadruped, treat "arms" as legs + for name, joint in Robot.arms.left.joints_motors(): + print(name) + joint_handle = self.gym.find_actor_dof_handle(env_handle, actor_handle, joint) + self.legs_joints["left_" + name] = joint_handle + + for name, joint in Robot.arms.right.joints_motors(): + joint_handle = self.gym.find_actor_dof_handle(env_handle, actor_handle, joint) + self.legs_joints["right_" + name] = joint_handle + + self.compute_observations() + + def _push_robots(self): + """Random pushes the robots. 
Emulates an impulse by setting a randomized base velocity.""" + max_vel = self.cfg.domain_rand.max_push_vel_xy + max_push_angular = self.cfg.domain_rand.max_push_ang_vel + self.rand_push_force[:, :2] = torch_rand_float( + -max_vel, max_vel, (self.num_envs, 2), device=self.device + ) # lin vel x/y + self.root_states[:, 7:9] = self.rand_push_force[:, :2] + + self.rand_push_torque = torch_rand_float( + -max_push_angular, max_push_angular, (self.num_envs, 3), device=self.device + ) + self.root_states[:, 10:13] = self.rand_push_torque + + self.gym.set_actor_root_state_tensor(self.sim, gymtorch.unwrap_tensor(self.root_states)) + + def _get_phase(self): + cycle_time = self.cfg.rewards.cycle_time + phase = self.episode_length_buf * self.dt / cycle_time + return phase + + def _get_gait_phase(self): + # return float mask 1 is stance, 0 is swing + phase = self._get_phase() + sin_pos = torch.sin(2 * torch.pi * phase) + # Add double support phase + stance_mask = torch.zeros((self.num_envs, 4), device=self.device) + # left foot stance + stance_mask[:, 0] = sin_pos >= 0 + stance_mask[:, 3] = sin_pos >= 0 + # right foot stance + stance_mask[:, 1] = sin_pos < 0 + stance_mask[:, 2] = sin_pos < 0 + # Double support phase + stance_mask[torch.abs(sin_pos) < 0.1] = 1 + + return stance_mask + + def check_termination(self): + """Check if environments need to be reset""" + self.reset_buf = torch.any( + torch.norm(self.contact_forces[:, self.termination_contact_indices, :], dim=-1) > 1.0, + dim=1, + ) + self.reset_buf |= self.root_states[:, 2] < self.cfg.asset.termination_height + self.time_out_buf = self.episode_length_buf > self.max_episode_length # no terminal reward for time-outs + self.reset_buf |= self.time_out_buf + + def compute_ref_state(self): + phase = self._get_phase() + sin_pos = torch.sin(2 * torch.pi * phase) + sin_pos_l = sin_pos.clone() + sin_pos_r = sin_pos.clone() + default_clone = self.default_dof_pos.clone() + self.ref_dof_pos = self.default_dof_pos.repeat(self.num_envs, 1) + scale_1 = self.cfg.rewards.target_joint_pos_scale + scale_2 = 2 * scale_1 + # left foot stance phase set to default joint pos + sin_pos_l[sin_pos_l > 0] = 0 + self.ref_dof_pos[:, self.legs_joints["left_hip_pitch"]] += sin_pos_l * scale_1 + self.ref_dof_pos[:, self.legs_joints["left_knee_pitch"]] += sin_pos_l * scale_2 + self.ref_dof_pos[:, self.legs_joints["left_elbow_pitch"]] += sin_pos_l * scale_1 + self.ref_dof_pos[:, self.legs_joints["left_shoulder_pitch"]] += sin_pos_l * scale_2 + # right foot stance phase set to default joint pos + sin_pos_r[sin_pos_r < 0] = 0 + self.ref_dof_pos[:, self.legs_joints["right_hip_pitch"]] += sin_pos_r * scale_1 + self.ref_dof_pos[:, self.legs_joints["right_knee_pitch"]] += sin_pos_r * scale_2 + self.ref_dof_pos[:, self.legs_joints["right_elbow_pitch"]] += sin_pos_r * scale_1 + self.ref_dof_pos[:, self.legs_joints["right_shoulder_pitch"]] += sin_pos_r * scale_2 + + # Double support phase + self.ref_dof_pos[torch.abs(sin_pos) < 0.1] = 0 + + self.ref_action = 2 * self.ref_dof_pos + + def create_sim(self): + """Creates simulation, terrain and evironments""" + self.up_axis_idx = 2 # 2 for z, 1 for y -> adapt gravity accordingly + self.sim = self.gym.create_sim( + self.sim_device_id, + self.graphics_device_id, + self.physics_engine, + self.sim_params, + ) + mesh_type = self.cfg.terrain.mesh_type + if mesh_type in ["heightfield", "trimesh"]: + self.terrain = HumanoidTerrain(self.cfg.terrain, self.num_envs) + if mesh_type == "plane": + self._create_ground_plane() + elif mesh_type == 
"heightfield": + self._create_heightfield() + elif mesh_type == "trimesh": + self._create_trimesh() + elif mesh_type is not None: + raise ValueError("Terrain mesh type not recognised. Allowed types are [None, plane, heightfield, trimesh]") + self._create_envs() + + def _get_noise_scale_vec(self, cfg): + """Sets a vector used to scale the noise added to the observations. + [NOTE]: Must be adapted when changing the observations structure + + Args: + cfg (Dict): Environment config file + + Returns: + [torch.Tensor]: Vector of scales used to multiply a uniform distribution in [-1, 1] + """ + num_actions = self.num_actions + noise_vec = torch.zeros(self.cfg.env.num_single_obs, device=self.device) + self.add_noise = self.cfg.noise.add_noise + noise_scales = self.cfg.noise.noise_scales + noise_vec[0:5] = 0.0 # commands + noise_vec[5 : (num_actions + 5)] = noise_scales.dof_pos * self.obs_scales.dof_pos + noise_vec[(num_actions + 5) : (2 * num_actions + 5)] = noise_scales.dof_vel * self.obs_scales.dof_vel + noise_vec[(2 * num_actions + 5) : (3 * num_actions + 5)] = 0.0 # previous actions + noise_vec[(3 * num_actions + 5) : (3 * num_actions + 5) + 3] = ( + noise_scales.ang_vel * self.obs_scales.ang_vel + ) # ang vel + noise_vec[(3 * num_actions + 5) + 3 : (3 * num_actions + 5)] = ( + noise_scales.quat * self.obs_scales.quat + ) # euler x,y + return noise_vec + + def step(self, actions): + if self.cfg.env.use_ref_actions: + actions += self.ref_action + # dynamic randomization + delay = torch.rand((self.num_envs, 1), device=self.device) + actions = (1 - delay) * actions + delay * self.actions + actions += self.cfg.domain_rand.dynamic_randomization * torch.randn_like(actions) * actions + return super().step(actions) + + def compute_observations(self): + phase = self._get_phase() + self.compute_ref_state() + + sin_pos = torch.sin(2 * torch.pi * phase).unsqueeze(1) + cos_pos = torch.cos(2 * torch.pi * phase).unsqueeze(1) + + stance_mask = self._get_gait_phase() + contact_mask = self.contact_forces[:, self.feet_indices, 2] > 5.0 + + self.command_input = torch.cat((sin_pos, cos_pos, self.commands[:, :3] * self.commands_scale), dim=1) + q = (self.dof_pos - self.default_dof_pos) * self.obs_scales.dof_pos + dq = self.dof_vel * self.obs_scales.dof_vel + + diff = self.dof_pos - self.ref_dof_pos + self.privileged_obs_buf = torch.cat( + ( + self.command_input, # 2 + 3 + (self.dof_pos - self.default_joint_pd_target) * self.obs_scales.dof_pos, # 12 + self.dof_vel * self.obs_scales.dof_vel, # 12 + self.actions, # 12 + diff, # 12 + self.base_lin_vel * self.obs_scales.lin_vel, # 3 + self.base_ang_vel * self.obs_scales.ang_vel, # 3 + self.base_euler_xyz * self.obs_scales.quat, # 3 + self.rand_push_force[:, :2], # 3 + self.rand_push_torque, # 3 + self.env_frictions, # 1 + self.body_mass / 30.0, # 1 + stance_mask, # 2 + contact_mask, # 2 + ), + dim=-1, + ) + + obs_buf = torch.cat( + ( + self.command_input, # 5 = 2D(sin cos) + 3D(vel_x, vel_y, aug_vel_yaw) + q, # 20D + dq, # 20D + self.actions, # 20D + self.base_ang_vel * self.obs_scales.ang_vel, # 3 + self.base_euler_xyz * self.obs_scales.quat, # 3 + ), + dim=-1, + ) + + if self.cfg.terrain.measure_heights: + heights = ( + torch.clip( + self.root_states[:, 2].unsqueeze(1) - 0.5 - self.measured_heights, + -1, + 1.0, + ) + * self.obs_scales.height_measurements + ) + self.privileged_obs_buf = torch.cat((self.obs_buf, heights), dim=-1) + + if self.add_noise: + obs_now = obs_buf.clone() + torch.randn_like(obs_buf) * self.noise_scale_vec * self.cfg.noise.noise_level + else: + 
obs_now = obs_buf.clone() + self.obs_history.append(obs_now) + self.critic_history.append(self.privileged_obs_buf) + + obs_buf_all = torch.stack([self.obs_history[i] for i in range(self.obs_history.maxlen)], dim=1) # N,T,K + + self.obs_buf = obs_buf_all.reshape(self.num_envs, -1) # N, T*K + self.privileged_obs_buf = torch.cat([self.critic_history[i] for i in range(self.cfg.env.c_frame_stack)], dim=1) + + def reset_idx(self, env_ids): + super().reset_idx(env_ids) + for i in range(self.obs_history.maxlen): + self.obs_history[i][env_ids] *= 0 + for i in range(self.critic_history.maxlen): + self.critic_history[i][env_ids] *= 0 + + # ================================================ Rewards ================================================== # + # changed from unitree repo + def _reward_orientation(self): + quat_mismatch = torch.exp(-torch.sum(torch.abs(self.base_euler_xyz[:, :2]), dim=1) * 15) + orientation = torch.exp(-torch.norm(self.projected_gravity[:, :2], dim=1) * 20) + return (quat_mismatch + orientation) / 2 + + # this function was not in unitree repo, added from humanoids repo + def _reward_default_joint_pos(self): + """Calculates the reward for keeping joint positions close to default positions + Should + """ + joint_diff = self.dof_pos - self.default_joint_pd_target + + left_pitch = joint_diff[ + :, + [ + self.legs_joints["left_hip_pitch"], + self.legs_joints["left_knee_pitch"], + self.legs_joints["left_elbow_pitch"], + self.legs_joints["left_shoulder_pitch"], + ], + ] + right_pitch = joint_diff[ + :, + [ + self.legs_joints["right_hip_pitch"], + self.legs_joints["right_knee_pitch"], + self.legs_joints["right_elbow_pitch"], + self.legs_joints["right_shoulder_pitch"], + ], + ] + pitch_dev = torch.norm(left_pitch, dim=1) + torch.norm(right_pitch, dim=1) + pitch_dev = torch.clamp(pitch_dev - 0.1, 0, 50) # deadzone of 0.1, max 50 min 0 + return torch.exp(-pitch_dev * 0.5) - 0.01 * torch.norm(joint_diff, dim=1) # + + # this function was not in unitree repo, added from humanoids repo + def _reward_base_acc(self): + """Computes the reward based on the base's acceleration. Penalizes high accelerations of the robot's base, + encouraging smoother motion. 
+ """ + root_acc = self.last_root_vel - self.root_states[:, 7:13] + rew = torch.exp(-torch.norm(root_acc, dim=1) * 3) + return rew + + # FOR WALKING: + def _reward_lin_vel_z(self): + # Penalize z axis base linear velocity + return torch.square(self.base_lin_vel[:, 2]) + + def _reward_ang_vel_xy(self): + # Penalize xy axes base angular velocity + return torch.sum(torch.square(self.base_ang_vel[:, :2]), dim=1) + + # Changed from unitree repo + def _reward_base_height(self): + stance_mask = self._get_gait_phase() + measured_heights = torch.sum(self.rigid_state[:, self.feet_indices, 2] * stance_mask, dim=1) / torch.sum( + stance_mask, dim=1 + ) + base_height = self.root_states[:, 2] - (measured_heights - self.cfg.asset.default_feet_height) + reward = torch.exp(-torch.abs(base_height - self.cfg.rewards.base_height_target) * 10) + return reward + + def _reward_torques(self): + # Penalize torques + return torch.sum(torch.square(self.torques), dim=1) + + def _reward_dof_vel(self): + # Penalize dof velocities + return torch.sum(torch.square(self.dof_vel), dim=1) + + def _reward_dof_acc(self): + # Penalize dof accelerations + return torch.sum(torch.square((self.last_dof_vel - self.dof_vel) / self.dt), dim=1) + + def _reward_action_rate(self): + # Penalize changes in actions + return torch.sum(torch.square(self.last_actions - self.actions), dim=1) + + def _reward_collision(self): + # Penalize collisions on selected bodies + return torch.sum( + 1.0 * (torch.norm(self.contact_forces[:, self.penalised_contact_indices, :], dim=-1) > 0.1), dim=1 + ) + + def _reward_termination(self): + # Terminal reward / penalty + return self.reset_buf * ~self.time_out_buf + + def _reward_dof_pos_limits(self): + # Penalize dof positions too close to the limit + out_of_limits = -(self.dof_pos - self.dof_pos_limits[:, 0]).clip(max=0.0) # lower limit + out_of_limits += (self.dof_pos - self.dof_pos_limits[:, 1]).clip(min=0.0) + return torch.sum(out_of_limits, dim=1) + + def _reward_dof_vel_limits(self): + # Penalize dof velocities too close to the limit + # clip to max error = 1 rad/s per joint to avoid huge penalties + return torch.sum( + (torch.abs(self.dof_vel) - self.dof_vel_limits * self.cfg.rewards.soft_dof_vel_limit).clip( + min=0.0, max=1.0 + ), + dim=1, + ) + + def _reward_torque_limits(self): + # penalize torques too close to the limit + return torch.sum( + (torch.abs(self.torques) - self.torque_limits * self.cfg.rewards.soft_torque_limit).clip(min=0.0), dim=1 + ) + + # the main walking param + def _reward_tracking_lin_vel(self): + # Tracking of linear velocity commands (xy axes) + lin_vel_error = torch.sum( + torch.square(self.commands[:, :2] - self.base_lin_vel[:, :2]), dim=1 + ) # around 11 max error + return torch.exp(-(lin_vel_error * self.cfg.rewards.tracking_sigma * (1 / 10))) # tracking_sigma = 4 + + def _reward_tracking_ang_vel(self): + # Tracking of angular velocity commands (yaw) + ang_vel_error = torch.square(self.commands[:, 2] - self.base_ang_vel[:, 2]) # around 1 max error + return torch.exp(-(ang_vel_error * self.cfg.rewards.tracking_sigma)) + + def _reward_feet_air_time(self): + contact = self.contact_forces[:, self.feet_indices, 2] > 5.0 + stance_mask = self._get_gait_phase() + self.contact_filt = torch.logical_or(torch.logical_or(contact, stance_mask), self.last_contacts) + self.last_contacts = contact + first_contact = (self.feet_air_time > 0.0) * self.contact_filt + self.feet_air_time += self.dt + air_time = self.feet_air_time.clamp(0, 0.5) * first_contact + self.feet_air_time *= 
~self.contact_filt + return air_time.sum(dim=1) + + def _reward_stumble(self): + # Penalize feet hitting vertical surfaces + return torch.any( + torch.norm(self.contact_forces[:, self.feet_indices, :2], dim=2) + > 5 * torch.abs(self.contact_forces[:, self.feet_indices, 2]), + dim=1, + ) + + def _reward_stand_still(self): + # Penalize motion at zero commands + return torch.sum(torch.abs(self.dof_pos - self.default_dof_pos), dim=1) * ( + torch.norm(self.commands[:, :2], dim=1) < 0.1 + ) + + def _reward_feet_contact_forces(self): + # penalize high contact forces + return torch.sum( + ( + torch.norm(self.contact_forces[:, self.feet_indices, :], dim=-1) - self.cfg.rewards.max_contact_force + ).clip(min=0.0), + dim=1, + ) + + # #Added from humanoids + + # def _reward_low_speed(self): + # """Rewards or penalizes the robot based on its speed relative to the commanded speed. + # This function checks if the robot is moving too slow, too fast, or at the desired speed, + # and if the movement direction matches the command. + # """ + # # Calculate the absolute value of speed and command for comparison + # absolute_speed = torch.abs(self.base_lin_vel[:, 0]) + # absolute_command = torch.abs(self.commands[:, 0]) + # # Define speed criteria for desired range + # speed_too_low = absolute_speed < 0.5 * absolute_command + # speed_too_high = absolute_speed > 1.2 * absolute_command + # speed_desired = ~(speed_too_low | speed_too_high) + + # # Check if the speed and command directions are mismatched + # sign_mismatch = torch.sign(self.base_lin_vel[:, 0]) != torch.sign(self.commands[:, 0]) + + # # Initialize reward tensor + # reward = torch.zeros_like(self.base_lin_vel[:, 0]) + + # # Assign rewards based on conditions + # # Speed too low + # reward[speed_too_low] = 1.0 #-1.0 + # # Speed too high + # reward[speed_too_high] = 1.5 #0.0 + # # Speed within desired range + # reward[speed_desired] = 3.0 #1.2 + # # Sign mismatch has the highest priority + # reward[sign_mismatch] = 0.0 #-2.0 + + # return reward * (self.commands[:, 0].abs() > 0.1) + + # #Added from humanoids + # def _reward_feet_clearance(self): + # """Calculates reward based on the clearance of the swing leg from the ground during movement. + # Encourages appropriate lift of the feet during the swing phase of the gait. 
+ # """ + # # Compute feet contact mask + # contact = self.contact_forces[:, self.feet_indices, 2] > 5.0 + + # # Get the z-position of the feet and compute the change in z-position + # feet_z = self.rigid_state[:, self.feet_indices, 2] - self.cfg.asset.default_feet_height + # delta_z = feet_z - self.last_feet_z + # self.feet_height += delta_z + # self.last_feet_z = feet_z + + # # Compute swing mask + # swing_mask = 1 - self._get_gait_phase() + + # # feet height should be closed to target feet height at the peak + # rew_pos = torch.abs(self.feet_height - self.cfg.rewards.target_feet_height) < 0.02 + # rew_pos = torch.sum(rew_pos * swing_mask, dim=1) + # self.feet_height *= ~contact + + # return rew_pos diff --git a/sim/play.py b/sim/play.py index 9f215534..7f7d556a 100755 --- a/sim/play.py +++ b/sim/play.py @@ -17,9 +17,10 @@ import cv2 import h5py import numpy as np -from isaacgym import gymapi from tqdm import tqdm +from isaacgym import gymapi + logger = logging.getLogger(__name__) from sim.env import run_dir # noqa: E402 @@ -171,7 +172,7 @@ def play(args: argparse.Namespace) -> None: dset_actions[t] = actions.detach().numpy() if FIX_COMMAND: - env.commands[:, 0] = 0.5 + env.commands[:, 0] = 0.5 # 0 for standing policy env.commands[:, 1] = 0.0 env.commands[:, 2] = 0.0 env.commands[:, 3] = 0.0 diff --git a/sim/resources/quadruped/joints.py b/sim/resources/quadruped/joints.py new file mode 100644 index 00000000..a05df2ce --- /dev/null +++ b/sim/resources/quadruped/joints.py @@ -0,0 +1,261 @@ +"""Defines a more Pythonic interface for specifying the joint names. + +The best way to re-generate this snippet for a new robot is to use the +`sim/scripts/print_joints.py` script. This script will print out a hierarchical +tree of the various joint names in the robot. 
+ +The OUTPUT: +Left + Elbow_Pitch: Left_Elbow_Pitch + Hip_Pitch: Left_Hip_Pitch + Knee_Pitch: Left_Knee_Pitch + Shoulder_Pitch: Left_Shoulder_Pitch +Right + Elbow_Pitch: Right_Elbow_Pitch + Hip_Pitch: Right_Hip_Pitch + Knee_Pitch: Right_Knee_Pitch + Shoulder_Pitch: Right_Shoulder_Pitch +""" + +import textwrap +from abc import ABC +from typing import Dict, List, Tuple, Union + + +class Node(ABC): + @classmethod + def children(cls) -> List["Union[Node, str]"]: + return [ + attr + for attr in (getattr(cls, attr) for attr in dir(cls) if not attr.startswith("__")) + if isinstance(attr, (Node, str)) + ] + + @classmethod + def joints(cls) -> List[str]: + return [ + attr + for attr in (getattr(cls, attr) for attr in dir(cls) if not attr.startswith("__")) + if isinstance(attr, str) + ] + + @classmethod + def joints_motors(cls) -> List[Tuple[str, str]]: + joint_names: List[Tuple[str, str]] = [] + for attr in dir(cls): + if not attr.startswith("__"): + attr2 = getattr(cls, attr) + if isinstance(attr2, str): + joint_names.append((attr, attr2)) + + return joint_names + + @classmethod + def all_joints(cls) -> List[str]: + leaves = cls.joints() + for child in cls.children(): + if isinstance(child, Node): + leaves.extend(child.all_joints()) + return leaves + + def __str__(self) -> str: + parts = [str(child) for child in self.children()] + parts_str = textwrap.indent("\n" + "\n".join(parts), " ") + return f"[{self.__class__.__name__}]{parts_str}" + + +# class LeftHand(Node): +# wrist_roll = "left hand roll" +# gripper = "left hand gripper" + + +class LeftArm(Node): + shoulder_pitch = "Left_Shoulder_Pitch" + elbow_pitch = "Left_Elbow_Pitch" + + +# class RightHand(Node): +# wrist_roll = "right hand roll" +# gripper = "right hand gripper" + + +class RightArm(Node): + shoulder_pitch = "Right_Shoulder_Pitch" + elbow_pitch = "Right_Elbow_Pitch" + + +class LeftLeg(Node): + hip_pitch = "Left_Hip_Pitch" + knee_pitch = "Left_Knee_Pitch" + + +class RightLeg(Node): + hip_pitch = "Right_Hip_Pitch" + knee_pitch = "Right_Knee_Pitch" + + +class Legs(Node): + left = LeftLeg() + right = RightLeg() + + +class Arms(Node): + left = LeftArm() + right = RightArm() + + +class Robot(Node): + # STEP TWO + height = 0.256 + rotation = [0.5000, -0.4996, -0.5000, 0.5004] # Which orientation the robot itself is + collision_links = [ + "lower_half_assembly_1_left_leg_1_foot_pad_1_simple", + "lower_half_assembly_1_right_leg_1_foot_pad_1_simple", + ] # not important/ no need to update? 
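+    # Usage sketch (illustrative): the Node helpers above are how the env enumerates joints, e.g.
+    #   Robot.legs.left.joints_motors() -> [("hip_pitch", "Left_Hip_Pitch"), ("knee_pitch", "Left_Knee_Pitch")]
+    #   Robot.all_joints()              -> all eight joint names (shoulders, elbows, hips, knees)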
+
+    arms = Arms()
+    legs = Legs()
+
+    @classmethod
+    def default_standing(cls) -> Dict[str, float]:
+        # Joint positions for the standing pose (read off from the MuJoCo model)
+        return {
+            # arms
+            Robot.arms.left.shoulder_pitch: 0.409,
+            Robot.arms.left.elbow_pitch: -0.977,
+            Robot.arms.right.shoulder_pitch: 0.312,
+            Robot.arms.right.elbow_pitch: 0.81,
+            # legs
+            Robot.legs.left.hip_pitch: 0.55,
+            Robot.legs.left.knee_pitch: -0.684,
+            Robot.legs.right.hip_pitch: 0.699,
+            Robot.legs.right.knee_pitch: -1.01,
+        }
+
+    @classmethod
+    def default_limits2(cls) -> Dict[str, Dict[str, float]]:
+        # Variant with a wider hip range (upper = 1.57) than default_limits below
+        return {
+            # left arm
+            Robot.arms.left.shoulder_pitch: {
+                "lower": 0,
+                "upper": 1.22,
+            },
+            Robot.arms.left.elbow_pitch: {
+                "lower": -1.40,
+                "upper": 1.40,
+            },
+            # right arm
+            Robot.arms.right.shoulder_pitch: {
+                "lower": 0,
+                "upper": 1.22,
+            },
+            Robot.arms.right.elbow_pitch: {
+                "lower": -1.40,
+                "upper": 1.40,
+            },
+            # left leg
+            Robot.legs.left.hip_pitch: {
+                "lower": 0,
+                "upper": 1.57,
+            },
+            Robot.legs.left.knee_pitch: {
+                "lower": -1.40,
+                "upper": 1.40,
+            },
+            # right leg
+            Robot.legs.right.hip_pitch: {
+                "lower": 0,
+                "upper": 1.57,
+            },
+            Robot.legs.right.knee_pitch: {
+                "lower": -1.40,
+                "upper": 1.40,
+            },
+        }
+
+    @classmethod
+    def default_limits(cls) -> Dict[str, Dict[str, float]]:
+        return {
+            # left arm
+            Robot.arms.left.shoulder_pitch: {
+                "lower": 0,
+                "upper": 1.22,
+            },
+            Robot.arms.left.elbow_pitch: {
+                "lower": -1.40,
+                "upper": 1.40,
+            },
+            # right arm
+            Robot.arms.right.shoulder_pitch: {
+                "lower": 0,
+                "upper": 1.22,
+            },
+            Robot.arms.right.elbow_pitch: {
+                "lower": -1.40,
+                "upper": 1.40,
+            },
+            # left leg
+            Robot.legs.left.hip_pitch: {
+                "lower": 0,
+                "upper": 1.22,
+            },
+            Robot.legs.left.knee_pitch: {
+                "lower": -1.40,
+                "upper": 1.40,
+            },
+            # right leg
+            Robot.legs.right.hip_pitch: {
+                "lower": 0,
+                "upper": 1.22,
+            },
+            Robot.legs.right.knee_pitch: {
+                "lower": -1.40,
+                "upper": 1.40,
+            },
+        }
+
+    # for quad
+    # p_gains
+    @classmethod
+    def stiffness(cls) -> Dict[str, float]:
+        return {"Elbow_Pitch": 20, "Shoulder_Pitch": 20, "Hip_Pitch": 20, "Knee_Pitch": 20}  # 20 for quad
+
+    # d_gains
+    @classmethod
+    def damping(cls) -> Dict[str, float]:
+        return {"Elbow_Pitch": 0.5, "Shoulder_Pitch": 0.5, "Hip_Pitch": 0.5, "Knee_Pitch": 0.5}  # 0.5 for quad
+
+    # effort (torque) limits
+    @classmethod
+    def effort(cls) -> Dict[str, float]:
+        return {"Elbow_Pitch": 5, "Shoulder_Pitch": 5, "Hip_Pitch": 5, "Knee_Pitch": 5}
+
+    # vel_limits
+    @classmethod
+    def velocity(cls) -> Dict[str, float]:
+        return {
+            "Elbow_Pitch": 5,
+            "Shoulder_Pitch": 5,
+            "Hip_Pitch": 5,
+            "Knee_Pitch": 5,
+        }
+
+    @classmethod
+    def friction(cls) -> Dict[str, float]:
+        return {
+            "Elbow_Pitch": 0,
+            "Shoulder_Pitch": 0,
+            "Hip_Pitch": 0,
+            "Knee_Pitch": 0,
+        }
+
+
+def print_joints() -> None:
+    joints = Robot.all_joints()
+    assert len(joints) == len(set(joints)), "Duplicate joint names found!"
+    print(Robot())
+
+
+if __name__ == "__main__":
+    # python -m sim.resources.quadruped.joints
+    print_joints()
diff --git a/sim/resources/quadruped/robot.urdf b/sim/resources/quadruped/robot.urdf
new file mode 100644
index 00000000..9536b9f6
--- /dev/null
+++ b/sim/resources/quadruped/robot.urdf
@@ -0,0 +1,257 @@
+ [257-line URDF model for the quadruped; the XML markup was stripped during extraction and is not recoverable here]
diff --git a/sim/resources/quadruped/robot_fixed.urdf b/sim/resources/quadruped/robot_fixed.urdf
new file mode 100644
index 00000000..9536b9f6
--- /dev/null
+++ b/sim/resources/quadruped/robot_fixed.urdf
@@ -0,0 +1,257 @@
+ [identical 257-line URDF (same blob as robot.urdf); XML markup stripped during extraction]
diff --git a/sim/resources/stompymicro/joints.py b/sim/resources/stompymicro/joints.py
index 65ff0083..c11ed847 100644
--- a/sim/resources/stompymicro/joints.py
+++ b/sim/resources/stompymicro/joints.py
@@ -61,7 +61,7 @@ class LeftArm(Node):
     shoulder_yaw = "left_shoulder_yaw"
     shoulder_pitch = "left_shoulder_pitch"
     # shoulder_roll = "left_shoulder_roll"
-    elbow_pitch = "left_elbow_yaw"  # FIXME: yaw vs pitch
+    elbow_pitch = "left_elbow_yaw"  # FIXME: yaw vs pitch
     # hand = LeftHand()
@@ -83,7 +83,7 @@ class LeftLeg(Node):
     hip_yaw = "left_hip_yaw"
     hip_pitch = "left_hip_pitch"
     knee_pitch = "left_knee_pitch"
-    ankle_pitch = "left_ankle_pitch"
+    ankle_pitch = "left_ankle_pitch"
 class RightLeg(Node):
@@ -91,7 +91,7 @@ class RightLeg(Node):
     hip_yaw = "right_hip_yaw"
     hip_pitch = "right_hip_pitch"
     knee_pitch = "right_knee_pitch"
-    ankle_pitch = "right_ankle_pitch"
+    ankle_pitch = "right_ankle_pitch"
 class Legs(Node):
diff --git a/sim/scripts/calibration.py b/sim/scripts/calibration.py
index 3cfc6673..7a8d22eb 100755
--- a/sim/scripts/calibration.py
+++ b/sim/scripts/calibration.py
@@ -16,9 +16,9 @@
 # Importing torch down here to avoid gymtorch issues.
import torch # noqa: E402 # type: ignore[import] -from isaacgym import gymapi, gymtorch, gymutil from matplotlib import pyplot as plt +from isaacgym import gymapi, gymtorch, gymutil from sim.env import robot_urdf_path from sim.resources.stompymini.joints import Robot as Stompy diff --git a/sim/scripts/simulate_urdf.py b/sim/scripts/simulate_urdf.py index 93152d65..1d17c7a3 100755 --- a/sim/scripts/simulate_urdf.py +++ b/sim/scripts/simulate_urdf.py @@ -12,7 +12,6 @@ from typing import Any, Dict, Literal, NewType from isaacgym import gymapi, gymtorch, gymutil - from sim.env import robot_urdf_path from sim.resources.stompymini.joints import Robot as Stompy diff --git a/sim/utils/terrain.py b/sim/utils/terrain.py index e7593991..e9600e9e 100755 --- a/sim/utils/terrain.py +++ b/sim/utils/terrain.py @@ -30,6 +30,7 @@ # Copyright (c) 2024 Beijing RobotEra TECHNOLOGY CO.,LTD. All rights reserved. import numpy as np + from isaacgym import terrain_utils
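A minimal standalone sketch of the gait phasing the new quadruped env relies on (mirrors _get_phase / _get_gait_phase above; cycle_time = 0.4 s comes from QuadrupedCfg.rewards, the 10 ms step below is only an assumed value): two pairs of feet (indices 0 & 3 and 1 & 2) alternate stance with the sign of sin(2*pi*phase), and all four feet are treated as in stance when |sin| < 0.1.

import torch


def gait_stance_mask(step_count: torch.Tensor, dt: float = 0.01, cycle_time: float = 0.4) -> torch.Tensor:
    """Per-foot stance mask (1 = stance, 0 = swing) for a batch of envs, as in _get_gait_phase."""
    phase = step_count * dt / cycle_time
    sin_pos = torch.sin(2 * torch.pi * phase)
    stance = torch.zeros(step_count.shape[0], 4)
    stance[:, 0] = sin_pos >= 0  # first pair of feet
    stance[:, 3] = sin_pos >= 0
    stance[:, 1] = sin_pos < 0  # second pair of feet
    stance[:, 2] = sin_pos < 0
    stance[torch.abs(sin_pos) < 0.1] = 1  # double-support window: all four feet down
    return stance


# One 0.4 s cycle sampled every 10 ms: the two pairs of feet swap roles halfway through.
print(gait_stance_mask(torch.arange(40, dtype=torch.float32)))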