《OD大数据实战》MapReduce实战

时间:2023-03-08 16:57:26

一、github使用手册

1. 我也用github(2)——关联本地工程到github

2. Git错误non-fast-forward后的冲突解决

3. Git中从远程的分支获取最新的版本到本地

4. Git教程

二、案例:倒排索引

1. 完成功能:

统计一系列文本文件中的每个单词构成的倒排索引。

1)分析:
(1)倒排索引主要是用来存储某个单词在一个文档中或者一组文档中出现的位置映射关系,即提供一个根据内容查找文档的方式。

(2)加权倒排索引,在确定指定单词到文档位置的映射关系的时候,加入权重考虑信息。

代码演示:

git@github.com:yeahwell/demobigdata.git

三、用户自定义数据类型

1. MapReduce中的数据类型

至少有两种用途:
第一个用途,这些类型定义的数据可以被序列化进行网络传输和文件存储,

第二个用途,在shuffle阶段要可以进行大小比较。

那么在hadoop中解决第一种方式采用hadoop的接口Writable,第二种采用接口java接口Comparable

(Hadoop将这两个接口结合提供了WritableComparable接口)。

Hadoop提供了很多的内置数据类型,比如:MapWritable, LongWritable, IntWritable, BooleanWritable, ByteWritable, DoubleWritable, FloatWritable, Text, NullWritable等。

2. 用户定制数据输入格式化器

数据输入格式(InputFormat)用于描述MR作业的数据输入格式规范。

MapReduce框架依赖InputFormat进行输入数据分片以及提供读取分片数据的RecordReader实例对象。

每一个InputFormat类都会有一个对应的RecordReader类,RecordReader类主要作用是将输入数据转换为键值对,传输给mapper阶段的map方法。

MapReduce默认的数据输入格式是:TextInputFormat(LineRecordReader)。除了这个格式器以外,还有KeyValueTextInputFormat, CombineTextInputFormat, SequenceFileInputFormat, DBInputFormat等。

1)全称:org.apache.hadoop.mapreduce.InputFormat

方法详解:
getSplits:返回值是分片信息集合;作用:通过分片个数确定mappre的个数,并根据分片信息中的数据地址信息决定是否采用数据本地化策略。
createRecordReader:创建一个具体读取数据并构造key/value键值对的RecordReader实例对象。

2)全称:org.apache.hadoop.mapreduce.RecordReader

方法详解:
initialize:根据对应的分片信息进行初始化操作。
nextKeyValue:判断是否还有下一个key/value键值对,如果有返回true;否则返回false。
getCurrentKey/getCurrentValue:获取当前key/value键值对。
getProgress:获取操作进度信息。
close:关闭资源读取相关连接。

3)全称:org.apache.hadoop.mapreduce.InputSplit

方法详解:
getLength:获取分片长度。
getLocations:获取该分片数据对应的位置信息,确定数据本地化时候有用。

3. 用户定制数据输出格式化器

数据输出格式(OutputFormat)用于描述MR作业的数据输出格式规范。

MapReduce框架依赖OutputFormat进行输出路径(输出空间)检测、获取提交job的OutputCommitter实例对象以及提供一个具体定义如何输出数据的RecordWriter实例对象。

每一个OutputFormat类都会有一个对应的RecordWriter类,RecordWriter类主要作用是明确定义如何写入以及写入的格式,接收reducer阶段输出的key/value键值对。
MapReduce默认的数据输出格式是:TextOutputFormat(LineRecordWriter)。除了这个格式器以外,还有SequenceFileOutputFormat, DBOutputFormat等。

1)全称:org.apache.hadoop.mapreduce.OutputFormat

方法详解:
getRecordWriter:创建一个具体写数据的RecordWriter实例。
checkOutputSpecs:检测输出空间相关信息,如果检测失败,直接抛出异常。
getOutputCommitter:获取一个提交job的committer对象。一般情况下,直接使用FileOutputCommitter对象即可。如果觉得FileOutputCommitter内容比较多,也可以自己实现一个完全为空的类。

