隐马尔科夫模型HMM之Baum-Welch算法Python代码实现

时间:2025-01-30 21:55:54

☕️ 本文系列文章汇总:

(1)HMM开篇:基本概念和几个要素

(2)HMM计算问题:前后向算法

         代码实现 

(3)HMM学习问题:Baum-Welch算法

(4)  HMM预测问题:维特比算法
本篇算法原理分析及公式推导请参考: HMM学习问题:Baum-Welch算法

原理解析及公式推导已在系列博客中介绍,本篇重点用python实现一下Baum-Welch算法,走起~

目录

1. 初始化一些参数

2. 定义前向算法获得α_{ij}

3. 定义后向算法获得β_{ij}

4. 根据《统计学习方法》公式10.24计算γ_{t}(i)

5. 根据《统计学习方法》公式10.26计算ξ_{t}(i, j)

6. 根据《统计学习方法》算法【10.4】定义模型训练过程

7. 整体代码

8. 实例


1. 初始化一些参数

    def __init__(self, N, M, V):
         = ((N), size=N)  # 状态转移概率矩阵
         = ((M), size=N)  # 观测概率矩阵
         = (((N), size=1))[0]  # 初始状态概率矩阵
         = V # 所有可能的观测
         = N # 所有可能的状态长度
         = M # 所有可能的观测长度

这里用到的`(args, size)`是随机生成一个维度为args,size行的数组,并保证每一行之和为1

2. 定义前向算法获得α_{ij}

    def forward(self):
        """
        前向算法,Baum welch算法需要用到
        :param O: 已知的观测序列
        :return: alpha_{i}
        """
        row, col = len(), [0]
        alpha_t_plus_1 = ((row, col))
        obj_index = ([0])
        # 初值α 公式10.15
        alpha_t_plus_1[0][:] =  * [:].T[obj_index]
        for t, o in enumerate([1:]):
            t += 1
            # 递推 公式10.16
            obj_index = (o)
            alpha_ji = alpha_t_plus_1[t - 1][:].T @ 
            alpha_t_plus_1[t][:] = alpha_ji * [:].T[obj_index]

         = alpha_t_plus_1

3. 定义后向算法获得β_{ij}

    def backward(self):
        """
        后向算法,Baum welch算法需要用到
        :param O: 已知的观测序列
        :return: beta_{i}
        """
        row, col = len(), [0]
        betaT = ((row + 1, col))
        # 初值β 公式10.19
        betaT[0][:] = [1] * [0]
        for t, o in enumerate([::-1][1:]):
            t += 1
            # 反向递推 公式10.20
            obj_index = ([t - 1])
            beta_t =  * [:].T[obj_index] @ betaT[t - 1][:].T
            betaT[t][:] = beta_t
        # 由于我们这里要的是beta矩阵,不做probs的计算,所以不需要这一行,即不计算公式【10.27】
        # betaT[-1][:] = [[i] * [i][([0])] * betaT[-2][i] for i in range([0])]
        # 注意这里计算后向算法时,betaT是倒着存放的,所以我们需要按照beta1,beta2,...,betaT的顺序取
         = betaT[:-1][::-1]

上述前向和后向算法的具体实现在上一篇博客已经给出,这里不再解释

4. 根据《统计学习方法》公式10.24计算γ_{t}(i)

    def gamma(self, t, i):
        """
        根据课本公式【10.24】计算γ
        :param t: 当前时间点
        :param i: 当前状态节点
        :return: γ值
        """
        numerator = [t][i] * [t][i]
        denominator = 0.

        for j in range():
            denominator += ([t][j] * [t][j])

        return numerator / denominator

5. 根据《统计学习方法》公式10.26计算ξ_{t}(i, j)

    def ksi(self, t, i, j):
        """
        根据公式【10.26】计算 ξ
        :param t: 当前时间点
        :param i: 当前状态节点
        :param j: 同i
        :return:
        """
        obj_index = ([t + 1])
        numerator = [t][i] * [i][j] * [j][obj_index] * [t + 1][j]
        denominator = 0.

        for i in range():
            for j in range():
                denominator += [t][i] * [i][j] * [j][obj_index] * [t + 1][j]

        return numerator / denominator

6. 根据《统计学习方法》算法【10.4】定义模型训练过程

    def train(self, O, n):
        """
        根据算法【10.4】训练模型
        :param O: 已知观测序列
        :param n: 最大迭代步长
        :return: 模型参数λ=(π,A,B)
        """
         = O
         = len(O)
        maxIter = 0

        while maxIter < n:
            tempA = ((, ))
            tempB = ((, ))
            tempPi = ([0.] * )

            # 根据前向算法和后向算法得到α和β
            ()
            ()

            maxIter += 1
            # a_{ij},公式【10.39】
            for i in range():
                for j in range():
                    numerator = 0.
                    denominator = 0.
                    for t in range( - 1):
                        numerator += (t, i, j)
                        denominator += (t, i)
                    tempA[i][j] = numerator / denominator

            # b_{i}{j},公式【10.40】
            for j in range():
                for k in range():
                    numerator = 0.
                    denominator = 0.
                    for t in range():
                        if [t] == [k]:
                            numerator += (t, j)
                        denominator += (t, j)
                    tempB[j][k] = numerator / denominator

            # π_{i},公式【10.41】
            for i in range():
                tempPi[i] = (0, i)
            # 更新
             = tempA
             = tempB
             = tempPi

        return AttrDict(
            pi=,
            A=,
            B=
        )

7. 整体代码

import random
import numpy as np

(1)  # 好像不起租用


class AttrDict(dict):
    # 一个小trick,将结果返回成一个字典格式
    def __init__(self, *args, **kwargs):
        super(AttrDict, self).__init__(*args, **kwargs)
        self.__dict__ = self


class Baum_Welch:

    def __init__(self, N, M, V):
         = ((N), size=N)  # 状态转移概率矩阵
         = ((M), size=N)  # 观测概率矩阵
         = (((N), size=1))[0]  # 初始状态概率矩阵
         = V # 所有可能的观测
         = N # 所有可能的状态长度
         = M # 所有可能的观测长度

    def forward(self):
        """
        前向算法,Baum welch算法需要用到
        :param O: 已知的观测序列
        :return: alpha_{i}
        """
        row, col = len(), [0]
        alpha_t_plus_1 = ((row, col))
        obj_index = ([0])
        # 初值α 公式10.15
        alpha_t_plus_1[0][:] =  * [:].T[obj_index]
        for t, o in enumerate([1:]):
            t += 1
            # 递推 公式10.16
            obj_index = (o)
            alpha_ji = alpha_t_plus_1[t - 1][:].T @ 
            alpha_t_plus_1[t][:] = alpha_ji * [:].T[obj_index]

         = alpha_t_plus_1

    def backward(self):
        """
        后向算法,Baum welch算法需要用到
        :param O: 已知的观测序列
        :return: beta_{i}
        """
        row, col = len(), [0]
        betaT = ((row + 1, col))
        # 初值β 公式10.19
        betaT[0][:] = [1] * [0]
        for t, o in enumerate([::-1][1:]):
            t += 1
            # 反向递推 公式10.20
            obj_index = ([t - 1])
            beta_t =  * [:].T[obj_index] @ betaT[t - 1][:].T
            betaT[t][:] = beta_t
        # betaT[-1][:] = [[i] * [i][([0])] * betaT[-2][i] for i in range([0])]
         = betaT[:-1][::-1]

    def gamma(self, t, i):
        """
        根据课本公式【10.24】计算γ
        :param t: 当前时间点
        :param i: 当前状态节点
        :return: γ值
        """
        numerator = [t][i] * [t][i]
        denominator = 0.

        for j in range():
            denominator += ([t][j] * [t][j])

        return numerator / denominator

    def ksi(self, t, i, j):
        """
        根据公式【10.26】计算 ξ
        :param t: 当前时间点
        :param i: 当前状态节点
        :param j: 同i
        :return:
        """
        obj_index = ([t + 1])
        numerator = [t][i] * [i][j] * [j][obj_index] * [t + 1][j]
        denominator = 0.

        for i in range():
            for j in range():
                denominator += [t][i] * [i][j] * [j][obj_index] * [t + 1][j]

        return numerator / denominator

    def train(self, O, n):
        """
        根据算法【10.4】训练模型
        :param O: 已知观测序列
        :param n: 最大迭代步长
        :return: 模型参数λ=(π,A,B)
        """
         = O
         = len(O)
        maxIter = 0

        while maxIter < n:
            tempA = ((, ))
            tempB = ((, ))
            tempPi = ([0.] * )

            # 根据前向算法和后向算法得到α和β
            ()
            ()

            maxIter += 1
            # a_{ij},公式【10.39】
            for i in range():
                for j in range():
                    numerator = 0.
                    denominator = 0.
                    for t in range( - 1):
                        numerator += (t, i, j)
                        denominator += (t, i)
                    tempA[i][j] = numerator / denominator

            # b_{i}{j},公式【10.40】
            for j in range():
                for k in range():
                    numerator = 0.
                    denominator = 0.
                    for t in range():
                        if [t] == [k]:
                            numerator += (t, j)
                        denominator += (t, j)
                    tempB[j][k] = numerator / denominator

            # π_{i},公式【10.41】
            for i in range():
                tempPi[i] = (0, i)
            # 更新
             = tempA
             = tempB
             = tempPi

        return AttrDict(
            pi=,
            A=,
            B=
        )

8. 实例

if __name__ == '__main__':
    bm = Baum_Welch(N=3, M=2, V=['红', '白'])
    O = ['红', '白', '红']
    res = (O, 3)
    print()
    print()
    print()

我们将n设置为3轮,可以得到如下结果:

π: [0.38663305 0.61098003 0.00238692]
A: [[0.00183726 0.03990598 0.95825676]
 [0.03327835 0.93088611 0.03583554]
 [0.00324882 0.9861656  0.01058559]]
B: [[0.98736893 0.01263107]
 [0.72787272 0.27212728]
 [0.03464711 0.96535289]]

可以看出,每个参数的每一行之和约等于1,这是正确的。要注意,由于这里使用random随机初始化,所以每次初始化的结果都不一样。参数的初始化很影响最后的计算结果,这是个十分玄学的过程。因为原理及公式推导我已经弄明白了,所以我在编写代码的时候,基本不会再去纠结于公式内涵,基本是无脑按照公式来写的,这样会降低错误概率。

代码已经放到GitHub上了,我将会持续更新其它算法的实现。