使用GAN 进行异常检测——anoGAN,TODO,待用于安全分析实验

时间:2022-09-06 20:41:56

先说实验成功的代码:

git clone https://github.com/tkwoo/anogan-keras.git

mkdir weights

python main.py --mode train

即可看到效果了!

核心代码:main.py

from __future__ import print_function

import matplotlib
matplotlib.use('Qt5Agg') import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
from keras.datasets import mnist
import argparse
import anogan os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' parser = argparse.ArgumentParser()
parser.add_argument('--img_idx', type=int, default=14)
parser.add_argument('--label_idx', type=int, default=7)
parser.add_argument('--mode', type=str, default='test', help='train, test')
args = parser.parse_args() ### 0. prepare data
(X_train, y_train), (X_test, y_test) = mnist.load_data()
X_train = (X_train.astype(np.float32) - 127.5) / 127.5
X_test = (X_test.astype(np.float32) - 127.5) / 127.5 X_train = X_train[:,:,:,None]
X_test = X_test[:,:,:,None] X_test_original = X_test.copy() X_train = X_train[y_train==1]
X_test = X_test[y_test==1]
print ('train shape:', X_train.shape) ### 1. train generator & discriminator
if args.mode == 'train':
Model_d, Model_g = anogan.train(64, X_train) ### 2. test generator
generated_img = anogan.generate(25)
img = anogan.combine_images(generated_img)
img = (img*127.5)+127.5
img = img.astype(np.uint8)
img = cv2.resize(img, None, fx=4, fy=4, interpolation=cv2.INTER_NEAREST) ### opencv view
# cv2.namedWindow('generated', 0)
# cv2.resizeWindow('generated', 256, 256)
# cv2.imshow('generated', img)
# cv2.imwrite('result_latent_10/generator.png', img)
# cv2.waitKey() ### plt view
# plt.figure(num=0, figsize=(4, 4))
# plt.title('trained generator')
# plt.imshow(img, cmap=plt.cm.gray)
# plt.show() # exit() ### 3. other class anomaly detection def anomaly_detection(test_img, g=None, d=None):
model = anogan.anomaly_detector(g=g, d=d)
ano_score, similar_img = anogan.compute_anomaly_score(model, test_img.reshape(1, 28, 28, 1), iterations=500, d=d) # anomaly area, 255 normalization
np_residual = test_img.reshape(28,28,1) - similar_img.reshape(28,28,1)
np_residual = (np_residual + 2)/4 np_residual = (255*np_residual).astype(np.uint8)
original_x = (test_img.reshape(28,28,1)*127.5+127.5).astype(np.uint8)
similar_x = (similar_img.reshape(28,28,1)*127.5+127.5).astype(np.uint8) original_x_color = cv2.cvtColor(original_x, cv2.COLOR_GRAY2BGR)
residual_color = cv2.applyColorMap(np_residual, cv2.COLORMAP_JET)
show = cv2.addWeighted(original_x_color, 0.3, residual_color, 0.7, 0.) return ano_score, original_x, similar_x, show ### compute anomaly score - sample from test set
# test_img = X_test_original[y_test==1][30] ### compute anomaly score - sample from strange image
# test_img = X_test_original[y_test==0][30] ### compute anomaly score - sample from strange image
img_idx = args.img_idx
label_idx = args.label_idx
test_img = X_test_original[y_test==label_idx][img_idx]
# test_img = np.random.uniform(-1,1, (28,28,1)) start = cv2.getTickCount()
score, qurey, pred, diff = anomaly_detection(test_img)
time = (cv2.getTickCount() - start) / cv2.getTickFrequency() * 1000
print ('%d label, %d : done'%(label_idx, img_idx), '%.2f'%score, '%.2fms'%time)
# cv2.imwrite('./qurey.png', qurey)
# cv2.imwrite('./pred.png', pred)
# cv2.imwrite('./diff.png', diff) ## matplot view
plt.figure(1, figsize=(3, 3))
plt.title('query image')
plt.imshow(qurey.reshape(28,28), cmap=plt.cm.gray) print("anomaly score : ", score)
plt.figure(2, figsize=(3, 3))
plt.title('generated similar image')
plt.imshow(pred.reshape(28,28), cmap=plt.cm.gray) plt.figure(3, figsize=(3, 3))
plt.title('anomaly detection')
plt.imshow(cv2.cvtColor(diff,cv2.COLOR_BGR2RGB))
plt.show() ### 4. tsne feature view ### t-SNE embedding
### generating anomaly image for test (radom noise image) from sklearn.manifold import TSNE random_image = np.random.uniform(0, 1, (100, 28, 28, 1))
print("random noise image")
plt.figure(4, figsize=(2, 2))
plt.title('random noise image')
plt.imshow(random_image[0].reshape(28,28), cmap=plt.cm.gray) # intermidieate output of discriminator
model = anogan.feature_extractor()
feature_map_of_random = model.predict(random_image, verbose=1)
feature_map_of_minist = model.predict(X_test_original[y_test != 1][:300], verbose=1)
feature_map_of_minist_1 = model.predict(X_test[:100], verbose=1) # t-SNE for visulization
output = np.concatenate((feature_map_of_random, feature_map_of_minist, feature_map_of_minist_1))
output = output.reshape(output.shape[0], -1)
anomaly_flag = np.array([1]*100+ [0]*300) X_embedded = TSNE(n_components=2).fit_transform(output)
plt.figure(5)
plt.title("t-SNE embedding on the feature representation")
plt.scatter(X_embedded[:100,0], X_embedded[:100,1], label='random noise(anomaly)')
plt.scatter(X_embedded[100:400,0], X_embedded[100:400,1], label='mnist(anomaly)')
plt.scatter(X_embedded[400:,0], X_embedded[400:,1], label='mnist(normal)')
plt.legend()
plt.show()

anogan.py

from __future__ import print_function
from keras.models import Sequential, Model
from keras.layers import Input, Reshape, Dense, Dropout, MaxPooling2D, Conv2D, Flatten
from keras.layers import Conv2DTranspose, LeakyReLU
from keras.layers.core import Activation
from keras.layers.normalization import BatchNormalization
from keras.optimizers import Adam, RMSprop
from keras import backend as K
from keras import initializers
import tensorflow as tf
import numpy as np
from tqdm import tqdm
import cv2
import math from keras.utils. generic_utils import Progbar ### combine images for visualization
def combine_images(generated_images):
num = generated_images.shape[0]
width = int(math.sqrt(num))
height = int(math.ceil(float(num)/width))
shape = generated_images.shape[1:4]
image = np.zeros((height*shape[0], width*shape[1], shape[2]),
dtype=generated_images.dtype)
for index, img in enumerate(generated_images):
i = int(index/width)
j = index % width
image[i*shape[0]:(i+1)*shape[0], j*shape[1]:(j+1)*shape[1],:] = img[:, :, :]
return image ### generator model define
def generator_model():
inputs = Input((10,))
fc1 = Dense(input_dim=10, units=128*7*7)(inputs)
fc1 = BatchNormalization()(fc1)
fc1 = LeakyReLU(0.2)(fc1)
fc2 = Reshape((7, 7, 128), input_shape=(128*7*7,))(fc1)
up1 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(fc2)
conv1 = Conv2D(64, (3, 3), padding='same')(up1)
conv1 = BatchNormalization()(conv1)
conv1 = Activation('relu')(conv1)
up2 = Conv2DTranspose(64, (2, 2), strides=(2, 2), padding='same')(conv1)
conv2 = Conv2D(1, (5, 5), padding='same')(up2)
outputs = Activation('tanh')(conv2) model = Model(inputs=[inputs], outputs=[outputs])
return model ### discriminator model define
def discriminator_model():
inputs = Input((28, 28, 1))
conv1 = Conv2D(64, (5, 5), padding='same')(inputs)
conv1 = LeakyReLU(0.2)(conv1)
pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)
conv2 = Conv2D(128, (5, 5), padding='same')(pool1)
conv2 = LeakyReLU(0.2)(conv2)
pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
fc1 = Flatten()(pool2)
fc1 = Dense(1)(fc1)
outputs = Activation('sigmoid')(fc1) model = Model(inputs=[inputs], outputs=[outputs])
return model ### d_on_g model for training generator
def generator_containing_discriminator(g, d):
d.trainable = False
ganInput = Input(shape=(10,))
x = g(ganInput)
ganOutput = d(x)
gan = Model(inputs=ganInput, outputs=ganOutput)
# gan.compile(loss='binary_crossentropy', optimizer='adam')
return gan def load_model():
d = discriminator_model()
g = generator_model()
d_optim = RMSprop()
g_optim = RMSprop(lr=0.0002)
g.compile(loss='binary_crossentropy', optimizer=g_optim)
d.compile(loss='binary_crossentropy', optimizer=d_optim)
d.load_weights('./weights/discriminator.h5')
g.load_weights('./weights/generator.h5')
return g, d ### train generator and discriminator
def train(BATCH_SIZE, X_train): ### model define
d = discriminator_model()
g = generator_model()
d_on_g = generator_containing_discriminator(g, d)
d_optim = RMSprop(lr=0.0004)
g_optim = RMSprop(lr=0.0002)
g.compile(loss='mse', optimizer=g_optim)
d_on_g.compile(loss='mse', optimizer=g_optim)
d.trainable = True
d.compile(loss='mse', optimizer=d_optim) for epoch in range(10):
print ("Epoch is", epoch)
n_iter = int(X_train.shape[0]/BATCH_SIZE)
progress_bar = Progbar(target=n_iter) for index in range(n_iter):
# create random noise -> U(0,1) 10 latent vectors
noise = np.random.uniform(0, 1, size=(BATCH_SIZE, 10)) # load real data & generate fake data
image_batch = X_train[index*BATCH_SIZE:(index+1)*BATCH_SIZE]
generated_images = g.predict(noise, verbose=0) # visualize training results
if index % 20 == 0:
image = combine_images(generated_images)
image = image*127.5+127.5
cv2.imwrite('./result/'+str(epoch)+"_"+str(index)+".png", image) # attach label for training discriminator
X = np.concatenate((image_batch, generated_images))
y = np.array([1] * BATCH_SIZE + [0] * BATCH_SIZE) # training discriminator
d_loss = d.train_on_batch(X, y) # training generator
d.trainable = False
g_loss = d_on_g.train_on_batch(noise, np.array([1] * BATCH_SIZE))
d.trainable = True progress_bar.update(index, values=[('g',g_loss), ('d',d_loss)])
print ('') # save weights for each epoch
g.save_weights('weights/generator.h5', True)
d.save_weights('weights/discriminator.h5', True)
return d, g ### generate images
def generate(BATCH_SIZE):
g = generator_model()
g.load_weights('weights/generator.h5')
noise = np.random.uniform(0, 1, (BATCH_SIZE, 10))
generated_images = g.predict(noise)
return generated_images ### anomaly loss function
def sum_of_residual(y_true, y_pred):
return K.sum(K.abs(y_true - y_pred)) ### discriminator intermediate layer feautre extraction
def feature_extractor(d=None):
if d is None:
d = discriminator_model()
d.load_weights('weights/discriminator.h5')
intermidiate_model = Model(inputs=d.layers[0].input, outputs=d.layers[-7].output)
intermidiate_model.compile(loss='binary_crossentropy', optimizer='rmsprop')
return intermidiate_model ### anomaly detection model define
def anomaly_detector(g=None, d=None):
if g is None:
g = generator_model()
g.load_weights('weights/generator.h5')
intermidiate_model = feature_extractor(d)
intermidiate_model.trainable = False
g = Model(inputs=g.layers[1].input, outputs=g.layers[-1].output)
g.trainable = False
# Input layer cann't be trained. Add new layer as same size & same distribution
aInput = Input(shape=(10,))
gInput = Dense((10), trainable=True)(aInput)
gInput = Activation('sigmoid')(gInput) # G & D feature
G_out = g(gInput)
D_out= intermidiate_model(G_out)
model = Model(inputs=aInput, outputs=[G_out, D_out])
model.compile(loss=sum_of_residual, loss_weights= [0.90, 0.10], optimizer='rmsprop') # batchnorm learning phase fixed (test) : make non trainable
K.set_learning_phase(0) return model ### anomaly detection
def compute_anomaly_score(model, x, iterations=500, d=None):
z = np.random.uniform(0, 1, size=(1, 10)) intermidiate_model = feature_extractor(d)
d_x = intermidiate_model.predict(x) # learning for changing latent
loss = model.fit(z, [x, d_x], batch_size=1, epochs=iterations, verbose=0)
similar_data, _ = model.predict(z) loss = loss.history['loss'][-1] return loss, similar_data

