mapreduce: 揭秘InputFormat--掌控Map Reduce任务执行的利器

时间:2022-10-01 13:28:27

随着越来越多的公司采用Hadoop,它所处理的问题类型也变得愈发多元化。随着Hadoop适用场景数量的不断膨胀,控制好怎样执行以及何处执行map任务显得至关重要。实现这种控制的方法之一就是自定义InputFormat实现。

InputFormat 类是Hadoop Map Reduce框架中的基础类之一。该类主要用来定义两件事情:

  • 数据分割(Data splits)
  • 记录读取器(Record reader)

数据分割 是Hadoop Map Reduce框架中的基础概念之一,它定义了单个Map任务的大小及其可能的执行服务器信息。记录读取器 主要负责从输入文件实际读取数据并将它们(以键值对的形式)提交给mapper。尽管有不少文章介绍过怎样实现自定义的记录读取器(例如,参考文章[1]),但是关于如何进行分割(split)的介绍却相当粗略。这里我们将会解释什么是分割,并介绍怎样实现自定义分割来完成特定任务。

剖析分割

任何分割操作的实现都继承自Apache抽象基类——InputSplit,它定义了分割的长度及位置。分割长度 是指分割数据的大小(以字节为单位),而分割位置 是分割所在的机器结点名称组成的列表,其中待分割的数据都会于本地存在。分割位置可以方便调度器决定在哪个机器上执行此次分割。简化后的[1]作业跟踪器(job tracker)工作流程如下:

  • 接受来自某个任务跟踪器(task tracker)的心跳通信,得到该位置map的可用情况。
  • 为队列等候中的分割任务找到可用的“本地”结点。
  • 向任务跟踪器提交分割请求以待执行。

数据局部性(Locality)的相关程度因存储机制和整体的执行策略的不同而不同。例如,在Hadoop分布式文件系统(HDFS)中,分割通常对应一个物理数据块大小以及该数据块物理定位所在的一系列机器(其中机器总数由复制因子定义)的位置。这就是FileInputFormat 计算分割的过程。

而HBase的实现则采用了另外一套方法。在HBase中,分割 对应于一系列属于某个表区域(table region)的表键(table keys),而位置则为正在运行区域服务器的机器。

计算密集型应用

Map Reduce应用中有一类特殊的应用叫做计算密集型应用(Compute-Intensive application)。这类应用的特点在于Mapper.map()函数执行的时间要远远长于数据访问的时间,且至少要差一个数量级。从技术角度来说,虽然这类应用仍然可以使用“标准”输入格式的实现,但是它会带来数据存放结点过少而集群内剩余结点没能充分利用的问题(见图1)。

(点击图片进行放大)

mapreduce: 揭秘InputFormat--掌控Map Reduce任务执行的利器

图1:数据局部性情况下的结点使用图

图1中显示了针对计算密集型应用,使用“标准”数据局部性导致的结点使用率上的巨大差异——有些结点(红色标注)被过度使用,而其他结点(黄色和浅绿色标注)则使用不足。由此可见,在针对计算密集型应用时,需要重新思考对“局部性”概念的认识。在这种情况下,“局部性”意味着所有可用结点之间map任务的均匀分布——即最大化地使用集群机器的计算能力。

使用自定义InputFormat改变“局部性”

