diff --git a/docs/misc/changelog.rst b/docs/misc/changelog.rst index e59be461c5..0b23c5f407 100644 --- a/docs/misc/changelog.rst +++ b/docs/misc/changelog.rst @@ -71,6 +71,8 @@ Bug Fixes: Only ``TRPO`` and ``PPO1`` update it differently (after synchronization) because they rely on MPI - Fixed bug in ``TRPO`` with NaN standardized advantages (@richardwu) - Fixed partial minibatch computation in ExpertDataset (@richardwu) +- Fixed normalization (with ``VecNormalize``) for off-policy algorithms +- Fixed ``sync_envs_normalization`` to sync the reward normalization too Deprecations: ^^^^^^^^^^^^^ diff --git a/stable_baselines/common/base_class.py b/stable_baselines/common/base_class.py index b107f298b8..e19f26b69b 100644 --- a/stable_baselines/common/base_class.py +++ b/stable_baselines/common/base_class.py @@ -16,7 +16,8 @@ from stable_baselines.common.save_util import data_to_json, json_to_data, params_to_bytes, bytes_to_params from stable_baselines.common.policies import get_policy_from_name, ActorCriticPolicy from stable_baselines.common.runners import AbstractEnvRunner -from stable_baselines.common.vec_env import VecEnvWrapper, VecEnv, DummyVecEnv +from stable_baselines.common.vec_env import (VecEnvWrapper, VecEnv, DummyVecEnv, + VecNormalize, unwrap_vec_normalize) from stable_baselines.common.callbacks import BaseCallback, CallbackList, ConvertCallback from stable_baselines import logger @@ -91,6 +92,9 @@ def __init__(self, policy, env, verbose=0, *, requires_vec_env, policy_base, " environment.") self.n_envs = 1 + # Get VecNormalize object if it exists + self._vec_normalize_env = unwrap_vec_normalize(self.env) + def get_env(self): """ returns the current environment (can be None if not defined) @@ -99,6 +103,15 @@ def get_env(self): """ return self.env + def get_vec_normalize_env(self) -> Optional[VecNormalize]: + """ + Return the ``VecNormalize`` wrapper of the training env + if it exists. + + :return: Optional[VecNormalize] The ``VecNormalize`` env. + """ + return self._vec_normalize_env + def set_env(self, env): """ Checks the validity of the environment, and if it is coherent, set it as the current environment. @@ -142,6 +155,7 @@ def set_env(self, env): self.n_envs = 1 self.env = env + self._vec_normalize_env = unwrap_vec_normalize(env) # Invalidated by environment change. self.episode_reward = None diff --git a/stable_baselines/common/buffers.py b/stable_baselines/common/buffers.py index 0d864875c7..ec9eb2ba6b 100644 --- a/stable_baselines/common/buffers.py +++ b/stable_baselines/common/buffers.py @@ -1,12 +1,14 @@ import random +from typing import Optional, List, Union import numpy as np from stable_baselines.common.segment_tree import SumSegmentTree, MinSegmentTree +from stable_baselines.common.vec_env import VecNormalize class ReplayBuffer(object): - def __init__(self, size): + def __init__(self, size: int): """ Implements a ring buffer (FIFO). @@ -17,7 +19,7 @@ def __init__(self, size): self._maxsize = size self._next_idx = 0 - def __len__(self): + def __len__(self) -> int: return len(self._storage) @property @@ -26,11 +28,11 @@ def storage(self): return self._storage @property - def buffer_size(self): + def buffer_size(self) -> int: """float: Max capacity of the buffer""" return self._maxsize - def can_sample(self, n_samples): + def can_sample(self, n_samples: int) -> bool: """ Check if n_samples samples can be sampled from the buffer. 
@@ -40,7 +42,7 @@ def can_sample(self, n_samples): """ return len(self) >= n_samples - def is_full(self): + def is_full(self) -> int: """ Check whether the replay buffer is full or not. @@ -86,7 +88,27 @@ def extend(self, obs_t, action, reward, obs_tp1, done): self._storage[self._next_idx] = data self._next_idx = (self._next_idx + 1) % self._maxsize - def _encode_sample(self, idxes): + @staticmethod + def _normalize_obs(obs: np.ndarray, + env: Optional[VecNormalize] = None) -> np.ndarray: + """ + Helper for normalizing the observation. + """ + if env is not None: + return env.normalize_obs(obs) + return obs + + @staticmethod + def _normalize_reward(reward: np.ndarray, + env: Optional[VecNormalize] = None) -> np.ndarray: + """ + Helper for normalizing the reward. + """ + if env is not None: + return env.normalize_reward(reward) + return reward + + def _encode_sample(self, idxes: Union[List[int], np.ndarray], env: Optional[VecNormalize] = None): obses_t, actions, rewards, obses_tp1, dones = [], [], [], [], [] for i in idxes: data = self._storage[i] @@ -96,13 +118,19 @@ def _encode_sample(self, idxes): rewards.append(reward) obses_tp1.append(np.array(obs_tp1, copy=False)) dones.append(done) - return np.array(obses_t), np.array(actions), np.array(rewards), np.array(obses_tp1), np.array(dones) + return (self._normalize_obs(np.array(obses_t), env), + np.array(actions), + self._normalize_reward(np.array(rewards), env), + self._normalize_obs(np.array(obses_tp1), env), + np.array(dones)) - def sample(self, batch_size, **_kwargs): + def sample(self, batch_size: int, env: Optional[VecNormalize] = None, **_kwargs): """ Sample a batch of experiences. :param batch_size: (int) How many transitions to sample. + :param env: (Optional[VecNormalize]) associated gym VecEnv + to normalize the observations/rewards when sampling :return: - obs_batch: (np.ndarray) batch of observations - act_batch: (numpy float) batch of actions executed given obs_batch @@ -112,7 +140,7 @@ def sample(self, batch_size, **_kwargs): and 0 otherwise. """ idxes = [random.randint(0, len(self._storage) - 1) for _ in range(batch_size)] - return self._encode_sample(idxes) + return self._encode_sample(idxes, env=env) class PrioritizedReplayBuffer(ReplayBuffer): @@ -181,7 +209,7 @@ def _sample_proportional(self, batch_size): idx = self._it_sum.find_prefixsum_idx(mass) return idx - def sample(self, batch_size, beta=0): + def sample(self, batch_size: int, beta: float = 0, env: Optional[VecNormalize] = None): """ Sample a batch of experiences. @@ -191,6 +219,8 @@ def sample(self, batch_size, beta=0): :param batch_size: (int) How many transitions to sample. 
:param beta: (float) To what degree to use importance weights (0 - no corrections, 1 - full correction) + :param env: (Optional[VecNormalize]) associated gym VecEnv + to normalize the observations/rewards when sampling :return: - obs_batch: (np.ndarray) batch of observations - act_batch: (numpy float) batch of actions executed given obs_batch @@ -210,7 +240,7 @@ def sample(self, batch_size, beta=0): max_weight = (p_min * len(self._storage)) ** (-beta) p_sample = self._it_sum[idxes] / self._it_sum.sum() weights = (p_sample * len(self._storage)) ** (-beta) / max_weight - encoded_sample = self._encode_sample(idxes) + encoded_sample = self._encode_sample(idxes, env=env) return tuple(list(encoded_sample) + [weights, idxes]) def update_priorities(self, idxes, priorities): @@ -232,4 +262,3 @@ def update_priorities(self, idxes, priorities): self._it_min[idxes] = priorities ** self._alpha self._max_priority = max(self._max_priority, np.max(priorities)) - diff --git a/stable_baselines/common/vec_env/__init__.py b/stable_baselines/common/vec_env/__init__.py index bde160e2be..e3343f5f66 100644 --- a/stable_baselines/common/vec_env/__init__.py +++ b/stable_baselines/common/vec_env/__init__.py @@ -43,8 +43,9 @@ def sync_envs_normalization(env: Union[gym.Env, VecEnv], eval_env: Union[gym.Env return while isinstance(env_tmp, VecEnvWrapper): if isinstance(env_tmp, VecNormalize): - # No need to sync the reward scaling + # sync reward and observation scaling eval_env_tmp.obs_rms = deepcopy(env_tmp.obs_rms) + eval_env_tmp.ret_rms = deepcopy(env_tmp.ret_rms) env_tmp = env_tmp.venv # Make pytype happy, in theory env and eval_env have the same type assert isinstance(eval_env_tmp, VecEnvWrapper), "the second env differs from the first env" diff --git a/stable_baselines/ddpg/ddpg.py b/stable_baselines/ddpg/ddpg.py index 2835538dac..c7665dbbc9 100644 --- a/stable_baselines/ddpg/ddpg.py +++ b/stable_baselines/ddpg/ddpg.py @@ -650,7 +650,8 @@ def _train_step(self, step, writer, log=False): :return: (float, float) critic loss, actor loss """ # Get a batch - obs, actions, rewards, next_obs, terminals = self.replay_buffer.sample(batch_size=self.batch_size) + obs, actions, rewards, next_obs, terminals = self.replay_buffer.sample(batch_size=self.batch_size, + env=self._vec_normalize_env) # Reshape to match previous behavior and placeholder shape rewards = rewards.reshape(-1, 1) terminals = terminals.reshape(-1, 1) @@ -735,7 +736,8 @@ def _get_stats(self): if self.stats_sample is None: # Get a sample and keep that fixed for all further computations. # This allows us to estimate the change in value for the same set of inputs. - obs, actions, rewards, next_obs, terminals = self.replay_buffer.sample(batch_size=self.batch_size) + obs, actions, rewards, next_obs, terminals = self.replay_buffer.sample(batch_size=self.batch_size, + env=self._vec_normalize_env) self.stats_sample = { 'obs': obs, 'actions': actions, @@ -777,7 +779,7 @@ def _adapt_param_noise(self): return 0. # Perturb a separate copy of the policy to adjust the scale for the next "real" perturbation. - obs, *_ = self.replay_buffer.sample(batch_size=self.batch_size) + obs, *_ = self.replay_buffer.sample(batch_size=self.batch_size, env=self._vec_normalize_env) self.sess.run(self.perturb_adaptive_policy_ops, feed_dict={ self.param_noise_stddev: self.param_noise.current_stddev, }) @@ -832,6 +834,9 @@ def learn(self, total_timesteps, callback=None, log_interval=100, tb_log_name="D # Prepare everything. 
self._reset() obs = self.env.reset() + # Retrieve unnormalized observation for saving into the buffer + if self._vec_normalize_env is not None: + obs_ = self._vec_normalize_env.get_original_obs().squeeze() eval_obs = None if self.eval_env is not None: eval_obs = self.eval_env.reset() @@ -894,23 +899,37 @@ def learn(self, total_timesteps, callback=None, log_interval=100, tb_log_name="D callback.on_training_end() return self - if writer is not None: - ep_rew = np.array([reward]).reshape((1, -1)) - ep_done = np.array([done]).reshape((1, -1)) - tf_util.total_episode_reward_logger(self.episode_reward, ep_rew, ep_done, - writer, self.num_timesteps) step += 1 total_steps += 1 if rank == 0 and self.render: self.env.render() - episode_reward += reward - episode_step += 1 # Book-keeping. epoch_actions.append(action) epoch_qs.append(q_value) - self._store_transition(obs, action, reward, new_obs, done) + + # Store only the unnormalized version + if self._vec_normalize_env is not None: + new_obs_ = self._vec_normalize_env.get_original_obs().squeeze() + reward_ = self._vec_normalize_env.get_original_reward().squeeze() + else: + # Avoid changing the original ones + obs_, new_obs_, reward_ = obs, new_obs, reward + + self._store_transition(obs_, action, reward_, new_obs_, done) obs = new_obs + # Save the unnormalized observation + if self._vec_normalize_env is not None: + obs_ = new_obs_ + + episode_reward += reward_ + episode_step += 1 + + if writer is not None: + ep_rew = np.array([reward_]).reshape((1, -1)) + ep_done = np.array([done]).reshape((1, -1)) + tf_util.total_episode_reward_logger(self.episode_reward, ep_rew, ep_done, + writer, self.num_timesteps) if done: # Episode done. diff --git a/stable_baselines/deepq/dqn.py b/stable_baselines/deepq/dqn.py index e451763171..6c2491f510 100644 --- a/stable_baselines/deepq/dqn.py +++ b/stable_baselines/deepq/dqn.py @@ -190,6 +190,9 @@ def learn(self, total_timesteps, callback=None, log_interval=100, tb_log_name="D reset = True obs = self.env.reset() + # Retrieve unnormalized observation for saving into the buffer + if self._vec_normalize_env is not None: + obs_ = self._vec_normalize_env.get_original_obs().squeeze() for _ in range(total_timesteps): # Take action and update exploration to the newest value @@ -221,17 +224,27 @@ def learn(self, total_timesteps, callback=None, log_interval=100, tb_log_name="D if callback.on_step() is False: break + # Store only the unnormalized version + if self._vec_normalize_env is not None: + new_obs_ = self._vec_normalize_env.get_original_obs().squeeze() + reward_ = self._vec_normalize_env.get_original_reward().squeeze() + else: + # Avoid changing the original ones + obs_, new_obs_, reward_ = obs, new_obs, rew # Store transition in the replay buffer. 
- self.replay_buffer.add(obs, action, rew, new_obs, float(done)) + self.replay_buffer.add(obs_, action, reward_, new_obs_, float(done)) obs = new_obs + # Save the unnormalized observation + if self._vec_normalize_env is not None: + obs_ = new_obs_ if writer is not None: - ep_rew = np.array([rew]).reshape((1, -1)) + ep_rew = np.array([reward_]).reshape((1, -1)) ep_done = np.array([done]).reshape((1, -1)) tf_util.total_episode_reward_logger(self.episode_reward, ep_rew, ep_done, writer, self.num_timesteps) - episode_rewards[-1] += rew + episode_rewards[-1] += reward_ if done: maybe_is_success = info.get('is_success') if maybe_is_success is not None: @@ -254,10 +267,12 @@ def learn(self, total_timesteps, callback=None, log_interval=100, tb_log_name="D assert self.beta_schedule is not None, \ "BUG: should be LinearSchedule when self.prioritized_replay True" experience = self.replay_buffer.sample(self.batch_size, - beta=self.beta_schedule.value(self.num_timesteps)) + beta=self.beta_schedule.value(self.num_timesteps), + env=self._vec_normalize_env) (obses_t, actions, rewards, obses_tp1, dones, weights, batch_idxes) = experience else: - obses_t, actions, rewards, obses_tp1, dones = self.replay_buffer.sample(self.batch_size) + obses_t, actions, rewards, obses_tp1, dones = self.replay_buffer.sample(self.batch_size, + env=self._vec_normalize_env) weights, batch_idxes = np.ones_like(rewards), None # pytype:enable=bad-unpacking diff --git a/stable_baselines/sac/sac.py b/stable_baselines/sac/sac.py index a8806a3bd6..710fda60f3 100644 --- a/stable_baselines/sac/sac.py +++ b/stable_baselines/sac/sac.py @@ -317,7 +317,7 @@ def setup_model(self): def _train_step(self, step, writer, learning_rate): # Sample a batch from the replay buffer - batch = self.replay_buffer.sample(self.batch_size) + batch = self.replay_buffer.sample(self.batch_size, env=self._vec_normalize_env) batch_obs, batch_actions, batch_rewards, batch_next_obs, batch_dones = batch feed_dict = { @@ -378,6 +378,10 @@ def learn(self, total_timesteps, callback=None, if self.action_noise is not None: self.action_noise.reset() obs = self.env.reset() + # Retrieve unnormalized observation for saving into the buffer + if self._vec_normalize_env is not None: + obs_ = self._vec_normalize_env.get_original_obs().squeeze() + n_updates = 0 infos_values = [] @@ -414,9 +418,20 @@ def learn(self, total_timesteps, callback=None, if callback.on_step() is False: break + # Store only the unnormalized version + if self._vec_normalize_env is not None: + new_obs_ = self._vec_normalize_env.get_original_obs().squeeze() + reward_ = self._vec_normalize_env.get_original_reward().squeeze() + else: + # Avoid changing the original ones + obs_, new_obs_, reward_ = obs, new_obs, reward + # Store transition in the replay buffer. 
- self.replay_buffer.add(obs, action, reward, new_obs, float(done)) + self.replay_buffer.add(obs_, action, reward_, new_obs_, float(done)) obs = new_obs + # Save the unnormalized observation + if self._vec_normalize_env is not None: + obs_ = new_obs_ # Retrieve reward and episode length if using Monitor wrapper maybe_ep_info = info.get('episode') @@ -425,7 +440,7 @@ def learn(self, total_timesteps, callback=None, if writer is not None: # Write reward per episode to tensorboard - ep_reward = np.array([reward]).reshape((1, -1)) + ep_reward = np.array([reward_]).reshape((1, -1)) ep_done = np.array([done]).reshape((1, -1)) tf_util.total_episode_reward_logger(self.episode_reward, ep_reward, ep_done, writer, self.num_timesteps) @@ -457,7 +472,7 @@ def learn(self, total_timesteps, callback=None, callback.on_rollout_start() - episode_rewards[-1] += reward + episode_rewards[-1] += reward_ if done: if self.action_noise is not None: self.action_noise.reset() diff --git a/stable_baselines/td3/td3.py b/stable_baselines/td3/td3.py index 29737a42bc..1119a9f7f4 100644 --- a/stable_baselines/td3/td3.py +++ b/stable_baselines/td3/td3.py @@ -242,7 +242,7 @@ def setup_model(self): def _train_step(self, step, writer, learning_rate, update_policy): # Sample a batch from the replay buffer - batch = self.replay_buffer.sample(self.batch_size) + batch = self.replay_buffer.sample(self.batch_size, env=self._vec_normalize_env) batch_obs, batch_actions, batch_rewards, batch_next_obs, batch_dones = batch feed_dict = { @@ -298,6 +298,9 @@ def learn(self, total_timesteps, callback=None, if self.action_noise is not None: self.action_noise.reset() obs = self.env.reset() + # Retrieve unnormalized observation for saving into the buffer + if self._vec_normalize_env is not None: + obs_ = self._vec_normalize_env.get_original_obs().squeeze() n_updates = 0 infos_values = [] @@ -334,9 +337,20 @@ def learn(self, total_timesteps, callback=None, if callback.on_step() is False: break + # Store only the unnormalized version + if self._vec_normalize_env is not None: + new_obs_ = self._vec_normalize_env.get_original_obs().squeeze() + reward_ = self._vec_normalize_env.get_original_reward().squeeze() + else: + # Avoid changing the original ones + obs_, new_obs_, reward_ = obs, new_obs, reward + # Store transition in the replay buffer. 
- self.replay_buffer.add(obs, action, reward, new_obs, float(done)) + self.replay_buffer.add(obs_, action, reward_, new_obs_, float(done)) obs = new_obs + # Save the unnormalized observation + if self._vec_normalize_env is not None: + obs_ = new_obs_ # Retrieve reward and episode length if using Monitor wrapper maybe_ep_info = info.get('episode') @@ -345,7 +359,7 @@ def learn(self, total_timesteps, callback=None, if writer is not None: # Write reward per episode to tensorboard - ep_reward = np.array([reward]).reshape((1, -1)) + ep_reward = np.array([reward_]).reshape((1, -1)) ep_done = np.array([done]).reshape((1, -1)) tf_util.total_episode_reward_logger(self.episode_reward, ep_reward, ep_done, writer, self.num_timesteps) @@ -378,7 +392,7 @@ def learn(self, total_timesteps, callback=None, callback.on_rollout_start() - episode_rewards[-1] += reward + episode_rewards[-1] += reward_ if done: if self.action_noise is not None: self.action_noise.reset() diff --git a/tests/test_vec_normalize.py b/tests/test_vec_normalize.py index 5a0d216e6c..b98c99f7d7 100644 --- a/tests/test_vec_normalize.py +++ b/tests/test_vec_normalize.py @@ -2,7 +2,9 @@ import gym import numpy as np +import pytest +from stable_baselines import DDPG, DQN, SAC, TD3 from stable_baselines.common.running_mean_std import RunningMeanStd from stable_baselines.common.vec_env import (DummyVecEnv, VecNormalize, VecFrameStack, sync_envs_normalization, unwrap_vec_normalize) @@ -123,6 +125,20 @@ def test_normalize_external(): assert obs.shape == norm_obs.shape +@pytest.mark.parametrize("model_class", [DDPG, DQN, SAC, TD3]) +def test_offpolicy_normalization(model_class): + if model_class == DQN: + env = DummyVecEnv([lambda: gym.make('CartPole-v1')]) + else: + env = DummyVecEnv([make_env]) + env = VecNormalize(env, norm_obs=True, norm_reward=True, clip_obs=10., clip_reward=10.) + + model = model_class('MlpPolicy', env, verbose=1) + model.learn(total_timesteps=1000) + # Check getter + assert isinstance(model.get_vec_normalize_env(), VecNormalize) + + def test_sync_vec_normalize(): env = DummyVecEnv([make_env]) @@ -147,6 +163,7 @@ def test_sync_vec_normalize(): obs = env.reset() original_obs = env.get_original_obs() + dummy_rewards = np.random.rand(10) # Normalization must be different assert not np.allclose(obs, eval_env.normalize_obs(original_obs)) @@ -154,6 +171,7 @@ def test_sync_vec_normalize(): # Now they must be synced assert np.allclose(obs, eval_env.normalize_obs(original_obs)) + assert np.allclose(env.normalize_reward(dummy_rewards), eval_env.normalize_reward(dummy_rewards)) def test_mpi_runningmeanstd():
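
---

For reference, a minimal usage sketch of what this patch enables, closely mirroring the new `test_offpolicy_normalization` test (the choice of `Pendulum-v0`, SAC and the timestep budget are arbitrary, for illustration only):

```python
import gym

from stable_baselines import SAC
from stable_baselines.common.vec_env import DummyVecEnv, VecNormalize

# Normalize observations and rewards of the training env
env = DummyVecEnv([lambda: gym.make('Pendulum-v0')])
env = VecNormalize(env, norm_obs=True, norm_reward=True, clip_obs=10., clip_reward=10.)

model = SAC('MlpPolicy', env, verbose=1)
model.learn(total_timesteps=1000)

# New getter: the model keeps a reference to the VecNormalize wrapper of its training env
assert isinstance(model.get_vec_normalize_env(), VecNormalize)
```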
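Under the hood, transitions are now stored unnormalized and the normalization is applied at sampling time by passing the `VecNormalize` wrapper to `ReplayBuffer.sample()`. Below is a standalone sketch of that behaviour, assuming a toy env and buffer size; it follows the same `get_original_obs()` / `get_original_reward()` pattern used in the `learn()` methods above:

```python
import gym

from stable_baselines.common.buffers import ReplayBuffer
from stable_baselines.common.vec_env import DummyVecEnv, VecNormalize

env = VecNormalize(DummyVecEnv([lambda: gym.make('Pendulum-v0')]))
buffer = ReplayBuffer(size=100)

env.reset()
# Unnormalized version of the current observation, as done in learn()
obs_ = env.get_original_obs().squeeze()
for _ in range(10):
    action = env.action_space.sample()
    _, _, done, _ = env.step([action])
    new_obs_ = env.get_original_obs().squeeze()
    reward_ = env.get_original_reward().squeeze()
    # Store only unnormalized values in the buffer
    buffer.add(obs_, action, reward_, new_obs_, float(done[0]))
    obs_ = new_obs_

# Normalization (with the current running statistics) happens when sampling
obs_batch, actions, rewards, next_obs_batch, dones = buffer.sample(batch_size=5, env=env)
```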
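Finally, a sketch of the `sync_envs_normalization` change: both the observation statistics and, since this patch, the reward statistics (`ret_rms`) are copied from the training env to the evaluation env, as checked in `test_sync_vec_normalize` (the env choice and the number of warm-up steps are again illustrative):

```python
import gym
import numpy as np

from stable_baselines.common.vec_env import (DummyVecEnv, VecNormalize,
                                             sync_envs_normalization)


def make_env():
    return gym.make('Pendulum-v0')


env = VecNormalize(DummyVecEnv([make_env]), norm_obs=True, norm_reward=True)
# Eval env: keep its statistics frozen, they will be synced from the training env
eval_env = VecNormalize(DummyVecEnv([make_env]), training=False,
                        norm_obs=True, norm_reward=True)

env.reset()
# Update the running statistics of the training env a bit
for _ in range(100):
    env.step([env.action_space.sample()])

# Copies obs_rms and (with this patch) ret_rms to the eval env
sync_envs_normalization(env, eval_env)

dummy_rewards = np.random.rand(10)
assert np.allclose(env.normalize_reward(dummy_rewards),
                   eval_env.normalize_reward(dummy_rewards))
```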