强化学习进阶:从理论到实践的高级技术解析

时间:2025-04-08 16:07:29

1. 高级强化学习算法

1.1 近端策略优化(PPO)

PPO是一种策略梯度方法,通过限制策略更新的幅度来提高训练稳定性。

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.distributions import Categorical

class PPONetwork(nn.Module):
    """Actor-critic network with a shared trunk for discrete action spaces.

    Two 64-unit ReLU layers feed a softmax policy head (``actor``) and a
    scalar state-value head (``critic``).
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        width = 64
        self.fc1 = nn.Linear(input_dim, width)
        self.fc2 = nn.Linear(width, width)
        self.actor = nn.Linear(width, output_dim)
        self.critic = nn.Linear(width, 1)

    def forward(self, x):
        """Return ``(action_probabilities, state_value)`` for input ``x``."""
        features = x
        for layer in (self.fc1, self.fc2):
            features = torch.relu(layer(features))
        action_probs = torch.softmax(self.actor(features), dim=-1)
        state_value = self.critic(features)
        return action_probs, state_value

class PPOAgent:
    """Clipped-surrogate PPO agent for discrete action spaces.

    Transitions are collected with :meth:`act` and :meth:`remember`, then
    consumed by :meth:`learn`, which runs ``K_epochs`` gradient steps over the
    whole buffer and clears it afterwards.
    """

    def __init__(self, state_dim, action_dim):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = 0.99     # discount factor
        self.epsilon = 0.2    # clipping range of the probability ratio
        self.lr = 0.0003
        self.K_epochs = 3     # optimisation passes per learn() call

        self.model = PPONetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.lr)
        self.memory = []

    def act(self, state):
        """Sample an action for ``state``.

        Returns:
            ``(action, log_prob)`` where ``action`` is a Python int and
            ``log_prob`` is a tensor holding the log-probability of that
            action under the current policy.
        """
        state = torch.as_tensor(state, dtype=torch.float32)
        # The log-prob is only ever used as the constant "old policy" term,
        # so no autograd graph needs to be kept alive for it.
        with torch.no_grad():
            probs, _ = self.model(state)
            dist = Categorical(probs)
            action = dist.sample()
            log_prob = dist.log_prob(action)
        return action.item(), log_prob

    def remember(self, state, action, log_prob, reward, done):
        """Append one transition to the on-policy buffer."""
        self.memory.append((state, action, log_prob, reward, done))

    def learn(self):
        """Run the PPO update on the buffered transitions, then clear the buffer.

        Does nothing when the buffer is empty.
        """
        if not self.memory:
            return

        states = torch.as_tensor(
            np.array([t[0] for t in self.memory]), dtype=torch.float32
        )
        actions = torch.as_tensor([int(t[1]) for t in self.memory], dtype=torch.long)
        # float() accepts plain numbers as well as tensors that still require
        # grad, which np.array() refuses to convert.
        old_log_probs = torch.tensor(
            [float(t[2]) for t in self.memory], dtype=torch.float32
        )

        # Discounted returns, accumulated backwards and reset at episode ends.
        returns = []
        running_return = 0.0
        for _, _, _, reward, is_terminal in reversed(self.memory):
            if is_terminal:
                running_return = 0.0
            running_return = float(reward) + self.gamma * running_return
            returns.append(running_return)
        returns.reverse()
        returns = torch.tensor(returns, dtype=torch.float32)
        # std() of a single sample is NaN, so only normalise real batches.
        if returns.numel() > 1:
            returns = (returns - returns.mean()) / (returns.std() + 1e-7)

        for _ in range(self.K_epochs):
            new_probs, state_values = self.model(states)
            # The critic emits (N, 1); flatten to (N,) so it lines up with the
            # returns instead of broadcasting to an (N, N) matrix.
            state_values = state_values.reshape(-1)
            dist = Categorical(new_probs)
            new_log_probs = dist.log_prob(actions)
            entropy = dist.entropy()

            # Importance-sampling ratio between the new and the old policy.
            ratios = torch.exp(new_log_probs - old_log_probs)

            advantages = returns - state_values.detach()

            surr1 = ratios * advantages
            surr2 = torch.clamp(ratios, 1 - self.epsilon, 1 + self.epsilon) * advantages
            actor_loss = -torch.min(surr1, surr2).mean()
            critic_loss = nn.MSELoss()(state_values, returns)
            loss = actor_loss + 0.5 * critic_loss - 0.01 * entropy.mean()

            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()

        self.memory = []

1.2 深度确定性策略梯度(DDPG)

DDPG是一种适用于连续动作空间的Actor-Critic方法。

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque

class Actor(nn.Module):
    """Deterministic policy network mapping a state to a bounded action.

    The tanh output is scaled by ``max_action``.
    """

    def __init__(self, state_dim, action_dim, max_action):
        super().__init__()
        self.fc1 = nn.Linear(state_dim, 400)
        self.fc2 = nn.Linear(400, 300)
        self.fc3 = nn.Linear(300, action_dim)
        self.max_action = max_action

    def forward(self, x):
        """Return the action for state batch ``x``."""
        hidden = torch.relu(self.fc2(torch.relu(self.fc1(x))))
        squashed = torch.tanh(self.fc3(hidden))
        return squashed * self.max_action

class Critic(nn.Module):
    """Q-network that scores a (state, action) pair with a single value."""

    def __init__(self, state_dim, action_dim):
        super().__init__()
        self.fc1 = nn.Linear(state_dim + action_dim, 400)
        self.fc2 = nn.Linear(400, 300)
        self.fc3 = nn.Linear(300, 1)

    def forward(self, x, u):
        """Return Q(x, u); ``x`` and ``u`` are concatenated along dim 1."""
        joint = torch.cat([x, u], 1)
        hidden = torch.relu(self.fc1(joint))
        hidden = torch.relu(self.fc2(hidden))
        return self.fc3(hidden)

class DDPGAgent:
    """DDPG agent: deterministic actor, Q-critic, target copies of both, and a replay buffer."""

    def __init__(self, state_dim, action_dim, max_action):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.max_action = max_action

        # Online actor and its target; the target starts as an exact copy.
        self.actor = Actor(state_dim, action_dim, max_action)
        self.actor_target = Actor(state_dim, action_dim, max_action)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=1e-4)

        # Online critic and its target, set up the same way.
        self.critic = Critic(state_dim, action_dim)
        self.critic_target = Critic(state_dim, action_dim)
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3)

        self.memory = deque(maxlen=1000000)
        self.batch_size = 64
        self.tau = 0.001   # soft-update mixing factor
        self.gamma = 0.99  # discount factor

    def act(self, state, noise=0.1):
        """Return the actor's action for ``state``, clipped to ``[-max_action, max_action]``.

        When ``noise`` is non-zero, zero-mean Gaussian noise with that standard
        deviation is added before clipping.
        """
        observation = torch.FloatTensor(state.reshape(1, -1))
        proposal = self.actor(observation).cpu().data.numpy().flatten()
        if noise != 0:
            proposal = proposal + np.random.normal(0, noise, size=self.action_dim)
        lower, upper = -self.max_action, self.max_action
        return np.clip(proposal, lower, upper)

    def remember(self, state, action, reward, next_state, done):
        """Store one transition in the replay buffer."""
        transition = (state, action, reward, next_state, done)
        self.memory.append(transition)

    def _soft_update(self, online, target):
        """Move ``target``'s parameters a fraction ``tau`` towards ``online``'s."""
        for src, dst in zip(online.parameters(), target.parameters()):
            dst.data.copy_(self.tau * src.data + (1 - self.tau) * dst.data)

    def learn(self):
        """Run one critic step, one actor step and a soft target update.

        Returns immediately while the buffer holds fewer than ``batch_size``
        transitions.
        """
        if len(self.memory) < self.batch_size:
            return

        batch = random.sample(self.memory, self.batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        state = torch.FloatTensor(np.array(states))
        action = torch.FloatTensor(np.array(actions))
        reward = torch.FloatTensor(np.array(rewards)).reshape(-1, 1)
        next_state = torch.FloatTensor(np.array(next_states))
        done = torch.FloatTensor(np.array(dones)).reshape(-1, 1)

        # Critic: regress Q(s, a) onto the bootstrapped one-step target.
        bootstrap = self.critic_target(next_state, self.actor_target(next_state))
        target_Q = reward + ((1 - done) * self.gamma * bootstrap).detach()
        current_Q = self.critic(state, action)
        critic_loss = nn.MSELoss()(current_Q, target_Q)

        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()

        # Actor: ascend the critic's estimate of the actor's own actions.
        actor_loss = -self.critic(state, self.actor(state)).mean()

        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

        # Targets track the online networks slowly.
        self._soft_update(self.critic, self.critic_target)
        self._soft_update(self.actor, self.actor_target)

2. 多智能体强化学习

2.1 独立Q学习(IQL)

class IQLAgent:
    """Independent Q-learning: one ``DQNAgent`` per agent, each trained only on its own experience.

    ``DQNAgent`` is not defined in this file and must be provided elsewhere.
    """

    def __init__(self, state_dim, action_dim, n_agents):
        self.agents = [DQNAgent(state_dim, action_dim) for _ in range(n_agents)]
        self.n_agents = n_agents

    def act(self, states):
        """Return one action per agent, pairing agents with ``states`` in order."""
        chosen = []
        for learner, observation in zip(self.agents, states):
            chosen.append(learner.act(observation))
        return chosen

    def learn(self, experiences):
        """Give each agent its own transition, then let it replay.

        ``experiences`` holds one ``(state, action, reward, next_state, done)``
        tuple per agent, in agent order.
        """
        for learner, transition in zip(self.agents, experiences):
            state, action, reward, next_state, done = transition
            learner.remember(state, action, reward, next_state, done)
            learner.replay()

2.2 MADDPG

class MADDPG:
    """Multi-agent DDPG with decentralised actors and centralised critics.

    Each agent keeps its own actor over its local observation, while its critic
    scores the concatenation of every agent's observation and action.
    """

    def __init__(self, state_dims, action_dims, n_agents, max_actions):
        self.agents = [DDPGAgent(state_dims[i], action_dims[i], max_actions[i])
                      for i in range(n_agents)]
        self.n_agents = n_agents
        self.state_dims = [state_dims[i] for i in range(n_agents)]
        self.action_dims = [action_dims[i] for i in range(n_agents)]

        # DDPGAgent sizes its critic for a single agent's state and action,
        # but learn() feeds the joint state and joint action, so the critics
        # are rebuilt here with the joint input width.
        joint_state_dim = sum(self.state_dims)
        joint_action_dim = sum(self.action_dims)
        for agent in self.agents:
            agent.critic = Critic(joint_state_dim, joint_action_dim)
            agent.critic_target = Critic(joint_state_dim, joint_action_dim)
            agent.critic_target.load_state_dict(agent.critic.state_dict())
            agent.critic_optimizer = optim.Adam(agent.critic.parameters(), lr=1e-3)

    def act(self, states, noise=0.1):
        """Return one action per agent, pairing agents with ``states`` in order."""
        return [agent.act(state, noise) for agent, state in zip(self.agents, states)]

    def learn(self, experiences):
        """Update every agent's critic, actor and target networks from a joint batch.

        Args:
            experiences: sequence of ``(states, actions, rewards, next_states,
                done)`` tuples. ``states``, ``actions`` and ``next_states`` are
                indexed by agent along their first axis, ``rewards`` holds one
                entry per agent and ``done`` is a single flag per transition.

        Does nothing when ``experiences`` is empty.
        """
        if not experiences:
            return

        def to_tensor(column):
            return torch.as_tensor(
                np.array([x[column] for x in experiences], dtype=np.float32)
            )

        # The batch is identical for every agent, so convert it once.
        states = to_tensor(0)        # (batch, n_agents, state_dim)
        actions = to_tensor(1)       # (batch, n_agents, action_dim)
        all_rewards = to_tensor(2)   # (batch, n_agents)
        next_states = to_tensor(3)
        dones = to_tensor(4).reshape(-1, 1)

        batch_size = states.shape[0]
        joint_states = states.reshape(batch_size, -1)
        joint_actions = actions.reshape(batch_size, -1)
        joint_next_states = next_states.reshape(batch_size, -1)

        for idx, agent in enumerate(self.agents):
            rewards = all_rewards[:, idx].reshape(-1, 1)

            # Critic update: the target uses every agent's target policy on
            # that agent's own next observation.
            with torch.no_grad():
                next_actions = torch.cat(
                    [other.actor_target(next_states[:, k, :])
                     for k, other in enumerate(self.agents)],
                    dim=1,
                )
                target_Q = agent.critic_target(joint_next_states, next_actions)
                target_Q = rewards + (1 - dones) * agent.gamma * target_Q

            current_Q = agent.critic(joint_states, joint_actions)
            critic_loss = nn.MSELoss()(current_Q, target_Q)

            agent.critic_optimizer.zero_grad()
            critic_loss.backward()
            agent.critic_optimizer.step()

            # Actor update: each policy acts on its own observation, and only
            # the agent being trained keeps its action in the autograd graph.
            actions_pred = torch.cat(
                [other.actor(states[:, k, :]) if k == idx
                 else other.actor(states[:, k, :]).detach()
                 for k, other in enumerate(self.agents)],
                dim=1,
            )
            actor_loss = -agent.critic(joint_states, actions_pred).mean()

            agent.actor_optimizer.zero_grad()
            actor_loss.backward()
            agent.actor_optimizer.step()

            # Soft-update the target networks.
            for param, target_param in zip(agent.critic.parameters(), agent.critic_target.parameters()):
                target_param.data.copy_(agent.tau * param.data + (1 - agent.tau) * target_param.data)

            for param, target_param in zip(agent.actor.parameters(), agent.actor_target.parameters()):
                target_param.data.copy_(agent.tau * param.data + (1 - agent.tau) * target_param.data)

3. 分层强化学习

3.1 选项框架(Option Framework)

class Option:
    """A temporally extended action in the options framework.

    Attributes are stored exactly as passed in:
    ``initiation_set`` is the container of states in which the option may start,
    ``policy`` is the callable intra-option policy, and
    ``termination_condition`` is a callable deciding whether the option ends.
    """

    def __init__(self, initiation_set, policy, termination_condition):
        self.initiation_set = initiation_set
        self.policy = policy
        self.termination_condition = termination_condition

    def is_available(self, state):
        """Return whether ``state`` belongs to the initiation set."""
        can_start = state in self.initiation_set
        return can_start

    def should_terminate(self, state):
        """Return the termination condition's verdict for ``state``."""
        verdict = self.termination_condition(state)
        return verdict

