shape into blocks--source code in python based on pySpark

时间:2021-04-05 03:53:17

这是微博深度和广度预测的原始代码,写了大约半个月,第一个版本不是这样的,但是这个版本包含所有需要的功能。

模块化的程度也更高。找工作前一直想用python完美解决这个问题,后来发现自己的方法和硬件都有很大的局限。

这算是我第一次正儿八经地尝试在分布式计算框架下处理海量数据。

在这个过程中我意识到了很多问题,这些问题也在很大程度上影响了我后来面试时的代码风格。

def get_basic_info(base_path="/home/jason/spark/weibo_predict/"):
    """Build, print and return every data path used by the pipeline.

    Args:
        base_path: Root directory of the project data, ending with a path
            separator. Defaults to the Linux location that was hard-coded
            originally; pass e.g. ``"E:/spark/weibo_predict/"`` for the
            Windows layout.

    Returns:
        An 8-tuple ``(train_path, test_path, code_path,
        train_weibo_raw_path, train_weibo_repost_path,
        test_weibo_raw_path, test_weibo_repost_path, user_relations_path)``.
    """
    # Output directories for the prepared train / test / code files.
    train_path = base_path + 'train/'
    test_path = base_path + 'test/'
    code_path = base_path + 'source_code/'
    print('\n训练准备文件保存路径px:%s' % train_path)
    print('\n测试准备文件保存路径py:%s' % test_path)
    print('\n代码准备文件保存路径pz:%s' % code_path)

    # Raw input files.
    train_weibo_raw_path = base_path + "train_weibo_raw.txt"
    train_weibo_repost_path = base_path + "train_weibo_repost_back.txt"
    test_weibo_raw_path = base_path + "test_weibo_raw.txt"
    test_weibo_repost_path = base_path + "test_weibo_repost.txt"
    user_relations_path = base_path + "user_relations_back.txt"
    print("\n训练原始微博地址p1:%s" % train_weibo_raw_path)
    print("训练转发微博地址p2:%s" % train_weibo_repost_path)
    print("\n测试原始微博地址p3:%s" % test_weibo_raw_path)
    print("测试转发微博地址p4:%s" % test_weibo_repost_path)
    print("\n 用户关系地址p5:%s" % user_relations_path)

    return (
        train_path,
        test_path,
        code_path,
        train_weibo_raw_path,
        train_weibo_repost_path,
        test_weibo_raw_path,
        test_weibo_repost_path,
        user_relations_path,
    )
#传递  训练(原始微博,转发微博) 或者 测试(原始微博,转发微博)
#返回化简后的对应关系repost_id_line_time_reduce
#返回微博id对应的用户idwid_uid_rdd
from pyspark import SparkContext
def get_prime_rdd(train_or_test, sc, p1, p2, p3, p4):
    """Load the raw and repost weibo files and keep only ids present in both.

    Args:
        train_or_test: ``'train'`` reads ``p1``/``p2``; ``'test'`` reads
            ``p3``/``p4``.
        sc: Object providing ``textFile`` (a SparkContext).
        p1: Path of the training raw-weibo file.
        p2: Path of the training repost file.
        p3: Path of the test raw-weibo file.
        p4: Path of the test repost file.

    Returns:
        ``(repost_id_line_time_reduce, wid_uid_rdd)``: the first maps each
        weibo id to the list of its repost records (fields ``[1:-1]`` of
        every ``\\x01``-separated repost line), the second maps each weibo
        id to the second field of its raw line. Both are restricted to ids
        that occur in the other. Returns ``(0, 0)`` when ``train_or_test``
        is neither ``'train'`` nor ``'test'``.
    """
    if train_or_test == 'train':
        raw_path, repost_path = p1, p2
    elif train_or_test == 'test':
        raw_path, repost_path = p3, p4
    else:
        print("only input train or test")
        return 0, 0

    # (weibo id, user id) taken from the first two fields of each raw line.
    # The former `.count()` calls were dropped: their results were unused
    # and each one triggered a full Spark job.
    wid_uid_rdd = (
        sc.textFile(raw_path)
        .map(lambda line: line.split("\001"))
        .map(lambda fields: (fields[0], fields[1]))
    )

    # weibo id -> list of repost records.
    repost_by_wid = (
        sc.textFile(repost_path)
        .map(lambda line: line.split("\001"))
        .map(lambda fields: (fields[0], fields[1:-1]))
        .groupByKey()
        .mapValues(list)
    )

    # A.subtractByKey(A.subtractByKey(B)) keeps the entries of A whose key
    # also occurs in B, i.e. an intersection by key.
    repost_id_line_time_reduce = repost_by_wid.subtractByKey(
        repost_by_wid.subtractByKey(wid_uid_rdd)
    )
    wid_uid_rdd = wid_uid_rdd.subtractByKey(
        wid_uid_rdd.subtractByKey(repost_id_line_time_reduce)
    )
    return repost_id_line_time_reduce, wid_uid_rdd
