Python in Practice (11): Sentiment Analysis

I. Concepts

Sentiment analysis, also known as opinion mining, is an important task in natural language processing (NLP), widely used in areas such as public opinion monitoring, market research, and customer feedback analysis. Its goal is to identify and extract the sentiment information contained in text data.

II. Rule-Based Sentiment Analysis

Rule-based methods rely on predefined lexicons and rules. For example, we can build a dictionary of sentiment words and their polarities, manually marking positive words such as "good" and "happy" as positive and words such as "bad" as negative. We then define a rule that counts the positive and negative words in a text and determines its polarity by comparing the two counts. This strategy is simple and highly interpretable, but it depends on the quality and coverage of the lexicon and struggles with more complex linguistic phenomena.

In practice, we can download a sentiment lexicon from an open-source platform (for example, the HIT sentiment dictionary), design sentiment-analysis rules on top of it, and apply them to the target data. Many open-source third-party libraries also ship with rule-based sentiment analysis, which we will not cover here.
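As a minimal sketch of the counting rule described above (the word lists below are toy examples made up for illustration, not a real lexicon):

# A minimal rule-based sentiment scorer with a toy, hand-made dictionary.
POSITIVE_WORDS = {'good', 'happy', 'great', 'love', 'excellent'}
NEGATIVE_WORDS = {'bad', 'sad', 'terrible', 'hate', 'awful'}

def rule_based_sentiment(text):
    tokens = str(text).lower().split()
    pos = sum(1 for t in tokens if t in POSITIVE_WORDS)
    neg = sum(1 for t in tokens if t in NEGATIVE_WORDS)
    if pos > neg:
        return 'Positive'
    elif neg > pos:
        return 'Negative'
    return 'Neutral'

print(rule_based_sentiment('what a good and happy day'))  # Positive
print(rule_based_sentiment('this movie is bad'))          # Negative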

III. Machine Learning / Deep Learning Based Sentiment Analysis

Machine learning and deep learning methods train a model to learn sentiment patterns from text automatically. First, we need a text dataset labeled with sentiment; next, we convert the text into feature vectors using tools such as bag-of-words or word-embedding models; then we train a machine learning / deep learning model on the labeled data; finally, we use the trained model to predict the sentiment of new text.

This approach can capture complex patterns and contextual information in text and performs well. It also supports more varied analysis needs, such as binary sentiment polarity classification or multi-class sentiment classification (happy, sad, surprised, and so on), depending on the labels we construct. On the other hand, machine learning / deep learning methods require substantial compute and data, and manual annotation in particular is expensive. Below, we build a model to classify the Kaggle "twitter sentiment analysis" dataset.

1. Import the necessary libraries

import torch
import torch.nn as nn
from transformers import BertTokenizer, BertModel
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np

2. Load the dataset

df = pd.read_csv('/kaggle/input/twitter-entity-sentiment-analysis/twitter_training.csv', header=None)
print('Number of samples:', len(df))
df.columns = ['id', 'topic', 'sentiment', 'text']
print(df.head())

Printing the column information shows that there are no null values.

df.info()

Next, look at the label distribution; it is not particularly balanced. There are generally two ways to handle this: one is data augmentation, such as oversampling or undersampling; the other is to assign each class a weight according to its proportion. We will use the latter so that the model learns without bias.

import collections
cnt = collections.Counter(df['sentiment'].tolist())
print(cnt)

Now let's check whether the sentences themselves have any problems. We find that some sentences have length 0 after processing, which means some records contain nothing but whitespace.

length = [len(str(li).strip().split()) for li in df['text']]
print(min(length))
print(max(length))
print(sum(length)/len(length))

Here we clean out the invalid data: only sentences with at least 3 tokens after splitting are kept (based on the data and experience, English sentences with fewer than 3 words carry little meaning). The printed counts show that more than 10,000 rows are dropped.

def word_count(sentence):
    return len(str(sentence).strip().split())
print(len(df))
df = df[df['text'].apply(word_count) >= 3]
print(len(df))

Finally, convert the dataset's labels to numeric values.

label_dic = {'Negative': 0, 'Positive': 1, 'Neutral': 2, 'Irrelevant': 3}
df['sentiment'] = df['sentiment'].apply(lambda x: label_dic[x])

3. Define the dataset class and model class

Here, we add two hidden layers and a multi-class output layer on top of the BERT model.

class taskDataset(Dataset):
    def __init__(self, data, label):
        self.data = data
        self.label = label
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        inputs = self.tokenizer.encode_plus(
            self.data[idx],
            None,
            max_length=30,
            truncation=True,
            padding='max_length',  # replaces the deprecated pad_to_max_length argument
        )
        ids = inputs['input_ids']
        mask = inputs['attention_mask']
        tt_ids = inputs['token_type_ids']


        return torch.tensor(ids, dtype=torch.long), torch.tensor(mask, dtype=torch.long), torch.tensor(tt_ids, dtype=torch.long),\
                torch.tensor(self.label[idx], dtype=torch.long)


class taskModel(nn.Module):
    def __init__(self, output_size):
        super(taskModel, self).__init__()
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        self.fc1 = nn.Linear(768, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, output_size)
        self.drop = nn.Dropout(0.2)
        self.relu = nn.ReLU()

    def forward(self, ids, mask, tt_ids):
        # the pooled [CLS] representation from BERT is used as the sentence embedding
        _, x = self.bert(ids, mask, tt_ids, return_dict=False)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.drop(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.drop(x)
        x = self.fc3(x)

        return x

4. Prepare the data and modules

First, convert the data into the input format torch expects so that we can train in batches.

texts, labels = df['text'].tolist(), df['sentiment'].tolist()
dataset = taskDataset(texts, labels)
data_loader = DataLoader(dataset, batch_size=16, shuffle=True)

Here, to deal with the class imbalance, we assign each class a weight. Note that the class weights are computed from the numeric labels, so the value at each position of class_weights is exactly the weight of the corresponding numeric class, and the mapping cannot get mixed up.

# compute class weights (inverse class frequency)
class_counts = torch.bincount(torch.tensor(labels))
print(class_counts)
class_weights = 1.0 / class_counts.float()
print(class_weights)

Then we instantiate the model, train only BERT's last encoder layer and the downstream fully connected layers, move the model to the GPU, and define the optimizer and loss function.

device = torch.device('cuda') if torch.cuda.is_available() else 'cpu'
model = taskModel(4)
model.to(device)
# freeze everything, then unfreeze the classification head and BERT's last encoder layer
for name, params in model.named_parameters():
    params.requires_grad = False
    if 'fc' in name or 'layer.11' in name:
        params.requires_grad = True
        print(name, params.shape)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss(weight=class_weights)
criterion.to(device)

5. Model training

Here the model is trained for 20 epochs, and at the end we keep only the model weights from the epoch with the lowest loss.

from tqdm import tqdm
import copy

epochs = 20
model.train()
best_loss = float('inf')
best_state = None
for epoch in range(epochs):
    total_loss = 0
    for ids, mask, tt_ids, lab in tqdm(data_loader):
        ids = ids.to(device)
        mask = mask.to(device)
        tt_ids = tt_ids.to(device)

        output = model(ids, mask, tt_ids)
        optimizer.zero_grad()
        loss = criterion(output, lab.to(device))
        total_loss += loss.item()
        loss.backward()
        optimizer.step()
    avg_loss = total_loss / len(data_loader)
    # keep the weights from the epoch with the lowest average training loss
    if avg_loss < best_loss:
        best_loss = avg_loss
        best_state = copy.deepcopy(model.state_dict())
    print(f'Epoch: {epoch}, Loss: {avg_loss}')
model.load_state_dict(best_state)

6. Model performance evaluation

Here we feed the data to the model one record at a time to get predictions, rather than using a DataLoader, since single-record input is closer to how the model would be used in practice.

test_df = pd.read_csv('/kaggle/input/twitter-entity-sentiment-analysis/twitter_validation.csv', header=None)
test_df.columns = ['id', 'topic', 'sentiment', 'text']
test_df['sentiment'] = test_df['sentiment'].apply(lambda x: label_dic[x])

model.eval()
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
predictions = []
with torch.no_grad():
    for text in test_df['text'].values:
        inputs = tokenizer.encode_plus(
            text,
            None,
            max_length=30,
            truncation=True,
            padding='max_length',
        )
        ids = torch.tensor([inputs['input_ids']], dtype=torch.long)
        mask = torch.tensor([inputs['attention_mask']], dtype=torch.long)
        tt_ids = torch.tensor([inputs['token_type_ids']], dtype=torch.long)
        ids = ids.to(device)
        mask = mask.to(device)
        tt_ids = tt_ids.to(device)
        pred = model(ids, mask, tt_ids)
        predictions.append(np.argmax(pred[0].cpu().numpy()))

from sklearn.metrics import precision_score, recall_score, f1_score
test_labels = test_df['sentiment'].tolist()
print('prec:', precision_score(test_labels, predictions, average='macro'))
print('rec:', recall_score(test_labels, predictions, average='macro'))
print('f1:', f1_score(test_labels, predictions, average='macro'))

The initial model already reaches metrics above 70%. That said, looking at how the loss decreases, it has not converged after 20 epochs, so training for more epochs could improve performance. We also did not clean the raw text carefully; we only filtered out whitespace-only records. Expressions such as "im happy", where "im" stands for "I am", were not normalized in bulk either; further cleaning of the raw text should raise the model's performance noticeably.
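As a rough sketch of what such additional cleaning might look like (the contraction map and regular expressions below are illustrative assumptions, not an exhaustive rule set):

import re

# a tiny, made-up contraction map; a real pipeline would use a much larger one
REPLACEMENTS = {r"\bim\b": "i am", r"\bdont\b": "do not", r"\bcant\b": "can not"}

def clean_text(text):
    text = str(text).lower()
    for pattern, repl in REPLACEMENTS.items():
        text = re.sub(pattern, repl, text)
    # collapse repeated whitespace
    return re.sub(r"\s+", " ", text).strip()

# df['text'] = df['text'].apply(clean_text)
print(clean_text('im   happy today'))  # -> 'i am happy today'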

IV. Complete code

import torch
import torch.nn as nn
from transformers import BertTokenizer, BertModel
from torch.utils.data import Dataset, DataLoader
import pandas as pd
import numpy as np
from tqdm import tqdm
from sklearn.metrics import precision_score, recall_score, f1_score
import collections
import copy


df = pd.read_csv('/kaggle/input/twitter-entity-sentiment-analysis/twitter_training.csv', header=None)
print('Number of samples:', len(df))
df.columns = ['id', 'topic', 'sentiment', 'text']
print(df.head())

cnt = collections.Counter(df['sentiment'].tolist())
print(cnt)

def word_count(sentence):
    return len(str(sentence).strip().split())
print(len(df))
df = df[df['text'].apply(word_count) >= 3]
print(len(df))

label_dic = {'Negative': 0, 'Positive': 1, 'Neutral': 2, 'Irrelevant': 3}
df['sentiment'] = df['sentiment'].apply(lambda x: label_dic[x])

class taskDataset(Dataset):
    def __init__(self, data, label):
        self.data = data
        self.label = label
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        inputs = self.tokenizer.encode_plus(
            self.data[idx],
            None,
            max_length=30,
            truncation=True,
            padding='max_length',  # replaces the deprecated pad_to_max_length argument
        )
        ids = inputs['input_ids']
        mask = inputs['attention_mask']
        tt_ids = inputs['token_type_ids']


        return torch.tensor(ids, dtype=torch.long), torch.tensor(mask, dtype=torch.long), torch.tensor(tt_ids, dtype=torch.long),\
                torch.tensor(self.label[idx], dtype=torch.long)


class taskModel(nn.Module):
    def __init__(self, output_size):
        super(taskModel, self).__init__()
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        self.fc1 = nn.Linear(768, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, output_size)
        self.drop = nn.Dropout(0.2)
        self.relu = nn.ReLU()

    def forward(self, ids, mask, tt_ids):
        _, x = self.bert(ids, mask, tt_ids, return_dict=False)
        x = self.fc1(x)
        x = self.relu(x)
        x = self.drop(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.drop(x)
        x = self.fc3(x)

        return x


texts, labels = df['text'].tolist(), df['sentiment'].tolist()
dataset = taskDataset(texts, labels)
data_loader = DataLoader(dataset, batch_size=16, shuffle=True)

# compute class weights (inverse class frequency)
class_counts = torch.bincount(torch.tensor(labels))
print(class_counts)
class_weights = 1.0 / class_counts.float()
print(class_weights)

device = torch.device('cuda') if torch.cuda.is_available() else 'cpu'
model = taskModel(4)
model.to(device)
# freeze everything, then unfreeze the classification head and BERT's last encoder layer
for name, params in model.named_parameters():
    params.requires_grad = False
    if 'fc' in name or 'layer.11' in name:
        params.requires_grad = True
        print(name, params.shape)

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss(weight=class_weights)
criterion.to(device)

epochs = 20
model.train()
best_loss = float('inf')
best_state = None
for epoch in range(epochs):
    total_loss = 0
    for ids, mask, tt_ids, lab in tqdm(data_loader):
        ids = ids.to(device)
        mask = mask.to(device)
        tt_ids = tt_ids.to(device)

        output = model(ids, mask, tt_ids)
        optimizer.zero_grad()
        loss = criterion(output, lab.to(device))
        total_loss += loss.item()
        loss.backward()
        optimizer.step()
    avg_loss = total_loss / len(data_loader)
    # keep the weights from the epoch with the lowest average training loss
    if avg_loss < best_loss:
        best_loss = avg_loss
        best_state = copy.deepcopy(model.state_dict())
    print(f'Epoch: {epoch}, Loss: {avg_loss}')
model.load_state_dict(best_state)

test_df = pd.read_csv('/kaggle/input/twitter-entity-sentiment-analysis/twitter_validation.csv', header=None)
test_df.columns = ['id', 'topic', 'sentiment', 'text']
test_df['sentiment'] = test_df['sentiment'].apply(lambda x: label_dic[x])

model.eval()
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
predictions = []
with torch.no_grad():
    for text in test_df['text'].values:
        inputs = tokenizer.encode_plus(
            text,
            None,
            max_length=30,
            truncation=True,
            padding='max_length',
        )
        ids = torch.tensor([inputs['input_ids']], dtype=torch.long)
        mask = torch.tensor([inputs['attention_mask']], dtype=torch.long)
        tt_ids = torch.tensor([inputs['token_type_ids']], dtype=torch.long)
        ids = ids.to(device)
        mask = mask.to(device)
        tt_ids = tt_ids.to(device)
        pred = model(ids, mask, tt_ids)
        predictions.append(np.argmax(pred[0].cpu().numpy()))

test_labels = test_df['sentiment'].tolist()
print('prec:', precision_score(test_labels, predictions, average='macro'))
print('rec:', recall_score(test_labels, predictions, average='macro'))
print('f1:', f1_score(test_labels, predictions, average='macro'))