hadoop基础部分的学习告一段落,休息了几天,现在满血复活了。。。哈哈,让我们一起来学习学习hadoop的第一个链接MapReduce作业的案例吧。
在高阶数据处理中,会经常发现无法将整个流程写在单个MapReduce作业中,Hadoop支持将多个MapReduce程序链接成更大的作业。
1、顺序链接MapReduce作业
虽然两个作业可以手动的逐个执行,但更为快捷的方式是生成一个自动化的执行序列,可以将MapReduce作业按照作业顺序链接在一起,用一个MapReduce作业的输出作为下一个的输入。MapReduce作业链接类似与Unix管道
mapreduce-1 | mapreduce-2 | mapreduce-3 | ...
每个作业的driver都必须创建一个新的job,并将当前输入路径设为前一个的输出。在最后阶段删除在链上每个阶段生成的中间数据。
2、复杂依赖的MapReduce链接
在复杂数据处理任务中的子任务并不是按顺序运行的,则它们的MapReduce作业不能按线性方式链接。
若mapreduce-1处理一个数据集,MapReduce-2处理另一个数据集,而MapReduce-3对前两个做内部连结,则MapReduce-3依赖于其他两个作业,仅当MapReduce-1和MapReduce-2都完成后才可以执行,而MapReduce-1和MapReduce-2之间并无相互依赖。
hadoop的简化机制,通过Job和JobControl类管理非线性作业间的依赖。Job对象是MapReduce作业的表现形式,Job对象的实例化可通过传递一个JobConf对象到作业的构造函数来实现,除了要保持作业的配置信息外,job还通过设定addDependingJob()方法维护作业的依赖关系。如x.addDependingJob(y)意味着x在y完成前不会启动 。3、预处理和后处理阶段的链接
大量的数据处理任务涉及对记录的预处理和后处理。可以为预处理与后处理步骤各自编写一个MapReduce作业,并把他们链接起来,由于过程中每一个步骤的中间结果都需要占用I/O和存储资源,这种做法是低效的,也可以自己写mapper去预先调用所有的预处理步骤,在让reducer调用所有的后处理步骤,这将强制你采用模块化和可组合的方式来构建预处理和后处理。Hadoop在版本0.19.0中引入了ChainMapper和ChainReducer类来简化预处理和后处理的构成。
可以通过伪正则表达式将MapReduce作业的链接符合化的表达为:
[MAP | REDUCE] +
使用ChainMapper和ChainReduce所生成的作业表达式与此类似:
MAP+ | REDUCE | MAP+
作业按序执行多个mapper来预处理数据,并在运行reduce之后可选的按序执行多个mapper来做数据的后处理。
有这样一个例子:有4个mappper(Map1,Map2,Map3,Map4)和一个reducer(Reduce),它们被链接为单个MapReduce作业。
顺序以下: Map1 | Map2 | Reduce | Map3 | Map4
在这个组合中,可以把Map2和Reduce视为MapReduce作业的核心,在Mapper和Reducer之间使用标准的分区和洗牌。把Map1作为前处理步骤,Map3, Map4作为后处理步骤。
在driver中使用ChainMapper和ChainReducer类来设定这个mapper类和reducer类序列的构成。
ChainMapper使用模式:(预处理作业)
ChainReducer使用模式:(设置Reducer并添加后处理Mapper)
代码清单:MapReduce作业链接(旧版本API)
package com.yc.demo;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MyJObLink extends Configured implements Tool {
public static class Reduce extends MapReduceBase implements Reducer<LongWritable,Text,Text,Text>{
@Override
public void reduce(LongWritable key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
output.collect(new Text("1"), new Text("1"));
}
}
public static class Map1 extends MapReduceBase implements Mapper<LongWritable,Text,Text,Text> {
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
throws IOException {
output.collect(value, new Text(key.toString()));//V1(记录)作键K2,K1(偏移量)作值V2
}
}
public static class Map2 extends MapReduceBase implements Mapper<Text,Text,LongWritable,Text>{
@Override
public void map(Text key, Text value, OutputCollector<LongWritable, Text> output, Reporter reporter)
throws IOException {
output.collect(new LongWritable(Long.valueOf(value.toString())),key);//输入键值对,交换后作键值对输出
}
}
public static class Map3 extends MapReduceBase implements Mapper<Text,Text,LongWritable,Text> {
@Override
public void map(Text key, Text value, OutputCollector<LongWritable, Text> output, Reporter reporter)
throws IOException {
output.collect(new LongWritable(Long.valueOf("1")),key);//输入键值对后输出键为1,值为输入键
}
}
public static class Map4 extends MapReduceBase implements Mapper<LongWritable,Text,LongWritable,Text>{
@Override
public void map(LongWritable key, Text value, OutputCollector<LongWritable, Text> output, Reporter reporter)
throws IOException {
output.collect(new LongWritable(Long.valueOf("1")), new Text("1"));//输入键值对后输出键为1,值为1
}
}
@Override
public int run(String[] args) throws Exception {
//1、实例化作业对象
Configuration conf = getConf();
JobConf job = new JobConf(conf);
job.setJobName("xiaoxiaoChainJob");
//2、为作业设置输入文本格式化和输出文本的格式化
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
//3、为作业设置输入文件和输出文件的路径
Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);
/**
* 在作业添加Map1阶段
* 使用ChainMapper.addMapper()添加Reduce之前的所有步骤
*
* ChainMapper.addMapper(JobConf job,Class<? extends Mapper<LongWritable,Text,Text,Text>> klass,
* Class<? extends LongWritable> inputKeyClass,Class<? extends Text> outputKeyClass,Class<? extends Text> outputValueClass,
* boolean byValue,JobConf mapperConf)
* 该方法有8个参数,第一个和最后一个分别为全局和本地JobConf对象,第二个参数(klass)是Mapper类,负责数据处理
* 余下4个参数inputKeyClass,inputValueClass,outputKeyClass,outputValueClass是这个Mapper类中输入/输出类的类型
* byValue这个参数,在标准的Mapper模型中,键/值对的输出在序列化之后写入磁盘(键和值实现为Writable使得它们能够被复制和序列化),
* 等待被洗牌到一个可能完全不同的节点上,形式上认为这个过程采用的是值传递(passed by value),发送的是键值对的副本
* 在目前的情况下我们可以将一个Mapper与另一个相链接,在相同的JVM线程中一起执行,因此,键/值对的发送有可能采用引用传递(passed by reference)
* 初始Mapper的输出放到内存中,后续的Mapper直接引用相同的内存位置
* 当Mapper 1调用OutputCollector.collect<K k,V v)时,对象k和v直接传递给Map2的map()方法,
* mapper之间可能有大量的数据需要传递,避免去复制这些数据可以让性能得以提高.
* 但是,这样会违背Hadoop中MapReduceApi的一个更为微妙的"约定",即对OutputCollector.collect(K k,V v)
* 的调用一定不会改变k和v的内容.
* Map1调用OutputCollector.collect(K k,V v)之后,可以继续使用对象k和v,并完全相信他们的值会保持不变.
* 但如果我们将这些对象通过引用传递给Map2,接下来Map2可能会改变他们,这就违反了API的"约定".
* 如果你确信Map1的map()方法在调用OutputCollector.collect(K k,V v)之后不再使用k和v的内容,
* 或者Map2并不改变k和v的在其上的输入值,你可以通过设定byValue为false来获得一定的性能提升.
* 如果你对Mapper的内部代码不太了解,安全起见最好设byValue为true,依旧采用值传递模式,
* 确保mapper会按预期的方式工作.
*/
//4、为作业设置Mapper和Reducer函数
//(1)在作业中添加Map1阶段,使用ChainMapper.addMapper()添加位于Reduce之前的步骤
JobConf map1Conf = new JobConf(false);
ChainMapper.addMapper(job, Map1.class,LongWritable.class,Text.class,Text.class,Text.class, true,map1Conf);
/**
* 在作业中添加Map2阶段
* 使用ChainMapper.addMapper()添加位于Reduce之前的所有步骤
*/
JobConf map2Conf=new JobConf(false);
ChainMapper.addMapper(job, Map2.class, Text.class, Text.class, LongWritable.class, Text.class, true, map2Conf);
/**
* 在作业中添加Reduce阶段
* 使用静态的ChainReducer.setReducer()方法设置reducer
*/
JobConf reducerConf = new JobConf(false);
ChainReducer.setReducer(job, Reduce.class, LongWritable.class, Text.class, Text.class, Text.class, true, reducerConf);
/**
* 在作业中添加Map3阶段
* 使用ChainReducer.addMapper()添加reducer后续的步骤
*/
JobConf map3Conf=new JobConf(false);
ChainReducer.addMapper(job, Map3.class, Text.class, Text.class, LongWritable.class, Text.class, true, map3Conf);
/**
* 在作业中添加Map4阶段
* 使用ChainReducer.addMapper()添加reducer后续的步骤
*/
JobConf map4Conf = new JobConf(false);
ChainReducer.addMapper(job, Map4.class,LongWritable.class,Text.class,LongWritable.class,Text.class,true,map4Conf);
//启动作业
JobClient.runJob(job);
return 0;
}
public static void main(String [] args) throws Exception {
/*final String inputPath = "/home/dev/hadooptest/mapin/cite";
final String outputPath="/home/dev/hadooptest/mapin/cite/out";
String [] paths = {inputPath,outputPath};*/
/**
* Driver中的main函数->ToolRunner中的run函数->Too接口中的run函数->
* Driver中覆盖函数处理参数->Driver中核心函数启动job(合并为一个方法,重写了接口Tool的run方法)
* args(运行时动态给定的)、paths(代码中定死的)为输入文本和输出文本的路径,
*/
int res=ToolRunner.run(new Configuration(), new MyJObLink(),args);
/*int res=ToolRunner.run(new Configuration(), new MyJObLink(),paths);*/
System.exit(res);
}
}
MapReduce作业链接(新版本API)
package com.yc.link;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class MyJobLink extends Configured implements Tool{
/**
* Reducer任务 Reduce
* @author 王云飞
*
*/
public static class Reduce extends Reducer<LongWritable,Text,Text,Text>{
@Override
public void reduce(LongWritable key, Iterable<Text> values,Context context) throws IOException, InterruptedException {
context.write(new Text("1"), new Text("1"));
}
}
/**
* Mapper任务 Map1
* @author 王云飞
*
*/
public static class Map1 extends Mapper<LongWritable,Text,Text,Text>{
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.write(value, new Text(key.toString()));// V1(记录)作键K2,K1(偏移量)作值V2
}
}
/**
* Mapper任务 Map2
* @author hadoop
*
*/
public static class Map2 extends Mapper<Text,Text,LongWritable,Text>{
@Override
public void map(Text key, Text value, Context context)
throws IOException, InterruptedException {
context.write(new LongWritable(Long.valueOf(value.toString())), key);// 输入键值对交换后作键值对输出
}
}
/**
* Mapper任务 Map3
* @author 王云飞
*
*/
public static class Map3 extends Mapper<Text,Text,LongWritable,Text>{
@Override
public void map(Text key, Text value, Context context)
throws IOException, InterruptedException {
context.write(new LongWritable(Long.valueOf("1")), key);// 输入键值对后输出键为1,值为输入键
}
}
/**
* Mapper任务 Map4
* @author 王云飞
*
*/
public static class Map4 extends Mapper<LongWritable,Text,LongWritable,Text>{
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
context.write(new LongWritable(Long.valueOf("1")), new Text("1"));// 输入键值对后输出键为1,值为1
}
}
/**
* driver类
*/
@Override
public int run(String[] args) throws Exception {
// 1.实例化作业对象
Configuration conf=this.getConf();
Job job=new Job(conf,"飞哥ChainJob");
job.setJarByClass(MyJobLink.class);
// 2.为作业设置输入文件和输出文件的路径
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//3.为作业设置输入文本的格式化和输出文本的格式化
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// 4.为作业设置Mapper 和Reducer函数
//(1)在作业中添加Map1阶段, 使用ChainMapper.addMapper()添加位于Reduce之前的步骤
//(job, klass, inputKeyClass, inputValueClass, outputKeyClass, outputValueClass, mapperConf
Configuration map1Conf=new Configuration(false);
ChainMapper.addMapper( job,
Map1.class,
LongWritable.class,
Text.class,
Text.class,
Text.class,
map1Conf);
// (2)在作业中添加Map2阶段, 使用ChainMapper.addMapper()添加位于Reduce之前的步骤
Configuration map2Conf=new Configuration(false);
ChainMapper.addMapper( job,
Map2.class,
Text.class,
Text.class,
LongWritable.class,
Text.class,
map2Conf);
//(3)在作业中添加Reduce阶段,使用ChainReducer.setReducer()方法设置Reducer
Configuration reduceConf=new Configuration(false);
//job, klass, inputKeyClass, inputValueClass, outputKeyClass, outputValueClass, reducerConf
ChainReducer.setReducer(job,
Reduce.class,
LongWritable.class,
Text.class,
Text.class,
Text.class,
reduceConf);
//(4)在作业中添加Map3阶段,使用ChainReducer.addMapper()添加reducer后续的步骤
Configuration map3Conf=new Configuration(false);
ChainReducer.addMapper( job,
Map3.class,
Text.class,
Text.class,
LongWritable.class,
Text.class,
map3Conf);
// (5)在作业中添加Map4阶段,使用ChainReducer.addMapper()添加reducer后续的步骤
Configuration map4Conf=new Configuration(false);
ChainReducer.addMapper( job,
Map4.class,
LongWritable.class,
Text.class,
LongWritable.class,
Text.class,
map4Conf);
// 5.启动作业
return (job.waitForCompletion(true)?0:1);
}
/**
* 主函数
* @param args
* @throws Exception
*/
public static void main(String [] args) throws Exception{
int res=ToolRunner.run(new Configuration(), new MyJobLink(), args);
System.exit(res);
}
}
和小伙伴一人写了一个版本的,但输出结果是一样的,输出结果如下: