1. Advanced Reinforcement Learning Algorithms
1.1 Proximal Policy Optimization (PPO)
PPO is a policy-gradient method that improves training stability by limiting how much the policy is allowed to change in a single update.
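Concretely, PPO maximizes a clipped surrogate objective that keeps the probability ratio between the new and the old policy inside a small interval (this is the standard formulation from the PPO paper):

$$L^{CLIP}(\theta) = \mathbb{E}_t\Big[\min\big(r_t(\theta)\,\hat{A}_t,\ \mathrm{clip}(r_t(\theta),\,1-\epsilon,\,1+\epsilon)\,\hat{A}_t\big)\Big],\qquad r_t(\theta)=\frac{\pi_\theta(a_t\mid s_t)}{\pi_{\theta_{\mathrm{old}}}(a_t\mid s_t)}$$

The implementation below optimizes exactly this objective, with an added value-function loss and entropy bonus.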
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from torch.distributions import Categorical

class PPONetwork(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(PPONetwork, self).__init__()
        self.fc1 = nn.Linear(input_dim, 64)
        self.fc2 = nn.Linear(64, 64)
        self.actor = nn.Linear(64, output_dim)
        self.critic = nn.Linear(64, 1)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return torch.softmax(self.actor(x), dim=-1), self.critic(x)

class PPOAgent:
    def __init__(self, state_dim, action_dim):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.gamma = 0.99        # discount factor
        self.epsilon = 0.2       # PPO clipping range
        self.lr = 0.0003
        self.K_epochs = 3        # number of policy updates per collected batch
        self.model = PPONetwork(state_dim, action_dim)
        self.optimizer = optim.Adam(self.model.parameters(), lr=self.lr)
        self.memory = []

    def act(self, state):
        state = torch.FloatTensor(state)
        probs, _ = self.model(state)
        dist = Categorical(probs)
        action = dist.sample()
        return action.item(), dist.log_prob(action)

    def remember(self, state, action, log_prob, reward, done):
        self.memory.append((state, action, log_prob, reward, done))

    def learn(self):
        # Compute discounted returns (and later the advantages)
        states = torch.FloatTensor(np.array([x[0] for x in self.memory]))
        actions = torch.LongTensor(np.array([x[1] for x in self.memory]))
        old_log_probs = torch.stack([x[2] for x in self.memory]).detach()
        rewards = []
        discounted_reward = 0
        for reward, is_terminal in zip(reversed([x[3] for x in self.memory]),
                                       reversed([x[4] for x in self.memory])):
            if is_terminal:
                discounted_reward = 0
            discounted_reward = reward + (self.gamma * discounted_reward)
            rewards.insert(0, discounted_reward)
        rewards = torch.FloatTensor(rewards)
        rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-7)
        # Update the policy several times on the same batch
        for _ in range(self.K_epochs):
            # Re-evaluate the stored actions under the current policy
            new_probs, state_values = self.model(states)
            state_values = state_values.squeeze(-1)
            dist = Categorical(new_probs)
            new_log_probs = dist.log_prob(actions)
            entropy = dist.entropy()
            # Importance-sampling ratio between the new and old policies
            ratios = torch.exp(new_log_probs - old_log_probs)
            # Advantage estimate: returns minus the value baseline
            advantages = rewards - state_values.detach()
            # Clipped surrogate objective
            surr1 = ratios * advantages
            surr2 = torch.clamp(ratios, 1 - self.epsilon, 1 + self.epsilon) * advantages
            actor_loss = -torch.min(surr1, surr2).mean()
            critic_loss = nn.MSELoss()(state_values, rewards)
            loss = actor_loss + 0.5 * critic_loss - 0.01 * entropy.mean()
            # Backpropagate and update
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
        self.memory = []
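A minimal training loop for the agent above, assuming the CartPole-v1 environment from Gym (pre-0.26 API, where reset() returns only the observation) and one policy update per episode; the episode count is illustrative.

import gym

env = gym.make('CartPole-v1')
agent = PPOAgent(env.observation_space.shape[0], env.action_space.n)

for episode in range(500):
    state = env.reset()
    episode_reward, done = 0, False
    while not done:
        action, log_prob = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        agent.remember(state, action, log_prob, reward, done)
        state = next_state
        episode_reward += reward
    agent.learn()  # one PPO update on the freshly collected episode
    if (episode + 1) % 50 == 0:
        print(f"Episode {episode + 1}: reward = {episode_reward}")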
1.2 Deep Deterministic Policy Gradient (DDPG)
DDPG is an actor-critic method designed for continuous action spaces.
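Concretely, the critic is regressed toward a bootstrapped target computed with the target networks, while the actor follows the deterministic policy gradient:

$$y = r + \gamma\,(1-d)\,Q_{\phi'}\big(s',\,\mu_{\theta'}(s')\big),\qquad \nabla_\theta J \approx \mathbb{E}\Big[\nabla_a Q_\phi(s,a)\big|_{a=\mu_\theta(s)}\,\nabla_\theta \mu_\theta(s)\Big]$$

Both target networks are updated by slow Polyak averaging (the soft update with coefficient tau in the code below).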
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import random
from collections import deque

class Actor(nn.Module):
    def __init__(self, state_dim, action_dim, max_action):
        super(Actor, self).__init__()
        self.fc1 = nn.Linear(state_dim, 400)
        self.fc2 = nn.Linear(400, 300)
        self.fc3 = nn.Linear(300, action_dim)
        self.max_action = max_action

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = torch.tanh(self.fc3(x)) * self.max_action
        return x

class Critic(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Critic, self).__init__()
        self.fc1 = nn.Linear(state_dim + action_dim, 400)
        self.fc2 = nn.Linear(400, 300)
        self.fc3 = nn.Linear(300, 1)

    def forward(self, x, u):
        x = torch.cat([x, u], 1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class DDPGAgent:
    def __init__(self, state_dim, action_dim, max_action):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.max_action = max_action
        self.actor = Actor(state_dim, action_dim, max_action)
        self.actor_target = Actor(state_dim, action_dim, max_action)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=1e-4)
        self.critic = Critic(state_dim, action_dim)
        self.critic_target = Critic(state_dim, action_dim)
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = optim.Adam(self.critic.parameters(), lr=1e-3)
        self.memory = deque(maxlen=1000000)
        self.batch_size = 64
        self.tau = 0.001      # soft-update coefficient for the target networks
        self.gamma = 0.99

    def act(self, state, noise=0.1):
        state = torch.FloatTensor(state.reshape(1, -1))
        action = self.actor(state).detach().numpy().flatten()
        if noise != 0:
            # Gaussian exploration noise on top of the deterministic action
            action = action + np.random.normal(0, noise, size=self.action_dim)
        return np.clip(action, -self.max_action, self.max_action)

    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done))

    def learn(self):
        if len(self.memory) < self.batch_size:
            return
        batch = random.sample(self.memory, self.batch_size)
        state = torch.FloatTensor(np.array([x[0] for x in batch]))
        action = torch.FloatTensor(np.array([x[1] for x in batch]))
        reward = torch.FloatTensor(np.array([x[2] for x in batch])).reshape(-1, 1)
        next_state = torch.FloatTensor(np.array([x[3] for x in batch]))
        done = torch.FloatTensor(np.array([x[4] for x in batch])).reshape(-1, 1)
        # Critic update
        next_action = self.actor_target(next_state)
        target_Q = self.critic_target(next_state, next_action)
        target_Q = reward + ((1 - done) * self.gamma * target_Q).detach()
        current_Q = self.critic(state, action)
        critic_loss = nn.MSELoss()(current_Q, target_Q)
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        # Actor update
        actor_loss = -self.critic(state, self.actor(state)).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        # Soft-update the target networks
        for param, target_param in zip(self.critic.parameters(), self.critic_target.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
        for param, target_param in zip(self.actor.parameters(), self.actor_target.parameters()):
            target_param.data.copy_(self.tau * param.data + (1 - self.tau) * target_param.data)
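A minimal usage sketch on a continuous-control task, assuming Gym's Pendulum-v1 environment (Pendulum-v0 on older Gym versions) and the pre-0.26 API; the episode count is illustrative.

import gym

env = gym.make('Pendulum-v1')
agent = DDPGAgent(env.observation_space.shape[0],
                  env.action_space.shape[0],
                  float(env.action_space.high[0]))

for episode in range(200):
    state = env.reset()
    episode_reward, done = 0.0, False
    while not done:
        action = agent.act(state, noise=0.1)   # exploration noise on the deterministic action
        next_state, reward, done, _ = env.step(action)
        agent.remember(state, action, reward, next_state, done)
        agent.learn()                          # one gradient step per environment step
        state = next_state
        episode_reward += reward
    print(f"Episode {episode + 1}: reward = {episode_reward:.1f}")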
2. Multi-Agent Reinforcement Learning
2.1 Independent Q-Learning (IQL)
class IQLAgent:
    def __init__(self, state_dim, action_dim, n_agents):
        # DQNAgent is a standard DQN agent (act / remember / replay), assumed to be defined elsewhere
        self.agents = [DQNAgent(state_dim, action_dim) for _ in range(n_agents)]
        self.n_agents = n_agents

    def act(self, states):
        return [agent.act(state) for agent, state in zip(self.agents, states)]

    def learn(self, experiences):
        # Each agent learns independently from its own experience
        for agent, exp in zip(self.agents, experiences):
            state, action, reward, next_state, done = exp
            agent.remember(state, action, reward, next_state, done)
            agent.replay()
2.2 MADDPG
class MADDPG:
    def __init__(self, state_dims, action_dims, n_agents, max_actions):
        self.agents = [DDPGAgent(state_dims[i], action_dims[i], max_actions[i])
                       for i in range(n_agents)]
        self.n_agents = n_agents
        self.state_dims = state_dims
        self.action_dims = action_dims
        # MADDPG uses centralized critics: each agent's critic sees the joint state and joint action
        for agent in self.agents:
            agent.critic = Critic(sum(state_dims), sum(action_dims))
            agent.critic_target = Critic(sum(state_dims), sum(action_dims))
            agent.critic_target.load_state_dict(agent.critic.state_dict())
            agent.critic_optimizer = optim.Adam(agent.critic.parameters(), lr=1e-3)

    def act(self, states, noise=0.1):
        return [agent.act(state, noise) for agent, state in zip(self.agents, states)]

    def learn(self, experiences):
        # experiences: list of transitions (states, actions, rewards, next_states, done),
        # where states/actions/next_states stack all agents along a second axis
        states = torch.FloatTensor(np.array([x[0] for x in experiences]))        # (B, n_agents, state_dim)
        actions = torch.FloatTensor(np.array([x[1] for x in experiences]))       # (B, n_agents, action_dim)
        next_states = torch.FloatTensor(np.array([x[3] for x in experiences]))   # (B, n_agents, state_dim)
        dones = torch.FloatTensor(np.array([x[4] for x in experiences])).reshape(-1, 1)
        batch_size = states.shape[0]
        joint_states = states.reshape(batch_size, -1)
        joint_actions = actions.reshape(batch_size, -1)
        joint_next_states = next_states.reshape(batch_size, -1)
        for i, agent in enumerate(self.agents):
            rewards = torch.FloatTensor(np.array([x[2][i] for x in experiences])).reshape(-1, 1)
            # Critic update: target actions come from every agent's target actor
            next_actions = torch.cat([a.actor_target(next_states[:, j, :])
                                      for j, a in enumerate(self.agents)], dim=1)
            target_Q = agent.critic_target(joint_next_states, next_actions)
            target_Q = rewards + ((1 - dones) * agent.gamma * target_Q).detach()
            current_Q = agent.critic(joint_states, joint_actions)
            critic_loss = nn.MSELoss()(current_Q, target_Q)
            agent.critic_optimizer.zero_grad()
            critic_loss.backward()
            agent.critic_optimizer.step()
            # Actor update: only agent i's own action keeps gradients, the others are detached
            actions_pred = [a.actor(states[:, j, :]) if j == i else a.actor(states[:, j, :]).detach()
                            for j, a in enumerate(self.agents)]
            actions_pred = torch.cat(actions_pred, dim=1)
            actor_loss = -agent.critic(joint_states, actions_pred).mean()
            agent.actor_optimizer.zero_grad()
            actor_loss.backward()
            agent.actor_optimizer.step()
            # Soft-update the target networks
            for param, target_param in zip(agent.critic.parameters(), agent.critic_target.parameters()):
                target_param.data.copy_(agent.tau * param.data + (1 - agent.tau) * target_param.data)
            for param, target_param in zip(agent.actor.parameters(), agent.actor_target.parameters()):
                target_param.data.copy_(agent.tau * param.data + (1 - agent.tau) * target_param.data)
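The learn() method above expects a list of joint transitions in which the per-agent quantities are stacked along a second axis. A sketch with random data, purely to illustrate the assumed shapes (all dimensions here are made up):

n_agents, state_dim, action_dim = 2, 8, 2
maddpg = MADDPG([state_dim] * n_agents, [action_dim] * n_agents, n_agents, [1.0] * n_agents)

experiences = []
for _ in range(64):
    states = np.random.randn(n_agents, state_dim).astype(np.float32)
    actions = np.random.uniform(-1, 1, (n_agents, action_dim)).astype(np.float32)
    rewards = np.random.randn(n_agents).astype(np.float32)    # one reward per agent
    next_states = np.random.randn(n_agents, state_dim).astype(np.float32)
    done = 0.0                                                # shared episode-termination flag
    experiences.append((states, actions, rewards, next_states, done))
maddpg.learn(experiences)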
3. Hierarchical Reinforcement Learning
3.1 The Options Framework
class Option:
    def __init__(self, initiation_set, policy, termination_condition):
        self.initiation_set = initiation_set                  # set of states in which the option can be started
        self.policy = policy                                  # the option's internal policy
        self.termination_condition = termination_condition    # function deciding when the option ends

    def is_available(self, state):
        return state in self.initiation_set

    def should_terminate(self, state):
        return self.termination_condition(state)

class HierarchicalAgent:
    def __init__(self, options, meta_policy):
        self.options = options
        self.meta_policy = meta_policy                        # high-level policy that selects options
        self.current_option = None

    def act(self, state):
        if self.current_option is None or self.current_option.should_terminate(state):
            available_options = [opt for opt in self.options if opt.is_available(state)]
            if not available_options:
                return None                                   # a default behaviour needs to be defined here
            self.current_option = self.meta_policy.select_option(state, available_options)
        return self.current_option.policy(state)
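A toy illustration of the framework on a hypothetical 1-D corridor with states 0..10; the options, state sets, and the trivial meta-policy below are made up purely for demonstration.

# 'go_right' is available in the left half and terminates at the midpoint; 'go_left' mirrors it
go_right = Option(initiation_set=set(range(0, 6)),
                  policy=lambda s: +1,                     # always step right
                  termination_condition=lambda s: s >= 5)
go_left = Option(initiation_set=set(range(5, 11)),
                 policy=lambda s: -1,                      # always step left
                 termination_condition=lambda s: s <= 5)

class GreedyMetaPolicy:
    # Minimal meta-policy: always pick the first available option
    def select_option(self, state, available_options):
        return available_options[0]

agent = HierarchicalAgent([go_right, go_left], GreedyMetaPolicy())
state = 2
for _ in range(5):
    state += agent.act(state)    # the returned primitive action is a step of +1 or -1
    print(state)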
4. Frontier Techniques in Reinforcement Learning
4.1 Model-Based Reinforcement Learning (MBRL)
class DynamicsModel(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(DynamicsModel, self).__init__()
        self.fc1 = nn.Linear(state_dim + action_dim, 256)
        self.fc2 = nn.Linear(256, 256)
        self.fc3 = nn.Linear(256, state_dim)

    def forward(self, state, action):
        x = torch.cat([state, action], dim=1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return self.fc3(x)

class MBAgent:
    def __init__(self, state_dim, action_dim):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.dynamics_model = DynamicsModel(state_dim, action_dim)
        self.optimizer = optim.Adam(self.dynamics_model.parameters(), lr=0.001)
        self.real_buffer = deque(maxlen=10000)
        self.imagined_buffer = deque(maxlen=10000)

    def train_dynamics(self, states, actions, next_states):
        pred_next_states = self.dynamics_model(states, actions)
        loss = nn.MSELoss()(pred_next_states, next_states)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def plan(self, initial_state, horizon=10, num_sequences=100):
        # Plan with the cross-entropy method (CEM); self.reward_model is assumed to map
        # (states, actions, next_states) to one reward per candidate sequence
        mean = torch.zeros(horizon * self.action_dim)
        std = torch.ones(horizon * self.action_dim)
        with torch.no_grad():
            for _ in range(5):  # CEM iterations
                # Sample candidate action sequences around the current distribution
                actions = mean + std * torch.randn(num_sequences, horizon * self.action_dim)
                actions = actions.reshape(num_sequences, horizon, self.action_dim)
                # Roll the sequences forward through the learned dynamics model
                states = torch.FloatTensor(initial_state).repeat(num_sequences, 1)
                total_rewards = torch.zeros(num_sequences)
                for t in range(horizon):
                    next_states = self.dynamics_model(states, actions[:, t, :])
                    rewards = self.reward_model(states, actions[:, t, :], next_states)
                    total_rewards += rewards
                    states = next_states
                # Keep the k best (elite) sequences
                _, top_indices = torch.topk(total_rewards, k=20)
                elite_actions = actions[top_indices]
                # Refit the sampling distribution to the elites
                mean = elite_actions.mean(dim=0).flatten()
                std = elite_actions.std(dim=0).flatten()
        return mean.reshape(horizon, self.action_dim)[0]  # execute only the first action
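plan() assumes a reward model with signature reward_model(states, actions, next_states) returning one reward per candidate sequence; in the simplest case this is a hand-written reward function rather than a learned one. A hypothetical sketch with random data:

# Hypothetical reward for a 'reach the origin' task: negative squared distance of the next state
def distance_reward(states, actions, next_states):
    return -next_states.pow(2).sum(dim=1)

state_dim, action_dim = 4, 2
agent = MBAgent(state_dim, action_dim)
agent.reward_model = distance_reward      # plug in the assumed reward model

# Fit the dynamics model on a fake batch of transitions, then plan from the origin
states = torch.randn(128, state_dim)
actions = torch.randn(128, action_dim)
next_states = states + 0.1 * torch.randn(128, state_dim)
print(agent.train_dynamics(states, actions, next_states))
first_action = agent.plan(np.zeros(state_dim, dtype=np.float32), horizon=5, num_sequences=100)
print(first_action)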
4.2 Inverse Reinforcement Learning (IRL)
class Discriminator(nn.Module):
    def __init__(self, state_dim, action_dim):
        super(Discriminator, self).__init__()
        self.fc1 = nn.Linear(state_dim + action_dim, 256)
        self.fc2 = nn.Linear(256, 256)
        self.fc3 = nn.Linear(256, 1)

    def forward(self, state, action):
        x = torch.cat([state, action], dim=1)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        return torch.sigmoid(self.fc3(x))

class IRAgent:
    def __init__(self, state_dim, action_dim):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.discriminator = Discriminator(state_dim, action_dim)
        # PolicyNetwork is assumed to be defined elsewhere and to map states to actions
        self.policy = PolicyNetwork(state_dim, action_dim)
        # Adversarial training uses separate optimizers for the two players
        self.d_optimizer = optim.Adam(self.discriminator.parameters(), lr=0.0003)
        self.p_optimizer = optim.Adam(self.policy.parameters(), lr=0.0003)

    def train_step(self, expert_states, expert_actions, agent_states, agent_actions):
        # 1) Train the discriminator: expert pairs are labelled 1, agent pairs 0
        expert_labels = torch.ones(expert_states.shape[0], 1)
        agent_labels = torch.zeros(agent_states.shape[0], 1)
        all_states = torch.cat([expert_states, agent_states], dim=0)
        all_actions = torch.cat([expert_actions, agent_actions], dim=0)
        all_labels = torch.cat([expert_labels, agent_labels], dim=0)
        preds = self.discriminator(all_states, all_actions)
        d_loss = nn.BCELoss()(preds, all_labels)
        self.d_optimizer.zero_grad()
        d_loss.backward()
        self.d_optimizer.step()
        # 2) Train the policy to fool the discriminator: gradients flow through the
        #    discriminator into the policy's own actions (a simplified, GAN-style update;
        #    full GAIL instead feeds -log(1 - D) as a reward to an RL algorithm such as PPO)
        policy_actions = self.policy(agent_states)
        agent_preds = self.discriminator(agent_states, policy_actions)
        p_loss = -torch.log(agent_preds + 1e-8).mean()
        self.p_optimizer.zero_grad()
        p_loss.backward()
        self.p_optimizer.step()
        return d_loss.item(), p_loss.item()
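The discriminator/policy pair above follows the adversarial imitation (GAIL-style) formulation: with expert pairs labelled 1 and agent pairs labelled 0, the two players optimize

$$\min_{\pi}\ \max_{D}\ \ \mathbb{E}_{(s,a)\sim\pi_E}\big[\log D(s,a)\big]\;+\;\mathbb{E}_{(s,a)\sim\pi}\big[\log\big(1-D(s,a)\big)\big]$$

so the recovered reward signal is implicit in the discriminator rather than specified by hand.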
5. Practical Tips for Reinforcement Learning
- Hyperparameter tuning:
  • Learning rate: 3e-4 is a common starting point
  • Discount factor γ: typically between 0.9 and 0.99
  • Batch size: between 32 and 512
  • Exploration rate: start at 1.0 and decay to 0.01-0.1
- Training techniques (see the sketch after this list):
  • Use a learning-rate scheduler
  • Apply gradient clipping
  • Monitor training curves and key metrics
  • Save model checkpoints regularly
- Debugging strategies:
  • Check that the reward signal is sensible
  • Verify that the value function converges
  • Check whether exploration is sufficient
  • Visualize the agent's behavior
- Performance optimization:
  • Use GPU acceleration
  • Parallelize environment interaction
  • Optimize the data pipeline
  • Cut unnecessary computation
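As a quick illustration of the learning-rate-scheduling and gradient-clipping tips, here is a minimal sketch wired around the PPONetwork defined earlier; the schedule and clipping values are illustrative, not tuned.

model = PPONetwork(input_dim=4, output_dim=2)
optimizer = optim.Adam(model.parameters(), lr=3e-4)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=1000, gamma=0.9)

def update(loss):
    optimizer.zero_grad()
    loss.backward()
    # Gradient clipping keeps a single bad batch from destabilizing training
    torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=0.5)
    optimizer.step()
    # Step the scheduler so the learning rate decays as training progresses
    scheduler.step()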
6. Reinforcement Learning Application Cases
6.1 Robot Control
import gym

# Train a robotic arm to reach a target position with PPO
# (FetchReach-v1 requires the Gym robotics environments; the pre-0.26 Gym API is used here)
env = gym.make('FetchReach-v1')
state_dim = env.observation_space['observation'].shape[0] + env.observation_space['desired_goal'].shape[0]
action_dim = env.action_space.shape[0]
max_action = float(env.action_space.high[0])
# Note: FetchReach-v1 has a continuous action space, so the categorical PPOAgent above
# would need to be swapped for a Gaussian-policy variant; the training loop is unchanged.
agent = PPOAgent(state_dim, action_dim)
episodes = 10000
for e in range(episodes):
    obs = env.reset()
    # Condition the policy on the desired goal so it knows where to reach
    state = np.concatenate([obs['observation'], obs['desired_goal']])
    total_reward = 0
    done = False
    while not done:
        action, log_prob = agent.act(state)
        next_obs, reward, done, info = env.step(action)
        next_state = np.concatenate([next_obs['observation'], next_obs['desired_goal']])
        agent.remember(state, action, log_prob, reward, done)
        state = next_state
        total_reward += reward
        if done:
            print(f"Episode: {e+1}, Reward: {total_reward:.2f}")
            break
    agent.learn()
6.2 Financial Trading
class TradingEnvironment:
    def __init__(self, data, initial_balance=10000):
        self.data = data                    # market data (a DataFrame with at least a 'close' column)
        self.initial_balance = initial_balance
        self.index = 0
        self.balance = initial_balance
        self.positions = 0
        self.max_steps = len(data) - 1

    def reset(self):
        self.index = 0
        self.balance = self.initial_balance
        self.positions = 0
        return self._get_state()

    def _get_state(self):
        # Current state: market features plus normalized balance and position size
        return np.concatenate([
            self.data.iloc[self.index].values,
            [self.balance / self.initial_balance, self.positions]
        ])

    def step(self, action):
        # action: 0 = hold, 1 = buy, 2 = sell
        current_price = self.data.iloc[self.index]['close']
        reward = 0
        done = False
        if action == 1 and self.balance >= current_price:    # buy one unit
            self.positions += 1
            self.balance -= current_price
        elif action == 2 and self.positions > 0:              # sell one unit
            self.positions -= 1
            self.balance += current_price
            reward = current_price - self.data.iloc[self.index - 1]['close']  # simplified profit signal
        self.index += 1
        done = self.index >= self.max_steps
        if done:
            # Liquidate all remaining positions and report the overall return
            self.balance += self.positions * current_price
            self.positions = 0
            reward = (self.balance - self.initial_balance) / self.initial_balance
        return self._get_state(), reward, done, {}

# Train a trading policy with DQN (load_market_data is a placeholder for your own data loader,
# and DQNAgent is the standard DQN agent assumed to be defined elsewhere)
data = load_market_data()
env = TradingEnvironment(data)
state_dim = len(env.reset())
action_dim = 3
agent = DQNAgent(state_dim, action_dim)
episodes = 1000
for e in range(episodes):
    state = env.reset()
    total_reward = 0
    done = False
    while not done:
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        agent.remember(state, action, reward, next_state, done)
        state = next_state
        total_reward += reward
        if done:
            print(f"Episode: {e+1}, Profit: {total_reward*100:.2f}%")
            break
    agent.replay()
7. Summary and Outlook
This article has covered advanced reinforcement learning techniques and practices, including:
- Advanced algorithms such as PPO and DDPG, with implementations
- Design patterns for multi-agent systems
- Hierarchical reinforcement learning architectures
- Frontier directions such as model-based learning and inverse reinforcement learning
- Practical application cases and engineering advice
Reinforcement learning is still developing rapidly. Important directions for the future include:
• Improving sample efficiency and training stability
• Stronger transfer-learning and meta-learning capabilities
• Combining the decision-making abilities of large language models
• Addressing safety and interpretability
• More capable frameworks for multi-agent cooperation
By mastering these advanced techniques, developers can build more powerful and practical reinforcement learning systems that tackle complex real-world decision problems.