MyJobLink链接MapReduce作业案例,新旧API比较

时间:2021-04-07 05:09:47

hadoop基础部分的学习告一段落,休息了几天,现在满血复活了。。。哈哈,让我们一起来学习学习hadoop的第一个链接MapReduce作业的案例吧。

在高阶数据处理中,会经常发现无法将整个流程写在单个MapReduce作业中,Hadoop支持将多个MapReduce程序链接成更大的作业。

1、顺序链接MapReduce作业

虽然两个作业可以手动的逐个执行,但更为快捷的方式是生成一个自动化的执行序列,可以将MapReduce作业按照作业顺序链接在一起,用一个MapReduce作业的输出作为下一个的输入。MapReduce作业链接类似与Unix管道

mapreduce-1 | mapreduce-2 | mapreduce-3 | ...

每个作业的driver都必须创建一个新的job,并将当前输入路径设为前一个的输出。在最后阶段删除在链上每个阶段生成的中间数据

2、复杂依赖的MapReduce链接

在复杂数据处理任务中的子任务并不是按顺序运行的,则它们的MapReduce作业不能按线性方式链接。

若mapreduce-1处理一个数据集,MapReduce-2处理另一个数据集,而MapReduce-3对前两个做内部连结,则MapReduce-3依赖于其他两个作业,仅当MapReduce-1和MapReduce-2都完成后才可以执行,而MapReduce-1和MapReduce-2之间并无相互依赖

hadoop的简化机制,通过Job和JobControl类管理非线性作业间的依赖。Job对象是MapReduce作业的表现形式,Job对象的实例化可通过传递一个JobConf对象到作业的构造函数来实现,除了要保持作业的配置信息外,job还通过设定addDependingJob()方法维护作业的依赖关系。如x.addDependingJob(y)意味着x在y完成前不会启动

3、预处理和后处理阶段的链接

大量的数据处理任务涉及对记录的预处理和后处理。可以为预处理与后处理步骤各自编写一个MapReduce作业,并把他们链接起来,由于过程中每一个步骤的中间结果都需要占用I/O和存储资源,这种做法是低效的,也可以自己写mapper去预先调用所有的预处理步骤,在让reducer调用所有的后处理步骤,这将强制你采用模块化和可组合的方式来构建预处理和后处理。Hadoop在版本0.19.0中引入了ChainMapper和ChainReducer类来简化预处理和后处理的构成。

可以通过伪正则表达式将MapReduce作业的链接符合化的表达为:

[MAP | REDUCE] +

使用ChainMapper和ChainReduce所生成的作业表达式与此类似:

MAP+ | REDUCE | MAP+

作业按序执行多个mapper来预处理数据,并在运行reduce之后可选的按序执行多个mapper来做数据的后处理。

有这样一个例子:有4个mappper(Map1,Map2,Map3,Map4)和一个reducer(Reduce),它们被链接为单个MapReduce作业。
顺序以下: Map1 | Map2 | Reduce | Map3 | Map4

在这个组合中,可以把Map2和Reduce视为MapReduce作业核心,在Mapper和Reducer之间使用标准的分区和洗牌。把Map1作为前处理步骤,Map3, Map4作为后处理步骤

在driver中使用ChainMapper和ChainReducer类来设定这个mapper类和reducer类序列的构成。
ChainMapper使用模式:(预处理作业)
ChainReducer使用模式:(设置Reducer并添加后处理Mapper)


代码清单:MapReduce作业链接(旧版本API)