假定源数据以文件序列的形式存在,那么一个简单的ComputeIntensiveSequenceFileInputFormat 类(见清单1)便可以实现将分割生成的结果均匀地分布在集群中的所有服务器上。

 package com.navteq.fr.mapReduce.InputFormat;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.StringTokenizer;

 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.ClusterStatus;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;

 public class ComputeIntensiveSequenceFileInputFormat<K, V> extends SequenceFileInputFormat<K, V> {

 private static final double SPLIT_SLOP = 1.1; // 10% slop
 static final String NUM_INPUT_FILES = "mapreduce.input.num.files";

 /**
 * Generate the list of files and make them into FileSplits.
 */
 @Override
 public List<InputSplit> getSplits(JobContext job) throws IOException {

 long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
 long maxSize = getMaxSplitSize(job);

 // get servers in the cluster
 String[] servers = getActiveServersList(job);
 if(servers == null)
 return null;
 // generate splits
 List<InputSplit> splits = new ArrayList<InputSplit>();
 List<FileStatus>files = listStatus(job);
 int currentServer = 0;
 for (FileStatus file: files) {
 Path path = file.getPath();
 long length = file.getLen();
 if ((length != 0) && isSplitable(job, path)) {
 long blockSize = file.getBlockSize();
 long splitSize = computeSplitSize(blockSize, minSize, maxSize);

 long bytesRemaining = length;
 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
 splits.add(new FileSplit(path, length-bytesRemaining, splitSize,
 new String[] {servers[currentServer]}));
 currentServer = getNextServer(currentServer, servers.length);
 bytesRemaining -= splitSize;
 }

 if (bytesRemaining != 0) {
 splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining,
 new String[] {servers[currentServer]}));
 currentServer = getNextServer(currentServer, servers.length);
 }
 } else if (length != 0) {
 splits.add(new FileSplit(path, 0, length,
 new String[] {servers[currentServer]}));
 currentServer = getNextServer(currentServer, servers.length);
 } else {
 //Create empty hosts array for zero length files
 splits.add(new FileSplit(path, 0, length, new String[0]));
 }
 }

 // Save the number of input files in the job-conf
 job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());

 return splits;
 }

 private String[] getActiveServersList(JobContext context){

 String [] servers = null;
 try {
 JobClient jc = new JobClient((JobConf)context.getConfiguration());
 ClusterStatus status = jc.getClusterStatus(true);
 Collection<String> atc = status.getActiveTrackerNames();
 servers = new String[atc.size()];
 int s = 0;
 for(String serverInfo : atc){
 StringTokenizer st = new StringTokenizer(serverInfo, ":");
 String trackerName = st.nextToken();
 StringTokenizer st1 = new StringTokenizer(trackerName, "_");
 st1.nextToken();
 servers[s++] = st1.nextToken();
 }
 }catch (IOException e) {
 e.printStackTrace();
 }

 return servers;

 }

 private static int getNextServer(int current, int max){

 current++;
 if(current >= max)
 current = 0;
 return current;
 }
 }

清单1:ComputeIntensiveSequenceFileInputFormat类

该类继承自 SequenceFileInputFormat 并重写了getSplits()方法,虽然计算分割的过程与FileInputFormat完全一样,但是它为分割指定了“局部性”,以便从集群中找出可用的服务器。getSplits()利用到了以下两个方法:

  • getActiveServersList() 方法,返回集群中当前可用的服务器(名称)组成的数组。
  • getNextServer() 方法,返回服务器数组中下一个服务器索引,且当服务器数组中元素全部用尽时,返回数组头部重新开始。

虽然上述实现(见清单1)将map任务的执行均匀地分布在了集群中的所有服务器上,但是它却完全忽略了数据的实际位置。稍微好点的getSplits方法实现(见清单2)可以试图将两种策略结合在一起:既尽量多地放置针对数据的本地作业,且保持剩余作业在集群上的良好平衡。[2]

 public List<InputSplit> getSplits(JobContext job) throws IOException {
 // get splits
 List<InputSplit> originalSplits = super.getSplits(job);

 // Get active servers
 String[] servers = getActiveServersList(job);
 if(servers == null)
 return null;
 // reassign splits to active servers
 List<InputSplit> splits = new ArrayList<InputSplit>(originalSplits.size());
 int numSplits = originalSplits.size();
 int currentServer = 0;
 for(int i = 0; i < numSplits; i++, currentServer = i>getNextServer(currentServer,
 servers.length)){
 String server = servers[currentServer]; // Current server
 boolean replaced = false;
 // For every remaining split
 for(InputSplit split : originalSplits){
 FileSplit fs = (FileSplit)split;
 // For every split location
 for(String l : fs.getLocations()){
 // If this split is local to the server
 if(l.equals(server)){
 // Fix split location
 splits.add(new FileSplit(fs.getPath(), fs.getStart(),
 fs.getLength(), new String[] {server}));
 originalSplits.remove(split);
 replaced = true;
 break;
 }
 }
 if(replaced)
 break;
 }
 // If no local splits are found for this server
 if(!replaced){
 // Assign first available split to it
 FileSplit fs = (FileSplit)splits.get(0);
 splits.add(new FileSplit(fs.getPath(), fs.getStart(), fs.getLength(),
 new String[] {server}));
 originalSplits.remove(0);
 }
 }
 return splits;
 }

清单2:优化过的getSplits方法

在此实现中,我们首先使用父类(FileInputSplit)来得到包含位置计算在内的分割以确保数据局部性。然后我们计算出可用的服务器列表,并为每一个存在的服务器尝试分配与其同处本地的分割。

延迟公平调度

虽然清单1和清单2中的代码都正确地计算出了分割位置,但当我们试图在Hadoop集群上运行代码时,就会发现结果与服务器之间产生均匀分布相去甚远。参考文章[2]中很好的描述了我们观察到的这个问题,并为该问题描述了一种解决方案——即延迟公平调度。

假设已经设置好了公平调度程序,那么下面的程序段应当加入到mapred-site.xml文件中以激活某个延迟调度程序[3]:

<property>

        <name>mapred.fairscheduler.locality.delay</name>
        <value>360000000</value>
<property>

适当借助延迟公平调度程序,作业执行将可以利用整个集群(见图2)。此外,根据我们的实验,这种情况下的执行时间相比“数据局部性”的做法要节省约30%。

(点击图片进行放大)

mapreduce: 揭秘InputFormat--掌控Map Reduce任务执行的利器

图2:执行局部性情况下的结点使用图

其他注意事项

用于测试的计算作业共使用了96个split和mapper任务。测试集群拥有19个数据结点,其中每个数据结点拥有8个mapper槽,因此该集群共有152个可用槽。当该作业运行时,它并没有充分利用集群中的所有槽。

Ganglia的两份截图都展示了我们所使用的测试集群,其中前三个结点为控制结点,而第四个结点为边缘结点,主要用来启动作业。图中展示了*处理器/机器的负载情况。在图1中,有一些结点被过度使用(红色显示),而集群中的其他结点则未得到充分利用。在图2中,虽然我们得到了更加平衡的分布,然而集群仍然未被充分利用。用于测试的作业也可以运行多线程,这么做会增加*处理器的负载,但同时也会降低在每次Mapper.map()迭代上的整体时间花费。正如图3所示,通过增加线程数量,我们可以更好地利用集群资源,并进一步减少完成作业所花费的时间。通过改变作业区域性,我们可以在不牺牲性能的情况下更好地利用群集处理远程作业数据。

(点击图片进行放大)

mapreduce: 揭秘InputFormat--掌控Map Reduce任务执行的利器

图3:使用多线程Map作业的执行区域性情况下的结点使用图

即使机器*处理器处于高负荷状态,它仍然可以允许其他磁盘I/O密集型作业运行在开放槽中,要注意的是,这么做会带来些许的性能下降。

自定义分割

本文中提到的方法对大文件非常适用,但是对于小文件而言,并没有足够的分割来让其使用集群中的多台机器。一种可行的方法是使用更小的分割块,但是这么做会给集群命名结点带来更多的负担(内存需求方面)。一种更好的做法是修改清单1中的代码,以使用自定义的块大小(而不是文件块大小)。这种方法可以计算出所需的分割块数量,而不用理会实际的文件大小。

总结

在这篇文章中,我们已经展示了如何利用自定义的InputFormats来更紧密地控制Map Reduce中的map任务在可用服务器间的分布。这种控制对于一类特殊应用——计算密集型应用非常重要,控制过程将Hadoop Map Reduce做为通用的并行执行框架使用。

关于作者

Boris Lublinsky是NAVTEQ公司的首席架构师,在这家公司中他的工作是为大型数据管理、处理以及SOA定义架构愿景,并且实施各种NAVTEQ的项目。他还是InfoQ的SOA编辑,OASIS的SOA RA工作组的参与者。Boris是一位作者并经常发表演讲,他最新的一本书叫做《Applied SOA》。

Michael Segel在过去二十多年里一直不断与客户合作,帮助他们发现并解决业务上的问题。Michael做过许多不同类型的工作,也在不同的行业圈摸打滚爬过。他是一位独立顾问,并且总是期望能够解决所有具有挑战性的问题。Michael拥有俄亥俄州立大学的软件工程学位。

参考

1. Boris Lublinsky, Mike Segel. Custom XML Records Reader.

2. Matei Zaharia, Dhruba Borthakur, Joydeep Sen Sarma, Khaled Elmeleegy, Scott Shenker, Ion Stoica. Delay Scheduling: A Simple Technique for Achieving Locality and Fairness in Cluster Scheduling.


[1] 这是一个简化后的解释。真实的调度算法要复杂得多;考虑更多的参数而不仅仅是分割的位置。

[2] 虽然我们将其列作一个选项,但是如果花费在Mapper.map()方法上的时间高与远程访问数据时间的一个或多个数量级时,清单1中的代码将不会有任何性能上的提升。但尽管如此,它也许会带来网络利用率上的略微提高。

[3] 请注意这里的延迟以毫秒为单位,在改变该值之后需要重新启动作业跟踪器。

查看英文原文:Uncovering mysteries of InputFormat: Providing better control for your Map Reduce execution.

