【实验六】基于前馈神经网络的二类任务

时间:2024-10-23 07:37:39

1 数据集构建

2 模型构建

2.1 线性层算子

2.2 Logistic算子

 2.3  层次串行组合

3 损失函数

4 模型优化

4.1 反向传播算法

4.2 损失函数

4.3 Logistic算子

4.4 线性层

4.5 整个网络

4.6 优化器

5 完善Runner类:RunnerV2_1

6 模型训练

7 性能评价

8 完整代码

9 调参与改进 

10【思考题】


1 数据集构建

        使用之前的二分类数据集:Moon1000数据集,包含训练集640条、验证集160条、测试集200条,每个样本包含2个特征。这里直接from data import make_moons,调用之前make_moons函数来生成数据集。

data.py

import math
import copy
import torch
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt

def make_moons(n_samples=1000, shuffle=True, noise=None):
    """
    生成带噪音的弯月形状数据
    输入:
        - n_samples:数据量大小,数据类型为int
        - shuffle:是否打乱数据,数据类型为bool
        - noise:以多大的程度增加噪声,数据类型为None或float,noise为None时表示不增加噪声
    输出:
        - X:特征数据,shape=[n_samples,2]
        - y:标签数据, shape=[n_samples]
    """
    n_samples_out = n_samples // 2
    n_samples_in = n_samples - n_samples_out
    # 采集第1类数据,特征为(x,y)
    # 使用'torch.linspace'在0到pi上均匀取n_samples_out个值
    # 使用'torch.cos'计算上述取值的余弦值作为特征1,使用'torch.sin'计算上述取值的正弦值作为特征2
    outer_circ_x = torch.cos(torch.linspace(0, math.pi, n_samples_out))
    outer_circ_y = torch.sin(torch.linspace(0, math.pi, n_samples_out))
    inner_circ_x = 1 - torch.cos(torch.linspace(0, math.pi, n_samples_in))
    inner_circ_y = 0.5 - torch.sin(torch.linspace(0, math.pi, n_samples_in))
    #print('外弯月特征x的形状:', outer_circ_x.shape, '外弯月特征y的形状:', outer_circ_y.shape)
    #print('内弯月特征x的形状:', inner_circ_x.shape, '内弯月特征y的形状:', inner_circ_y.shape)

    # 使用'torch.cat'将两类数据的特征1和特征2分别沿维度0拼接在一起,得到全部特征1和特征2
    # 使用'torch.stack'将两类特征沿维度1堆叠在一起
    X = torch.stack(
        [torch.cat([outer_circ_x, inner_circ_x]),
         torch.cat([outer_circ_y, inner_circ_y])],
        axis=1
    )

    #print('拼接后的形状:', torch.cat([outer_circ_x, inner_circ_x]).shape)
   # print('X的形状:', X.shape)

    # 使用'torch.zeros'将第一类数据的标签全部设置为0
    # 使用'torch.ones'将第二类数据的标签全部设置为1
    y = torch.cat(
        [torch.zeros(size=[n_samples_out]), torch.ones(size=[n_samples_in])]
    )

    print('y的形状:', y.shape)

    # 如果shuffle为True,将所有数据打乱
    if shuffle:
        # 使用'torch.randperm'生成一个数值在0到X.shape[0],随机排列的一维Tensor作为索引值,用于打乱数据
        print(X.shape[0])
        idx = torch.randperm(X.shape[0])
        X = X[idx]
        y = y[idx]
    # 如果noise不为None,则给特征值加入噪声
    if noise is not None:
        # 使用'torch.normal'生成符合正态分布的随机Tensor作为噪声,并加到原始特征上
        print(noise)
        X += torch.normal(mean=0.0, std=noise, size=X.shape)

    return X, y

# 下面的代码只在直接运行 data.py 时执行
if __name__ == "__main__":
    n_samples = 1000
    X, y = make_moons(n_samples=n_samples, shuffle=True, noise=0.2)
    # 可视化生成的数据集,不同颜色代表不同类别
    plt.figure(figsize=(5, 5))
    plt.scatter(x=X[:, 0].tolist(), y=X[:, 1].tolist(), marker='*', c=y.tolist())
    plt.xlim(-3, 4)
    plt.ylim(-3, 4)
    plt.savefig('线性数据集可视化.pdf')
    plt.show()
    print("生成的数据样本数:", len(y))
from data import make_moons
# 数据集构建
n_samples = 1000
X, y = make_moons(n_samples=n_samples, shuffle=True, noise=0.2)

# 划分数据集
num_train = 640  # 训练集样本数量
num_dev = 160    # 验证集样本数量
num_test = 200   # 测试集样本数量

# 根据指定数量划分数据集
X_train, y_train = X[:num_train], y[:num_train]  # 训练集
X_dev, y_dev = X[num_train:num_train + num_dev], y[num_train:num_train + num_dev]  # 验证集
X_test, y_test = X[num_train + num_dev:], y[num_train + num_dev:]  # 测试集

# 调整标签的形状,将其转换为[N, 1]的格式
y_train = y_train.reshape([-1, 1])
y_dev = y_dev.reshape([-1, 1])
y_test = y_test.reshape([-1, 1])

# 可视化生成的数据集
plt.figure(figsize=(5, 5))  # 设置图形大小
plt.scatter(x=X[:, 0], y=X[:, 1], marker='*', c=y, cmap='viridis')  # 绘制散点图
plt.xlim(-3, 4)  # 设置x轴范围
plt.ylim(-3, 4)  # 设置y轴范围
plt.grid(True, linestyle='--', alpha=0.3)  # 添加网格
plt.show()  # 显示图形

                                           

2 模型构建

本节仅简单搭建前向传播的网络架构,不涉及反向传播以及梯度的计算。

模型中用到的算子都继承Op基类。

# 定义共同的父类 Op
class Op(object):
    """
    基类,用于定义操作(如线性层、激活函数等)。

    方法:
        - __call__(inputs): 调用 forward 方法。
        - forward(inputs): 前向传播,必须在子类中实现。
        - backward(inputs): 反向传播,必须在子类中实现。
    """
    def __init__(self):
        pass

    def __call__(self, inputs):
        return self.forward(inputs)

    def forward(self, inputs):
        raise NotImplementedError

    def backward(self, inputs):
        raise NotImplementedError