class HierarchicalAgent:
    """Two-level agent: a meta-policy picks an option, the option's policy picks actions."""

    def __init__(self, options, meta_policy):
        self.options = options
        self.meta_policy = meta_policy  # chooses among the available options
        self.current_option = None

    def act(self, state):
        """Return the action of the running option, choosing a new option when needed.

        A new option is selected when none is running or the running one
        reports termination. Returns ``None`` when a new option is needed but
        none is available in ``state``.
        """
        running = self.current_option
        if running is not None and not running.should_terminate(state):
            return running.policy(state)

        candidates = [option for option in self.options if option.is_available(state)]
        if not candidates:
            return None  # no default behaviour is defined for this case

        self.current_option = self.meta_policy.select_option(state, candidates)
        return self.current_option.policy(state)

4. 强化学习前沿技术

4.1 基于模型的强化学习(MBRL)

class DynamicsModel(nn.Module):
    """MLP that maps a (state, action) pair to an output of size ``state_dim``."""

    def __init__(self, state_dim, action_dim):
        super().__init__()
        width = 256
        self.fc1 = nn.Linear(state_dim + action_dim, width)
        self.fc2 = nn.Linear(width, width)
        self.fc3 = nn.Linear(width, state_dim)

    def forward(self, state, action):
        """Return the network output for ``state`` and ``action`` concatenated along dim 1."""
        hidden = torch.cat([state, action], dim=1)
        for layer in (self.fc1, self.fc2):
            hidden = torch.relu(layer(hidden))
        return self.fc3(hidden)

