项目分析:自然语言处理(语言情感分析)

时间:2024-10-17 13:06:20

在这个信息爆炸的时代,我们每天都在与海量的文本数据打交道。从社交媒体上的帖子、在线评论到新闻报道,文本信息无处不在。然而,这些文本不仅仅是文字的堆砌,它们背后蕴含着丰富的情感和观点。如何有效地理解和分析这些情感,成为了自然语言处理(NLP)领域中的一个重要课题——语言情感分析。本文将带你一窥语言情感分析的奥秘,探讨其原理、应用及未来发展趋势。

一、什么是语言情感分析?

语言情感分析,又称情感倾向性分析或情感挖掘,是指利用自然语言处理技术自动识别并提取文本中表达的情感倾向。这些情感可以是正面的(如“太棒了!”)、负面的(如“真失望!”)或是中性的。情感分析的核心在于理解文本背后的情感色彩,帮助机器像人一样“感知”情绪。

二、项目目标及处理方法:

1目标:

将评论内容转换为词向量。

2、每个词/字转换为词向量长度

        (维度)200

3、每一次传入的词/字的个数是否就是评论的长度?   

 应该是固定长度,每次传入数据与图像相似。     例如选择长度为32。则传入的数据为32*200

4、一条评论如果超过32个词/字怎么处理?   

 直接删除后面的内容

5、一条评论如果没有32个词/字怎么处理?   

 缺少的内容,统一使用一个数字(非词/字的数字)替代。

6、如果语料库中的词/字太多是否可以压缩?   

 可以,某些词/字出现的频率比较低,可能训练不出特征。因此可以选择频率比较高的词来训练。例如选择4760个。

7、被压缩的词/字如何处理?   

可以统一使用一个数字(非词/字的数字)替代。

三、项目步骤

1.构建词汇表(vocab),这个词汇表用于文本处理任务
# 导入tqdm库,用于显示循环的进度条  
from tqdm import tqdm  
# 导入pickle库,用于对象的序列化和反序列化  
import pickle as pkl  
  
# 定义常量,词表的最大长度限制  
MAX_VOC_SIZE = 4760  
# 定义常量,未知字和填充符号的标识  
UNK, PAD = '<UNK>', '<PAD>'  
  
def build_vocab(file_path, max_size, min_freq):  
    '''  
    功能: 基于文本内容建立词表vocab,vocab中包含语料库中的字  
    参数:  
        file_path: 需要读取的语料库的路径  
        max_size: 获取词频最高的前max_size个词  
        剔除字频低于min_freq个的词  
    '''  
    # 定义一个tokenizer函数,用于将文本分割成单个字符  
    tokenizer = lambda x: [y for y in x]  
    # 初始化一个空字典,用于保存词的统计信息  
    vocab_dic = {}  
    # 打开文件,准备读取内容  
    with open(file_path, 'r', encoding='UTF-8') as f:  
        i = 0  # 初始化行计数器  
        # 使用tqdm包装文件读取循环,以显示进度条  
        for line in tqdm(f):  
            # 如果是文件的第一行(通常是表头),则跳过  
            if i == 0:  
                i += 1  
                continue  
            # 去除每行开头的两个字符(可能是标签或分隔符),并去除首尾空白  
            lin = line[2:].strip()  
            # 如果处理后的行内容为空,则跳过  
            if not lin:  
                continue  
            # 遍历行内容中的每个字符,统计字符出现的次数  
            for word in tokenizer(lin):  
                vocab_dic[word] = vocab_dic.get(word, 0) + 1  
        # 过滤掉词频低于min_freq的字符,按词频降序排序,取前max_size个  
        vocab_list = sorted([_ for _ in vocab_dic.items() if _[1] > min_freq], key=lambda x: x[1], reverse=True)[:max_size]  
        # 为过滤后的字符分配索引,从0开始  
        vocab_dic = {word_count[0]: idx for idx, word_count in enumerate(vocab_list)}  
        # 更新词汇表字典,加入<UNK>和<PAD>及其对应的索引  
        vocab_dic.update({UNK: len(vocab_dic), PAD: len(vocab_dic) + 1})  
        # 打印词汇表字典  
        print(vocab_dic)  
        # 使用pickle将词汇表字典序列化并保存到文件中  
        pkl.dump(vocab_dic, open('simplifyweibo_4_moods.pkl', 'wb'))  
        # 打印词汇表的大小  
        print(f"Vocab size: {len(vocab_dic)}")  
    # 返回构建好的词汇表字典  
    return vocab_dic  
  
