HADOOP
hadoop的概念
Hadoop是一个由Apache基金会所开发的分布式系统基础架构。用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。
它主要有以下几个优点:
- 高可靠性:Hadoop按位存储和处理数据的能力值得人们信赖。
- 高扩展性:Hadoop是在可用的计算机集簇间分配数据并完成计算任务的,这些集簇可以方便地扩展到数以千计的节点中。
- 高效性:Hadoop能够在节点之间动态地移动数据,并保证各个节点的动态平衡,因此处理速度非常快。
- 高容错性:Hadoop能够自动保存数据的多个副本,并且能够自动将失败的任务重新分配。
- 低成本:与一体机、商用数据仓库以及QlikView、Yonghong Z-Suite等数据集市相比,hadoop是开源的,项目的软件成本因此会大大降低。
hadoop的三大核心四大模块:
hdfs:分布式文件系统
yarn:分布式资源调度系统
mapreduce:分布式运算框架
common:支持其他Hadoop模块的通用实用程序。
HDFS:
HDFS具有主从架构。HDFS集群由单个NameNode组成,这是一个主服务器,用于管理文件系统命名空间并调节客户端对文件的访问。此外,还有许多DataNodes,通常是集群中的每个节点,它们管理连接到运行它们的节点的存储。HDFS公开文件系统命名空间,并允许将用户数据存储在文件中。在内部,一个文件被分割成一个或多个块,这些块存储在一组DataNodes中。NameNode执行文件系统命名空间操作,比如打开、关闭和重命名文件和目录。它还确定块到DataNodes的映射。DataNodes负责服务来自文件系统客户端的读写请求。DataNodes还对NameNode的指令执行块创建、删除和复制。
HDFS的设计思想:
1)大文件拆成块,分在多台机器上存储(解决处理数据时的IO瓶颈)
2)块均匀分布(解决负载均衡问题)
HDFS优点
(1)高容错性:数据自动保存多个副本,副本丢失后,会自动恢复。
(2)适合批处理:移动计算而非数据、数据位置暴露给计算框架。
(3)适合大数据处理:GB、TB、甚至PB级数据、百万规模以上的文件数量,1000以上节点规模。
(4)流式文件访问:一次性写入,多次读取;保证数据一致性。
(5)可构建在廉价机器上:通过多副本提高可靠性,提供了容错和恢复机制。
HDFS缺点
(1)低延迟数据访问:比如毫秒级、低延迟与高吞吐率。
(2)小文件存取:占用NameNode大量内存,寻道时间超过读取时间。
(3)并发写入、文件随机修改:一个文件只能有一个写者,仅支持append
HDFS的可靠性策略:
1)文件完整性
在文件建立时,每个数据块都产生校验和,校验和会保存在.meta文件内;
客户端获取数据时可以检查校验和是否相同,从而发现数据块是否损坏;如果正在读取的数据块损坏,则可以继续读取其它副本。NameNode标记该块已经损坏,然后复制block达到预期设置的文件备份数;DataNode 在其文件创建后三周验证其checksum.
2)网络或者机器失效
1)副本冗余
2)机架感知策略(副本放置策略)
3)心跳机制策略
3)namenode挂掉
(1)主备切换(高可用)
(2)镜像文件和操作日志磁盘存储
(3)镜像文件和操作日志可以存储多份,多磁盘存储
4)其他机制
(1)快照(和虚拟机快照意义相同,保存了系统某一时刻的影像,可以还原到该时刻)
(2)回收站机制
(3)安全模式
HDFS的优缺点
HDFS优点:
(1)高容错性:数据自动保存多个副本,副本丢失后,会自动恢复。
(2)适合批处理:移动计算而非数据、数据位置暴露给计算框架。
(3)适合大数据处理:GB、TB、甚至PB级数据、百万规模以上的文件数量,1000以上节点规模。
(4)流式文件访问:一次性写入,多次读取;保证数据一致性。
(5)可构建在廉价机器上:通过多副本提高可靠性,提供了容错和恢复机制。
HDFS缺点:
(1)低延迟数据访问:比如毫秒级、低延迟与高吞吐率。
(2)小文件存取:占用NameNode大量内存,寻道时间超过读取时间。
(3)并发写入、文件随机修改:一个文件只能有一个写者,仅支持append
**HDFS不适合存储小文件,如果生成场景中还必须将这些小文件进行存储(比如,每天产生的日志,数据量很小,但是必须存储)
HDFS天生就是为存储大文件而生的,一个块的元数据大小大概在150byte左右,存储一个小文件就要占用150byte的内存,如果存储大量的小文件,很快就将内存耗尽,而整个集群存储的数据量很小,失去了HDFS的意义****可以将数据合并上传,或者将文件append形式追加在HDFS文件末尾。
集群的搭建
hdfs shell
HDFS读写流程
(1)写数据流程
1).客户端发出请求 hdfs dfs -put /etc/profile /qf/data
2).namenode查看维护的目录结构,检查/qf/data是否存在,如不存在直接报错”no such file or directory“
如存在返回给客户端同意上传文件请求,将操作写入日志文件
3).客户端请求上传第一个块,询问namenode块的存储位置
4).namenode查看自己的datanode池,返回给客户端一个datanode列表
5).客户端发出请求建立pipeline
6).客户端先把文件写入缓存,达到一个块的大小时,会在客户端和第一个datanode建立连接开始流式的传输数据,这个datanode会一小部分一小部分(4K)的接收数据然后写入本地仓库,同时会把这些数据传输到第二个datanode,第二个datanode也同样一小部分一小部分的接收数据并写入本地仓库,同时传输给第三个datanode(在流式复制时,逐级传输和响应采用响应队列来等待传输结果。队列响应完成后返回给客户端)
7).第一个数据块传输完成后会使用同样的方式传输下面的数据块直到整个文件上传完成。
8).整个文件完成,namenode更新内存元数据
(2)读数据流程
1)客户端向namenode发起RPC调用,请求读取文件数据。
2)namenode检查文件是否存在,如果存在则获取文件的元信息(blockid以及对应的datanode列表)。
3)客户端收到元信息后选取一个网络距离最近的datanode,依次请求读取每个数据块。客户端首先要校检文件是否损坏,如果损坏,客户端会选取另外的datanode请求。
4)datanode与客户端建立socket连接,传输对应的数据块,客户端收到数据缓存到本地,之后写入文件。
5)依次传输剩下的数据块,直到整个文件合并完成。
拓展:hdfs的模拟实现 —
namenode的工作机制 – namenode的职责 --namenode的启动流程
namenode的工作机制:
- 第一次启动namenode格式化后,创建fsimage和edits文件,如果不是第一次启动,直接加载编辑日志和镜像文件到内存。
- 客户端对元数据镜像增删改的请求
- namenode记录操作日志,更新滚动日志
- namenode在内存中对数据镜像增删改查
namenode的职责:
(1)维护目录结构
(2)管理元数据
(3)响应客户端请求
namenode的启动过程:
(1)加载镜像文件,还原了checkpoint时间节点前的元数据(包含目录结构,文件大小,块的大小,块的id等等信息),不包含块的存储位置
(2)加载edits文件,还原了checkpoint时间节点到集群停止的元数据,不包含块的存储位置。(至此namenode还原的元数据唯一缺失的就是块的存储位置)
(3)blockreport阶段,datanode启动,向namendoe汇报自己所管理的块及块的id,namenode根据块的ID还原块的存储位置
(4)在blockreport结束后,集群会判断,datanode的启动数量(可设置,默认为0),丢失的块的占比(可设置,默认0.999f)
是否满足退出安装模式的条件,如果满足,30秒后退出安全模式。在安全模式下namenode会删除多余的块(副本数为3,结果实际存储4个。ps:这种情况发生在datanode宕机,集群把宕机的datanode管理的块进行了复制,而宕机的datanode又重新启动了)还会复制低于副本数的块。
什么情况下会进入安全模式,安全模式的解决办法?
块的数量低于阀值,datanode启动数量不够都会进入安全模式
(1)调低阀值
hdfs-site.xml中
<name>dfs.namenode.safemode.threshold-pct</name>
<value>0.999f</value>
(2)强制离开
hdfs dfsadmin -safemode leave
(3)重新格式化集群
(4)修复损坏的块文件
secondary namenode的工作机制 – checkpoint的操作流程
(日志合并的步骤)
Secondary NameNode工作机制 :协助namenode进行日志合并
- Secondary NameNode询问namenode是否需要checkpoint。直接带回namenode是否检查结果
- Secondary NameNode请求执行checkpoint
- namenode滚动正在写的edits日志
- 将滚动前的编辑日志和镜像文件拷贝到Secondary NameNode
- Secondary NameNode加载编辑日志和镜像文件到内存,开始合并。
- 生成新的镜像文件fsimage.chkpoint
- 拷贝fsimage.chkpoint到namenode
- nodenode将fsimage.chkpoint重新命名成fsimage
datanode的工作机制 – datanode的职责
datanode的工作职责:
(1)负责块的存储
(2)定期向namenode报告块的存储情况
datanode宕机后,集群能否立即将宕机的datanode下线,datanode下线后,集群将进行什么工作?
不能,10分30秒,将复制下线的datanode 管理的块将一个集群重新格式化namenode后,使用start-dfs.sh启动 集群,datanode能启动起来么?为什么?
不能,namendoe重新格式化后,clusterid改变了,而datanode还保存着原来的clusterid。
zookeeper:分布式协调服务
应用场景:
服务器动态上下线感知
主从协调
配置管理
分布式共享锁
zookeeper提供的功能:
1、提供用户的注册、查询功能
2、提供注册监听器的功能
3、通过心跳动态感知用户的状态
阐述zookeeper分布式锁的实现原理
在zk上创建永久节点server,所有要访问资源的客户端在永久节点server**册临时有序节点,并且监听自己前一个节点。
***最小的获得锁可以访问资源,访问结束,断开连接,注册的临时节点被删除,他的下一个节点通过监听能够知道,
此时节点***变为最小,获取到了锁,可以访问资源。
选举机制:
1、初始化状态的选举,Id大的为老大
2、运行过程中的选举,依据数据版本、逻辑时钟、Id大
选举机制: 5台服务器集群: 只要半数节点存活,就能正常对外提供服务
1、集群初始化时–Id大的为老大
服务器1启动,广播自己的选票,处于looking状态
服务器2 启动,第一次选举都是自己选自己,各1票,再次投票,1给2,2给自己,判断是否半数存活,处于looking状态
服务器3启动,第一次选举,2有2票,3有1票,再次投票,3获得3票
服务器4和5启动,之后就不用再次选举了
2、集群运行一段时间之后的选举
选举的标准就变为:数据版本、逻辑时钟、ID号大
优先选择数据版本新的
选择逻辑时钟大的,逻辑时钟小的投票结果会被忽略,重新投票
统一逻辑时钟,选举ID号为大的胜出
数据ID相同的情况下,Leader的ID号大的胜出
3、节点(Znode)类型
- 短暂(ephemeral)(断开连接自己删除)
- 持久(persistent)(断开连接不删除)
四种形式:
- PERSISTENT
- PERSISTENT_SEQUENTIAL(持久序列/test0000000019 )
- EPHEMERAL
- EPHEMERAL_SEQUENTIAL
序列化的节点,每一个目录下重新生成一批序号,非序列化的虽然没有生成序号,但依旧占用一个序号。
5、监听器
监听器的有效时间就是一次数据的改变时间
watch监听只能触发一次
问题:如何建立持久的监听?
高可用机制:
原理:两个改变
1、日志文件存储方式发生了改变
2、namenode的状态切换 zkfc
注意:脑裂 brainsplit
yarn:分布式资源调度系统
1、resourcemanager:
resourceManager的工作职责:
- 负责接收client任务请求
- 接收和监控NodeManager的资源分配与调度
- 启动和监控ApplititionMaster
2、nodemanager:
NodeManager的工作职责:
负责节点上的资源管理,启动Container运行Task,上报节点资源使用情况、Container运行状况给ApplicationMAster.
3、container:
container是yarn中的资源抽象,主要负责对任务运行环境的抽象,描述一系列信息:任务运行资源(节点、内存、cpu)、任务启动命令和任务运行环境。yarn会为每个任务分配一个Container,且每个任务只能使用分派的Container中描述的资源。
4、appMaster:
负责单个Application作业的任务(Task)管理和调度,向Resourcemanager申请资源,向Nodemanager发出启动Container的指令,接收Nodemanager的Task处理状态信息。
job提交流程
1.客户端提交作业给resourcemanager
2.resourcemanager返回给客户端jobid,存储路径(HDFS上的路径)信息
3.客户端将job.jar、job.xml、job.split、job.splitinfo等信息上传到存储路径(HDFS上的路径/)
4.客户端通知resourcemanager可以启动job
5.resourcemanager将job加入到job队列当中,等待nodemanager领取job,然后nodemanager启动container,job.jar、job.xml、job.split、job.splitinfo等信息下载到container内,将客户端发出命令启动AppMaster
6.AppMaster计算资源,向resourcemanager请求maptask的资源
7.resourcemanager分配资源(container),下载job.jar到container内,AppMaster启动maptask(yarnchild)
8.maptask执行完成,通知AppMaster,然后释放maptask资源,AppMaster向resourcemanager申请reducetask的资源
9.resourcemanager分配资源(container),下载job.jar到container内,AppMaster启动reducetask(yarnchild)
10.reducetask执行完成,通知AppMaster,然后释放reducetask资源。AppMaster通知resourcemanager。AppMaster释放资源。
mapreduce:分布式运算框架
mr的三大核心问题
mr的概略图
mr的运行全流程
MR运行流程:
一个MR程序启动时,最先启动MRAPPMaster,MRAPPMaster启动后根据本次的job的描述信息,计算出需要的mapTask实例数量,然后向集群申请机器启动相应数量的mapTask进程
1、通过客户指定的inputformat来获取RecordReader(三个方法:nextkeyvalue、getcurrentkey、getcurrentvalue)读取数据,形成输入kv对
2、将输入的kv对传递给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对传到OutputCollector(数据收集器)
3、通过环形缓冲区将缓存中的KV对按照K分区排序(分区且区内有序)后不断溢写到文件(多个文件),归并排序生成一个大文件
4、当有一个mapTask运行完成,reducetask启动,reducetask到运行完成mapTask的机器上拉取属于自己分区的数据Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序(多个mapTask进程启动后MRAPPMaster会根据客户指定的参数启动相应数量的reducetask进程)
5、在调用reduce()方法时:
(1)、框架reducetask先从合并后的文件中读取一个key传递给reduce方法,同时传一个value迭代器
(2)、value迭代器的hasnext方法会判断文件中的下一个key是否还是已传入的key,如果是,则next可以返回下一个value,否则,hasnext直接返回false,导致本次reduce方法调用结束
6、然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储
MR的shuffle流程
(1).maptask执行,收集maptask的输出数据,将数据写入环形缓冲区中,记录起始偏移量
(2).环形缓冲区默认大小为100M,当数据达到80M时,记录终止偏移量。
(3).将数据进行分区(默认分组根据key的hash值%reduce数量进行分区),分区内进行快速排序
(4).分区,排序结束后,将数据刷写到磁盘(这个过程中,maptask输出的数据写入剩余20%环形缓冲区,同样需要记录起始偏移量)
(5).maptask结束后将形成的多个小文件做归并排序合并成一个大文件
(6).当有一个maptask执行完成后,reducetask启动
(7).reducetask到运行完成maptask的机器上拉取属于自己分区的数据
(8).reducetask将拉取过来的数据“分组”,每组数据调用一次reduce()方法
(9).执行reduce逻辑,将结果输出到文件
如何控制MapTask数量,如何控制ReduceTask数量
分片机制如下,一个分片对应一个map,可调整客户端的块大小,minSize,maxSize改变map数量
minSize默认值是1,maxSize默认是long的最大值
protected long computeSplitSize(long blockSize, long minSize,long maxSize) {
return Math.max(minSize, Math.min(maxSize, blockSize));}
ReduceTask数量是我们通过程序设置的,具体数量根据数据map任务的个数,以及实际集群的情况等综合因素设置(根据业务场景)
分片与分块的区别?
分片是逻辑概念,分片有冗余
分块是物理概念,是将数据拆分,无冗余
现块的大小为128M,现在有一文件大小为260M,进行spilt的时候,会被分成几片
2片,1.1的冗余
用户可干预组件及机制原理
八大用户可干预组件:
- InputFormat
- RecordReader
- hashPartitioner
- key.compareTo
- combiner
- GroupingComparator:(TopN)欺骗reducetask的迭代器,让他认为只要orderID相同,则两个对象相等
- OutputFormat
- RecordWriter
列举MR中可干预的组件 (详细说明各组件的原理,ps:combine)
- combine:相当于在map端(每个maptask生成的文件)做了一次reduce
- partition:分区,默认根据key的hash值%reduce的数量,自定义分区是继承Partitioner类,重写getPartition()分区方法。自定义分区可以有效的解决数据倾斜的问题
- group:分组,继承WritableComparator类,重写compare()方法,自定义分组(就是定义reduce输入的数据分组规则)
- sort:排序,继承WritableComparator类,重写compare()方法,根据自定义的排序方法,将reduce的输出结果进行排序
- 分片:可调整客户端的blocksize,minSize,maxSize
案例
流量日志统计
技术点: 自定义javaBean用来在mapreduce中充当value
注意: javaBean要实现Writable接口,实现两个方法(序列化、反序列化)
统计流量且按照流量大小倒序排序
技术点:这种需求,用一个mapreduce -job 不好实现,需要两个mapreduce -job
第一个job负责流量统计,跟上题相同
第二个job读入第一个job的输出,然后做排序,要将flowBean作为map的key输出,这样mapreduce就会自动排序,此时,flowBean要实现接口WritableComparable,要实现其中的compareTo()方法,方法中,我们可以定义倒序比较的逻辑
统计流量且按照手机号的归属地,将结果数据输出到不同的省份文件中
技术点:自定义Partitioner
自定义partition后,要根据自定义partitioner的逻辑设置相应数量的reduce task—
job.setNumReduceTasks(5);
社交好友数据挖掘
join
自定义inputFormat
程序的核心机制:
自定义一个InputFormat
改写RecordReader,实现一次读取一个完整文件封装为KV
在输出时使用SequenceFileOutPutFormat输出合并文件
数据分类输出–自定义outputFormat
实现要点:
1、 在mapreduce中访问外部资源
2、 自定义outputformat,改写其中的recordwriter,改写具体输出数据的方法write()
TOPN–自定义GroupingComparator
实现
自定义groupingcomparator
/**
* 用于控制shuffle过程中reduce端对kv对的聚合逻辑
* @author [email protected]
*
*/
public class ItemidGroupingComparator extends WritableComparator {
protected ItemidGroupingComparator() {
super(OrderBean.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean abean = (OrderBean) a;
OrderBean bbean = (OrderBean) b;
//将item_id相同的bean都视为相同,从而聚合为一组
return abean.getItemid().compareTo(bbean.getItemid());
}
}
定义订单信息bean
/**
* 订单信息bean,实现hadoop的序列化机制
* @author [email protected]
*
*/
public class OrderBean implements WritableComparable<OrderBean>{
private Text itemid;
private DoubleWritable amount;
public OrderBean() {
}
public OrderBean(Text itemid, DoubleWritable amount) {
set(itemid, amount);
}
public void set(Text itemid, DoubleWritable amount) {
this.itemid = itemid;
this.amount = amount;
}
public Text getItemid() {
return itemid;
}
public DoubleWritable getAmount() {
return amount;
}
@Override
public int compareTo(OrderBean o) {
int cmp = this.itemid.compareTo(o.getItemid());
if (cmp == 0) {
cmp = -this.amount.compareTo(o.getAmount());
}
return cmp;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(itemid.toString());
out.writeDouble(amount.get());
}
@Override
public void readFields(DataInput in) throws IOException {
String readUTF = in.readUTF();
double readDouble = in.readDouble();
this.itemid = new Text(readUTF);
this.amount= new DoubleWritable(readDouble);
}
@Override
public String toString() {
return itemid.toString() + "\t" + amount.get();
}
}
编写mapreduce处理流程
/**
* 利用secondarysort机制输出每种item订单金额最大的记录
* @author [email protected]
*
*/
public class SecondarySort {
static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{
OrderBean bean = new OrderBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = StringUtils.split(line, "\t");
bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[1])));
context.write(bean, NullWritable.get());
}
}
static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{
//在设置了groupingcomparator以后,这里收到的kv数据 就是: <1001 87.6>,null <1001 76.5>,null ....
//此时,reduce方法中的参数key就是上述kv组中的第一个kv的key:<1001 87.6>
//要输出同一个item的所有订单中最大金额的那一个,就只要输出这个key
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key, NullWritable.get());
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(SecondarySort.class);
job.setMapperClass(SecondarySortMapper.class);
job.setReducerClass(SecondarySortReducer.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//指定shuffle所使用的GroupingComparator类
job.setGroupingComparatorClass(ItemidGroupingComparator.class);
//指定shuffle所使用的partitioner类
job.setPartitionerClass(ItemIdPartitioner.class);
job.setNumReduceTasks(3);
job.waitForCompletion(true);
}
}
Mapreduce的其他补充
计数器应用
在实际生产代码中,常常需要将数据处理过程中遇到的不合规数据行进行全局计数,类似这种需求可以借助mapreduce框架中提供的全局计数器来实现
示例代码如下:
public class MultiOutputs {
//通过枚举形式定义自定义计数器
enum MyCounter{MALFORORMED,NORMAL}
static class CommaMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] words = value.toString().split(",");
for (String word : words) {
context.write(new Text(word), new LongWritable(1));
}
//对枚举定义的自定义计数器加1
context.getCounter(MyCounter.MALFORORMED).increment(1);
//通过动态设置自定义计数器加1
context.getCounter("counterGroupa", "countera").increment(1);
}
}
mapreduce参数优化
//以下参数是在用户自己的mr应用程序中配置在mapred-site.xml就可以生效
(1) mapreduce.map.memory.mb: 一个Map Task可使用的资源上限(单位:MB),默认为1024。如果Map Task实际使用的资源量超过该值,则会被强制杀死。
(2) mapreduce.reduce.memory.mb: 一个Reduce Task可使用的资源上限(单位:MB),默认为1024。如果Reduce Task实际使用的资源量超过该值,则会被强制杀死。
(3) mapreduce.map.cpu.vcores: 每个Map task可使用的最多cpu core数目, 默认值: 1
(4) mapreduce.reduce.cpu.vcores: 每个Reduce task可使用的最多cpu core数目, 默认值: 1
(5) mapreduce.map.java.opts: Map Task的JVM参数,你可以在此配置默认的java heap size等参数, e.g.
“-Xmx1024m -verbose:gc -Xloggc:/tmp/@[email protected]” (@[email protected]会被Hadoop框架自动换为相应的taskid), 默认值: “”
(6) mapreduce.reduce.java.opts: Reduce Task的JVM参数,你可以在此配置默认的java heap size等参数, e.g.
“-Xmx1024m -verbose:gc -Xloggc:/tmp/@[email protected]”, 默认值: “”
//应该在yarn启动之前就配置在服务器的yarn-site.xml配置文件中才能生效
(7) yarn.scheduler.minimum-allocation-mb 1024 给应用程序container分配的最小内存
(8) yarn.scheduler.maximum-allocation-mb 8192 给应用程序container分配的最大内存
(9) yarn.scheduler.minimum-allocation-vcores 1
(10)yarn.scheduler.maximum-allocation-vcores 32
(11)yarn.nodemanager.resource.memory-mb 8192 每台NodeManager最大可用内存
(12)yarn.nodemanager.resource.cpu-vcores 8 每台NodeManager最大可用cpu核数
//shuffle性能优化的关键参数,应在yarn启动之前就配置好
(13) mapreduce.task.io.sort.mb 100 //shuffle的环形缓冲区大小,默认100m
(14) mapreduce.map.sort.spill.percent 0.8 //环形缓冲区溢出的阈值,默认80%
容错相关参数
(1) mapreduce.map.maxattempts: 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
(2) mapreduce.reduce.maxattempts: 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
(3) mapreduce.map.failures.maxpercent: 当失败的Map Task失败比例超过该值为,整个作业则失败,默认值为0. 如果你的应用程序允许丢弃部分输入数据,则该该值设为一个大于0的值,比如5,表示如果有低于5%的Map Task失败(如果一个Map Task重试次数超过mapreduce.map.maxattempts,则认为这个Map Task失败,其对应的输入数据将不会产生任何结果),整个作业扔认为成功。
(4) mapreduce.reduce.failures.maxpercent: 当失败的Reduce Task失败比例超过该值为,整个作业则失败,默认值为0.
(5) mapreduce.task.timeout: Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,可能是卡住了,也许永远会卡主,为了防止因为用户程序永远block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是300000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。
本地运行mapreduce 作业
设置以下几个参数:
mapreduce.framework.name=local
mapreduce.jobtracker.address=local
fs.defaultFS=local
效率和稳定性相关参数
(1) mapreduce.map.speculative: 是否为Map Task打开推测执行机制,默认为false
(2) mapreduce.reduce.speculative: 是否为Reduce Task打开推测执行机制,默认为false
(3) mapreduce.job.user.classpath.first & mapreduce.task.classpath.user.precedence:当同一个class同时出现在用户jar包和hadoop jar中时,优先使用哪个jar包中的class,默认为false,表示优先使用hadoop jar中的class。
(4) mapreduce.input.fileinputformat.split.minsize: FileInputFormat做切片时的最小切片大小
(5)mapreduce.input.fileinputformat.split.maxsize: FileInputFormat做切片时的最大切片大小
(切片的默认大小就等于blocksize,即 134217728)