VecEnv Support TD3 #495

Open
wants to merge 3 commits into master
Changes from 1 commit
241 changes: 138 additions & 103 deletions stable_baselines/td3/td3.py
@@ -10,6 +10,7 @@
from stable_baselines.a2c.utils import total_episode_reward_logger
from stable_baselines.common import tf_util, OffPolicyRLModel, SetVerbosity, TensorboardWriter
from stable_baselines.common.vec_env import VecEnv
from stable_baselines.common.runners import AbstractEnvRunner
from stable_baselines.deepq.replay_buffer import ReplayBuffer
from stable_baselines.ppo2.ppo2 import safe_mean, get_schedule_fn
from stable_baselines.sac.sac import get_vars
@@ -63,7 +64,7 @@ def __init__(self, policy, env, gamma=0.99, learning_rate=3e-4, buffer_size=5000
_init_setup_model=True, policy_kwargs=None, full_tensorboard_log=False):

super(TD3, self).__init__(policy=policy, env=env, replay_buffer=None, verbose=verbose,
policy_base=TD3Policy, requires_vec_env=False, policy_kwargs=policy_kwargs)
policy_base=TD3Policy, requires_vec_env=True, policy_kwargs=policy_kwargs)

self.buffer_size = buffer_size
self.learning_rate = learning_rate
@@ -294,121 +295,103 @@ def learn(self, total_timesteps, callback=None, seed=None,
start_time = time.time()
episode_rewards = [0.0]
episode_successes = []
runner = Runner(env=self.env, model=self)
if self.action_noise is not None:
self.action_noise.reset()
obs = self.env.reset()
self.episode_reward = np.zeros((1,))
ep_info_buf = deque(maxlen=100)
n_updates = 0
infos_values = []

for step in range(total_timesteps):
for step in range(0, total_timesteps, self.env.num_envs):
if callback is not None:
# Only stop training if return value is False, not when it is None. This is for backwards
# compatibility with callbacks that have no return statement.
if callback(locals(), globals()) is False:
break

# Before training starts, randomly sample actions
# from a uniform distribution for better exploration.
# Afterwards, use the learned policy
# if random_exploration is set to 0 (normal setting)
if (self.num_timesteps < self.learning_starts
or np.random.rand() < self.random_exploration):
# No need to rescale when sampling random action
rescaled_action = action = self.env.action_space.sample()
else:
action = self.policy_tf.step(obs[None]).flatten()
# Add noise to the action, as the policy
# is deterministic, this is required for exploration
if self.action_noise is not None:
action = np.clip(action + self.action_noise(), -1, 1)
# Rescale from [-1, 1] to the correct bounds
rescaled_action = action * np.abs(self.action_space.low)

assert action.shape == self.env.action_space.shape

new_obs, reward, done, info = self.env.step(rescaled_action)

# Store transition in the replay buffer.
self.replay_buffer.add(obs, action, reward, new_obs, float(done))
obs = new_obs

# Retrieve reward and episode length if using Monitor wrapper
maybe_ep_info = info.get('episode')
if maybe_ep_info is not None:
ep_info_buf.extend([maybe_ep_info])

if writer is not None:
# Write reward per episode to tensorboard
ep_reward = np.array([reward]).reshape((1, -1))
ep_done = np.array([done]).reshape((1, -1))
self.episode_reward = total_episode_reward_logger(self.episode_reward, ep_reward,
ep_done, writer, self.num_timesteps)

if step % self.train_freq == 0:
mb_infos_vals = []
# Update policy, critics and target networks
for grad_step in range(self.gradient_steps):
# Break if the warmup phase is not over
# or if there are not enough samples in the replay buffer
if not self.replay_buffer.can_sample(self.batch_size) \
or self.num_timesteps < self.learning_starts:
break
n_updates += 1
# Compute current learning_rate
frac = 1.0 - step / total_timesteps
current_lr = self.learning_rate(frac)
# Update policy and critics (q functions)
# Note: the policy is updated less frequently than the Q functions
# this is controlled by the `policy_delay` parameter
mb_infos_vals.append(
self._train_step(step, writer, current_lr, (step + grad_step) % self.policy_delay == 0))

# Log losses and entropy, useful for monitor training
if len(mb_infos_vals) > 0:
infos_values = np.mean(mb_infos_vals, axis=0)

episode_rewards[-1] += reward
if done:
if self.action_noise is not None:
self.action_noise.reset()
if not isinstance(self.env, VecEnv):
obs = self.env.reset()
episode_rewards.append(0.0)

maybe_is_success = info.get('is_success')
if maybe_is_success is not None:
episode_successes.append(float(maybe_is_success))

if len(episode_rewards[-101:-1]) == 0:
mean_reward = -np.inf
else:
mean_reward = round(float(np.mean(episode_rewards[-101:-1])), 1)

num_episodes = len(episode_rewards)
self.num_timesteps += 1
# Display training infos
if self.verbose >= 1 and done and log_interval is not None and len(episode_rewards) % log_interval == 0:
fps = int(step / (time.time() - start_time))
logger.logkv("episodes", num_episodes)
logger.logkv("mean 100 episode reward", mean_reward)
if len(ep_info_buf) > 0 and len(ep_info_buf[0]) > 0:
logger.logkv('ep_rewmean', safe_mean([ep_info['r'] for ep_info in ep_info_buf]))
logger.logkv('eplenmean', safe_mean([ep_info['l'] for ep_info in ep_info_buf]))
logger.logkv("n_updates", n_updates)
logger.logkv("current_lr", current_lr)
logger.logkv("fps", fps)
logger.logkv('time_elapsed', int(time.time() - start_time))
if len(episode_successes) > 0:
logger.logkv("success rate", np.mean(episode_successes[-100:]))
if len(infos_values) > 0:
for (name, val) in zip(self.infos_names, infos_values):
logger.logkv(name, val)
logger.logkv("total timesteps", self.num_timesteps)
logger.dumpkvs()
# Reset infos:
infos_values = []
obs, new_obs, action, reward, done, info = runner.run()

for i in range(self.env.num_envs):
# Store transition in the replay buffer.
self.replay_buffer.add(obs[i], action[i], reward[i], new_obs[i], float(done[i]))
# Retrieve reward and episode length if using Monitor wrapper
maybe_ep_info = info[i].get('episode')
if maybe_ep_info is not None:
ep_info_buf.extend([maybe_ep_info])
if writer is not None:
# Write reward per episode to tensorboard
ep_reward = np.array([reward[i]]).reshape((1, -1))
ep_done = np.array([done[i]]).reshape((1, -1))
self.episode_reward = total_episode_reward_logger(self.episode_reward, ep_reward,
ep_done, writer, self.num_timesteps)

if step % self.train_freq == 0:
mb_infos_vals = []
# Update policy, critics and target networks
for grad_step in range(self.gradient_steps):
Collaborator:
By putting the update inside the for loop that is used to store new samples, it seems that you are changing the algorithm.
Also be careful with step % train_freq when you don't increment step by 1.

Author:
Ok, I will look into the algorithm part. As for the step % train_freq, I actually do increment step by 1 (at the end of the inner for-loop) so that should be fine.

Collaborator:
Ok then, but the for step in range(0, total_timesteps, self.n_envs): was misleading.

Author (@acyclics, Oct 3, 2019):
I see your point. Alright, I will clarify that for-loop expression.
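
For reference, here is a small standalone sketch (not part of the diff) of the point discussed above: if a counter is advanced once per stored transition, the train_freq schedule is identical whether transitions arrive one at a time or num_envs at a time. All names below are illustrative only.

```python
# Standalone sketch: the update schedule produced by a flat per-transition
# counter matches the nested loop that stores n_envs transitions per env.step().
total_timesteps, n_envs, train_freq = 12, 3, 2

# Single-env style: one transition per iteration.
flat_updates = [t for t in range(total_timesteps) if t % train_freq == 0]

# Vectorized style: n_envs transitions per outer iteration,
# counter bumped once per stored transition.
nested_updates = []
counter = 0
for _ in range(0, total_timesteps, n_envs):   # one vectorized env.step()
    for _ in range(n_envs):                   # one stored transition per env
        if counter % train_freq == 0:
            nested_updates.append(counter)
        counter += 1

assert flat_updates == nested_updates         # update schedule is unchanged
```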

# Break if the warmup phase is not over
# or if there are not enough samples in the replay buffer
if not self.replay_buffer.can_sample(self.batch_size) \
or (self.num_timesteps // self.env.num_envs) < self.learning_starts:
break
n_updates += 1
# Compute current learning_rate
frac = 1.0 - step / total_timesteps
current_lr = self.learning_rate(frac)
# Update policy and critics (q functions)
# Note: the policy is updated less frequently than the Q functions
# this is controlled by the `policy_delay` parameter
mb_infos_vals.append(
self._train_step(step, writer, current_lr, (step + grad_step) % self.policy_delay == 0))

# Log losses and entropy, useful for monitor training
if len(mb_infos_vals) > 0:
infos_values = np.mean(mb_infos_vals, axis=0)

episode_rewards[-1] += reward[i]
if done[i]:
if self.action_noise is not None:
self.action_noise.reset()
Collaborator:
Same remark as before: I think you should have an action_noise object per env. Maybe we need to create a wrapper for that or modify the noise class to handle it better.
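
A minimal sketch of the per-env noise wrapper suggested above; the class name VecActionNoise and its interface are assumptions for illustration, not an existing stable-baselines API.

```python
import numpy as np

class VecActionNoise:
    """Hypothetical wrapper holding one independent action-noise object per env."""

    def __init__(self, make_noise, n_envs):
        # make_noise: a zero-argument callable returning a fresh noise object
        # (e.g. a NormalActionNoise instance); one copy per parallel env.
        self.noises = [make_noise() for _ in range(n_envs)]

    def __call__(self):
        # Stack one noise sample per env -> shape (n_envs, action_dim)
        return np.stack([noise() for noise in self.noises])

    def reset(self, dones=None):
        # Reset all noise processes, or only those whose episode just ended.
        for i, noise in enumerate(self.noises):
            if dones is None or dones[i]:
                noise.reset()
```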

if not isinstance(self.env, VecEnv):
runner.reset()
episode_rewards.append(0.0)

maybe_is_success = info[i].get('is_success')
if maybe_is_success is not None:
episode_successes.append(float(maybe_is_success))

if len(episode_rewards[-101:-1]) == 0:
mean_reward = -np.inf
else:
mean_reward = round(float(np.mean(episode_rewards[-101:-1])), 1)

num_episodes = len(episode_rewards)
self.num_timesteps += 1
# Display training infos
if self.verbose >= 1 and done[i] and log_interval is not None and len(episode_rewards) % log_interval == 0:
fps = int(step / (time.time() - start_time))
logger.logkv("episodes", num_episodes)
logger.logkv("mean 100 episode reward", mean_reward)
if len(ep_info_buf) > 0 and len(ep_info_buf[0]) > 0:
logger.logkv('ep_rewmean', safe_mean([ep_info['r'] for ep_info in ep_info_buf]))
logger.logkv('eplenmean', safe_mean([ep_info['l'] for ep_info in ep_info_buf]))
logger.logkv("n_updates", n_updates)
logger.logkv("current_lr", current_lr)
logger.logkv("fps", fps)
logger.logkv('time_elapsed', int(time.time() - start_time))
if len(episode_successes) > 0:
logger.logkv("success rate", np.mean(episode_successes[-100:]))
if len(infos_values) > 0:
for (name, val) in zip(self.infos_names, infos_values):
logger.logkv(name, val)
logger.logkv("total timesteps", self.num_timesteps)
logger.dumpkvs()
# Reset infos:
infos_values = []

step += 1

return self

def action_probability(self, observation, state=None, mask=None, actions=None, logp=False):
@@ -473,3 +456,55 @@ def save(self, save_path, cloudpickle=False):
params_to_save = self.get_parameters()

self._save_to_file(save_path, data=data, params=params_to_save, cloudpickle=cloudpickle)


class Runner(AbstractEnvRunner):
Collaborator:
Why do you need a runner? It seems that you only need to save the obs variable.

Author:
I noticed that other implementations that use VecEnv use a Runner. I used a Runner here because I feel it best enables future developers to build on top of it.

Collaborator:
PPO and A2C use a runner because some additional computation/transformations are needed (notably the computation of the GAE), which is not the case for TD3, which only needs to fill a replay buffer.
However, at some point we will need to refactor and unify SAC/DDPG/TD3, which have a lot in common.

Author:
Ah cool. So perhaps something akin to a "runner" for all three?

Collaborator:
Not really, more a common collect_rollout method that would be part of the OffPolicy class, but this is not the subject of this PR.
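
For context, a rough sketch of what such a shared collect_rollout helper could look like on an off-policy base class. Only the name collect_rollout comes from the comment above; the signature, the _last_obs attribute, and the action_fn argument are assumptions.

```python
def collect_rollout(self, env, replay_buffer, n_steps, action_fn):
    """Hypothetical helper: step the (vectorized) env n_steps times and store every transition."""
    # Persist the current observation across calls via a (hypothetical) self._last_obs.
    obs = self._last_obs if self._last_obs is not None else env.reset()
    for _ in range(n_steps):
        actions = action_fn(obs)                 # policy action or random exploration
        new_obs, rewards, dones, infos = env.step(actions)
        for i in range(env.num_envs):
            replay_buffer.add(obs[i], actions[i], rewards[i], new_obs[i], float(dones[i]))
        obs = new_obs
    self._last_obs = obs
```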

def __init__(self, *, env, model):
"""
A runner to collect experience from a vectorized environment for a TD3 model

:param env: (VecEnv) The environment to learn from
:param model: (TD3) The model whose policy is used to select actions
"""
super().__init__(env=env, model=model.policy_tf, n_steps=1)
self.td3 = model

def reset(self):
self.obs[:] = self.env.reset()

def run(self):
"""
Run one step of data collection with the model

:return:
- observations: (np.ndarray) the observations before the step
- new_observations: (np.ndarray) the observations after the step
- actions: (np.ndarray) the actions taken in each environment
- rewards: (np.ndarray) the rewards
- dones: (np.ndarray) whether each episode is over
- infos: (list of dict) the extra information of the environments
"""
# Before training starts, randomly sample actions
# from a uniform distribution for better exploration.
# Afterwards, use the learned policy
# if random_exploration is set to 0 (normal setting)
prev_obs = self.obs
if (self.td3.num_timesteps < self.td3.learning_starts
or np.random.rand() < self.td3.random_exploration):
# No need to rescale when sampling random action
rescaled_action = action = [self.env.action_space.sample() for _ in range(self.env.num_envs)]
else:
action = self.model.step(prev_obs).flatten()
action = [np.array([a]) for a in action]
# Add noise to the action, as the policy
# is deterministic, this is required for exploration
if self.td3.action_noise is not None:
action = np.clip(action + self.td3.action_noise(), -1, 1)
# Rescale from [-1, 1] to the correct bounds
rescaled_action = action * np.abs(self.td3.action_space.low)

for i in range(self.env.num_envs):
assert action[i].shape == self.env.action_space.shape

self.obs, reward, done, info = self.env.step(rescaled_action)

return prev_obs, self.obs, action, reward, done, info
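
Finally, a usage sketch of what this branch is meant to enable, assuming the branch is installed and behaves as intended: with requires_vec_env=True, TD3 is constructed directly on a VecEnv with several parallel environments.

```python
import gym

from stable_baselines import TD3
from stable_baselines.common.vec_env import DummyVecEnv

# Four parallel copies of a continuous-control task.
env = DummyVecEnv([lambda: gym.make("Pendulum-v0") for _ in range(4)])

model = TD3("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=10000)
```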