NLP | 文本预处理

时间:2022-12-23 07:53:53

一.文本预处理

作用:文本语料在输送给模型前一般需要一系列的预处理工作,才能符合模型输入的要求,如:将文本转化成模型需要的张量,规范张量的尺寸等,而且科学的文本预处理环节还将有效指导模型超参数的选择,提升模型的评估指标

二.文本处理的基本方法

1.jieba的使用

精确模式分词:试图将句子最精确地切开,适合文本分析

import jieba
content = "工信处理干事每月经过下属科室都要亲口交代交换机等技术性器件的安装工作"
#返回一个生成器对象
jieba.cut(content,cut_all=False)
#若需直接返回列表内容,使用jieba.lcut即可
jieba.lcut(content,cut_all=False)

全模式分词:把句子中所有的可以成词的词语都扫描出来,速度非常快,但不能消除歧义

jieba.cut(content,cut_all = True)

搜索引擎模式分词:在精确模式的基础上,对长词再次切分,提高召回率,适合搜索引擎分词

#返回一个生成器对象
jieba.cut_for_search(content)
#返回列表内容
jieba.lcut_for_search(content)

中文繁体分词:针对中国香港,*地区的繁体文本进行分词

jieba.lcut(content)

使用用户自定义词典:添加自定义词典后,jieba能够准确识别词典中出现的词汇,提升整体的识别准确率。词典格式为每一行分三部分,词语,词频(可省略),词性(可省略),用空格隔开,顺序不可颠倒。词典具体词性含义可参照jieba词性对照表,将该词典存为userdict.txt,方便之后加载使用

jieba.load_userdict("./userdict.txt")

2.流行中英文分词工具hanlp

使用hanlp进行中文分词

import hanlp
#加载CTB_CONVSEG预训练模型进行分词任务
tokenizer = hanlp.load('CTB6_CONVSEG')
tokenizer("工信处女干事每月经过下属科室都要亲口交代交换机等技术性器件的安装工作")

使用hanlp进行英文分词

#英文分词只需要使用规则即可
tokenizer = hanlp.utils.rules.tokenize_english
tokenizer('Mr hanks bought hankcs.com for 1.5 thousand dollars')

使用hanlp进行中文命名实体识别

'''
命名实体:通常我们将人名,地名,机构名等专有名词的统称
命名实体识别:简称NER,就是识别出一段文本中可能存在的命名实体
命名实体是人类理解文本的基础单元,因此也是AI解决NLP领域高阶任务的重要基础环节
'''

#加载中文命名实体识别的预训练模型MSRA_NER_BERT_BASE_ZH
recognizer = hanlp.load(hanlp.pretrained.ner.MSRA_NER_BERT_BASE_ZH)
#它的输入是对句子进行字符分割的列表,因此在句子前加入了list()
recognizer(list('上海华安工业集团公司董事长'))
#返回结果是一个装有n个元组的列表,每个元组代表一个命名实体,元祖中的每一项分别代表具体的命名实体

使用hanlp进行中文词性标注

'''
词性:语言中对词的一种分类方法,以语法特征为主要依据,兼顾词汇意义对词进行划分的结果
常见的词性有14种,如:名词,动词,形容词等
词性标注:就是标注出一段文本中每个词汇的词性
词性标注的作用:以分词为基础,是对文本语言的另一个角度的理解,常常成为AI解决NLP领域高阶任务的重要
基础环节
'''

#加载中文命名实体识别的预训练模型CTB5_POS_RNN_FASTTEXT_ZH
tagger = hanlp.load(hanlp.pretrained.pos.CTB5_POS_RNN_FASTTEXT_ZH)
tagger(['我','的','希望','是','和平'])
#结果返回对应的词性
['PN','DEG','NN','VC','NN']

三.文本张量表示方法

定义:将一段文本使用张量进行表示,其中一般将词汇表示成向量,称作词向量,再由各个词向量按顺序组成矩阵形成文本表示

作用:将文本表示成张量(矩阵)形式,能够使语言文本可以作为计算机处理程序的输入,进行接下来一系列的解析工作

1.one-hot编码