2.1 线性层算子

class Linear(Op):
    """
    线性层(全连接层)。

    参数:
        - input_size: 输入特征的数量
        - output_size: 输出特征的数量
        - name: 层的名称
        - weight_init: 权重初始化函数
        - bias_init: 偏置初始化函数

    属性:
        - params: 存储权重和偏置
        - inputs: 前向传播的输入
    """
    def __init__(self, input_size, output_size, name, weight_init=torch.randn, bias_init=torch.zeros):
        self.params = {}
        self.params['W'] = weight_init(size=(input_size, output_size))
        self.params['b'] = bias_init(size=(1, output_size))
        self.inputs = None
        self.name = name

    def forward(self, inputs):
        self.inputs = inputs
        outputs = torch.matmul(self.inputs, self.params['W']) + self.params['b']  # 线性变换
        return outputs

2.2 Logistic算子

class Logistic(Op):
    """
    Sigmoid 激活函数。

    属性:
        - inputs: 前向传播的输入
        - outputs: 前向传播的输出
        - params: 存储模型参数
        - name: 层的名称
    """
    def __init__(self):
        self.inputs = None
        self.outputs = None
        self.params = {}
        self.name = "Logistic"

    def forward(self, inputs):
        outputs = 1.0 / (1.0 + torch.exp(-inputs))  # Sigmoid 函数
        self.outputs = outputs
        return self.outputs

 2.3  层次串行组合

        将不同的隐藏层、输入层、输出层之间串联起来,不断交叉重复使用它们来构建一个多层的神经网络。

        这里构建网络传播过程为:输入数据 X 经过第一个线性层 fc1--->得到线性组合结果 z1---->z1 通过第一个激活函数 act_fn1,得到激活值 a1----->a1 经过第二个线性层 fc2,得到第二个线性组合结果 z2---->z2 通过第二个激活函数 act_fn2,得到最终的预测值 a2。

class Model_MLP_L2(Op):
    """
    二层全连接神经网络模型。

    参数:
        - input_size: 输入层特征数量
        - hidden_size: 隐藏层特征数量
        - output_size: 输出层特征数量

    属性:
        - layers: 存储模型的所有层
    """
    def __init__(self, input_size, hidden_size, output_size):
        self.fc1 = Linear(input_size, hidden_size, name="fc1")
        self.act_fn1 = Logistic()  # 激活函数
        self.fc2 = Linear(hidden_size, output_size, name="fc2")
        self.act_fn2 = Logistic()  # 激活函数
        self.layers = [self.fc1, self.act_fn1, self.fc2, self.act_fn2]  # 按顺序存储层

    def __call__(self, X):
        return self.forward(X)

    def forward(self, X):
        z1 = self.fc1(X)  # 第一层前向传播
        a1 = self.act_fn1(z1)  # 第一层激活
        z2 = self.fc2(a1)  # 第二层前向传播
        a2 = self.act_fn2(z2)  # 第二层激活
        return a2  # 返回输出

测试一下

        实例化一个两层的前馈网络,令其输入层维度为5,隐藏层维度为10,输出层维度为1。 并随机生成一条长度为5的数据输入两层神经网络,观察输出结果。

# 实例化模型
model = Model_MLP_L2(input_size=5, hidden_size=10, output_size=1)
# 随机生成1条长度为5的数据
X = torch.rand(size=[1, 5])
result = model(X)
print ("result: ", result)

 输出:

result:  tensor([[0.5554]])

        在这个示例中,result 是基于输入 X 经过神经网络层层计算得出的,体现了输入特征与模型输出之间的关系。result: 这是一个形状为(1, 1)的张量,表示模型对输入样本的预测值。由于输出层的维度为1,这个结果可以用于二分类问题的概率预测。

3 损失函数

采用二分类交叉熵损失函数。

# 实现交叉熵损失函数
class BinaryCrossEntropyLoss(Op):
    def __init__(self):
        self.predicts = None
        self.labels = None
        self.num = None
 
    def __call__(self, predicts, labels):
        return self.forward(predicts, labels)
 
    def forward(self, predicts, labels):
        self.predicts = predicts
        self.labels = labels
        self.num = self.predicts.shape[0]
        loss = -1. / self.num * (torch.matmul(self.labels.t(), torch.log(self.predicts)) + torch.matmul((1-self.labels.t()), torch.log(1-self.predicts)))
        loss = torch.squeeze(loss, axis=1)
        return loss
 

4 模型优化

        神经网络的层数通常比较深,其梯度计算和上一章中的线性分类模型的不同的点在于:线性模型通常比较简单可以直接计算梯度,而神经网络相当于一个复合函数,需要利用链式法则进行反向传播来计算梯度。

4.1 反向传播算法

4.2 损失函数

class BinaryCrossEntropyLoss(Op):
    """
    二分类交叉熵损失函数。

    属性:
        - predicts: 模型预测值
        - labels: 真实标签
        - num: 样本数量
        - model: 需要计算梯度的模型
    """
    def __init__(self, model):
        self.predicts = None
        self.labels = None
        self.num = None
        self.model = model

    def __call__(self, predicts, labels):
        return self.forward(predicts, labels)

    def forward(self, predicts, labels):
        self.predicts = predicts
        self.labels = labels
        self.num = self.predicts.shape[0]
        loss = -1. / self.num * (torch.matmul(self.labels.t(), torch.log(self.predicts)) +
                                 torch.matmul((1 - self.labels.t()), torch.log(1 - self.predicts)))
        loss = torch.squeeze(loss, axis=1)  # 压缩维度
        return loss

    def backward(self):
        loss_grad_predicts = -1.0 * (self.labels / self.predicts -
                                     (1 - self.labels) / (1 - self.predicts)) / self.num  # 计算梯度
        self.model.backward(loss_grad_predicts)  # 反向传播

4.3 Logistic算子

        为实现的Logistic算子增加反向函数backward(),在每次反向传播时,首先计算Sigmoid函数的导数,然后将这个导数与上游传递的梯度相乘,得到当前层输入的梯度。这个梯度将被传递到前一层,用于更新前一层的参数。

class Logistic(Op):
    """
    Sigmoid 激活函数。

    属性:
        - inputs: 前向传播的输入
        - outputs: 前向传播的输出
        - params: 存储模型参数
        - name: 层的名称
    """
    def __init__(self):
        self.inputs = None
        self.outputs = None
        self.params = {}
        self.name = "Logistic"

    def forward(self, inputs):
        outputs = 1.0 / (1.0 + torch.exp(-inputs))  # Sigmoid 函数
        self.outputs = outputs
        return self.outputs

    def backward(self, grads):
        outputs_grad_inputs = self.outputs * (1.0 - self.outputs)  # Sigmoid 导数
        return grads * outputs_grad_inputs  # 返回梯度

4.4 线性层

        为实现的线性层算子增加反向函数backward(),在每次反向传播时,首先计算权重和偏置的梯度,然后将这些梯度用于更新权重和偏置。计算得到的梯度将被传递到前一层,用于更新前一层的参数。

class Linear(Op):
    """
    线性层(全连接层)。

    参数:
        - input_size: 输入特征的数量
        - output_size: 输出特征的数量
        - name: 层的名称
        - weight_init: 权重初始化函数
        - bias_init: 偏置初始化函数

    属性:
        - params: 存储权重和偏置
        - grads: 存储梯度
        - inputs: 前向传播的输入
    """
    def __init__(self, input_size, output_size, name, weight_init=torch.randn, bias_init=torch.zeros):
        self.params = {}
        self.params['W'] = weight_init(size=(input_size, output_size))
        self.params['b'] = bias_init(size=(1, output_size))
        self.inputs = None
        self.grads = {}
        self.name = name

    def forward(self, inputs):
        self.inputs = inputs
        outputs = torch.matmul(self.inputs, self.params['W']) + self.params['b']  # 线性变换
        return outputs

    def backward(self, grads):
        self.grads['W'] = torch.matmul(self.inputs.T, grads)  # 计算权重梯度
        self.grads['b'] = torch.sum(grads, axis=0)  # 计算偏置梯度
        return torch.matmul(grads, self.params['W'].T)  # 返回上层梯度

4.5 整个网络

       为之前搭建的网络Model_MLP_L2 增加反向函数backward(),从输出层开始,根据损失函数计算的梯度,依次通过各层的反向传播,更新模型的权重和偏置。

Model_MLP_L2 --backward():

  1. 从输出层开始,接收上游传递的梯度 loss_grad_a2。
  2. 通过第二个激活函数层 self.act_fn2 进行反向传播,计算第二层线性层的梯度 loss_grad_z2。
  3. 通过第二个线性层 self.fc2 进行反向传播,计算第一层激活函数的梯度 loss_grad_a1。
  4. 通过第一个激活函数层 self.act_fn1 进行反向传播,计算第一层线性层的梯度 loss_grad_z1。
  5. 最后通过第一个线性层 self.fc1 进行反向传播,更新模型的权重和偏置。
class Model_MLP_L2(Op):
    """
    二层全连接神经网络模型。

    参数:
        - input_size: 输入层特征数量
        - hidden_size: 隐藏层特征数量
        - output_size: 输出层特征数量

    属性:
        - layers: 存储模型的所有层
    """
    def __init__(self, input_size, hidden_size, output_size):
        self.fc1 = Linear(input_size, hidden_size, name="fc1")
        self.act_fn1 = Logistic()  # 激活函数
        self.fc2 = Linear(hidden_size, output_size, name="fc2")
        self.act_fn2 = Logistic()  # 激活函数
        self.layers = [self.fc1, self.act_fn1, self.fc2, self.act_fn2]  # 按顺序存储层

    def __call__(self, X):
        return self.forward(X)

    def forward(self, X):
        z1 = self.fc1(X)  # 第一层前向传播
        a1 = self.act_fn1(z1)  # 第一层激活
        z2 = self.fc2(a1)  # 第二层前向传播
        a2 = self.act_fn2(z2)  # 第二层激活
        return a2  # 返回输出

    def backward(self, loss_grad_a2):
        # 反向传播
        loss_grad_z2 = self.act_fn2.backward(loss_grad_a2)  # 第二层反向传播
        loss_grad_a1 = self.fc2.backward(loss_grad_z2)  # 第一层反向传播
        loss_grad_z1 = self.act_fn1.backward(loss_grad_a1)  # 激活函数反向传播
        self.fc1.backward(loss_grad_z1)  # 更新权重和偏置

4.6 优化器

        计算好神经网络参数的梯度后,将梯度下降法中参数的更新过程实现在优化器中。此处的优化器需要遍历每层,对每层的参数分别做更新。

        这里采用批量梯度下降是一种简单而有效的优化方法,它在每次更新参数时都使用整个训练集。这通常会导致每次更新步骤比较慢,但可以保证使用所有数据来计算梯度,从而使得梯度的估计更加准确。

class BatchGD(Optimizer):
    """
    批量梯度下降优化器。

    参数:
        - init_lr: 初始学习率
        - model: 需要优化的模型
    """
    def __init__(self, init_lr, model):
        super(BatchGD, self).__init__(init_lr=init_lr, model=model)

    def step(self):
        # 更新参数
        for layer in self.model.layers:
            if isinstance(layer.params, dict):
                for key in layer.params.keys():
                    layer.params[key] = layer.params[key] - self.init_lr * layer.grads[key]  # 更新权重和偏置
class Optimizer(Op):
    """
    优化器基类,用于更新模型参数。

    参数:
        - model: 需要优化的模型
        - init_lr: 初始学习率

    方法:
        - step(): 执行一步优化,更新模型参数。
        - zero_grad(): 清零模型的梯度。
    """
    def __init__(self, model, init_lr):
        self.model = model
        self.init_lr = init_lr

    def step(self):
        raise NotImplementedError("必须在子类中实现该方法。")

    def zero_grad(self):
        for layer in self.model.layers:
            if hasattr(layer, 'grads'):
                for key in layer.grads.keys():
                    layer.grads[key] = torch.zeros_like(layer.grads[key])  # 清零梯度

