深度残差网络ResNet获得了2016年IEEE Conference on Computer Vision and Pattern Recognition的最佳论文奖,目前在谷歌学术的引用量已高达38295次。
Squeeze-and-Excitation Network(SENet)是一种较新的注意力机制下的深度学习方法。 在不同的样本中,不同的特征通道,在分类任务中的贡献大小,往往是不同的。SENet采用一个小型的子网络,获得一组权重,进而将这组权重与各个通道的特征分别相乘,以调整各个通道特征的大小。这个过程,就可以认为是在施加不同大小的注意力在各个特征通道上。
1 #!/usr/bin/env python3 2 # -*- coding: utf-8 -*- 3 """ 4 Created on Sat Dec 28 23:24:05 2019 5 Implemented using TensorFlow 1.0.1 and Keras 2.2.1 6 7 M. Zhao, S. Zhong, X. Fu, et al., Deep Residual Shrinkage Networks for Fault Diagnosis, 8 IEEE Transactions on Industrial Informatics, 2019, DOI: 10.1109/TII.2019.2943898 9 @author: super_9527 10 """ 11 12 from __future__ import print_function 13 import keras 14 import numpy as np 15 from keras.datasets import mnist 16 from keras.layers import Dense, Conv2D, BatchNormalization, Activation 17 from keras.layers import AveragePooling2D, Input, GlobalAveragePooling2D 18 from keras.optimizers import Adam 19 from keras.regularizers import l2 20 from keras import backend as K 21 from keras.models import Model 22 from keras.layers.core import Lambda 23 K.set_learning_phase(1) 24 25 # Input image dimensions 26 img_rows, img_cols = 28, 28 27 28 # The data, split between train and test sets 29 (x_train, y_train), (x_test, y_test) = mnist.load_data() 30 31 if K.image_data_format() == \'channels_first\': 32 x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols) 33 x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols) 34 input_shape = (1, img_rows, img_cols) 35 else: 36 x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1) 37 x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1) 38 input_shape = (img_rows, img_cols, 1) 39 40 # Noised data 41 x_train = x_train.astype(\'float32\') / 255. + 0.5*np.random.random([x_train.shape[0], img_rows, img_cols, 1]) 42 x_test = x_test.astype(\'float32\') / 255. + 0.5*np.random.random([x_test.shape[0], img_rows, img_cols, 1]) 43 print(\'x_train shape:\', x_train.shape) 44 print(x_train.shape[0], \'train samples\') 45 print(x_test.shape[0], \'test samples\') 46 47 # convert class vectors to binary class matrices 48 y_train = keras.utils.to_categorical(y_train, 10) 49 y_test = keras.utils.to_categorical(y_test, 10) 50 51 52 def abs_backend(inputs): 53 return K.abs(inputs) 54 55 def expand_dim_backend(inputs): 56 return K.expand_dims(K.expand_dims(inputs,1),1) 57 58 def sign_backend(inputs): 59 return K.sign(inputs) 60 61 def pad_backend(inputs, in_channels, out_channels): 62 pad_dim = (out_channels - in_channels)//2 63 inputs = K.expand_dims(inputs,-1) 64 inputs = K.spatial_3d_padding(inputs, ((0,0),(0,0),(pad_dim,pad_dim)), \'channels_last\') 65 return K.squeeze(inputs, -1) 66 67 # Residual Shrinakge Block 68 def residual_shrinkage_block(incoming, nb_blocks, out_channels, downsample=False, 69 downsample_strides=2): 70 71 residual = incoming 72 in_channels = incoming.get_shape().as_list()[-1] 73 74 for i in range(nb_blocks): 75 76 identity = residual 77 78 if not downsample: 79 downsample_strides = 1 80 81 residual = BatchNormalization()(residual) 82 residual = Activation(\'relu\')(residual) 83 residual = Conv2D(out_channels, 3, strides=(downsample_strides, downsample_strides), 84 padding=\'same\', kernel_initializer=\'he_normal\', 85 kernel_regularizer=l2(1e-4))(residual) 86 87 residual = BatchNormalization()(residual) 88 residual = Activation(\'relu\')(residual) 89 residual = Conv2D(out_channels, 3, padding=\'same\', kernel_initializer=\'he_normal\', 90 kernel_regularizer=l2(1e-4))(residual) 91 92 # Calculate global means 93 residual_abs = Lambda(abs_backend)(residual) 94 abs_mean = GlobalAveragePooling2D()(residual_abs) 95 96 # Calculate scaling coefficients 97 scales = Dense(out_channels, activation=None, kernel_initializer=\'he_normal\', 98 kernel_regularizer=l2(1e-4))(abs_mean) 99 scales = BatchNormalization()(scales) 100 scales = Activation(\'relu\')(scales) 101 scales = Dense(out_channels, activation=\'sigmoid\', kernel_regularizer=l2(1e-4))(scales) 102 scales = Lambda(expand_dim_backend)(scales) 103 104 # Calculate thresholds 105 thres = keras.layers.multiply([abs_mean, scales]) 106 107 # Soft thresholding 108 sub = keras.layers.subtract([residual_abs, thres]) 109 zeros = keras.layers.subtract([sub, sub]) 110 n_sub = keras.layers.maximum([sub, zeros]) 111 residual = keras.layers.multiply([Lambda(sign_backend)(residual), n_sub]) 112 113 # Downsampling (it is important to use the pooL-size of (1, 1)) 114 if downsample_strides > 1: 115 identity = AveragePooling2D(pool_size=(1,1), strides=(2,2))(identity) 116 117 # Zero_padding to match channels (it is important to use zero padding rather than 1by1 convolution) 118 if in_channels != out_channels: 119 identity = Lambda(pad_backend, arguments={\'in_channels\':in_channels,\'out_channels\':out_channels})(identity) 120 121 residual = keras.layers.add([residual, identity]) 122 123 return residual 124 125 126 # define and train a model 127 inputs = Input(shape=input_shape) 128 net = Conv2D(8, 3, padding=\'same\', kernel_initializer=\'he_normal\', kernel_regularizer=l2(1e-4))(inputs) 129 net = residual_shrinkage_block(net, 1, 8, downsample=True) 130 net = BatchNormalization()(net) 131 net = Activation(\'relu\')(net) 132 net = GlobalAveragePooling2D()(net) 133 outputs = Dense(10, activation=\'softmax\', kernel_initializer=\'he_normal\', kernel_regularizer=l2(1e-4))(net) 134 model = Model(inputs=inputs, outputs=outputs) 135 model.compile(loss=\'categorical_crossentropy\', optimizer=Adam(), metrics=[\'accuracy\']) 136 model.fit(x_train, y_train, batch_size=100, epochs=5, verbose=1, validation_data=(x_test, y_test)) 137 138 # get results 139 K.set_learning_phase(0) 140 DRSN_train_score = model.evaluate(x_train, y_train, batch_size=100, verbose=0) 141 print(\'Train loss:\', DRSN_train_score[0]) 142 print(\'Train accuracy:\', DRSN_train_score[1]) 143 DRSN_test_score = model.evaluate(x_test, y_test, batch_size=100, verbose=0) 144 print(\'Test loss:\', DRSN_test_score[0]) 145 print(\'Test accuracy:\', DRSN_test_score[1])
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Created on Mon Dec 23 21:23:09 2019 Implemented using TensorFlow 1.0 and TFLearn 0.3.2 M. Zhao, S. Zhong, X. Fu, B. Tang, M. Pecht, Deep Residual Shrinkage Networks for Fault Diagnosis, IEEE Transactions on Industrial Informatics, 2019, DOI: 10.1109/TII.2019.2943898 @author: super_9527 """ from __future__ import division, print_function, absolute_import import tflearn import numpy as np import tensorflow as tf from tflearn.layers.conv import conv_2d # Data loading from tflearn.datasets import cifar10 (X, Y), (testX, testY) = cifar10.load_data() # Add noise X = X + np.random.random((50000, 32, 32, 3))*0.1 testX = testX + np.random.random((10000, 32, 32, 3))*0.1 # Transform labels to one-hot format Y = tflearn.data_utils.to_categorical(Y,10) testY = tflearn.data_utils.to_categorical(testY,10) def residual_shrinkage_block(incoming, nb_blocks, out_channels, downsample=False, downsample_strides=2, activation=\'relu\', batch_norm=True, bias=True, weights_init=\'variance_scaling\', bias_init=\'zeros\', regularizer=\'L2\', weight_decay=0.0001, trainable=True, restore=True, reuse=False, scope=None, name="ResidualBlock"): # residual shrinkage blocks with channel-wise thresholds residual = incoming in_channels = incoming.get_shape().as_list()[-1] # Variable Scope fix for older TF try: vscope = tf.variable_scope(scope, default_name=name, values=[incoming], reuse=reuse) except Exception: vscope = tf.variable_op_scope([incoming], scope, name, reuse=reuse) with vscope as scope: name = scope.name #TODO for i in range(nb_blocks): identity = residual if not downsample: downsample_strides = 1 if batch_norm: residual = tflearn.batch_normalization(residual) residual = tflearn.activation(residual, activation) residual = conv_2d(residual, out_channels, 3, downsample_strides, \'same\', \'linear\', bias, weights_init, bias_init, regularizer, weight_decay, trainable, restore) if batch_norm: residual = tflearn.batch_normalization(residual) residual = tflearn.activation(residual, activation) residual = conv_2d(residual, out_channels, 3, 1, \'same\', \'linear\', bias, weights_init, bias_init, regularizer, weight_decay, trainable, restore) # get thresholds and apply thresholding abs_mean = tf.reduce_mean(tf.reduce_mean(tf.abs(residual),axis=2,keep_dims=True),axis=1,keep_dims=True) scales = tflearn.fully_connected(abs_mean, out_channels//4, activation=\'linear\',regularizer=\'L2\',weight_decay=0.0001,weights_init=\'variance_scaling\') scales = tflearn.batch_normalization(scales) scales = tflearn.activation(scales, \'relu\') scales = tflearn.fully_connected(scales, out_channels, activation=\'linear\',regularizer=\'L2\',weight_decay=0.0001,weights_init=\'variance_scaling\') scales = tf.expand_dims(tf.expand_dims(scales,axis=1),axis=1) thres = tf.multiply(abs_mean,tflearn.activations.sigmoid(scales)) # soft thresholding residual = tf.multiply(tf.sign(residual), tf.maximum(tf.abs(residual)-thres,0)) # Downsampling if downsample_strides > 1: identity = tflearn.avg_pool_2d(identity, 1, downsample_strides) # Projection to new dimension if in_channels != out_channels: if (out_channels - in_channels) % 2 == 0: ch = (out_channels - in_channels)//2 identity = tf.pad(identity, [[0, 0], [0, 0], [0, 0], [ch, ch]]) else: ch = (out_channels - in_channels)//2 identity = tf.pad(identity, [[0, 0], [0, 0], [0, 0], [ch, ch+1]]) in_channels = out_channels residual = residual + identity return residual # Real-time data preprocessing img_prep = tflearn.ImagePreprocessing() img_prep.add_featurewise_zero_center(per_channel=True) # Real-time data augmentation img_aug = tflearn.ImageAugmentation() img_aug.add_random_flip_leftright() img_aug.add_random_crop([32, 32], padding=4) # Build a Deep Residual Shrinkage Network with 3 blocks net = tflearn.input_data(shape=[None, 32, 32, 3], data_preprocessing=img_prep, data_augmentation=img_aug) net = tflearn.conv_2d(net, 16, 3, regularizer=\'L2\', weight_decay=0.0001) net = residual_shrinkage_block(net, 1, 16) net = residual_shrinkage_block(net, 1, 32, downsample=True) net = residual_shrinkage_block(net, 1, 32, downsample=True) net = tflearn.batch_normalization(net) net = tflearn.activation(net, \'relu\') net = tflearn.global_avg_pool(net) # Regression net = tflearn.fully_connected(net, 10, activation=\'softmax\') mom = tflearn.Momentum(0.1, lr_decay=0.1, decay_step=20000, staircase=True) net = tflearn.regression(net, optimizer=mom, loss=\'categorical_crossentropy\') # Training model = tflearn.DNN(net, checkpoint_path=\'model_cifar10\', max_checkpoints=10, tensorboard_verbose=0, clip_gradients=0.) model.fit(X, Y, n_epoch=100, snapshot_epoch=False, snapshot_step=500, show_metric=True, batch_size=100, shuffle=True, run_id=\'model_cifar10\') training_acc = model.evaluate(X, Y)[0] validation_acc = model.evaluate(testX, testY)[0]
M. Zhao, S. Zhong, X. Fu, et al., Deep residual shrinkage networks for fault diagnosis, IEEE Transactions on Industrial Informatics, DOI: 10.1109/TII.2019.2943898