package com.yc.demo;

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class MyJObLink  extends Configured implements Tool {

    public static class Reduce extends MapReduceBase implements Reducer<LongWritable,Text,Text,Text>{

        @Override
        public void reduce(LongWritable key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
            output.collect(new Text("1"), new Text("1"));
        }
        
    }
    
    public static class Map1 extends MapReduceBase implements Mapper<LongWritable,Text,Text,Text> {

        @Override
        public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
                throws IOException {
        output.collect(value, new Text(key.toString()));//V1(记录)作键K2,K1(偏移量)作值V2
        }
        
    }
    
    public static class Map2 extends MapReduceBase implements Mapper<Text,Text,LongWritable,Text>{

        @Override
        public void map(Text key, Text value, OutputCollector<LongWritable, Text> output, Reporter reporter)
                throws IOException {
            output.collect(new LongWritable(Long.valueOf(value.toString())),key);//输入键值对,交换后作键值对输出
        }
    }
    
    public static class Map3 extends MapReduceBase implements Mapper<Text,Text,LongWritable,Text> {

        @Override
        public void map(Text key, Text value, OutputCollector<LongWritable, Text> output, Reporter reporter)
                throws IOException {
            output.collect(new LongWritable(Long.valueOf("1")),key);//输入键值对后输出键为1,值为输入键
        }
        
    }
    
    public static class Map4 extends MapReduceBase implements Mapper<LongWritable,Text,LongWritable,Text>{

        @Override
        public void map(LongWritable key, Text value, OutputCollector<LongWritable, Text> output, Reporter reporter)
                throws IOException {
            output.collect(new LongWritable(Long.valueOf("1")), new Text("1"));//输入键值对后输出键为1,值为1
        }
    }
    
    @Override
    public int run(String[] args) throws Exception {
        //1、实例化作业对象
        Configuration conf = getConf();
        JobConf job = new JobConf(conf);
        
        job.setJobName("xiaoxiaoChainJob");
        //2、为作业设置输入文本格式化和输出文本的格式化
        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        
        //3、为作业设置输入文件和输出文件的路径
        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);
        
        /**
         * 在作业添加Map1阶段
         * 使用ChainMapper.addMapper()添加Reduce之前的所有步骤
         *
         * ChainMapper.addMapper(JobConf job,Class<? extends Mapper<LongWritable,Text,Text,Text>> klass,
         *           Class<? extends LongWritable> inputKeyClass,Class<? extends Text> outputKeyClass,Class<? extends Text> outputValueClass,
         *           boolean byValue,JobConf mapperConf)
         *           该方法有8个参数,第一个和最后一个分别为全局和本地JobConf对象,第二个参数(klass)是Mapper类,负责数据处理
         *           余下4个参数inputKeyClass,inputValueClass,outputKeyClass,outputValueClass是这个Mapper类中输入/输出类的类型
         *           byValue这个参数,在标准的Mapper模型中,键/值对的输出在序列化之后写入磁盘(键和值实现为Writable使得它们能够被复制和序列化),
         *           等待被洗牌到一个可能完全不同的节点上,形式上认为这个过程采用的是值传递(passed by value),发送的是键值对的副本
         *           在目前的情况下我们可以将一个Mapper与另一个相链接,在相同的JVM线程中一起执行,因此,键/值对的发送有可能采用引用传递(passed by reference)
         *           初始Mapper的输出放到内存中,后续的Mapper直接引用相同的内存位置
         *           当Mapper 1调用OutputCollector.collect<K k,V v)时,对象k和v直接传递给Map2的map()方法,
         *           mapper之间可能有大量的数据需要传递,避免去复制这些数据可以让性能得以提高.
         *  但是,这样会违背Hadoop中MapReduceApi的一个更为微妙的"约定",即对OutputCollector.collect(K k,V v)
         *  的调用一定不会改变k和v的内容.
         *  Map1调用OutputCollector.collect(K k,V v)之后,可以继续使用对象k和v,并完全相信他们的值会保持不变.
         *  但如果我们将这些对象通过引用传递给Map2,接下来Map2可能会改变他们,这就违反了API的"约定".
         *  如果你确信Map1的map()方法在调用OutputCollector.collect(K k,V v)之后不再使用k和v的内容,
         *  或者Map2并不改变k和v的在其上的输入值,你可以通过设定byValue为false来获得一定的性能提升.
         *  如果你对Mapper的内部代码不太了解,安全起见最好设byValue为true,依旧采用值传递模式,
         *  确保mapper会按预期的方式工作.
         */
        //4、为作业设置Mapper和Reducer函数
        //(1)在作业中添加Map1阶段,使用ChainMapper.addMapper()添加位于Reduce之前的步骤
        JobConf map1Conf = new JobConf(false);
        ChainMapper.addMapper(job, Map1.class,LongWritable.class,Text.class,Text.class,Text.class, true,map1Conf);
        
        /**
         * 在作业中添加Map2阶段
         * 使用ChainMapper.addMapper()添加位于Reduce之前的所有步骤
         */
        JobConf map2Conf=new JobConf(false);
        ChainMapper.addMapper(job, Map2.class, Text.class, Text.class, LongWritable.class, Text.class, true, map2Conf);
        /**
         * 在作业中添加Reduce阶段
         * 使用静态的ChainReducer.setReducer()方法设置reducer
         */
        JobConf reducerConf = new JobConf(false);
        ChainReducer.setReducer(job, Reduce.class, LongWritable.class, Text.class, Text.class, Text.class, true, reducerConf);
        /**
         * 在作业中添加Map3阶段
         * 使用ChainReducer.addMapper()添加reducer后续的步骤
         */
        JobConf map3Conf=new JobConf(false);
        ChainReducer.addMapper(job, Map3.class, Text.class, Text.class, LongWritable.class, Text.class, true, map3Conf);
        /**
         * 在作业中添加Map4阶段
         * 使用ChainReducer.addMapper()添加reducer后续的步骤
         */
        JobConf map4Conf = new JobConf(false);
        ChainReducer.addMapper(job, Map4.class,LongWritable.class,Text.class,LongWritable.class,Text.class,true,map4Conf);
        
        //启动作业
        JobClient.runJob(job);
        
        return 0;
    }

    public static void main(String [] args) throws Exception {
        /*final String inputPath = "/home/dev/hadooptest/mapin/cite";
        final String outputPath="/home/dev/hadooptest/mapin/cite/out";
        String [] paths = {inputPath,outputPath};*/
        
        /**
         *  Driver中的main函数->ToolRunner中的run函数->Too接口中的run函数->
         *  Driver中覆盖函数处理参数->Driver中核心函数启动job(合并为一个方法,重写了接口Tool的run方法)
         *  args(运行时动态给定的)、paths(代码中定死的)为输入文本和输出文本的路径,
         */
        int res=ToolRunner.run(new Configuration(), new MyJObLink(),args);
        /*int res=ToolRunner.run(new Configuration(), new MyJObLink(),paths);*/
        System.exit(res);
    }
}

