1. MapReduce - 映射、化简编程模型
1.1 MapReduce 的概念
1.1.1 map 和 reduce
1.1.2 shufftle 和 排序
MapReduce 保证每个 reducer 的输入都已经按键排序。
1.1.3 MapReduce 类型和输入输出
MapReduce 中的 map 和 reduce 函数遵循以下形式:
map: (K1, V1) ----> list(K2, V2)
reduce: (K2, list(V2)) ----> list (K3, V3)
通常来说,map 函数输入的键值对的类型与用于输出的不同,然而,reduce 函数的输入类型必须与 map 函数的输出类型相一致。
1.1.4 一个示例
下面是一个示例。该实例从很多行的气象数据中获取每年的最高气温值。
(1)原始数据是气象局的气象数据:
我们以每一行在文件中的偏移量作为键,构建 map 函数需要的键/值对:
黑体部分说明的是年份和该年气温。
(2)map 函数的功能正是提取每一行的年份和气温,得到下面的list:
(1950, 0)
(1950, 22)
(1950, -11)
(1949, 111)
(1949, 78)
(3)map 函数的输出先由 MapReduce 框架处理,然后再发给 reduce 函数。这一处理过程会根据键来对键/值对进行排序和分组,其结果是:
(1949, [111, 78])
(1950, [0, 22, -11])
每年的年份后都有一系列气温数据。
(4)所有reduce 函数限制必须重复这个列表并从中找出做大的读数:
(1949, 111)
(1950, 22)
这是最后的输出:全球气温记录中每年的最高气温。 下面是总体过程:
1.1.5 该示例的 Java 代码实现
Mapper 的实现:
可见:
(1)map 方法的输入是 <LongWritable key, Text value>,即长整形的 key 和文本类型的 value,对应的是气象文件中的每一行的行偏移量和行的内容。
(2)map 方法的输出是 <Text, IntWritable>,即年分和气温。
(3)该方法的功能就是对每一个键值对,取其年份值和气温值,然后写入 output。
reduce 的实现:
可见:
(1)方法的输入是 <Text key, Iterator<IntWritable> values>,即 年份值 和 该年份的气温值列表。
(2)方法的输出是 <Text, IntWritable>,即 年分 和 计算得出的该年最该气温。
(3)该方法的功能就是求 values 中的最大值,然后写入 output。
运行该 Job 的代码:
可见:
(1)创建 JobConf 对象,它包括该 JOb 的各种参数。
(2)它指定输出和输出路径。
(3)它指定 map 和 reduce 方法。
(4)它指定输出的键值对类型。
(5)最后提交作业。
当我们在 Hadoop 集群运行该 Job 时,我们把代码打包成一个 JAR 文件,然后 Hadoop 会在集群内分发这个包。
注意:以上代码使用的是旧的 MR API。 Hadoop 在0.20.0 中包含了 最新版 API,它 包括一个全新的 MR Java API。该示例摘自《Hadoop 权威指南》一书。
1.2 分布式 MapReduce
前面的例子展示了 MR 针对小量输入的工作原理,它使用的是本地文件系统中的文件。为了分不华,我们需要把数据存储在分布式文件系统中,典型的如 HDFS,以允许 Hadoop 把 MR 的计算移到承载部分数据的各个机器。
1.2.1 基本概念
作业 Job:客户端的执行单位,它包括输入数据、MR 程序和配置信息。Hadoop 把 Job 分成若干小任务 task 来工作,其包括两种类型的任务:map 任务和 reduce 任务。
控制节点:两种类型的节点控制着作业执行过程:jobtracker 和 多个 tasktracker。jobtracker 通过调度任务在 tasktracker 上运行,来协调所有运行在系统上的作业。tasktracker 运行任务的同时,把进度报告给 jobtracker,jobtracker 记录每项任务的总体运行情况。
输入数据:Hadoop 把输入数据划分成等长的小数据发给 MR,成为输入分片 split。Hadoop 为每个 split 创建一个 map 任务,该任务会运行用户自定义的 map 函数。map 任务的执行节点和输入数据的存储节点是同一节点时,Hadoop 的性能达到最佳。
map 的输出:map 任务把输出写入本地硬盘,而不是 HDFS,这是因为 map 的输出只是中间输出,在 reduce 任务完成后作业完成时,map 的输出会被删除。
reduce 的过程:Map 任务结束后进入 Reduce 过程。每个 Reduce 任务包括 组合(shuffle)、排序(sort)和聚合数据(Reduce)三个阶段。MR 框架根据中间结果中的键 key,将多个 mapper 产生的同一个 key 的中间结果通过 HTTP 协议传给处理这个key 的 reducer。在组合和排序阶段,将来自不同 mapper 具有相同 key 值得 <key,value>对合并到一起,最后将通过组合和排序后得到的 <key,(list of value)> 做为输入送到 Reduce 方法中处理,将得到的结果写入由 Hadoop 的分布式文件系统 HDFS 管理的输出文件中。
comniner:为了减少 map 和 reduce 任务之间数据传输量,Hadoop 允许声明一个 combiner。它运行在 map 的输出之上,它的输出作为 reduce 的输入。
假设 1950、1951 年的气象数据都由俩个 mappper 进行处理,它们的输出分别是:
mapper1 的输出:(1950, 0) (1950, 20) (1950, 10) (1951, 25) (1951, 15)
mapper2 的输出:(1950, 25) (1950, 11) (1951, 8)
假设有两个 reducer,分别处理 1950 年和 1951 年的 数据。
reducer1 的输入: (1950, [0, 20, 10, 25, 11]),其输出是 (1950, 25)。
reducer2 的输入: (1951, [25, 15,8]),其输出是 (1951, 25)。
有 combiner 的情况下,
mapper1 的输出:(1950, 20)(1951, 25)
mapper2 的输出:(1950, 25) (1951, 8)
两个reducer 的输入将是 (1950, [20, 25]) 和 (1951, [25, 8]) ,其输出任然是 (1950, 25)和(1951, 25)。
1.2.2 运行过程
组件:
- JobClient : 每一个 job 都会在用户端通过 JobClient 类将应用程序以及配置参数打包成 jar 文件存储到 HDFS 中,并把路径提交给 JobTracker,然后由 JobTracker 创建每个 task,并将它们分发到各个 tasktracker 服务中去执行。
- jobtracker:它是一个 Master 服务,负责调度 job 的每个 task 运行于 tasktracker 之上,并监控它们,如果发现有失败的 task,就重新启动它。一般应将 tasktracker 部署在单独的机器上。它包含一个 job 队列来跟踪和调度所有 job。
- tasktracker:它是运行在多个节点上的 slaver 服务,负责直接执行每个 task。tasktracker 都需要运行在 HDFS 的 datanode 上。
简要流程:
(1)JobTracker 一直在等待 JobClient 通过 RPC 提交作业。
(2) MR 自定义程序通过 JobClient.runJob(job)向 master 节点上的 JobTracker 提交 job。JobTracker 接到请求后将其放入到作业队列中。
(3)TaskTracker 一直通过 RPC 向 JobTracker 发心跳 heartbeat 询问有没有任务可做。如果有,让其派发任务给它执行。
(4)如果 JobTracker 的作业队列不为空,JobTracker 会将任务派发给询问它的 tasktracker。
(5)slave 节点上的 tasktracker 接到任务后在其本地发起 task 来执行任务。
详细过程:
(2)向 jobtracker 请求一个新的作业 ID (通过调用 JobTracker 的 getNewJobID())(步骤2)。它会检查作业的输出说明,比如,如果没有执行输出目录或者它已经存在,作业就不会被提交,并有错误返回给 MapReduce 程序;以及检查作业的输入划分,如果划分无法计算,或者输入路径不存在,作业就不会被提交,并有错误返回给 MapReduce 程序。
(3)将运行作业所需要的资源 - 包括作业 JAR 文件、配置文件和计算所得输入划分 - 复制到一个以作业ID号命名的目录中。(步骤3)
(4)告诉 jobtracker 作业准备执行(通过调用 JobTracker 的 submitJob()方法)(步骤4)
(5)JobTracker 接收到对其 submitJob 方法的调用后,会把此调用放入一个内部队列中,交由作业调度器进行调度,并对其进行初始化。初始化包括创建一个代表该正在运行的作业的对象,它封装任务和记录信息,以便跟踪任务的状态和进程(步骤5)
(6)要创建运行任务列表,作业调度去首先从共享文件系统中获取 JobClient 已计算好的输入划分信息。(步骤6)然后为某个划分创建一个 map 任务。创建的 reduce 任务的数量由 JobConf 的 mapped.reduce.tasks 属性决定。
(7)TaskTracker 执行一个简单的循环,定期发送心跳方法调用 JobTracker。心跳方法告诉 jobtracker,tasktracker 是否还存活,同时也充当两者之间的消息通道。做为心跳方法调用的一部分,tasktracker 会指明它是否已经准备运行新的任务,如果是,jobtracker 会为它分配一个任务,并使用心跳方法的返回值与 tasktracker 进行通信。
(8)现在 tasktracker 已经被分配了任务,下一步是运行任务。首先,它本地化作业的 JAR 文件,将它从共享文件系统复制到 tasktracker 所在的文件系统。同时,将应用程序所需要的全部文件从分布式缓存复制到本地磁盘。然后,为任务新建一个本地工作目录,并把 JAR 文件中的内容解压到这个文件夹下。第三步,新建一个 taskrunner 实例来运行任务。
(9)TaskRunner 启动一个新的 Java 虚拟机。
(10)开始运行任务。新的虚拟机使得用户定义的 map 和 rudue 函数的任何缺陷都不会影响 tasktracker。但是不同任务之间重用 JVM 还是可能的。
jobtracker 收到作业最后一个任务已经完成的通知后,便把作业的状态设置为”成功“。然后,当 JobClient 查询状态时,它将得知任务已经成功完成,所以便显示一条消息告知客户,然后从 runJob 方法返回。
1.3 失败处理
子任务失败:最常见的是map 或者 reduce 任务中的代码抛出运行时异常。如果发生这种情况,子任务 JVM 进程会再退出之前向其 tasktracker 父进程发送错误报告。除了写日志外,tasktracker 会将此次任务尝试标记为 failed,释放一个slot以便运行另一个任务。
tasktracker 失败:如果 tasktracker 停止向 jobtracker 发送心跳,jobtracker 会注意到此停止,它会将该 tasktracker 从等待任务调度的 tasktracker 池中删除。任何进行中的任务都会被重新调度。
jobtracker 失败:这是所有失败中最严重的一种,目前该失败是 HDFS 一个单点故障。
1.4 作业调度
早起版本的 Hadoop 使用一种非常简单的方法来调度用户作业:FIFO。现在,已经有多种调度器可选,比如一种叫做公平调度器(Fair Scheduler)的多用户调度器。
1.5 MR 程序设计过程
(1)<key,value>对
这是MR 编程框架中基本的数据单元,其中 key 实现了 WritableComparable 接口, value 实现了 Writable 接口,这使得框架可对其序列号并可对 key 执行排序。
(2)数据输入
InputFormat、InputSplit、RecordReader 是数据输入的主要编程接口。InputFormat 主要实现的功能是将输入数据分切成多块,每个块是 InputSplit 类型;而 RecordReader 负责将每个 InputSplit 块分解成对个 <key 1, value 1>对传给 Map。
(3)Mapper 阶段
此阶段涉及的主要编程接口有 Mapper,Reducer 和 Partioner。实现 Mapper 接口主要是其 map 方法,该方法主要用来处理输入 <key 1,value 1>对并对其产生输出 <key 2,value 2>对。在 map 处理过 <key 1,value 1>对之后,可实现一个 Combiner 类对 map 的输出进行初步的规约操作,此类实现了 Reducer 接口。而 Partioner 主要根据 map 输出的 <key 2,value 2> 对的值,将其分发给不同的 reduce 任务。
(4)Reducer 阶段
此阶段主要实现 Reduce 接口,主要是实现 reduce 方法。框架将map 输出的中间结果根据相同的 key2组合成 <key 2,list(value 2)>对作为 reduce 方法的输入数据并对其处理,同时产生 <key 3, value 3>对。
(5)数据输出
此阶段主要实现两个编程接口,其中 FileOutputFormat 接口涌来将输出数据输出到文件;RecordWriter 接口负责输出到一个<key,value>对。
2. Hadoop V1 中的 MapReduce 的实现
2.1 Hadoop V1 中 MapReduce 的实现
- NameNode 中记录了文件是如何被拆分成 block 以及这些 block 都存储到了那些DateNode节点。NameNode 同时保存了文件系统运行的状态信息.
- DataNode中存储的是被拆分的blocks.
- Secondary NameNode 帮助 NameNode收集文件系统运行的状态信息.
- JobTracker 当有任务提交到 Hadoop 集群的时候负责 Job 的运行,负责调度多个TaskTracker.
- TaskTracker 负责某一个 map 或者 reduce 任务.
2.2 MapReduce 的资源管理
Hadoop V1 中,MapReduce 除了数据处理外,还兼具资源管理功能。
Hadoop 1.0 资源管理由两部分组成:资源表示模型和资源分配模型,其中,资源表示模型用于描述资源的组织方式,Hadoop 1.0采用“槽位”(slot)组织各节点上的资源,而资源分配模型则决定如何将资源分配给各个作业/任务,在Hadoop中,这一部分由一个插拔式的调度器完成。
Hadoop引入了“slot”概念表示各个节点上的计算资源。为了简化资源管理,Hadoop将各个节点上的资源(CPU、内存和磁盘等)等量切分成若干份,每一份用一个slot表示,同时规定一个task可根据实际需要占用多个slot 。通过引入“slot“这一概念,Hadoop将多维度资源抽象简化成一种资源(即slot),从而大大简化了资源管理问题。
更进一步说,slot相当于任务运行“许可证”,一个任务只有得到该“许可证”后,才能够获得运行的机会,这也意味着,每个节点上的slot数目决定了该节点上的最大允许的任务并发度。为了区分Map Task和Reduce Task所用资源量的差异,slot又被分为Map slot和Reduce slot两种,它们分别只能被Map Task和Reduce Task使用。Hadoop集群管理员可根据各个节点硬件配置和应用特点为它们分配不同的map slot数(由参数mapred.tasktracker.map.tasks.maximum指定)和reduce slot数(由参数mapred.tasktrackerreduce.tasks.maximum指定)。
Hadoop 1.0中的资源管理存在以下几个缺点:
(1)静态资源配置。采用了静态资源设置策略,即每个节点实现配置好可用的slot总数,这些slot数目一旦启动后无法再动态修改。
(2)资源无法共享。Hadoop 1.0将slot分为Map slot和Reduce slot两种,且不允许共享。对于一个作业,刚开始运行时,Map slot资源紧缺而Reduce slot空闲,当Map Task全部运行完成后,Reduce slot紧缺而Map slot空闲。很明显,这种区分slot类别的资源管理方案在一定程度上降低了slot的利用率。
(3) 资源划分粒度过大。这种基于无类别slot的资源划分方法的划分粒度仍过于粗糙,往往会造成节点资源利用率过高或者过低 ,比如,管理员事先规划好一个slot代表2GB内存和1个CPU,如果一个应用程序的任务只需要1GB内存,则会产生“资源碎片”,从而降低集群资源的利用率,同样,如果一个应用程序的任务需要3GB内存,则会隐式地抢占其他任务的资源,从而产生资源抢占现象,可能导致集群利用率过高。
(4) 没引入有效的资源隔离机制。Hadoop 1.0仅采用了基于jvm的资源隔离机制,这种方式仍过于粗糙,很多资源,比如CPU,无法进行隔离,这会造成同一个节点上的任务之间干扰严重。
2.3 MapReduce 架构的局限
1. JobTracker 是 Map-reduce 的集中处理点,存在单点故障。
2. JobTracker 完成了太多的任务,造成了过多的资源消耗,当 map-reduce job 非常多的时候,会造成很大的内存开销,潜在来说,也增加了 JobTracker fail 的风险,这也是业界普遍总结出老 Hadoop 的 Map-Reduce 只能支持 4000 节点主机的上限。
3. 在 TaskTracker 端,以 map/reduce task 的数目作为资源的表示过于简单,没有考虑到 cpu/ 内存的占用情况,如果两个大内存消耗的 task 被调度到了一块,很容易出现 OOM。
4. 在 TaskTracker 端,把资源强制划分为 map task slot 和 reduce task slot, 如果当系统中只有 map task 或者只有 reduce task 的时候,会造成资源的浪费,也就是前面提过的集群资源利用的问题。
5. 源代码层面分析的时候,会发现代码非常的难读,常常因为一个 class 做了太多的事情,代码量达 3000 多行,,造成 class 的任务不清晰,增加 bug 修复和版本维护的难度。
6. 从操作的角度来看,现在的 Hadoop MapReduce 框架在有任何重要的或者不重要的变化 ( 例如 bug 修复,性能提升和特性化 ) 时,都会强制进行系统级别的升级更新。更糟的是,它不管用户的喜好,强制让分布式集群系统的每一个用户端同时更新。这些更新会让用户为了验证他们之前的应用程序是不是适用新的 Hadoop 版本而浪费大量时间。
3. Hadoop 2.0 中的 MapReduce
- 受限的扩展性;
- JobTracker 单点故障;
- 难以支持MR之外的计算;
- 多计算框架各自为战,数据共享困难,比如MR(离线计算框架),Storm实时计算框架,Spark内存计算框架很难部署在同一个集群上,导致数据共享困难等
- 新的资源管理器 YARN 全局管理所有应用程序计算资源的分配,每一个应用的 ApplicationMaster 负责相应的调度和协调。一个应用程序无非是一个单独的传统的 MapReduce 任务或者是一个 DAG( 有向无环图 ) 任务。ResourceManager 和每一台机器的节点管理服务器能够管理用户在那台机器上的进程并能对计算进行组织。
- 任务的调度和监控依然由 MapReduce 负责。