高阶MapReduce(1)-链接多个MapReduce作业

时间:2022-10-10 05:11:39

许多复杂的任务需要分解成简单任务,每个任务通过MapReduce作业来完成。Hadoop支持将多个MapReduce链接成更大得作业。

1.顺序链接MapReduce作业

类似于Unix中的管道:

mapreduce-1 | mapreduce-2 | mapreduce-3 ......

每一个阶段创建一个job,并将当前输入路径设为前一个的输出。在最后阶段删除链上生成的中间数据

2.具有复杂依赖的MapReduce链接

若mapreduce-1处理一个数据集,MapReduce-2处理另一个数据集,而MapReduce-3对前两个做内部连结。

这种情况通过Job和JobControl类管理非线性作业间的依赖。如x.addDependingJob(y)意味着x在y完成前不会启动

3.预处理和后处理的链接

一般将预处理和后处理写为Mapper任务。可以自己进行链接或使用ChainMapper和ChainReducer类,生成得作业表达式类似于:

MAP+ | REDUCE | MAP*

如以下作业: Map1 | Map2 | Reduce | Map3 | Map4

把Map2和Reduce视为MapReducez作业核心。Map1作为前处理,Map3, Map4作为后处理。:

ChainMapper使用模式:(预处理作业)

ChainReducer使用模式:(设置Reducer并添加后处理Mapper)

...
conf.setJobName("chain");
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);


JobConf mapAConf = new JobConf(false); ... ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class, Text.class, Text.class, true, mapAConf);

JobConf mapBConf = new JobConf(false); ... ChainMapper.addMapper(conf, BMap.class, Text.class, Text.class, LongWritable.class, Text.class, false, mapBConf);

JobConf reduceConf = new JobConf(false); ... ChainReducer.setReducer(conf, XReduce.class, LongWritable.class, Text.class, Text.class, Text.class, true, reduceConf);

ChainReducer.addMapper(conf, CMap.class, Text.class, Text.class, LongWritable.class, Text.class, false, null);

ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class, LongWritable.class, LongWritable.class, true, null);

FileInputFormat.setInputPaths(conf, inDir); FileOutputFormat.setOutputPath(conf, outDir); ...

JobClient jc = new JobClient(conf); RunningJob job = jc.submitJob(conf); ...
以public static <k1,v1,k2,v2> void

addMapper(JobConf job,

                       Class<? extends Mapper<k1,v1,k2,v2> kclass,

                        Class<? extends k1> inputKeyClass,

                        Class<? extends v2> inputValueClass,

                        Class<? extends k2> outputKeyClass,

                        Class<? extends v2> outputValueClass,

                        boolean byValue,

                        JobConf mapperConf)

8个参数中,第一个和最后一个是全局和本地jobconf对象,第二个是mapper类,接下来四个mapper使用的类。

byValue参数:true表示值传递,false表示引用传递。

在标准Mapper中,<k2,v2>是采用值传递被洗牌到不同节点上(传递副本),但是目前我们可以将mapper与另一个链接,就在统一个JVM线程执行,就可以采取引用传递。但是一般来说,map1在调用context.write()写出数据后,这些数据是按约定不会更改的。如果引用传递就会破坏约定。但是使用引用传递会提高效率。如果确定数据不会被破坏,可以设置为false

一般安全起见,设置为true即可。。