这是一个基于 TensorFlow 实现的较基础的线性 SVM,后续会加上多分类以及高斯核,供大家参考。
Talk is cheap, show me the code.
完整代码如下:
import numpy as np
import tensorflow as tf
from sklearn.base import BaseEstimator, ClassifierMixin
class tfsvm(BaseEstimator, ClassifierMixin):
    """Linear SVM trained by mini-batch gradient descent on TensorFlow 1.x.

    The model is ``sign(x @ w + b)`` and is fitted by minimising
    ``mean(max(0, 1 - y * (x @ w + b))) + c * sum(w ** 2)`` with Adam.
    Labels are expected to be -1 / +1 (the hinge loss and ``tf.sign`` rely
    on it).  Only the linear decision function is implemented; ``kernel`` is
    stored but not used by the graph.

    NOTE: the graph is built with the TF 1.x API (``tf.placeholder``,
    ``tf.Session``), so a TF 1.x-compatible ``tf`` module is required.

    Parameters
    ----------
    c : float
        Weight of the L2 penalty on ``w`` (note: this multiplies the
        regulariser, not the hinge term).
    kernel : str
        Kept for API compatibility; currently unused.
    learning_rate : float
        Adam learning rate.
    training_epoch : int
        Maximum number of epochs.
    display_step : int
        Print progress every ``display_step`` epochs.
    batch_size : int
        Number of rows drawn (with replacement) per training step.
    random_state : int
        Seed applied to both TensorFlow and NumPy before building the graph.
    """

    def __init__(self, c=1, kernel='linear', learning_rate=0.01,
                 training_epoch=1000, display_step=50, batch_size=50,
                 random_state=42):
        # scikit-learn's get_params()/clone() read each constructor argument
        # back from an attribute of the same name, so ``c`` must be stored as
        # ``self.c``; ``svmc`` stays available as an alias (see property).
        self.c = c
        self.kernel = kernel
        self.learning_rate = learning_rate
        self.training_epoch = training_epoch
        self.display_step = display_step
        self.random_state = random_state
        self.batch_size = batch_size

    @property
    def svmc(self):
        """Backward-compatible alias for the regularisation weight ``c``."""
        return self.c

    @svmc.setter
    def svmc(self, value):
        self.c = value

    def reset_seed(self):
        """Re-seed TensorFlow (current default graph) and NumPy."""
        tf.set_random_seed(self.random_state)
        np.random.seed(self.random_state)

    def random_batch(self, x, y):
        """Return ``batch_size`` rows of ``x``/``y`` sampled with replacement.

        Used to implement mini-batch gradient descent.
        """
        # The lower bound must be 0: randint's ``low`` is inclusive, and
        # starting at 1 would mean the first sample is never drawn.
        indices = np.random.randint(0, x.shape[0], self.batch_size)
        return x[indices], y[indices]

    def _build_graph(self, x_train, y_train):
        """Create placeholders, variables, loss, optimiser and eval ops.

        Must be called with the target graph set as default.  Only the number
        of columns of ``x_train`` is used; ``y_train`` is accepted for
        signature compatibility.
        """
        self.reset_seed()
        _, n_inputs = x_train.shape

        x = tf.placeholder(tf.float32, [None, n_inputs], name='x')
        y = tf.placeholder(tf.float32, [None, 1], name='y')

        with tf.name_scope('trainable_variables'):
            # The two variables that define the decision boundary.
            w = tf.Variable(
                tf.truncated_normal(shape=[n_inputs, 1], stddev=0.1),
                name='weights')
            b = tf.Variable(tf.truncated_normal([1]), name='bias')

        with tf.name_scope('training'):
            y_raw = tf.add(tf.matmul(x, w), b)
            l2_norm = tf.reduce_sum(tf.square(w))
            # A scalar zero broadcasts against any batch size, so the same
            # loss op can be evaluated on a validation set of arbitrary size.
            margin = tf.subtract(1., tf.multiply(y_raw, y))
            hinge_loss = tf.reduce_mean(tf.maximum(0., margin))
            # float() keeps an integer ``c`` from being typed as int32.
            svm_loss = tf.add(hinge_loss,
                              tf.multiply(float(self.svmc), l2_norm))
            training_op = tf.train.AdamOptimizer(
                learning_rate=self.learning_rate).minimize(svm_loss)

        with tf.name_scope('eval'):
            prediction_class = tf.sign(y_raw)
            correct_prediction = tf.equal(y, prediction_class)
            accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))

        init = tf.global_variables_initializer()

        self._x = x
        self._y = y
        self._loss = svm_loss
        self._training_op = training_op
        self._accuracy = accuracy
        self.init = init
        self._prediction_class = prediction_class
        self._w = w
        self._b = b

    def _get_model_params(self):
        """Return ``{variable_name: value}`` for every global variable."""
        with self._graph.as_default():
            gvars = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES)
        values = self._session.run(gvars)
        return {gvar.op.name: value for gvar, value in zip(gvars, values)}

    def _restore_model_params(self, model_params):
        """Write a snapshot produced by ``_get_model_params`` back into the graph."""
        gvar_names = list(model_params.keys())
        # Each TF 1.x variable owns an initializer op named "<var>/Assign";
        # its second input is the value tensor, which we override via feed.
        assign_ops = {
            gvar_name: self._graph.get_operation_by_name(gvar_name + '/Assign')
            for gvar_name in gvar_names
        }
        init_values = {
            gvar_name: assign_op.inputs[1]
            for gvar_name, assign_op in assign_ops.items()
        }
        feed_dict = {
            init_values[gvar_name]: model_params[gvar_name]
            for gvar_name in gvar_names
        }
        self._session.run(assign_ops, feed_dict=feed_dict)

    def fit(self, x, y, x_val=None, y_val=None):
        """Train the model, early-stopping on validation loss.

        Parameters
        ----------
        x : array-like of shape (n_samples, n_features)
        y : array-like of shape (n_samples,) or (n_samples, 1), values -1/+1
        x_val, y_val : optional validation set.  When either is omitted the
            training data is used for the early-stopping check instead.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If ``x`` is empty or ``x`` and ``y`` disagree on sample count.
        """
        x = np.asarray(x, dtype=np.float32)
        # The label placeholder is [None, 1]; accept flat label vectors too.
        y = np.asarray(y, dtype=np.float32).reshape(-1, 1)
        if x.shape[0] == 0:
            raise ValueError('fit() requires at least one training sample')
        if y.shape[0] != x.shape[0]:
            raise ValueError(
                'x and y have inconsistent numbers of samples: {} != {}'
                .format(x.shape[0], y.shape[0]))

        if x_val is None or y_val is None:
            x_val, y_val = x, y
        else:
            x_val = np.asarray(x_val, dtype=np.float32)
            y_val = np.asarray(y_val, dtype=np.float32).reshape(-1, 1)

        # At least one step per epoch, even when n_samples < batch_size.
        n_batches = max(1, x.shape[0] // self.batch_size)

        # Release the session of a previous fit() before replacing it.
        previous_session = getattr(self, '_session', None)
        if previous_session is not None:
            previous_session.close()

        self._graph = tf.Graph()
        with self._graph.as_default():
            self._build_graph(x, y)

        best_loss = np.inf
        best_accuracy = 0
        best_params = None
        checks_without_progress = 0
        max_checks_without_progress = 20
        val_feed = {self._x: x_val, self._y: y_val}

        self._session = tf.Session(graph=self._graph)
        with self._session.as_default() as sess:
            self.init.run()
            for epoch in range(self.training_epoch):
                for _ in range(n_batches):
                    x_batch, y_batch = self.random_batch(x, y)
                    sess.run(self._training_op,
                             feed_dict={self._x: x_batch, self._y: y_batch})
                loss_val, accuracy_val = sess.run(
                    [self._loss, self._accuracy], feed_dict=val_feed)
                # Training accuracy is measured on the last mini-batch only.
                accuracy_train = self._accuracy.eval(
                    feed_dict={self._x: x_batch, self._y: y_batch})

                if loss_val < best_loss:
                    best_loss = loss_val
                    best_params = self._get_model_params()
                    checks_without_progress = 0
                else:
                    checks_without_progress += 1
                    if checks_without_progress > max_checks_without_progress:
                        break

                if accuracy_val > best_accuracy:
                    best_accuracy = accuracy_val

                if epoch % self.display_step == 0:
                    print('epoch: {}\tvalidaiton loss: {:.6f}\tvalidation accuracy: {:.4f}\ttraining accuracy: {:.4f}'
                          .format(epoch, loss_val, accuracy_val, accuracy_train))

            print('best accuracy: {:.4f}\tbest loss: {:.6f}'.format(best_accuracy, best_loss))

            if best_params:
                # Roll back to the snapshot with the lowest validation loss.
                self._restore_model_params(best_params)
                self._fitted_weights = best_params['trainable_variables/weights']
                self._fitted_bias = best_params['trainable_variables/bias']
        return self

    def predict(self, x):
        """Return ``sign(x @ w + b)`` as an array of shape (n_samples, 1).

        Raises
        ------
        sklearn.exceptions.NotFittedError
            If called before ``fit`` (subclass of AttributeError/ValueError).
        """
        session = getattr(self, '_session', None)
        if session is None:
            from sklearn.exceptions import NotFittedError
            raise NotFittedError(
                "This tfsvm instance is not fitted yet; call 'fit' first.")
        with session.as_default():
            return self._prediction_class.eval(feed_dict={self._x: x})

    @property
    def _intercept(self):
        """Best weight matrix found by ``fit`` (None before fitting).

        The name is historical: despite being called "intercept" this holds
        the weights, exactly as the attribute set by the original ``fit`` did.
        """
        return getattr(self, '_fitted_weights', None)

    @property
    def _bias(self):
        """Best bias vector found by ``fit`` (None before fitting)."""
        return getattr(self, '_fitted_bias', None)
实际运行效果如下(以 Iris 数据集为样本,原文配图未收录):
再画出决策边界来看看(原文配图未收录):
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。
原文链接:https://blog.csdn.net/sdoddyjm68/article/details/79392230