mapreduce: 揭秘InputFormat--掌控Map Reduce任务执行的利器的更多相关文章

  1. Hadoop 少量map&sol;reduce任务执行慢问题

    最近在做报表统计,跑hadoop任务. 之前也跑过map/reduce但是数据量不大,遇到某些map/reduce执行时间特别长的问题. 执行时间长有几种可能性: 1. 单个map/reduce任务处 ...

  2. hadoop学习WordCount&plus;Block&plus;Split&plus;Shuffle&plus;Map&plus;Reduce技术详解

    转自:http://blog.csdn.net/yczws1/article/details/21899007 纯干货:通过WourdCount程序示例:详细讲解MapReduce之Block+Spl ...

  3. Map&sol;Reduce的类体系架构

    Map/Reduce的类体系架构 Map/Reduce案例解析: 先以简单的WordCount例程, 来讲解如何去描述Map/Reduce任务. public static void main(Str ...

  4. Map&sol;Reduce 工作机制分析 --- 作业的执行流程

    前言 从运行我们的 Map/Reduce 程序,到结果的提交,Hadoop 平台其实做了很多事情. 那么 Hadoop 平台到底做了什么事情,让 Map/Reduce 程序可以如此 "轻易& ...

  5. 第九篇:Map&sol;Reduce 工作机制分析 - 作业的执行流程

    前言 从运行我们的 Map/Reduce 程序,到结果的提交,Hadoop 平台其实做了很多事情. 那么 Hadoop 平台到底做了什么事情,让 Map/Reduce 程序可以如此 "轻易& ...

  6. MapReduce剖析笔记之五:Map与Reduce任务分配过程

    在上一节分析了TaskTracker和JobTracker之间通过周期的心跳消息获取任务分配结果的过程.中间留了一个问题,就是任务到底是怎么分配的.任务的分配自然是由JobTracker做出来的,具体 ...

  7. MapReduce剖析笔记之三:Job的Map&sol;Reduce Task初始化

    上一节分析了Job由JobClient提交到JobTracker的流程,利用RPC机制,JobTracker接收到Job ID和Job所在HDFS的目录,够早了JobInProgress对象,丢入队列 ...

  8. MapReduce启动的Map&sol;Reduce子任务简要分析

      对于Hadoop来说,是通过在DataNode中启动Map/Reduce java进程的方式来实现分布式计算处理的,那么就从源码层简要分析一下hadoop中启动Map/Reduce任务的过程.   ...

  9. Map&sol;Reduce个人实战--生成数据测试集

    背景: 在大数据领域, 由于各方面的原因. 有时需要自己来生成测试数据集, 由于测试数据集较大, 因此采用Map/Reduce的方式去生成. 在这小编(mumuxinfei)结合自身的一些实战经历, ...

随机推荐

  1. shell-引号

    shell中的 ``(反引号) ''(单引号) ""(双引号) 反引号里面的内容赋给变量的时候 会以执行命令的方式给例如: str=`cat 1.txt` echo $str 就会 ...

  2. js控制页面显示和表单提交

    早期的web页面在显示方面一般在后台进行控制,虽然对后台开发来讲是比较容易做到的,但是涉及到一个问题,那就是数据库压力. 因为要控制显示,所以会比较频繁的从数据库中来回调用. 现在的js功能越来越强, ...

  3. HDU3348(贪心求硬币数

    ;} coins Time Limit: 2000/1000 MS (Java/Others)    Memory Limit: 32768/32768 K (Java/Others)Total Su ...

  4. Android 签名&lpar;5&rpar;用命令签名和用IDE签名

    1,用命令签名 无论用哪个 IDE 开发,最终只是用了 keytool 和 jarsigner 这两个 Java 工具来完成签名任务(在 jdk 的 bin 目录下).其中 keytool 用来生成 ...

  5. Object-c学习之路(oc点语法)

    最近想学习object-c了自己上网找了一些资料自学了一下:oc中的点语法是为了java.c等的程序员更好的上手而添加的功能. 主函数 // // main.m // OcTest1 // // Cr ...

  6. 聊聊VUE中的nextTick

    在谈nextTick之前,先要说明一件事,可能在我们平时使用vue时并没有关注到,事实上,vue执行的DOM更新是异步的. 举个栗子: <template> <div class=& ...

  7. Android笔记(五)利用Intent启动活动

    Intent是意图的意思,分为显式 Intent 和隐式 Intent. 以下我们试图在FirstActivity中通过点击button来启动SecondActivity 1.显式Intent 在应用 ...

  8. pp 总结二

    1. return false   ES6函数的扩展:箭头函数  数组 arr.map()   arr.filter() <!DOCTYPE html> <html lang=&qu ...

  9. nodejs 访问网站并操作xpath

    var xpath = require('xpath'); //引用xpath包 var dom = require('xmldom-silent').DOMParser;//引用xmldom包 va ...

  10. 20172330 2017-2018-1 《Java程序设计》第五周学习总结

    20172330 2017-2018-1 <Java程序设计>第五周学习总结 教材学习内容总结 第五章 首先是对各种各种运算符的了解:刚开始以为相等就是=,还有其他一些符号都挺简单的,然后 ...