[AI Mathematical Principles] Probabilistic Machine Learning (4): A TAN Algorithm Example for Semi-Naive Bayes
import re as pattern
import sys
import math
# if no command-line arguments are given, fall back to the default file names and TAN mode
if len(sys.argv) != 4:
    train = open('lymph_train.arff')
    test = open('lymph_test.arff')
    mode = 't'
else:
    train = open(sys.argv[1])
    test = open(sys.argv[2])
    mode = sys.argv[3]
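# Presumably the script is invoked along these lines (the script's own file name is not given
# in the post, "tan.py" below is only a placeholder):
#     python tan.py <train.arff> <test.arff> <n|t>
# where 'n' runs the naive Bayes baseline and 't' runs TAN.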
# name list of different features : ['A1', 'A2', 'A3', 'A4', 'A5', 'A8', 'A14', 'A15']
feature_names = []
# value lists for different features : len(value)=len(type), each inner list includes all value for one feature
# [ [..], [..], [..], [..], [..], [..], [..], [..] ]
feature_values = []
# value list for different labels : ['+', '-']
class_list = []
# list of training data (features)
train_data = []
# list of training data (labels)
train_data_label = []
# list of testing data (features)
test_data = []
# list of testing data (labels)
test_data_label = []
# calculate the CMI between feature I and feature J, traversing all possible values and classes
# return the CMI value between feature[I] and feature[J]
def calculate_conditional_mutual_information(I, J):
    # lists of all possible values of the two features
    XI = feature_values[I]
    XJ = feature_values[J]
    CMI = 0.0
    # build the list of joint observations [XI, XJ, class] over all training rows
    XI_XJ_Class = list([])
    for k in range(len(train_data)):
        XI_XJ_Class.append([train_data[k][I], train_data[k][J], train_data_label[k]])
    # get the lists of [XI], [XJ], [XI, XJ] restricted to each class
    for single_class in class_list:
        # extract the columns of the rows that belong to single_class
        feature_is_XI = extract_feature_col(train_data_label, single_class, train_data, I, None)
        feature_is_XJ = extract_feature_col(train_data_label, single_class, train_data, J, None)
        feature_is_XIXJ = extract_feature_col(train_data_label, single_class, train_data, I, J)
        for i in range(len(XI)):
            # single value of XI
            xi = XI[i]
            for j in range(len(XJ)):
                # single value of XJ
                xj = XJ[j]
                # conditional probabilities of xi, xj and (xi, xj) given class y,
                # estimated by matching the value(s) against all training rows of that class
                possibility_xi_given_y = laplace_estimate_possibility(feature_is_XI, [xi], len(feature_values[I]))
                possibility_xj_given_y = laplace_estimate_possibility(feature_is_XJ, [xj], len(feature_values[J]))
                possibility_xixj_given_y = laplace_estimate_possibility(feature_is_XIXJ, [xi, xj], len(feature_values[I]) * len(feature_values[J]))
                possibility_xixjy = laplace_estimate_possibility(XI_XJ_Class, [xi, xj, single_class], len(feature_values[I]) * len(feature_values[J]) * len(class_list))
                CMI = CMI + possibility_xixjy * math.log(possibility_xixj_given_y / (possibility_xi_given_y * possibility_xj_given_y), 2)
    return CMI
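# For reference, the quantity accumulated above is the (Laplace-smoothed) conditional mutual information
#     I(X_I; X_J | Y) = sum over x_i, x_j, y of P(x_i, x_j, y) * log2( P(x_i, x_j | y) / (P(x_i | y) * P(x_j | y)) )
# A rough hand check with made-up numbers (not taken from the lymph data): if one term has
# P(x_i, x_j, y) = 0.15, P(x_i, x_j | y) = 0.30, P(x_i | y) = 0.50 and P(x_j | y) = 0.40,
# it contributes 0.15 * log2(0.30 / (0.50 * 0.40)) = 0.15 * log2(1.5) ≈ 0.0877 to the sum.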
# extract from train_data only the column(s) given by the feature indices, keeping the rows whose train_data_label == goal_class_value
# return 1 column or 2 columns restricted to the rows of the same class
def extract_feature_col(class__train, goal_class_value, feature__train, index1_of_feature, index2_of_feature):
    col_of_certain_feature = list([])
    # traverse all train data
    for i in range(len(class__train)):
        if class__train[i] == goal_class_value:
            tem = list([])
            # row: i, column: index1_of_feature, in train data
            tem.append(feature__train[i][index1_of_feature])
            if index2_of_feature is not None:
                tem.append(feature__train[i][index2_of_feature])
            # build a list of lists: [ [XI] or [XJ] or [XI, XJ], ... ]
            col_of_certain_feature.append(tem)
    return col_of_certain_feature
# within the same class, the ratio/probability of a given xi or xj value;
# return a probability value (with Laplace smoothing)
def laplace_estimate_possibility(feature_column_with_same_class, feature_value, amount_of_values_combination):
    num = 0
    for i in range(len(feature_column_with_same_class)):
        if feature_column_with_same_class[i] == feature_value:
            num += 1
    # numerator + 1 to avoid 0; correspondingly, the denominator adds the number of all value combinations
    return float(num + 1) / (len(feature_column_with_same_class) + amount_of_values_combination)
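# A minimal hand check of the smoothing above (toy values, not from the lymph data):
# laplace_estimate_possibility([['a'], ['b'], ['a']], ['a'], 2) counts 2 matches out of 3 rows
# over 2 possible values, so it returns (2 + 1) / (3 + 2) = 0.6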
# find a maximum weight spanning tree using the existing edges; in the end, each node has one and only one parent
# return a matrix (graph) which represents the maximum weight spanning tree (row is parent, column is child)
def prims_algorithm(edges, graph):
    all_candidates = set(range(0, len(feature_names)))
    # To root the maximal weight spanning tree, pick the first attribute (index=0) in the input file as the root
    parent_candidates = set()
    parent_candidates.add(0)
    child_candidates = set(range(1, len(feature_names)))
    parent_child_list = list([])
    # If there are ties in selecting maximum weight edges, use the following preference criteria:
    # 1. Prefer edges emanating from attributes listed earlier in the input file.
    while parent_candidates != all_candidates:
        current_max = float('-inf')
        parent = None
        child = None
        # iterate in sorted (input-file) order so the tie-breaking rules hold
        for i in sorted(parent_candidates):
            for j in sorted(child_candidates):
                # 2. If there are multiple maximal weight edges emanating from the first such attribute,
                # prefer edges going to attributes listed earlier in the input file.
                if edges[i][j] > current_max:
                    parent = i
                    child = j
                    current_max = edges[i][j]
        parent_child_list.append([parent, child])
        parent_candidates.add(child)
        child_candidates.remove(child)
    # Finally, in parent_child_list, each child [i][1] appears only once, no repetition, len(features)-1 entries in total
    # (excluding 0, namely the root)
    for i in range(len(parent_child_list)):
        graph[parent_child_list[i][0]][parent_child_list[i][1]] = True
    # print(parent_child_list)
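# A small sketch of the intended result (toy weights, not from the lymph data): with three attributes and
# CMI weights edges[0][1] = 0.30, edges[0][2] = 0.10, edges[1][2] = 0.25, the loop first attaches child 1
# to parent 0 (weight 0.30), then attaches child 2 to parent 1 (0.25 > 0.10), so the output graph ends up
# with graph[0][1] = True and graph[1][2] = True.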
# return the conditional probability p(child_node_value | class_node_value, parent_node_value),
# given class_value and parent_value
def conditional_probability(child_node_value, class_node_value, parent_node_value, parent_feature_index, child_feature_index):
    # extract the column of child_feature_index from the rows which have parent_node_value and class_node_value
    child_column = list([])
    for i in range(len(train_data)):
        if train_data[i][parent_feature_index] == parent_node_value:
            if train_data_label[i] == class_node_value:
                child_column.append(train_data[i][child_feature_index])
    # child_column now holds all values of feature[child_feature_index] for those rows
    num = 0
    for i in range(len(child_column)):
        if child_column[i] == child_node_value:
            num += 1
    # numerator + 1 to avoid 0; correspondingly, the denominator adds the number of possible child values
    return float(num + 1) / (len(child_column) + len(feature_values[child_feature_index]))
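# This is the Laplace-smoothed conditional probability table (CPT) entry
#     p(x_child | y, x_parent) = (count(x_child, y, x_parent) + 1) / (count(y, x_parent) + |values of the child feature|)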
# return a dictionary which includes all cases for all XI, XJ, class values (three nested "for" loops)
# key order is child_value_index + class_value_index + parent_value_index
def create_dictionary_of_conditional_probability(parent_feature_index, child_feature_index):
    parent_feature = feature_values[parent_feature_index]
    child_feature = feature_values[child_feature_index]
    dictionary = {}
    for i in range(len(parent_feature)):
        for j in range(len(child_feature)):
            for k in range(len(class_list)):
                # p(child_value | class_value, parent_value)
                key = str(j) + str(k) + str(i)
                dictionary[key] = conditional_probability(child_feature[j], class_list[k], parent_feature[i], parent_feature_index, child_feature_index)
    return dictionary
# return p( X | class_list[class_value_index] ), where X is determined by a_row_test_data
def prior_probability(graph, dictionaries, a_row_test_data, class_value_index):
    single_prior_probability = list([])
    chain_rule_prior_probability = 1.0
    for child in range(len(a_row_test_data)):
        parent_value_index = None
        child_value_index = None
        # find the current child_feature's parent_feature and their value_index
        for parent in range(len(a_row_test_data)):
            if graph[parent][child] == 1:
                # find parent_value_index using parent_value in a_row_test_data
                for parent_value_index in range(len(feature_values[parent])):
                    if feature_values[parent][parent_value_index] == a_row_test_data[parent]:
                        break
                # find child_value_index using child_value in a_row_test_data
                for child_value_index in range(len(feature_values[child])):
                    if feature_values[child][child_value_index] == a_row_test_data[child]:
                        break
                break
        if child == 0:  # root
            # find the child_value_index inside the root feature, using a_row_test_data[child]
            for child_value_index in range(len(feature_values[child])):
                if feature_values[child][child_value_index] == a_row_test_data[child]:
                    break
            # p( x0 | class_list[class_value_index] )
            single_prior_probability.append(dictionaries[child][str(child_value_index) + str(class_value_index)])
        else:  # other nodes
            # p( xj | class_list[class_value_index], x_parent )
            single_prior_probability.append(dictionaries[child][str(child_value_index) + str(class_value_index) + str(parent_value_index)])
    for i in range(len(single_prior_probability)):
        chain_rule_prior_probability = chain_rule_prior_probability * single_prior_probability[i]
    return chain_rule_prior_probability
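# The product built above is the TAN "chain rule" likelihood, with x0 the root attribute:
#     p(X | y) = p(x0 | y) * product over the remaining attributes j of p(xj | y, x_parent(j))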
# binary classification problem
def TAN_predict(graph, dictionaries, a_row_test_data):
    probability_list = list([])  # for all class values
    probability_sum = 0.0
    # class_value_index is i
    for i in range(len(class_list)):
        # p(yi)
        probability_of_yi = laplace_estimate_possibility(train_data_label, class_list[i], len(class_list))
        # compute the class-conditional probability p( X | yi )
        probability_of_feature_i = prior_probability(graph, dictionaries, a_row_test_data, i)
        probability_list.append(probability_of_yi * probability_of_feature_i)
        # the sum of all probability_list[i] is not 1 yet
        probability_sum += probability_list[i]
    max_probability = float('-inf')
    y_predict_index = 0
    for i in range(len(probability_list)):
        # after normalization, the sum of all probability_list[i] is 1
        # and probability_list[i] is the posterior probability p( y_predict | test_data[i] )
        probability_list[i] = probability_list[i] / probability_sum
        if probability_list[i] > max_probability:
            max_probability = probability_list[i]
            y_predict_index = i
    return [class_list[y_predict_index], max_probability]
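# The normalization above is just Bayes' rule over the class values:
#     p(y_i | X) = p(y_i) * p(X | y_i) / sum over k of p(y_k) * p(X | y_k)
# e.g. with unnormalized scores 0.03 and 0.01 (toy numbers), the posteriors become 0.75 and 0.25.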
# compute edges' weights -> find the maximum weight spanning tree (graph) ->
# according to the graph, construct dictionaries of all combinations' conditional probabilities ->
# classify test_data using the Bayes net chain rule (y_predict and probability_is_predict)
def TAN():
    # weights' matrix ( k by k, k is the number of features)
    edges = list([])
    for i in range(len(feature_names)):
        edges.append([])
        for j in range(len(feature_names)):
            edges[i].append(0)
    # calculate the edges' weights (conditional mutual information)
    for i in range(len(feature_names)):
        for j in range(i + 1, len(feature_names)):
            edges[i][j] = calculate_conditional_mutual_information(i, j)
            edges[j][i] = edges[i][j]
    # find the maximum weight spanning tree (MST)
    graph = list([])
    for i in range(len(feature_names)):
        graph.append([])
        for j in range(len(feature_names)):
            graph[i].append(0)
    prims_algorithm(edges, graph)
    # print(graph)
    # output the maximum weight spanning tree with edge directions
    # after prims_algorithm, each column of the graph matrix has one and only one True
    # (each node has only one parent, except the root)
    for j in range(len(graph)):
        if j == 0:  # the root
            print('%s class' % feature_names[j])
        else:  # other nodes
            for i in range(len(graph)):
                if graph[i][j] is True:
                    # output the child first and then its parent
                    print('%s %s class' % (feature_names[j], feature_names[i]))
                    break
    print('')
    # construct a big dictionary to store all possible conditional probabilities according to the "graph" structure
    length = len(feature_names)
    dictionaries = list([{}] * length)
    for j in range(len(graph)):  # child_feature_index
        if j == 0:  # the root
            # root: p( x0 | all y )
            dictionary = {}
            for i in range(len(feature_values[j])):  # all values of the root feature
                for k in range(len(class_list)):
                    # rows with a certain class_value
                    feature_is_xroot = extract_feature_col(train_data_label, class_list[k], train_data, j, None)
                    # get the ratio of a certain feature_value in the column above
                    # p ( xroot_value | class_value )
                    dictionary[str(i) + str(k)] = laplace_estimate_possibility(feature_is_xroot, [feature_values[j][i]], len(feature_values[j]))
            dictionaries[j] = dictionary
        else:  # other nodes
            for i in range(len(graph)):  # parent_feature_index
                if graph[i][j] is True:
                    dictionaries[j] = create_dictionary_of_conditional_probability(i, j)
    # print(dictionaries)
    # output: (i) the predicted class, (ii) the actual class, (iii) the posterior probability of the predicted class
    correct = 0
    result = list([])
    for i in range(len(test_data)):
        [y_predict, probability_is_predict] = TAN_predict(graph, dictionaries, test_data[i])
        dummy = [y_predict, probability_is_predict]
        result.append(dummy)
    for i in range(len(result)):
        if result[i][0] == test_data_label[i]:
            correct += 1
        print('%s %s %.12f' % (result[i][0], test_data_label[i], result[i][1]))
    print('')
    print(correct)
# compute p(xj|class_value) -> p(X|class_value) -> p(X|class_value)*p(class_value) (numerator) ->
# sum all p(X|class_value)*p(class_value) over all class_value (denominator) ->
# p(class_value|X) (the probability that "the prediction is class_value") ->
# find the max probability over all class_value, namely the predicted label
def naive_bayes():
    correct = 0
    probability_is_class_i = list([])
    for i in range(len(feature_names)):
        print(feature_names[i] + ' class')
    print('')
    # pre-set memory
    for t in range(len(test_data)):
        probability_is_class_i.append([])
    # get p(Y|X) for each y of each test sample
    for t in range(len(test_data)):
        sample_data = test_data[t]
        # get p(Y|X) for each y
        for i in range(len(class_list)):
            # p(X|class_value) = p(x1|class_value) * p(x2|class_value) * ... * p(xj|class_value)
            probability_X_given_class_value = 1.0
            for k in range(len(feature_names)):
                temp_list = extract_feature_col(train_data_label, class_list[i], train_data, k, None)
                probability_X_given_class_value *= laplace_estimate_possibility(temp_list, [sample_data[k]], len(feature_values[k]))
            # p(class_value) * p(X | class_value)
            probability_class_value = laplace_estimate_possibility(train_data_label, class_list[i], len(class_list))
            probability_is_class_i[t].append(probability_class_value * probability_X_given_class_value)
    # find the max probability as the prediction and match it with the true label
    for t in range(len(test_data)):
        denominator = 0.0
        max_probability = 0.0
        index_of_class_value_prediction = 0
        # get Bayes' rule's denominator
        for i in range(len(class_list)):
            denominator += float(probability_is_class_i[t][i])
        # p(class_list[i] | X)
        for i in range(len(class_list)):
            probability_is_class_i[t][i] = probability_is_class_i[t][i] / denominator
            if probability_is_class_i[t][i] > max_probability:
                max_probability = probability_is_class_i[t][i]
                index_of_class_value_prediction = i
        # match the prediction with the true label
        if class_list[index_of_class_value_prediction] == test_data_label[t]:
            correct += 1
        print('%s %s %.12f' % (class_list[index_of_class_value_prediction], test_data_label[t], max_probability))
    print('')
    print(correct)
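# For reference, the baseline above applies the naive Bayes assumption that the attributes are
# conditionally independent given the class:
#     p(y | X) is proportional to p(y) * product over j of p(xj | y)
# whereas TAN additionally conditions each non-root attribute on its tree parent.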
begin_data = False
for line in train:
    if pattern.findall('@data', line) != []:
        begin_data = True
    elif pattern.findall('@attribute', line) != []:
        line = line.strip(' ')
        line = line.strip('\n')
        line = line.strip('\r')
        line = line.strip(' ')
        line = line.split(None, 2)
        line[1] = line[1].replace(' ', '')
        line[1] = line[1].replace('\'', '')
        line[2] = line[2].replace(' ', '')
        line[2] = line[2].replace('\'', '')
        line[2] = line[2].strip('{')
        line[2] = line[2].strip('}')
        line[2] = line[2].split(',')
        if line[1] != 'class':
            feature_names.append(line[1])
            feature_values.append(line[2])
        else:
            class_list = line[2]
    elif begin_data is True:
        line = line.strip('\n')
        line = line.strip('\r')
        line = line.replace(' ', '')
        line = line.replace('\'', '')
        line = line.split(',')
        temp = []
        for i in range(0, len(line) - 1):
            temp.append(line[i])
        train_data.append(temp)
        train_data_label.append(line[len(line) - 1])
    else:
        pass
begin_data = False
for line in test:
    if pattern.findall('@data', line) != []:
        begin_data = True
    elif begin_data is True:
        line = line.strip('\n')
        line = line.strip('\r')
        line = line.replace(' ', '')
        line = line.replace('\'', '')
        line = line.split(',')
        temp = []
        for i in range(0, len(line) - 1):
            temp.append(line[i])
        test_data.append(temp)
        test_data_label.append(line[len(line) - 1])
    else:
        pass
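# The two parsing loops above assume a nominal-attribute ARFF layout along these lines
# (a made-up illustration, not the actual lymph file):
#     @attribute 'A1' {a, b, c}
#     @data
#     a,c,...,+
# i.e. each data row is comma-separated and its last field is the class label.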
if mode == 'n':
    naive_bayes()
elif mode == 't':
    TAN()
else:
    pass