实验目的
了解并掌握基于隐马尔可夫模型(HMM)的分词方法,重点掌握Viterbi算法。
实验要求
1、对给定的语料库(或自行准备)进行统计分析,确定 HMM 模型的三个参数;
2、根据上一步求得的参数,使用 Viterbi 算法对下面的句子完成分词,并显示分词结果。
“今天天气真不错。”
“我们都喜欢晴天”
kaggle datasets download -d helloyouth/chinese-phrase-pool
参考语料库:
https://www.kaggle.com/datasets/helloyouth/chinese-phrase-pool
注:文件 all_train_text.txt, 一行一个句子,每个词使用一个空格隔开。
实验基础
序列标注
序列标注 Sequence Labeling,即给定一个输入序列,使用数学模型或者人工对这个序列的每一个位置标注一个相应的标签,是一个序列到序列的过程。
序列标注是自然语言处理中最常见、最基础的问题, (中文)分词 是序列标注常见的子任务之一。在中文分词中,通常把输入的待分词的句子的每个字标注B, E, S, M四个标签中的一个(B, E, S, M分别表示词语开头,词语结尾,单独成词,词语中)。
马尔可夫链
马尔可夫链是一个较为宽泛的概念,在统计学中,指当一个随机过程在给定现在状态及所有过去状态情况下,其未来状态的条件概率分布仅依赖于当前状态;
拿一个简化的股票涨跌模型举例说明,已知今天及之前几天中某股票的涨跌状态,若明天该股票下跌或上涨的概率可仅由今天的涨跌状态求出,则该股票的涨跌情况是一个马尔可夫链。
隐马尔可夫模型(HMM)
HMM的两个基本假设
马尔可夫假设
假设隐藏的马尔可夫链在任意时刻的状态只依赖于其前一时刻的状态,与其他时刻的状态及观测无关,也与时刻t无关。
观察独立性假设
即假设任意时刻的观测只依赖于该时刻的马尔可夫链的状态与其他观测及状态无关。
模型五元组(2个状态、3个概率矩阵)
- 隐状态集合Q
- 观测状态集合V
- 状态转移概率矩阵A
- 观测状态概率矩阵B
- 初始状态概率分布π
举例说明在中文分词任务中HMM的5个元素
- Q = { B, E, M, S }(N=4)
- 观测状态集合V
给定一个语料库(每个句子表示一种“序列”)
- 观察值集合为语料库中所有的汉字(包括标点符号)组成的集合
- 观察状态集合V为当前语料库中每个汉字各自的状态(B, E, M, S中的一种),更确切的说应该是由语料库中的每个句子转换后的状态序列(一个句子对应一个状态序列,所有序列构成集合V)。
- 如给定以下语料库(观测序列集合,这里已分好词了,即已确定了状态序列):
“今天/晴天/,/我们/去/浅水湾/玩/吧/!”
“今天/天气/真/不错/。”
以上每个句子对应一个“观测序列”,转换为状态序列为:
B E B E S B E S B M E S S S
B E B E S B E S
- 初始状态概率分布π
统计V中的每个序列的第一个状态,计算4种状态(B, E, M, S)的频率(其实M, E的频率不必统计,因为一定为0),最后计算的各自的概率。这里B, E, M, S的出现频次分别为2,0,0,0,所以π = (1.0, 0, 0, 0)。
- 状态转移概率矩阵A
当前状态到下一状态的概率,最后需要求出一个4x4的概率矩阵,矩阵坐标为{B, E, M, S} X {B, E, M, S},过程如下:
① 统计当前状态分别为B, E, M, S,下一状态为B, E, M, S的出现次数
② 计算概率,得到概率矩阵A如下
注:每一行的概率和为1。矩阵第一行的值表示“当前状态为B时,下一状态分别为B, E, M, S的概率”。
- 观测状态概率矩阵(发射矩阵)B
某种状态下,所有观测值(这里指含标点符号在内的中文汉字)出现的概率。也即统计每个观察值(字)的状态(B, E, M, S中的一种)及状态出现的次数。一般地,在编程实现时实际是一个双重字典。
统计出现次数:
计算发射矩阵(概率矩阵):
注:如状态’B'下,在给定观察序列集合中,总共出现7次,根据所有字出现的频次/出现总次数,以此计算得到出现的概率。
使用HMM模型进行预测
HMM的三要素为(A、B、),基于给定的观测序列集合求出该三要素的值。如预测“我们今天去玩”的状态序列,最后可根据状态序列进行分词。
HMM预测即需要找到一条状态序列最优路径(概率最大),如下图所示。
预测“我们今天去玩”的状态序列为“BEBESS”的概率计算简要说明如下图所示。
一般使用Viterbi算法解决HMM预测问题,即用动态规划求概率最大路径(最优路径),具体实现见源码部分(难点)。
四、实验源码与运行结果
(运行于Jupyter Notebook)
# 对词语进行序列标注(即将中文词语的每个字标注成B/M/E/S)
def word2lables(word: str):
text_len = len(word)
# 单字成词,标注成‘S’
if text_len == 1:
return "S"
# 开头是 B, 结尾是 E,中间都是M
return "B" + "M" * (text_len - 2) + "E"
print(word2lables('人们'))
print(word2lables('常'))
print(word2lables('教科书'))
# 将原始的语料库文本文件转换为对应的状态文件(状态序列文件),即将语料库中的每个词转换为对应的序列标签
def text_to_state(file="all_train_text.txt"):
out_file = "all_train_state.txt"
sentences_all = open(file, "r", encoding="utf-8").read().split("\n") # 打开文件并按行切分到 sentences_all 中(一个列表)
print('句子的状态序列获取中...')
with open(out_file, "w", encoding="utf-8") as f: # 打开写入的文件
for sentence in sentences_all: # 逐行 遍历 , sentence 是一个句子, 有可能为空
if len(sentence): # 如果 sentence 不为空
state_ = ""
for w in sentence.split(" "): # 当前句子按照空格切分, w是句子中的一个词语
if len(w): # 如果 w 不为空
state_ = state_ + word2lables(w) # 制作单个词语的label
state_ = state_ + "\n" # 句子的状态序列(字符串)结尾都加一个换行符 "\n"
f.write(state_) # 写入文件, state_ 是一个字符串
print('获取完毕!')
f_name = "files_dir/all_train_text.txt"
text_to_state(f_name)
def get_texts2states(file_text = "all_train_text.txt",file_state = "all_train_state.txt"):
all_texts = []
# 按行获取所有的文本
for text in open(file_text, "r", encoding="utf-8").read().strip().split("\n"):
text = text.replace(' ','')
all_texts.append(text)
# 按行获取所有的状态
all_states = open(file_state, "r", encoding="utf-8").read().strip().split("\n")
return all_texts,all_states
all_texts,all_states = get_texts2states("files_dir/all_train_text.txt",'all_train_state.txt')
if all_texts is not None:
print('状态序列总数量:',len(all_states))
print('第一个句子及其状态序列:')
print(all_texts[0])
print(all_states[0])
# 定义 HMM类, 其实最关键的就是三大矩阵
class HMM:
def __init__(self,all_states: list,all_texts: list):
self.all_states = all_states
self.all_texts = all_texts
self.states_to_index = {"B": 0, "E": 1, "M": 2, "S": 3} # 给每个状态定义一个索引, 以后可以根据状态获取索引
self.index_to_states = ["B", "E", "M", "S"] # 根据索引获取对应状态
self.len_states = len(self.states_to_index) # 状态长度 : 这里是4,即共有4个状态
self.init_matrix = np.zeros((self.len_states)) # 初始矩阵 : 1 * 4 , 对应的是 BMSE
self.transfer_matrix = np.zeros((self.len_states, self.len_states)) # 转移状态矩阵: 4 * 4 ,
# 发射矩阵, 这里使用 2级 字典嵌套
# 注意这里初始化了一个 total 键 , 存储当前状态出现的总次数, 为了后面的归一化使用
self.emit_matrix = {"B":{"total":0}, "M":{"total":0}, "S":{"total":0}, "E":{"total":0}}
# 计算 初始矩阵
def cal_init_matrix(self, state):
self.init_matrix[ self.states_to_index[state[0]] ] += 1 # BMSE 四种状态, 对应状态出现 1次 就 +1
# 计算转移矩阵
def cal_transfer_matrix(self, states):
# 状态转移 从当前状态转移到后一状态, 即 从 sta1 每一元素转移到 sta2 中
sta1 = states[:-1]
sta2 = states[1:]
for s1, s2 in zip(sta1, sta2): # 同时遍历 sta1 , sta2
self.transfer_matrix[self.states_to_index[s1],self.states_to_index[s2]] += 1
# 计算发射矩阵
def cal_emit_matrix(self, sentence, states):
# word, state分别为句子中的单个字及其状态
for word, state in zip(sentence, states):
self.emit_matrix[state][word] = self.emit_matrix[state].get(word,0) + 1
self.emit_matrix[state]["total"] += 1 # 注意这里多添加了一个 total 键 , 存储当前状态出现的总次数, 为了后面的归一化使用
# 将矩阵归一化
def normalize(self):
self.init_matrix = self.init_matrix/np.sum(self.init_matrix)
self.transfer_matrix = self.transfer_matrix/np.sum(self.transfer_matrix,axis = 1,keepdims = True)
self.emit_matrix = {state:{word:t/word_times["total"]*1000 for word,t in word_times.items() if word != "total"} for state,word_times in self.emit_matrix.items()}
# 训练开始, 其实就是HMM的3要素(3个矩阵π、A、B)的求解过程
# 参数保存在文件para_saved_file中
def train(self, para_saved_file="three_matrix.pkl"):
if os.path.exists(para_saved_file): # 如果参数文件存在,则直接加载参数,不进行训练
self.init_matrix, self.transfer_matrix, self.emit_matrix = pickle.load(open(para_saved_file,"rb"))
return
# 参数训练,获得HMM的三个矩阵
print('HMM训练中...')
for sentence, states in zip(self.all_texts, self.all_states): # 按行读取文件, 调用3个矩阵的求解函数
self.cal_init_matrix(states[0]) # 计算三大矩阵
self.cal_transfer_matrix(states)
self.cal_emit_matrix(sentence, states)
self.normalize() # 矩阵求完之后进行归一化
# 保存参数到文件中
pickle.dump([self.init_matrix, self.transfer_matrix, self.emit_matrix], open(para_saved_file, "wb"))
print('训练完成!')
hmm = HMM(all_states[:200], all_texts[:200])
hmm.train()
print('初始状态矩阵:\n',hmm.init_matrix)
def viterbi_t( text, hmm):
states = hmm.index_to_states # 状态(4种)
emit_p = hmm.emit_matrix
trans_p = hmm.transfer_matrix
start_p = hmm.init_matrix
V = [{}] # 存储路径的概率
path = {} # 存储路径
# 路径起点
for y in states:
# 初始状态矩阵某初始状态概率*发射矩阵中状态y下出现第一个字(text[0])的概率
V[0][y] = start_p[hmm.states_to_index[y]] * emit_p[y].get(text[0], 0)
# 初始路径
path[y] = [y]
# 遍历待分词句子text的第二个字到最后一个字
for t in range(1, len(text)):
# 新路径
V.append({})
newpath = {}
# 检验训练的发射概率矩阵中是否有该字
neverSeen = text[t] not in emit_p['S'].keys() and \
text[t] not in emit_p['M'].keys() and \
text[t] not in emit_p['E'].keys() and \
text[t] not in emit_p['B'].keys()
for y in states:
emitP = emit_p[y].get(text[t], 0) if not neverSeen else 1.0 # 未出现过的字单独成词
temp = []
for y0 in states:
if V[t - 1][y0] > 0:
temp.append((V[t - 1][y0] * trans_p[hmm.states_to_index[y0],hmm.states_to_index[y]] * emitP, y0))
(prob, state) = max(temp)
newpath[y] = path[state] + [y]
path = newpath
# state 为最大概率路径(状态序列)
(prob, state) = max([(V[len(text) - 1][y], y) for y in states]) # 求最大概率的路径
result = "" # 分词结果(空格隔开每个词)
for t,s in zip(text,path[state]):
result += t
if s == "S" or s == "E" : # 如果是 S 或者 E 就在后面添加空格
result += " "
return result
text1 = "今天天气真不错"
text2 = "我们都喜欢晴天"
result1 = viterbi_t(text1,hmm)
result2 = viterbi_t(text2,hmm)
print(result1)
print(result2)
五、实验小结
上个实验中,我们使用正向最大匹配进行中文分词,这是一种基于规则的分词算法,而本实验所用分词算法是基于统计学的。我的通俗理解是,给定一个语料库(即已分好词的一些句子、段落、文章),然后统计语料库中的的词语出现的“最大概率”,比如“你真好”可分为“你/真好”或者“你/真/好”,这取决于我们所用语料库中“真好”这个词出现的频率是否比“真”、“好”这两个词的大。
不难发现,这种分词算法的一个不足之处就是,对语料库中没有出现过的词语进行分词得到的结果可能不是正确的;理论上语料库越大越好,但越大的语料库就需要更强的计算机程序处理能力。目前还没有完美的分词算法,我们一般只能根据具体情况具体分析,针对特定的分词场景选择合适的分词算法,以到达我们想要的效果。