5 完善Runner类:RunnerV2_1

在之前的基础上添加的主要功能有:

1、支持自定义算子的梯度计算,在训练过程中调用self.loss_fn.backward()从损失函数开始反向计算梯度;

2、每层的模型保存和加载,将每一层的参数分别进行保存和加载。

import os
import torch

class RunnerV2_1(object):
    def __init__(self, model, optimizer, metric, loss_fn, **kwargs):
        # 初始化 RunnerV2_1 类的实例
        self.model = model  # 存储模型
        self.optimizer = optimizer  # 存储优化器
        self.loss_fn = loss_fn  # 存储损失函数
        self.metric = metric  # 存储评估指标

        # 记录训练过程中的评估指标变化情况
        self.train_scores = []  # 用于存储每个训练轮次的得分
        self.dev_scores = []    # 用于存储每个验证轮次的得分

        # 记录训练过程中的损失变化情况
        self.train_loss = []    # 用于存储每个训练轮次的损失
        self.dev_loss = []      # 用于存储每个验证轮次的损失

    def train(self, train_set, dev_set, **kwargs):
        # 传入训练轮数,如果没有传入值则默认为0
        num_epochs = kwargs.get("num_epochs", 0)
        # 传入日志打印频率,如果没有传入值则默认为100
        log_epochs = kwargs.get("log_epochs", 100)

        # 传入模型保存路径
        save_dir = kwargs.get("save_dir", None)

        # 记录全局最优指标
        best_score = 0
        # 进行 num_epochs 轮训练
        for epoch in range(num_epochs):
            X, y = train_set  # 解包训练集数据
            # 获取模型预测
            logits = self.model(X)  # 模型输出

            # 计算交叉熵损失
            trn_loss = self.loss_fn(logits, y)  # 返回一个张量

            # 记录当前训练损失
            self.train_loss.append(trn_loss.item())
            # 计算评估指标
            trn_score = self.metric(logits, y)  # 计算训练集得分
            self.train_scores.append(trn_score)  # 记录得分

            # 反向传播计算梯度
            self.loss_fn.backward()

            # 参数更新
            self.optimizer.step()  # 使用优化器更新模型参数

            # 评估验证集
            dev_score, dev_loss = self.evaluate(dev_set)
            # 如果当前指标为最优指标,保存该模型
            if dev_score > best_score:
                print(f"[Evaluate] best accuracy performance has been updated: {best_score:.5f} --> {dev_score:.5f}")
                best_score = dev_score  # 更新最佳得分
                if save_dir:  # 如果指定了保存路径
                    self.save_model(save_dir)  # 保存模型

            # 打印训练过程中的损失和状态
            if log_epochs and epoch % log_epochs == 0:
                print(f"[Train] epoch: {epoch}/{num_epochs}, loss: {trn_loss.item()}")

    def evaluate(self, data_set):
        # 解包验证集数据
        X, y = data_set
        # 计算模型输出
        logits = self.model(X)  # 模型预测
        # 计算损失函数
        loss = self.loss_fn(logits, y).item()  # 计算验证集损失
        self.dev_loss.append(loss)  # 记录验证损失
        # 计算评估指标
        score = self.metric(logits, y)  # 计算验证集得分
        self.dev_scores.append(score)  # 记录得分
        return score, loss  # 返回得分和损失

    def predict(self, X):
        # 对输入数据进行预测
        return self.model(X)

    def save_model(self, save_dir):
        # 确保目录存在
        os.makedirs(save_dir, exist_ok=True)

        # 对模型每层参数分别进行保存,保存文件名称与该层名称相同
        for layer in self.model.layers:  # 遍历所有层
            if isinstance(layer.params, dict):  # 检查层参数是否为字典
                torch.save(layer.params, os.path.join(save_dir, layer.name + ".pdparams"))  # 保存参数

    def load_model(self, model_dir):
        # 获取所有层参数名称和保存路径之间的对应关系
        model_file_names = os.listdir(model_dir)  # 列出模型目录下的所有文件
        name_file_dict = {}
        for file_name in model_file_names:
            name = file_name.replace(".pdparams", "")  # 提取层名称
            name_file_dict[name] = os.path.join(model_dir, file_name)  # 构建名称与路径的字典

        # 加载每层参数
        for layer in self.model.layers:  # 遍历所有层
            if isinstance(layer.params, dict):  # 检查层参数是否为字典
                name = layer.name  # 获取层名称
                file_path = name_file_dict[name]  # 获取对应的文件路径
                layer.params = torch.load(file_path)  # 加载参数

6 模型训练

        基于RunnerV2_1,使用训练集和验证集进行模型训练,共训练2000个epoch。评价指标为之前实验介绍的accuracy,具体代码如下:

def accuracy(preds, labels):
    """
    计算模型预测的准确率。

    参数:
        - preds: 预测值,shape=[N, 1](二分类)或 [N, C](多分类)
        - labels: 真实标签,shape=[N, 1]

    返回:
        - 准确率:float
    """
    if preds.shape[1] == 1:
        preds = torch.round(preds)  # 二分类,四舍五入
    else:
        preds = torch.argmax(preds, dim=1)  # 多分类,获取最大元素索引

    correct = (preds == labels).sum().item()
    accuracy = correct / len(labels)
    return accuracy

模型训练: 

# 设置随机种子以确保结果可重复
torch.manual_seed(111)

# 定义训练参数
epoch_num = 1000  # 训练轮数
model_saved_dir = "model"  # 模型保存目录

# 网络参数
input_size = 2  # 输入层维度为2
hidden_size = 5  # 隐藏层维度为5
output_size = 1  # 输出层维度为1

# 定义多层感知机模型
model = Model_MLP_L2(input_size=input_size, hidden_size=hidden_size, output_size=output_size)

# 定义损失函数
loss_fn = BinaryCrossEntropyLoss(model)

# 定义优化器,设置学习率
learning_rate = 0.2
optimizer = BatchGD(learning_rate, model)

# 定义评价方法
metric = accuracy

# 实例化RunnerV2_1类,并传入训练配置
runner = RunnerV2_1(model, optimizer, metric, loss_fn)

# 训练模型
runner.train([X_train, y_train], [X_dev, y_dev], num_epochs=epoch_num, log_epochs=50, save_dir=model_saved_dir)

运行结果:

[Evaluate] best accuracy performance has been updated: 0.00000 --> 0.48750
[Train] epoch: 0/1000, loss: 0.734169602394104
[Evaluate] best accuracy performance has been updated: 0.48750 --> 0.49375
[Evaluate] best accuracy performance has been updated: 0.49375 --> 0.54375
[Evaluate] best accuracy performance has been updated: 0.54375 --> 0.65625
[Evaluate] best accuracy performance has been updated: 0.65625 --> 0.71875
[Evaluate] best accuracy performance has been updated: 0.71875 --> 0.78125
[Evaluate] best accuracy performance has been updated: 0.78125 --> 0.82500
[Evaluate] best accuracy performance has been updated: 0.82500 --> 0.85000
[Evaluate] best accuracy performance has been updated: 0.85000 --> 0.86875
[Evaluate] best accuracy performance has been updated: 0.86875 --> 0.87500
[Evaluate] best accuracy performance has been updated: 0.87500 --> 0.88750
[Evaluate] best accuracy performance has been updated: 0.88750 --> 0.90625
[Train] epoch: 50/1000, loss: 0.46939030289649963
[Train] epoch: 100/1000, loss: 0.4078947603702545
[Train] epoch: 150/1000, loss: 0.38109686970710754
[Evaluate] best accuracy performance has been updated: 0.90625 --> 0.91250
[Train] epoch: 200/1000, loss: 0.366441011428833
[Train] epoch: 250/1000, loss: 0.3574114739894867
[Train] epoch: 300/1000, loss: 0.3515494763851166
[Train] epoch: 350/1000, loss: 0.3476543426513672
[Train] epoch: 400/1000, loss: 0.3450251519680023
[Train] epoch: 450/1000, loss: 0.34321925044059753
[Train] epoch: 500/1000, loss: 0.34195104241371155
[Train] epoch: 550/1000, loss: 0.3410356044769287
[Train] epoch: 600/1000, loss: 0.3403526842594147
[Train] epoch: 650/1000, loss: 0.33982399106025696
[Train] epoch: 700/1000, loss: 0.33939802646636963
[Train] epoch: 750/1000, loss: 0.3390413224697113
[Train] epoch: 800/1000, loss: 0.3387317359447479
[Train] epoch: 850/1000, loss: 0.33845433592796326
[Train] epoch: 900/1000, loss: 0.33819931745529175
[Train] epoch: 950/1000, loss: 0.33795997500419617

可视化观察训练集与验证集的损失函数变化情况  

# 打印训练集和验证集的损失
plt.figure()  # 创建新的图形
plt.plot(range(epoch_num), runner.train_loss, color="#8E004D", label="Train loss")  # 绘制训练损失
plt.plot(range(epoch_num), runner.dev_loss, color="#E20079", linestyle='--', label="Dev loss")  # 绘制验证损失
plt.xlabel("epoch", fontsize='x-large')  # 设置x轴标签
plt.ylabel("loss", fontsize='x-large')  # 设置y轴标签
plt.legend(fontsize='large')  # 显示图例
plt.show()  # 显示损失图

 

7 性能评价

      使用 load_model 方法从指定目录加载先前训练好的模型参数,使用测试集对训练中的模型进行评价,通过 调用RunnerV2_1的evaluate 方法在测试集上进行性能评估,返回模型的准确率和损失值。

# 加载训练好的模型
runner.load_model(model_saved_dir)

# 在测试集上对模型进行评价
score, loss = runner.evaluate([X_test, y_test])  # 评估模型性能

# 打印测试集的准确率和损失
print("[Test] score/loss: {:.4f}/{:.4f}".format(score, loss))

运行结果:

[Test] score/loss: 0.8500/0.3527

从结果来看,模型在测试集上取得了较高的准确率。 

对结果进行可视化:

# 均匀生成40000个数据点
x1, x2 = torch.meshgrid(torch.linspace(-math.pi, math.pi, 200), torch.linspace(-math.pi, math.pi, 200))
x = torch.stack([torch.flatten(x1), torch.flatten(x2)], axis=1)  # 将生成的点堆叠成二维数组

# 使用模型进行预测
y = runner.predict(x)  # 预测类别
y = torch.squeeze((y >= 0.5).to(torch.float32), axis=-1)  # 将概率值转化为类别标签

# 绘制类别区域
plt.ylabel('x2')  # 设置y轴标签
plt.xlabel('x1')  # 设置x轴标签
plt.scatter(x[:, 0].tolist(), x[:, 1].tolist(), c=y.tolist(), cmap=plt.cm.Spectral)  # 绘制类别区域

# 可视化训练集、验证集和测试集数据
plt.scatter(X_train[:, 0].tolist(), X_train[:, 1].tolist(), marker='*', c=torch.squeeze(y_train, axis=-1).tolist())  # 绘制训练集
plt.scatter(X_dev[:, 0].tolist(), X_dev[:, 1].tolist(), marker='*', c=torch.squeeze(y_dev, axis=-1).tolist())  # 绘制验证集
plt.scatter(X_test[:, 0].tolist(), X_test[:, 1].tolist(), marker='*', c=torch.squeeze(y_test, axis=-1).tolist())  # 绘制测试集
plt.show()  # 显示最终图形

                   

8 完整代码

 需要导入nndl模块、data模块、Runner2-1模块内用到的部分类/函数

from data import make_moons
from nndl import Op
from nndl import accuracy
from nndl import Optimizer
import numpy as np
import torch
from Runner2_1 import RunnerV2_1
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
# 定义共同的父类 Op

class Linear(Op):
    """
    线性层(全连接层)。

    参数:
        - input_size: 输入特征的数量
        - output_size: 输出特征的数量
        - name: 层的名称
        - weight_init: 权重初始化函数
        - bias_init: 偏置初始化函数

    属性:
        - params: 存储权重和偏置
        - grads: 存储梯度
        - inputs: 前向传播的输入
    """
    def __init__(self, input_size, output_size, name, weight_init=torch.randn, bias_init=torch.zeros):
        self.params = {}
        self.params['W'] = weight_init(size=(input_size, output_size))
        self.params['b'] = bias_init(size=(1, output_size))
        self.inputs = None
        self.grads = {}
        self.name = name

    def forward(self, inputs):
        self.inputs = inputs
        outputs = torch.matmul(self.inputs, self.params['W']) + self.params['b']  # 线性变换
        return outputs

    def backward(self, grads):
        self.grads['W'] = torch.matmul(self.inputs.T, grads)  # 计算权重梯度
        self.grads['b'] = torch.sum(grads, axis=0)  # 计算偏置梯度
        return torch.matmul(grads, self.params['W'].T)  # 返回上层梯度


class Logistic(Op):
    """
    Sigmoid 激活函数。

    属性:
        - inputs: 前向传播的输入
        - outputs: 前向传播的输出
        - params: 存储模型参数
        - name: 层的名称
    """
    def __init__(self):
        self.inputs = None
        self.outputs = None
        self.params = {}
        self.name = "Logistic"

    def forward(self, inputs):
        outputs = 1.0 / (1.0 + torch.exp(-inputs))  # Sigmoid 函数
        self.outputs = outputs
        return self.outputs

    def backward(self, grads):
        outputs_grad_inputs = self.outputs * (1.0 - self.outputs)  # Sigmoid 导数
        return grads * outputs_grad_inputs  # 返回梯度


class BinaryCrossEntropyLoss(Op):
    """
    二分类交叉熵损失函数。

    属性:
        - predicts: 模型预测值
        - labels: 真实标签
        - num: 样本数量
        - model: 需要计算梯度的模型
    """
    def __init__(self, model):
        self.predicts = None
        self.labels = None
        self.num = None
        self.model = model

    def __call__(self, predicts, labels):
        return self.forward(predicts, labels)

    def forward(self, predicts, labels):
        self.predicts = predicts
        self.labels = labels
        self.num = self.predicts.shape[0]
        loss = -1. / self.num * (torch.matmul(self.labels.t(), torch.log(self.predicts)) +
                                 torch.matmul((1 - self.labels.t()), torch.log(1 - self.predicts)))
        loss = torch.squeeze(loss, axis=1)  # 压缩维度
        return loss

    def backward(self):
        loss_grad_predicts = -1.0 * (self.labels / self.predicts -
                                     (1 - self.labels) / (1 - self.predicts)) / self.num  # 计算梯度
        self.model.backward(loss_grad_predicts)  # 反向传播


class BatchGD(Optimizer):
    """
    批量梯度下降优化器。

    参数:
        - init_lr: 初始学习率
        - model: 需要优化的模型
    """
    def __init__(self, init_lr, model):
        super(BatchGD, self).__init__(init_lr=init_lr, model=model)

    def step(self):
        # 更新参数
        for layer in self.model.layers:
            if isinstance(layer.params, dict):
                for key in layer.params.keys():
                    layer.params[key] = layer.params[key] - self.init_lr * layer.grads[key]  # 更新权重和偏置


class Model_MLP_L2(Op):
    """
    二层全连接神经网络模型。

    参数:
        - input_size: 输入层特征数量
        - hidden_size: 隐藏层特征数量
        - output_size: 输出层特征数量

    属性:
        - layers: 存储模型的所有层
    """
    def __init__(self, input_size, hidden_size, output_size):
        self.fc1 = Linear(input_size, hidden_size, name="fc1")
        self.act_fn1 = Logistic()  # 激活函数
        self.fc2 = Linear(hidden_size, output_size, name="fc2")
        self.act_fn2 = Logistic()  # 激活函数
        self.layers = [self.fc1, self.act_fn1, self.fc2, self.act_fn2]  # 按顺序存储层

    def __call__(self, X):
        return self.forward(X)

    def forward(self, X):
        z1 = self.fc1(X)  # 第一层前向传播
        a1 = self.act_fn1(z1)  # 第一层激活
        z2 = self.fc2(a1)  # 第二层前向传播
        a2 = self.act_fn2(z2)  # 第二层激活
        return a2  # 返回输出

    def backward(self, loss_grad_a2):
        # 反向传播
        loss_grad_z2 = self.act_fn2.backward(loss_grad_a2)  # 第二层反向传播
        loss_grad_a1 = self.fc2.backward(loss_grad_z2)  # 第一层反向传播
        loss_grad_z1 = self.act_fn1.backward(loss_grad_a1)  # 激活函数反向传播
        self.fc1.backward(loss_grad_z1)  # 更新权重和偏置


# 实例化模型
model = Model_MLP_L2(input_size=5, hidden_size=10, output_size=1)
# 随机生成1条长度为5的数据
X = torch.rand(size=(1, 5))
result = model(X)
print("result: ", result)

# 数据集构建
n_samples = 1000
X, y = make_moons(n_samples=n_samples, shuffle=True, noise=0.2)

# 划分数据集
num_train = 640  # 训练集样本数量
num_dev = 160    # 验证集样本数量
num_test = 200   # 测试集样本数量

# 根据指定数量划分数据集
X_train, y_train = X[:num_train], y[:num_train]  # 训练集
X_dev, y_dev = X[num_train:num_train + num_dev], y[num_train:num_train + num_dev]  # 验证集
X_test, y_test = X[num_train + num_dev:], y[num_train + num_dev:]  # 测试集

# 调整标签的形状,将其转换为[N, 1]的格式
y_train = y_train.reshape([-1, 1])
y_dev = y_dev.reshape([-1, 1])
y_test = y_test.reshape([-1, 1])

# 可视化生成的数据集
plt.figure(figsize=(5, 5))  # 设置图形大小
plt.scatter(x=X[:, 0], y=X[:, 1], marker='*', c=y, cmap='viridis')  # 绘制散点图
plt.xlim(-3, 4)  # 设置x轴范围
plt.ylim(-3, 4)  # 设置y轴范围
plt.grid(True, linestyle='--', alpha=0.3)  # 添加网格
plt.show()  # 显示图形

# 设置随机种子以确保结果可重复
torch.manual_seed(111)

# 定义训练参数
epoch_num = 1000  # 训练轮数
model_saved_dir = "model"  # 模型保存目录

# 网络参数
input_size = 2  # 输入层维度为2
hidden_size = 5  # 隐藏层维度为5
output_size = 1  # 输出层维度为1

# 定义多层感知机模型
model = Model_MLP_L2(input_size=input_size, hidden_size=hidden_size, output_size=output_size)

# 定义损失函数
loss_fn = BinaryCrossEntropyLoss(model)

# 定义优化器,设置学习率
learning_rate = 0.2
optimizer = BatchGD(learning_rate, model)

# 定义评价方法
metric = accuracy

# 实例化RunnerV2_1类,并传入训练配置
runner = RunnerV2_1(model, optimizer, metric, loss_fn)

# 训练模型
runner.train([X_train, y_train], [X_dev, y_dev], num_epochs=epoch_num, log_epochs=50, save_dir=model_saved_dir)

# 打印训练集和验证集的损失
plt.figure()  # 创建新的图形
plt.plot(range(epoch_num), runner.train_loss, color="#8E004D", label="Train loss")  # 绘制训练损失
plt.plot(range(epoch_num), runner.dev_loss, color="#E20079", linestyle='--', label="Dev loss")  # 绘制验证损失
plt.xlabel("epoch", fontsize='x-large')  # 设置x轴标签
plt.ylabel("loss", fontsize='x-large')  # 设置y轴标签
plt.legend(fontsize='large')  # 显示图例
plt.show()  # 显示损失图

# 加载训练好的模型
runner.load_model(model_saved_dir)

# 在测试集上对模型进行评价
score, loss = runner.evaluate([X_test, y_test])  # 评估模型性能

# 打印测试集的准确率和损失
print("[Test] score/loss: {:.4f}/{:.4f}".format(score, loss))

import math

# 均匀生成40000个数据点
x1, x2 = torch.meshgrid(torch.linspace(-math.pi, math.pi, 200), torch.linspace(-math.pi, math.pi, 200))
x = torch.stack([torch.flatten(x1), torch.flatten(x2)], axis=1)  # 将生成的点堆叠成二维数组

# 使用模型进行预测
y = runner.predict(x)  # 预测类别
y = torch.squeeze((y >= 0.5).to(torch.float32), axis=-1)  # 将概率值转化为类别标签

# 绘制类别区域
plt.ylabel('x2')  # 设置y轴标签
plt.xlabel('x1')  # 设置x轴标签
plt.scatter(x[:, 0].tolist(), x[:, 1].tolist(), c=y.tolist(), cmap=plt.cm.Spectral)  # 绘制类别区域

# 可视化训练集、验证集和测试集数据
plt.scatter(X_train[:, 0].tolist(), X_train[:, 1].tolist(), marker='*', c=torch.squeeze(y_train, axis=-1).tolist())  # 绘制训练集
plt.scatter(X_dev[:, 0].tolist(), X_dev[:, 1].tolist(), marker='*', c=torch.squeeze(y_dev, axis=-1).tolist())  # 绘制验证集
plt.scatter(X_test[:, 0].tolist(), X_test[:, 1].tolist(), marker='*', c=torch.squeeze(y_test, axis=-1).tolist())  # 绘制测试集
plt.show()  # 显示最终图形


9 调参与改进 

测试一:增加隐藏层数目为10 ,其他不变

Test] score/loss: 0.9000/0.2702

【总结】 准确率增大,损失减小,增加隐藏层数目可以提高模型复杂度,使其预测更准。

测试二:增加隐藏层数目为25 ,其他不变 、

[Test] score/loss: 0.8400/0.3225

【总结】 准确率减小,过于增加隐藏层数目容易产生过拟合问题。

测试三:学习率调为0.001,步长过小,在1000次内无法收敛到一个最优值。

0

测试四:调整学习率为1,其他不变

[Test] score/loss: 0.9150/0.2319

【总结】:泛化能力增强,精度增大,损失减小,可能跳过了之前的一个局部最优值。

测试五:将隐藏层的输出激活函数采用ReLU,其他不变

 ReLU 激活函数
class ReLU(Op):
    def __init__(self):
        self.inputs = None
        self.outputs = None
        self.params = {}
        self.name = "ReLU"

    def forward(self, inputs):
        self.inputs = inputs
        self.outputs = torch.maximum(inputs, torch.tensor(0.0))
        return self.outputs

    def backward(self, grads):
        return grads * (self.inputs > 0).float()
 Model_MLP_L2网络中的隐藏层输出将隐藏层的激活函数更改为 ReLU,输出层保持 Sigmoid
  def __init__(self, input_size, hidden_size, output_size):
        self.fc1 = Linear(input_size, hidden_size, name="fc1")
        self.act_fn1 =ReLU()  # 激活函数
        self.fc2 = Linear(hidden_size, output_size, name="fc2")
        self.act_fn2 = Logistic()  # 激活函数
        self.layers = [self.fc1, self.act_fn1, self.fc2, self.act_fn2]  # 按顺序存储层

 测试集运行结果如下:

[Test] score/loss: 0.9000/0.2493

可视化结果如图: 

【总结】准确率由0.85提高至0.90,损失由0.3527讲至0.2493,决策边界更准确,ReLU能够引入更强的非线性特征,使网络可以学习到复杂的函数关系。

10【思考题】

对比“基于Logistic回归的二分类任务”与“基于前馈神经网络的二分类任务”,谈谈自己的看法。

 点击链接查看基于Logistic回归的二分类任务

Logistic回归模型相对简单,适用于较简单的线性可分的数据。前馈神经网络较为复杂。通过隐藏层和激活函数,能够捕捉非线性关系,可以处理更复杂的数据模式,适用于特征较多或存在非线性关系的情况。通过两次实验对比,前馈神经网络的最终score并没显示出多大的优越性。我认为原因有二:

1、由于二分类问题较为简单,没有太大必要使用太复杂的网络,使用简单的二分类模型也可以达到很好的效果。

2、网络的超参数设置不够好,或者激活函数的选择不恰当、优化器选择不恰当等等,总归无论如何神经网络肯定是更强大的。

鉴于内心对神经网络的“崇拜“,借助资料以及其他参考代码,我重新搭建了一下网络架构,大部分组件直接调用了torch.nn里面的类,其中设置学习率0.01、epoch 1000、隐藏层神经元数 16-、使用Adam优化器(Just想证明神经网络的强大之处是简单的二分类模型无法比拟的)

import torch
from data import make_moons
import torch.nn as nn
import torch.optim as optim
import math
import numpy as np
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt

# 设置中文字体
matplotlib.rcParams['font.sans-serif'] = ['SimHei']
matplotlib.rcParams['font.family'] = 'sans-serif'

# 定义两层神经网络
class TwoLayerNN(nn.Module):
    def __init__(self):
        super(TwoLayerNN, self).__init__()
        self.fc1 = nn.Linear(2, 16)  # 输入层到隐藏层
        self.fc2 = nn.Linear(16, 1)  # 隐藏层到输出层
        self.sigmoid = nn.Sigmoid()  # 激活函数

    def forward(self, x):
        x = torch.relu(self.fc1(x))  # 使用ReLU激活函数
        x = self.sigmoid(self.fc2(x))  # 输出层使用Sigmoid激活函数
        return x

# 训练模型并保存损失值
def train_model(X, y, num_epochs=1000, learning_rate=0.01):
    model = TwoLayerNN()
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    losses = []
    for epoch in range(num_epochs):
        model.train()
        optimizer.zero_grad()

        outputs = model(X).view(-1, 1)  # 确保输出形状为[N, 1]
        loss = criterion(outputs, y)
        losses.append(loss.item())

        loss.backward()
        optimizer.step()

        if (epoch + 1) % 100 == 0:
            print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {loss.item():.4f}')
    return model, losses

# 计算准确率
def calculate_accuracy(model, X, y):
    with torch.no_grad():
        outputs = model(X).view(-1)  # [N, 1] 转换为 [N]
        predicted = (outputs >= 0.5).float()  # 根据阈值预测类别
        accuracy = (predicted == y.view(-1)).float().mean().item()  # 计算准确率
    return accuracy

# 可视化分类边界
def plot_decision_boundary(model, X, y):
    x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
    y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
    xx, yy = np.meshgrid(np.linspace(x_min, x_max, 100), np.linspace(y_min, y_max, 100))
    grid = torch.tensor(np.c_[xx.ravel(), yy.ravel()], dtype=torch.float32)

    with torch.no_grad():
        preds = model(grid).squeeze().numpy()

    preds = preds.reshape(xx.shape)

    plt.figure(figsize=(10, 5))
    plt.contourf(xx, yy, preds, levels=[0, 0.5, 1], cmap='coolwarm', alpha=0.5)
    plt.scatter(X[:, 0].tolist(), X[:, 1].tolist(), c=y.tolist(), marker='*')
    plt.xlim(-3, 4)
    plt.ylim(-3, 4)
    plt.title("分类边界")
    plt.show()

# 可视化损失图像
def plot_loss(losses):
    plt.figure(figsize=(10, 5))
    plt.plot(losses, label='Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('训练损失图像')
    plt.legend()
    plt.grid()
    plt.show()

# 主函数
if __name__ == "__main__":
    # 数据集构建
    n_samples = 1000
    X, y = make_moons(n_samples=n_samples, shuffle=True, noise=0.2)

    # 划分数据集
    num_train = 640  # 训练集样本数量
    num_dev = 160  # 验证集样本数量
    num_test = 200  # 测试集样本数量

    # 根据指定数量划分数据集
    X_train, y_train = X[:num_train], y[:num_train]  # 训练集
    X_dev, y_dev = X[num_train:num_train + num_dev], y[num_train:num_train + num_dev]  # 验证集
    X_test, y_test = X[num_train + num_dev:], y[num_train + num_dev:]  # 测试集

    # 调整标签的形状,将其转换为[N, 1]的格式
    y_train = y_train.reshape([-1, 1])
    y_dev = y_dev.reshape([-1, 1])
    y_test = y_test.reshape([-1, 1])

    # 可视化生成的数据集
    plt.figure(figsize=(5, 5))  # 设置图形大小
    plt.scatter(x=X[:, 0], y=X[:, 1], marker='*', c=y, cmap='viridis')  # 绘制散点图
    plt.xlim(-3, 4)  # 设置x轴范围
    plt.ylim(-3, 4)  # 设置y轴范围
    plt.grid(True, linestyle='--', alpha=0.3)  # 添加网格
    plt.show()  # 显示图形

    # 训练模型
    model, losses = train_model(torch.FloatTensor(X_train), torch.FloatTensor(y_train))

    # 可视化分类边界
    plot_decision_boundary(model, torch.FloatTensor(X_train), torch.FloatTensor(y_train))
    plot_loss(losses)

    # 计算并打印准确率
    train_accuracy = calculate_accuracy(model, torch.FloatTensor(X_train), torch.FloatTensor(y_train))
    dev_accuracy = calculate_accuracy(model, torch.FloatTensor(X_dev), torch.FloatTensor(y_dev))
    test_accuracy = calculate_accuracy(model, torch.FloatTensor(X_test), torch.FloatTensor(y_test))

    print(f'训练集准确率: {train_accuracy:.4f}')
    print(f'验证集准确率: {dev_accuracy:.4f}')
    print(f'测试集准确率: {test_accuracy:.4f}')

​​​​​​​

可见经过一些微调和组件的更改,分类效果已经相当好了!!!!

参考链接:

torch.cast

paddle.cast

【Pytorch】torch.normal()函数

关于numpy,torch中seed()方法的一些理解

paddle前馈神经网络理论解读

Python当中类的__call__()方法、forward()方法以及__getitem__()方法作用是否会重叠

深度学习之前馈神经网络的入门学习(我觉得是全网最详细的)