MapReduce源码解析之Map端的输入输出【小二讲堂】

时间:2024-03-23 10:28:24

一、Map输入端

当作业进行切片后被提交到MR进行执行时,必须经过Mapper类的run方法进行任务的执行,这样我们在map端的自定义map方法也会执行,而根据业务需求我们可以对map端的输出进行业务逻辑的处理,以达到在Reduce我们对Map端处理的数据进行接收,并进行计算。
1.自定义类中继承Mapper类
MapReduce源码解析之Map端的输入输出【小二讲堂】
2.进入mapper.class类中,我们可以看到有setup()方法,还有自定义类继承的map()方法,以及cleanup方法,最重要的当然是run方法了。

调用一次次setup()方法–可以做预处理,准备资源
map()业务执行:
首先判断context上下文对象中是否包含key和vlaue,如果有则开始调用map()方法,则在这里开始执行自定的map方法,进行自定义逻辑代码的运行,当上下文对象中没有值时,跳出循环,执行cleanup方法,结束执行。
在每循环一次时,会将congtext对象的key和value传递,然后我们对key和value进行自己的逻辑处理
cleanup()清除申请的资源
将map端执行的结果进行发送输出
MapReduce源码解析之Map端的输入输出【小二讲堂】
一、MapTask
到这里那么map端的资源时怎么过来的呢?当作业提交完成时(这里前面一节说过了),在分配到container容器之后,会执行MapTask,Reduce Task,MapTask会拿着container所提交上来的job作业进行处理。下面从MapTask执行开始解析
1.run方法的执行
MapReduce源码解析之Map端的输入输出【小二讲堂】
首先会进行检查是否是mapTask任务,这里当然是了
然后获取配置文件中的reduce数量,如果reduce数量为0,则map端的输出直接输出的存储系统中,这里是可以没有reduce。
然后开始执行map阶段的任务,进行mapPhase进行数据的映射,和排序。
继续执行
MapReduce源码解析之Map端的输入输出【小二讲堂】
到这里进行判断是否用新的API执行作业,当然是了。
2.在调用run方法时,将job,splitmetainfo,umbilical.reporter作为参数传递给runNewMapper()方法
然后执行一系列的初始化方法
-创建输入格式化类-默认情况是TextInputFormat
-获取切片信息,包括,文件名称,偏移量offset,切片的大小size,location切片位置信息
-创建RecordReader(split,taskContext),将数据切片的信息和taskContext上下文对象的信息进行传递
进入RecordReader中----TextInputFormat
MapReduce源码解析之Map端的输入输出【小二讲堂】
返回了一个行的读取器,在这里我们知道了在mapReduce数据读取嗯的时候时候,是一行一行进行读取的,一行对应了一个map,我们 也可以进行对输入格式化类进行自定义,我们想多行多行读取进只需进行创建一个类继承InputFormat格式化类重写createRecorder()方法就行了。

这里可以知道我们在自定义输入格式化类的时候,必须要重写createReader()方法

MapReduce源码解析之Map端的输入输出【小二讲堂】
3.主要的执行代码有
MapReduce源码解析之Map端的输入输出【小二讲堂】
输入流input的初始化
mapper.run执行逻辑代码等等。

继续代码跟进,进入MapTask.class中的MapContextImpl<>()看都做了什么
MapReduce源码解析之Map端的输入输出【小二讲堂】
4.MapContextImpl.class
MapReduce源码解析之Map端的输入输出【小二讲堂】
这个类基于了RecordReader的对象reader,实现了几个方法。recorderReader是前面的Input输入对象,而这个input就是RecordReader类的对象,这样的意思就是就是RecordReader对象中包含了下面的三个方法。
getCurrentKey()
getCurrentValue()
nextKeyValue()
MapReduce源码解析之Map端的输入输出【小二讲堂】

这里开始正式进入MapTask执行的初始化方法Initialize()方法,
MapReduce源码解析之Map端的输入输出【小二讲堂】
5.下面这个initlizez()初始化方法中我们可以找出当初在hdfs文件上传中,数据切分的一个小问题,数据切分是按照字节一个一个进行切分的,这样特殊情况有时候会将一个字符切分成“两半”,那么这个问题的是怎么解决的呢?

.进入Initialize初始化方法
1)获取切片的信息,获取偏移量,起始索引,文件路径

MapReduce源码解析之Map端的输入输出【小二讲堂】
2)获取输入流fileIn,从这可以看出FileSyste直接怼到了文件上,为而不是切片上的,
MapReduce源码解析之Map端的输入输出【小二讲堂】
3)seek(start)从文件的偏移量开始读取数据。注意:在本机进行读取数据的时候,如果本节点上有数据,则依照就近原则,FileSystem就会从本节点开始读,不会去其他节点上读取。在这点上也可以看出计算是先数据移动的,直接去定位数据的位置
MapReduce源码解析之Map端的输入输出【小二讲堂】
****解决数据"两半"问题—LineRecoderReader
*1…开启IO,做偏移量设置,达到数据本地化读取
*2.修正HDFS底层数据被“切割”的问题(“两半问题”)
在这里首先受一个if判断,判断起始偏移量是否是0,就是意味着,如果是数据的第一个切片的起始位置,则不会进行下面逻辑代码,如果不是第一个切片,则偏移量肯定是大于0的。
意味着从第二个切片开始执行下面代码:
每次利用流读取时,将第一次读取出来的文本进行丢弃,即将分隔符第一的进行丢弃,代码是通过,new Text()匿名内部类的方式进行丢弃。而丢弃的这一行是由上次读取的时候,多读取一个分割符后的数据进行了。