MapReduce作业链接(新版本API

package com.yc.link;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;

import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

 

public class MyJobLink extends Configured implements Tool{

      /**

      * Reducer任务 Reduce

      * @author 王云飞

      *

      */

      public static class Reduce extends Reducer<LongWritable,Text,Text,Text>{

             @Override

             public void reduce(LongWritable key, Iterable<Text> values,Context context) throws IOException, InterruptedException {

                    context.write(new Text("1"), new Text("1"));

             }

      }

      /**

      * Mapper任务 Map1

      * @author 王云飞

      *

      */

      public static class Map1 extends Mapper<LongWritable,Text,Text,Text>{

             @Override

             public void map(LongWritable key, Text value, Context context)

                           throws IOException, InterruptedException {

                    context.write(value, new Text(key.toString()));// V1(记录)作键K2,K1(偏移量)作值V2

             }      

      }

      /**

      * Mapper任务 Map2

      * @author hadoop

      *

      */

      public static class Map2 extends Mapper<Text,Text,LongWritable,Text>{

             @Override

             public void map(Text key, Text value, Context context)

                           throws IOException, InterruptedException {

                    context.write(new LongWritable(Long.valueOf(value.toString())), key);// 输入键值对交换后作键值对输出

            

             }

      }

      /**

      * Mapper任务 Map3

      * @author 王云飞

      *

      */

      public static class Map3 extends Mapper<Text,Text,LongWritable,Text>{

             @Override

             public  void map(Text key, Text value, Context context)

                           throws IOException, InterruptedException {

                    context.write(new LongWritable(Long.valueOf("1")), key);// 输入键值对后输出键为1,值为输入键

            

             }

      }

      /**

      * Mapper任务 Map4

      * @author 王云飞

      *

      */

      public static class Map4 extends Mapper<LongWritable,Text,LongWritable,Text>{

             @Override

             public  void map(LongWritable key, Text value, Context context)

                           throws IOException, InterruptedException {

                    context.write(new LongWritable(Long.valueOf("1")), new Text("1"));// 输入键值对后输出键为1,值为1

             }

      }

      /**

      * driver类

      */

      @Override

      public int run(String[] args) throws Exception {

             // 1.实例化作业对象

             Configuration conf=this.getConf();

             Job job=new Job(conf,"飞哥ChainJob");

             job.setJarByClass(MyJobLink.class);

            

             // 2.为作业设置输入文件和输出文件的路径

             FileInputFormat.addInputPath(job, new Path(args[0]));

             FileOutputFormat.setOutputPath(job, new Path(args[1]));

            

             //3.为作业设置输入文本的格式化和输出文本的格式化

             job.setInputFormatClass(TextInputFormat.class);

             job.setOutputFormatClass(TextOutputFormat.class);

            

             // 4.为作业设置Mapper 和Reducer函数

             //(1)在作业中添加Map1阶段, 使用ChainMapper.addMapper()添加位于Reduce之前的步骤

             //(job, klass, inputKeyClass, inputValueClass, outputKeyClass, outputValueClass, mapperConf

             Configuration map1Conf=new Configuration(false);

             ChainMapper.addMapper( job,

                                                                                                Map1.class,

                                                                                                LongWritable.class,

                                                                                                Text.class,

                                                                                                Text.class,

                                                                                                Text.class,

                                                                                                map1Conf);

             // (2)在作业中添加Map2阶段, 使用ChainMapper.addMapper()添加位于Reduce之前的步骤

             Configuration map2Conf=new Configuration(false);

             ChainMapper.addMapper( job,

                                                                                                Map2.class,

                                                                                                Text.class,

                                                                                                Text.class,

                                                                                                LongWritable.class,

                                                                                                Text.class,

                                                                                                map2Conf);

             //(3)在作业中添加Reduce阶段,使用ChainReducer.setReducer()方法设置Reducer

             Configuration reduceConf=new Configuration(false);

             //job, klass, inputKeyClass, inputValueClass, outputKeyClass, outputValueClass, reducerConf

             ChainReducer.setReducer(job,

                                                                                           Reduce.class,

                                                                                           LongWritable.class,

                                                                                           Text.class,

                                                                                           Text.class,

                                                                                           Text.class,

                                                                                           reduceConf);

             //(4)在作业中添加Map3阶段,使用ChainReducer.addMapper()添加reducer后续的步骤

             Configuration map3Conf=new Configuration(false);

             ChainReducer.addMapper( job,

                                                                                                Map3.class,

                                                                                                Text.class,

                                                                                                Text.class,

                                                                                                LongWritable.class,

                                                                                                Text.class,

                                                                                                map3Conf);

            

             // (5)在作业中添加Map4阶段,使用ChainReducer.addMapper()添加reducer后续的步骤

             Configuration map4Conf=new Configuration(false);

             ChainReducer.addMapper( job,

                                                                                                Map4.class,

                                                                                                LongWritable.class,

                                                                                                Text.class,

                                                                                                LongWritable.class,

                                                                                                Text.class,

                                                                                                map4Conf);   

             // 5.启动作业

             return (job.waitForCompletion(true)?0:1);

      }

      /**

      * 主函数

      * @param args

      * @throws Exception

      */

      public static void main(String [] args) throws Exception{

             int res=ToolRunner.run(new Configuration(), new MyJobLink(), args);

             System.exit(res);

      }

}

和小伙伴一人写了一个版本的,但输出结果是一样的,输出结果如下:

MyJobLink链接MapReduce作业案例,新旧API比较