Deep Learning with Keras
Installing Keras
- Install NumPy, SciPy, and the other scientific computing libraries
- Install Theano or TensorFlow, e.g. the CPU version of TensorFlow: pip install tensorflow
- pip install keras
Switching the backend between Theano and TensorFlow
Running import keras prints which backend is currently in use.
Edit ~/.keras/keras.json:
Theano backend:
{
    "image_dim_ordering": "tf",
    "epsilon": 1e-07,
    "floatx": "float32",
    "backend": "theano"
}
TensorFlow backend:
{
    "image_dim_ordering": "tf",
    "epsilon": 1e-07,
    "floatx": "float32",
    "backend": "tensorflow"
}
Change the last line, "backend". If editing in place breaks the file, prepare the text in another editor and paste the whole block back into this file.
Alternatively, set a temporary environment variable directly in the terminal:
# Python 2:
KERAS_BACKEND=tensorflow python -c "from keras import backend"
# Python 3:
KERAS_BACKEND=tensorflow python3 -c "from keras import backend"
Or set the environment variable temporarily in Python code (the change only affects this script, and must run before the first keras import):
import os
os.environ['KERAS_BACKEND'] = 'tensorflow'
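To confirm which backend is active, you can query it after the import (a minimal check, assuming keras.backend.backend() is available in your Keras version):
from keras import backend as K
print(K.backend())  # expected to print 'tensorflow'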
Regressor (regression)
Sequential: builds the network one layer at a time
layers.Dense: a fully connected layer
Build the model with Sequential, then add layers with model.add.
import numpy as np
np.random.seed(1337) # for reproducibility
from keras.models import Sequential
from keras.layers import Dense
import matplotlib.pyplot as plt # for visualization
# create some data
X = np.linspace(-1, 1, 200)
np.random.shuffle(X) # randomize the data
Y = 0.5 * X + 2 + np.random.normal(0, 0.05, (200, ))
# plot data
plt.scatter(X, Y)
plt.show()
X_train, Y_train = X[:160], Y[:160] # first 160 data points for training
X_test, Y_test = X[160:], Y[160:] # last 40 data points for testing
# build the model with Sequential, then add layers with model.add
model = Sequential()
# when adding the next layer there is no need to specify its input dimension; by default it takes the previous layer's output as its input
model.add(Dense(output_dim=1, input_dim=1))
# compile the model: the loss is mse (mean squared error), the optimizer is sgd (stochastic gradient descent)
# choose loss function and optimizing method
model.compile(loss='mse', optimizer='sgd')
# train with model.train_on_batch, feeding X_train, Y_train one batch at a time; it returns the cost, printed every 100 steps
# training
print('Training -----------')
for step in range(301):
    cost = model.train_on_batch(X_train, Y_train)
    if step % 100 == 0:
        print('train cost: ', cost)
# evaluate the model with model.evaluate: given the test x and y it returns the cost. The weights and biases are read from the first layer, model.layers[0].
# test
print('\nTesting ------------')
cost = model.evaluate(X_test, Y_test, batch_size=40)
print('test cost:', cost)
W, b = model.layers[0].get_weights()
print('Weights=', W, '\nbiases=', b)
# visualize the results
# plotting the prediction
Y_pred = model.predict(X_test)
plt.scatter(X_test, Y_test)
plt.plot(X_test, Y_pred)
plt.show()
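The explicit train_on_batch loop above can also be written with model.fit, which iterates over batches internally (a sketch; the epoch and batch numbers here are illustrative, not from the original):
model.fit(X_train, Y_train, epochs=100, batch_size=40, verbose=0)  # 100 passes over the 160 training points
print('train cost:', model.evaluate(X_train, Y_train, verbose=0))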
Classifier (classification)
MNIST digit recognition (0-9)
models.Sequential: builds the network one layer at a time.
layers.Dense: a fully connected layer.
layers.Activation: the activation function.
optimizers.RMSprop: the RMSprop optimizer, a method for speeding up neural network training.
import numpy as np
np.random.seed(1337) # for reproducibility
from keras.datasets import mnist
from keras.utils import np_utils
from keras.models import Sequential
from keras.layers import Dense, Activation
from keras.optimizers import RMSprop
# download the mnist to the path '~/.keras/datasets/' if it is the first time to be called
# X shape (60,000 28x28), y shape (10,000, )
(X_train, y_train), (X_test, y_test) = mnist.load_data()
# data pre-processing: normalize x to [0, 1], one-hot encode y
X_train = X_train.reshape(X_train.shape[0], -1) / 255. # normalize
X_test = X_test.reshape(X_test.shape[0], -1) / 255. # normalize
y_train = np_utils.to_categorical(y_train, num_classes=10)
y_test = np_utils.to_categorical(y_test, num_classes=10)
print(X_train[1].shape)
print(y_train[:3])
# Another way to build your neural net
model = Sequential([
    Dense(32, input_dim=784),  # 32 is the output dimension, 784 is the input dimension
    Activation('relu'),        # relu activation
    Dense(10),                 # 10 outputs; the input size defaults to the previous layer's output
    Activation('softmax'),
])
# Another way to define your optimizer
rmsprop = RMSprop(lr=0.001, rho=0.9, epsilon=1e-08, decay=0.0)
# We add metrics to get more results you want to see
# metrics can report e.g. cost, accuracy, or score
model.compile(optimizer=rmsprop,
              loss='categorical_crossentropy',  # cross-entropy loss
              metrics=['accuracy'])
print('Training ------------')
# Another way to train the model: epochs is the number of training passes, batch_size processes 32 samples per batch
model.fit(X_train, y_train, epochs=2, batch_size=32)
print('\nTesting ------------')
# Evaluate the model with the metrics we defined earlier
loss, accuracy = model.evaluate(X_test, y_test)
print('test loss: ', loss)
print('test accuracy: ', accuracy)
# prediction
#classes = model.predict(X_test)
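To turn the softmax probabilities returned by model.predict into digit labels, take the argmax over the class axis (a small sketch using the names from the code above):
probs = model.predict(X_test[:5])            # shape (5, 10): class probabilities
pred_labels = np.argmax(probs, axis=1)       # predicted digits
true_labels = np.argmax(y_test[:5], axis=1)  # ground truth (undo the one-hot encoding)
print('predicted:', pred_labels, 'true:', true_labels)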
CNN
Classification
import os
os.environ['KERAS_BACKEND']='theano'
import numpy as np
np.random.seed(1337) # for reproducibility
from keras.datasets import mnist
from keras.utils import np_utils
from keras.models import Sequential
from keras.layers import Dense, Activation, Convolution2D, MaxPooling2D, Flatten
from keras.optimizers import Adam
# download the mnist to the path '~/.keras/datasets/' if it is the first time to be called
# X shape (60,000 28x28), y shape (10,000, )
(X_train, y_train), (X_test, y_test) = mnist.load_data()
# data pre-processing
X_train = X_train.reshape(-1, 1, 28, 28) / 255.
X_test = X_test.reshape(-1, 1, 28, 28) / 255.
y_train = np_utils.to_categorical(y_train, num_classes=10)
y_test = np_utils.to_categorical(y_test, num_classes=10)
# Another way to build your CNN
model = Sequential()
# Conv layer 1, output shape (32, 28, 28): a convolution layer with 32 filters of size 5x5
model.add(Convolution2D(
    nb_filter=32,
    nb_row=5,
    nb_col=5,
    border_mode='same',  # Padding method
    dim_ordering='th',   # with the tensorflow backend this sets the input dimension ordering to theano ("th") style, but you can change it
    input_shape=(1,      # channels
                 28, 28)  # height & width
))
model.add(Activation('relu'))
# Pooling layer 1 (max pooling), output shape (32, 14, 14): pooling (downsampling)
model.add(MaxPooling2D(
    pool_size=(2, 2),
    strides=(2, 2),
    border_mode='same',  # Padding method
))
# Conv layer 2 output shape (64, 14, 14)
model.add(Convolution2D(64, 5, 5, border_mode='same'))
model.add(Activation('relu'))
# Pooling layer 2 (max pooling) output shape (64, 7, 7)
model.add(MaxPooling2D(pool_size=(2, 2), border_mode='same'))
# Fully connected layer 1, input shape (64 * 7 * 7) = (3136), output shape (1024); Flatten collapses the feature maps into one dimension
model.add(Flatten())
model.add(Dense(1024))
model.add(Activation('relu'))
# Fully connected layer 2 to shape (10) for 10 classes
model.add(Dense(10))
model.add(Activation('softmax'))
# Another way to define your optimizer
adam = Adam(lr=1e-4)
# We add metrics to get more results you want to see
model.compile(optimizer=adam,
              loss='categorical_crossentropy',
              metrics=['accuracy'])
print('Training ------------')
# Another way to train the model
model.fit(X_train, y_train, epochs=1, batch_size=32,)
print('\nTesting ------------')
# Evaluate the model with the metrics we defined earlier
loss, accuracy = model.evaluate(X_test, y_test)
print('\ntest loss: ', loss)
print('\ntest accuracy: ', accuracy)
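The per-layer output shapes claimed in the comments above can be verified with model.summary(), which prints each layer's output shape and parameter count:
model.summary()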
RNN
Classification
import os
os.environ['KERAS_BACKEND']='theano'
import numpy as np
np.random.seed(1337) # for reproducibility
from keras.datasets import mnist
from keras.utils import np_utils
from keras.models import Sequential
from keras.layers import SimpleRNN, Activation, Dense
from keras.optimizers import Adam
TIME_STEPS = 28 # same as the height of the image
INPUT_SIZE = 28 # same as the width of the image
BATCH_SIZE = 50
BATCH_INDEX = 0
OUTPUT_SIZE = 10
CELL_SIZE = 50
LR = 0.001
# download the mnist to the path '~/.keras/datasets/' if it is the first time to be called
# X shape (60,000 28x28), y shape (10,000, )
(X_train, y_train), (X_test, y_test) = mnist.load_data()
# data pre-processing
X_train = X_train.reshape(-1, 28, 28) / 255. # normalize
X_test = X_test.reshape(-1, 28, 28) / 255. # normalize
y_train = np_utils.to_categorical(y_train, num_classes=10)
y_test = np_utils.to_categorical(y_test, num_classes=10)
# build RNN model
model = Sequential()
# RNN cell
model.add(SimpleRNN(
    # for batch_input_shape, if using tensorflow as the backend, we have to put None for the batch_size.
    # Otherwise, model.evaluate() will raise an error.
    batch_input_shape=(None, TIME_STEPS, INPUT_SIZE),  # Or: input_dim=INPUT_SIZE, input_length=TIME_STEPS,
    output_dim=CELL_SIZE,
    unroll=True,
))
# output layer
model.add(Dense(OUTPUT_SIZE))
model.add(Activation('softmax'))
# optimizer
adam = Adam(LR)
model.compile(optimizer=adam,
              loss='categorical_crossentropy',
              metrics=['accuracy'])
# training
for step in range(4001):
    # data shape = (batch_num, steps, inputs/outputs)
    X_batch = X_train[BATCH_INDEX: BATCH_INDEX+BATCH_SIZE, :, :]
    Y_batch = y_train[BATCH_INDEX: BATCH_INDEX+BATCH_SIZE, :]
    cost = model.train_on_batch(X_batch, Y_batch)
    BATCH_INDEX += BATCH_SIZE
    BATCH_INDEX = 0 if BATCH_INDEX >= X_train.shape[0] else BATCH_INDEX
    if step % 500 == 0:
        cost, accuracy = model.evaluate(X_test, y_test, batch_size=y_test.shape[0], verbose=False)
        print('test cost: ', cost, 'test accuracy: ', accuracy)
Regression
# import os
# os.environ['KERAS_BACKEND']='tensorflow'
import numpy as np
np.random.seed(1337) # for reproducibility
import matplotlib.pyplot as plt
from keras.models import Sequential
from keras.layers import LSTM, TimeDistributed, Dense
from keras.optimizers import Adam
BATCH_START = 0
TIME_STEPS = 20
BATCH_SIZE = 50
INPUT_SIZE = 1
OUTPUT_SIZE = 1
CELL_SIZE = 20
LR = 0.006
def get_batch():
    global BATCH_START, TIME_STEPS
    # xs shape (50batch, 20steps)
    xs = np.arange(BATCH_START, BATCH_START+TIME_STEPS*BATCH_SIZE).reshape((BATCH_SIZE, TIME_STEPS)) / (10*np.pi)
    seq = np.sin(xs)
    res = np.cos(xs)
    BATCH_START += TIME_STEPS
    # plt.plot(xs[0, :], res[0, :], 'r', xs[0, :], seq[0, :], 'b--')
    # plt.show()
    return [seq[:, :, np.newaxis], res[:, :, np.newaxis], xs]
model = Sequential()
# build a LSTM RNN
model.add(LSTM(
    batch_input_shape=(BATCH_SIZE, TIME_STEPS, INPUT_SIZE),  # Or: input_dim=INPUT_SIZE, input_length=TIME_STEPS,
    output_dim=CELL_SIZE,
    return_sequences=True,  # True: output at all steps. False: output only at the last step.
    stateful=True,          # True: the final state of batch 1 is fed into the initial state of batch 2
))
# add output layer
model.add(TimeDistributed(Dense(OUTPUT_SIZE)))
adam = Adam(LR)
model.compile(optimizer=adam,
              loss='mse')
print('Training ------------')
for step in range(501):
    # data shape = (batch_num, steps, inputs/outputs)
    X_batch, Y_batch, xs = get_batch()
    cost = model.train_on_batch(X_batch, Y_batch)
    pred = model.predict(X_batch, BATCH_SIZE)
    plt.plot(xs[0, :], Y_batch[0].flatten(), 'r', xs[0, :], pred.flatten()[:TIME_STEPS], 'b--')
    plt.ylim((-1.2, 1.2))
    plt.draw()
    plt.pause(0.1)
    if step % 10 == 0:
        print('train cost: ', cost)
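Because the LSTM was built with stateful=True, the hidden state carries over from batch to batch; when you later feed an unrelated sequence, clear it first (reset_states() is standard Keras API, shown here as a sketch):
model.reset_states()  # drop the carried-over hidden state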
Autoencoder
An autoencoder, simply put, compresses the input data and then decompresses it back.
# import os
# os.environ['KERAS_BACKEND']='tensorflow'
import numpy as np
np.random.seed(1337) # for reproducibility
from keras.datasets import mnist
from keras.models import Model
from keras.layers import Dense, Input
import matplotlib.pyplot as plt
# download the mnist to the path '~/.keras/datasets/' if it is the first time to be called
# X shape (60,000 28x28), y shape (10,000, )
(x_train, _), (x_test, y_test) = mnist.load_data()
# data pre-processing
x_train = x_train.astype('float32') / 255. - 0.5  # min-max normalized to [-0.5, 0.5]
x_test = x_test.astype('float32') / 255. - 0.5    # min-max normalized to [-0.5, 0.5]
x_train = x_train.reshape((x_train.shape[0], -1))
x_test = x_test.reshape((x_test.shape[0], -1))
print(x_train.shape)
print(x_test.shape)
# in order to plot in a 2D figure
encoding_dim = 2
# this is our input placeholder
input_img = Input(shape=(784,))
# encoder layers
encoded = Dense(128, activation='relu')(input_img)
encoded = Dense(64, activation='relu')(encoded)
encoded = Dense(10, activation='relu')(encoded)
encoder_output = Dense(encoding_dim)(encoded)
# decoder layers
decoded = Dense(10, activation='relu')(encoder_output)
decoded = Dense(64, activation='relu')(decoded)
decoded = Dense(128, activation='relu')(decoded)
decoded = Dense(784, activation='tanh')(decoded)
# construct the autoencoder model
autoencoder = Model(input=input_img, output=decoded)
# construct the encoder model for plotting
encoder = Model(input=input_img, output=encoder_output)
# compile autoencoder
autoencoder.compile(optimizer='adam', loss='mse')
# training
autoencoder.fit(x_train, x_train,
                nb_epoch=20,
                batch_size=256,
                shuffle=True)
# plotting
encoded_imgs = encoder.predict(x_test)
plt.scatter(encoded_imgs[:, 0], encoded_imgs[:, 1], c=y_test)
plt.colorbar()
plt.show()
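To check the decoder half as well, run a few test images through the full autoencoder and compare inputs with reconstructions (a sketch; the +0.5 shift undoes the earlier normalization for display):
decoded_imgs = autoencoder.predict(x_test[:5])
for i in range(5):
    plt.subplot(2, 5, i + 1)
    plt.imshow(x_test[i].reshape(28, 28) + 0.5, cmap='gray')        # original
    plt.subplot(2, 5, i + 6)
    plt.imshow(decoded_imgs[i].reshape(28, 28) + 0.5, cmap='gray')  # reconstruction
plt.show()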
Saving / loading models
model.save
Requires the HDF5 module: pip install h5py
model.to_json
import numpy as np
np.random.seed(1337) # for reproducibility
from keras.models import Sequential
from keras.layers import Dense
from keras.models import load_model
# create some data
X = np.linspace(-1, 1, 200)
np.random.shuffle(X) # randomize the data
Y = 0.5 * X + 2 + np.random.normal(0, 0.05, (200, ))
X_train, Y_train = X[:160], Y[:160] # first 160 data points
X_test, Y_test = X[160:], Y[160:] # last 40 data points
model = Sequential()
model.add(Dense(output_dim=1, input_dim=1))
model.compile(loss='mse', optimizer='sgd')
for step in range(301):
    cost = model.train_on_batch(X_train, Y_train)
# save
print('test before save: ', model.predict(X_test[0:2]))
model.save('my_model.h5') # HDF5 file; you have to pip install h5py if you don't have it
del model # deletes the existing model
# load
model = load_model('my_model.h5')
print('test after load: ', model.predict(X_test[0:2]))
# save and load weights: this stores only the weights, not the model architecture
model.save_weights('my_model_weights.h5')
model.load_weights('my_model_weights.h5')
# save and load fresh network without trained weights
from keras.models import model_from_json
json_string = model.to_json()
model = model_from_json(json_string)
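Since to_json stores only the architecture, a saved network is usually rebuilt by combining it with a weights file (a sketch reusing the files created above):
model = model_from_json(json_string)        # rebuild the architecture from JSON
model.load_weights('my_model_weights.h5')   # restore the trained weights
model.compile(loss='mse', optimizer='sgd')  # re-compile before further training or evaluation
print('test after rebuild: ', model.predict(X_test[0:2]))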
Checking versions / upgrading
pip install --upgrade tensorflow
## if upgrading fails on macOS, run this first:
pip install --upgrade --ignore-installed setuptools
import tensorflow as tf
print(tf.__version__)
import keras
print(keras.__version__)
import theano as th
print(th.__version__)
Character encoding: training an LSTM to do addition
from __future__ import print_function
from keras.models import Sequential
from keras import layers
import numpy as np
from six.moves import range
class CharacterTable(object):
    """Given a set of characters:
    + Encode them to a one-hot integer representation
    + Decode the one-hot integer representation to their character output
    + Decode a vector of probabilities to their character output
    """
    def __init__(self, chars):
        """Initialize character table.
        # Arguments
            chars: Characters that can appear in the input.
        """
        self.chars = sorted(set(chars))  ## sort the characters
        self.char_indices = dict((c, i) for i, c in enumerate(self.chars))  ## map character -> index via enumerate
        self.indices_char = dict((i, c) for i, c in enumerate(self.chars))  ## map index -> character
    def encode(self, C, num_rows):
        """One-hot encode given string C.
        # Arguments
            num_rows: Number of rows in the returned one-hot encoding. This is
                used to keep the # of rows for each data the same.
        """
        x = np.zeros((num_rows, len(self.chars)))  ## matrix of num_rows x encoding length
        for i, c in enumerate(C):
            x[i, self.char_indices[c]] = 1  ## set a 1 at each character's encoding position
        return x
    def decode(self, x, calc_argmax=True):
        if calc_argmax:
            x = x.argmax(axis=-1)  ## index of the largest value in each row
        return ''.join(self.indices_char[x] for x in x)  ## join characters by index
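# A quick round-trip check of CharacterTable (hypothetical usage, not part of
# the original example): encode a padded query into a (7, 12) one-hot matrix
# and decode it back.
demo_table = CharacterTable('0123456789+ ')
demo_onehot = demo_table.encode('12+345 ', num_rows=7)
print(demo_onehot.shape)               # (7, 12)
print(demo_table.decode(demo_onehot))  # '12+345 '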
## ANSI escape codes for colored output
class colors:
    ok = '\033[92m'
    fail = '\033[91m'
    close = '\033[0m'
# Parameters for the model and dataset.
TRAINING_SIZE = 50000
DIGITS = 3
INVERT = True
# Maximum length of input is 'int + int' (e.g., '345+678'). Maximum length of
# int is DIGITS.
MAXLEN = DIGITS + 1 + DIGITS ## maximum input length
# All the numbers, plus sign and space for padding.
chars = '0123456789+ ' ## all characters in the vocabulary
ctable = CharacterTable(chars)
questions = []
expected = []
seen = set()
print('Generating data...') ## generate the data
while len(questions) < TRAINING_SIZE:
    ## np.random.randint(1, DIGITS + 1) picks a number between 1 and DIGITS, i.e. the number of digits in each addend
    ## f is a function returning a random integer with that many digits
    f = lambda: int(''.join(np.random.choice(list('0123456789'))
                            for i in range(np.random.randint(1, DIGITS + 1))))
    a, b = f(), f()
    # Skip any addition questions we've already seen
    # Also skip any such that x+Y == Y+x (hence the sorting).
    key = tuple(sorted((a, b)))  ## the sorted pair converted to a tuple
    if key in seen:
        continue
    seen.add(key)
    # Pad the data with spaces such that it is always MAXLEN.
    q = '{}+{}'.format(a, b)
    query = q + ' ' * (MAXLEN - len(q))
    ans = str(a + b)  ## the true result
    # Answers can be of maximum size DIGITS + 1.
    ans += ' ' * (DIGITS + 1 - len(ans))
    if INVERT:
        # Reverse the query, e.g., '12+345 ' becomes ' 543+21'. (Note the
        # space used for padding.)
        query = query[::-1]
    questions.append(query)
    expected.append(ans)
print('Total addition questions:', len(questions))
print('Vectorization...')
x = np.zeros((len(questions), MAXLEN, len(chars)), dtype=np.bool)  ## 50,000 matrices of shape 7x12: 7 is the padded input length (MAXLEN), 12 is the encoding size
y = np.zeros((len(questions), DIGITS + 1, len(chars)), dtype=np.bool)  # DIGITS + 1 is the maximum answer length
for i, sentence in enumerate(questions):
    x[i] = ctable.encode(sentence, MAXLEN)
for i, sentence in enumerate(expected):
    y[i] = ctable.encode(sentence, DIGITS + 1)
# Shuffle (x, y) in unison as the later parts of x will almost all be larger
# digits.
indices = np.arange(len(y))  ## the numbers 0-49999
np.random.shuffle(indices)   ## shuffle the order
x = x[indices]               ## reorder x and y in unison
y = y[indices]
# Explicitly set apart 10% for validation data that we never train over.
split_at = len(x) - len(x) // 10
(x_train, x_val) = x[:split_at], x[split_at:]
(y_train, y_val) = y[:split_at], y[split_at:]
print('Training Data:')
print(x_train.shape)
print(y_train.shape)
print('Validation Data:')
print(x_val.shape)
print(y_val.shape)
# Try replacing LSTM with GRU or SimpleRNN.
RNN = layers.LSTM  ## which RNN cell to use
HIDDEN_SIZE = 128  ## size of the hidden layer
BATCH_SIZE = 128   ## minibatch size
LAYERS = 1         ## number of decoder layers
print('Build model...')
model = Sequential()
# "Encode" the input sequence using an RNN, producing an output of HIDDEN_SIZE.
# Note: In a situation where your input sequences have a variable length,
# use input_shape=(None, num_feature).
model.add(RNN(HIDDEN_SIZE, input_shape=(MAXLEN, len(chars))))  # the input is 7x12
# As the decoder RNN's input, repeatedly provide with the last hidden state of
# RNN for each time step. Repeat 'DIGITS + 1' times as that's the maximum
# length of output, e.g., when DIGITS=3, max output is 999+999=1998.
model.add(layers.RepeatVector(DIGITS + 1))  ## repeat the encoder output DIGITS + 1 = 4 times, giving a 4x128 input to the decoder
# The decoder RNN could be multiple layers stacked or a single layer.
for _ in range(LAYERS):
    # By setting return_sequences to True, return not only the last output but
    # all the outputs so far in the form of (num_samples, timesteps,
    # output_dim). This is necessary as TimeDistributed below expects
    # the first dimension to be the timesteps.
    model.add(RNN(HIDDEN_SIZE, return_sequences=True))
# Apply a dense layer to every temporal slice of the input. For each step
# of the output sequence, decide which character should be chosen.
model.add(layers.TimeDistributed(layers.Dense(len(chars))))
model.add(layers.Activation('softmax'))  ## activation layer
model.compile(loss='categorical_crossentropy',
              optimizer='adam',
              metrics=['accuracy'])
model.summary()
# Train the model each generation and show predictions against the validation
# dataset (200 iterations).
for iteration in range(1, 200):
    print()
    print('-' * 50)
    print('Iteration', iteration)
    model.fit(x_train, y_train,
              batch_size=BATCH_SIZE,
              epochs=1,
              validation_data=(x_val, y_val))
    # Select 10 samples from the validation set at random so we can visualize
    # errors.
    for i in range(10):
        ind = np.random.randint(0, len(x_val))
        rowx, rowy = x_val[np.array([ind])], y_val[np.array([ind])]
        preds = model.predict_classes(rowx, verbose=0)
        q = ctable.decode(rowx[0])
        correct = ctable.decode(rowy[0])
        guess = ctable.decode(preds[0], calc_argmax=False)
        print('Q', q[::-1] if INVERT else q)
        print('T', correct)
        if correct == guess:
            print(colors.ok + '☑' + colors.close, end=" ")
        else:
            print(colors.fail + '☒' + colors.close, end=" ")
        print(guess)
        print('---')