1. MapReduce原理
1.1. MapReduce概述
(1)MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题.
(2)MapReduce由两个阶段组成:Map和Reduce,用户只需要实现map()和reduce()两个函数,即可实现分布式计算,非常简单。这两个函数的形参是key、value对,表示函数的输入信息。
(3)在Hadoop 中,map 函数 位 于 内 置 类 org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN,KEYOUT, VALUEOUT>中,reduce 函数位于内置类 org.apache.hadoop.mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT,VALUEOUT>中。 我们要做的就是覆盖 map 函数和 reduce 函数。
对于 Hadoop 的 map 函数和reduce 函数,处理的数据是键值对,也就是说 map 函数接收的数据是键值对,两个参数;输出的也是键值对,两个参数;reduce 函数接收的参数和输出的结果也是键值对。
在Mapper 类,有四个泛型,分别是 KEYIN、VALUEIN、KEYOUT、VALUEOUT,前面两个 KEYIN、 VALUEIN 指的是map 函数输入的参数 key、 value 的类型;后面两个 KEYOUT、VALUEOUT 指的是 map 函数输出的 key、value的类型。
输入参数 key、value 的类型就是KEYIN、VALUEIN,每一个键值对都会调用一次 map 函数。在这里,map 函数没有处理输入的 key、value,直接通过 context.write(…)方法输出了,输出的 key、value 的类型就是KEYOUT、VALUEOUT。这是默认实现,通常是需要根据业务逻辑覆盖的。
查看 Reducer 类,也有四个泛型,同理,分别指的是 reduce 函数输入的 key、value类型,和输出的 key、value 类型。看一下reduce 函数定义,如下图所示:
reduce 函数的形参 key、value 的类型是KEYIN、VALUEIN。要注意这里的value是存在于java.lang.Iterable<VALUEIN>中的,这是一 个迭代器,用于集合遍历的,意味着values 是一个集合。reduce 函数默认实现是把每个value 和对应的 key,通过调用context.write(…)输出了,这里输出的类型是 KEYOUT、VALUEOUT。通常会根据业务逻辑覆盖 reduce 函数的实现。
1.2. MapReduce执行过程
MapReduce 运行的时候,会通过 Mapper 运行的任务读取HDFS 中的数据文件,然后调用自己的方法,处理数据,最后输出。Reducer任务会接收 Mapper 任务输出的数据,作为自己的输入数据,调用自己的方法,最后输出到 HDFS 的文件中。
1.3. MapReduce原理及执行步骤
MapReduce原理图如下所示:
简单理解如下所示:
(1)Map任务处理
l 读取输入文件内容,解析成key、value对,对输入文件的每一行,解析成key、value对。每一个键值调用一次map函数;
l 覆盖map函数,对输入的key、value处理,转换成新的key、value输出;
l 对输出的key、value进行分区;
l 对不同分区的数据,按照key进行排序、分组;相同key的value放到一个集合中;
l 对分组后的数据进行规约。
l Mapper执行1-6个步骤得到最终的结果
(2)Reduce任务处理
l 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点;对多个map任务的输出进行合并、排序;
l 覆盖reduce函数,对输入的key、value处理,转换成新的key、value输出;
l 把reduce的输出保存到文件中。
l Reduce执行4-6个步骤对Mapper最终的结果执行4-6个步骤进一步处理
将上述步骤融入到原理图中后如下所示:
(3)键值对编号
对于 Mapper 任务输入的键值对,定义为key1 和 value1。在 map 方法中处理后,输出的键值对,定义为 key2 和 value2。reduce 方法接收 key2 和 value2,处理后,输出 key3 和 value3。在下文讨论键值对时,可能把 key1 和 value1 简写为<k1,v1>,key2 和value2 简写为<k2,v2>,key3 和 value3 简写为<k3,v3>。
1.4. 单词计数
package mavshuang.mapreduce;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
public class WordCountDemo {
private static final String INPUT_PATH = "hdfs://hadoop0:9000/hello";
private static final String OUTPUT_PATH = "hdfs://hadoop0:9000/out";
public static void main(String[] args) throws IOException, URISyntaxException, InterruptedException, ClassNotFoundException {
final Configuration configuration = new Configuration();
FileSystem fileSystem = FileSystem.get(new URI(OUTPUT_PATH), configuration);
// 判断文件是否存在,如果存在则删除
if (fileSystem.exists(new Path(OUTPUT_PATH))) {
fileSystem.delete(new Path(OUTPUT_PATH), true);
}
String jobName = WordCountDemo.class.getSimpleName();
Job job = new Job(configuration, jobName);
// 1.1 读取输入文件内容,解析成key、value对。对输入文件的每一行,解析成key、value对。每一个键值对调用一次map函数。
// 设置job执行作业时输入文件的路径
FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
// 设置把输入文件处理成键值对的类
job.setInputFormatClass(TextInputFormat.class);
// 1.2 覆盖map函数,对输入的key、value处理,转换成新的key、value输出。
// 省略的条件是map输出的<k,v>与reduce输出的<k,v>格式相同
// 设置自定义的MyMapper类
job.setMapperClass(MyMapper.class);
// 设置map方法输出的k2,v2值得类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
// 1.3 对输出的key、value进行分区。
// 设置对k2分区的类
job.setPartitionerClass(HashPartitioner.class);
// 设置运行的Reducer任务的数量
job.setNumReduceTasks(1);
// 1.4 对不同分区的数据,按照key进行排序、分组。相同key的value放到一个集合中。
// 1.5 (可选)分组后的数据进行归约。
// 2.1 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。对多个map任务的输出进行合并、排序。
// 设置自定义的MyReducer类
job.setReducerClass(MyReducer.class);
// 2.2 覆盖reduce函数,对输入的key、value处理,转换成新的key、value输出。
// 设置reduce方法输出的k3,v3值得类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 2.3 把reduce的输出保存到文件中。
// 设置job执行作业时的输出路径
FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
// 设置把输出文件处理成键值对的类
job.setOutputFormatClass(TextOutputFormat.class);
// 把job提交给JobTracker执行,等待执行结果返回
job.waitForCompletion(true);
}
// KEYIN:表示每一行的偏移量
// VALUEIN:表示每一行的内容
// KEYOUT:表示每一行中的每个单词
// VALUEOUT:表示每一行中每个单词的出现次数,常量为1
// 继承Mapper类实现map方法
static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
// 源文件会被解析成2个键值对,分别为<0,hello you >,<10,hello mavs>
// 每个<k,v>都调用一次函数
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
final String[] splited = value.toString().split("\t");
for (String word : splited) {
final Text k2 = new Text(word);
final LongWritable v2 = new LongWritable(1);
context.write(k2, v2);
}
}
}
// KEYIN:表示整个文件中的不同单词
// VALUEIN:表示整个文件中的不同单词出现的次数
// KEYOUT:表示整个文件中的不同单词
// VALUEOUT:表示整个文件中的不同单词出现的总次数
// 继承Reducer类实现reduce方法
static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
// reduce会被调用3次,分别是<hello,{1,1}>、<mavs,{1}>、<you,{1}>
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
Long count = 0L;// 定义成Java类型,便于操作
for (LongWritable times : v2s) {
count += times.get();
}
final LongWritable v3 = new LongWritable(count);
context.write(k2, v3);
}
}
}
上述结果错误原因在于: hello 文件,单词与单词之间应该使用 “Tab” 键来区分,而不是空格键;修改后的结果如下所示:
1.5. 分析上述代码执行过程
(1)JobTracker
负责接收用户提交的作业,负责启动、跟踪任务的执行。
JobSubmissionProtocol是JobClient与JobTracker通信的接口;
InterTrackerProtocol是TaskTracker与JobTracker通信的接口;
(2)TaskTracker
负责执行任务。
(3)JobClient
是用户作业与JobTracker交互的主要接口;
负责提交作业的,负责启动、跟踪任务执行、访问任务状态和日志等。
(4)MapReduce驱动默认的设置
以上代码是从上述代码中截取出来的,其中一些设置未按照MapReduce中的默认设置,MapReduce中的默认设置如下所示:
(5)上述代码分析执行过程
(6)总结图解
2. 序列化
2.1.序列化概念
(1)序列化(Serialization)是指把结构化对象转换为字节流;
(2)反序列化(Deserialization)是序列化的逆过程,即把字节流转回结构化对象;
(3)Java序列化是指java.io.Serializable接口。
2.2.Hadoop序列化特点
(1)紧凑
高效使用存储空间。
(2)快速
读写数据的额外开销小。
(3)可扩展
可透明地读取老格式的数据。
(5)互操作
支持多语言的交互。
2.3.Hadoop序列化的作用
主要有两大作用:进程间通信和永久存储;
Hadoop节点间通信如下图所示: