mapreduce 跑python代码过程

时间:2022-05-25 18:22:52

记得在代码顶部加上python的环境路径  (#!/usr/bin/python)  linux环境下需要找到python解释器

1.  chmod -R 777 x.py      给代码文件赋予权限

2.  chown 用户(hadoop)  x.py  给代码转换用户

3.  chgrp  用户(hadoop)  x.py  给代码转换所属组

4.  vi  x.py  打开代码 输入" : "  再输入 set ff=unix  改变py文件的保存方式,否则linux可能不识别windows的默认保存方式

5.  Mapreduce 运行py文件命令:

  (1)先找到 streaming的jar包  (貌似是用来转换其他语言为java代码)  路径在 $HADOOP_PATH/share/hadoop/tools/lib/hadoop-streaming-x.jar  (目前高版本路径,老版本不是这个路径)

  (2)输入命令:

    hadoop jar hadoop-streaming-x.jar -file ....../map.py -mapper ....../map.py -file ....../reduce.py -reducer ....../reduce.py -input /.../data.txt (hdfs路径)  -output  /.../out

 map reduce 实例代码:


map:

#!/usr/bin/python
import sys
for line in sys.stdin:
    line.strip()
    data = line.split('\t')
    # print(data)
    priarea = data[0]
    date = data[4]
    time = 15 * 96
    user = data[3]
    print('%s %s %s %s 1'%(priarea,date,user,time))

reduce:

#!/usr/bin/python
import sys
out = {}

for line in sys.stdin:
    line = line.strip()
    data = line.split(' ')
    # print(data)
    priarea = data[0]
    date = data[1]
    time = data[3]
    mykey = '%s_%s'%(priarea,date)
    # print(key)
    if mykey in out.keys():
        out[mykey]['sum_time'] = float(out[mykey]['sum_time']) + float(time)
        out[mykey]['user_count'] = float(out[mykey]['user_count']) + 1
    else:
        out[mykey] = {'sum_time':time,'user_count':1.0}

for key in out.keys():
    key_list = key.split('_')
    priarea = key_list[0]
    date = key_list[1]
    print('%s %s %s %s' % (priarea, date, out[key]['user_count'], out[key]['sum_time']))