[Hadoop源码解读](六)MapReduce篇之MapTask类

时间:2022-12-27 10:26:33

MapTask类继承于Task类,它最主要的方法就是run(),用来执行这个Map任务。

run()首先设置一个TaskReporter并启动,然后调用JobConf的getUseNewAPI()判断是否使用New API,使用New API的设置在前面[Hadoop源码解读](三)MapReduce篇之Job类 讲到过,再调用Task继承来的initialize()方法初始化这个task,接着根据需要执行runJobCleanupTask()、runJobSetupTask()、runTaskCleanupTask()或相应的Mapper,执行Mapper时根据情况使用不同版本的MapReduce,这个版本是设置参数决定的。

   @Override
   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
     throws IOException, ClassNotFoundException, InterruptedException {
     this.umbilical = umbilical;

     // start thread that will handle communication with parent
     TaskReporter reporter = new TaskReporter(getProgress(), umbilical,
         jvmContext);
     reporter.startCommunicationThread();
     boolean useNewApi = job.getUseNewMapper();  //是由JobConf来的,而New API 的JobContext包含一个JobConf,Job类有
     //setUseNewAPI()方法,当Job.submit()时使用它,这样,waitForCompletion()就用submit()设置了使用New API,而此时就使用它。
     initialize(job, getJobID(), reporter, useNewApi);//一个Task的初始化工作,包括jobContext,taskContext,输出路径等,
                                  //使用的是Task.initialize()方法

     // check if it is a cleanupJobTask
     if (jobCleanup) {
       runJobCleanupTask(umbilical, reporter);
       return;
     }
     if (jobSetup) {
       runJobSetupTask(umbilical, reporter);
       return;
     }
     if (taskCleanup) {
       runTaskCleanupTask(umbilical, reporter);
       return;
     }

     if (useNewApi) {//根据情况使用不同的MapReduce版本执行Mapper
       runNewMapper(job, splitMetaInfo, umbilical, reporter);
     } else {
       runOldMapper(job, splitMetaInfo, umbilical, reporter);
     }
     done(umbilical, reporter);
   }

runNewMapper对应new API的MapReduce,而runOldMapper对应旧API。

runNewMapper首先创建TaskAttemptContext对象,Mapper对象,InputFormat对象,InputSplit,RecordReader;然后根据是否有Reduce task来创建不同的输出收集器NewDirectOutputCollector[没有reducer]或NewOutputCollector[有reducer],接下来调用input.initialize()初始化RecordReader,主要是为输入做准备,设置RecordReader,输入路径等等。然后到最主要的部分:mapper.run()。这个方法就是调用前面[Hadoop源码解读](二)MapReduce篇之Mapper类讲到的Mapper.class的run()方法。然后就是一条一条的读取K/V对,这样就衔接起来了。

  @SuppressWarnings("unchecked")
   private <INKEY,INVALUE,OUTKEY,OUTVALUE>
   void runNewMapper(final JobConf job,
                     final TaskSplitIndex splitIndex,
                     final TaskUmbilicalProtocol umbilical,
                     TaskReporter reporter
                     ) throws IOException, ClassNotFoundException,
                              InterruptedException {
     // make a task context so we can get the classes
     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
       new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
     // make a mapper
     org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
       (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
         ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
     // make the input format
     org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
       (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
         ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
     // rebuild the input split
     org.apache.hadoop.mapreduce.InputSplit split = null;
     split = getSplitDetails(new Path(splitIndex.getSplitLocation()),
         splitIndex.getStartOffset());

     org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
       new NewTrackingRecordReader<INKEY,INVALUE>
           (split, inputFormat, reporter, job, taskContext);

     job.setBoolean("mapred.skip.on", isSkipping());
     org.apache.hadoop.mapreduce.RecordWriter output = null;
     org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
          mapperContext = null;
     try {
       Constructor<org.apache.hadoop.mapreduce.Mapper.Context> contextConstructor =
         org.apache.hadoop.mapreduce.Mapper.Context.class.getConstructor
         (new Class[]{org.apache.hadoop.mapreduce.Mapper.class,
                      Configuration.class,
                      org.apache.hadoop.mapreduce.TaskAttemptID.class,
                      org.apache.hadoop.mapreduce.RecordReader.class,
                      org.apache.hadoop.mapreduce.RecordWriter.class,
                      org.apache.hadoop.mapreduce.OutputCommitter.class,  //
                      org.apache.hadoop.mapreduce.StatusReporter.class,
                      org.apache.hadoop.mapreduce.InputSplit.class});

       // get an output object
       if (job.getNumReduceTasks() == 0) {
          output =
            new NewDirectOutputCollector(taskContext, job, umbilical, reporter);
       } else {
         output = new NewOutputCollector(taskContext, job, umbilical, reporter);
       }

       mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
                                                      input, output, committer,
                                                      reporter, split);

       input.initialize(split, mapperContext);
       mapper.run(mapperContext);
       input.close();
       output.close(mapperContext);
     } catch (NoSuchMethodException e) {
       throw new IOException("Can't find Context constructor", e);
     } catch (InstantiationException e) {
       throw new IOException("Can't create Context", e);
     } catch (InvocationTargetException e) {
       throw new IOException("Can't invoke Context constructor", e);
     } catch (IllegalAccessException e) {
       throw new IOException("Can't invoke Context constructor", e);
     }
   }

至于运行哪个Mapper类,一般是我们用job.setMapperClass(SelectGradeMapper.class)设置的,那设置后是怎样获取的,或者默认值是什么,且看下面的追溯。

MapTask.runNewMapper()

=>       (TaskAttemptContext)taskContext.getMapperClass();     //runNewMapper生成mapper时用到。

=>       JobContext.getMapperClass()

=>       JobConf.getClass(MAP_CLASS_ATTR,Mapper.class)

=>       Configuration.getClass(name,default)

根据上面一层的调用关系,找到了默认值是Mapper.class,它的获取过程也一目了然。

再仔细看看Configuration.getClass()

   public Class<?> getClass(String name, Class<?> defaultValue) {
     String valueString = get(name);
     if (valueString == null)
       return defaultValue;
     try {
       return getClassByName(valueString);
     } catch (ClassNotFoundException e) {
       throw new RuntimeException(e);
     }
   }

它首先看是否设置了某个属性,如果设置了,就调用getClassByName获取这个属性对应的类[加载之],否则就返回默认值。
Mapper执行完后,关闭RecordReader和OutputCollector等资源就完事了。

另外我们把关注点放在上面的runNewMapper()中的mapper.run(mapperContext);前面对Mapper.class提到,这个mapperContext会被用于读取输入分片的K/V对和写出输出结果的K/V对。而由

      mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
                                                     input, output, committer,
                                                     reporter, split);

可以看出,这个Context是由我们设置的mapper,RecordReader等进行配置的。

Mapper中的map方法不断使用context.write(K,V)进行输出,我们看这个函数是怎么进行的,先看Context类的层次关系:

[Hadoop源码解读](六)MapReduce篇之MapTask类

write()方法是由TaskInputOutputContext来的:

  public void write(KEYOUT key, VALUEOUT value
                    ) throws IOException, InterruptedException {
    output.write(key, value);
  }

它调用了RecordWriter.write(),RecordWriter是一个抽象类,主要是规定了write方法。

public abstract class RecordWriter<K, V> {
  public abstract void write(K key, V value
                             ) throws IOException, InterruptedException;

  public abstract void close(TaskAttemptContext context
                             ) throws IOException, InterruptedException;
}

然后看RecordWriter的一个实现NewOutputCollector,它是MapTask的内部类:

   private class NewOutputCollector<K,V>
     extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
     private final MapOutputCollector<K,V> collector;
     private final org.apache.hadoop.mapreduce.Partitioner<K,V> partitioner;
     private final int partitions;

     @SuppressWarnings("unchecked")
     NewOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
                        JobConf job,
                        TaskUmbilicalProtocol umbilical,
                        TaskReporter reporter
                        ) throws IOException, ClassNotFoundException {
       collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
       partitions = jobContext.getNumReduceTasks();
       if (partitions > 0) {
         partitioner = (org.apache.hadoop.mapreduce.Partitioner<K,V>)
           ReflectionUtils.newInstance(jobContext.getPartitionerClass(), job);
       } else {
         partitioner = new org.apache.hadoop.mapreduce.Partitioner<K,V>() {
           @Override
           public int getPartition(K key, V value, int numPartitions) {
             return -1;
           }
         };
       }
     }

     @Override
     public void write(K key, V value) throws IOException, InterruptedException {
       collector.collect(key, value,
                         partitioner.getPartition(key, value, partitions));
     }

     @Override
     public void close(TaskAttemptContext context
                       ) throws IOException,InterruptedException {
       try {
         collector.flush();
       } catch (ClassNotFoundException cnf) {
         throw new IOException("can't find class ", cnf);
       }
       collector.close();
     }
   }

