数据来源:
网络资源/《Hadoop权威指南》
hadoop i/o 操作:
1.数据完整性:
datanode在接收到数据的时候会校验数据完整性,比如CRC-32,datanode在接受client数据或者复制其他datanode的数据时会验证数据完整性。
正在写数据的client会将数据和校验和发送到datanode管线,最后一个datanode来负责校验数据完整性
client接收到datanode的数据块之后,会校验数据和,并将验证结果返回datanode,datanode会持久化校验日志,每次client验证之后,会更新校验日志,保存这些信息,有利于检测磁盘损坏。
datanode会后台运行一个检测线程,用于检测datanode上的block是否损坏。
客户端校验使用LocalFileSystem,LocalFileSystem使用CheckSumFileSystem
2.压缩
数据压缩算法是否支持切分,如果不支持切分,在map任务阶段,会存在更多的节点间的数据传输
mapreduce结果和map结果支持压缩
3.序列化
mapreduce中使用序列化可以使用实现writable接口的类,基本类型和数据类型都已经支持了
或者使用序列化框架,比如avro
others:
string和text的区别:text是Hadoop 中继承writable借口的,跟string对应的类型,不同的是string存储的是char而text是utf-8编码的字节数组,string 不可变
如果遇到复杂的数据类型,最好不要类型套用,因为序列化速度会变慢,最好自己实现writable接口
4.sequenceFile
sequenceFile存储键值对,可以从任意位置读取,因为存储文件中包含了同步标识
压缩方式存储,只压缩value 不压缩key
mapfile: 可以按照键来获取值,将key进行划分存放到多个sequenceFile中,value是一个sequenceFile,但是数据的添加必须按照key顺序进行添加,bloomMapfile 是一个包含布隆过滤器的mapfile,适合稀疏数据。
5.mapreduce
hdfs可是设置多个配置文件,用于在不同的环境下运行,用于区分测试和开发环境
写程序,首先可以使用mrunit来编写map和reduce的单元测试,然后实现tool接口,编写一个测试驱动程序完成的进行流程测试
mapreduce 提交运行过程
mr的流程调度框架,jobcontrol和ooize,jobcontrol相当于客户端,而ooize是一个服务器的角色, 而且支持控制流。
job提交涉及组件:客户端client、yarn资源管理器、yarn节点管理器、application master和分布式文件系统
MapReduce任务submit过程:
1).客户端runjob
2).客户端想资源管理器申请应用ID,并检查输出文件路径正确性,计算数据分片,失败,不提交job
3).客户端将运行需要的jar包和输入切片等信息保存到共享文件系统中
4).然后调用submitApplication
5).资源管理器向yarn调度器请求一个运行容器,在该容器所在节点的节点管理器的帮助下,启动应用的master进程
6).Master进程会决定该任务是Uber任务还是分布式任务,如果是Uber任务,就在同一节点上运行应用程序
7).如果是分布式的程序,Master会帮助map和reduce任务向资源管理器请求运行容器,一般5%左右的map任务完成之后
才会有reduce的请求,map任务运行尽量本地化,reduce可以在任意节点上
8).运行容器分配之后Master会与容器的nodeManager,通讯,启动运行容器,然后进行本次任务运行资源的本地化,jar包等资源
9).容器运行应用任务
任务失败的情况:
1)任务失败,在jvm退出之前,想application master汇报,然后退出
2)任务失败,jvm崩溃,节点管理器会发现该任务的jvm异常退出,节点管理器向application master汇报
3)任务超时没有向application master汇报,application master标记该任务失败,通知节点管理器将该任务的JVM kill
application master失败:
资源管理器会寻找寻找新的容器,重新运行application master,新的application可是使用已经计算好的结果,不必重新计算
节点管理器失败:
1)心跳信息发送超时,资源管理器会将application和task转移到其他节点
2)节点上运行的任务失败次数过多,进入黑名单,但是黑名单在多个application之间是不共享的
资源管理器失败:
单点故障问题,一般使用zookeeper来搭建HA系统
Shuffle:
https://www.cnblogs.com/felixzh/p/4680808.html
map端的过程:计算输出->buffer in mem->溢出写到磁盘->合并所有的溢出文件
a. 计算输出,将输出写入缓冲区中,序列化成字节数组,在写入缓冲区的过程中会按照key获取partition信息,由哪个reducer来处理
b.缓冲区达到阈值(一般为80%),会启动另外一个线程,向磁盘中输出临时文件,剩余20%的缓冲空间,由另外的一个线程
继续输出,互不影响。在溢出的过程中,会对数据进行sort和merge,字典排序,相同key的值,会被合并,如果设置了
combiner,则会对值进行处理。
c.不管中间过程中是否会溢出临时文件,最后一步都是合并文件,确保有序和单一key值
可以使用计数器来判断溢出到磁盘的数量次数
map相关优化:
HDFS block和spile切片的区别,通过控制minSize(切片的最小大小)和HDFS blocksize,可以控制map数量,分两种情况:
文件大,不是小文件,增大splite的minSize,减少maper的数量
文件大,小文件,使用CombineFileInputFormat将input path合并减少mapper的数量
reduce:
多线程拷贝map的结果,并进行合并排序,分趟多次进行合并,合并之后,传给reduce函数
3种合并方式:内存->内存 内存->磁盘 磁盘->磁盘
第二种方式,类似溢出,只要mapper阶段没有结束,第二种方式就一直存在
第三种方式,是最后所有的数据接收完毕之后,merge所有的数据
最后给reducer的输入结果,可能在内存中,可能在磁盘中
reducer阶段优化:
中间结果压缩
mapreduce 并行度决定因素:
maptask的并行数目是由输入数据的逻辑切片split来决定的,逻辑切片的大小由多个因素共同决定
splitSize=max{minSize,min{maxSize,blockSize}}
minSize.maxSize 是对逻辑切边设置的大小,block是对HDFS块的大小
当minsize大于blocksize的时候,起作用
maxsize当小于blocksize的时候,起作用
Combiner作用:
合并<key,value>,减少数据传输量
针对spill的临时文件进行combine,当临时文件数量达到min.num.spill.for.combine,combiner就会在merge之前执行,否则会在merger之后执行,有时候还会因为负载问题,不执行
MapReduce排序:
自动排序规则和自定义排序:https://blog.csdn.net/yongh701/article/details/50601319
次排序、分区、排序、分组:https://www.cnblogs.com/datacloud/p/3584640.html
全排序:https://www.iteblog.com/archives/2147.html、https://www.iteblog.com/archives/2146.html
输入压缩:
http://www.cnblogs.com/yurunmiao/p/4528499.html
对于容器型的数据文件格式,avro,sequence file,parquet 等,选择一个快速的压缩算法即可,LZO、LZ4、Snappy
其他的可以选择支持切分的压缩算法,bzip2
大文件不能选择不支持切分的压缩算法
默认的MapReduce类型
默认的mapper将输入原封不动的写到输出,泛型类型,所以没有输入类型限制
默认的partitioner是hash partitionrer,分区数目根据reducer来设置
默认的reducer是一个,把输入写到输出
分片计算:
在步骤3的时候,客户端尽进行计算分片,然后application master使用了分片的引用,引用包含了分片的地址和字节长度,地址是为了调度map任务到数据节点上,包含有字节长度是因为application master会优先尽心大的分片计算,贪婪算法
任务分到到map的时候,在传递给mapper函数的之前,会将分片的引用信息传递给InputFormat的实现类,获取一个reader(迭代器),然后mapper函数会使用这个迭代器获取每一条记录
MapReduce数据输入
InputFormat实现类如下:
FileInputFormat、TextInputFormat、SequenceInputFormat、DBInputFormat
DBInputFormat使用小心,过多的mapper可能导致数据库崩溃,最好使用sqoop现将数据导出转入HDFS或者分布式数据库
Hbase对应的是TableInputFormat
多个输入:MultipleInputs可以为不同的输入路径指定不同的mapper,但是需要确保mapper输出一致
格式相同,下面的函数,格式不同需要指定不同的mapper,上面的函数
MapReduce数据输出
文本输出:TextOutputFormat
二进制数出:SequenceFileOutputormat 如果该输出接下来会被其他的MapReduce程序使用,可选,结构紧凑
多个输出:(键、值、文件名称和标识)
延迟输出:LazyOutputFormat 防止输出空文件,等输出第一条数据的时候才创建文件
MapReduce 计数器
mapreduce 使用枚举来做计数器的声明
Join
reduce端的join,shuffle传输数据量大,reduce端,占用内存大,容易oom
map端的join(https://blog.csdn.net/huashetianzu/article/details/7821674),使用DistributeCache,调用addlocalfile方法,
JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上,用户使用
DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
https://blog.csdn.net/huashetianzu/article/details/7821674
semi join ,缩减mapper到reducer的数据传输量
reducer side map + bloom filter