Please credit the source when reposting: http://blog.csdn.net/l1028386804/article/details/79056120
I. Environment Preparation
For how to write MapReduce programs in plain Python, or how to set up a Hadoop environment, see the earlier post "Python: Writing Hadoop MapReduce Programs in Plain Python (Based on Hadoop 2.5.2)".
Mrjob (http://pythonhosted.org/mrjob/index.html) is an open-source Python framework for writing MapReduce jobs. It is essentially a wrapper around the Hadoop Streaming command line, so developers never have to touch the streaming commands directly and can write MapReduce jobs more easily and quickly. Mrjob has the following characteristics:
1) Concise code: the map and reduce functions fit in a single Python file;
2) Support for multi-step MapReduce job workflows (see the sketch after this list);
3) Support for multiple run modes, including inline, local, Hadoop, and remote Amazon clusters;
4) Support for Amazon's Elastic MapReduce (EMR) data analysis service;
5) Convenient debugging that does not require a Hadoop environment.
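To illustrate feature 2), below is a minimal sketch of a multi-step job, closely following the pattern shown in mrjob's own documentation (the class and method names are illustrative and not part of this article's example): the first step counts words, and the second step finds the most frequent one.

from mrjob.job import MRJob
from mrjob.step import MRStep

class MRMostUsedWord(MRJob):
    def steps(self):
        # two chained MapReduce steps defined in a single Python file
        return [
            MRStep(mapper=self.mapper_get_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word),
        ]

    def mapper_get_words(self, _, line):
        for word in line.split():
            yield word, 1

    def reducer_count_words(self, word, counts):
        # re-key under None so the next step sees all (count, word) pairs together
        yield None, (sum(counts), word)

    def reducer_find_max_word(self, _, count_word_pairs):
        # yielding the max (count, word) tuple emits key=count, value=word
        yield max(count_word_pairs)

if __name__ == '__main__':
    MRMostUsedWord.run()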
Mrjob requires Python 2.5 or later. The source code can be downloaded from https://github.com/yelp/mrjob, and installation can be done either way:
# pip install mrjob            # install via pip
# python setup.py install      # install from source
II. Implementing MapReduce with Mrjob
This example again counts the frequency of every word in a text file (/usr/local/python/source/input.txt). Mrjob implements the MapReduce operation through the mapper() and reducer() methods. The code is as follows:
【/usr/local/python/source/word_count.py】
# -*- coding:UTF-8 -*-
'''
Created on Jan 14, 2018
@author: liuyazhuang
'''
from mrjob.job import MRJob

class MRWordCounter(MRJob):
    def mapper(self, key, line):
        for word in line.split():
            yield word, 1

    def reducer(self, word, occurrences):
        yield word, sum(occurrences)

if __name__ == '__main__':
    MRWordCounter.run()

As you can see, this is only about one third as many lines as the plain-Python version, and the logic is clearer. The code consists of the mapper and reducer functions: mapper() receives each line of input, processes it, and emits key:value pairs with the value initialized to 1; reducer() receives the key-value pairs emitted by mapper(), aggregates them by summing the values of identical keys, and emits the totals. Mrjob relies on Python's yield mechanism to turn these functions into generators, and drives the key-value initialization and aggregation by repeatedly calling next() on them.
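To make the generator point concrete, here is a small standalone illustration (independent of mrjob itself; the sample input line is made up) of how a mapper written with yield behaves as a generator that hands out one key-value pair per call to next():

def mapper(key, line):
    # same logic as MRWordCounter.mapper(): one (word, 1) pair per word
    for word in line.split():
        yield word, 1

pairs = mapper(None, 'hello hadoop hello')
print(next(pairs))   # ('hello', 1)
print(next(pairs))   # ('hadoop', 1)
print(next(pairs))   # ('hello', 1)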
III. Running MapReduce
1. Inline (-r inline) mode
This mode is convenient for debugging: it starts a single process that simulates task execution and results. Since inline is the default, -r inline can be omitted, and output can be written to a file with either > output-file or -o output-file. For example, the following two commands are equivalent:
python word_count.py -r inline input.txt > output.txt
python word_count.py input.txt > output.txt

Now check the result with cat output.txt:
[root@liuyazhuang121 source]# cat output.txt
"test" 2
"welcome" 1
"where" 1
"xxx" 2
"aaa" 1
"ab" 1
"abc" 1
"adc" 1
"bar" 2
"bbb" 2
"xxyy" 1
"you" 1
"your" 1
"yyy" 2
"hello" 2
"home" 2
"iii" 2
"is" 1
"labs" 1
"liuyazhuang" 2
"lyz" 2
"bc" 1
"bec" 1
"by" 1
"ccc" 2
"hadoop" 2
"me" 1
"ooo" 2
"python" 2
"see" 1

The result is correct.
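For debugging, the same inline run can also be driven from a Python script instead of the shell. The sketch below assumes mrjob's runner API; note that reading the results back differs between mrjob versions (older releases use stream_output()/parse_output_line() as shown here, newer ones replace them with cat_output()/parse_output()):

from word_count import MRWordCounter

job = MRWordCounter(args=['-r', 'inline', 'input.txt'])
with job.make_runner() as runner:
    runner.run()                            # run the job in-process
    for line in runner.stream_output():     # raw "key<TAB>value" output lines
        word, count = job.parse_output_line(line)
        print(word, count)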
2. Local (-r local) mode
This mode simulates Hadoop locally for debugging. The difference from inline mode is that it starts multiple processes to execute the tasks. For example:
python word_count.py -r local input.txt > output1.txt

Check the result with cat output1.txt:
[root@liuyazhuang121 source]# cat output1.txt
"test" 2
"welcome" 1
"where" 1
"xxx" 2
"aaa" 1
"ab" 1
"abc" 1
"adc" 1
"bar" 2
"bbb" 2
"xxyy" 1
"you" 1
"your" 1
"yyy" 2
"hello" 2
"home" 2
"iii" 2
"is" 1
"labs" 1
"liuyazhuang" 2
"lyz" 2
"bc" 1
"bec" 1
"by" 1
"ccc" 2
"hadoop" 2
"me" 1
"ooo" 2
"python" 2
"see" 1

The result is correct.
3. Hadoop (-r hadoop) mode
This mode runs the job in a Hadoop environment and supports Hadoop's job scheduling and control parameters, for example:
1) Specifying the job scheduling priority (VERY_HIGH|HIGH), e.g. --jobconf mapreduce.job.priority=VERY_HIGH.
2) Limiting the number of map and reduce tasks, e.g. --jobconf mapreduce.map.tasks=2 --jobconf mapreduce.reduce.tasks=5.
Note: the Hadoop environment variables must be configured before running the job.
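If you do not want to repeat the --jobconf flags on every invocation, the same settings can be baked into the job class. A minimal sketch, assuming mrjob's JOBCONF class attribute (the values are the ones used in this example):

from mrjob.job import MRJob

class MRWordCounter(MRJob):
    # equivalent to passing --jobconf for each of these keys on the command line
    JOBCONF = {
        'mapreduce.job.priority': 'VERY_HIGH',
        'mapreduce.map.tasks': '2',
        'mapreduce.reduce.tasks': '1',
    }
    # ... mapper()/reducer() unchanged from word_count.py ...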
In this example we again use the file /user/root/word/input.txt in HDFS. The full command is as follows:
python word_count.py -r hadoop --jobconf mapreduce.job.priority=VERY_HIGH --jobconf mapreduce.map.tasks=2 --jobconf mapreduce.reduce.tasks=1 -o hdfs://liuyazhuang121:9000/output/hadoop_word hdfs://liuyazhuang121:9000/user/root/word

The output is as follows:
[root@liuyazhuang121 source]# python word_count.py -r hadoop --jobconf mapreduce.job.priority=VERY_HIGH --jobconf mapreduce.map.tasks=2 --jobconf mapreduce.reduce.tasks=1 -o hdfs://liuyazhuang121:9000/output/hadoop_word hdfs://liuyazhuang121:9000/user/root/word
No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in $PATH...
Found hadoop binary: /usr/local/hadoop-2.5.2/bin/hadoop
Using Hadoop version 2.5.2
Looking for Hadoop streaming jar in /usr/local/hadoop-2.5.2...
Found Hadoop streaming jar: /usr/local/hadoop-2.5.2/share/hadoop/tools/lib/hadoop-streaming-2.5.2.jar
Creating temp directory /tmp/word_count.root.20180114.050606.032324
Copying local files to hdfs:///user/root/tmp/mrjob/word_count.root.20180114.050606.032324/files/...
Running step 1 of 1...
  packageJobJar: [/usr/local/hadoop-2.5.2/tmp/hadoop-unjar2522703497090634857/] [] /tmp/streamjob1355851303293562830.jar tmpDir=null
  Connecting to ResourceManager at liuyazhuang121/192.168.209.121:8032
  Connecting to ResourceManager at liuyazhuang121/192.168.209.121:8032
  Total input paths to process : 1
  number of splits:2
  Submitting tokens for job: job_1515893542122_0003
  Submitted application application_1515893542122_0003
  The url to track the job: http://liuyazhuang121:8088/proxy/application_1515893542122_0003/
  Running job: job_1515893542122_0003
  Job job_1515893542122_0003 running in uber mode : false
  map 0% reduce 0%
  map 33% reduce 0%
  map 100% reduce 0%
  map 100% reduce 100%
  Job job_1515893542122_0003 completed successfully
  Output directory: hdfs://liuyazhuang121:9000/output/hadoop_word
Counters: 49
	File Input Format Counters
		Bytes Read=323
	File Output Format Counters
		Bytes Written=262
	File System Counters
		FILE: Number of bytes read=486
		FILE: Number of bytes written=305876
		FILE: Number of large read operations=0
		FILE: Number of read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=529
		HDFS: Number of bytes written=262
		HDFS: Number of large read operations=0
		HDFS: Number of read operations=9
		HDFS: Number of write operations=2
	Job Counters
		Data-local map tasks=2
		Launched map tasks=2
		Launched reduce tasks=1
		Total megabyte-seconds taken by all map tasks=23237632
		Total megabyte-seconds taken by all reduce tasks=11787264
		Total time spent by all map tasks (ms)=22693
		Total time spent by all maps in occupied slots (ms)=22693
		Total time spent by all reduce tasks (ms)=11511
		Total time spent by all reduces in occupied slots (ms)=11511
		Total vcore-seconds taken by all map tasks=22693
		Total vcore-seconds taken by all reduce tasks=11511
	Map-Reduce Framework
		CPU time spent (ms)=3150
		Combine input records=0
		Combine output records=0
		Failed Shuffles=0
		GC time elapsed (ms)=149
		Input split bytes=206
		Map input records=1
		Map output bytes=392
		Map output materialized bytes=492
		Map output records=44
		Merged Map outputs=2
		Physical memory (bytes) snapshot=611057664
		Reduce input groups=30
		Reduce input records=44
		Reduce output records=30
		Reduce shuffle bytes=492
		Shuffled Maps =2
		Spilled Records=88
		Total committed heap usage (bytes)=429916160
		Virtual memory (bytes) snapshot=2661163008
	Shuffle Errors
		BAD_ID=0
		CONNECTION=0
		IO_ERROR=0
		WRONG_LENGTH=0
		WRONG_MAP=0
		WRONG_REDUCE=0
Streaming final output from hdfs://liuyazhuang121:9000/output/hadoop_word...
"aaa" 1 "ab" 1 "abc" 1 "adc" 1 "bar" 2 "bbb" 2 "bc" 1 "bec" 1 "by" 1 "ccc" 2 "hadoop" 2 "hello" 2 "home" 2 "iii" 2 "is" 1 "labs" 1 "liuyazhuang" 2 "lyz" 2 "me" 1 "ooo" 2 "python" 2 "see" 1 "test" 2 "welcome" 1 "where" 1 "xxx" 2 "xxyy" 1 "you" 1 "your" 1 "yyy" 2 Removing HDFS temp directory hdfs:///user/root/tmp/mrjob/word_count.root.20180114.050606.032324... Removing temp directory /tmp/word_count.root.20180114.050606.032324...结果显示,打印出了每个单词的频次。此时我们输入命令:
hadoop fs -ls /output/hadoop_word
[root@liuyazhuang121 source]# hadoop fs -ls /output/hadoop_word
Found 2 items
-rw-r--r--   1 root supergroup          0 2018-01-14 13:06 /output/hadoop_word/_SUCCESS
-rw-r--r--   1 root supergroup        262 2018-01-14 13:06 /output/hadoop_word/part-00000

Now view the result file with the following command:
hadoop fs -cat /output/hadoop_word/part-00000
[root@liuyazhuang121 source]# hadoop fs -cat /output/hadoop_word/part-00000
"aaa" 1
"ab" 1
"abc" 1
"adc" 1
"bar" 2
"bbb" 2
"bc" 1
"bec" 1
"by" 1
"ccc" 2
"hadoop" 2
"hello" 2
"home" 2
"iii" 2
"is" 1
"labs" 1
"liuyazhuang" 2
"lyz" 2
"me" 1
"ooo" 2
"python" 2
"see" 1
"test" 2
"welcome" 1
"where" 1
"xxx" 2
"xxyy" 1
"you" 1
"your" 1
"yyy" 2

As we can see, the job produced the correct results.
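If other Python code needs to consume these results, the part file can be read back from HDFS and parsed; mrjob's default output protocol writes a JSON-encoded key and value separated by a tab. A minimal sketch using the paths from this example (the parsing code is illustrative and not part of mrjob):

import json
import subprocess

# stream the result file out of HDFS and rebuild a word -> count dict
cat = subprocess.Popen(['hadoop', 'fs', '-cat', '/output/hadoop_word/part-00000'],
                       stdout=subprocess.PIPE)
counts = {}
for raw in cat.stdout:
    key, value = raw.decode('utf-8').rstrip('\n').split('\t')
    counts[json.loads(key)] = json.loads(value)
print(counts.get('hadoop'))   # expect 2, matching the output above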