2)全称:org.apache.hadoop.mapreduce.RecordWriter

方法详解:
write:接收reducer阶段产生的输出key/value键值对数据,并将其写出。
close:关闭流,进行一些其他操作。

四、案例:MongoDB Hadoop

1. 实现功能:

从MongoDB中读取日志数据,将MapReduce程序处理过的数据写出到MongoDB中。

2. 代码演示:

git@github.com:yeahwell/demobigdata.git

五、Shuffle阶段说明

1. shuffle阶段

1)shuffle阶段主要包括map阶段的combine、group、sort、partition以及reducer阶段的合并排序。

2)map阶段通过shuffle后会将输出数据按照reduce的分区分文件的保存,文件内容是按照定义的sort进行排序好的。

3)map阶段完成后会通知ApplicationMaster,然后AM会通知Reduce进行数据的拉取,在拉取过程中进行reduce端的shuffle过程。

2. 用户自定义combiner

1)Combiner可以减少Map阶段的中间输出结果数,降低网络开销。默认情况下是没有Combiner的。

2)用户自定义的Combiner要求是Reducer的子类,以Map的输出<key,value>作为Combiner的输入<key,value>和输出<key,value>,也就是说Combiner的输入和输出必须是一样的。

3)可以通过job.setCombinerClass设置combiner的处理类,MapReduce框架不保证一定会调用该类的方法。

3. 用户自定义Partitoner

1)Partitioner是用于确定map输出的<key,value>对应的处理reducer是那个节点。

2)默认MapReduce任务reduce个数为1个,此时Partitioner其实没有什么效果,但是当我们将reduce个数修改为多个的时候,partitioner就会决定key所对应reduce的节点序号(从0开始)。

3)可以通过job.setPartitionerClass方法指定Partitioner类,默认情况下使用HashPartitioner(默认调用key的hashCode方法)。

4.  用户自定义Group

1)GroupingComparator是用于将Map输出的<key,value>进行分组组合成<key,List<value>>的关键类,直白来讲就是用于确定key1和key2是否属于同一组,如果是同一组,就将map的输出value进行组合。

2)要求我们自定义的类实现自接口RawComparator,可以通过job.setGroupingComparatorClass方法指定比较类。

3)默认情况下使用WritableComparator,但是最终调用key的compareTo方法进行比较。

5. 用户自定义Sort

1)SortComparator是用于将Map输出的<key,value>进行key排序的关键类, 直白来讲就是用于确定key1所属组和key2所属组那个在前,那个在后。

2)要求我们自定义的类实现自接口RawComparator,可以通过job.setSortComparatorClass方法指定比较类。

3)默认情况下使用WritableComparator,但是最终调用key的compareTo方法进行比较。

6. 用户自定义Reducer的Shuffle

1)在reduce端拉取map的输出数据的时候,会进行shuffle(合并排序),

2)MapReduce框架以插件模式提供了一个自定义的方式,我们可以通过实现接口ShuffleConsumerPlugin,并指定参数mapreduce.job.reduce.shuffle.consumer.plugin.class来指定自定义的shuffle规则,

但是一般情况下,直接采用默认的类org.apache.hadoop.mapreduce.task.reduce.Shuffle。

六、案例:二次排序

1. 实现功能

hadoop默认只对key进行排序,有时候我们需要将value部分也进行排序。

这种情况下有两种方式实现:

第一种,我们将排序放到reducer端进行,但是这种方式当数据量比较大的时候,会比较消耗内存。

那么另外一种方式就是二次排序。二次排序的内部实行其实是先按照key+value组合的方式进行排序,然后根据单独key进行分组的一种实行方式。

要求reducer个数为2,而且奇数到第一个reducer进行处理,偶数到第二个reducer进行处理。

2. 代码演示

git@github.com:yeahwell/demobigdata.git

hadoop jar demobigdata.jar com.webmovie.bigdata.mapreduce.shuffle.DemoRunner