定义:又称为独热编码,将每个词表示成具有n个元素的向量,这个词向量中只有一个元素是1,其他元素都是0,不同词汇元素为0的位置不同,其中n的大小是整个语料中不同词汇的总数

#导入用于对象保存与加载的joblib
from sklearn.externals import joblib
#导入keras中的词汇映射器Tokenizer
from keras.preprocessing.text import Tokenizer
#假定vocab为语料集所有不同词汇集合
vocab = {"周杰伦","陈奕迅","王力宏","李宗盛"}
#实例化一个词汇映射器对象
t = Tokenizer(num_words=None,char_level=False)
#使用映射器拟合现有文本数据
t.fit_on_texts(vocab)

for token in vocab:
  zero_list = [0]*len(vocab)
  #使用映射器转化现有文本数据,每个词汇对应从1开始的自然数
  #返回样式如:[[2]],取出其中的数字需要使用[0][0]
  token_index = t.texts_to_sequences([token])[0][0] - 1
  zero_list[token_index] = 1
  print(token,"的onehot编码为:",zero_list)

#使用joblib工具保存映射器,以便之后使用
tokenizer_path = "./Tokenizer"
joblib.dump(t,tokenizer_path)

onehot编码器的使用

#导入用于对象保存与加载的joblib
from sklearn.externals import joblib
#加载之前保存的Tokenizer,实例化一个t对象
t = joblib.load(tokenizer_path)

#编码token为"李宗盛"
token = "李宗盛"
#使用t获得token_index
token_index = t.texts_to_sequences([token])[0][0] - 1
#初始化一个zero_list
zero_list = [0]*len(vocab)
#令zero_list对应索引为1
zero_list[token_index] = 1
print(token,"的onehot编码为:",zero_list)

优势:操作简单,容易理解

劣势:完全割裂了词与词之间的联系,而且在大语料集下,每个向量的长度过大,占据大量内存

说明:正因为one-hot编码明显的劣势,这种编码方式被应用的地方越来越少,取而代之的是稠密向量的表示方法word2vec和word embedding

2.word2vec

定义:是一种流行的将词汇表示成向量的无监督训练方法,该过程将构建神经网络模型,将网络参数作为词汇的向量表示,它包含CBOW和skipgram两种训练模式

CBOW:给定一段用于训练的文本语料,再选定某段长度(窗口)作为研究对象,使用上下文词汇预测目标词汇

skipgram:给定一段用于训练的文本语料,再选定某段长度(窗口)作为研究对象,使用目标词汇预测上下文词汇

使用fasttext工具实现word2vec的训练和使用

第一步:获取训练数据

第二步:训练词向量

import fasttext
#使用fasttext的train_unsupervised(无监督训练方法)进行词向量的训练
#它的参数是数据集的持久化文件路径'data/fil9'
model = fasttext.train_unsupervised('data/fil9')

#通过get_word_vector方法来获得指定词汇的词向量
model.get_word_vector("the")

第三步:模型超参数设定

'''
在训练词向量过程中,我们可以设定很多常用超参数来调节我们的模型效果
无监督训练模式:skipgram或cbow,默认为skipgram
在实践中,skipgram模式再利用子词方面比cbow更好
词嵌入维度dim:默认为100,但随着语料库的增大,词嵌入的维度往往也要更大
数据循环次数epoch:默认为5,当你的数据集足够大,可能不需要那么多次
学习率lr:默认为0.05,根据经验,建议选择[0.01,1]范围内
使用的线程数thread:默认为12,一般建议和你的cpu核数相同
'''

model = fasttext.train_unsupervised('data/fil9',dim=300,epoch=1,lr=0.05,thread=12)

第四步:模型效果检验

'''
检查单词向量质量的一种简单方法就是查看其邻近单词,通过我们主观来判断这些邻近单词是否与目标单词相关来粗略评定模型效果好坏'''

model.get_nearest_neighbors('sports')

第五步:模型的保存与重加载

#保存模型
model.save_model("fil9.bin")

#加载模型
model = fasttext.load_model("fil9.bin")
model.get_word_vector("the")

3.word embedding(词嵌入)

通过一定的方式将词汇映射到指定维度(一般是更高维度)的空间