class MBAgent:
    """Model-based agent: learns a dynamics model and plans with the cross-entropy method."""

    def __init__(self, state_dim, action_dim, reward_model=None):
        """
        Args:
            state_dim: size of the state vector.
            action_dim: size of the action vector.
            reward_model: optional callable ``(states, actions, next_states)``
                returning one reward per sequence; required by :meth:`plan`.
                It may also be assigned to ``self.reward_model`` later.
        """
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.reward_model = reward_model
        self.dynamics_model = DynamicsModel(state_dim, action_dim)
        self.optimizer = optim.Adam(self.dynamics_model.parameters(), lr=0.001)
        self.real_buffer = deque(maxlen=10000)
        self.imagined_buffer = deque(maxlen=10000)

    def train_dynamics(self, states, actions, next_states):
        """Take one MSE gradient step on the dynamics model and return the loss as a float."""
        pred_next_states = self.dynamics_model(states, actions)
        loss = nn.MSELoss()(pred_next_states, next_states)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def plan(self, initial_state, horizon=10, num_sequences=100, num_elites=20, cem_iters=5):
        """Plan with the cross-entropy method and return the first action of the plan.

        Args:
            initial_state: state to plan from (sequence, array or tensor).
            horizon: number of steps rolled out through the dynamics model.
            num_sequences: action sequences sampled per CEM iteration.
            num_elites: best sequences used to refit the sampling distribution;
                clamped to ``[1, num_sequences]``.
            cem_iters: number of CEM refinement iterations.

        Returns:
            Tensor of shape ``(action_dim,)``: the mean first action of the
            final sampling distribution.

        Raises:
            AttributeError: if no ``reward_model`` has been provided.
        """
        reward_model = getattr(self, "reward_model", None)
        if reward_model is None:
            raise AttributeError(
                "MBAgent.plan needs a reward_model; pass it to __init__ or "
                "assign agent.reward_model before planning"
            )

        flat_dim = horizon * self.action_dim
        mean = torch.zeros(flat_dim)
        std = torch.ones(flat_dim)
        elite_count = max(1, min(num_elites, num_sequences))
        start = torch.as_tensor(initial_state, dtype=torch.float32).reshape(1, -1)

        # Planning only evaluates the models; no gradients are needed.
        with torch.no_grad():
            for _ in range(cem_iters):
                # Draw num_sequences independent samples. torch.normal(mean, std)
                # would yield a single sequence only.
                noise = torch.randn(num_sequences, flat_dim)
                actions = (mean + std * noise).reshape(
                    num_sequences, horizon, self.action_dim
                )

                states = start.repeat(num_sequences, 1)
                total_rewards = torch.zeros(num_sequences)
                for t in range(horizon):
                    step_actions = actions[:, t, :]
                    next_states = self.dynamics_model(states, step_actions)
                    rewards = reward_model(states, step_actions, next_states)
                    # Accept (N,) as well as (N, 1) reward outputs.
                    total_rewards += torch.as_tensor(
                        rewards, dtype=torch.float32
                    ).reshape(num_sequences)
                    states = next_states

                _, top_indices = torch.topk(total_rewards, k=elite_count)
                elite_actions = actions[top_indices].reshape(elite_count, flat_dim)

                mean = elite_actions.mean(dim=0)
                # Population std stays finite even with a single elite.
                std = elite_actions.std(dim=0, unbiased=False)

        return mean.reshape(horizon, self.action_dim)[0]

