Implementing New Algorithms (Advanced)¶
In this section, we will walk through the implementation of the vanilla policy gradient algorithm provided in the framework, available at rllab/algos/vpg.py. It utilizes many functionalities provided by the framework, which we describe below.
The BatchPolopt Class¶
The VPG class inherits from BatchPolopt, an abstract class inherited by algorithms with a common structure. The structure is as follows:
Initialize policy \(\pi\) with parameter \(\theta_1\).
Initialize the computational graph structure.
For iteration \(k = 1, 2, \ldots\):
- Sample \(N\) trajectories \(\tau_1, \ldots, \tau_N\) under the current policy \(\theta_k\), where \(\tau_i = (s_t^i, a_t^i, R_t^i)_{t=0}^{T-1}\). Note that the last state is dropped, since no action is taken after observing the last state.
- Update the policy based on the collected on-policy trajectories.
- Print diagnostic information and store intermediate results.
Note the parallel between the structure above and the pseudocode for VPG. The BatchPolopt class takes care of collecting samples and common diagnostic information. It also provides an abstraction of the general procedure above, so that algorithm implementations only need to fill in the missing pieces. The core of the BatchPolopt class is the train() method:
def train(self):
    # ...
    self.init_opt()
    for itr in xrange(self.start_itr, self.n_itr):
        paths = self.obtain_samples(itr)
        samples_data = self.process_samples(itr, paths)
        self.optimize_policy(itr, samples_data)
        params = self.get_itr_snapshot(itr, samples_data)
        logger.save_itr_params(itr, params)
    # ...
The methods obtain_samples and process_samples are implemented for you. The derived class needs to provide implementations for init_opt, which initializes the computation graph, optimize_policy, which updates the policy based on the collected data, and get_itr_snapshot, which returns a dictionary of objects to be persisted per iteration.
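For instance, a minimal get_itr_snapshot could look like the following sketch (a hypothetical simplification, not the code from rllab/algos/vpg.py; `algo` stands in for `self` on the algorithm class, and the actual method may persist additional objects):

```python
def get_itr_snapshot(algo, itr, samples_data):
    # Return a dictionary of objects to be persisted for this iteration
    # (`algo` stands in for `self` on the algorithm class).
    return dict(
        itr=itr,
        policy=algo.policy,
        baseline=algo.baseline,
        env=algo.env,
    )
```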
The BatchPolopt class powers quite a few algorithms:
- Vanilla Policy Gradient: rllab/algos/vpg.py
- Natural Policy Gradient: rllab/algos/npg.py
- Reward-Weighted Regression: rllab/algos/erwr.py
- Trust Region Policy Optimization: rllab/algos/trpo.py
- Relative Entropy Policy Search: rllab/algos/reps.py
To give an illustration, here's how we might implement init_opt for VPG (the actual code in rllab/algos/vpg.py is longer due to the need to log extra diagnostic information as well as to support recurrent policies):
import theano
import theano.tensor as TT

from rllab.misc import ext

# ...

def init_opt(self):
    is_recurrent = int(self.policy.recurrent)
    obs_var = self.env.observation_space.new_tensor_variable(
        'obs',
        extra_dims=1,
    )
    action_var = self.env.action_space.new_tensor_variable(
        'action',
        extra_dims=1,
    )
    advantage_var = TT.vector('advantage')
    dist = self.policy.distribution
    old_dist_info_vars = {
        k: TT.matrix('old_%s' % k)
        for k in dist.dist_info_keys
    }
    old_dist_info_vars_list = [old_dist_info_vars[k] for k in dist.dist_info_keys]
    state_info_vars = {
        k: ext.new_tensor(
            k,
            ndim=2 + is_recurrent,
            dtype=theano.config.floatX
        ) for k in self.policy.state_info_keys
    }
    state_info_vars_list = [state_info_vars[k] for k in self.policy.state_info_keys]
    dist_info_vars = self.policy.dist_info_sym(obs_var, state_info_vars)
    logli = dist.log_likelihood_sym(action_var, dist_info_vars)
    # Formulate as a minimization problem.
    # The gradient of the surrogate objective is the policy gradient.
    surr_obj = - TT.mean(logli * advantage_var)
    input_list = [obs_var, action_var, advantage_var] + state_info_vars_list
    self.optimizer.update_opt(surr_obj, target=self.policy, inputs=input_list)
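Why is the gradient of this surrogate objective the policy gradient? Differentiating \(-\frac{1}{N}\sum_i \log \pi_\theta(a_i|s_i) A_i\) with respect to \(\theta\) yields (the negative of) the standard REINFORCE estimator. A quick numerical sanity check in plain numpy, independent of rllab, using a one-dimensional Gaussian policy with toy data:

```python
import numpy as np

np.random.seed(0)
mean, log_std = 0.5, -0.2            # Gaussian policy parameters
std = np.exp(log_std)
actions = np.random.randn(1000) * std + mean
advantages = np.random.randn(1000)

def surr_obj(m):
    # Negative mean of log-likelihood times advantage, as in init_opt.
    logli = -0.5 * ((actions - m) / std) ** 2 - log_std - 0.5 * np.log(2 * np.pi)
    return -np.mean(logli * advantages)

# Analytic policy gradient w.r.t. the mean: d logli / d m = (a - m) / std**2
analytic = -np.mean((actions - mean) / std ** 2 * advantages)

# Finite-difference gradient of the surrogate objective agrees with it.
eps = 1e-6
numeric = (surr_obj(mean + eps) - surr_obj(mean - eps)) / (2 * eps)
assert abs(analytic - numeric) < 1e-4
```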
The code is very similar to what we implemented in the basic version. Note that we use an optimizer, which in this case would be an instance of rllab.optimizers.first_order_optimizer.FirstOrderOptimizer.
Here’s how we might implement optimize_policy (note that its signature must match the call in train() above):

def optimize_policy(self, itr, samples_data):
    inputs = ext.extract(
        samples_data,
        "observations", "actions", "advantages"
    )
    agent_infos = samples_data["agent_infos"]
    state_info_list = [agent_infos[k] for k in self.policy.state_info_keys]
    inputs += tuple(state_info_list)
    self.optimizer.optimize(inputs)
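The helper ext.extract pulls the named keys out of samples_data as a tuple, in order. A plain-Python equivalent (a sketch for illustration, assuming this behavior of rllab.misc.ext.extract):

```python
def extract(d, *keys):
    # Return the values for the given keys as a tuple, in order.
    return tuple(d[k] for k in keys)

# Toy samples_data with the keys used in optimize_policy above
samples_data = {"observations": [0.1], "actions": [1], "advantages": [2.5]}
inputs = extract(samples_data, "observations", "actions", "advantages")
```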
Parallel Sampling¶
The rllab.parallel_sampler module takes care of parallelizing the sampling process and aggregating the collected trajectory data. It is used by the BatchPolopt class as follows:
# At the beginning of training, we need to register the environment and the
# policy with the parallel_sampler
parallel_sampler.populate_task(self.env, self.policy)

# ...

# Within each iteration, we just need to send the updated policy parameters
# to each worker
cur_params = self.policy.get_param_values()
paths = parallel_sampler.request_samples(
    policy_params=cur_params,
    max_samples=self.batch_size,
    max_path_length=self.max_path_length,
)
The returned paths is a list of dictionaries with keys rewards, observations, actions, env_infos, and agent_infos. The latter two, env_infos and agent_infos, are in turn dictionaries, whose values are numpy arrays of the per-time-step environment and agent (policy) information stacked together. agent_infos will contain at least the information that would be returned by calling policy.dist_info(). For a Gaussian distribution with diagonal variance, this would be the means and the logarithms of the standard deviations.
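Concretely, a single path for a three-dimensional observation space, a two-dimensional action space, and a diagonal Gaussian policy might look like the following (toy data; the shapes are for illustration only):

```python
import numpy as np

T = 5  # number of time steps in this trajectory
path = dict(
    observations=np.zeros((T, 3)),   # one row per time step
    actions=np.zeros((T, 2)),
    rewards=np.zeros(T),
    env_infos={},                    # extra per-step info from the environment
    agent_infos=dict(                # at least what policy.dist_info() returns
        mean=np.zeros((T, 2)),
        log_std=np.zeros((T, 2)),
    ),
)
```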
After collecting the trajectories, the process_samples method in the BatchPolopt class computes the empirical returns and advantages using the baseline specified through command line arguments (we’ll talk about this below). Then it trains the baseline using the collected data, and concatenates all rewards, observations, etc. together to form a single huge tensor, just as we did for the basic algorithm implementation.
One semantic difference from the basic implementation is that, rather than collecting a fixed number of trajectories with a potentially different number of steps per trajectory (if the environment implements a termination condition), we specify a desired total number of samples (i.e. time steps) per iteration. The actual number of samples collected will be around this number, although sometimes slightly larger, to make sure that every trajectory is run until either the horizon or the termination condition is met.
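This sample-count semantics can be sketched as follows (a hypothetical simplification of what the sampler does; `rollout` stands for running one full trajectory):

```python
def collect_samples(rollout, batch_size):
    # Collect whole trajectories until the total number of time steps
    # reaches batch_size; the last trajectory may overshoot slightly,
    # since trajectories are never cut short.
    paths, n_samples = [], 0
    while n_samples < batch_size:
        path = rollout()  # runs until horizon or termination
        paths.append(path)
        n_samples += len(path["rewards"])
    return paths

# Toy example: every trajectory lasts exactly 4 steps, so requesting
# 10 samples yields 3 trajectories (12 samples in total).
paths = collect_samples(lambda: {"rewards": [0.0] * 4}, batch_size=10)
```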