广义:包括所有密集词汇向量的表示方法,如word2vec可以认为是word embedding的一种

狭义:指在神经网络中加入的embedding层,对整个网络进行训练的同时产生的embedding矩阵(embedding层的参数),这个embedding矩阵就是训练过程中所有输入词汇的向量表示组成的矩阵

#通过使用tensorboard可视化嵌入的词向量

import torch
import json
from torch.utils.tensorboard import SummaryWriter

#实例化一个摘要写入对象
writer = SummaryWriter()

#随机初始化一个100*5的矩阵,认为它是我们已经得到的词嵌入矩阵
#代表100个词汇,每个词汇被表示成50维的向量
embedded = torch.randn(100,50)

#导入事先准备好的100个中文词汇文件,形成meta列表原始词汇
meta = list(map(lambda x:x.strip(),fileinput.FileInput("./vocab100.csv")))
writer.add_embedding(embedded,metadata=meta)
writer.close()

四.文本数据分析

作用:能够有效帮助我们理解数据语料,快速检查出语料可能存在的问题,并指导之后模型训练过程中一些超参数的选择

1.标签数量分布

import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt

train_data = pd.read_csv("./data/train.csv",sep="\t")
test_data = pd.read_csv("./data/test.csv",sep="\t")

#获得训练数据标签数量分布
sns.countplot("label",data=train_data)
plt.title("train_data")
plt.show()

#获得验证数据标签数量分布
sns.countplot("label",data=valid_data)
plt.title("valid_data")
plt.show()

2.句子长度分布

#在训练数据中添加新的句子长度列,每个元素的值都是对应的句子列的长度
train_data["sentence_length"] = list(map(lambda x:len(x),train_data["sentence"]))

#绘制句子长度列的数量分布图
sns.countplot("sentence_length",data=train_data)
#主要关注count长度分布的纵坐标,不需要绘制横坐标,横坐标范围通过dist图进行查看
plt.xticks([])
plt.show()

#绘制dist长度分布图
sns.distplot(train_data["sentence_length"])

#主要关注dist长度分布横坐标,不需要绘制纵坐标
plt.yticks([])
plt.show()

#在验证数据中添加新的句子长度列,每个元素的值都是对应的句子列的长度
valid_data["sentence_length"] = list(map(lambda x:len(x),valid_data["sentence"]))

#绘制句子长度列的数量分布图
sns.countplot("sentence_length",data=valid_data)

#主要关注count长度分布的纵坐标,不需要绘制横坐标,横坐标范围通过dist图进行查看
plt.xticks([])
plt.show()

#绘制dist长度分布图
sns.distplot(valid_data["sentence_length"])

#主要关注dist长度分布横坐标,不需要绘制纵坐标
plt.yticks([])
plt.show()

3.不同词汇总数统计

#导入jieba用于分词
#导入chain方法用于扁平化列表
import jieba
import itertools import chain

#进行训练集的句子分词,并统计出不同词汇的总数
train_vocab = set(chain(*map(lambda x:jieba.lcut(x),train_data["sentence"])))
print("训练集共包含不同词汇总数为:",len(train_vocab))

#进行验证集的句子分词,并统计出不同词汇的总数
valid_vocab = set(chain(*map(lambda x:jieba.lcut(x),valid_data["sentence"])))
print("训练集共包含不同词汇总数为:",len(valid_vacab))

4.训练集词云

#使用jieba中的词性标注功能
import jieba.posseg as pseg

def get_a_list(text):
  #用于获取形容词列表
  #使用jieba的词性标注方法切分文本,获得具有词性属性flag和词汇属性word的对象
  #从而判断flag是否为形容词,来返回对应的词汇
  r = []
  for g in pseg.lcut(text):
    if g.flag == "a":
      r.append(g.word)
  return r

#导入绘制词云的工具包
from wordcloud import WordCloud

def get_word_cloud(keywords_list):
  #实例化绘制词云的类,其中参数font_path是字体路径,为了能够显示中文
  #max_words指词云图像最多显示多少个词,background_color为背景颜色
  wordcloud = WordCloud(font_path="./SimHei.ttf",max_words=100,backgroud_color="white")
  #将传入的列表转化成词云生成器需要的字符串形式
  keywords_string = " ".join(keywords_list)
  #生成词云
  wordcloud.generate(keywords_string)

  #绘制图像并显示
  plt.figure()
  plt.imshow(wordcloud,interpolation="bilinear")
  plt.axis("off")
  plt.show()

