References:
Evolution Strategies as a Scalable Alternative to Reinforcement Learning (the OpenAI paper on ES for RL)
Mirrored Sampling and Sequential Selection for Evolution Strategies
https://morvanzhou.github.io/tutorials/machine-learning/evolutionary-algorithm/4-04-evolution-strategy-reinforcement-learning/
OpenAI's official blog explanation of the approach
Policy Gradients and ES are very similar approaches to reinforcement learning: both adjust the parameters based on reward. The biggest difference is that PG modifies the parameters by backpropagating gradients, while ES does not. PG perturbs the actions and uses each action's reward to decide how heavily that action weighs in the gradient step; ES perturbs the parameters of the neural network directly, the perturbed parameter samples earn different rewards, and the parameters are updated in proportion to the rewards they produce.
Pseudocode of the ES algorithm:
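In brief, each iteration of Algorithm 1 in the OpenAI paper samples noise vectors $\epsilon_1,\dots,\epsilon_n \sim \mathcal{N}(0, I)$, evaluates the returns $F_i = F(\theta + \sigma\epsilon_i)$, and updates the parameters with

$$\theta \leftarrow \theta + \alpha\,\frac{1}{n\sigma}\sum_{i=1}^{n} F_i\,\epsilon_i,$$

which is essentially what the small example below implements (with the returns standardized first).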
A small OpenAI example: using ES to optimize a quadratic:
# simple example: minimize a quadratic around some solution point
import numpy as np

solution = np.array([0.5, 0.1, -0.3])
def f(w): return -np.sum((w - solution)**2)

npop = 50      # population size
sigma = 0.1    # noise standard deviation
alpha = 0.001  # learning rate
w = np.random.randn(3)  # initial guess
for i in range(300):
    N = np.random.randn(npop, 3)            # sample npop noise vectors
    R = np.zeros(npop)
    for j in range(npop):
        w_try = w + sigma*N[j]              # perturb the current solution
        R[j] = f(w_try)                     # evaluate the perturbed solution
    A = (R - np.mean(R)) / np.std(R)        # standardize the rewards
    w = w + alpha/(npop*sigma) * np.dot(N.T, A)   # ES parameter update
The perturbation is sigma*N[j] because sigma is the desired standard deviation of the noise, while np.random.randn draws from the standard normal distribution $\mathcal{N}(0, 1)$. By the scaling property of the normal distribution, if $X \sim \mathcal{N}(0, 1)$ then $\sigma X \sim \mathcal{N}(0, \sigma^2)$, so every number produced by sigma * np.random.randn() follows $\mathcal{N}(0, \sigma^2)$.
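As a quick illustrative check (not part of the original example), scaling standard-normal samples by sigma gives noise whose empirical standard deviation is close to sigma:

import numpy as np

sigma = 0.1
noise = sigma * np.random.randn(100000)   # samples from N(0, sigma^2)
print(noise.std())                        # prints a value close to 0.1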
(1) In the quadratic example, the rewards guide the update. After the standardization above, individuals whose rewards are above the mean (the better half) receive positive weights, and those below the mean (the worse half) receive negative weights. Within the better half, the larger the reward, the larger the weight; within the worse half, the smaller the reward, the more negative the weight.
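A tiny toy illustration (the numbers here are made up) of how this standardization assigns signs:

import numpy as np

R = np.array([3.0, 1.0, 0.0, -2.0])   # toy rewards, already sorted from best to worst
A = (R - np.mean(R)) / np.std(R)
print(A)                              # roughly [ 1.39  0.28 -0.28 -1.39]: above-mean rewards
                                      # get positive weights, below-mean rewards negative ones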
(2) The OpenAI paper uses a rank-based utility to guide the update, which is more efficient. The utility is simply the share each individual contributes to the update: individuals are ranked by reward, and each rank is mapped to an entry of a fixed, precomputed utility list. Let the number of individuals evaluated in each ES iteration be n (base = 2 * n_kid in the code below); 1/n is the average share per individual, and subtracting it from the normalized utilities reduces the overall magnitude of each update.
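A minimal sketch of this rank-to-utility mapping, using the same formula that appears in the __main__ block of the full program below (base = 20 is an arbitrary choice here, i.e. 2 * n_kid with n_kid = 10):

import numpy as np

base = 20                                              # number of evaluated individuals
rank = np.arange(1, base + 1)                          # rank 1 = highest reward, base = lowest
util = np.maximum(0, np.log(base / 2 + 1) - np.log(rank))
utility = util / util.sum() - 1 / base                 # subtract the average share 1/base
print(utility[0], utility[-1])                         # best rank gets ~0.22, worst gets -0.05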
The utility curves for population sizes of 10 and 100:
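A minimal matplotlib sketch (assumed, not part of the original post) that reproduces these curves:

import numpy as np
import matplotlib.pyplot as plt

for base in (10, 100):                    # the two population sizes in the figure
    rank = np.arange(1, base + 1)
    util = np.maximum(0, np.log(base / 2 + 1) - np.log(rank))
    utility = util / util.sum() - 1 / base
    plt.plot(rank, utility, label='population = %d' % base)
plt.xlabel('rank (1 = best reward)')
plt.ylabel('utility')
plt.legend()
plt.show()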
(3) Mirrored sampling is used: whenever a noise vector is drawn to perturb the solution, its mirror image is used as well, so one perturbation pushes the solution in one direction and its mirrored twin pushes it in exactly the opposite direction. Whichever of the two achieves the better result then carries more weight in the update.
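A minimal sketch of the idea, using a hypothetical flat parameter vector; the full program below realizes the same scheme with sign() and noise_seed.repeat(2):

import numpy as np

params = np.zeros(4)                    # hypothetical flat parameter vector
sigma = 0.05
np.random.seed(123)                     # one shared seed per mirrored pair
epsilon = np.random.randn(params.size)
kid_plus = params + sigma * epsilon     # perturbation in one direction (sign +1)
kid_minus = params - sigma * epsilon    # the mirrored perturbation (sign -1)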
import numpy as np
import gym
import multiprocessing as mp

n_kid = 10                    # half of the training population
n_generation = 5000           # number of training generations
lr = 0.05                     # learning rate
sigma = 0.05                  # mutation strength (noise standard deviation)
n_core = mp.cpu_count() - 1   # number of CPU cores for the worker pool

game_config = [
    dict(game="CartPole-v0",
         n_feature=4, n_action=2, continuous_a=[False], ep_max_step=700, eval_threshold=500),
    dict(game="MountainCar-v0",
         n_feature=2, n_action=3, continuous_a=[False], ep_max_step=200, eval_threshold=-120),
    dict(game="Pendulum-v0",
         n_feature=3, n_action=1, continuous_a=[True, 2.], ep_max_step=200, eval_threshold=-180)
][2]                          # choose your game
# sign of the random perturbation: even k_id gets -1, odd k_id gets +1 (mirrored sampling)
def sign(k_id):
    return -1. if k_id % 2 == 0 else 1.

# not conventional SGD: the "gradient" here is the utility-weighted perturbation
class SGD(object):
    def __init__(self, params, learning_rate, momentum=0.9):
        self.v = np.zeros_like(params).astype(np.float32)
        self.lr = learning_rate
        self.momentum = momentum

    def get_gradients(self, gradients):
        # momentum-smoothed update step
        self.v = self.momentum * self.v + (1. - self.momentum) * gradients
        return self.lr * self.v
# reshape the flat 1-D parameter vector back into weight and bias matrices
def params_reshape(shapes, params):
    p, index = [], 0
    for i, shape in enumerate(shapes):
        n_w, n_b = shape[0] * shape[1], shape[1]
        p = p + [params[index: index + n_w].reshape(shape),
                 params[index + n_w: index + n_w + n_b].reshape(1, shape[1])]
        index += n_w + n_b
    return p
# run one episode and return the episode reward obtained by the net
def get_reward(shapes, params, env, ep_max_step, continuous_a, seed_and_id=None):
    # when seed_and_id is given, randomly perturb the net's parameters first
    if seed_and_id is not None:
        seed, k_id = seed_and_id
        np.random.seed(seed)
        params += sign(k_id) * sigma * np.random.randn(params.size)
    p = params_reshape(shapes, params)
    # collect the episode reward
    s = env.reset()
    ep_r = 0
    for episode in range(ep_max_step):
        a = get_action(p, s, continuous_a)
        s, r, done, _ = env.step(a)
        # reward shaping for MountainCar: drop the penalty once the car is past x = -0.1
        if env.spec._env_name == 'MountainCar' and s[0] > -0.1: r = 0
        ep_r += r
        if done:
            break
    return ep_r
# pick the action the net considers best for the current state
def get_action(params, input, continuous_a):
    input = input[np.newaxis, :]
    l1 = np.tanh(input.dot(params[0]) + params[1])
    l2 = np.tanh(l1.dot(params[2]) + params[3])
    output = l2.dot(params[4]) + params[5]
    # discrete actions: take the index of the largest output
    # continuous actions: output has shape (1,), squash and scale it directly
    if not continuous_a[0]:                            # discrete action
        return np.argmax(output, axis=1)[0]
    else:                                              # continuous action
        return continuous_a[1] * np.tanh(output)[0]    # Pendulum actions lie in [-2., 2.]
# build the net: return the layer shapes and a flat initial parameter vector
def build_net():
    def linear(n_in, n_out):
        w = np.random.randn(n_in * n_out).astype(np.float32) * 0.1
        b = np.random.randn(n_out).astype(np.float32) * 0.1
        return (n_in, n_out), np.hstack((w, b))
    s0, p0 = linear(game_config['n_feature'], 30)
    s1, p1 = linear(30, 20)
    s2, p2 = linear(20, game_config['n_action'])
    return [s0, s1, s2], np.hstack((p0, p1, p2))
# run one ES generation: evaluate the population and update the net's parameters
def train_net(net_shape, net_params, optimizer, utility, pool):
    # random seeds fix the generated noise so it can be reproduced when building the update;
    # each seed is repeated twice for the mirrored pair of kids
    noise_seed = np.random.randint(0, 2**32-1, size=n_kid, dtype=np.uint32).repeat(2)
    # evaluate the rewards of the population in parallel with a process pool
    jobs = [pool.apply_async(get_reward, (net_shape,
                                          net_params,
                                          env,
                                          game_config['ep_max_step'],
                                          game_config['continuous_a'],
                                          [noise_seed[k_id], k_id],)) for k_id in range(n_kid * 2)]
    # collect the rewards of the whole population
    rewards = np.array([j.get() for j in jobs])
    # rank the individuals by reward, from largest to smallest
    kids_rank = np.argsort(rewards)[::-1]
    # alternative: normalize the rewards to [0, 1] to guide the update:
    # rewards = (rewards - np.min(rewards)) / np.ptp(rewards)
    cumulative_update = np.zeros_like(net_params)
    for ui, k_id in enumerate(kids_rank):
        np.random.seed(noise_seed[k_id])    # reconstruct the same noise this kid was evaluated with
        cumulative_update += utility[ui] * sign(k_id) * np.random.randn(net_params.size)
    gradients = optimizer.get_gradients(cumulative_update / (2 * n_kid * sigma))
    return net_params + gradients, rewards
if __name__ == '__main__':
    # compute the rank-based utilities, i.e. each individual's share of the parameter update
    base = n_kid * 2
    rank = np.arange(1, base + 1)
    util = np.maximum(0, np.log(base / 2 + 1) - np.log(rank))
    utility = util / util.sum() - 1 / base

    net_shapes, net_params = build_net()
    env = gym.make(game_config['game']).unwrapped
    optimizer = SGD(net_params, lr)
    pool = mp.Pool(processes=n_core)
    mar = None      # moving average reward
    for i in range(n_generation):
        net_params, kid_rewards = train_net(net_shapes, net_params, optimizer, utility, pool)
        # test the trained net without noise
        net_r = get_reward(net_shapes, net_params, env, game_config['ep_max_step'],
                           game_config['continuous_a'], None)
        mar = net_r if mar is None else 0.9 * mar + 0.1 * net_r
        print('generation: ', i, '| moving average reward: %.1f' % mar)
        if mar >= game_config['eval_threshold']:
            break

    print('Testing...')
    p = params_reshape(net_shapes, net_params)
    while True:
        s = env.reset()
        for _ in range(game_config['ep_max_step']):
            env.render()
            a = get_action(p, s, game_config['continuous_a'])
            s, r, done, _ = env.step(a)
            if done:
                break