在这个信息爆炸的时代,我们每天都在与海量的文本数据打交道。从社交媒体上的帖子、在线评论到新闻报道,文本信息无处不在。然而,这些文本不仅仅是文字的堆砌,它们背后蕴含着丰富的情感和观点。如何有效地理解和分析这些情感,成为了自然语言处理(NLP)领域中的一个重要课题——语言情感分析。本文将带你一窥语言情感分析的奥秘,探讨其原理、应用及未来发展趋势。
一、什么是语言情感分析?
语言情感分析,又称情感倾向性分析或情感挖掘,是指利用自然语言处理技术自动识别并提取文本中表达的情感倾向。这些情感可以是正面的(如“太棒了!”)、负面的(如“真失望!”)或是中性的。情感分析的核心在于理解文本背后的情感色彩,帮助机器像人一样“感知”情绪。
二、项目目标及处理方法:
1目标:
将评论内容转换为词向量。
2、每个词/字转换为词向量长度
(维度)200
3、每一次传入的词/字的个数是否就是评论的长度?
应该是固定长度,每次传入数据与图像相似。 例如选择长度为32。则传入的数据为32*200
4、一条评论如果超过32个词/字怎么处理?
直接删除后面的内容
5、一条评论如果没有32个词/字怎么处理?
缺少的内容,统一使用一个数字(非词/字的数字)替代。
6、如果语料库中的词/字太多是否可以压缩?
可以,某些词/字出现的频率比较低,可能训练不出特征。因此可以选择频率比较高的词来训练。例如选择4760个。
7、被压缩的词/字如何处理?
可以统一使用一个数字(非词/字的数字)替代。
三、项目步骤
1.构建词汇表(vocab),这个词汇表用于文本处理任务
# 导入tqdm库,用于显示循环的进度条
from tqdm import tqdm
# 导入pickle库,用于对象的序列化和反序列化
import pickle as pkl
# 定义常量,词表的最大长度限制
MAX_VOC_SIZE = 4760
# 定义常量,未知字和填充符号的标识
UNK, PAD = '<UNK>', '<PAD>'
def build_vocab(file_path, max_size, min_freq):
'''
功能: 基于文本内容建立词表vocab,vocab中包含语料库中的字
参数:
file_path: 需要读取的语料库的路径
max_size: 获取词频最高的前max_size个词
剔除字频低于min_freq个的词
'''
# 定义一个tokenizer函数,用于将文本分割成单个字符
tokenizer = lambda x: [y for y in x]
# 初始化一个空字典,用于保存词的统计信息
vocab_dic = {}
# 打开文件,准备读取内容
with open(file_path, 'r', encoding='UTF-8') as f:
i = 0 # 初始化行计数器
# 使用tqdm包装文件读取循环,以显示进度条
for line in tqdm(f):
# 如果是文件的第一行(通常是表头),则跳过
if i == 0:
i += 1
continue
# 去除每行开头的两个字符(可能是标签或分隔符),并去除首尾空白
lin = line[2:].strip()
# 如果处理后的行内容为空,则跳过
if not lin:
continue
# 遍历行内容中的每个字符,统计字符出现的次数
for word in tokenizer(lin):
vocab_dic[word] = vocab_dic.get(word, 0) + 1
# 过滤掉词频低于min_freq的字符,按词频降序排序,取前max_size个
vocab_list = sorted([_ for _ in vocab_dic.items() if _[1] > min_freq], key=lambda x: x[1], reverse=True)[:max_size]
# 为过滤后的字符分配索引,从0开始
vocab_dic = {word_count[0]: idx for idx, word_count in enumerate(vocab_list)}
# 更新词汇表字典,加入<UNK>和<PAD>及其对应的索引
vocab_dic.update({UNK: len(vocab_dic), PAD: len(vocab_dic) + 1})
# 打印词汇表字典
print(vocab_dic)
# 使用pickle将词汇表字典序列化并保存到文件中
pkl.dump(vocab_dic, open('simplifyweibo_4_moods.pkl', 'wb'))
# 打印词汇表的大小
print(f"Vocab size: {len(vocab_dic)}")
# 返回构建好的词汇表字典
return vocab_dic
if __name__ == '__main__':
# 调用build_vocab函数,传入文件路径、词表最大长度和最小词频阈值
vocab = build_vocab('simplifyweibo_4_moods.csv', MAX_VOC_SIZE, 1)
# 注意:这里的print('vocab')只是打印了字符串'vocab',并没有打印变量vocab的内容
# 如果要打印变量vocab的内容,应该使用print(vocab)
print('vocab')
结果展示:
2、评论删除、填充,切分数据集
from tqdm import tqdm
import pickle as pkl
import random
import torch
UNK, PAD = '<UNK>', '<PAD>' # 未知字,padding符号
def load_dataset(path, pad_size=70): # path为文件地址,pad_size为单条评论字符的最大长度
contents = [] # 用来存储转换为数值标号的句子
vocab = pkl.load(open('simplifyweibo_4_moods.pkl', 'rb')) # 读取vocab词库文件,rb二进制只读
tokenizer = lambda x: [y for y in x] # 自定义函数用来将字符串分隔成单个字符并存入列表
with open(path, 'r', encoding='utf8') as f:
i = 0
for line in tqdm(f): # 遍历文件内容的每一行,同时展示进度条
if i == 0: # 此处循环目的为了跳过第一行的无用内容
i += 1
continue
if not line: # 筛选是不是空行,空行则跳过
continue
label = int(line[0]) # 返回当前行的标签
content = line[2:].strip('\n') # 取出标签和逗号后的所有内容,同时去除前后的换行符
words_line = []
token = tokenizer(content) # 将每一行的内容进行分字,返回一个列表
seq_len = len(token) # 获取一行实际内容的长度
if pad_size:
if len(token) < pad_size: # 如果一行的字符数少于70,则填充字符<PAD>,填充个数为少于的部分的个数
token.extend([PAD] * (pad_size - len(token)))
else: # 如果一行的字大于70,则只取前70个字
token = token[:pad_size] # 如果一条评论种的宁大于或等于70个字,索引的切分
seq_len = pad_size # 当前评论的长度
# word to id
for word in token: # 遍历实际内容的每一个字符
words_line.append(vocab.get(word, vocab.get(
UNK))) # vocab为词库,其中为字典形式,使用get去获取遍历出来的字符的值,值可表示索引值,如果该字符不在词库中则将其值增加为字典中键UNK对应的值,words_line中存放的是每一行的每一个字符对应的索引值
contents.append((words_line, int(label), seq_len)) # 将每一行评论的字符对应的索引以及这一行评论的类别,还有当前评论的实际内容的长度,以元组的形式存入列表
random.shuffle(contents) # 随机打乱每一行内容的顺序
"""切分80%训练集、10%验证集、10%测试集"""
train_data = contents[: int(len(contents) * 0.8)] # 前80%的评论数据作为训练集
dev_data = contents[int(len(contents) * 0.8):int(len(contents) * 0.9)] # 把80%~90%的评论数据集作为验证数热
test_data = contents[int(len(contents) * 0.9):] # 90%~最后的数据作为测试数据集
return vocab, train_data, dev_data, test_data # 返回词库、训练集、验证集、测试集,数据集为列表中的元组形式
3.定义一个类,用于迭代地处理数据集,将其分割成指定大小的批次(batch),并能够在GPU或其他设备上运行
class DatasetIterater(object):
def __init__(self, batches, batch_size, device):
"""
初始化DatasetIterater对象
:param batches: 数据集,通常是一个包含多个数据项的列表,每个数据项是一个元组,包含输入数据和标签等
:param batch_size: 每个批次的数据量
:param device: 指定数据应该被加载到的设备,例如'cpu'或'cuda'
"""
self.batch_size = batch_size # 设置批次大小
self.batches = batches # 存储整个数据集
self.n_batches = len(batches) // batch_size # 计算完整批次的数量
self.residue = False # 标记是否有剩余数据不足以形成一个完整批次
# 如果数据集长度不是批次大小的整数倍,则设置residue为True
if len(batches) % self.n_batches != 0:
self.residue = True
self.index = 0 # 初始化当前批次的索引
self.device = device # 设置数据加载的设备
def _to_tensor(self, datas):
"""
将数据集转换为张量并发送到指定设备
:param datas: 数据集的一个批次,是一个包含多个元组的列表,每个元组包含输入数据、标签和序列长度
:return: 转换后的输入数据和标签的张量元组
"""
x = torch.LongTensor([_[0] for _ in datas]).to(self.device) # 转换输入数据为长整型张量并发送到设备
y = torch.LongTensor([_[1] for _ in datas]).to(self.device) # 转换标签为长整型张量并发送到设备
seq_len = torch.LongTensor([_[2] for _ in datas]).to(self.device) # 转换序列长度为长整型张量并发送到设备
return (x, seq_len), y # 返回输入数据和标签的张量元组
def __next__(self):
"""
实现迭代器的__next__方法,用于获取下一个批次的数据
:return: 下一个批次的数据,是一个包含输入数据和标签的张量元组
"""
# 如果存在剩余数据且当前索引等于完整批次的数量,则处理剩余数据
if self.residue and self.index == self.n_batches:
batches = self.batches[self.index * self.batch_size: len(self.batches)]
self.index += 1
batches = self._to_tensor(batches) # 转换数据类型为张量
return batches
# 如果当前索引超过了完整批次的数量,则重置索引并抛出StopIteration异常以结束迭代
elif self.index > self.n_batches:
self.index = 0
raise StopIteration
# 否则,提取当前批次的数据,转换数据类型为张量,并返回
else:
batches = self.batches[self.index * self.batch_size:(self.index + 1) * self.batch_size]
self.index += 1
batches = self._to_tensor(batches)
return batches
def __iter__(self):
"""
实现迭代器的__iter__方法,使对象本身成为迭代器
:return: 返回迭代器对象本身
"""
return self
def __len__(self):
"""
实现迭代器的__len__方法,返回迭代器的长度(即批次数)
:return: 迭代器的长度,如果有剩余数据则加1
"""
if self.residue:
return self.n_batches + 1
else:
return self.n_batches
4、定义模型,前向传播函数(文件名为TestRNN)
import torch.nn as nn # 导入PyTorch的神经网络模块
class Model(nn.Module): # 定义一个名为Model的类,它继承自nn.Module
def __init__(self, embedding_pertrainde, n_vocab, embed, num_classes):
# 类的初始化方法
# embedding_pertrainde: 预训练的词嵌入矩阵,如果为None则不使用预训练词嵌入
# n_vocab: 词汇表的大小
# embed: 词嵌入的维度
# num_classes: 输出类别的数量
super(Model, self).__init__() # 调用父类的初始化方法
# 如果提供了预训练的词嵌入,则使用它们初始化Embedding层,并设置padding_idx和freeze参数
if embedding_pertrainde is not None:
self.embedding = nn.Embedding.from_pretrained(embedding_pertrainde, padding_idx=n_vocab-1, freeze=False)
# 如果没有提供预训练的词嵌入,则使用随机初始化的词嵌入创建Embedding层
else:
self.embedding = nn.Embedding(n_vocab, embed, padding_idx=n_vocab-1)
# 定义一个LSTM层,输入维度为embed,隐藏层维度为128,层数为3,双向LSTM,batch_first=True,dropout为0.3
self.lstm = nn.LSTM(embed, 128, 3, bidirectional=True, batch_first=True, dropout=0.3)
# 定义一个全连接层,输入维度为128*2(因为LSTM是双向的,所以隐藏状态维度是单向的两倍),输出维度为num_classes
self.fc = nn.Linear(128*2, num_classes)
def forward(self, x):
# 定义模型的前向传播过程
# 注意:这里的x应该是一个元组,但通常我们只关心第一个元素(输入数据),第二个元素(输入数据的长度)在这个模型中没有使用
x, _ = x # 忽略输入数据的长度,只保留输入数据
# 将输入数据通过Embedding层转换为词嵌入表示
out = self.embedding(x)
# 将词嵌入表示通过LSTM层,得到LSTM的输出
out, _ = self.lstm(out) # LSTM的输出是一个元组,但这里我们只关心输出序列的最后一个时间步的隐藏状态
# 将LSTM的最后一个时间步的隐藏状态通过全连接层,得到最终的输出
# 注意:out[:,-1,:]表示取输出序列的最后一个时间步的所有隐藏状态(因为是双向LSTM,所以有两个隐藏状态)
out = self.fc(out[:,-1,:])
# 返回模型的输出
return out
5、定义训练、测试函数
import torch
import torch.nn.functional as F
import torch.nn as nn
import numpy as np
from sklearn import metrics
import time
# 评估模型性能的函数
def evaluate(class_list, model, data_iter, test=False):
model.eval() # 设置模型为评估模式
loss_total = 0 # 初始化总损失为0
predict_all = np.array([], dtype=int) # 初始化预测结果数组
labels_all = np.array([], dtype=int) # 初始化真实标签数组
with torch.no_grad(): # 禁用梯度计算
for texts, labels in data_iter: # 遍历数据迭代器
outputs = model(texts) # 通过模型得到输出
loss = F.cross_entropy(outputs, labels) # 计算交叉熵损失
loss_total += loss # 累加损失
labels = labels.data.cpu().numpy() # 将标签转换为NumPy数组并移到CPU
predic = torch.max(outputs.data, 1)[1].cpu().numpy() # 得到预测结果并移到CPU
labels_all = np.append(labels_all, labels) # 追加真实标签到数组
predict_all = np.append(predict_all, predic) # 追加预测结果到数组
acc = metrics.accuracy_score(labels_all, predict_all) # 计算准确率
if test: # 如果是测试模式
report = metrics.classification_report(labels_all, predict_all, target_names=class_list, digits=4) # 生成分类报告
return acc, loss_total / len(data_iter), report # 返回准确率、平均损失和分类报告
return acc, loss_total / len(data_iter) # 返回准确率和平均损失
# 在测试集上测试模型的函数
def test(model, test_iter, class_list):
model.load_state_dict(torch.load('TextRNN.skpt')) # 加载模型参数(注意文件扩展名可能是.pth或.ckpt,这里可能是个笔误)
model.eval() # 设置模型为评估模式
start_time = time.time() # 记录开始时间
test_acc, test_loss, test_report = evaluate(class_list, model, test_iter, test=True) # 评估模型
msg = "Test Loss:{0:>5.2},Test Acc:{1:6.2%}" # 定义输出格式
print(msg.format(test_loss, test_acc)) # 打印测试损失和准确率
print(test_report) # 打印分类报告
pass # 这个pass是多余的,可以删除
# 训练模型的函数
def train(model, train_iter, dev_iter, test_iter, class_list):
model.train() # 设置模型为训练模式
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3) # 定义Adam优化器
total_batch = 0 # 初始化总批次数为0
dev_best_loss = float('inf') # 初始化验证集最佳损失为正无穷
last_improve = 0 # 初始化最后改进的批次为0
flag = False # 初始化提前停止标志为False
epochs = 2 # 定义训练轮数为2
for epoch in range(epochs): # 遍历训练轮数
print('Epoch [{}/{}]'.format(epoch + 1, epochs)) # 打印当前轮数
for i, (trains, labels) in enumerate(train_iter): # 遍历训练数据迭代器
outputs = model(trains) # 通过模型得到输出
loss = F.cross_entropy(outputs, labels) # 计算交叉熵损失
model.zero_grad() # 清零梯度
loss.backward() # 反向传播
optimizer.step() # 更新参数
if total_batch % 100 == 0: # 每100个批次输出一次信息
predic = torch.max(outputs.data, 1)[1].cpu() # 得到预测结果并移到CPU
train_acc = metrics.accuracy_score(labels.data.cpu(), predic) # 计算训练准确率
dev_acc, dev_loss = evaluate(class_list, model, dev_iter) # 在验证集上评估模型
if dev_loss < dev_best_loss: # 如果当前验证损失低于最佳损失
dev_best_loss = dev_loss # 更新最佳损失
torch.save(model.state_dict(), 'TextRNN.ckpt') # 保存模型参数
last_improve = total_batch # 更新最后改进的批次
msg = 'Iter:{0:>6},Train Loss:{1:>5.2},Train Acc:{2:>6.2%},Val Loss:{3:>5.2},Val Acc:{4:>6.2%}' # 定义输出格式
print(msg.format(total_batch, loss.item(), train_acc, dev_loss, dev_acc)) # 打印信息
model.train() # 重新设置模型为训练模式
total_batch += 1 # 累加总批次数
if total_batch - last_improve > 10000: # 如果超过10000个批次没有改进
print("No optimization for a long time, auto-stopping...") # 打印提前停止信息
flag = True # 设置提前停止标志为True
if flag: # 如果提前停止标志为True
break # 跳出训练循环
test(model, test_iter, class_list) #进行测试
6、定义主函数(main)
import torch # 导入PyTorch库,用于构建和训练神经网络
import numpy as np # 导入NumPy库,用于进行高效的数组和矩阵运算
import load_dataset, TextRNN # 导入自定义的load_dataset模块和TextRNN模块
from train_eval_test import train # 从train_eval_test模块导入train函数,用于模型的训练、评估和测试
# 设置设备,优先选择CUDA(GPU),如果不可用则选择MPS(Apple的金属性能着色器),否则使用CPU
device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"
# 设置随机种子以保证结果的可重复性
np.random.seed(1) # 设置NumPy的随机种子
torch.manual_seed(1) # 设置PyTorch的全局随机种子
torch.cuda.manual_seed_all(1) # 如果使用CUDA,则为所有CUDA设备设置随机种子
torch.backends.cudnn.deterministic = True # 设置CuDNN为确定性模式,进一步保证结果的一致性
# 加载数据集
vocab, train_data, dev_data, test_data = load_dataset.load_dataset('simplifyweibo_4_moods.csv') # 从CSV文件中加载词汇表和数据集(训练集、开发集、测试集)
# 创建数据集迭代器
train_iter = load_dataset.DatasetIterater(train_data, 128, device) # 为训练数据创建迭代器,批量大小为128
dev_iter = load_dataset.DatasetIterater(dev_data, 128, device) # 为开发数据创建迭代器,批量大小为128
test_iter = load_dataset.DatasetIterater(test_data, 128, device) # 为测试数据创建迭代器,批量大小为128
# 加载预训练的词嵌入
embedding_pretrained = torch.tensor(np.load('embedding_Tencent.npz')['embeddings'].astype('float32')) # 从.npz文件中加载预训练的词嵌入
# 根据是否加载了预训练的词嵌入来确定嵌入维度,否则使用默认值200
embed = embedding_pretrained.size(1) if embedding_pretrained is not None else 200
# 定义类别列表和类别数量
class_list = ['喜悦', '愤怒', '厌恶', '低落'] # 定义情感分类的类别列表
num_classes = len(class_list) # 计算类别数量
# 初始化TextRNN模型
model = TextRNN.Model(embedding_pretrained, len(vocab), embed, num_classes).to(device) # 创建TextRNN模型实例,并将其移动到指定的设备上
# 训练模型
train(model, train_iter, dev_iter, test_iter, class_list) # 使用训练、开发和测试迭代器训练模型,并输出分类结果
代码结果:
结语
尽管情感分析取得了显著进展,但仍面临诸多挑战,如多语言支持、复杂情感识别(如讽刺、幽默)、跨领域适应性等。随着深度学习技术的不断进步,尤其是预训练语言模型(如GPT系列、BERT)的兴起,情感分析的准确性和泛化能力得到了显著提升。
未来,情感分析将更加智能化和个性化,能够更精细地捕捉和解读人类情感。同时,结合其他技术如图像识别、语音识别,实现多模态情感分析,将为人工智能带来更加丰富的情感理解和交互能力。
语言情感分析作为自然语言处理的一个重要分支,正逐步渗透到我们生活的方方面面,改变着人与信息的交互方式。随着技术的不断演进,我们有理由相信,未来的机器不仅能理解文字的字面意义,更能深刻感知并回应人类的情感需求,共同构建一个更加智能、和谐的社会。