记得在代码顶部加上python的环境路径 (#!/usr/bin/python) linux环境下需要找到python解释器
1. chmod -R 777 x.py 给代码文件赋予权限
2. chown 用户(hadoop) x.py 给代码转换用户
3. chgrp 用户(hadoop) x.py 给代码转换所属组
4. vi x.py 打开代码 输入" : " 再输入 set ff=unix 改变py文件的保存方式,否则linux可能不识别windows的默认保存方式
5. Mapreduce 运行py文件命令:
(1)先找到 streaming的jar包 (貌似是用来转换其他语言为java代码) 路径在 $HADOOP_PATH/share/hadoop/tools/lib/hadoop-streaming-x.jar (目前高版本路径,老版本不是这个路径)
(2)输入命令:
hadoop jar hadoop-streaming-x.jar -file ....../map.py -mapper ....../map.py -file ....../reduce.py -reducer ....../reduce.py -input /.../data.txt (hdfs路径) -output /.../out
map reduce 实例代码:
map:
#!/usr/bin/python import sys for line in sys.stdin: line.strip() data = line.split('\t') # print(data) priarea = data[0] date = data[4] time = 15 * 96 user = data[3] print('%s %s %s %s 1'%(priarea,date,user,time))
reduce:
#!/usr/bin/python import sys out = {} for line in sys.stdin: line = line.strip() data = line.split(' ') # print(data) priarea = data[0] date = data[1] time = data[3] mykey = '%s_%s'%(priarea,date) # print(key) if mykey in out.keys(): out[mykey]['sum_time'] = float(out[mykey]['sum_time']) + float(time) out[mykey]['user_count'] = float(out[mykey]['user_count']) + 1 else: out[mykey] = {'sum_time':time,'user_count':1.0} for key in out.keys(): key_list = key.split('_') priarea = key_list[0] date = key_list[1] print('%s %s %s %s' % (priarea, date, out[key]['user_count'], out[key]['sum_time']))