4.2 逆强化学习(IRL)

class Discriminator(nn.Module):
    """MLP that maps a (state, action) pair to a sigmoid score in (0, 1)."""

    def __init__(self, state_dim, action_dim):
        super().__init__()
        width = 256
        self.fc1 = nn.Linear(state_dim + action_dim, width)
        self.fc2 = nn.Linear(width, width)
        self.fc3 = nn.Linear(width, 1)

    def forward(self, state, action):
        """Return the sigmoid score for ``state`` and ``action`` concatenated along dim 1."""
        pair = torch.cat([state, action], dim=1)
        hidden = torch.relu(self.fc1(pair))
        hidden = torch.relu(self.fc2(hidden))
        logit = self.fc3(hidden)
        return torch.sigmoid(logit)

class IRAgent:
    """Adversarial imitation agent: a discriminator separates expert from agent
    (state, action) pairs, and the policy is trained to fool it.

    The two networks have opposing objectives, so each one has its own
    optimizer (``d_optimizer`` and ``policy_optimizer``) and its own step.
    Minimising the sum of both losses with one optimizer would also train the
    discriminator to label agent samples as expert.

    ``PolicyNetwork`` is not defined in this file and must be provided elsewhere.
    """

    def __init__(self, state_dim, action_dim):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.discriminator = Discriminator(state_dim, action_dim)
        self.policy = PolicyNetwork(state_dim, action_dim)
        self.d_optimizer = optim.Adam(self.discriminator.parameters(), lr=0.0003)
        self.policy_optimizer = optim.Adam(self.policy.parameters(), lr=0.0003)

    def train_step(self, expert_states, expert_actions, agent_states, agent_actions):
        """Run one discriminator step followed by one policy step.

        Args:
            expert_states, expert_actions: batch of expert demonstrations.
            agent_states, agent_actions: batch generated by the agent. The
                policy only receives gradients if ``agent_actions`` is still
                attached to the policy's autograd graph.

        Returns:
            ``(d_loss, p_loss)`` as Python floats.
        """
        # Discriminator step: expert pairs are labelled 1, agent pairs 0.
        expert_labels = torch.ones(expert_states.shape[0], 1, device=expert_states.device)
        agent_labels = torch.zeros(agent_states.shape[0], 1, device=agent_states.device)

        all_states = torch.cat([expert_states, agent_states], dim=0)
        # Detach so the discriminator loss never reaches the policy.
        all_actions = torch.cat([expert_actions, agent_actions.detach()], dim=0)
        all_labels = torch.cat([expert_labels, agent_labels], dim=0)

        preds = self.discriminator(all_states, all_actions)
        d_loss = nn.BCELoss()(preds, all_labels)

        self.d_optimizer.zero_grad()
        d_loss.backward()
        self.d_optimizer.step()

        # Policy step: raise the discriminator's score on agent samples.
        agent_preds = self.discriminator(agent_states, agent_actions)
        p_loss = -torch.log(agent_preds + 1e-8).mean()

        self.policy_optimizer.zero_grad()
        # This also leaves gradients on the discriminator parameters; they are
        # discarded by d_optimizer.zero_grad() at the start of the next step.
        p_loss.backward()
        self.policy_optimizer.step()

        return d_loss.item(), p_loss.item()

