[Hadoop in Action] Chapter 5: Advanced MapReduce
- Chaining multiple MapReduce jobs
- Joining data from multiple datasets
- Creating a Bloom filter
Code listing: a driver that chains the mappers and reducer of a MapReduce job
Configuration conf = getConf();
JobConf job = new JobConf(conf);

job.setJobName("ChainJob");
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);

// in and out are the input/output Paths, defined elsewhere in the driver
// (typically built from the command-line arguments in run()).
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out);

// The chain is: Map1 -> Map2 -> Reduce -> Map3 -> Map4.
// Map1: LongWritable/Text in, Text/Text out
JobConf map1Conf = new JobConf(false);
ChainMapper.addMapper(job,
                      Map1.class,
                      LongWritable.class,
                      Text.class,
                      Text.class,
                      Text.class,
                      true,
                      map1Conf);

// Map2: Text/Text in, LongWritable/Text out
JobConf map2Conf = new JobConf(false);
ChainMapper.addMapper(job,
                      Map2.class,
                      Text.class,
                      Text.class,
                      LongWritable.class,
                      Text.class,
                      true,
                      map2Conf);

// Reduce: LongWritable/Text in, Text/Text out
JobConf reduceConf = new JobConf(false);
ChainReducer.setReducer(job,
                        Reduce.class,
                        LongWritable.class,
                        Text.class,
                        Text.class,
                        Text.class,
                        true,
                        reduceConf);

// Map3 runs on the reducer's output: Text/Text in, LongWritable/Text out
JobConf map3Conf = new JobConf(false);
ChainReducer.addMapper(job,
                       Map3.class,
                       Text.class,
                       Text.class,
                       LongWritable.class,
                       Text.class,
                       true,
                       map3Conf);

// Map4 runs on Map3's output: LongWritable/Text in, LongWritable/Text out
JobConf map4Conf = new JobConf(false);
ChainReducer.addMapper(job,
                       Map4.class,
                       LongWritable.class,
                       Text.class,
                       LongWritable.class,
                       Text.class,
                       true,
                       map4Conf);

JobClient.runJob(job);
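The classes Map1 through Map4 and Reduce are referenced by the driver but not shown; each simply implements the old mapred API with type parameters matching the corresponding addMapper()/setReducer() call. As a minimal sketch (the class name Map1 is taken from the driver, but the word-splitting logic inside it is an invented illustration, not the book's code), the first mapper in the chain could look like this:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

// Hypothetical Map1: its type parameters must match the classes passed to
// ChainMapper.addMapper() above (LongWritable/Text in, Text/Text out).
// The body is purely illustrative.
public class Map1 extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, Text> {

    public void map(LongWritable key, Text value,
                    OutputCollector<Text, Text> output,
                    Reporter reporter) throws IOException {
        // Example transformation: re-key each line by its first token.
        String[] parts = value.toString().split("\\s+", 2);
        output.collect(new Text(parts[0]),
                       new Text(parts.length > 1 ? parts[1] : ""));
    }
}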
The driver first sets up the global JobConf object, including the job name, input path, and output path. It then adds the five chained steps of the job in one go, in the order in which they execute: every step before the reducer is added with ChainMapper.addMapper(), the reducer itself is set with the static ChainReducer.setReducer() method, and every step after the reducer is added with ChainReducer.addMapper(). The global JobConf object is passed through all five of these calls. In addition, each mapper and the reducer has a local JobConf object (map1Conf, map2Conf, map3Conf, map4Conf, and reduceConf) whose settings take precedence over the global object when that particular mapper or reducer is configured. It is recommended that each local JobConf be a fresh object created without loading defaults: new JobConf(false).

Let's walk through the signature of ChainMapper.addMapper() to see in detail how the steps are chained; ChainReducer.setReducer() and ChainReducer.addMapper() have analogous signatures and behavior:

public static <K1, V1, K2, V2> void addMapper(JobConf job,
    Class<? extends Mapper<K1, V1, K2, V2>> klass,
    Class<? extends K1> inputKeyClass,
    Class<? extends V1> inputValueClass,
    Class<? extends K2> outputKeyClass,
    Class<? extends V2> outputValueClass,
    boolean byValue,
    JobConf mapperConf)

The method takes eight parameters. The first and the last are the global and the local JobConf objects, respectively. The second parameter, klass, is the Mapper class that does the data processing. As for the byValue parameter: if you are certain that Map1's map() method no longer uses the contents of k and v after calling OutputCollector.collect(K k, V v), or that Map2 does not modify the k and v it receives as input, you can set byValue to false and gain some performance; if you are not familiar with a Mapper's internals, setting byValue to true guarantees that the Mapper behaves as expected, because key/value pairs are then passed by value (as copies). The remaining four parameters, inputKeyClass, inputValueClass, outputKeyClass, and outputValueClass, are the input and output key/value classes of this Mapper.

2. Joining data from different sources [reduce-side join]
- First, the mappers receive their input from two files, Customers and Orders;
- After map() wraps (tags) each input record, the standard MapReduce partitioning, shuffling, and sorting take place;
- The reduce() function receives the grouped records and forms the full cross-product of their values;
- Each merged result produced by the cross-product is passed to the combine() function.
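To make this concrete, here is a hypothetical walk-through of the code below (the sample records are invented for illustration, not taken from the book). Suppose the Customers file contains the line 101,Alice,555-1234 and the Orders file contains 101,B,88.25,20-May-2008. Each record is tagged with its source and grouped under the join key 101. reduce() forms the cross-product of the two values, and combine() strips the join key from each record and concatenates the remainders in the order the framework presents them, e.g. Alice,555-1234,B,88.25,20-May-2008; with the group key prepended as the output key, the final output line would be 101,Alice,555-1234,B,88.25,20-May-2008.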
Code listing: a reduce-side inner join of data from two sources
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;

public class DataJoin extends Configured implements Tool {

    public static class MapClass extends DataJoinMapperBase {

        // The tag identifies which data source a record came from; here it is
        // the part of the input file name before the first '-'.
        protected Text generateInputTag(String inputFile) {
            String datasource = inputFile.split("-")[0];
            return new Text(datasource);
        }

        // The group key is the join key; here it is the first comma-separated
        // field of each record.
        protected Text generateGroupKey(TaggedMapOutput aRecord) {
            String line = ((Text) aRecord.getData()).toString();
            String[] tokens = line.split(",");
            String groupKey = tokens[0];
            return new Text(groupKey);
        }

        // Wrap each record in a TaggedWritable carrying the source tag.
        protected TaggedMapOutput generateTaggedMapOutput(Object value) {
            TaggedWritable retv = new TaggedWritable((Text) value);
            retv.setTag(this.inputTag);
            return retv;
        }
    }

    public static class Reduce extends DataJoinReducerBase {

        // combine() is called for each combination in the cross-product of
        // values grouped under the same join key. Requiring at least two tags
        // makes this an inner join: unmatched records are dropped.
        protected TaggedMapOutput combine(Object[] tags, Object[] values) {
            if (tags.length < 2) return null;
            String joinedStr = "";
            for (int i = 0; i < values.length; i++) {
                if (i > 0) joinedStr += ",";
                TaggedWritable tw = (TaggedWritable) values[i];
                String line = ((Text) tw.getData()).toString();
                // Keep only what follows the first comma, so the join key
                // appears once (as the output key) rather than per record.
                String[] tokens = line.split(",", 2);
                joinedStr += tokens[1];
            }
            TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
            retv.setTag((Text) tags[0]);
            return retv;
        }
    }

    public static class TaggedWritable extends TaggedMapOutput {

        private Writable data;

        // A no-argument constructor is required so Hadoop can instantiate this
        // class reflectively when deserializing values; data is initialized to
        // an empty Text because Text is the only payload type used in this job.
        public TaggedWritable() {
            this.tag = new Text("");
            this.data = new Text("");
        }

        public TaggedWritable(Writable data) {
            this.tag = new Text("");
            this.data = data;
        }

        public Writable getData() {
            return data;
        }

        public void write(DataOutput out) throws IOException {
            this.tag.write(out);
            this.data.write(out);
        }

        public void readFields(DataInput in) throws IOException {
            this.tag.readFields(in);
            this.data.readFields(in);
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        JobConf job = new JobConf(conf, DataJoin.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.setInputPaths(job, in);
        FileOutputFormat.setOutputPath(job, out);

        job.setJobName("DataJoin");
        job.setMapperClass(MapClass.class);
        job.setReducerClass(Reduce.class);

        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(TaggedWritable.class);
        // Separate the output key (the join key) from the joined value with a comma.
        job.set("mapred.textoutputformat.separator", ",");

        JobClient.runJob(job);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(),
                                 new DataJoin(),
                                 args);

        System.exit(res);
    }
}
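A note on running the job (the jar and file names below are assumptions, not from the listing): because generateInputTag() derives the tag from the part of the input file name before the first '-', the two datasets should be stored under names such as customers-file and orders-file in the input directory; the job can then be launched with something like hadoop jar datajoin.jar DataJoin input_dir output_dir, where the first argument is the input directory and the second is the output directory.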