def get_uid_fnum_rdd(sc, p5):
    """Load the user-relations file as an RDD of ``(user_id, fan_count)``.

    Each line is split on a tab; the first field is the user id and the
    second field is split on ``\\x01`` to count its entries.

    Args:
        sc: Object providing ``textFile`` (a SparkContext).
        p5: Path of the user-relations file.

    Returns:
        An RDD of ``(user_id, number_of_entries_in_second_field)`` pairs.
    """
    def parse(line):
        # Split once per line instead of once per extracted column.
        fields = line.split("\t")
        return fields[0], len(fields[1].split("\x01"))

    # The former `.count()` was dropped: unused result, full extra Spark job.
    return sc.textFile(p5).map(parse)
##版本 2  分时间段计算指定时间段的转发量
def cal_times_j(list, j, window=900):
    """Count the records whose last field is at most ``j * window``.

    This is the cumulative count up to stage ``j``. To count a single
    interval instead, the condition would be
    ``j * window <= int(record[-1]) <= (j + 1) * window``.

    Args:
        list: Sequence of records; the last item of each record must be
            convertible with ``int`` (presumably a repost time in seconds).
            The parameter keeps its original name even though it shadows
            the builtin.
        j: Stage index.
        window: Stage length in the unit of the time field (default 900).

    Returns:
        The number of matching records.
    """
    limit = j * window
    return sum(1 for record in list if int(record[-1]) <= limit)
def cal_id_times_j(rdd, j):
    """Map every value of a pair RDD to its repost count up to stage ``j``.

    Args:
        rdd: Pair RDD of ``(weibo_id, repost_records)``.
        j: Stage index forwarded to ``cal_times_j``.

    Returns:
        A pair RDD of ``(weibo_id, count)``.
    """
    return rdd.mapValues(lambda records: cal_times_j(records, j))


def generate_times_file(rdd, k, path):
    """Write the per-weibo repost counts for stages ``k-1`` and ``k`` to CSV.

    For each ``j`` in ``range(k - 1, k + 1)`` the counts for stage ``j + 1``
    are collected and written to ``<path>wid_times/wid_times_<j>.csv``.

    Args:
        rdd: Pair RDD of ``(weibo_id, repost_records)``.
        k: Upper stage index of the two files to generate.
        path: Directory prefix (including the trailing separator).
    """
    import csv  # function-scope import, hoisted out of the loop

    for j in range(k - 1, k + 1):
        a_path = str(path) + 'wid_times/wid_times_' + str(j) + '.csv'
        # Collect before opening so a failed job leaves no empty file behind.
        rows = cal_id_times_j(rdd, j + 1).collect()
        # `with` closes the file even if writing fails.
        with open(a_path, 'w') as out_file:
            csv.writer(out_file).writerows(rows)
#计算深度
#定义函数,计算出指定阶段的,发生过的转发关系
def cal_during(list, j, window=900):
    """Return the records whose last field is at most ``j * window``.

    Args:
        list: Sequence of records; the last item of each record must be
            convertible with ``int`` (presumably a repost time in seconds).
            The parameter keeps its original name even though it shadows
            the builtin.
        j: Stage index.
        window: Stage length in the unit of the time field (default 900).

    Returns:
        A new list holding the matching records (the same record objects,
        in their original order).
    """
    limit = j * window
    return [record for record in list if int(record[-1]) <= limit]


# Define a function that, for an RDD, keeps the repost relations that
# occurred up to the given stage.
def cal_rdd_during(rdd, j):
    """Apply ``cal_during`` with stage ``j`` to every element of ``rdd``.

    Args:
        rdd: RDD whose elements are lists of repost records.
        j: Stage index forwarded to ``cal_during``.

    Returns:
        The mapped RDD.
    """
    def keep_until_stage(records):
        return cal_during(records, j)

    return rdd.map(keep_until_stage)


# Define a function: if the tail of one repost relation is the head of
# another, append that other relation's tail to the end of this relation.
def add_deep(list):
    """Extend repost chains in place by linking tails to heads.

    For every ordered pair of chains, when the last item of the first
    equals the first item of the second, the second chain's last item is
    appended to the first. It is a single pass, so the result depends on
    the order of the chains.

    Args:
        list: List of mutable chains (lists). Modified in place. The
            parameter keeps its original name even though it shadows the
            builtin.

    Returns:
        The same ``list`` object; inputs with at most one chain are
        returned untouched.
    """
    if len(list) <= 1:
        return list
    for chain in list:
        for other in list:
            # chain[-1] is re-read on every comparison, so an item appended
            # by an earlier match can trigger the next one.
            if chain[-1] == other[0]:
                chain.append(other[-1])
    return list


# Define a function returning the length of the longest array in the
# sequence, used as the maximum depth.
def max_deep(list):
    """Return the depth implied by the longest chain in ``list``.

    Args:
        list: Sequence of chains (sized objects). The parameter keeps its
            original name even though it shadows the builtin.

    Returns:
        ``0`` for an empty input; otherwise the longest chain length minus
        one, where lengths below 2 are treated as 2 (so the minimum is 1).
    """
    if not list:
        return 0
    # The floor of 2 reproduces the original starting value of the maximum.
    longest = max(2, *(len(chain) for chain in list))
    return longest - 1


# Define a function that extracts two of the columns.
def ti_qu(list):
    """Drop the last item of every record, in place.

    Args:
        list: Mutable list of sliceable records. Each element is replaced
            by a copy without its last item. The parameter keeps its
            original name even though it shadows the builtin.

    Returns:
        The same ``list`` object.
    """
    list[:] = [record[:-1] for record in list]
    return list


def cal_cal(all_in_one_rdd, j):
    """Compute the repost depth of every weibo up to stage ``j``.

    Per value the steps are: keep the records up to stage ``j``
    (``cal_during``), drop each record's last field (``ti_qu``), link the
    chains (``add_deep``) and take the depth (``max_deep``).

    Args:
        all_in_one_rdd: Pair RDD of ``(weibo_id, repost_records)``.
        j: Stage index.

    Returns:
        A pair RDD of ``(weibo_id, depth)``.
    """
    def depth_until_stage(records):
        # cal_during builds a new outer list and ti_qu replaces its
        # elements with slices, so the records held by the RDD are not
        # mutated by add_deep.
        return max_deep(add_deep(ti_qu(cal_during(records, j))))

    return all_in_one_rdd.mapValues(depth_until_stage)


def generate_deeps_file(rdd, k, path):
    """Write the per-weibo depths for stages ``k-1`` and ``k`` to CSV.

    For each ``j`` in ``range(k - 1, k + 1)`` the depths for stage ``j + 1``
    are collected and written to ``<path>wid_deeps/wid_deeps_<j>.csv``.

    Args:
        rdd: Pair RDD of ``(weibo_id, repost_records)``.
        k: Upper stage index of the two files to generate.
        path: Directory prefix (including the trailing separator).
    """
    import csv

    for j in range(k - 1, k + 1):
        b_path = str(path) + 'wid_deeps/wid_deeps_' + str(j) + '.csv'
        # Collect before opening so a failed job leaves no empty file behind.
        rows = cal_cal(rdd, j + 1).collect()
        # `with` closes the file even if writing fails.
        with open(b_path, 'w') as out_file:
            csv.writer(out_file).writerows(rows)
def get_wid_fnum_rdd(uid_fnum_rdd, wid_uid_rdd, path):
    """Attach each weibo's author fan count and dump the result to CSV.

    NOTE: a later definition in this file reuses the name
    ``get_wid_fnum_rdd`` with a single ``path`` argument and replaces this
    one once the module has been fully executed.

    Args:
        uid_fnum_rdd: Pair RDD of ``(user_id, fan_count)``.
        wid_uid_rdd: Pair RDD of ``(weibo_id, user_id)``.
        path: Directory prefix; the CSV is written to
            ``<path>wid_fnum_file.csv``.

    Returns:
        A pair RDD of ``(weibo_id, fan_count)``; the count is ``None`` for
        users missing from ``uid_fnum_rdd`` (left outer join).
    """
    import csv

    # Re-key by user id so the two RDDs can be joined.
    uid_wid_rdd = wid_uid_rdd.map(lambda pair: (pair[1], pair[0]))
    # The join yields (uid, (wid, fnum)); its values are already the wanted
    # (wid, fnum) pairs, so the join no longer has to be computed twice.
    wid_fnum_rdd = uid_wid_rdd.leftOuterJoin(uid_fnum_rdd).values()

    c_path = str(path) + 'wid_fnum_file.csv'
    rows = wid_fnum_rdd.collect()
    # `with` closes the file even if writing fails.
    with open(c_path, "w") as wid_fnum_file:
        csv.writer(wid_fnum_file).writerows(rows)
    return wid_fnum_rdd
#定义函数,将列表数组扁平化
def add_flat(list):
    """Flatten a list of lists into one list.

    The original rebound its accumulator to the ``None`` returned by
    ``list.append`` and therefore failed on inputs with more than two
    sub-lists; it also raised ``IndexError`` on an empty input.

    Args:
        list: Sequence of iterables, or ``None``. The parameter keeps its
            original name even though it shadows the builtin.

    Returns:
        ``0`` when ``list`` is ``None`` (kept from the original); otherwise
        a new list with the items of every sub-list in order.
    """
    if list is None:
        return 0
    return [item for sub in list for item in sub]


# Define a function that computes the number of covered users.
def clac_cover(list):
    """Sum ``cover_value`` over every user in ``list``.

    Args:
        list: Iterable of user ids. The parameter keeps its original name
            even though it shadows the builtin.

    Returns:
        The total of the per-user values (``0`` for an empty input).
    """
    return sum(cover_value(user) for user in list)


# Define a function that looks up the number of fans of one user:
def cover_value(user):
    """Return the fan count stored for ``user`` in ``list_uid_fnum``.

    ``list_uid_fnum`` is a module-level sequence whose entries carry the
    user id at index 0 and the fan count at index 1; it is not defined in
    this part of the file and must exist before the function is called.
    The lookup is a linear scan.

    Args:
        user: User id to look up.

    Returns:
        The stored fan count, or ``0`` when the user is not present.
    """
    for entry in list_uid_fnum:
        if user == entry[0]:
            return entry[1]
    # Give up only after every entry was examined, not on the first
    # non-matching one.
    return 0
def flatmapvalues(x):
    """Identity function handed to ``flatMapValues`` to emit one pair per item."""
    return x


def cal_sum(x):
    """Sum the integer value of every non-``None`` item of ``x``.

    The original guard ``x == None and len(x) == 0`` raised ``TypeError``
    for ``None``; ``None`` now yields ``0``.

    Args:
        x: Iterable of values accepted by ``int`` (``None`` items are
            skipped), or ``None``.

    Returns:
        The integer total; ``0`` when ``x`` is ``None`` or empty.
    """
    if x is None:
        return 0
    return sum(int(value) for value in x if value is not None)


def fans_cover_till_j(all_in_one_rdd, j):
    """Compute, per weibo, the summed fan counts of the users seen up to stage ``j``.

    Reads the module-level ``uid_fnum_rdd`` (pairs of user id and fan
    count).

    Steps per weibo: keep the repost records up to stage ``j``, drop each
    record's last field, extract every run of digits from the textual form
    of what remains (presumably the user ids — this assumes ids are purely
    numeric), de-duplicate them, look up each id's fan count and add the
    counts up. Weibos without any match get ``0``.

    Args:
        all_in_one_rdd: Pair RDD of ``(weibo_id, repost_records)``.
        j: Stage index.

    Returns:
        A pair RDD of ``(weibo_id, total_fan_count)`` containing every key
        of ``all_in_one_rdd``.
    """
    import re

    def distinct_users(records):
        # cal_during builds a new outer list, so ti_qu does not alter the
        # records held by the RDD.
        relations = ti_qu(cal_during(records, j))
        return list(set(re.sub(r'\D', " ", str(relations)).split()))

    # (weibo id, user id) - one pair per distinct user.
    wid_user = all_in_one_rdd.mapValues(distinct_users).flatMapValues(flatmapvalues)
    # Re-key by user id, attach the fan count (None when unknown) and go
    # back to (weibo id, fan count).
    wid_fnum = (
        wid_user.map(lambda pair: (pair[1], pair[0]))
        .leftOuterJoin(uid_fnum_rdd)
        .values()
    )
    cover_by_wid = wid_fnum.groupByKey().mapValues(lambda counts: cal_sum(list(counts)))

    # Left-join against every weibo id so ids without users still appear;
    # cal_sum over (0, cover_or_None) then yields the cover or 0.
    all_wids = all_in_one_rdd.mapValues(lambda _records: 0)
    return all_wids.leftOuterJoin(cover_by_wid).mapValues(cal_sum)