if __name__ == '__main__':  
    # 调用build_vocab函数,传入文件路径、词表最大长度和最小词频阈值  
    vocab = build_vocab('simplifyweibo_4_moods.csv', MAX_VOC_SIZE, 1)  
    # 注意:这里的print('vocab')只是打印了字符串'vocab',并没有打印变量vocab的内容  
    # 如果要打印变量vocab的内容,应该使用print(vocab)  
    print('vocab')

结果展示:

2、评论删除、填充,切分数据集
from tqdm import tqdm
import pickle as pkl
import random
import torch

UNK, PAD = '<UNK>', '<PAD>'  # 未知字,padding符号


def load_dataset(path, pad_size=70):  # path为文件地址,pad_size为单条评论字符的最大长度
    contents = []  # 用来存储转换为数值标号的句子
    vocab = pkl.load(open('simplifyweibo_4_moods.pkl', 'rb'))  # 读取vocab词库文件,rb二进制只读
    tokenizer = lambda x: [y for y in x]  # 自定义函数用来将字符串分隔成单个字符并存入列表
    with open(path, 'r', encoding='utf8') as f:
        i = 0
        for line in tqdm(f):  # 遍历文件内容的每一行,同时展示进度条
            if i == 0:  # 此处循环目的为了跳过第一行的无用内容
                i += 1
                continue
            if not line:  # 筛选是不是空行,空行则跳过
                continue
            label = int(line[0])  # 返回当前行的标签
            content = line[2:].strip('\n')  # 取出标签和逗号后的所有内容,同时去除前后的换行符
            words_line = []
            token = tokenizer(content)  # 将每一行的内容进行分字,返回一个列表
            seq_len = len(token)  # 获取一行实际内容的长度
            if pad_size:
                if len(token) < pad_size:  # 如果一行的字符数少于70,则填充字符<PAD>,填充个数为少于的部分的个数
                    token.extend([PAD] * (pad_size - len(token)))
                else:  # 如果一行的字大于70,则只取前70个字
                    token = token[:pad_size]  # 如果一条评论种的宁大于或等于70个字,索引的切分
                    seq_len = pad_size  # 当前评论的长度
            # word to id
            for word in token:  # 遍历实际内容的每一个字符
                words_line.append(vocab.get(word, vocab.get(
                    UNK)))  # vocab为词库,其中为字典形式,使用get去获取遍历出来的字符的值,值可表示索引值,如果该字符不在词库中则将其值增加为字典中键UNK对应的值,words_line中存放的是每一行的每一个字符对应的索引值
            contents.append((words_line, int(label), seq_len))  # 将每一行评论的字符对应的索引以及这一行评论的类别,还有当前评论的实际内容的长度,以元组的形式存入列表
        random.shuffle(contents)  # 随机打乱每一行内容的顺序
        """切分80%训练集、10%验证集、10%测试集"""
        train_data = contents[: int(len(contents) * 0.8)]  # 前80%的评论数据作为训练集
        dev_data = contents[int(len(contents) * 0.8):int(len(contents) * 0.9)]  # 把80%~90%的评论数据集作为验证数热
        test_data = contents[int(len(contents) * 0.9):]  # 90%~最后的数据作为测试数据集
    return vocab, train_data, dev_data, test_data  # 返回词库、训练集、验证集、测试集,数据集为列表中的元组形式
