MRUnit Testing of Hadoop Counters and Pipelines

Time: 2024-07-21 16:35:56

Introduction

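Debugging Hadoop programs is genuinely frustrating. In a real company, cluster resources are limited, and you cannot run a job on the cluster over and over and debug it from the logs. That makes unit tests written with MRUnit especially important. MRUnit's MapReduceDriver tests a Map/Reduce pair, optionally together with a Combiner. PipelineMapReduceDriver tests a workflow of several Map/Reduce jobs. At the time of writing, MRUnit has no driver for Partitioners. MRUnit lets developers practice TDD and lightweight unit testing despite Hadoop's special architecture. This article focuses on testing MR programs with MRUnit, in two parts: "counters" and "pipelines".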

When testing an MR program, the main concerns are:

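1. For a given input, is the output correct and correctly ordered?

2. For programs that use counters, do the counter values match expectations?

3. How should composite, iterative or mixed tasks, and jobs that depend on each other, be tested?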

Counters

Counters are the mechanism an MR program uses to collect all kinds of statistics. The output of an MR job includes the following built-in counters:

  1. Map input records
  2. Map output records
  3. Reduce output records
  4. Launched map tasks
  5. Failed map tasks
  6. Launched reduce tasks
  7. Failed reduce tasks

Besides the built-in counters, users can define their own counters in two main ways:

  • Define a Java enum in advance:
    
    
    // Define the counters as an enum and increment them from the mapper/reducer.
    // In MRUnit, MapDriver, ReduceDriver and MapReduceDriver expose them through getCounters().
    public enum MyCounters {
        ALL_RECORDS, ONE_WORD_LINE
    }
    ....
    context.getCounter(MyCounters.ONE_WORD_LINE).increment(1);
    private MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> driver;
    private MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
    private ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
    // after the driver has run:
    driver.getCounters().findCounter(MyCounters.ONE_WORD_LINE);
    mapDriver.getCounters().findCounter(MyCounters.ONE_WORD_LINE);
    reduceDriver.getCounters().findCounter(MyCounters.ONE_WORD_LINE);
  • Create counters dynamically by specifying a group name and a counter name:
    context.getCounter("Group_counter", "Counter_name").increment(1);
    ....
    private MapReduceDriver<LongWritable, Text, Text, IntWritable, Text, IntWritable> driver;
    private MapDriver<LongWritable, Text, Text, IntWritable> mapDriver;
    private ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
    driver.getCounters().findCounter("Group_counter", "Counter_name");
    mapDriver.getCounters().findCounter("Group_counter", "Counter_name");
    reduceDriver.getCounters().findCounter("Group_counter", "Counter_name");
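To make the two addressing styles concrete, here is a toy in-memory counter registry in plain Java. It is not Hadoop's Counters class, and its class and method names are hypothetical. It assumes that an enum counter is filed under the enum's class name as the group and the constant's name as the counter name. As far as I know, that is also how Hadoop maps enum counters.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

// Toy counter registry (hypothetical, not Hadoop's Counters): shows how an enum
// counter and a (group, name) counter both resolve to a group/name pair.
public class CounterRegistrySketch {

    public enum MyCounters { ALL_RECORDS, ONE_WORD_LINE }

    // group name -> (counter name -> value)
    private final Map<String, Map<String, Long>> groups = new TreeMap<>();

    // Dynamic style: group and counter names are arbitrary strings.
    public void increment(String group, String name, long amount) {
        groups.computeIfAbsent(group, g -> new TreeMap<>()).merge(name, amount, Long::sum);
    }

    // Enum style (assumed mapping): enum class name = group, constant name = counter name.
    public void increment(Enum<?> key, long amount) {
        increment(key.getDeclaringClass().getName(), key.name(), amount);
    }

    public long value(String group, String name) {
        return groups.getOrDefault(group, Collections.<String, Long>emptyMap()).getOrDefault(name, 0L);
    }

    public long value(Enum<?> key) {
        return value(key.getDeclaringClass().getName(), key.name());
    }

    public static void main(String[] args) {
        CounterRegistrySketch counters = new CounterRegistrySketch();
        String[] lines = {"hello", "hello world", "bye"};
        for (String line : lines) {
            counters.increment(MyCounters.ALL_RECORDS, 1);
            if (line.trim().split("\\s+").length == 1) {
                counters.increment(MyCounters.ONE_WORD_LINE, 1);   // single-word line
            }
            counters.increment("Group_counter", "Counter_name", 1);
        }
        System.out.println(counters.value(MyCounters.ALL_RECORDS));            // 3
        System.out.println(counters.value(MyCounters.ONE_WORD_LINE));          // 2
        System.out.println(counters.value("Group_counter", "Counter_name"));   // 3
    }
}
```

Because both styles end up as a group/name pair, an enum counter can also be looked up by its class name and constant name. This is why the two `findCounter` overloads above can coexist.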

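MRUnit can read counters defined in either of the two ways above. Checking custom counters is therefore one way to verify that the key parts of a program behave as expected. In practice there are two ways to do this: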

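  • assertEquals: after the driver has run, call JUnit's Assert.assertEquals to check that the counter value matches the expected one. The example below checks whether a reduce-phase counter equals 2. If it does not, the assertion fails with the message "Expected 2 counter increment".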
    assertEquals("Expected 2 counter increment", 2, reduceDriver.getCounters().findCounter(Enum.counterName).getValue());
    assertEquals("Expected 2 counter increment", 2, reduceDriver.getCounters().findCounter("groupName","counterName").getValue());
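  • withCounter: use the withCounter method of TestDriver, from which MapDriver, ReduceDriver and MapReduceDriver all inherit. withCounter only records the expected value; the check happens when runTest() is called. The source is: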
    public T withCounter(final Enum<?> e, final long expectedValue) {
        expectedEnumCounters.add(new Pair<Enum<?>, Long>(e, expectedValue));
        return thisAsTestDriver();
    }
    public T withCounter(final String group, final String name, final long expectedValue) {
        expectedStringCounters.add(new Pair<Pair<String, String>, Long>(
            new Pair<String, String>(group, name), expectedValue));
        return thisAsTestDriver();
    }
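The withCounter source follows a simple pattern. The expectation is stored in a list, the driver returns itself so calls can be chained, and the comparison with the actual counters happens only after the code under test has run. The plain-Java imitation below shows that pattern. It is not MRUnit itself. The names ExpectingDriver, run and validate are hypothetical, and `run` is a stand-in "mapper" that counts tokens.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy imitation of the withCounter pattern (hypothetical names, not MRUnit).
public class ExpectingDriverSketch {

    public static final class ExpectingDriver {
        private final List<Map.Entry<String, Long>> expected = new ArrayList<>();
        private final Map<String, Long> actual = new HashMap<>();

        // Like TestDriver.withCounter: remember the expectation, return this for chaining.
        public ExpectingDriver withCounter(String name, long expectedValue) {
            expected.add(new AbstractMap.SimpleEntry<String, Long>(name, expectedValue));
            return this;
        }

        // Stand-in for running a mapper: increments the "words" counter once per token.
        public ExpectingDriver run(String line) {
            for (String token : line.trim().split("\\s+")) {
                if (!token.isEmpty()) {
                    actual.merge("words", 1L, Long::sum);
                }
            }
            return this;
        }

        // Compare only after the run; an empty list means every expectation held.
        public List<String> validate() {
            List<String> errors = new ArrayList<>();
            for (Map.Entry<String, Long> e : expected) {
                long got = actual.getOrDefault(e.getKey(), 0L);
                if (got != e.getValue()) {
                    errors.add("Counter " + e.getKey() + " expected " + e.getValue() + " but was " + got);
                }
            }
            return errors;
        }
    }

    public static void main(String[] args) {
        System.out.println(new ExpectingDriver().withCounter("words", 3).run("qiao bao xue").validate()); // []
        System.out.println(new ExpectingDriver().withCounter("words", 2).run("qiao bao xue").validate());
    }
}
```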

Combining Multiple MapReduce Jobs

In real development, problems are often too complex for a single Map and a single Reduce, so several MR programs have to be combined. There are three ways to combine them:

  1. Iterative

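    Some complex Hadoop tasks cannot be finished in one MapReduce pass and need several. PageRank and K-means, for example, both require many iterations, and Mahout uses iterative MapReduce extensively. The idea is simple and works like a for loop: the output of one MapReduce job becomes the input of the next, and the intermediate results can be deleted once the whole task is finished. Below is the key code behind K-means in Mahout: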

    /**
     * Iterate over data using a prior-trained ClusterClassifier, for a number of iterations using a mapreduce
     * implementation
     *
     * @param conf
     *          the Configuration
     * @param inPath
     *          a Path to input VectorWritables
     * @param priorPath
     *          a Path to the prior classifier
     * @param outPath
     *          a Path of output directory
     * @param numIterations
     *          the int number of iterations to perform
     */
    public static void iterateMR(Configuration conf, Path inPath, Path priorPath, Path outPath, int numIterations)
        throws IOException, InterruptedException, ClassNotFoundException {
        ClusteringPolicy policy = ClusterClassifier.readPolicy(priorPath);
        Path clustersOut = null;
        int iteration = 1;
        while (iteration <= numIterations) {
            conf.set(PRIOR_PATH_KEY, priorPath.toString());
            String jobName = "Cluster Iterator running iteration " + iteration + " over priorPath: " + priorPath;
            Job job = new Job(conf, jobName);
            job.setMapOutputKeyClass(IntWritable.class);
            job.setMapOutputValueClass(ClusterWritable.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(ClusterWritable.class);
            job.setInputFormatClass(SequenceFileInputFormat.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            job.setMapperClass(CIMapper.class);
            job.setReducerClass(CIReducer.class);
            FileInputFormat.addInputPath(job, inPath);
            clustersOut = new Path(outPath, Cluster.CLUSTERS_DIR + iteration);
            priorPath = clustersOut;
            FileOutputFormat.setOutputPath(job, clustersOut);
            job.setJarByClass(ClusterIterator.class);
            if (!job.waitForCompletion(true)) {
                throw new InterruptedException("Cluster Iteration " + iteration + " failed processing " + priorPath);
            }
            ClusterClassifier.writePolicy(policy, clustersOut);
            FileSystem fs = FileSystem.get(outPath.toUri(), conf);
            iteration++;
            if (isConverged(clustersOut, conf, fs)) {
                break;
            }
        }
        Path finalClustersIn = new Path(outPath, Cluster.CLUSTERS_DIR + (iteration - 1) + Cluster.FINAL_ITERATION_SUFFIX);
        FileSystem.get(clustersOut.toUri(), conf).rename(clustersOut, finalClustersIn);
    }

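    As the code shows, the driver runs the MapReduce iterations in a while loop, and each pass through the loop is one complete MapReduce job. The loop stops after numIterations passes, or earlier once isConverged reports convergence. With such frequent iterations, I/O usually becomes the bottleneck, so a program over a large data set takes a long time to run. For big data, Spark, with its fast in-memory iteration, is a good choice.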
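The shape of this driver loop can be tried out without a cluster. The sketch below is plain Java with no Hadoop, and its names and data are illustrative assumptions. Each "pass" simulates one MapReduce job of 1-D K-means: the map assigns every point to its nearest center, and the reduce averages the points per center. The centers produced by pass i are the input of pass i+1. The loop stops after maxIterations passes or once no center moves by more than epsilon, which plays the role of isConverged.

```java
import java.util.Arrays;

// Plain-Java simulation of an iterative MapReduce driver (1-D K-means), no Hadoop.
public class IterativeKMeansSketch {

    // One simulated MapReduce pass: "map" = nearest-center assignment, "reduce" = average per center.
    public static double[] runOnePass(double[] points, double[] centers) {
        double[] sums = new double[centers.length];
        int[] counts = new int[centers.length];
        for (double p : points) {
            int best = 0;
            for (int c = 1; c < centers.length; c++) {
                if (Math.abs(p - centers[c]) < Math.abs(p - centers[best])) {
                    best = c;
                }
            }
            sums[best] += p;      // accumulate per key (center id)
            counts[best]++;
        }
        double[] next = new double[centers.length];
        for (int c = 0; c < centers.length; c++) {
            // keep an empty cluster's old center instead of dividing by zero
            next[c] = counts[c] == 0 ? centers[c] : sums[c] / counts[c];
        }
        return next;
    }

    // Driver loop: the output of one pass is the prior of the next.
    public static double[] iterate(double[] points, double[] initial, int maxIterations, double epsilon) {
        double[] prior = initial;
        for (int iteration = 1; iteration <= maxIterations; iteration++) {
            double[] out = runOnePass(points, prior);
            boolean converged = true;
            for (int c = 0; c < out.length; c++) {
                if (Math.abs(out[c] - prior[c]) > epsilon) {
                    converged = false;
                }
            }
            prior = out;
            if (converged) {
                break;           // stop early, like isConverged(...) in the Mahout loop
            }
        }
        return prior;
    }

    public static void main(String[] args) {
        double[] points = {1.0, 2.0, 3.0, 10.0, 11.0, 12.0};
        double[] centers = iterate(points, new double[]{0.0, 5.0}, 10, 1e-9);
        System.out.println(Arrays.toString(centers)); // [2.0, 11.0]
    }
}
```

Every pass re-reads all of the points, which mirrors why disk I/O dominates when each pass is a separate Hadoop job.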

  2. Dependency-based

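    For more complex problems, several jobs may depend on one another and form a directed acyclic graph (DAG) of jobs. Processing massive data with Hadoop MapReduce is simple and convenient, but an application often needs several MR jobs to compute its result. Take the simple problem of extracting the TopN queries from massive search logs with MR. It actually involves two MR jobs: the first counts word frequencies and the second sorts them to get the TopN. Data-mining jobs are another example, since they often need several jobs combined or iterated. Such work resembles a DAG of tasks, where jobs run in a set order or depend on each other. For example, the input of one job may be the output of the previous one.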

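    Hadoop provides the JobControl class for combining jobs that have dependencies. The new API adds the ControlledJob class, which gives finer-grained control over individual jobs. With these two classes, a DAG of jobs is easy to build, and work that used to need two separate submissions can be done with a single one. This greatly reduces the hassle. Once a set of dependent jobs is submitted, Hadoop runs them in dependency order, and each job runs independently.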

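    A job starts in the WAITING state. If it has no dependencies, or all of its dependencies have completed successfully, it moves to READY. A READY job can be submitted to the Hadoop cluster, where it enters RUNNING, and depending on how it runs it then ends in SUCCESS or FAILED. If one of a job's dependencies fails, the job fails too (in the new API its state becomes DEPENDENT_FAILED). This causes a "domino effect" in which every downstream job fails. Below is an example using the new API: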

        ........
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path("/output1"));
    Configuration conf2 = new Configuration();
    Job job2 = Job.getInstance(conf2, "word count1");
    .................
    Configuration conf3 = new Configuration();
    Job job3 = Job.getInstance(conf3, "word count2");
    .................
    // Wrap each Job in a ControlledJob so that dependencies can be declared
    ControlledJob controlledJob1 = new ControlledJob(job.getConfiguration());
    controlledJob1.setJob(job);
    ControlledJob controlledJob2 = new ControlledJob(job2.getConfiguration());
    controlledJob2.setJob(job2);
    ControlledJob controlledJob3 = new ControlledJob(job3.getConfiguration());
    controlledJob3.setJob(job3);
    // job1 depends on job2 and job3: it stays WAITING until both have succeeded
    controlledJob1.addDependingJob(controlledJob2);
    controlledJob1.addDependingJob(controlledJob3);
    JobControl jobControl = new JobControl("test");
    jobControl.addJob(controlledJob1);
    jobControl.addJob(controlledJob2);
    jobControl.addJob(controlledJob3);
    // JobControl is a Runnable: run it in its own thread and poll its progress
    Thread thread = new Thread(jobControl);
    thread.start();
    while (true) {
        if (jobControl.allFinished()) {
            System.out.println(jobControl.getSuccessfulJobList());
            jobControl.stop();
            System.exit(0);
        }
        if (jobControl.getFailedJobList().size() > 0) {
            System.out.println(jobControl.getFailedJobList());
            jobControl.stop();
            System.exit(1);
        }
        Thread.sleep(500); // poll periodically instead of busy-waiting
    }
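The state rules above (WAITING, READY, RUNNING, SUCCESS/FAILED, and the domino effect) can be simulated in plain Java. This is a hypothetical model, not JobControl's code. Jobs run synchronously in dependency order, and a job whose dependency failed is marked DEPENDENT_FAILED without running. With the dependencies from the example (job1 depends on job2 and job3), job1 runs last.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of dependent-job scheduling (hypothetical, not Hadoop's JobControl).
public class JobDagSketch {

    public enum State { WAITING, READY, RUNNING, SUCCESS, FAILED, DEPENDENT_FAILED }

    // Record a state change in the log, then apply it.
    private static void move(Map<String, State> state, List<String> log, String job, State to) {
        log.add(job + ": " + state.get(job) + " -> " + to);
        state.put(job, to);
    }

    // deps: job -> jobs it depends on (every dependency must also be a key).
    // failing: jobs that fail when they run. Transitions are appended to log.
    public static Map<String, State> run(Map<String, List<String>> deps, Set<String> failing, List<String> log) {
        Map<String, State> state = new LinkedHashMap<>();
        for (String job : deps.keySet()) {
            state.put(job, State.WAITING);
        }
        boolean progress = true;
        while (progress) {                      // repeat until no job can change state
            progress = false;
            for (String job : deps.keySet()) {
                if (state.get(job) != State.WAITING) {
                    continue;
                }
                List<String> parents = deps.get(job);
                boolean parentFailed = parents.stream().anyMatch(
                        p -> state.get(p) == State.FAILED || state.get(p) == State.DEPENDENT_FAILED);
                boolean parentsDone = parents.stream().allMatch(p -> state.get(p) == State.SUCCESS);
                if (parentFailed) {
                    move(state, log, job, State.DEPENDENT_FAILED);   // domino effect
                    progress = true;
                } else if (parentsDone) {
                    move(state, log, job, State.READY);
                    move(state, log, job, State.RUNNING);            // "submitted"
                    move(state, log, job, failing.contains(job) ? State.FAILED : State.SUCCESS);
                    progress = true;
                }
            }
        }
        return state;
    }

    public static void main(String[] args) {
        Map<String, List<String>> deps = new LinkedHashMap<>();
        deps.put("job1", Arrays.asList("job2", "job3"));   // job1 depends on job2 and job3
        deps.put("job2", Collections.<String>emptyList());
        deps.put("job3", Collections.<String>emptyList());

        List<String> log = new ArrayList<>();
        System.out.println(run(deps, Collections.<String>emptySet(), log)); // {job1=SUCCESS, job2=SUCCESS, job3=SUCCESS}
        log.forEach(System.out::println);

        // job2 fails, so job1 never runs
        System.out.println(run(deps, Collections.singleton("job2"), new ArrayList<String>()));
        // {job1=DEPENDENT_FAILED, job2=FAILED, job3=SUCCESS}
    }
}
```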
  3. Chained

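    ChainMapper/ChainReducer were introduced mainly to support linear chains of Mappers. The Map or Reduce phase contains several Mappers which, like a Linux pipe, feed the output of one Mapper straight into the input of the next. This forms a pipeline of the shape [MAP+ REDUCE MAP*]. In the Map phase, the data is processed by Mapper1 and then Mapper2. In the Reduce phase, after the shuffle and sort, the data goes to the corresponding Reducer. The Reducer's output is not written directly to HDFS, though. It is handed to another Mapper, whose result is written to the final HDFS output directory. Note that in any single MapReduce job, the Map and Reduce phases may each contain any number of Mappers, but there can be only one Reducer.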

    ...
    conf.setJobName("chain");
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    JobConf mapAConf = new JobConf(false);
    ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class, Text.class, Text.class, true, mapAConf);
    JobConf mapBConf = new JobConf(false);
    ChainMapper.addMapper(conf, BMap.class, Text.class, Text.class, LongWritable.class, Text.class, false, mapBConf);
    JobConf reduceConf = new JobConf(false);
    ChainReducer.setReducer(conf, XReduce.class, LongWritable.class, Text.class, Text.class, Text.class, true, reduceConf);
    ChainReducer.addMapper(conf, CMap.class, Text.class, Text.class, LongWritable.class, Text.class, false, null);
    ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class, LongWritable.class, LongWritable.class, true, null);
    FileInputFormat.setInputPaths(conf, inDir);
    FileOutputFormat.setOutputPath(conf, outDir);
    ...
    JobClient jc = new JobClient(conf);
    RunningJob job = jc.submitJob(conf);
    ...
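The [MAP+ REDUCE MAP*] shape can be illustrated with ordinary functions. This plain-Java sketch uses no Hadoop, and its names and the lower-casing step are made up for illustration. Two per-record mappers are composed before the shuffle, a single reduce sums per key, and a post-reduce mapper filters the reducer's output, as CMap/DMap would in a ChainReducer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

// Plain-Java sketch of a [MAP+ REDUCE MAP*] chain (illustrative, no Hadoop).
public class ChainSketch {

    public static Map<String, Integer> runChain(List<String> lines, int lowLimit) {
        // Mapper 1: line -> tokens; Mapper 2: token -> lower-case token (piped like a Unix pipe)
        Function<String, List<String>> tokenize = line -> Arrays.asList(line.trim().split("\\s+"));
        Function<String, String> lowerCase = token -> token.toLowerCase(Locale.ROOT);

        // Map phase + shuffle: group mapper-chain output by key (TreeMap keeps keys sorted)
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (String token : tokenize.apply(line)) {
                if (token.isEmpty()) {
                    continue;
                }
                grouped.computeIfAbsent(lowerCase.apply(token), k -> new ArrayList<>()).add(1);
            }
        }

        // The single reducer sums each key; a post-reduce mapper then keeps only sums >= lowLimit
        Map<String, Integer> output = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            int sum = 0;
            for (int v : e.getValue()) {
                sum += v;
            }
            if (sum >= lowLimit) {
                output.put(e.getKey(), sum);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        System.out.println(runChain(Arrays.asList("Qiao qiao bao", "XUE xue xue"), 2)); // {qiao=2, xue=3}
    }
}
```

In a real chain, all of these steps run inside one job, so no intermediate output is written to HDFS between the mappers. This is the main difference from running them as separate jobs.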

MRUnit Tests

The following complete example shows how to test counters and pipelines.

  1. Add the MRUnit dependency
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>wordcount_old</groupId>
        <artifactId>wordcount_old</artifactId>
        <version>1.0-SNAPSHOT</version>
        <dependencies>
            <dependency>
                <groupId>org.apache.mrunit</groupId>
                <artifactId>mrunit</artifactId>
                <version>1.0.0</version>
                <type>jar</type>
                <classifier>hadoop2</classifier>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>2.4.0-mdh2.0.7</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.6.0-mdh2.2-SNAPSHOT</version>
            </dependency>
        </dependencies>
    </project>
  2. The WordCount code. It uses the old (mapred) API, because at the time of writing PipelineMapReduceDriver only exists for the old API.
    package com.qiao.test.wordCount;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.*;
    import java.io.IOException;
    import java.util.Iterator;
    import java.util.StringTokenizer;
    /**
     * Created by mi on 15-8-19.
     */
    public class WordCount {
        public static enum WordCounter {
            wordInput,    // tokens read by Map
            unduplicate,  // distinct words seen by Reduce
            rangeOuter    // words kept by RangeMap
        }
        private final static IntWritable LOW_LIMIT = new IntWritable(2);
        // Splits each line into words and emits (word, 1)
        public static class Map extends MapReduceBase implements Mapper<Object, Text, Text, IntWritable> {
            private final IntWritable one = new IntWritable(1);
            private Text word = new Text();
            @Override
            public void map(Object o, Text value, OutputCollector<Text, IntWritable> out, Reporter reporter) throws IOException {
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                while (tokenizer.hasMoreTokens()) {
                    word.set(tokenizer.nextToken());
                    out.collect(word, one);
                    reporter.getCounter(WordCounter.wordInput).increment(1);
                }
            }
        }
        // Sums the counts of each word
        public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
            @Override
            public void reduce(Text key, Iterator<IntWritable> value, OutputCollector<Text, IntWritable> out, Reporter reporter) throws IOException {
                int sum = 0;
                while (value.hasNext()) {
                    // add the value itself (not 1) so pre-aggregated counts, e.g. from a combiner, stay correct
                    sum += value.next().get();
                }
                reporter.getCounter(WordCounter.unduplicate).increment(1);
                out.collect(key, new IntWritable(sum));
            }
        }
        // Keeps only the words whose count is at least LOW_LIMIT
        public static class RangeMap extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {
            @Override
            public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> out, Reporter reporter) throws IOException {
                if (value.compareTo(LOW_LIMIT) >= 0) {
                    reporter.getCounter(WordCounter.rangeOuter).increment(1);
                    out.collect(key, value);
                }
            }
        }
    }
  3. Test code
    import com.qiao.test.wordCount.WordCount;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.lib.IdentityReducer;
    import org.apache.hadoop.mrunit.MapDriver;
    import org.apache.hadoop.mrunit.MapReduceDriver;
    import org.apache.hadoop.mrunit.PipelineMapReduceDriver;
    import org.apache.hadoop.mrunit.ReduceDriver;
    import org.junit.Before;
    import org.junit.Test;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    /**
     * Created by mi on 15-8-19.
     */
    public class WordCountTest {
        MapDriver<Object, Text, Text, IntWritable> mapDriver;
        ReduceDriver<Text, IntWritable, Text, IntWritable> reduceDriver;
        MapReduceDriver<Object, Text, Text, IntWritable, Text, IntWritable> mapReduceDriver;
        Mapper<Object, Text, Text, IntWritable> tokenMapper;
        Reducer<Text, IntWritable, Text, IntWritable> sumReducer;
        Mapper<Text, IntWritable, Text, IntWritable> rangeMapper;
        @Before
        public void setup() {
            tokenMapper = new WordCount.Map();
            sumReducer = new WordCount.Reduce();
            rangeMapper = new WordCount.RangeMap();
            mapDriver = new MapDriver<Object, Text, Text, IntWritable>(tokenMapper);
            reduceDriver = new ReduceDriver<Text, IntWritable, Text, IntWritable>(sumReducer);
            mapReduceDriver = new MapReduceDriver<Object, Text, Text, IntWritable, Text, IntWritable>(tokenMapper, sumReducer);
        }
        @Test
        public void testPipeline() throws IOException {
            PipelineMapReduceDriver<Object, Text, Text, IntWritable> pipeDriver = PipelineMapReduceDriver.newPipelineMapReduceDriver();
            pipeDriver.withMapReduce(tokenMapper, sumReducer)
                    .withMapReduce(rangeMapper, new IdentityReducer<Text, IntWritable>())
                    .withInput(new Text(), new Text("qiao qiao bao xue xue xue"))
                    .withOutput(new Text("qiao"), new IntWritable(2))
                    .withOutput(new Text("xue"), new IntWritable(3))
                    .withCounter(WordCount.WordCounter.wordInput, 6)
                    .withCounter(WordCount.WordCounter.unduplicate, 3)
                    .withCounter(WordCount.WordCounter.rangeOuter, 2)
                    .runTest(false);
        }
        @Test
        public void mapTest() throws IOException {
            mapDriver.withInput(new Text(), new Text("qiao bao xue xue bao qiao"))
                    .withOutput(new Text("qiao"), new IntWritable(1))
                    .withOutput(new Text("bao"), new IntWritable(1))
                    .withOutput(new Text("xue"), new IntWritable(1))
                    .withOutput(new Text("xue"), new IntWritable(1))
                    .withOutput(new Text("bao"), new IntWritable(1))
                    .withOutput(new Text("qiao"), new IntWritable(1))
                    .runTest();
        }
        @Test
        public void reduceTest() throws IOException {
            List<IntWritable> inputList = new ArrayList<IntWritable>();
            inputList.add(new IntWritable(1));
            inputList.add(new IntWritable(1));
            inputList.add(new IntWritable(1));
            reduceDriver.withInput(new Text("qiao"), inputList)
                    .withOutput(new Text("qiao"), new IntWritable(3))
                    .runTest();
        }
        @Test
        public void runTest() throws IOException {
            mapReduceDriver.withInput(new Text(), new Text("qiao bao xue"))
                    .withOutput(new Text("qiao"), new IntWritable(1))
                    .withOutput(new Text("bao"), new IntWritable(1))
                    .withOutput(new Text("xue"), new IntWritable(1))
                    .runTest(false);
        }
    }
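To see where the expected outputs and the counter values 6, 3 and 2 in testPipeline come from, the two jobs can be recomputed by hand in plain Java. This sketch does not use Hadoop or MRUnit. It only mirrors the logic of Map, Reduce and RangeMap, and the counter names are copied from WordCount.WordCounter.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Recomputes testPipeline's expectations in plain Java (no Hadoop, no MRUnit).
public class PipelineCounterSketch {

    // Returns the pipeline output sorted by key and adds counter increments to counters.
    public static Map<String, Integer> run(String line, int lowLimit, Map<String, Long> counters) {
        // Job 1 map: one (word, 1) per token, wordInput +1 per token.
        // Grouping by key (shuffle) and summing (reduce) are collapsed into TreeMap.merge here.
        Map<String, Integer> wordCounts = new TreeMap<>();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            wordCounts.merge(tokenizer.nextToken(), 1, Integer::sum);
            counters.merge("wordInput", 1L, Long::sum);
        }
        Map<String, Integer> output = new TreeMap<>();
        for (Map.Entry<String, Integer> e : wordCounts.entrySet()) {
            // Job 1 reduce: called once per distinct word, so unduplicate +1 per key.
            counters.merge("unduplicate", 1L, Long::sum);
            // Job 2 RangeMap: keep counts >= lowLimit (rangeOuter +1 each);
            // the IdentityReducer then passes the kept records through unchanged.
            if (e.getValue() >= lowLimit) {
                counters.merge("rangeOuter", 1L, Long::sum);
                output.put(e.getKey(), e.getValue());
            }
        }
        return output;
    }

    public static void main(String[] args) {
        Map<String, Long> counters = new LinkedHashMap<>();
        Map<String, Integer> output = run("qiao qiao bao xue xue xue", 2, counters);
        System.out.println(output);   // {qiao=2, xue=3}
        System.out.println(counters); // {wordInput=6, unduplicate=3, rangeOuter=2}
    }
}
```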

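PS: runTest(false) means the order of the output records is not checked, whereas runTest() also checks that they appear in the expected order.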
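This is why the runTest test method needs runTest(false). MapReduceDriver sorts the map output by key before the reduce, so the reducer emits bao, qiao, xue, while the test lists qiao, bao, xue. The plain-Java sketch below is not MRUnit's implementation. It only illustrates the two comparison semantics: an ordered check compares the records position by position, and an unordered check compares them as multisets.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrates ordered vs unordered output comparison (not MRUnit's code).
public class OutputOrderSketch {

    public static boolean matches(List<String> expected, List<String> actual, boolean orderMatters) {
        if (orderMatters) {
            return expected.equals(actual);          // same records in the same positions
        }
        return counts(expected).equals(counts(actual)); // same records, same multiplicities
    }

    private static Map<String, Integer> counts(List<String> records) {
        Map<String, Integer> m = new HashMap<>();
        for (String r : records) {
            m.merge(r, 1, Integer::sum);
        }
        return m;
    }

    public static void main(String[] args) {
        List<String> expected = Arrays.asList("qiao\t1", "bao\t1", "xue\t1");
        // reducer output arrives sorted by key: bao, qiao, xue
        List<String> actual = Arrays.asList("bao\t1", "qiao\t1", "xue\t1");
        System.out.println(matches(expected, actual, true));  // false
        System.out.println(matches(expected, actual, false)); // true
    }
}
```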