python实现时间序列分解法(Time-series Decomposition)——以预测气温为例

时间:2024-04-15 18:08:32

程序简介

利用乘法型的时间序列分解算法预测北京气温变化
程序输入:观测数据,周期长度,需要往后预测的个数
程序输出:预测值,模型结构

时间序列分解使用加法模型或乘法模型讲原始系列拆分为四部分:长期趋势变动T、季节变动S(显式周期,固定幅度、长度的周期波动)、循环变动C(隐式周期,周期长不具严格规则的波动)和不规则变动L。本例使用的是乘法模型。

程序/数据集下载

点击进入下载地址

代码分析

导入模块、路径

# -*- coding: utf-8 -*-
from Module.BuildModel import TimeSeriesSplit
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import os

#路径目录
baseDir = \'\'#当前目录
staticDir = os.path.join(baseDir,\'Static\')#静态文件目录
resultDir = os.path.join(baseDir,\'Result\')#结果文件目录

读取北京气象数据,分为测试集和训练集,查看内容,程序主要使用平均温度数据

#读取数据
data = pd.read_excel(staticDir+\'/北京气象数据.xlsx\')
#前24个做训练数据,后面的做测试数据
train = data[0:24][\'平均温度(℃)\'].values
test = data[24:][\'平均温度(℃)\'].values
data.head()
城市 日期 最低温度(℃) 最高温度(℃) 平均温度(℃) 湿度(%) 风速(m/s) 风级(级) 气压(hpa) 能见度(km) 总降水量(mm) 平均总云量(%)
0 北京市 2017年03月 -1.2 19.1 9.0 38 2.5 2 1018 14.2 5.7 64
1 北京市 2017年04月 5.4 33.1 17.4 35 2.5 2 1008 14.5 0.0 66
2 北京市 2017年05月 12.5 35.4 23.2 41 2.6 2 1006 14.6 15.5 65
3 北京市 2017年06月 12.7 38.0 25.5 50 2.3 2 1003 14.3 60.2 73
4 北京市 2017年07月 19.8 36.3 27.9 70 1.8 2 1001 9.2 48.7 77

进行时间分解建模,使用的TimeSeriesSplit类位于项目文件的Modlue/BuildModel.py中,下文将给出代码

本文使用的时间分解乘法模型,其公式为D=T×S×C×I,D为预测值,T为长期趋势,S为季节性因子,C为周期波动(不容易得出,往往由研究者自定或忽略),I为随机波动(无法计算,忽略)

而T长期趋势是由单独的一个模型求出,本程序使用的是线性模型

#建模
EMA = 12#周期长度,即12个月
model = TimeSeriesSplit(train,EMA)
#预测
result = model.predict(test.shape[0])
print(\'季节性因子\',np.round(result[\'seasonFactor\'][\'value\'],2))
print(\'长期趋势系数和截距\',np.round(result[\'Ta\'][\'value\'],2),np.round(result[\'Tb\'][\'value\'],2))
print(\'预测值\',np.round(result[\'predict\'][\'value\'],2))
季节性因子 [ 0.61  1.14  1.6   1.96  2.03  2.01  1.66  0.94  0.34 -0.01 -0.23 -0.05]
长期趋势系数和截距 -0.52 19.75
预测值 [ 4.45  7.76 10.09 11.34 10.69  9.55  7.01  3.48  1.08 -0.02 -0.49]

这部分是上文使用的TimeSeriesSplit类对序列进行时间分解乘法建模,输入为原序列和周期长度(这里为12,因为1年12个月)。该代码块可以直接运行用来进行小实验测试。

# -*- coding: utf-8 -*-
from sklearn.linear_model import LinearRegression
import pandas as pd
import numpy as np

class TimeSeriesSplit():
    def __init__(self,series,EMA):
        \'\'\'
        时间序列分解算法,乘法模型,由于循环波动难以确认,受随机因素影响大,不予考虑
        series:时间序列
        EMA:移动平均项数,也是周期的时长        
        \'\'\'
        self.buildModel(series,EMA)
    
    def predict(self,num):
        \'\'\'
        往后预测num个数,返回的是整个模型的信息
        num:预测个数
        \'\'\'
        result = []
        for i in range(num):
            #季节因子
            S = self.seasFactors[(i+len(self.series))%len(self.seasFactors)]
            #长期趋势
            T = self.regression.predict(i+len(self.series))[0][0]
            result.append(T*S)
        info = {
                \'predict\':{\'value\':result,\'desc\':\'往后预测的%s个数\'%num},
                \'Ta\':{\'value\':self.regression.coef_[0][0],\'desc\':\'长期趋势线性模型的系数\'},
                \'Tb\':{\'value\':self.regression.intercept_[0],\'desc\':\'长期趋势线性模型的截距\'},
                \'seasonFactor\':{\'value\':self.seasFactors,\'desc\':\'季节因子\'},
                }
        return info
        
    
    def buildModel(self,series,EMA):
        \'\'\'
        建模,预测
        series:时间序列
        EMA:移动平均项数,也是周期的时长
        \'\'\'
        series = np.array(series).reshape(-1)
        #移动平均数
        moveSeies = self.calMoveSeries(series,EMA)
        #季节因子
        seasonFactors = self.calSeasonFactors(series,moveSeies,EMA)
        #长期趋势建模
        regression = self.buildLongTrend(series)
        #收尾,设置对象属性
        self.series = series
        self.seasFactors = seasonFactors
        self.regression = regression

    def calMoveSeries(self,series,EMA):
        \'\'\'
        计算移动平均数
        series:时间序列
        EMA:移动平均项数,也是周期的时长
        \'\'\'
        #计算移动平均
        moveSeries = []
        for i in range(0,series.shape[0]-EMA+1):
            moveSeries.append(series[i:i+EMA].mean())
        moveSeries = np.array(moveSeries).reshape(-1)
        #如果项数为复数,则移动平均后数据索引无法对应原数据,要进行第2次项数为2的移动平均
        if EMA % 2 == 0:
            moveSeries2 = []
            for i in range(0,moveSeries.shape[0]-2+1):
                moveSeries2.append(moveSeries[i:i+2].mean())
            moveSeries = np.array(moveSeries2).reshape(-1)
        return moveSeries

    def calSeasonFactors(self,series,moveSeries,EMA):
        \'\'\'
        计算季节性因子
        series:时间序列
        moveSeries:移动平均数
        EMA:移动平均项数,也是周期的时长
        \'\'\'
        #移动平均后的第一项索引对应原数据的索引
        startIndex = int((series.shape[0] - moveSeries.shape[0])/2)
        #观测值除以移动平均值
        factors = []
        for i in range(len(moveSeries)):
            factors.append(series[startIndex+i]/moveSeries[i])
        #去掉尾部多余部分
        rest = len(factors)%EMA
        factors = factors[:len(factors)-rest]
        factors = np.array(factors).reshape(-1,EMA)

        #平均值可能不是1,调整
        seasonFactors = factors.mean(axis=0)/factors.mean()
        #按季顺序进行索引调整
        seasonFactors = seasonFactors[startIndex:].reshape(-1).tolist() + seasonFactors[:startIndex].reshape(-1).tolist()
        seasonFactors = np.array(seasonFactors).reshape(-1)
        return seasonFactors
    
    def buildLongTrend(self,series):
        \'\'\'
        计算长期趋势
        series:时间序列
        \'\'\'
        #建立线性模型
        reg = LinearRegression()
        #季节索引从0开始
        index = np.array(range(series.shape[0])).reshape(-1,1)
        reg.fit(index,series.reshape(-1,1))
        return reg
    
if __name__ == "__main__":
    import matplotlib.pyplot as plt
    #https://wenku.baidu.com/view/89933c24a417866fb94a8e0a.html?from=search
    #https://blog.****.net/weixin_40159138/article/details/90603344
    #销售数据
    data = [
            3017.60,3043.54,2094.35,2809.84,
            3274.80,3163.28,2114.31,3024.57,
            3327.48,3439.48,3493.93,3490.79,
            3685.08,3661.23,2378.43,3459.55,
            3849.63,3701.18,2642.38,3585.52,
            4078.66,3907.06,2818.46,4089.50,
            4339.61,4148.60,2976.45,4084.64,
            4242.42,3997.58,2881.01,4036.23,
            4360.33,4360.53,3172.18,4223.76,
            4690.48,4694.48,3342.35,4577.63,
            4965.46,5026.05,3470.14,4525.94,
            5258.71,5489.58,3596.76,3881.60            
    ]
    #plt.plot(range(len(data)),data)
    model = TimeSeriesSplit(data,4)
    #往后预测4个数,也就是1年4个季度的数
    print(model.predict(4))

将预测值和观测值可视化,因为时间序列分解法适合长期预测,所以只需要一次建模,从图中可看出序列的趋势得到很好的拟合,这一点其他时间序列预测算法不易得到,因为大多数算法只适合短期预测

#用来正常显示中文标签
plt.rcParams[\'font.sans-serif\']=[\'SimHei\'] 
#用来正常显示负号
plt.rcParams[\'axes.unicode_minus\']=False
plt.plot(range(len(result[\'predict\'][\'value\'])),result[\'predict\'][\'value\'],label="预测值")
plt.plot(range(len(result[\'predict\'][\'value\'])),test,label="观测值")
plt.legend()
plt.title(\'时间序列分解法预测效果\')
plt.savefig(resultDir+\'/时间序列分解法预测效果.png\',dpi=100,bbox_inches=\'tight\')