引言
本文是七月在线《NLP中的对话机器人》的视频笔记,主要介绍FAQ问答型聊天机器人的实现。
讲得还不错,关键是只要1分钱
FAQ问答机器人
FAQ就是一些常见问题与回答,比如https://letsencrypt.org/docs/faq/。
但是我们要做的不是一问一答形式的,而是类似*那种一问多答,即包括用户提问、网友回答和最佳答案。有人提问,然后会有人在上面回复,每个问题可能有多个回答。
数据集仓库地址 : https://github.com/SophonPlus/ChineseNlpCorpus
数据集
我们先来了解下数据集。
可以看到,有4个字段,其中标题和问题类似发帖时的标题和正文,问题可以为空。
剩下的是reply和is_best分别代表回复和是否为最佳答案。
从这种数据集我们可以想一下它的应用场景。
场景一
假设在提问者手动选择最佳答案之前,我们可以
- 对多个回答进行排序,最相关的、最好的排在前面,不好的排在后面
- 从这些回答中找出最佳回答。二分类任务
数据
我们这里用的数据是农行问答数据,下载地址: https://pan.baidu.com/s/1n-jT9SKkt6cwI_PjCd7i_g
模型
关于这种类似的任务,我们应该得到句子的向量表示,即句向量。可能说到句向量,大家第一时间想到的都是BERT来实现,但这里我们先用简单的模型来实现,简单的模型速度快,可以快速验证我们的思路。
关于问题和回答我们都需要一个句向量编码,我们可以采用Dual Encoder的架构,训练两个编码器,一个用于问题的编码,另一个用于回答的编码。这两个编码器是独立的。我们知道,编码器的选择一般有RNN、CNN和Transformer等。得到了问题和回答的句向量编码后,我们可以使用余弦相似度来计算问题和回答的匹配程度,也可以使用一个复杂一点的神经网络来计算匹配度。
每当拿到一个新的问题和数据,建议从最简单的模型开始,搭建出一个baseline,然后以这个baseline为基础开始调试自己的模型。因为简单的模型往往表现不会太差,容易调试,且模型的可解释性好。当我们确定自己的baseline没有问题之后可以开始尝试更复杂的模型,通过循序渐进地尝试。一般从一些简单的模型开始一步步叠加新的组件,直到效果令人满意位置。如果一开始就采用太复杂的模型,很多情况下我们就无法理解究竟模型中的哪些组件时重要的,哪些是不重要的。
实现
本节包含完整的代码,首先是需要引入的依赖:
from collections import Counter
import pickle
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import pandas as pd
from tqdm import tqdm
DualEncoder
Dual Encoder即两个独立的Encoder,这里分别计算问题和回答的句向量,最后通过余弦相似度计算它们之间的关联程度。
# DualEncoder
class DualEncoder(nn.Module):
def __init__(self, encoder1, encoder2, type="cosine"):
super(DualEncoder, self).__init__()
self.encoder1 = encoder1
self.encoder2 = encoder2
if type != 'cosine':
# 训练一个简单的神经网络来计算相似度
self.linear = nn.Sequential(
# 拼接encoder1和encoder2的输出(向量),转换成100维的表示
nn.Linear(self.encoder1.hidden_size + self.encoder2.hidden_size, 100),
# 经过ReLU激活
nn.ReLU(),
# 再转换成一个数值,表示相似程度
nn.Linear(100, 1)
)
def forward(self, x, x_mask, y, y_mask):
x_rep = self.encoder1(x, x_mask)
y_rep = self.encoder2(y, y_mask)
return x_rep, y_rep
主要实现了前向传播方法。
Encoder
# GRUEncoder
class GRUEncoder(nn.Module):
def __init__(self, vocab_size, embed_size, hidden_size, dropout_p=0.1, avg_hidden=True, n_layers=1, bidirectional=True):
super(GRUEncoder, self).__init__()
self.hidden_size = hidden_size
self.embed = nn.Embedding(vocab_size, embed_size)
if bidirectional:
# 大小除以2,使得拼接两个方向后大小不变
hidden_size //= 2
# 这种生成句子表征的建议使用bidirectional=True
self.rnn = nn.GRU(embed_size, hidden_size, num_layers=n_layers, bidirectional=bidirectional,dropout=dropout_p)
self.dropout = nn.Dropout(dropout_p)
self.bidirectional = bidirectional
self.avg_hidden = avg_hidden
def forward(self, x, mask):
x_embed = self.embed(x) # 先得到嵌入表示
x_embed = self.dropout(x_embed) # 再经过dropout
seq_len = mask.sum(1) # 计算有效长度
# 压缩批次内填充数据
# 通过压缩填充加快训练效率,具体可参考文章: https://blog.csdn.net/yjw123456/article/details/118855324
x_embed = torch.nn.utils.rnn.pack_padded_sequence(
input=x_embed,
lengths=seq_len.cpu(),
batch_first=True,
enforce_sorted=False
)
output, hidden = self.rnn(x_embed)
# output (batch_size, seq_len, hidden_size)
# hidden (num_directions * num_layers, batch_size, hidden_size)
output, seq_len = torch.nn.utils.rnn.pad_packed_sequence(
sequence=output,
batch_first=True,
padding_value=0,
total_length=mask.shape[1]
)
if self.avg_hidden:
# 对RNN输出每个时刻的输出求均值
# mask.unsqueeze(2) 使维度个数和output一致
# hiden (batch_size, hidden_size)
hidden = torch.sum(output * mask.unsqueeze(2), 1) / torch.sum(mask, 1, keepdim=True)
else:
if self.bidirectional:
# 拼接两个方向上的输出
# hidden[-2,:,:] (batch_size, hidden_size / 2)
# hidden[-1,:,:] (batch_size, hidden_size / 2)
# hidden (batch_size, hidden_size)
hidden = torch.cat((hidden[-2,:,:], hidden[-1,:,:]),dim=1)
else:
# 取出最顶层(若num_layers > 1)的hidden
hidden = hidden[-1,:,:]
# 需要保证各种情况下的hidden大小都是一致的
# 经过一层dropout
hidden = self.dropout(hidden)
return hidden
这里采用GRU作为Encoder的实现,支持多种表征的获取,默认是平均每个时刻的输出。
词典
处理NLP任务基本上都需要一个词典:
# 构建分词器
class Tokenizer:
def __init__(self, vocab):
self.id2word = ["UNK"] + vocab # 保证未知词UNK的id为0
self.word2id = {w:i for i,w in enumerate(vocab)}
def text2id(self, text):
# 对中文简单的按字拆分
return [self.word2id.get(w, 0) for w in str(text)]
def id2text(self, ids):
return "".join([self.id2word[id] for id in ids])
def __len__(self):
return len(self.id2word)
def create_tokenizer(texts, vocab_size):
"""
创建分词器,输入文本列表和词典大小
"""
all_vocab = ""
for text in texts:
all_vocab += str(text)
vocab_count = Counter(all_vocab) # 按字拆分
# 最频繁的vocab_size个单词
vocab = vocab_count.most_common(vocab_size)
# (char, count) 从中取出char
vocab = [w[0] for w in vocab]
return Tokenizer(vocab)
def list2tensor(sents, tokenizer):
"""
将文本列表结合分词器转换为tensor
"""
res = []
mask = []
for sent in sents:
res.append(tokenizer.text2id(sent))
max_len = max([len(sent) for sent in res])
# 按最大长度进行填充
for i in range(len(res)):
_mask = np.zeros((1, max_len)) # 中的元素0表示填充词,1表示非填充
_mask[:,:len(res[i])] = 1 # 有效位元素置1
res[i] = np.expand_dims(np.array(res[i] + [0] * (max_len - len(res[i]))), 0) # 增加一个维度
mask.append(_mask)
res = np.concatenate(res, axis=0) # 按维度0进行拼接
mask = np.concatenate(mask, axis=0)
# 分别转换为long类型和float类型的tensor
res = torch.tensor(res).long()
mask = torch.tensor(mask).float()
return res, mask
这里的分词器结合了词典的功能,代码如上,我们可以通过text2id
方法获取文本中每个字的ID。
加载数据集
数据集下载地址: https://pan.baidu.com/s/1n-jT9SKkt6cwI_PjCd7i_g
# 数据集位置
file_path = '../dataset/nonghangzhidao_filter.csv'
df = pd.read_csv(file_path)[["title", "reply", "is_best"]] # 只需要这三个字段
df.head()
看一下开头那么几条数据,对数据长什么样的有一个基本的了解。
拆分数据集
由于数据集本身没有进行拆分,因此我们这里实现拆分数据集的代码:
np.random.seed(42) # 设定随机种子可以防止每次的训练/测试集数据不一样
# 拆分训练/测试集
def shuffle_and_split_data(data, test_ratio):
shuffled_indices = np.random.permutation(len(data))
test_set_size = int(len(data) * test_ratio)
test_indices = shuffled_indices[:test_set_size] # 前test_set_size作为测试集
train_indices = shuffled_indices[test_set_size:] # 剩下的作为训练集
return data.iloc[train_indices], data.iloc[test_indices]
# 20%的数据作为测试集
train_set, test_set = shuffle_and_split_data(df, 0.2)
print(len(train_set), len(test_set))
(31876, 7968)
在拆分数据集的同时进行了洗牌操作,打散数据。
这里设置了随机种子,方式每次运行的训练集、测试集中的数据都不一样。
将文本转换为张量
首先我们取出文本内容:
texts = list(train_set["title"]) + list(train_set["reply"]) # 取出文本内容
查看前10条:
print(texts[:10])
['有没有什么借款的口子?',
'农行信用卡办哪一张好',
'窝的银行卡必须要办理转账才能收到钱吗?',
'请问成都兴百惠公司贷款不成功不收费是真的吗?',
'得到死者老婆的身份证,还得到死者的户口本和火化证能取走死者银行卡里的钱吗',
'借个爱奇艺vip会员,只用1天',
'微信怎么能有微',
'农村60岁拿钱的,在外地农行能交钱吗',
'信而富为什么不下款了',
'大众金融为什么还完贷款,不给办解压绿本']
下面我们创建词典,并基于词典将文本转换为张量:
tokenizer = create_tokenizer(texts, 5000)
print(len(tokenizer))
3383
词典中的单词(字)个数为3383。调用上面实现的list2tensor
函数:
sents = list(train_set["title"][:3])
print(list2tensor(sents, tokenizer))
(tensor([[ 15, 211, 15, 238, 97, 52, 4, 2, 488, 233, 125, 0,
0, 0, 0, 0, 0, 0, 0],
[ 9, 1, 12, 13, 18, 62, 248, 39, 379, 178, 0, 0,
0, 0, 0, 0, 0, 0, 0],
[2231, 2, 7, 1, 18, 373, 442, 71, 62, 34, 64, 21,
493, 63, 81, 41, 206, 100, 125]]),
tensor([[1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 0., 0., 0., 0., 0., 0., 0.,
0.],
[1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 0., 0., 0., 0., 0., 0., 0., 0.,
0.],
[1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
1.]]))
可以看到,同时返回了两个tensor,一个是文本对应的ID列表,另一个是mask,表示对应位置的单词是否为填充单词。填充单词用0表示,非填充单词用1表示。
这样做的好处是,可以在计算损失时乘以这个mask,就可以忽略0处(填充词)的损失;通过mask.sum()
就可以知道句子的有效长度。
编写训练代码
数据和模型都准备好了,下一步是编写训练代码:
# 编写训练代码
def train(df, model, loss_fn, optimizer, device, tokenizer, loss_function, batch_size):
# 设成训练模型,让dropout生效
model.train()
df = df.sample(frac=1) # 每次训练时shuffle数据
# 分批处理
for i in range(0, df.shape[0], batch_size):
# 得到批次数据
batch_df = df.iloc[i:i+batch_size]
title = list(batch_df["title"])
reply = list(batch_df["reply"])
# 构建目标tensor(1或0)
target = torch.tensor(batch_df["is_best"].to_numpy()).float()
if loss_function == "cosine":
# 为了符合CosineEmbeddingLoss的要求,将0替换成-1
target[target == 0] = -1
x, x_mask = list2tensor(title, tokenizer)
y, y_mask = list2tensor(reply, tokenizer)
# 都切换到同一设备(cpu/gpu)
x, x_mask, y, y_mask, target = x.to(device), x_mask.to(device), y.to(device), y_mask.to(device), target.to(device)
# 计算x和y的表征
x_rep, y_rep = model(x, x_mask, y, y_mask)
# 根据需要使用不同的损失
if loss_function == "cosine":
loss = loss_fn(x_rep, y_rep, target)
else:
# 拼接x_pre和y_rep,并传入linear
logits = model.linear(torch.cat([x_rep, y_rep], 1))
loss = loss_fn(logits, target)
optimizer.zero_grad()
loss.backward()
# 梯度裁剪
nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
if loss_function == "cosine":
sim = F.cosine_similarity(x_rep, y_rep)
# 余弦相似度范围在[-1,1]之间,因此这里使用0作为分界
sim[sim < 0] = -1
sim[sim >= 0] = 1
else:
sim = model.linear(torch.cat([x_rep, y_rep], 1))
# sim = torch.sigmoid(logits) 可以不用sigmoid
sim[sim < 0] = 0
sim[sim >= 0] = 1
sim = sim.view(-1)
target = target.view(-1)
# 计算准确率
num_corrects = torch.sum(sim == target).item()
total_counts = target.shape[0]
print(f"accuracy:{num_corrects / total_counts}")
return num_corrects / total_counts
这里根据不同的设置使用了不同的损失函数,我们后面介绍。
定义参数
# 定义参数
loss_function = "cosine"
batch_size = 64
output_dir = "./models"
num_epochs = 10
vocab_size = 5000
hidden_size = 300
embed_size = 600
我们可以根据需要调整这里的参数。
开始训练
# 构建两个Encoder
title_encoder = GRUEncoder(len(tokenizer), embed_size, hidden_size)
reply_encoder = GRUEncoder(len(tokenizer), embed_size, hidden_size)
# 传入DualEncoder模型
model = DualEncoder(title_encoder, reply_encoder, type=loss_function)
# 设置特定的损失函数
if loss_function == "cosine":
loss_fn = nn.CosineEmbeddingLoss()
else:
loss_fn = nn.BCEWithLogitsLoss()
# Adam优化器
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
# 有GPU就用GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# 分词器(词典)可以重复使用,为了不出问题,在推理时要使用和训练时同样的分词器(词典)
pickle.dump(tokenizer, open(os.path.join(output_dir,"tokenizer.pickle"), "wb"))
for epoch in tqdm(range(num_epochs)):
train(train_set, model, loss_fn, optimizer, device, tokenizer, loss_function, batch_size)
可以看到,这里有两种损失函数,分别是CosineEmbeddingLoss
和BCEWithLogitsLoss
。
我们分别来看看。
CosineEmbeddingLoss
官方文档: https://pytorch.org/docs/stable/generated/torch.nn.CosineEmbeddingLoss.html
用于衡量两个向量是相似还是不相似的。输入主要为input1
(
x
1
x_1
x1)、input2
(
x
2
x_2
x2)、target
(
y
y
y)。
这里要求target
也就是
y
y
y取值-1
或1
,取值1
代表正例。
对于正例来说,我们希望它们的余弦距离(1-cosine相似度)尽可能小,余弦相似度取值范围是[-1,1]
,越接近1表示越相似,在向量中余弦相似度-1
表示两个向量方向相反。所以
1
−
cos
(
x
1
,
x
2
)
1-\cos(x_1,x_2)
1−cos(x1,x2)当相似度为
1
1
1时,损失为
0
0
0,否则相似度越小损失越大;
对于负例来说,我们希望它们的cosine相似度(1-余弦距离)尽可能小,这里让损失最小为
0
0
0,所以取了一个max
,由于余弦相似度取值的限制,margin
也只能取[-1,1]
之间。文档建议使用
0.5
0.5
0.5,默认为零。当为
0.5
0.5
0.5时,实际上余弦相似度表示没有很相似,减去这个值,可以认为损失为
0
0
0。这个取值也可以尝试设置一下,看看会带来怎样的影响。
BCEWithLogitsLoss
因为我们要判断是否为最佳答案,这可以看成是一个二分类问题,正例为 1 1 1,负例为 0 0 0,因此也可以使用交叉熵来作为损失函数。
这里不再赘述,感兴趣都可以参考https://blog.csdn.net/yjw123456/article/details/121734499
了解完损失函数之后,我们来看下模型的表现:
10%|█ | 1/10 [00:56<08:28, 56.52s/it]accuracy:0.75
20%|██ | 2/10 [01:54<07:37, 57.19s/it]accuracy:0.75
30%|███ | 3/10 [02:50<06:37, 56.74s/it]accuracy:1.0
40%|████ | 4/10 [03:45<05:36, 56.08s/it]accuracy:1.0
50%|█████ | 5/10 [04:41<04:40, 56.03s/it]accuracy:1.0
60%|██████ | 6/10 [05:37<03:43, 55.93s/it]accuracy:0.75
70%|███████ | 7/10 [06:32<02:47, 55.77s/it]accuracy:0.5
80%|████████ | 8/10 [07:30<01:52, 56.41s/it]accuracy:1.0
90%|█████████ | 9/10 [08:26<00:56, 56.22s/it]accuracy:1.0
100%|██████████| 10/10 [09:22<00:00, 56.21s/it]accuracy:1.0
可以看到,后面的几次准确率为1,但是要注意,这仅仅是在训练集上的结果,准确率为1,也有可能是过拟合了。因此,我们需要在测试集上进行验证。下面添加相关代码。
# 编写在测试集上评估的代码
# 编写训练代码
def evaluate(df, model, loss_fn, device, tokenizer, loss_function, batch_size):
# 设成训练模型,让dropout失效
model.eval()
df = df.sample(frac=1) # 每次shuffle数据
# 分批处理
for i in range(0, df.shape[0], batch_size):
batch_df = df.iloc[i:i+batch_size]
title = list(batch_df["title"])
reply = list(batch_df["reply"])
# 构建目标tensor
target = torch.tensor(batch_df["is_best"].to_numpy()).float()
if loss_function == "cosine":
target[target == 0] = -1 # 符合CosineEmbeddingLoss的要求
x, x_mask = list2tensor(title, tokenizer)
y, y_mask = list2tensor(reply, tokenizer)
# 都切换到同一设备(cpu/gpu)
x, x_mask, y, y_mask, target = x.to(device), x_mask.to(device), y.to(device), y_mask.to(device), target.to(device)
# 不需要计算梯度
with torch.no_grad():
# 计算x和y的表征
x_rep, y_rep = model(x, x_mask, y, y_mask)
if loss_function == "cosine":
loss = loss_fn(x_rep, y_rep, target)
sim = F.cosine_similarity(x_rep, y_rep)
# 余弦相似度范围在[-1,1]之间,因此这里使用0作为分界
sim[sim < 0] = -1
sim[sim >= 0] = 1
else:
logits = model.linear(torch.cat([x_rep, y_rep], 1))
loss = loss_fn(logits, target)
# sim = torch.sigmoid(logits) 可以不用sigmoid
sim = logits
sim[sim < 0] = 0
sim[sim >= 0] = 1
sim = sim.view(-1)
target = target.view(-1)
# 计算准确率
num_corrects = torch.sum(sim == target).item()
total_counts = target.shape[0]
print(f"test accuracy:{num_corrects / total_counts}, loss:{loss.item()}")
return num_corrects / total_counts
和训练时差不多,但有几点要注意,首先model.eval()
让dropout失效,其次torch.no_grad()
让推理时不计算梯度。
最后,改写下训练时的代码,每轮训练完成后在测试集上进行评估,保存测试集上准确率最好的模型。
# 构建两个Encoder
title_encoder = GRUEncoder(len(tokenizer), embed_size, hidden_size)
reply_encoder = GRUEncoder(len(tokenizer), embed_size, hidden_size)
# 传入DualEncoder模型
model = DualEncoder(title_encoder, reply_encoder, type=loss_function)
# 设置特定的损失函数
if loss_function == "cosine":
loss_fn = nn.CosineEmbeddingLoss()
else:
loss_fn = nn.BCEWithLogitsLoss()
# Adam优化器
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
# 有GPU就用GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
# 分词器(词典)可以重复使用,为了不出问题,在推理时要使用和训练时同样的分词器(词典)
best_acc = 0
pickle.dump(tokenizer, open(os.path.join(output_dir,"tokenizer.pickle"), "wb"))
for epoch in tqdm(range(num_epochs)):
train(train_set, model, loss_fn, optimizer, device, tokenizer, loss_function, batch_size)
acc = evaluate(test_set, model, loss_fn, device, tokenizer, loss_function, batch_size)
if acc > best_acc:
best_acc = acc
print("saving best model")
torch.save(model.state_dict(), os.path.join(output_dir, "model.pth"))
0%| | 0/10 [00:00<?, ?it/s]train accuracy:0.75, loss:0.21646611392498016
10%|█ | 1/10 [01:02<09:23, 62.66s/it]test accuracy:0.84375, loss:0.13674256205558777
saving best model
train accuracy:0.75, loss:0.27253156900405884
20%|██ | 2/10 [02:01<08:04, 60.60s/it]test accuracy:0.78125, loss:0.11578086018562317
train accuracy:1.0, loss:0.07311038672924042
30%|███ | 3/10 [03:00<06:57, 59.59s/it]test accuracy:0.84375, loss:0.15137901902198792
train accuracy:0.75, loss:0.22285231947898865
40%|████ | 4/10 [03:59<05:55, 59.31s/it]test accuracy:0.75, loss:0.252815842628479
train accuracy:1.0, loss:0.22264127433300018
50%|█████ | 5/10 [04:58<04:57, 59.43s/it]test accuracy:0.75, loss:0.21923129260540009
train accuracy:1.0, loss:0.05729319155216217
60%|██████ | 6/10 [05:57<03:56, 59.14s/it]test accuracy:0.9375, loss:0.09478427469730377
saving best model
train accuracy:0.75, loss:0.31617870926856995
70%|███████ | 7/10 [06:56<02:57, 59.01s/it]test accuracy:0.9375, loss:0.07146801054477692
train accuracy:1.0, loss:0.046390995383262634
80%|████████ | 8/10 [07:55<01:58, 59.01s/it]test accuracy:0.71875, loss:0.26382946968078613
train accuracy:1.0, loss:0.025176048278808594
90%|█████████ | 9/10 [08:54<00:59, 59.23s/it]test accuracy:0.6875, loss:0.22439222037792206
train accuracy:0.75, loss:0.39026355743408203
100%|██████████| 10/10 [09:53<00:00, 59.37s/it]test accuracy:0.8125, loss:0.20696812868118286
看起来还不错,最好的时候在测试集上的准确率为93.75%
参考
- 七月在线《NLP中的对话机器人》
- PyTorch官网