-
第三章 MapReduce分布式计算框架 (核心思想:“分而治之”)
-
3.1 MapReduce 概述
-
3.1.1 并发、并行与分布式编程的概念
-
并发和并行
-
并发是指两个任务可以在重叠的时间段内启动、运行和完成;
-
并行是指任务在同一时间运行。
-
并发是独立执行过程的组合,而并行是同时执行(可能相关的)计算。
-
-
分布式编程的主要特征
-
分布
-
一个程序由若干个可独立执行的程序模块组成。这些程序模块分布于一个分布式计算机系统的几台计算机上且相互关联。
-
-
通信
-
程序模块在同时执行时需要交换数据,即通信。通过通信,各程序模块能协调地完成一个共同的计算任务。
-
-
-
-
3.1.2 MapReduce 并行编程模型
-
概念
-
MapReduce 是一种编程模型,用于大规模数据集(大于1TB)的并行运算。
-
-
特点
-
易用性,因高度抽象化而变得非常简单,是在总结大量应用的共同特点的基础上抽象出来的分布式计算框架。
-
MapReduce 将复杂的运行于大规模集群上的并行计算过程高度抽象为Map和Reduce两个计算过程,分别对应一个函数。
-
适合用MapReduce处理的数据集(或任务)的基本要求:待处理的数据集可以分解成许多小的数据集,而且每一个小数据集都可以完全并行地进行处理。
-
-
设计理念
-
“计算向数据靠拢”,而不是“数据向计算靠拢”。
-
大规模数据处理环境下,移动计算要比移动数据(需要大量的网络传输)更有利。
-
-
-
3.1.3 Map 函数和Reduce函数
-
MapReduce编程模型的核心,由应用程序开发者负责具体实现。
-
map()
-
将一个函数映射到一个输入序列的每个元素上,并返回一个包含结果的迭代器。
-
map(function, sequence)
-
# 将列表中的每个元素都平方 numbers = [1, 2, 3, 4, 5] squared = map(lambda x: x**2, numbers) print(list(squared))
-
# 输出:[1, 4, 9, 16, 25]
-
-
-
reduce()
-
对一个序列进行累积操作,将一个函数作用在序列的前两个元素上,然后将结果与下一个元素进行累积,直到序列中的所有元素都被处理。
-
不带初始参数:
-
reduce(function,sequence)
-
# 计算列表中所有元素的和 from functools import reduce numbers = [1, 2, 3, 4, 5] total = reduce(lambda x, y: x + y, numbers) print(total)
-
# 输出:15
-
-
带初始参数:
-
reduce(function, sequence, initial)
-
from functools import reduce numbers = [1, 2, 3, 4, 5] initial = 10 total = reduce(lambda x, y: x + y, numbers, initial) print(total)
-
# 输出:25 (初始值10加上列表中所有元素的和)
-
-
-
-
-
3.2 MapReduce 工作原理
-
3.2.1 MapReduce 体系架构
-
核心思想“分而治之”
-
数据块无依赖
-
将一个大数据通过一定的数据划分方法,分成多个较小的具有同样计算过程的数据块,数据块之间不存在依赖关系。
-
-
无额外数据传输开销
-
将每一个数据块分给不同的Map任务去处理,每个Map 任务通常运行在存储数据的节点上,不需要额外的数据传输开销。
-
-
中间结果
-
1.当Map任务结束后,会生成以键值对<key,value>形式表示的许多中间结果(保存在本地存储中,如本地磁盘)。
-
2.然后,这些中间结果会划分成和Reduce 任务数相等的多个分区,不同的分区被分发给不同的Reduce任务并行处理,具有相同key 的<key,value>会被发送到同一个Reduce任务,
-
3.Reduce任务对中间结果进行汇总计算,从而得到最终结果,并输出到分布式文件系统中。
-
-
数据交换由框架实现
-
不同的Map任务之间不会进行通信, 不同的Reduce任务之间也不会发生任何信息交换。
-
-
-
组成部分(4)
-
1. Client (客户端)
-
用户编写的 MapReduce程序通过Client提交到JobTracker。
-
一个MapReduce程序对应若干Job(作业),而每个Job会被分解成若干个Task(任务)。
-
每一个Job都会在客户端通过JobClient 类将应用程序及配置参数打包成JAR包存储在HDFS里,并把路径提交到JobTracker 的Master 服务。 由Master 创建每一个Task,并将它们分发到各个TaskTracker 服务中执行。
-
-
2. JobTracker (作业服务器)
-
资源监控
-
监控所有TaskTracker 与Job的健康状况。
-
一旦发现执行失败,就将相应的任务转移到其他节点。
-
-
作业调度
-
跟踪任务的执行进度、资源使用量等信息,并将这些信息告诉任务调度器。
-
在Hadoop中,任务调度器是一个可插拔的模块,用户可以根据自己的需要设计相应的任务调度器。
-
-
任务调度器会在资源出现空闲时,选择合适的Task 使用这些资源。
-
-
-
3. TaskTracker (任务服务器)
-
1.周期性地通过Heartbeat 将本节点上资源的使用情况和任务的运行进度汇报给 JobTracker。
-
2.同时接收JobTracker 发送过来的命令并执行相应的操作(如启动新任务、结束任务等)。
-
3.使用 slot 等量划分本节点上的资源量。
-
slot
-
概念
-
代表计算资源(CPU、内存等)。
-
-
分类
-
分为Map slot 和 Reduce slot两种。 分别供 MapTask和ReduceTask使用。
-
-
作用
-
1.一个Task获取到一个slot后才有机会运行。 (TaskScheduler的作用就是将各个TaskTracker上的空闲 slot分配给Task 使用。)
-
2.TaskTracker通过slot的数目(可配置参数) 限定Task的并发度。
-
-
-
-
-
4. Task (任务)
-
分类
-
MapTask
-
ReduceTask
-
-
单位
-
处理单位是split。
-
(HDFS:以固定大小的数据块作为基本单位存储数据。)
-
split是一个逻辑概念,只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等。
-
-
-
划分方法
-
split的划分方法完全由用户自己决定。
-
split的多少决定了MapTask的数目。 (每个split只会交给一个MapTask处理)
-
-
-
-
-
MapRuduce处理主要包括MapTask处理和Reduce处理。
-
3.2.2 MapTask工作原理 (MapTask处理)
-
MapReduce工作流程的前半部分。
-
6个阶段
-
切片——解析键值对——map映射——分区——排序——reduce规约
-
(1)把输入文件按照一定的标准切分为逻辑上的多个输入片。
-
输入片 (InputSplit)
-
MapReduce对文件进行处理和运算的输入单位。
-
逻辑概念,每个输入片实际并没有对文件进行切割,只记录了要处理的数据的位置和长度。
-
大小
-
大小固定。
-
默认输入片的大小与数据块的大小是相同的。
-
eg:如果数据块的大小是默认值64MB,输入文件有两个,一个文件的太小是32MB,另一个文件的大小是72MB。
-
产生3个输入片(32MB),交由3个Mapper进程处理。
-
-
-
-
(2)把输入片中的记录按照一定的规则解析成键值对。
-
默认规则是把每一行文本内容解析成一个键值对。 (键是每一行的起始位置,值是本行的文本内容)
-
-
(3)对每个键值对调用一次map方法。
-
如果有1000个键值对,就会调用1000次map方法。
-
每一次调用map方法都输出零个或者多个键值对。
-
-
(4)对Map输出的键值对按照键进行分区。
-
目的:让Reduce 可以并行处理Map的结果。
-
eg:键表示省份(如河北、河南、山东等),那可以按照不同省份进行分区,同一个省份的键值对划分到一个分区中。
-
默认只有一个分分区的数量就是运行的Reducer 任务的数量。 (默认只有一个Reducer任务)
-
-
(5)对每个分区中的键值对进行排序。
-
按照键进行排序,对于键相同的键值对按照值进行排序。
-
例如3个键值对<2.2>、<1.3>、<2,1>,那么排序后的结果是<1,3>、<2.1>、<2.2>。
-
如果有第(6)阶段、那么进入第(6)阶段;如果没有,直接将结果输出到本地盘上。
-
-
(6)对每个分区中的数据进行规约。
-
调用reduce方法处理。
-
键相等的键值会调用一次reduce方法,得到<key,value-list>形式的中间结果。
-
-
经过这一阶段,数据量会减少。
-
规约后的数据输出到本地磁盘上。
-
-
-
-
3.2.3 ReduceTask工作原理
-
MapReduce工作流程的后半部分。
-
4个阶段
-
Copy、Merge、Sort 和Reduce。
-
1.Copy 阶段
-
复制MapTask输出结果
-
ReduceTask会主动从MapTask复制其输出的键值对
-
键值对的大小
-
超过阈值,写到磁盘上
-
否则直接放到内存中。
-
-
多个输出结果时,复制多个结果。
-
-
-
2.Merge 阶段
-
合并内存和磁盘上的文件
-
启动两个后台线程对内存和磁盘上的文件进行合并。
-
以防止内存使用过多或磁盘上文件过多。
-
-
-
3.Sort阶段
-
归并排序
-
按照MapReduce 语义,用户自定义reduce方法。
-
其接收的输入数据是按key进行聚集的一组数据。 (Hadoop采用基于排序的策略将key相同的数据聚在一起)
-
-
ReduceTask只需对所有数据进行一次归并排序即可。
-
各个MapTask 已经实现对自己的处理结果进行局部排序。
-
-
-
-
4.Reduce阶段
-
对排序后的键值对调用reduce方法
-
每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入HDFS文件中。
-
-
-
-
-
-
3.3 案例实战:MapReduce编程
-
3.3.1 WordCount 执行流程示例
-
WordCount 是Hadoop 自带的示例程序之一。
-
功能
-
统计输入文件(也可以是输入文件夹内的多个文件)中每个单词出现的次数。
-
-
基本设计思路
-
分别统计文件中每个单词出现的次数,然后累加不同文件中同一个单词出现的次数。
-
-
执行流程
-
(1)将文件拆分成split。
-
split 交给Map任务(并行)处理
-
将文件按行分割成<key1,value1>
-
key1为偏移量(包括回车符)
-
value1为文本行
-
-
-
(2)将分割好的<key1,value1>交给用户定义的map方法进行处理。
-
(3)得到map方法输出的<key2,value2>后,Mapper 会将它们按照key 值进行排序,并执行Combine过程,将key 值相同的value 值累加。
-
(4)Reducer先对从Mapper接收的数据进行排序。再交由用户自定义的reduce方法进行处理,将相同键下的所有值相加,得到新的<key3,value3>作为最终的输出结果。
-
-
-
-