Reinforcement Learning with Evolution Strategies (ES)

Date: 2022-01-05 12:28:58

References:
The OpenAI paper on ES for reinforcement learning
Mirrored Sampling and Sequential Selection for Evolution Strategies
https://morvanzhou.github.io/tutorials/machine-learning/evolutionary-algorithm/4-04-evolution-strategy-reinforcement-learning/
OpenAI's official explanation
Policy Gradients and ES for reinforcement learning are very similar: both adjust the parameters based on rewards. The biggest difference is that PG modifies the parameters through gradient descent and backpropagation, while ES does not. PG perturbs the actions and uses each action's reward to decide how much that action weighs in the gradient step; ES directly perturbs the parameters of the neural network, different perturbed parameters obtain different rewards, and the parameters are updated in proportion to the rewards they produce.


ES algorithm pseudocode:
[Figure: ES pseudocode from the OpenAI paper. Each generation: sample $n$ noise vectors $\epsilon_i \sim N(0, I)$, evaluate the returns $F_i = F(\theta + \sigma\epsilon_i)$, then update $\theta \leftarrow \theta + \frac{\alpha}{n\sigma}\sum_i F_i \epsilon_i$.]

A small example from OpenAI: using ES to minimize the quadratic $f(x) = (x - x_0)^2$:

# simple example: minimize a quadratic around some solution point
import numpy as np
solution = np.array([0.5, 0.1, -0.3])
def f(w): return -np.sum((w - solution)**2)

npop = 50      # population size
sigma = 0.1    # noise standard deviation
alpha = 0.001  # learning rate
w = np.random.randn(3) # initial guess
for i in range(300):
  N = np.random.randn(npop, 3)
  R = np.zeros(npop)
  for j in range(npop):
    w_try = w + sigma*N[j]
    R[j] = f(w_try)
  A = (R - np.mean(R)) / np.std(R)             # standardize rewards: mean 0, std 1
  w = w + alpha/(npop*sigma) * np.dot(N.T, A)  # step along the reward-weighted sum of the noise

The factor in `sigma*N[j]` is there because `sigma` is the standard deviation of the noise we want, while `randn` samples from the standard normal distribution. By the formula

$$X \sim N(\mu, \sigma^2), \qquad Y = \frac{X - \mu}{\sigma} \sim N(0, 1)$$

every number $x$ drawn from $N(0, 1)$ gives $x\sigma + \mu \sim N(\mu, \sigma^2)$.
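
A quick sanity check of this scaling (a minimal sketch; the values of `mu` and `sigma` here are purely illustrative):

import numpy as np

mu, sigma = 0.0, 0.1
x = np.random.randn(100000)   # samples from N(0, 1)
y = x * sigma + mu            # scaled and shifted samples, now ~ N(mu, sigma^2)
print(round(x.std(), 3), round(y.std(), 3))   # roughly 1.0 and 0.1
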
(1) In the quadratic example, the rewards steer the update. After the standardization above, individuals whose reward is above the mean get positive weights and those below the mean get negative weights; among the former, a larger reward means a larger weight, and among the latter, a smaller reward means a more negative weight.
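
A minimal illustration with made-up rewards, showing the signed weights produced by this standardization:

import numpy as np

R = np.array([5., 3., 2., 0.])        # made-up rewards, sorted for readability
A = (R - np.mean(R)) / np.std(R)      # same standardization as in the example above
print(A)  # about [ 1.39  0.28 -0.28 -1.39]: above-average rewards get positive weights,
          # below-average rewards get negative weights
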
(2) The OpenAI paper drives the update with utilities instead of raw rewards, which is more efficient. A utility is the share an individual contributes to the update: the individuals are ranked by reward, and each rank is assigned a utility from a list that is fixed in advance. Let $n$ be the number of noise samples per ES iteration, so that with mirrored sampling (see (3) below) there are $2n$ individuals:

$$rank = [1, 2, \ldots, 2n]$$

$$util = \max\bigl(0,\; \ln(n + 1) - \ln(rank)\bigr)$$

$$utility = \frac{util}{\mathrm{sum}(util)} - \frac{1}{2n}$$

$\frac{1}{2n}$ is the average share of the $2n$ individuals; subtracting it centers the utilities, which keeps the size of each update down. The sketch below computes this list.
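
A minimal sketch of the utility list under this reading (here $n = 10$, playing the same role as `n_kid` in the full code further below):

import numpy as np

n = 10                              # number of mirrored noise pairs, i.e. n_kid
rank = np.arange(1, 2 * n + 1)      # ranks 1..2n, best individual first
util = np.maximum(0, np.log(n + 1) - np.log(rank))
utility = util / util.sum() - 1. / (2 * n)
print(utility[:3], utility[-3:])    # top ranks get positive weights, bottom ranks get -1/(2n)
print(round(utility.sum(), 12))     # ~0: the weights are centered
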
The utility values for population sizes of 10 and 100:
[Figure: utility weights versus rank for population sizes 10 and 100]
(3) Mirrored sampling is used: whenever a noise vector is generated to perturb the solution, its mirror image is generated as well, so the solution is also perturbed in exactly the opposite direction. Whichever of the two perturbations achieves the better result carries more weight in the update. A mirrored pair is sketched after the figure below.
[Figure: mirrored sampling, a noise vector and its mirror perturb the solution in opposite directions]
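
A minimal sketch of one mirrored pair (in the full code below this is what `sign(k_id)` implements: consecutive kid ids share the same random seed and only the sign differs):

import numpy as np

sigma = 0.05
params = np.zeros(5)                  # toy parameter vector
eps = np.random.randn(params.size)    # one noise sample
kid_plus = params + sigma * eps       # the perturbation
kid_minus = params - sigma * eps      # its mirror: the same noise with the opposite sign
# whichever of the pair earns the higher reward contributes more to the update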

import numpy as np
import gym
import multiprocessing as mp


n_kid = 10  # half of the training population (number of mirrored noise pairs)
n_generation = 5000  # number of training generations
lr = 0.05  # learning rate
sigma = 0.05  # mutation strength (noise standard deviation)
n_core = mp.cpu_count() - 1  # number of CPU cores to use
game_config = [
    dict(game="CartPole-v0",
         n_feature=4, n_action=2, continuous_a=[False], ep_max_step=700, eval_threshold=500),
    dict(game="MountainCar-v0",
         n_feature=2, n_action=3, continuous_a=[False], ep_max_step=200, eval_threshold=-120),
    dict(game="Pendulum-v0",
         n_feature=3, n_action=1, continuous_a=[True, 2.], ep_max_step=200, eval_threshold=-180)
][2]    # choose your game


# sign of the perturbation: even kid ids get -1, odd ids get +1 (mirrored sampling)
def sign(k_id):
    return -1. if k_id % 2 == 0 else 1.


# not classical SGD: the "gradient" here is the utility-weighted perturbation, smoothed with momentum
class SGD(object):
    def __init__(self, params, learning_rate, momentum=0.9):
        self.v = np.zeros_like(params).astype(np.float32)
        self.lr = learning_rate
        self.momentum = momentum

    def get_gradients(self, gradients):
        self.v = self.momentum * self.v + (1. - self.momentum) * gradients
        return self.lr * self.v


# reshape the flat 1-D parameter vector into per-layer weight and bias matrices
def params_reshape(shapes, params):
    p, index = [], 0
    for i, shape in enumerate(shapes):
        n_w, n_b = shape[0] * shape[1], shape[1]
        p = p + [params[index: index + n_w].reshape(shape),
                 params[index + n_w: index + n_w + n_b].reshape(1, shape[1])]
        index += n_w + n_b
    return p


# run one episode with the given net parameters and return the episode reward
def get_reward(shapes, params, env, ep_max_step, continuous_a, seed_and_id=None):
    # if seed_and_id is given, perturb the parameters with seeded noise so the same noise can be regenerated during the update
    if seed_and_id is not None:
        seed, k_id = seed_and_id
        np.random.seed(seed)
        params += sign(k_id) * sigma * np.random.randn(params.size)
    p = params_reshape(shapes, params)

    # run the episode and accumulate the reward
    s = env.reset()
    ep_r = 0
    for episode in range(ep_max_step):
        a = get_action(p, s, continuous_a)
        s, r, done, _ = env.step(a)
        if env.spec._env_name == 'MountainCar' and s[0] > -0.1: r = 0  # reward shaping: no step penalty once the car is past -0.1
        ep_r += r
        if done:
            break
    return ep_r


# choose the action from the net's output for the given state
def get_action(params, input, continuous_a):
    input = input[np.newaxis, :]
    l1 = np.tanh(input.dot(params[0]) + params[1])
    l2 = np.tanh(l1.dot(params[2]) + params[3])
    output = l2.dot(params[4]) + params[5]
    # discrete actions: pick the index with the largest output value
    # continuous actions: output has shape (1,), squash and scale it into the action range
    if not continuous_a[0]:  # discrete action
        return np.argmax(output, axis=1)[0]
    else:  # continuous action
        return continuous_a[1] * np.tanh(output)[0]  # Pendulum's action range is [-2., 2.]


# build the net: return the layer shapes and a flat initial parameter vector
def build_net():
    def linear(n_in, n_out):
        w = np.random.randn(n_in * n_out).astype(np.float32) * 0.1
        b = np.random.randn(n_out).astype(np.float32) * 0.1
        return (n_in, n_out), np.hstack((w, b))

    s0, p0 = linear(game_config['n_feature'], 30)
    s1, p1 = linear(30, 20)
    s2, p2 = linear(20, game_config['n_action'])
    return [s0, s1, s2], np.hstack((p0, p1, p2))


# one ES training step for the net
def train_net(net_shape, net_params, optimizer, utility, pool):
    # generate random seeds so that each kid's noise can be reproduced during the update
    noise_seed = np.random.randint(0, 2**32-1, size=n_kid, dtype=np.uint32).repeat(2)
    # evaluate the kids' rewards in parallel with the process pool
    jobs = [pool.apply_async(get_reward, (net_shape,
                                          net_params,
                                          env,
                                          game_config['ep_max_step'],
                                          game_config['continuous_a'],
                                          [noise_seed[k_id], k_id],)) for k_id in range(n_kid * 2)]
    # collect the population's rewards
    rewards = np.array([j.get() for j in jobs])
    # rank the kids by reward, from largest to smallest
    kids_rank = np.argsort(rewards)[::-1]
    # alternative: instead of utilities, normalize the rewards to [0, 1] to guide the update:
    # rewards = (rewards - np.min(rewards)) / np.ptp(rewards)
    cumulative_update = np.zeros_like(net_params)
    for ui, k_id in enumerate(kids_rank):
        np.random.seed(noise_seed[k_id])
        cumulative_update += utility[ui] * sign(k_id) * np.random.randn(net_params.size)

    gradients = optimizer.get_gradients(cumulative_update / (2 * n_kid * sigma))  # same 1/(n*sigma) scaling as in the quadratic example
    return net_params + gradients, rewards


if __name__ == '__main__':
    base = n_kid * 2
    rank = np.arange(1, base + 1)
    # compute the utilities, i.e. each rank's share of the parameter update
    util = np.maximum(0, np.log(base/2+1) - np.log(rank))
    utility = util / util.sum() - 1/base

    net_shapes, net_params = build_net()
    env = gym.make(game_config['game']).unwrapped
    optimizer = SGD(net_params, lr)
    pool = mp.Pool(processes=n_core)
    mar = None  # moving average reward
    for i in range(n_generation):
        net_params, kid_rewards = train_net(net_shapes, net_params, optimizer, utility, pool)

        # test trained net without noise
        net_r = get_reward(net_shapes, net_params, env, game_config['ep_max_step'],
                           game_config['continuous_a'], None)
        mar = net_r if mar is None else 0.9 * mar + 0.1 * net_r
        print('generation: ', i, 'moving average reward %.1f' % mar)
        if mar >= game_config['eval_threshold']:
            break

    print('TESTing...')
    p = params_reshape(net_shapes, net_params)
    while True:
        s = env.reset()
        for _ in range(game_config['ep_max_step']):
            env.render()
            a = get_action(p, s, game_config['continuous_a'])
            s, r, done, _ = env.step(a)
            if done:
                break