Keras的简单应用(三分类问题)——改编自cifar10vgg16

时间:2022-02-15 14:01:29

以下代码改编自 cifar10 的 VGG16 示例,用于解决一个三分类问题(牙齿 / 牙菌斑 / 其他):

from __future__ import print_function  #此为在老版本的python中兼顾新特性的一种方法
import keras
from keras.preprocessing.image import ImageDataGenerator
from keras.preprocessing import image
from keras.models import Sequential
from keras.models import model_from_json
from keras.layers import Dense, Dropout, Activation, Flatten
from keras.layers import Conv2D, MaxPooling2D, BatchNormalization
from keras import optimizers
from keras import backend as K
from keras import regularizers
from keras.applications.imagenet_utils import preprocess_input

import scipy.misc
import numpy as np
import matplotlib.pyplot as plt
import h5py


#from keras.datasets import cifar10
#from keras.layers.core import Lambda
#from matplotlib.pyplot import imshow


class teeth3vgg:
    """VGG-style convolutional network for a 3-class image classification task.

    Constructing the object either trains a fresh network (``train=True``) or
    restores a previously saved one from ``my_model_architecture.json`` and
    ``my_model_weights.h5`` in the current working directory (``train=False``).
    The resulting Keras model is available as ``self.model``.
    """

    def __init__(self, train=False):
        """Build and train the network, or load a saved one.

        Args:
            train: when true, build the network and train it on ``data.h5``;
                otherwise load the architecture and weights saved by a
                previous training run.
        """
        self.num_classes = 3
        self.weight_decay = 0.0005  # L2 weight-decay factor, used against overfitting
        self.x_shape = [32, 32, 3]

        if train:
            self.model = self.build_model()
            self.model = self.train(self.model)
        else:
            # The architecture comes from the saved JSON, so there is no need
            # to build a throw-away network first.
            with open('my_model_architecture.json') as arch_file:
                self.model = model_from_json(arch_file.read())
            self.model.load_weights('my_model_weights.h5')

    def build_model(self):
        """Return the uncompiled VGG-style ``Sequential`` model.

        The network is five convolutional stages (each conv is followed by
        ReLU and batch normalization, optionally by dropout, and each stage
        ends with 2x2 max pooling), then a 512-unit dense layer and a softmax
        over ``self.num_classes`` outputs. Every convolution and the hidden
        dense layer carry an L2 kernel regularizer of ``self.weight_decay``.
        """
        model = Sequential()
        weight_decay = self.weight_decay

        # One entry per stage: (number of filters, dropout rate after each
        # convolution of the stage, None meaning "no dropout").
        stages = [
            (64, (0.3, None)),
            (128, (0.4, None)),
            (256, (0.4, 0.4, None)),
            (512, (0.4, 0.4, None)),
            (512, (0.4, 0.4, None)),
        ]

        is_first_layer = True
        for filters, dropout_rates in stages:
            for rate in dropout_rates:
                conv_kwargs = {
                    'padding': 'same',
                    # kernel_regularizer is the penalty applied to the weights
                    'kernel_regularizer': regularizers.l2(weight_decay),
                }
                if is_first_layer:
                    # Only the very first layer declares the input shape.
                    conv_kwargs['input_shape'] = self.x_shape
                    is_first_layer = False
                model.add(Conv2D(filters, (3, 3), **conv_kwargs))
                model.add(Activation('relu'))
                # Re-normalizes the previous layer's activations per batch.
                model.add(BatchNormalization())
                if rate is not None:
                    model.add(Dropout(rate))
            model.add(MaxPooling2D(pool_size=(2, 2)))

        model.add(Dropout(0.5))

        # Flatten the feature maps for the fully connected classifier head.
        model.add(Flatten())
        model.add(Dense(512, kernel_regularizer=regularizers.l2(weight_decay)))
        model.add(Activation('relu'))
        model.add(BatchNormalization())

        model.add(Dropout(0.5))
        model.add(Dense(self.num_classes))  # one output per class
        model.add(Activation('softmax'))
        return model

    def normalize(self, X_train, X_valida, X_test):
        """Standardize the three splits with the training-set statistics.

        A single scalar mean and standard deviation are computed over all
        four axes of ``X_train`` and applied to every split.

        Args:
            X_train: 4-D training array; the statistics are taken from it.
            X_valida: validation array.
            X_test: test array.

        Returns:
            Tuple ``(X_train, X_valida, X_test)`` of normalized arrays.
        """
        mean = np.mean(X_train, axis=(0, 1, 2, 3))
        std = np.std(X_train, axis=(0, 1, 2, 3))
        # The small epsilon guards against division by zero.
        X_train = (X_train - mean) / (std + 1e-7)
        X_valida = (X_valida - mean) / (std + 1e-7)
        X_test = (X_test - mean) / (std + 1e-7)
        return X_train, X_valida, X_test

    def normalize_production(self, x):
        """Standardize ``x`` with its own scalar mean and standard deviation.

        NOTE(review): training normalizes with the training-set statistics
        (see ``normalize``), whereas this uses the statistics of whatever is
        passed in, so inference inputs are scaled differently from training
        inputs. Persisting the training mean/std would remove the mismatch.

        Args:
            x: array of inputs to normalize.

        Returns:
            The normalized array.
        """
        mean = np.mean(x)
        std = np.std(x)
        return (x - mean) / (std + 1e-7)

    def predict(self, x, normalize=True, batch_size=50):
        """Run the network on ``x`` and return the model's predictions.

        Args:
            x: batch of input images.
            normalize: when true, ``x`` is first passed through
                ``normalize_production``.
            batch_size: batch size forwarded to the Keras model.
        """
        if normalize:
            x = self.normalize_production(x)
        return self.model.predict(x, batch_size)

    def train(self, model):
        """Train ``model`` on ``data.h5`` and save it to disk.

        Reads the ``X_train``/``y_train``, ``X_valida``/``y_valida`` and
        ``X_test`` datasets from ``data.h5``, trains with SGD (Nesterov
        momentum) on augmented images, then writes the architecture to
        ``my_model_architecture.json`` and the weights to
        ``my_model_weights.h5``.

        Args:
            model: uncompiled Keras model, as returned by ``build_model``.

        Returns:
            The trained model.
        """
        # Training parameters.
        batch_size = 100
        maxepoches = 150
        learning_rate = 0.01
        lr_decay = 1e-6
        lr_drop = 20

        # The context manager closes the HDF5 file even if a read fails.
        with h5py.File('data.h5', 'r') as train_dataset:
            x_train = np.array(train_dataset['X_train'][:])
            y_train = np.array(train_dataset['y_train'][:])
            x_valida = np.array(train_dataset['X_valida'][:])
            y_valida = np.array(train_dataset['y_valida'][:])
            x_test = np.array(train_dataset['X_test'][:])

        x_train = x_train.astype('float32')
        x_valida = x_valida.astype('float32')
        x_test = x_test.astype('float32')
        x_train, x_valida, x_test = self.normalize(x_train, x_valida, x_test)

        # One-hot encode the integer class labels.
        y_train = keras.utils.to_categorical(y_train, self.num_classes)
        y_valida = keras.utils.to_categorical(y_valida, self.num_classes)

        def lr_scheduler(epoch):
            # Halve the learning rate every `lr_drop` epochs.
            return learning_rate * (0.5 ** (epoch // lr_drop))
        reduce_lr = keras.callbacks.LearningRateScheduler(lr_scheduler)

        # Data augmentation: small rotations, shifts and horizontal flips.
        datagen = ImageDataGenerator(
            featurewise_center=False,  # set input mean to 0 over the dataset
            samplewise_center=False,  # set each sample mean to 0
            featurewise_std_normalization=False,  # divide inputs by std of the dataset
            samplewise_std_normalization=False,  # divide each input by its std
            zca_whitening=False,  # apply ZCA whitening
            rotation_range=15,  # randomly rotate images in the range (degrees, 0 to 180)
            width_shift_range=0.1,  # randomly shift images horizontally (fraction of total width)
            height_shift_range=0.1,  # randomly shift images vertically (fraction of total height)
            horizontal_flip=True,  # randomly flip images horizontally
            vertical_flip=False)  # do not flip images vertically
        # Computes dataset statistics for the featurewise/ZCA options; all of
        # them are disabled above, the call is kept for parity with the
        # original script.
        datagen.fit(x_train)
        print('***********************')
        print(x_train.shape)

        # Optimizer and loss.
        sgd = optimizers.SGD(lr=learning_rate, decay=lr_decay, momentum=0.9, nesterov=True)
        model.compile(loss='categorical_crossentropy', optimizer=sgd, metrics=['accuracy'])

        # verbose=2 prints one log line per epoch (0 = silent, 1 = progress bar).
        model.fit_generator(datagen.flow(x_train, y_train,
                                         batch_size=batch_size),
                            steps_per_epoch=x_train.shape[0] // batch_size,
                            epochs=maxepoches,
                            validation_data=(x_valida, y_valida),
                            callbacks=[reduce_lr],
                            verbose=2)

        # Persist architecture and weights for the train=False code path.
        with open('my_model_architecture.json', 'w') as arch_file:
            arch_file.write(model.to_json())
        model.save_weights('my_model_weights.h5')

        return model

if __name__ == '__main__':
    # Evaluate the saved model on the validation and test splits, then
    # classify a single image from disk.

    # The context manager guarantees the HDF5 file is closed after reading.
    with h5py.File('data.h5', 'r') as dataset:
        y_train = np.array(dataset['y_train'][:])  # train set labels
        x_valida = np.array(dataset['X_valida'][:])  # validation set features
        y_valida = np.array(dataset['y_valida'][:])  # validation set labels
        x_test = np.array(dataset['X_test'][:])  # test set features
        y_test = np.array(dataset['y_test'][:])  # test set labels

    x_valida = x_valida.astype('float32')
    x_test = x_test.astype('float32')

    # Convert the integer class labels to one-hot vectors.
    y_train = keras.utils.to_categorical(y_train, 3)
    y_valida = keras.utils.to_categorical(y_valida, 3)
    y_test = keras.utils.to_categorical(y_test, 3)

    print('读取数据成功啦!来看看标签吧!')
    # Show the first eleven training labels (fewer if the set is smaller).
    for label in y_train[:11]:
        print(label)

    # train defaults to False, so this loads the saved architecture/weights.
    model = teeth3vgg()

    print('模型建立完毕!')

    # Forward pass over the validation set to measure accuracy.
    predicted_x = model.predict(x_valida)
    print(predicted_x)

    # argmax returns the index of the largest entry, i.e. the class id;
    # `correct` is True wherever the prediction matches the label.
    correct = (np.argmax(predicted_x, 1) == np.argmax(y_valida, 1))
    print('------------------------***')
    print(correct)
    print(np.sum(correct))
    print(len(correct))
    # Fraction of correct predictions. np.mean always divides as floats, so
    # the value is not truncated to 0 under Python 2 integer division.
    accuracy = np.mean(correct)
    print("the validation accuracy is: ", accuracy)

    # Forward pass over the test set to measure accuracy.
    predicted_x1 = model.predict(x_test)
    print(predicted_x1.shape)
    print(y_test.shape)

    correct1 = (np.argmax(predicted_x1, 1) == np.argmax(y_test, 1))
    print('------------------------***')
    print(np.sum(correct1))
    print(len(correct1))
    accuracy1 = np.mean(correct1)
    print("the test accuracy is: ", accuracy1)

    # Classify a single image read from disk.
    img_path = 'E:/MatlabWorkspace/picdst14/247.jpg'
    img = image.load_img(img_path, target_size=(32, 32))
    x = image.img_to_array(img)
    print('Source input image shape:', x.shape)
    x = np.expand_dims(x, 0)  # add a leading batch axis
    # NOTE(review): the HDF5 splits above are fed to the model without
    # preprocess_input, so this image is prepared differently from them —
    # confirm this step is intended.
    x = preprocess_input(x)
    print('Input image shape:', x.shape)
    # scipy.misc.imread was removed from SciPy; matplotlib reads the file instead.
    my_image = plt.imread(img_path)
    plt.imshow(my_image)
    print("class prediction vector [p(0), p(1), p(2)] = ")
    px = model.predict(x)
    print(px)

    # Single image in the batch, so take the first (only) predicted class id.
    predicted_class = int(np.argmax(px, 1)[0])
    if predicted_class == 0:
        print('该区域为牙齿!')
    elif predicted_class == 1:
        print('该区域为牙菌斑!')
    else:
        print('该区域为其他!')
    K.clear_session()


张量:A tensor is something that transforms like a tensor! 一个量,在不同的参考系下按照某种特定的法则进行变换,就是张量。

YouTube视频:https://www.youtube.com/watch?v=f5liqUk0ZTw