def generate_covers_file(rdd, k, path):
    """Write the per-weibo fan coverage for stages ``k-1`` and ``k`` to CSV.

    For each ``j`` in ``range(k - 1, k + 1)`` the coverage for stage
    ``j + 1`` is collected and written to
    ``<path>wid_covers/wid_covers_<j>.csv``.

    Args:
        rdd: Pair RDD of ``(weibo_id, repost_records)``.
        k: Upper stage index of the two files to generate.
        path: Directory prefix (including the trailing separator).
    """
    import csv

    for j in range(k - 1, k + 1):
        c_path = str(path) + 'wid_covers/wid_covers_' + str(j) + '.csv'
        # Collect before opening so a failed job leaves no empty file behind.
        rows = fans_cover_till_j(rdd, j + 1).collect()
        # `with` closes the file even if writing fails.
        with open(c_path, 'w') as out_file:
            csv.writer(out_file).writerows(rows)
# Driver: resolve the data paths and build the RDDs shared by the feature
# generators below.

# `sc` was never defined in this file (the pyspark shell provides it);
# getOrCreate() returns the already-running context when there is one.
sc = SparkContext.getOrCreate()

px, py, pz, p1, p2, p3, p4, p5 = get_basic_info()
uid_fnum_rdd = get_uid_fnum_rdd(sc, p5)

train_repost_id_line_time_reduce, train_wid_uid_rdd = get_prime_rdd('train', sc, p1, p2, p3, p4)
# One-off feature generation for the training set; uncomment to rebuild
# the CSV files under `px`.
#wid_fnum_rdd = get_wid_fnum_rdd(uid_fnum_rdd,train_wid_uid_rdd,px)
#generate_times_file(train_repost_id_line_time_reduce,292,px)
#generate_deeps_file(train_repost_id_line_time_reduce,292,px)
#generate_covers_file(train_repost_id_line_time_reduce,292,px)

# This statement had been fused onto the preceding comment line and was
# therefore never executed.
test_repost_id_line_time_reduce, test_wid_uid_rdd = get_prime_rdd('test', sc, p1, p2, p3, p4)
# One-off feature generation for the test set; uncomment to rebuild the
# CSV files under `py`.
#test_wid_fnum_rdd = get_wid_fnum_rdd(uid_fnum_rdd,test_wid_uid_rdd,py)
#generate_times_file(test_repost_id_line_time_reduce,16,py)
#generate_deeps_file(test_repost_id_line_time_reduce,16,py)
#generate_covers_file(test_repost_id_line_time_reduce,16,py)
from pyspark.mllib.regression import LabeledPoint
import numpy as np
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.ml.linalg import Vectors
from pyspark.ml.linalg import SparseVector,DenseVector #获取用户ID和粉丝数的对比
def get_wid_fnum_rdd(path):
    """Load ``<path>wid_fnum_file.csv`` as a key-sorted pair RDD.

    NOTE: an earlier function in this file has the same name but takes
    three arguments; this later definition replaces it once the module has
    been fully executed. Uses the module-level ``sc``.

    Args:
        path: Directory prefix (including the trailing separator).

    Returns:
        A pair RDD built from the first two comma-separated fields of each
        line, sorted by key.
    """
    return (
        sc.textFile(path + 'wid_fnum_file' + '.csv')
        .map(lambda line: line.split(","))
        .map(lambda fields: (fields[0], fields[1]))
        .sortByKey()
    )


def add_keys(rdd1, key_file='/home/jason/spark/weibo_predict/test/wid_times/wid_times_0.csv'):
    """Pair the elements of ``rdd1`` with the sorted keys of ``key_file`` by position.

    The first comma-separated field of every line of ``key_file`` is taken
    as a key; the keys are sorted and the i-th key is paired with the i-th
    element of ``rdd1``. Uses the module-level ``sc``.

    Args:
        rdd1: RDD of values, one per key, in sorted-key order.
        key_file: CSV file providing the keys; defaults to the path that
            was hard-coded originally.

    Returns:
        A pair RDD of ``(key, value)``. Positions present on only one side
        are dropped (inner join on the position).
    """
    sorted_keys = (
        sc.textFile(key_file)
        .map(lambda line: (line.split(',')[0], None))
        .sortByKey()
        .keys()
    )
    # Index both sides by position, then join on that position.
    index_to_key = sorted_keys.zipWithIndex().map(lambda pair: (pair[1], pair[0]))
    index_to_value = rdd1.zipWithIndex().map(lambda pair: (pair[1], pair[0]))
    return index_to_key.join(index_to_value).values()


# Fetch the other three required inputs.
def get_wid_x(j, path, times_or_deeps_or_covers):
    """Load one stage of a feature as a pair RDD keyed by weibo id.

    Uses the module-level ``px``, ``py`` and ``sc``.

    * ``path == px``: reads ``<px>wid_<kind>/wid_<kind>_<j>.csv``.
    * ``path == py`` and ``0 <= j < 15``: reads the same file under ``py``.
    * ``path == py`` and ``15 <= j <= 291``: reads the prediction file
      ``predicts/<kind>_time_data_<j>.txt`` and keys it with ``add_keys``.
    * Any other combination: ``path`` itself is read as the CSV (kept from
      the original fall-through behaviour).

    Args:
        j: Stage index.
        path: ``px`` or ``py``.
        times_or_deeps_or_covers: ``'times'``, ``'deeps'`` or ``'covers'``.

    Returns:
        A pair RDD, or ``0`` when the feature kind is not recognised.
    """
    kind = times_or_deeps_or_covers
    if kind not in ('times', 'deeps', 'covers'):
        print('wrong input about times_or_deeps_or_covers')
        return 0

    csv_path = str(path) + 'wid_' + kind + '/wid_' + kind + '_' + str(j) + '.csv'
    if path == px:
        path = csv_path
    elif path == py:
        if 0 <= j < 15:
            path = csv_path
        elif 15 <= j <= 291:
            # From stage 15 on the test features are the model's own
            # earlier predictions, which carry no keys.
            predicted = sc.textFile(
                '/home/jason/spark/weibo_predict/predicts/' + kind + '_time_data_' + str(j) + '.txt'
            )
            return add_keys(predicted)

    return (
        sc.textFile(path)
        .map(lambda line: line.split(","))
        .map(lambda fields: (fields[0], fields[1]))
        .sortByKey()
    )


# Function that joins two RDDs and returns a single RDD.
def my_join(rdd1, rdd2):
    """Inner-join two pair RDDs and flatten each joined value to digit strings.

    The joined value is converted with ``str``, every non-digit character
    is replaced by a space and the result is split, so each value becomes
    a flat list of the digit runs it contained. Signs, decimal points and
    non-numeric text such as ``None`` are lost in the process.

    Args:
        rdd1: Left pair RDD.
        rdd2: Right pair RDD.

    Returns:
        A pair RDD of ``(key, list_of_digit_strings)``.
    """
    import re

    # One join with mapValues replaces the former two identical joins that
    # were zipped back together.
    return rdd1.join(rdd2).mapValues(
        lambda joined: re.sub(r'\D', " ", str(joined)).split()
    )


# Build a line in LibSVM format from one RDD element.
def lib_svm(x):
    """Format one record as a LibSVM line.

    Args:
        x: Sequence whose first item is the label and whose remaining
            items are the feature values, in order.

    Returns:
        ``"<label> 1:<v1> 2:<v2> ... "``. Every field, including the last
        one, is followed by a single space, as in the original output.
    """
    parts = [str(x[0]) + ' ']
    parts.extend(
        str(index) + ":" + str(value) + ' '
        for index, value in enumerate(x[1:], 1)
    )
    return ''.join(parts)


# Generate the data needed for testing or training.
def generate_train_or_test_data(path, j, times_or_deeps):
    """Write the LibSVM training or test file for one target at stage ``j``.

    Uses the module-level ``px`` (training directory) and ``py`` (test
    directory). The label is the target feature, taken at stage ``j + 1``
    for training data and at stage ``j`` for test data. The features are,
    in order, the fan count followed by the two remaining kinds (in the
    order times, deeps, covers) at stage ``j``.

    Changes against the original:
      * the 'times' branch never closed its output file;
      * the 'deeps' branch overwrote its ``j + 1`` label RDD with the
        stage-``j`` one, unlike the parallel 'covers' branch;
      * only the 'times' branch sorted the rows by key. All targets are
        sorted now, because the predictions made from the test file are
        later re-attached to weibo ids by sorted position (``add_keys``).

    Args:
        path: ``px`` or ``py``.
        j: Stage index.
        times_or_deeps: Target feature: ``'times'``, ``'deeps'`` or
            ``'covers'``.

    Returns:
        ``0`` when the target or the path is not recognised, otherwise
        ``None`` after writing
        ``<px>train_data/<target>_train_data_<j>.txt`` or
        ``<py>test_data/<target>_test_data_<j>.txt``.
    """
    feature_kinds = ('times', 'deeps', 'covers')
    if times_or_deeps not in feature_kinds:
        return 0

    if path == px:
        data_path = str(px) + 'train_data/' + times_or_deeps + '_train_data_' + str(j) + '.txt'
        label_stage = j + 1  # train on the value of the next stage
    elif path == py:
        data_path = str(py) + 'test_data/' + times_or_deeps + '_test_data_' + str(j) + '.txt'
        label_stage = j
    else:
        return 0

    # Label first, then the fan count, then the other two kinds.
    records = my_join(
        get_wid_x(label_stage, path, times_or_deeps),
        get_wid_fnum_rdd(path),
    )
    for kind in feature_kinds:
        if kind != times_or_deeps:
            records = my_join(records, get_wid_x(j, path, kind))

    lines = records.sortByKey().values().map(lib_svm).collect()
    # `with` guarantees the file is flushed and closed before the file is
    # read back elsewhere.
    with open(data_path, 'w') as out_file:
        for line in lines:
            out_file.write(line)
            out_file.write('\n')


# Generate the predictions for the given stage.
def generate_test_predict(j, times_or_deeps):
    """Train a random-forest regressor on stage ``j`` and predict the test set.

    Reads ``train/train_data/<kind>_train_data_<j>.txt`` and
    ``test/test_data/<kind>_test_data_<j>.txt`` under
    ``/home/jason/spark/weibo_predict/`` with ``MLUtils.loadLibSVMFile``
    and writes one integer-truncated prediction per line to
    ``predicts/<kind>_time_data_<j + 1>.txt``. Uses the module-level
    ``sc``.

    Args:
        j: Stage index of the input files.
        times_or_deeps: ``'times'``, ``'deeps'`` or ``'covers'``. Any other
            value makes the function return without doing anything.
    """
    kind = times_or_deeps
    if kind not in ('times', 'deeps', 'covers'):
        return None

    from pyspark.mllib.tree import RandomForest
    from pyspark.mllib.util import MLUtils

    root = '/home/jason/spark/weibo_predict/'
    tr_path = root + 'train/train_data/' + kind + '_train_data_' + str(j) + '.txt'
    te_path = root + 'test/test_data/' + kind + '_test_data_' + str(j) + '.txt'
    pre_path = root + 'predicts/' + kind + '_time_data_' + str(j + 1) + '.txt'

    train_data = MLUtils.loadLibSVMFile(sc, tr_path)
    test_data = MLUtils.loadLibSVMFile(sc, te_path)
    model = RandomForest.trainRegressor(train_data, categoricalFeaturesInfo={},
                                        numTrees=3, featureSubsetStrategy="auto",
                                        impurity='variance', maxDepth=4, maxBins=32, seed=42)
    predictions = model.predict(test_data.map(lambda x: x.features))

    # `with` closes the file even if a write fails.
    with open(pre_path, 'w') as times_predict:
        for value in predictions.collect():
            times_predict.write(str(int(value)))
            times_predict.write('\n')


def generate_test_data_beyond15(j):
    """Pair the weibo ids of the test set with the lines of a prediction file.

    Reads ``predicts/time_data_<j>.txt`` and zips it with the keys of
    ``get_wid_fnum_rdd(py)``. Uses the module-level ``sc`` and ``py``.

    NOTE(review): nothing in this file writes a ``time_data_<j>.txt``
    without a kind prefix — confirm the path is still valid.

    Args:
        j: Stage index of the prediction file.

    Returns:
        A pair RDD of ``(weibo_id, prediction_line)``.
    """
    path = '/home/jason/spark/weibo_predict/predicts/' + 'time_data_' + str(j) + '.txt'
    predicted = sc.textFile(path)
    wids = get_wid_fnum_rdd(py).keys()
    return wids.zip(predicted)
def add_keys(rdd1, key_file='/home/jason/spark/weibo_predict/test/wid_times/wid_times_0.csv'):
    """Pair the elements of ``rdd1`` with the sorted keys of ``key_file`` by position.

    The first comma-separated field of every line of ``key_file`` is taken
    as a key; the keys are sorted and the i-th key is paired with the i-th
    element of ``rdd1``. Uses the module-level ``sc``.

    NOTE: this repeats a definition of ``add_keys`` made earlier in the
    file.

    Args:
        rdd1: RDD of values, one per key, in sorted-key order.
        key_file: CSV file providing the keys; defaults to the path that
            was hard-coded originally.

    Returns:
        A pair RDD of ``(key, value)``. Positions present on only one side
        are dropped (inner join on the position).
    """
    sorted_keys = (
        sc.textFile(key_file)
        .map(lambda line: (line.split(',')[0], None))
        .sortByKey()
        .keys()
    )
    # Index both sides by position, then join on that position.
    index_to_key = sorted_keys.zipWithIndex().map(lambda pair: (pair[1], pair[0]))
    index_to_value = rdd1.zipWithIndex().map(lambda pair: (pair[1], pair[0]))
    return index_to_key.join(index_to_value).values()