效果图:

使用GAN 进行异常检测——anoGAN,TODO,待用于安全分析实验

detect strange imager never seen!!! refer:https://github.com/yjucho1/anoGAN

## compute anomaly score - sample from strange image

test_img = plt.imread('assets/test_img.png')
test_img = test_img[:,:,0] model = anogan.anomaly_detector()
ano_score, similar_img = anogan.compute_anomaly_score(model, test_img.reshape(1, 28, 28, 1)) plt.figure(figsize=(2, 2))
plt.imshow(test_img.reshape(28,28), cmap=plt.cm.gray)
plt.show()
print("anomaly score : " + str(ano_score))
plt.figure(figsize=(2, 2))
plt.imshow(test_img.reshape(28,28), cmap=plt.cm.gray)
residual = test_img.reshape(28,28) - similar_img.reshape(28, 28)
plt.imshow(residual, cmap='jet', alpha=.5)
plt.show()

使用GAN 进行异常检测——anoGAN,TODO,待用于安全分析实验

anomaly score : 446.46844482421875

使用GAN 进行异常检测——anoGAN,TODO,待用于安全分析实验

https://github.com/yjucho1/anoGAN

from keras.models import Sequential, Model
from keras.layers import Input, Reshape, Dense, Dropout, UpSampling2D, Conv2D, Flatten
from keras.layers.advanced_activations import LeakyReLU
from keras.optimizers import Adam
from keras import backend as K
from keras import initializers
import tensorflow as tf
import numpy as np
from tqdm import tqdm def generator_model():
generator = Sequential()
generator.add(Dense(128*7*7, input_dim=100, kernel_initializer=initializers.RandomNormal(stddev=0.02)))
generator.add(LeakyReLU(0.2))
generator.add(Reshape((7, 7, 128)))
generator.add(UpSampling2D(size=(2, 2)))
generator.add(Conv2D(64, kernel_size=(5, 5), padding='same'))
generator.add(LeakyReLU(0.2))
generator.add(UpSampling2D(size=(2, 2)))
generator.add(Conv2D(1, kernel_size=(5, 5), padding='same', activation='tanh'))
generator.compile(loss='binary_crossentropy', optimizer='adam')
return generator def discriminator_model():
discriminator = Sequential()
discriminator.add(Conv2D(64, kernel_size=(5, 5), strides=(2, 2), padding='same', input_shape=(28,28, 1), kernel_initializer=initializers.RandomNormal(stddev=0.02)))
discriminator.add(LeakyReLU(0.2))
discriminator.add(Dropout(0.3))
discriminator.add(Conv2D(128, kernel_size=(5, 5), strides=(2, 2), padding='same'))
discriminator.add(LeakyReLU(0.2))
discriminator.add(Dropout(0.3))
discriminator.add(Flatten())
discriminator.add(Dense(1, activation='sigmoid'))
discriminator.compile(loss='binary_crossentropy', optimizer='adam')
return discriminator def generator_containing_discriminator(g, d):
d.trainable = False
ganInput = Input(shape=(100,))
x = g(ganInput)
ganOutput = d(x)
gan = Model(inputs=ganInput, outputs=ganOutput)
gan.compile(loss='binary_crossentropy', optimizer='adam')
return gan def train(BATCH_SIZE, X_train):
d = discriminator_model()
print("#### discriminator ######")
d.summary()
g = generator_model()
print("#### generator ######")
g.summary()
d_on_g = generator_containing_discriminator(g, d)
d.trainable = True
for epoch in tqdm(range(200)):
for index in range(int(X_train.shape[0]/BATCH_SIZE)):
noise = np.random.uniform(0, 1, size=(BATCH_SIZE, 100))
image_batch = X_train[index*BATCH_SIZE:(index+1)*BATCH_SIZE]
generated_images = g.predict(noise, verbose=0)
X = np.concatenate((image_batch, generated_images))
y = np.array([1] * BATCH_SIZE + [0] * BATCH_SIZE)
d_loss = d.train_on_batch(X, y)
noise = np.random.uniform(0, 1, (BATCH_SIZE, 100))
d.trainable = False
g_loss = d_on_g.train_on_batch(noise, np.array([1] * BATCH_SIZE))
d.trainable = True
g.save_weights('assets/generator', True)
d.save_weights('assets/discriminator', True)
return d, g def generate(BATCH_SIZE):
g = generator_model()
g.load_weights('assets/generator')
noise = np.random.uniform(0, 1, (BATCH_SIZE, 100))
generated_images = g.predict(noise)
return generated_images def sum_of_residual(y_true, y_pred):
return tf.reduce_sum(abs(y_true - y_pred)) def feature_extractor():
d = discriminator_model()
d.load_weights('assets/discriminator')
intermidiate_model = Model(inputs=d.layers[0].input, outputs=d.layers[-5].output)
intermidiate_model.compile(loss='binary_crossentropy', optimizer='adam')
return intermidiate_model def anomaly_detector():
g = generator_model()
g.load_weights('assets/generator')
g.trainable = False
intermidiate_model = feature_extractor()
intermidiate_model.trainable = False aInput = Input(shape=(100,))
gInput = Dense((100))(aInput)
G_out = g(gInput)
D_out= intermidiate_model(G_out)
model = Model(inputs=aInput, outputs=[G_out, D_out])
model.compile(loss=sum_of_residual, loss_weights= [0.9, 0.1], optimizer='adam')
return model def compute_anomaly_score(model, x):
z = np.random.uniform(0, 1, size=(1, 100))
intermidiate_model = feature_extractor()
d_x = intermidiate_model.predict(x)
loss = model.fit(z, [x, d_x], epochs=500, verbose=0)
similar_data, _ = model.predict(z)
return loss.history['loss'][-1], similar_data

 

GAN异常检测的一些实验

要做基于GANomaly的异常检测实验,需要准备大量的OK样本和少量的NG样本。找不到合适的数据集怎么办?很简单,随便找个开源的分类数据集,将其中一个类别的样本当作异常类别,其他所有类别的样本当作正常样本即可,文章中的实验就是这么干的。具体试验结果如下:

使用GAN 进行异常检测——anoGAN,TODO,待用于安全分析实验

反正在效果上,GANomaly是超过了之前两种代表性的方法。此外,作者还做了性能对比的实验。事实上前面已经介绍了GANomaly的推断方法,就是一个简单的前向传播和一个对比阈值的过程,因此速度非常快。具体结果如下:

使用GAN 进行异常检测——anoGAN,TODO,待用于安全分析实验

可以看出,计算性能上,GANomaly表现也是非常不错的。

使用GAN 进行异常检测——anoGAN,TODO,待用于安全分析实验

使用GAN 进行异常检测——anoGAN,TODO,待用于安全分析实验

使用GAN 进行异常检测——anoGAN,TODO,待用于安全分析实验的更多相关文章

  1. 使用GAN进行异常检测——可以进行网络流量的自学习哇,哥哥,人家是半监督,无监督的话,还是要VAE,SAE。

    实验了效果,下面的还是图像的异常检测居多. https://github.com/LeeDoYup/AnoGAN https://github.com/tkwoo/anogan-keras 看了下,本 ...

  2. 杜伦大学提出GANomaly:无需负例样本实现异常检测

    杜伦大学提出GANomaly:无需负例样本实现异常检测 本期推荐的论文笔记来自 PaperWeekly 社区用户 @TwistedW.在异常检测模块下,如果没有异常(负例样本)来训练模型,应该如何实现 ...

  3. LSTM UEBA异常检测——deeplog里其实提到了,就是多分类LSTM算法,结合LSTM预测误差来检测异常参数

    结合CNN的可以参考:http://fcst.ceaj.org/CN/article/downloadArticleFile.do?attachType=PDF&id=1497 除了行为,其他 ...

  4. kaggle信用卡欺诈看异常检测算法——无监督的方法包括: 基于统计的技术,如BACON *离群检测 多变量异常值检测 基于聚类的技术;监督方法: 神经网络 SVM 逻辑回归

    使用google翻译自:https://software.seek.intel.com/dealing-with-outliers 数据分析中的一项具有挑战性但非常重要的任务是处理异常值.我们通常将异 ...

  5. 异常检测-基于孤立森林算法Isolation-based Anomaly Detection-1-论文学习

    论文http://202.119.32.195/cache/10/03/cs.nju.edu.cn/da2d9bef3c4fd7d2d8c33947231d9708/tkdd11.pdf 1. INT ...

  6. 利用KD树进行异常检测

    软件安全课程的一次实验,整理之后发出来共享. 什么是KD树 要说KD树,我们得先说一下什么是KNN算法. KNN是k-NearestNeighbor的简称,原理很简单:当你有一堆已经标注好的数据时,你 ...

  7. 5-Spark高级数据分析-第五章 基于K均值聚类的网络流量异常检测

    据我们所知,有‘已知的已知’,有些事,我们知道我们知道:我们也知道,有 ‘已知的未知’,也就是说,有些事,我们现在知道我们不知道.但是,同样存在‘不知的不知’——有些事,我们不知道我们不知道. 上一章 ...

  8. 异常检测算法--Isolation Forest

    南大周志华老师在2010年提出一个异常检测算法Isolation Forest,在工业界很实用,算法效果好,时间效率高,能有效处理高维数据和海量数据,这里对这个算法进行简要总结. iTree 提到森林 ...

  9. 机器学习:异常检测算法Seasonal Hybrid ESD及R语言实现

    Twritters的异常检测算法(Anomaly Detection)做的比较好,Seasonal Hybrid ESD算法是先用STL把序列分解,考察残差项.假定这一项符合正态分布,然后就可以用Ge ...

随机推荐

  1. 关于使用QQ、新浪微博、腾讯微博等第三方登录网站的开发过程(二)

    (二).新浪微博登录 1. 首先在新浪微博开放平台注册成为开发者.[http://open.weibo.com/connect] 具体自己填写一些相关信息就OK! 2. 注册成功之后,点击[微连接], ...

  2. 以蓝牙开发的视觉解读微信Airsync协议

    微信硬件平台使用蓝牙作为近场控制的连接件,并拟定了<微信蓝牙外设协议>.这份协议更像一个标准,用于规范微信和蓝牙外设之间的数据交互场景和接口.但从开发者来看,要完全读懂这份协议,恐怕需要熟 ...

  3. 被墙的情况(同时下载AndroidSDK达到200&plus;kb&sol;s)

    相信大家都遇到过google搜索被墙掉的情况吧:现在用修改Hosts的方法来解决哈 linux下: sudo gedit /etc/hosts win7下: 到目录C:\Windows\System3 ...

  4. koa2&plus;webSocket 聊天室

    做了一个简单的的聊天室,用来看看 koa和 websocket的使用还是挺好的,已经放到gitHub. https://github.com/zhaowanhua/koa2WebSocket

  5. thinkphp5使用空模块

    今天想做一个功能,可以后台设置url是二级域名(也是指向同一个服务器)还是一级域名(域名/模块),网上找了找,TP3.2开始取消了空模块.所以只能自己修改框架源码了. ----------有点晚,明天 ...

  6. JS&lowbar;高程4&period;变量,作用域和内存问题(1)

    1.基本类型和应用类型的值 ECMAScript变量可能包含两种不同数据类型的值: 基本类型值——简单的数据段.(5种基本的数据类型,按值访问,因为可以操作保存在变量中的实际的值.) 引用类型值——多 ...

  7. ABAP-ALV报表导出格式恢复初始画面

    进入一个ALV表格,想下载数据,一般点清单-->输出-->电子数据表. 会出来一个对话框,可选择导出成各类格式. 在下端有一个“始终使用选定的格式”,一旦勾上,就再也不会弹出选择框了. 以 ...

  8. php session小节

    1.为什么要用session? 在人们访问网站的时候,有很多个网页,由于http自身的特点,用户每执行一个脚本都需要和web服务器重新建立连接.由于他们之间是无状态的,这次的连接无法得到上次连接的状态 ...

  9. java 翻页工具类

    Pagination类 package com.paic.bics.core.mybatis.page; import java.util.List; @SuppressWarnings(" ...

  10. golang实现高阶函数之map

    package main import "fmt" func iMap(num []int, f func(a int) int) []int{ var r []int for _ ...