MapReduce源码解析之Map端的输入输出【小二讲堂】
以上初始化工作完毕!!!
6.开始执行run方法
MapReduce源码解析之Map端的输入输出【小二讲堂】
.进入Mapper类中
判断context中否包含key和value数据,两个getCurrentKey和getCurrentValue分别是LineRecordReader.class的方法
MapReduce源码解析之Map端的输入输出【小二讲堂】
进入nextkeyvalue()方法:
对数据newSize进行更新,当还有下一个数据时,就会读取前面的size数据长度,偏移量。
这个类相当于进行判断下一条数据的判断,当有下一条数据时,就会进行数据的更新。返回ture.否则返回false
MapReduce源码解析之Map端的输入输出【小二讲堂】
进行下一条数据的执行
MapReduce源码解析之Map端的输入输出【小二讲堂】
到这里则实行完毕了

二、Map输出端

当数据使用Reduce端进行数据处理是:

MapReduce源码解析之Map端的输入输出【小二讲堂】
2.分区数量等于Redeuce数量
MapReduce源码解析之Map端的输入输出【小二讲堂】
分区数量的判断,进入不同的类中
MapReduce源码解析之Map端的输入输出【小二讲堂】
当分区数量<=1时,重写了partition的方法getPartition,这里的意思就是对分区中的数据进行返回,返回分区的数量。
当分区数量大于1是,则通过反射从配置文件中获取分区数量,如果用户没有设置则获取默认的分区数量,通过HashPartitioner.class进行获取
MapReduce源码解析之Map端的输入输出【小二讲堂】
分区的默认计算:
HashPartitioiner.class方法中获取默认设置partition的方式:
由下面可以看出通过key的hash对reduce数量进行取模。
MapReduce源码解析之Map端的输入输出【小二讲堂】

准备好了分区器之后,来返回起那么,看collector这个对象
MapReduce源码解析之Map端的输入输出【小二讲堂】
进入到collector中,(从返回值向前看,这里面的实现太多了)
MapReduce源码解析之Map端的输入输出【小二讲堂】
Collector的实现看出:collector–subclazz-----MapOutputCollector
MapReduce源码解析之Map端的输入输出【小二讲堂】
到这里可以看出这里的MapOutPutCollector来源了:要么用户自己设置了(自己写的,对于新手难道很大的,一般都是默认的),要么是默认的类(默认的自己可以去看)

开始继续跟进:
MapReduce源码解析之Map端的输入输出【小二讲堂】
进入collector的初始化方法,–MapOutputBuffer
MapReduce源码解析之Map端的输入输出【小二讲堂】
进入之后基本可以划分为两部分,一部分是基本信息,另一部分是环形缓冲区的代码逻辑。

MapReduce源码解析之Map端的输入输出【小二讲堂】
环形缓冲区:当数据进行分区完毕之后,会将数据向一个环形缓冲区中写数据,当环形缓冲区中的数据达到80%的时候,就会向本地磁盘溢写。向磁盘溢写之前必须通过快速排序然后才向磁盘中写数据。这两个动作是并行进行的。也就是说这里是由多线程实现的!!!
在环形缓冲区中,索引来记录对应的数据储存的位置。

MapReduce源码解析之Map端的输入输出【小二讲堂】
缓冲区完毕之后,开始执行下面的步骤了:
排序:
MapReduce源码解析之Map端的输入输出【小二讲堂】
用户没有设置的时候默认使用快速比较器

比较器的获取:
MapReduce源码解析之Map端的输入输出【小二讲堂】
在比较器的设置中如果用户没有设置,则会自动取OutputKeyClass的默认比较器,OutputKeyClass这个类是必须设置的。因此我们每次都会去设置。
MapReduce源码解析之Map端的输入输出【小二讲堂】

MapReduce源码解析之Map端的输入输出【小二讲堂】

当缓冲区在向磁盘溢写的之前,会进行快速排序和合并操作。
快速排序:
对环形缓冲区中的数据进行创建一个对应的索引,排序的时候对这些索引进行排序–采用快速排序
合并:
在数据向磁盘溢写的时候,如果数据想磁盘溢写小文件个数小于3,则不会这两个文件进行合并。当溢写的小文件个数大于3是,会对这些小文件进行合并。减少reduce端拉取时的IO量。

MR源码总体流程图:
MapReduce源码解析之Map端的输入输出【小二讲堂】

小二推荐:初学者入门Scala讲解!!!Scala六大特性,Scala高阶语法讲解】
https://blog.csdn.net/Mirror_w/article/details/89348834
小二讲堂:小二讲堂