一: Hadoop Streaming详解
1、Streaming的作用
Hadoop Streaming框架,最大的好处是,让任何语言编写的map, reduce程序能够在hadoop集群上运行;map/reduce程序只要遵循从标准输入stdin读,写出到标准输出stdout即可
其次,容易进行单机调试,通过管道前后相接的方式就可以模拟streaming, 在本地完成map/reduce程序的调试
# cat inputfile | mapper | sort | reducer > output
最后,streaming框架还提供了作业提交时的丰富参数控制,直接通过streaming参数,而不需要使用java语言修改;很多mapreduce的高阶功能,都可以通过steaming参数的调整来完成
2、Streaming的局限
Streaming默认只能处理文本数据Textfile,对于二进制数据,比较好的方法是将二进制的key, value进行base64编码,转化为文本
Mapper和reducer的前后都要进行标准输入和标准输出的转化,涉及数据拷贝和解析,带来了一定的开销
3、Streaming命令的相关参数 (普通选项、streaming选项)
Streaming命令的形式如下:
# /usr/local/src/hadoop-1.2.1/bin/hadoop jar hadoop-streaming.jar \
[普通选项] [Streaming选项] # 注意:普通选项一定要写在streaming选项前面
普通选项
参数 |
可选/必选 |
解释 |
-conf 配置文件 |
可选 |
指定一个应用程序配置文件 |
-fs host:port or local |
可选 |
指定一个namenode |
-jt host:port or local |
可选 |
指定一个jobtracker |
-files 文件1,文件2, -files hdfs://192.168.179.100:9000/file1.txt, hdfs://192.168.179.100:9000/file2.txt 将代替-cacheFile选项 |
可选 |
类似-file, 不同的 1)将HDFS中的多个文件进行分发 2)文件已经位于HDFS上 3)框架会在该作业attemps目录内创建一个符号链接,指向该作业的jar目录(放置所有分发文件) |
-archives 框架会在作业的attempt目录创建符号链接,指向作业的jar目录,jar目录中才是分发到本地的压缩文件 -archives hdfs://host:fs_port/user/testfile.tgz#tgzdir testfile.tgz是用户上传到HDFS的打包压缩文件 #后的tgzdir是别名,hadoop-1.2.1中必须要别名 |
可选 |
逗号分隔的多个压缩文件,已经位于HDFS上 框架自动分发压缩文件到计算节点,并且Inputformat会自动进行解压 |
-D property=value |
可选 |
重点,很多属性通过-D指定 |
插曲1: mapred-site.xml 指定map的slot,reduce的slot
Map和reduce在datanode上的运行,会受到slot的限制,并且有各自的slot限制; 每个Datanode读取相应的配置文件, 从而确定每个datanode上能运行的最大map,reduce个数,以及节点能力是否充分发挥
Hadoop1.0中,slot在mapred-site.xml中配置(mapreduce作业前配置好), 基本上每个slot在运行1个map, reduce作业后会占用1个CPU core, 最激进的做法是设置map和reduce的slot都是CPU core-1 (Map执行完后才会进行reduce), 预留1个CPU core给tasktracker(比如上报心跳等), 但通常reducer的slot要比reducer少,考虑大多数情况下mapper要比reducer多
默认map的slot为2,reduce的slot也为2
<configuration>
<property>
<name>mapred.job.tracker</name>
<value>http://192.168.179.100:9001</value>
</property>
<property>
<name>mapred.tasktracker.map.tasks.maximum</name>
<value>15</value>
</property>
<property>
<name>mapreduce.tasktracker.tasks.reduce.maximum</name>
<value>10</value>
</property>
</configuration>
插曲二: mapred-site.xml 指定map最终输出的merge文件的存放路径
<configuration>
<property>
<name>mapred.job.tracker</name>
<value>http://192.168.179.100:9001</value>
</property>
<property>
<name>mapred.local.dir</name>
<value>/usr/loca/src/hadoop-1.2.1/tmp/mapoutput</value>
</property>
</configuration>
当1个作业被提交并在tasktracer的管理下开始运行时,会对每个job创建1个目录,所有分发的文件,都放置在这里
${mapred.local.dir}/taskTracker/$user/jobcache/$jobid/jars/
普通选项中的-D property=value
-D 普通选项,使用最多的高级参数,替代-jobconf(参数将被废弃),需要注意的是 -D选项要放在streaming参数的前面,一般我会放在参数的开头
类别 |
|||
指定目录 |
-D dfs.data.dir=/tmp |
修改本地临时目录 |
|
-D mapred.local.dir=/tmp/local -D mapred.system.dir=/tmp/system -D mapred.tmp.dir=/tmp/tmp |
指定额外的本地临时目录 |
||
指定作业名 |
-D mapred.job.name=”Test001” |
||
指定只有map的作业 |
-D mapred.reduce.tasks=0 |
该作业只有mapper, mapper的输出直接作为作业的输出 |
|
指定reducer个数 |
-D mapred.reduce.tasks=2 |
||
指定mapper个数 |
-D mapred.map.tasks=2 |
指定了不一定生效输入文件为压缩文件时,mapper和压缩文件个数一一对应, |
输入数据为压缩文件时,mapper和文件个数一一对应,比较好的控制Mapper数量的方法 |
指定Mapper输出的key,value分隔符 |
-D stream.map.output.field.separator=. -D stream.num.map.output.key.fields=4 |
Mapper的输出使用.做分割符,并且第4个.之前的部分作为key, 剩余的部分作为value (包含剩余的.) 如果mapper的输出没有4个., 则整体一行作为key, value为空 |
默认: 使用 \t做分隔符,第1个\t之前的部分作为key, 剩余为value, 如果mapper输出没有\t,则整体一行作为key,value为空 |
指定reducer输出的value, key分隔符 |
-D stream.reduce.output.field.seperator=. -D stream.num.reduce.output.key.fields=4 |
指定reduce输出根据.分割,直到第4个.之前的内容为key,其他为value |
Reducer程序要根据指定进行key,value的构造 |
不常用 |
-D stream.map.input.field.seperator |
Inputformat如何分行,默认\n |
|
不常用 |
-D stream.reduce.input.field.seperator |
||
作业优先级 |
-D mapred.job.priority=HIGH |
VERY_LOW, LOW, NORMAL, HIGH, VERY_HIGH |
|
最多同时运行的map任务数 |
-D mapred.job.map.capacity=5 |
||
最多同时运行的reduce任务数 |
-D mapred.job.reduce.capacity=3 |
||
Task没有响应(输入输出)的最大时间 |
-D mapred.task.timeout=6000 |
毫秒 |
超时后,该task被终止 |
Map的输出是否压缩 |
-D mapred.compress.map.output=True |
||
Map的输出的压缩方式 |
-D mapred.map.output.comression.codec= |
||
Reduce的输出是否压缩 |
-D mapred.output.compress=True |
||
Reducer的输出的压缩方式 |
-D mapred.output.compression.codec= |
-D 指定job名称
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-D mapred.job.name=”Test001”
-D 指定reduce任务、map任务个数
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ -D mapred.job.name=”Teset001”
-D mapred.reduce.tasks=2 # reduce task个数,一定生效
-D mapred.map.tasks=5 # map task个数,不一定生效
-D 指定mapper的输出分隔符
-D stream.map.output.field.seperator=. # 指定mapper每条输出key,value分隔符
-D stream.num.map.output.key.fields=4 # 第4个.之前的部分为key,剩余为value
-D map.output.key.field.separator=. # 设置map输出中,Key内部的分隔符
-D 指定基于哪些key进行分桶
基于指定的Key进行分桶,打标签
指定列数
-D num.key.fields.for.partition=1 # 只用1列Key做分桶
-D num.key.fields.for.partition=2 # 使用1,2共两列key做分桶
指定某些字段做key
-D mapred.text.key.partitioner.option =-k1,2 # 第1,2列Key做分桶
-D mapred.text.key.partitioner.option =-k2,2 # 第2列key做分桶
都要修改partition为能够只基于某些Key进行分桶的类
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
-D 指定将reducer的输出进行压缩
-D mapred.output.compress=true -D mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
-D 指定将mapper的输出进行压缩
-D mapred.compress.map.output=true -D mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec
-D 指定Comparator对key进行数字、倒序排序
$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \ -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \ # 使用keyFieldBasedComparator进行key排序 -D stream.map.output.field.separator=. \ -D stream.num.map.output.key.fields=4 \ -D map.output.key.field.separator=. \ -D mapred.text.key.comparator.options=-k2,2nr \ # -k2,2只用第二列排序,n数字排序,r倒序(从大到小) -input myInputDirs \ -output myOutputDir \ -mapper org.apache.hadoop.mapred.lib.IdentityMapper \ -reducer org.apache.hadoop.mapred.lib.IdentityReducer
-D 指定每个reduce task申请的内存数量
-D mapreduce.reduce.memory.mb=512 #单位为M
Streaming选项
参数 |
可选/必选 |
参数描述 |
-input <HDFS目录或文件路径> 支持*通配符,指定多个文件或目录,多次-input,指定多个输入文件/目录 |
必选 |
Mapper的输入数据,文件要在任务提交前手动上传到HDFS |
-output <HDFS目录> # 路径不能已存在,否则认为是其他job的输出 |
必选 |
reducer输出结果的HDFS存放路径, 不能已存在,但脚本中一定要配置 |
-mapper <可执行命令或java类> -mapper “python map.py” -mapper “bash map.sh” -mapper “perl map.perl” |
必选 |
Mapper程序 |
-reducer <可执行命令或java类> -reducer “python reducer.py” -reducer “bash reducer.sh” -reducer “perl reducer.sh” |
可选 |
Reducer程序,不需要reduce处理就不指定 |
-combiner <可执行命令或java类>
-combiner “python map.py” -combiner “bash map.sh” -combiner “perl map.perl” |
可选 |
处理mapper输出的combiner程序 |
-file <本地mapper、reducer程序文件、程序运行需要的其他文件>
-file map.py -file reduce.py -file white_list |
可选 文件在本地,小文件 |
将本地文件分发给计算节点
文件作为作业的一部分,一起被打包并提交,所有分发的文件最终会被放置在datanode该job的同一个专属目录下:jobcache/job_xxx/jar
|
-cacheFile “hdfs://master:9000/cachefile_dir/white_list” |
|
分发HDFS文件
Job运行需要的程序,辅助文件都先放到HDFS上,指定HDFS文件路径,将HDFS文件拷贝到计算节点,也是都放置在job的同一个专属目录下: jobcache/job_xxx/jar |
-cacheArchive
“hdfs://master:9000/w.tar.gz#WLDIR” |
|
分发HDFS压缩文件、压缩文件内部具有目录结构
|
-numReduceTasks <数字> -numReduceTasks 2 |
可选 |
指定该任务的reducer个数 |
-inputformat <Java类名> |
可选 |
指定自己定义的inputformat类,默认TextInputformat类 |
-outputformat <Java类名> |
可选 |
指定自己定义的outputformat类,默认TextOutputformat类 |
-cmdenv name=value |
可选 |
传递给streaming命令的环境变量 |
二、Mapper输入/输出,根据哪些key分桶,根据哪些key进行排序
先看看Hadoop-1.2.1 文档原文中的解释
As the mapper task runs, it converts its inputs into lines and feed the lines to the stdin of the process. In the meantime, the mapper collects the line oriented outputs from the stdout of the process and converts each line into a key/value pair, which is collected as the output of the mapper. By default, the prefix of a line up to the first tab character is the key and the rest of the line (excluding the tab character) will be the value. If there is no tab character in the line, then entire line is considered as key and the value is null. However, this can be customized, as discussed later.
Mapper输入:
每一个mapper开始运行时,输入文件会被转换成多行(TextInputformat根据\n来进行分行),并将每一行传递给stdin, 作为Mapper的输入, mapper直接对stdin中的每行内容做处理
Mapper输出分隔符:
默认情况下hadoop设置mapper输出的key, value通过tab进行分隔,可以重新指定
-D stream.map.output.field.seperator=. # 指定mapper每条输出key,value分隔符
-D stream.num.map.output.key.fields=4 # 第4个.之前的部分为key,剩余为value
mapper的输出会经历:
1、 partition前,根据mapper输出分隔符分离出Key和Value;
-D stream.map.output.field.separator=. # 指定mapper每条输出key,value分隔符
-D stream.num.map.output.key.fields=4 # 第4个.之前的为key, 剩下的为value
-D map.output.key.field.separator=. # 设置map输出中,Key内部的分隔符
2、 根据 “分桶分隔符”,确定哪些key被用来做partition(默认是用所有key, 只有1列; 或者是Mapper输出分隔符分离出的所有key都被用于Partition)
基于指定的Key进行分桶,打标签
指定列数
-D num.key.fields.for.partition=1 # 只用1列Key做分桶,也就是第一列
-D num.key.fields.for.partition=2 # 使用1,2共两列key做分桶(列数)
指定某些字段做key
-D mapred.text.key.partitioner.option =-k1,2 # 第1,2列Key做分桶
-D mapred.text.key.partitioner.option =-k2,2 # 第2列key做分桶
#都要修改partition为能够只基于某些Key进行分桶的类
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
3、Spill时根据Partition标签和所有Key进行排序;
4、Partition标签和key之间,也是通过mapper输出分隔符来隔离
5、reducer前的文件会删除partition标签,并根据Mapper输出分隔符确定出key, 进行Reducer前的归并排序;(reducer前的归并排序,基于所有mapper的key进行排序)
因此如果要定义新的Mapper输出分隔符就要做到:1)mapper代码中根据新分隔符来构建输出到stdout的内容;2)提交作业时,通过—D 指定新的Mapper输出分隔符,以及第几个分隔符来分离Key
Reducer的输入:
每个Reducer的每条输入,就是去除Partition标签(根据Mapper分隔符分离出partition标签)后的内容,和Mapper输出到stdout中的内容相同,但不同记录之间已经做了排序;因此如果重新指定了Mapper的输出分隔符,Reducer程序就要修改为根据新的Mapper输出分隔符来分离Key,value;
Reducer的输出:
Reducer的输出,默认也是根据tab来分离key,value, 这也是reducer程序要根据tab来组合key,value输出给stdout的原因; 当Reducer输出分隔符重新指定,Reducer程序中输出给stdout的内容也要配合新的分隔符来构造(Reducer->stdout-> outputformat ->file, outputformat根据reducer的输出分隔符来分离key,value, 并写入文件)
-D stream.reduce.output.field.seperator=. # reducer输出key,value间的分隔符
-D stream.num.reduce.output.key.fields=4 # 第4个.之前的内容为key, 其他为value