#获得训练集上正样本
p_train_data = train_data[train_data["label"]==1]["sentence"]

#对正样本的每个句子的形容词
train_p_a_vocab = chain(*map(lambda x:get_a_list(x),p_train_data))
print(train_p_n_vocab)

#获得训练集上负样本
n_train_data = train_data[train_data["label"]==0]["sentence"]

#获得负样本的每个句子的形容词
train_n_a_vocab = chain(*map(lambda x:get_a_list(x),n_train_data))

#调用绘制词云函数
get_word_cloud(train_p_a_vocab)
get_word_cloud(train_n_a_vocab)

5.验证集词云

#获得验证集上正样本
p_valid_data = valid_data[valid_data["label"]==1]["sentence"]

#对正样本的每个句子的形容词
valid_p_a_vacab = chain(*map(lambda x:get_a_list(x),p_valid_data))

#获得验证集上负样本
n_valid_data = valid_data[valid_data["label"]==0]["sentence"]

#获得负样本的每个句子的形容词
valid_n_a_vocab = chain(*map(lambda x:get_a_list(x),n_valid_data))

#调用绘制词云函数
get_word_cloud(valid_p_a_vocab)
get_word_cloud(valid_n_a_vocab)

五.文本特征处理

作用:包括为语料添加具有普适性的文本特征,如n-gram特征,以及对加入特征之后的文本语料进行必要的处理,如长度规范。这些特征处理工作能够有效地将重要的文本特征加入模型训练中,增强模型评估指标

1.添加n-gram特征

给定一段文本序列,其中n个词或字的相邻共现特征即n-gram特征

常用的n-gram特征是bi-gram和tri-gram特征,分别对应的n为2和3

#提取n-gram特征
#一般n取2或3,这里以取2为例

ngram_range=2

def create_ngram_set(input_list):
  '''
  description:从数值列表中提取所有的n-gram特征
  param input_list:输入的数值列表,可以看作是词汇映射后的列表,里面每个数字的取值范围为[1,25000]
  return:n-gram特征组成的集合
  '''
  return set(zip(*[input_list[i:] for i in range(ngram_range)]))

#调用
input_list = [1,3,2,1,5,3]
res = create_ngram_set(input_list)
print(res)

2.文本长度规范

一般模型的输入需要等尺寸大小的矩阵,因此在进入模型前需要对每条文本数值映射后的长度进行规范,此时将根据句子长度分布分析出覆盖绝大多数文本的合理长度,对超长文本进行截断,对不足文本进行补齐(一般使用数字0),这个过程就是文本长度规范

from keras.preprocessing import sequence

#cutlen根据数据分析中句子长度分布,覆盖90%左右语料的最短长度
cutlen = 10

def padding(x_train):
  '''
  description:对输入文本张量进行长度规范
  param_x_train:文本的张量表示
  return:进行截断补齐后的文本张量表示
  '''
  return sequence.pad_sequences(x_train,cutlen)

#调用
x_train = [[1,23,5,32,55,63,2,32,45,67,22],[2,4,55,3]]
res = padding(x_train)
print(res)

六.文本数据增强

常见的文本数据增强方法:回译数据增强法

定义:是文本数据增强方面效果较好的增强方法,一般基于google翻译接口,将文本数据翻译成另外一种语言(一般选择小语种),之后再翻译回原语言,即可认为得到与原语料同标签的新语料,新语料加入到原数据集中即可认为是对原数据集数据增强

优势:操作简便,获得新语料质量高

问题:在端文本回译过程中,新语料与原语料可能存在很高的重复率,并不能有效增大样本的特征空间

高重复率解决办法:进行连续的多语言翻译,根据经验,最多只采用3次连续翻译,更多的翻译次数将产生效率低下,语义失真等问题

