LSTM网络(长短期记忆网络)可以理解为是RNN的改进版,它的出现解决了RNN的记忆问题。也就是说RNN网络的某一时刻对距离此刻最近的时刻的状态最为敏感,而对距离此刻稍远时刻的状态不太敏感,这就导致了RNN某一时刻的输出受上一时刻影响最大(虽然不少情况确实如此),但实际情况并不是永远这样,例如一篇首尾呼应的文章,最后一段的内容显然极度依赖第一段的内容,而和文章中间部分的关系不大。
LSTM的出现,就是为了解决上述问题。通过在网络中增加精心设计的“门”来决定记住或者忘记之前时刻的内容,准确的说是决定要记住或者忘记之前时刻内容的程度(“门”的输出值为0-1,0表示“一点也不”,1表示“完全”,之间表示“部分”)。这样LSTM对于前面的问题,在预测最后一段内容时,就可以选择让第一段完全通过,而让中间部分完全不通过。
既然lstm这么厉害,接下来就介绍一下lstm的应用实例(keras实现),接下来也会贴出lstm的tensorflow实现:
lstm实现
模型介绍
实例的主要目的就是学习序列的变化规律,来预测下一时刻的序列值。
代码讲解
- 数据生成
import pandas as pd
from random import random
flow = (list(range(1,10,1)) + list(range(10,1,-1)))*100
pdata = pd.DataFrame({"a":flow, "b":flow})
pdata.b = pdata.b.shift(9)
data = pdata.iloc[10:] * random()
上述的代码是用来生成本次的数据集。最后乘以一个随机值得到data作为总的数据集。
- 数据预处理
import numpy as np
def _load_data(data, n_prev = 100):
""" data should be pd.DataFrame() """
docX, docY = [], []
for i in range(len(data)-n_prev):
docX.append(data.iloc[i:i+n_prev].as_matrix())
docY.append(data.iloc[i+n_prev].as_matrix())
alsX = np.array(docX)
alsY = np.array(docY)
return alsX, alsY
def train_test_split(df, test_size=0.1):
ntrn = round(len(df) * (1 - test_size))
X_train, y_train = _load_data(df.iloc[0:ntrn])
X_test, y_test = _load_data(df.iloc[ntrn:])
return (X_train, y_train), (X_test, y_test)
(X_train, y_train),(X_test, y_test)=train_test_split(data)
X_train:训练数据
y_train:训练标签
X_test:测试数据
y_test:测试标签
通过调用自定义的两个函数就可以得到上述的(X_train, y_train),(X_test, y_test)。
- 模型搭建
from keras.models import Sequential
from keras.layers.core import Dense, Activation
from keras.layers.recurrent import LSTM
in_out_neurons = 2
hidden_neurons = 50
model = Sequential()
model.add(LSTM(in_out_neurons, hidden_neurons, return_sequences=False))
model.add(Dense(hidden_neurons, in_out_neurons))
model.add(Activation("linear"))
model.compile(loss="mean_squared_error", optimizer="rmsprop")
- 训练
model.fit(X_train, y_train, batch_size=700, nb_epoch=100, validation_split=0.05)
- 预测
predicted = model.predict(X_test)
经过训练,通过观察predicted和y_test发现,得到的效果很好!
下面是完整代码:
import pandas as pd
from random import random
flow = (list(range(1,10,1)) + list(range(10,1,-1)))*100
pdata = pd.DataFrame({"a":flow, "b":flow})
pdata.b = pdata.b.shift(9)
data = pdata.iloc[10:] * random() # some noise
import numpy as np
def _load_data(data, n_prev = 100):
""" data should be pd.DataFrame() """
docX, docY = [], []
for i in range(len(data)-n_prev):
docX.append(data.iloc[i:i+n_prev].as_matrix())
docY.append(data.iloc[i+n_prev].as_matrix())
alsX = np.array(docX)
alsY = np.array(docY)
return alsX, alsY
def train_test_split(df, test_size=0.1):
ntrn = round(len(df) * (1 - test_size))
X_train, y_train = _load_data(df.iloc[0:ntrn])
X_test, y_test = _load_data(df.iloc[ntrn:])
return (X_train, y_train), (X_test, y_test)
from keras.models import Sequential
from keras.layers.core import Dense, Activation
from keras.layers.recurrent import LSTM
in_out_neurons = 2
hidden_neurons = 50
model = Sequential()
model.add(LSTM(in_out_neurons, hidden_neurons, return_sequences=False))
model.add(Dense(hidden_neurons, in_out_neurons))
model.add(Activation("linear"))
model.compile(loss="mean_squared_error", optimizer="rmsprop")
(X_train, y_train), (X_test, y_test) = train_test_split(data) # retrieve data
model.fit(X_train, y_train, batch_size=700, nb_epoch=100, validation_split=0.05)
predicted = model.predict(X_test)