5. 强化学习实践建议

  1. 超参数调优:
     • 学习率:通常从 3e-4 开始尝试
     • 折扣因子 γ:0.9–0.99 之间
     • 批量大小:32–512 之间
     • 探索率:初始 1.0,逐渐衰减到 0.01–0.1
  2. 训练技巧:
     • 使用学习率调度器
     • 实现梯度裁剪
     • 监控训练曲线和关键指标
     • 定期保存模型检查点
  3. 调试策略:
     • 检查奖励是否合理
     • 验证价值函数是否收敛
     • 分析探索是否充分
     • 可视化智能体行为
  4. 性能优化:
     • 使用 GPU 加速
     • 并行化环境交互
     • 优化数据管道
     • 减少不必要的计算

6. 强化学习应用案例

6.1 机器人控制

# Train a robot arm to reach a target position with PPO.
import gym  # needed for gym.make below

env = gym.make('FetchReach-v1')
# The policy has to see the target, so the state is the raw observation plus
# the desired goal. NOTE(review): 'observation' presumably already contains
# the gripper position that 'achieved_goal' repeats; confirm against the env docs.
state_dim = env.observation_space['observation'].shape[0] + env.observation_space['desired_goal'].shape[0]
max_action = float(env.action_space.high[0])

# PPOAgent samples from a categorical distribution, while this environment
# expects a continuous action vector. Each continuous axis is therefore exposed
# as two discrete actions: a full step in the positive or negative direction.
n_axes = env.action_space.shape[0]
action_dim = 2 * n_axes


def to_env_action(index):
    """Map a discrete action index to a one-axis continuous command."""
    command = np.zeros(n_axes, dtype=np.float32)
    command[index // 2] = max_action if index % 2 == 0 else -max_action
    return command


agent = PPOAgent(state_dim, action_dim)
episodes = 10000

for e in range(episodes):
    obs = env.reset()
    state = np.concatenate([obs['observation'], obs['desired_goal']])
    total_reward = 0
    done = False

    while not done:
        action, log_prob = agent.act(state)
        next_obs, reward, done, info = env.step(to_env_action(action))
        next_state = np.concatenate([next_obs['observation'], next_obs['desired_goal']])

        # The buffer stores the discrete index, which is what PPO learns over.
        agent.remember(state, action, log_prob, reward, done)
        state = next_state
        total_reward += reward

    # The loop only exits once the episode is over.
    print(f"Episode: {e+1}, Reward: {total_reward:.2f}")
    agent.learn()

6.2 金融交易

class TradingEnvironment:
    """Minimal single-asset trading environment over a table of market data.

    ``data`` is indexed with ``.iloc`` and must provide a ``'close'`` column.
    One unit of the asset is bought or sold per step.
    """

    def __init__(self, data, initial_balance=10000):
        self.data = data  # market data
        self.initial_balance = initial_balance
        self.index = 0
        self.balance = initial_balance
        self.positions = 0
        self.max_steps = len(data) - 1

    def reset(self):
        """Restart the episode with the configured initial balance and return the first state."""
        self.index = 0
        self.balance = self.initial_balance
        self.positions = 0
        return self._get_state()

    def _get_state(self):
        """Return the current market row followed by the normalised balance and the position count."""
        return np.concatenate([
            self.data.iloc[self.index].values,
            [self.balance / self.initial_balance, self.positions],
        ])

    def step(self, action):
        """Apply ``action`` and advance one row.

        Args:
            action: 0 = hold, 1 = buy one unit, 2 = sell one unit. A buy
                without enough balance or a sell without a position is
                treated as a hold.

        Returns:
            ``(state, reward, done, info)``. The reward of a sell is the price
            change since the previous row. On the final step all positions
            are liquidated and the reward is the episode's rate of return.
        """
        current_price = self.data.iloc[self.index]['close']
        reward = 0

        if action == 1 and self.balance >= current_price:  # buy
            self.positions += 1
            self.balance -= current_price
        elif action == 2 and self.positions > 0:  # sell
            self.positions -= 1
            self.balance += current_price
            # There is no previous row at index 0; iloc[-1] would silently
            # wrap around to the last row of the data.
            if self.index > 0:
                reward = current_price - self.data.iloc[self.index - 1]['close']

        self.index += 1
        done = self.index >= self.max_steps

        if done:
            # Liquidate every open position at the current price.
            self.balance += self.positions * current_price
            self.positions = 0
            reward = (self.balance - self.initial_balance) / self.initial_balance

        return self._get_state(), reward, done, {}

# Train a trading policy with DQN.
data = load_market_data()  # load the market data
env = TradingEnvironment(data)
state_dim = len(env.reset())
action_dim = 3

agent = DQNAgent(state_dim, action_dim)
episodes = 1000

for e in range(episodes):
    state, total_reward, done = env.reset(), 0, False

    while not done:
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward

    # The loop only exits once the episode has finished, so report it here.
    print(f"Episode: {e+1}, Profit: {total_reward*100:.2f}%")
    agent.replay()

7. 总结与展望

本文深入探讨了强化学习的进阶技术和实践方法,包括:

  1. 高级算法如PPO、DDPG及其实现
  2. 多智能体系统设计模式
  3. 分层强化学习架构
  4. 前沿方向如基于模型的学习和逆强化学习
  5. 实际应用案例和工程实践建议

强化学习仍在快速发展中,未来的重要方向包括:

• 提高样本效率和训练稳定性
• 发展更强大的迁移学习和元学习能力
• 结合大型语言模型的决策能力
• 解决安全性和可解释性问题
• 开发更强大的多智能体协作框架

通过掌握这些高级技术,开发者可以构建更强大、更实用的强化学习系统,解决现实世界中的复杂决策问题。