3.定义一个类,用于迭代地处理数据集,将其分割成指定大小的批次(batch),并能够在GPU或其他设备上运行

  
class DatasetIterater(object):  
    def __init__(self, batches, batch_size, device):  
        """  
        初始化DatasetIterater对象  
        :param batches: 数据集,通常是一个包含多个数据项的列表,每个数据项是一个元组,包含输入数据和标签等  
        :param batch_size: 每个批次的数据量  
        :param device: 指定数据应该被加载到的设备,例如'cpu'或'cuda'  
        """  
        self.batch_size = batch_size  # 设置批次大小  
        self.batches = batches  # 存储整个数据集  
        self.n_batches = len(batches) // batch_size  # 计算完整批次的数量  
        self.residue = False  # 标记是否有剩余数据不足以形成一个完整批次  
        # 如果数据集长度不是批次大小的整数倍,则设置residue为True  
        if len(batches) % self.n_batches != 0:  
            self.residue = True  
        self.index = 0  # 初始化当前批次的索引  
        self.device = device  # 设置数据加载的设备  
  
    def _to_tensor(self, datas):  
        """  
        将数据集转换为张量并发送到指定设备  
        :param datas: 数据集的一个批次,是一个包含多个元组的列表,每个元组包含输入数据、标签和序列长度  
        :return: 转换后的输入数据和标签的张量元组  
        """  
        x = torch.LongTensor([_[0] for _ in datas]).to(self.device)  # 转换输入数据为长整型张量并发送到设备  
        y = torch.LongTensor([_[1] for _ in datas]).to(self.device)  # 转换标签为长整型张量并发送到设备  
        seq_len = torch.LongTensor([_[2] for _ in datas]).to(self.device)  # 转换序列长度为长整型张量并发送到设备  
        return (x, seq_len), y  # 返回输入数据和标签的张量元组  
  
    def __next__(self):  
        """  
        实现迭代器的__next__方法,用于获取下一个批次的数据  
        :return: 下一个批次的数据,是一个包含输入数据和标签的张量元组  
        """  
        # 如果存在剩余数据且当前索引等于完整批次的数量,则处理剩余数据  
        if self.residue and self.index == self.n_batches:  
            batches = self.batches[self.index * self.batch_size: len(self.batches)]  
            self.index += 1  
            batches = self._to_tensor(batches)  # 转换数据类型为张量  
            return batches  
        # 如果当前索引超过了完整批次的数量,则重置索引并抛出StopIteration异常以结束迭代  
        elif self.index > self.n_batches:  
            self.index = 0  
            raise StopIteration  
        # 否则,提取当前批次的数据,转换数据类型为张量,并返回  
        else:  
            batches = self.batches[self.index * self.batch_size:(self.index + 1) * self.batch_size]  
            self.index += 1  
            batches = self._to_tensor(batches)  
            return batches  
  
    def __iter__(self):  
        """  
        实现迭代器的__iter__方法,使对象本身成为迭代器  
        :return: 返回迭代器对象本身  
        """  
        return self  
  
    def __len__(self):  
        """  
        实现迭代器的__len__方法,返回迭代器的长度(即批次数)  
        :return: 迭代器的长度,如果有剩余数据则加1  
        """  
        if self.residue:  
            return self.n_batches + 1  
        else:  
            return self.n_batches
4、定义模型,前向传播函数(文件名为TestRNN)
import torch.nn as nn  # 导入PyTorch的神经网络模块  
  
class Model(nn.Module):  # 定义一个名为Model的类,它继承自nn.Module  
    def __init__(self, embedding_pertrainde, n_vocab, embed, num_classes):  
        # 类的初始化方法  
        # embedding_pertrainde: 预训练的词嵌入矩阵,如果为None则不使用预训练词嵌入  
        # n_vocab: 词汇表的大小  
        # embed: 词嵌入的维度  
        # num_classes: 输出类别的数量  
        super(Model, self).__init__()  # 调用父类的初始化方法  
  
        # 如果提供了预训练的词嵌入,则使用它们初始化Embedding层,并设置padding_idx和freeze参数  
        if embedding_pertrainde is not None:  
            self.embedding = nn.Embedding.from_pretrained(embedding_pertrainde, padding_idx=n_vocab-1, freeze=False)  
        # 如果没有提供预训练的词嵌入,则使用随机初始化的词嵌入创建Embedding层  
        else:  
            self.embedding = nn.Embedding(n_vocab, embed, padding_idx=n_vocab-1)  
          
        # 定义一个LSTM层,输入维度为embed,隐藏层维度为128,层数为3,双向LSTM,batch_first=True,dropout为0.3  
        self.lstm = nn.LSTM(embed, 128, 3, bidirectional=True, batch_first=True, dropout=0.3)  
          
        # 定义一个全连接层,输入维度为128*2(因为LSTM是双向的,所以隐藏状态维度是单向的两倍),输出维度为num_classes  
        self.fc = nn.Linear(128*2, num_classes)  
  
    def forward(self, x):  
        # 定义模型的前向传播过程  
        # 注意:这里的x应该是一个元组,但通常我们只关心第一个元素(输入数据),第二个元素(输入数据的长度)在这个模型中没有使用  
        x, _ = x  # 忽略输入数据的长度,只保留输入数据  
          
        # 将输入数据通过Embedding层转换为词嵌入表示  
        out = self.embedding(x)  
          
        # 将词嵌入表示通过LSTM层,得到LSTM的输出  
        out, _ = self.lstm(out)  # LSTM的输出是一个元组,但这里我们只关心输出序列的最后一个时间步的隐藏状态  
          
        # 将LSTM的最后一个时间步的隐藏状态通过全连接层,得到最终的输出  
        # 注意:out[:,-1,:]表示取输出序列的最后一个时间步的所有隐藏状态(因为是双向LSTM,所以有两个隐藏状态)  
        out = self.fc(out[:,-1,:])  
          
        # 返回模型的输出  
        return out
5、定义训练、测试函数
import torch  
import torch.nn.functional as F  
import torch.nn as nn  
import numpy as np  
from sklearn import metrics  
import time  
  
# 评估模型性能的函数  
def evaluate(class_list, model, data_iter, test=False):  
    model.eval()  # 设置模型为评估模式  
    loss_total = 0  # 初始化总损失为0  
    predict_all = np.array([], dtype=int)  # 初始化预测结果数组  
    labels_all = np.array([], dtype=int)  # 初始化真实标签数组  
    with torch.no_grad():  # 禁用梯度计算  
        for texts, labels in data_iter:  # 遍历数据迭代器  
            outputs = model(texts)  # 通过模型得到输出  
            loss = F.cross_entropy(outputs, labels)  # 计算交叉熵损失  
            loss_total += loss  # 累加损失  
            labels = labels.data.cpu().numpy()  # 将标签转换为NumPy数组并移到CPU  
            predic = torch.max(outputs.data, 1)[1].cpu().numpy()  # 得到预测结果并移到CPU  
            labels_all = np.append(labels_all, labels)  # 追加真实标签到数组  
            predict_all = np.append(predict_all, predic)  # 追加预测结果到数组  
  
    acc = metrics.accuracy_score(labels_all, predict_all)  # 计算准确率  
    if test:  # 如果是测试模式  
        report = metrics.classification_report(labels_all, predict_all, target_names=class_list, digits=4)  # 生成分类报告  
        return acc, loss_total / len(data_iter), report  # 返回准确率、平均损失和分类报告  
    return acc, loss_total / len(data_iter)  # 返回准确率和平均损失  
  
# 在测试集上测试模型的函数  
def test(model, test_iter, class_list):  
    model.load_state_dict(torch.load('TextRNN.skpt'))  # 加载模型参数(注意文件扩展名可能是.pth或.ckpt,这里可能是个笔误)  
    model.eval()  # 设置模型为评估模式  
    start_time = time.time()  # 记录开始时间  
    test_acc, test_loss, test_report = evaluate(class_list, model, test_iter, test=True)  # 评估模型  
    msg = "Test Loss:{0:>5.2},Test Acc:{1:6.2%}"  # 定义输出格式  
    print(msg.format(test_loss, test_acc))  # 打印测试损失和准确率  
    print(test_report)  # 打印分类报告  
    pass  # 这个pass是多余的,可以删除  
  
# 训练模型的函数  
def train(model, train_iter, dev_iter, test_iter, class_list):  
    model.train()  # 设置模型为训练模式  
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)  # 定义Adam优化器  
  
    total_batch = 0  # 初始化总批次数为0  
    dev_best_loss = float('inf')  # 初始化验证集最佳损失为正无穷  
    last_improve = 0  # 初始化最后改进的批次为0  
    flag = False  # 初始化提前停止标志为False  
    epochs = 2  # 定义训练轮数为2  
    for epoch in range(epochs):  # 遍历训练轮数  
        print('Epoch [{}/{}]'.format(epoch + 1, epochs))  # 打印当前轮数  
        for i, (trains, labels) in enumerate(train_iter):  # 遍历训练数据迭代器  
            outputs = model(trains)  # 通过模型得到输出  
            loss = F.cross_entropy(outputs, labels)  # 计算交叉熵损失  
            model.zero_grad()  # 清零梯度  
            loss.backward()  # 反向传播  
            optimizer.step()  # 更新参数  
            if total_batch % 100 == 0:  # 每100个批次输出一次信息  
                predic = torch.max(outputs.data, 1)[1].cpu()  # 得到预测结果并移到CPU  
                train_acc = metrics.accuracy_score(labels.data.cpu(), predic)  # 计算训练准确率  
                dev_acc, dev_loss = evaluate(class_list, model, dev_iter)  # 在验证集上评估模型  
                if dev_loss < dev_best_loss:  # 如果当前验证损失低于最佳损失  
                    dev_best_loss = dev_loss  # 更新最佳损失  
                    torch.save(model.state_dict(), 'TextRNN.ckpt')  # 保存模型参数  
                    last_improve = total_batch  # 更新最后改进的批次  
  
                msg = 'Iter:{0:>6},Train Loss:{1:>5.2},Train Acc:{2:>6.2%},Val Loss:{3:>5.2},Val Acc:{4:>6.2%}'  # 定义输出格式  
                print(msg.format(total_batch, loss.item(), train_acc, dev_loss, dev_acc))  # 打印信息  
  
                model.train()  # 重新设置模型为训练模式 
            total_batch += 1  # 累加总批次数  
            if total_batch - last_improve > 10000:  # 如果超过10000个批次没有改进  
                print("No optimization for a long time, auto-stopping...")  # 打印提前停止信息  
                flag = True  # 设置提前停止标志为True  
  
        if flag:  # 如果提前停止标志为True  
            break  # 跳出训练循环  
  
    test(model, test_iter, class_list) #进行测试
6、定义主函数(main)
import torch  # 导入PyTorch库,用于构建和训练神经网络  
import numpy as np  # 导入NumPy库,用于进行高效的数组和矩阵运算  
import load_dataset, TextRNN  # 导入自定义的load_dataset模块和TextRNN模块  
from train_eval_test import train  # 从train_eval_test模块导入train函数,用于模型的训练、评估和测试  
  
# 设置设备,优先选择CUDA(GPU),如果不可用则选择MPS(Apple的金属性能着色器),否则使用CPU  
device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"  
  
# 设置随机种子以保证结果的可重复性  
np.random.seed(1)  # 设置NumPy的随机种子  
torch.manual_seed(1)  # 设置PyTorch的全局随机种子  
torch.cuda.manual_seed_all(1)  # 如果使用CUDA,则为所有CUDA设备设置随机种子  
torch.backends.cudnn.deterministic = True  # 设置CuDNN为确定性模式,进一步保证结果的一致性  
  
# 加载数据集  
vocab, train_data, dev_data, test_data = load_dataset.load_dataset('simplifyweibo_4_moods.csv')  # 从CSV文件中加载词汇表和数据集(训练集、开发集、测试集)  
  
# 创建数据集迭代器  
train_iter = load_dataset.DatasetIterater(train_data, 128, device)  # 为训练数据创建迭代器,批量大小为128  
dev_iter = load_dataset.DatasetIterater(dev_data, 128, device)  # 为开发数据创建迭代器,批量大小为128  
test_iter = load_dataset.DatasetIterater(test_data, 128, device)  # 为测试数据创建迭代器,批量大小为128  
  
# 加载预训练的词嵌入  
embedding_pretrained = torch.tensor(np.load('embedding_Tencent.npz')['embeddings'].astype('float32'))  # 从.npz文件中加载预训练的词嵌入  
  
# 根据是否加载了预训练的词嵌入来确定嵌入维度,否则使用默认值200  
embed = embedding_pretrained.size(1) if embedding_pretrained is not None else 200  
  
# 定义类别列表和类别数量  
class_list = ['喜悦', '愤怒', '厌恶', '低落']  # 定义情感分类的类别列表  
num_classes = len(class_list)  # 计算类别数量  
  
# 初始化TextRNN模型  
model = TextRNN.Model(embedding_pretrained, len(vocab), embed, num_classes).to(device)  # 创建TextRNN模型实例,并将其移动到指定的设备上  
  
# 训练模型  
train(model, train_iter, dev_iter, test_iter, class_list)  # 使用训练、开发和测试迭代器训练模型,并输出分类结果

       

代码结果:

结语

尽管情感分析取得了显著进展,但仍面临诸多挑战,如多语言支持、复杂情感识别(如讽刺、幽默)、跨领域适应性等。随着深度学习技术的不断进步,尤其是预训练语言模型(如GPT系列、BERT)的兴起,情感分析的准确性和泛化能力得到了显著提升。

未来,情感分析将更加智能化和个性化,能够更精细地捕捉和解读人类情感。同时,结合其他技术如图像识别、语音识别,实现多模态情感分析,将为人工智能带来更加丰富的情感理解和交互能力。

语言情感分析作为自然语言处理的一个重要分支,正逐步渗透到我们生活的方方面面,改变着人与信息的交互方式。随着技术的不断演进,我们有理由相信,未来的机器不仅能理解文字的字面意义,更能深刻感知并回应人类的情感需求,共同构建一个更加智能、和谐的社会。