从它的write()方法,我们从context.write(K,V)追溯到了collector.collect(K,V,partition),注意到输出需要一个Partitioner的getPartitioner()来提供当前K/V对的所属分区,因为要对K/V对分区,不同分区输出到不同Reducer,Partitioner默认是HashPartitioner,可设置,Reduce task数量决定Partition数量;

我们可以从NewOutputCollector看出NewOutputCollector就是MapOutputBuffer的封装。MapoutputBuffer是旧API中就存在了的,它很复杂,但很关键,暂且放着先,反正就是收集输出K/V对的。它实现了MapperOutputCollector接口:

  interface MapOutputCollector<K, V> {
    public void collect(K key, V value, int partition
                        ) throws IOException, InterruptedException;
    public void close() throws IOException, InterruptedException;
    public void flush() throws IOException, InterruptedException,
                               ClassNotFoundException;
  }

这个接口告诉我们,收集器必须实现collect,close,flush方法。

看一个简单的:NewDirectOutputCollector,它在没有reduce task的时候使用,主要是从InputFormat中获取OutputFormat的RecordWriter,然后就可以用这个RecordWriter的write()方法来写出,这就与我们设置的输出格式对应起来了。

   private class NewDirectOutputCollector<K,V>
   extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
     private final org.apache.hadoop.mapreduce.RecordWriter out;

     private final TaskReporter reporter;

     private final Counters.Counter mapOutputRecordCounter;
     private final Counters.Counter fileOutputByteCounter;
     private final Statistics fsStats;

     @SuppressWarnings("unchecked")
     NewDirectOutputCollector(org.apache.hadoop.mapreduce.JobContext jobContext,
         JobConf job, TaskUmbilicalProtocol umbilical, TaskReporter reporter)
     throws IOException, ClassNotFoundException, InterruptedException {
       this.reporter = reporter;
       Statistics matchedStats = null;
       if (outputFormat instanceof org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
         //outputFormat是Task来的,内部类访问外部类成员变量
         matchedStats = getFsStatistics(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
             .getOutputPath(jobContext), job);
       }
       fsStats = matchedStats;
       mapOutputRecordCounter =
         reporter.getCounter(MAP_OUTPUT_RECORDS);
       fileOutputByteCounter = reporter
           .getCounter(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.Counter.BYTES_WRITTEN);

       long bytesOutPrev = getOutputBytes(fsStats);
       out = outputFormat.getRecordWriter(taskContext); //主要是这句,获取设置的OutputputFormat里的RecordWriter
       long bytesOutCurr = getOutputBytes(fsStats);
       fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
     }

     @Override
     @SuppressWarnings("unchecked")
     public void write(K key, V value)
     throws IOException, InterruptedException {
       reporter.progress();  //报告一下进度
       long bytesOutPrev = getOutputBytes(fsStats);
       out.write(key, value);//使用out收集一条记录,out是设置的OutputFormat来的。
       long bytesOutCurr = getOutputBytes(fsStats);
       fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);  //更新输出字节数
       mapOutputRecordCounter.increment(1);      //更新输出K/V对数量
     }

     @Override
     public void close(TaskAttemptContext context)
     throws IOException,InterruptedException {
       reporter.progress();
       if (out != null) {
         long bytesOutPrev = getOutputBytes(fsStats);
         out.close(context);
         long bytesOutCurr = getOutputBytes(fsStats);
         fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
       }
     }

     private long getOutputBytes(Statistics stats) {
       return stats == null ? 0 : stats.getBytesWritten();
     }
   }

另外还有一些以runOldMapper()为主导的旧MapReduce API那套,就不进行讨论了。

from:  http://blog.csdn.net/posa88/article/details/7956767

[Hadoop源码解读](六)MapReduce篇之MapTask类的更多相关文章

  1. Hadoop源码解读系列目录

    Hadoop源码解读系列 1.hadoop源码|common模块-configuration详解2.hadoop源码|core模块-序列化与压缩详解3.hadoop源码|core模块-远程调用与NIO ...

  2. Hadoop2源码分析-MapReduce篇

    1.概述 前面我们已经对Hadoop有了一个初步认识,接下来我们开始学习Hadoop的一些核心的功能,其中包含mapreduce,fs,hdfs,ipc,io,yarn,今天为大家分享的是mapred ...

  3. &lbrack;Hadoop源码解读&rsqb;(一)MapReduce篇之InputFormat

    平时我们写MapReduce程序的时候,在设置输入格式的时候,总会调用形如job.setInputFormatClass(KeyValueTextInputFormat.class);来保证输入文件按 ...

  4. &lbrack;Hadoop源码解读&rsqb;(五)MapReduce篇之Writable相关类

    前面讲了InputFormat,就顺便讲一下Writable的东西吧,本来应当是放在HDFS中的. 当要在进程间传递对象或持久化对象的时候,就需要序列化对象成字节流,反之当要将接收到或从磁盘读取的字节 ...

  5. spring beans源码解读之--总结篇

    spring beans下面有如下源文件包: org.springframework.beans, 包含了操作java bean的接口和类.org.springframework.beans.anno ...

  6. Vue&period;js 源码分析&lpar;六&rpar; 基础篇 计算属性 computed 属性详解

    模板内的表达式非常便利,但是设计它们的初衷是用于简单运算的.在模板中放入太多的逻辑会让模板过重且难以维护,比如: <div id="example">{{ messag ...

  7. &lbrack;Hadoop源码解读&rsqb;(二)MapReduce篇之Mapper类

    前面在讲InputFormat的时候,讲到了Mapper类是如何利用RecordReader来读取InputSplit中的K-V对的. 这一篇里,开始对Mapper.class的子类进行解读. 先回忆 ...

  8. &lbrack;Hadoop源码解读&rsqb;(三)MapReduce篇之Job类

    下面,我们只涉及MapReduce 1,而不涉及YARN. 当我们在写MapReduce程序的时候,通常,在main函数里,我们会像下面这样做.建立一个Job对象,设置它的JobName,然后配置输入 ...

  9. &lbrack;Hadoop源码解读&rsqb;(四)MapReduce篇之Counter相关类

    当我们定义一个Counter时,我们首先要定义一枚举类型: public static enum MY_COUNTER{ CORRUPTED_DATA_COUNTER, NORMAL_DATA_COU ...

随机推荐

  1. C语言 经典编程100题

    一.题目 [程序1] 题目:有1.2.3.4个数字,能组成多少个互不相同且无重复数字的三位数?都是多少? =============================================== ...

  2. 未找到与约束ContractName Microsoft&period;VisualStudio&period;Text&period;ITextDocumentFactoryService&period;&period;&period; 匹配的导出 VS2012报错

    刚安装完VS2012,打开VS2012新建项目,但是并没有像之前那样顺利的创建页面,而是弹出了一个错误窗口. 我的系统是win7旗舰版 64位 ,同时安装了VS2010和VS2012.然后我又试了一下 ...

  3. Mybatis出现:无效的列类型&colon; 1111 错误

    在使用Mybatis时,不同的xml配置文件,有的会提示:无效的列类型: 1111 比如这个sql: update base.sys_person t set t.rybh=#{rybh},t.xm= ...

  4. bzoj4216 Pig

    水题,题目难点大概就是空间限制上了,开longlong会爆,可以开个int数组求前缀和,然后一旦绝对值超过20亿,则将其取模,并记录下当前位置,这样询问时就可以二分这些超过的位置,将其乘以20亿后加上 ...

  5. Linux基础命令---pgrep

    pgrep pgrep指令可以按名字或者其他属性搜索指定的进程,显示出进程的id到标准输出. 此命令的适用范围:RedHat.RHEL.Ubuntu.CentOS.SUSE.openSUSE.Fedo ...

  6. 3D 网页,webgl ,threejs 实例

    http://learningthreejs.com/blog/2013/04/30/closing-the-gap-between-html-and-webgl/ http://adndevblog ...

  7. 一步步Cobol 400 上手自学入门教程03 - 数据部

    数据部的作用 程序中涉及到的全部数据(输入.输出.中间)都要在此定义,对它们的属性进行说明.主要描述以下属性: 数据类型(数值/字符)和存储形式(长度) 数据项之间的关系(层次和层号) 文件与记录的关 ...

  8. SpringCloud之搭建配置中心

    一.搭建config-server 1.引入pom <dependencies> <dependency> <groupId>org.springframework ...

  9. kindeditor支持flv视频播放方法

    打开plugins\media下面的media.js,打开,找到下面的代码: var html = K.mediaImg(self.themesPath + ‘common/blank.gif’, { ...

  10. Python学习---内置函数的学习

    内置函数 [Py3.5官方文档]https://docs.python.org/3.5/library/functions.html#abs Built-in Functions abs() dict ...