# Stage-by-stage driver. For every stage and every target the training
# file and the test file are written first, then the model is trained and
# the prediction for the next stage is produced. The order matters: from
# stage 15 on, the test features are the predictions of the previous stage.

# Stages 0-14.
for i in range(15):
    for kind in ('times', 'deeps', 'covers'):
        generate_train_or_test_data(px, i, kind)
        generate_train_or_test_data(py, i, kind)
        generate_test_predict(i, kind)

# Stages 15-291, printing the stage index as a progress indicator.
for i in range(15, 292):
    print(i)
    for kind in ('times', 'deeps', 'covers'):
        generate_train_or_test_data(px, i, kind)
        generate_train_or_test_data(py, i, kind)
        generate_test_predict(i, kind)

# Stage 291 is run once more, exactly as in the original script.
for kind in ('times', 'deeps', 'covers'):
    generate_train_or_test_data(px, 291, kind)
    generate_train_or_test_data(py, 291, kind)
    generate_test_predict(291, kind)
#组团搞出来最后的文件

# Assemble the final submission file from the per-stage prediction files.
import csv
import re

# Seed the table with the stage-1 'times' predictions keyed by weibo id.
# NOTE(review): this seed adds one data column that has no matching name
# in the header built below — confirm whether that is intended.
rdd1 = sc.textFile('/home/jason/spark/weibo_predict/predicts/times_time_data_' + str(1) + '.txt')
rdd1 = add_keys(rdd1)

# Append the 'times' predictions of stages 5..292. The former `if j==1`
# guard could never be true for these stages and was removed.
for j in range(5, 293):
    rdd2 = sc.textFile('/home/jason/spark/weibo_predict/predicts/times_time_data_' + str(j) + '.txt')
    rdd2 = add_keys(rdd2)
    rdd1 = my_join(rdd1, rdd2)

# Append the 'deeps' predictions of the same stages.
for j in range(5, 293):
    rdd3 = sc.textFile('/home/jason/spark/weibo_predict/predicts/deeps_time_data_' + str(j) + '.txt')
    rdd3 = add_keys(rdd3)
    rdd1 = my_join(rdd1, rdd3)


def add_head(x):
    """Return ``x`` converted to a string and prefixed with ``'testWeibo'``."""
    return 'testWeibo' + str(x)


# Flatten every (id, values) pair into a list of digit strings and order
# the rows by numeric id.
rdd1 = rdd1.map(lambda x: re.sub(r'\D', " ", str(x)).split())
rdd1 = rdd1.sortBy(lambda x: int(x[0]))
# The original then prefixed the id with add_head and immediately removed
# the prefix again with the same digit-only regex; that round trip left
# the rows unchanged and was dropped.

path = '/home/jason/spark/weibo_predict/'
end_path = str(path) + 'end_of_end.csv'
with open(end_path, 'w') as end_f:
    csv.writer(end_f).writerows(rdd1.collect())

# Header line: one scale and one depth column per 15-minute stage.
a = ','
s1 = a.join('scaleT' + str((i + 1) * 15) for i in range(4, 292))
s2 = a.join('depthT' + str((i + 1) * 15) for i in range(4, 292))
# Join with the separator; previously s1 and s2 were concatenated directly,
# fusing the last scale column name with the first depth column name.
s3 = a.join(['WeiboID (Time Unit: Minutes)', s1, s2])

end_path_2 = '/home/jason/spark/weibo_predict/end_of_end.csv'
end_path_1 = '/home/jason/spark/weibo_predict/end_of_end_.csv'
# Re-read the CSV as text and prefix every line (which starts with the id).
rdd = sc.textFile(end_path_2)
rdd = rdd.map(lambda x: add_head(x))
with open(end_path_1, 'w') as end_ff:
    end_ff.write(s3)
    end_ff.write('\n')
    for lists in rdd.collect():
        end_ff.write(lists)
        end_ff.write('\n')