许多复杂的任务需要分解成简单任务,每个任务通过MapReduce作业来完成。Hadoop支持将多个MapReduce链接成更大得作业。
1.顺序链接MapReduce作业
类似于Unix中的管道:
mapreduce-1 | mapreduce-2 | mapreduce-3 ......
每一个阶段创建一个job,并将当前输入路径设为前一个的输出。在最后阶段删除链上生成的中间数据
2.具有复杂依赖的MapReduce链接
若mapreduce-1处理一个数据集,MapReduce-2处理另一个数据集,而MapReduce-3对前两个做内部连结。
这种情况通过Job和JobControl类管理非线性作业间的依赖。如x.addDependingJob(y)意味着x在y完成前不会启动
3.预处理和后处理的链接
一般将预处理和后处理写为Mapper任务。可以自己进行链接或使用ChainMapper和ChainReducer类,生成得作业表达式类似于:
MAP+ | REDUCE | MAP*
如以下作业: Map1 | Map2 | Reduce | Map3 | Map4
把Map2和Reduce视为MapReducez作业核心。Map1作为前处理,Map3, Map4作为后处理。:
ChainMapper使用模式:(预处理作业)
ChainReducer使用模式:(设置Reducer并添加后处理Mapper)
...以public static <k1,v1,k2,v2> void
conf.setJobName("chain");
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
JobConf mapAConf = new JobConf(false); ... ChainMapper.addMapper(conf, AMap.class, LongWritable.class, Text.class, Text.class, Text.class, true, mapAConf);
JobConf mapBConf = new JobConf(false); ... ChainMapper.addMapper(conf, BMap.class, Text.class, Text.class, LongWritable.class, Text.class, false, mapBConf);
JobConf reduceConf = new JobConf(false); ... ChainReducer.setReducer(conf, XReduce.class, LongWritable.class, Text.class, Text.class, Text.class, true, reduceConf);
ChainReducer.addMapper(conf, CMap.class, Text.class, Text.class, LongWritable.class, Text.class, false, null);
ChainReducer.addMapper(conf, DMap.class, LongWritable.class, Text.class, LongWritable.class, LongWritable.class, true, null);
FileInputFormat.setInputPaths(conf, inDir); FileOutputFormat.setOutputPath(conf, outDir); ...
JobClient jc = new JobClient(conf); RunningJob job = jc.submitJob(conf); ...
addMapper(JobConf job,
Class<? extends Mapper<k1,v1,k2,v2> kclass,
Class<? extends k1> inputKeyClass,
Class<? extends v2> inputValueClass,
Class<? extends k2> outputKeyClass,
Class<? extends v2> outputValueClass,
boolean byValue,
JobConf mapperConf)
8个参数中,第一个和最后一个是全局和本地jobconf对象,第二个是mapper类,接下来四个mapper使用的类。
byValue参数:true表示值传递,false表示引用传递。
在标准Mapper中,<k2,v2>是采用值传递被洗牌到不同节点上(传递副本),但是目前我们可以将mapper与另一个链接,就在统一个JVM线程执行,就可以采取引用传递。但是一般来说,map1在调用context.write()写出数据后,这些数据是按约定不会更改的。如果引用传递就会破坏约定。但是使用引用传递会提高效率。如果确定数据不会被破坏,可以设置为false
一般安全起见,设置为true即可。。