#导入google翻译接口工具
from googletrans import Translator
#实例化翻译对象
translator = Translator()
#进行第一次批量翻译,翻译目标是韩语
translations = translator.translate([p_sample1,p_sample2,n_sample1,n_sample2,dest='ko')
#获得翻译后的结果
ko_res = list(map(lambda x:x.text,translations))
#打印结果
print(ko_res)

#最后在翻译回中文,完成回译全部流程
translations = translator.translate(ko_res,dest='zh-cn')
cn_res = list(map(lambda x:x.text,translations))
print(cn_res)

七.案例:新闻主题分类任务

定义: 以一段新闻报道中的文本描述内容为输入,使用模型帮助我们判断它最有可能属于哪一种类型的新闻,这是典型的文本分类问题。这里我们假定每种类型是互斥的,即文本描述有且只有一种类型

import torch
import torchtext
import torchtext.datasets import text_classification
import os

#定义数据下载路径,当前路径的data文件夹
load_data_path = "./data"
#如果不存在该路径,则创建这个路径
if not os.path.isdir(load_data_path):
  os.mkdir(load_data_path)

#选取torchtext中的文本分类数据集'AG_NEWS'即新闻主题分类数据,保存在指定目录下
#并将数值映射后的训练和验证数据加载到内存中
train_dataset,test_dataset = text_classification.DATASETS['AG_NEWS'](root=load_data_path)

第一步:构建带有embedding层的文本分类模型

#导入必备的torch模型构建工具
import torch.nn as nn
import torch.nn.functional as F

#指定BATCH_SIZE的大小
BATCH_SIZE = 16

#进行可用设备检测,有GPU的话优先使用GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class TextSentiment(nn.Module):
  #文本分类模型
  def __init__(self,vocab_size,embed_dim,num_class):
    '''
    description:类的初始化函数
    vocab_size:整个语料包含的不同词汇总数
    embed_dim:指定词嵌入的维度
    num_class:文本分类的类别总数
    '''
    super().__init__()
    #实例化embedding层,sparse=True代表每次对该层求解梯度时,只更新部分权重
    self.embedding = nn.Embedding(vocab_size,embed_dim,sparse=True)
    #实例化线性层
    self.fc = nn.Linear(embed_dim,num_class)
    #为各层初始化权重
    self.init_weights()

  def init_weights(self):
    #初始化权重函数
    #指定初始权重的取值范围数
    initrange = 0.5
    #各层的权重参数都是初始化为均匀分布
    self.embedding.weight.data.uniform_(-initrange,initrange)
    self.fc.weight.data.uniform_(-initrange,initrange)
    #偏置初始化为0
    self.fc.bias.data.zero_()

  def forward(self,text):
    '''
    text:文本数值映射后的结果
    return:与类别数尺寸相同的张量,用以判断文本类别
    '''
    #获得embedding的结果embedded 
    embedded = self.embedding(text)
    #将(m,32)转化成(BATCH_SIZE,32)以便通过fc层后能计算相应的损失
    #首先,我们已知m的值远大于BATCH_SIZE=16,用m整除BATCH_SIZE获得m*包含c个BATCH_SIZE
    c = embedded.size(0)
    #之后再从embedded中取c+BATCH_SIZE个向量得到新的embedded
    #这个新的embedded中的向量个数可以整除BATCH_SIZE
    embedded = embedded[:BATCH_SIZE*C]
    #因为我们想利用平均池化的方法求embedded中指定行数的列的平均数
    #但平均池化方法是作用在行上的,并且需要3维输入
    #因此我们对新的embedded进行转置并拓展维度
    embedded = embedded.transpose(1,0).unsqueeze(0)
    #然后就是调用平均池化的方法,并且核的大小为c
    #即取每c的元素计算一次均值作为结果
    embedded = F.avg_pool1d(embedded,kernel_size=c)
    #最后,还需要减去新增的维度,然后转置回去输送给fc层
    return self.fc(embedded[0].transpose(1,0))

实例化模型

#获得整个语料包含的不同词汇总数
VOCAB_SIZE = len(train_dataset.get_vocab())
#指定词嵌入维度
EMBED_DIM = 32
#获得类别总数
NUN_CLASS = len(train_dataset.get_labels())
#实例化模型
model = TextSentiment(VOCAB_SIZE,EMBED_DIM,NUM_CLASS).to(device)

第二步:对数据进行batch处理

def generate_batch(batch):
  '''
  description:生成batch数据函数
  batch:由样本张量和对应标签的元组组成batch_size大小的列表
  return:样本张量和标签各自的列表形式(张量)
  '''
  #从batch中获得标签张量
  label = torch.tensor([entry[1] for entry in batch])
  #从batch中获得样本张量
  text = [entry[0] for entry in batch]
  text = torch.cat(text)
  #返回结果
  return text,label

第三步:构建训练与验证函数

#导入torch中的数据加载器方法
from torch.utils.data import DataLoader

def train(train_data):
  #模型训练函数
  #初始化训练损失和准确率为0
  train_loss = 0
  train_acc = 0
 
  #使用数据加载器生成BATCH_SIZE大小的数据进行批次训练
  #data就是N多个generate_batch函数处理后的BATCH_SIZE大小的数据生成器
  data = DataLoader(train_data,batch_size=BATCH_SIZE,shuffle=True,collate_
                    fn=generate_batch)

  #对data进行循环遍历,使用每个batch的数据进行参数更新
  for i,(text,cls) in enumerate(data):
    #设置优化器初始梯度为0
    optimizer.zero_grad()
    #模型输入一个批次数据,获得输出
    output = model(text)
    #根据真实标签与模型输出计算损失
    loss = criterion(output,cls)
    #将该批次的损失加到总损失中
    train_loss += loss.item()
    #误差反向传播
    loss.backward()
    #参数进行更新
    optimizer.step()
    #将该批次的准确率加到总准确率中
    train_acc += (output.argmax(1) == cls).sum().item()

  #调整优化器学习率
  scheduler.step()

  #返回本轮训练的平均损失和平均准确率
  return train_loss / len(train_data),train_acc / len(train_data)

def valid(valid_data):
  #模型验证函数
  #初始化验证损失和准确率为0
  loss = 0
  acc = 0

  #和训练相同,使用DataLoader获得训练数据生成器
  data = DataLoader(valid_data,batch_size=BATCH_SIZE,collate_fn=generate_batch)
  #按批次取出数据验证
  for text,cls in data:
    #验证阶段,不再求解梯度
    with torch.no_grad():
      #使用模型获得输出
      output = model(text)
      #计算损失
      loss = criterion(output,cls)
      #将损失和准确率加到总损失和准确率中
      loss += loss.item()
      acc += (output.argmax(1) == cls).sum().item()

  #返回本轮验证的平均损失和平均准确率
  return loss/len(valid_data),acc/len(valid_data)
 

第四步:进行模型训练与验证

#导入时间工具包
import time

#导入数据随机划分方法工具
from torch.utils.data.dataset import random_split

#指定训练轮数
N_EPOCHS = 10

#定义初始的验证损失
min_valid_loss = float('inf')

#选择损失函数,这里选择预定义的交叉熵损失函数
criterion = torch.nn.CrossEntropyLoss().to(device)
#选择随机梯度下降优化器
optimizer = torch.optim.SGD(model.parameters(),lr=4.0)
#选择优化器步长调节方法StepLR,用来衰减学习率
scheduler = torch.optim.lr_scheduler.StepLR(optimizer,1,gamma=0.9)

#从train_dataset取出0.95作为训练集,先取其长度
train_len = int(len(train_dataset)*0.95)

#然后使用random_split进行乱序划分,得到对应的训练集和验证集
sub_train_,sub_valid_=random_split(train_dataset,[train_len,len(train_dataset)-train_len])

#开始每一轮训练
for epoch in range(N_EPOCHS):
  #记录该轮训练的开始时间
  start_time = time.time()
  #调用train和valid函数得到训练和验证的平均损失,平均准确率
  train_loss,train_acc = train(sub_train_)
  valid_loss,valid_acc = valid(sub_valid_)

  #计算训练和验证的总耗时
  secs = int(time.time() - start_time)
  #用分钟和秒表示
  mins = secs/60
  secs = secs%60

  #打印训练和验证耗时,平均损失,平均准确率