- 链接多个MapReduce作业
- 执行多个数据集的联结
- 生成Bloom filter
Configuration conf = getConf();
JobConf job = new JobConf(conf); job.setJobName("ChainJob");
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out); JobConf map1Conf = new JobConf(false);
ChainMapper.addMapper(job,
Map1.class,
LongWritable.class,
Text.class,
Text.class,
Text.class,
true,
map1Conf); JobConf map2Conf = new JobConf(false);
ChainMapper.addMapper(job,
Map2.class,
Text.class,
Text.class,
LongWritable.class,
Text.class,
true,
map2Conf); JobConf reduceConf = new JobConf(false);
ChainReducer.setReducer(job,
Reduce.class,
LongWritable.class,
Text.class,
Text.class,
Text.class,
true,
reduceConf); JobConf map3Conf = new JobConf(false);
ChainReducer.addMapper(job,
Map3.class,
Text.class,
Text.class,
LongWritable.class,
Text.class,
true,
map3Conf); JobConf map4Conf = new JobConf(false);
ChainReducer.addMapper(job,
Map4.class,
LongWritable.class,
Text.class,
LongWritable.class,
Text.class,
true,
map4Conf); JobClient.runJob(job);
- 首先mapper接收的数据来自两个文件,Customers及Orders;
- 在map()封装输入的每个记录后,就执行MapReduce标准的分区、洗牌和排序操作;
- reduce()函数接收输入数据,并对其值进行完全交叉乘积;
- 交叉乘积得到的每个合并结果被送入函数conbine()。
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Iterator; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput; public class DataJoin extends Configured implements Tool { public static class MapClass extends DataJoinMapperBase { protected Text generateInputTag(String inputFile) {
String datasource = inputFile.split("-")[0];
return new Text(datasource);
} protected Text generateGroupKey(TaggedMapOutput aRecord) {
String line = ((Text) aRecord.getData()).toString();
String[] tokens = line.split(",");
String groupKey = tokens[0];
return new Text(groupKey);
} protected TaggedMapOutput generateTaggedMapOutput(Object value) {
TaggedWritable retv = new TaggedWritable((Text) value);
retv.setTag(this.inputTag);
return retv;
}
} public static class Reduce extends DataJoinReducerBase { protected TaggedMapOutput combine(Object[] tags, Object[] values) {
if (tags.length < 2) return null;
String joinedStr = "";
for (int i=0; i<values.length; i++) {
if (i > 0) joinedStr += ",";
TaggedWritable tw = (TaggedWritable) values[i];
String line = ((Text) tw.getData()).toString();
String[] tokens = line.split(",", 2);
joinedStr += tokens[1];
}
TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
retv.setTag((Text) tags[0]);
return retv;
}
} public static class TaggedWritable extends TaggedMapOutput { private Writable data; public TaggedWritable(Writable data) {
this.tag = new Text("");
this.data = data;
} public Writable getData() {
return data;
} public void write(DataOutput out) throws IOException {
this.tag.write(out);
this.data.write(out);
} public void readFields(DataInput in) throws IOException {
this.tag.readFields(in);
this.data.readFields(in);
}
} public int run(String[] args) throws Exception {
Configuration conf = getConf(); JobConf job = new JobConf(conf, DataJoin.class); Path in = new Path(args[0]);
Path out = new Path(args[1]);
FileInputFormat.setInputPaths(job, in);
FileOutputFormat.setOutputPath(job, out); job.setJobName("DataJoin");
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class); job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TaggedWritable.class);
job.set("mapred.textoutputformat.separator", ","); JobClient.runJob(job);
return 0;
} public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(),
new DataJoin(),
args); System.exit(res);
}
}
[Hadoop in Action] 第5章 高阶MapReduce的更多相关文章
-
[Hadoop in Action] 第7章 细则手册
向任务传递定制参数 获取任务待定的信息 生成多个输出 与关系数据库交互 让输出做全局排序 1.向任务传递作业定制的参数 在编写Mapper和Reducer时,通常会想让一些地方可以配 ...
-
[Hadoop in Action] 第6章 编程实践
Hadoop程序开发的独门绝技 在本地,伪分布和全分布模式下调试程序 程序输出的完整性检查和回归测试 日志和监控 性能调优 1.开发MapReduce程序 [本地模式] 本地模式 ...
-
[Hadoop in Action] 第4章 编写MapReduce基础程序
基于hadoop的专利数据处理示例 MapReduce程序框架 用于计数统计的MapReduce基础程序 支持用脚本语言编写MapReduce程序的hadoop流式API 用于提升性能的Combine ...
-
[hadoop in Action] 第3章 Hadoop组件
管理HDFS中的文件 分析MapReduce框架中的组件 读写输入输出数据 1.HDFS文件操作 [命令行方式] Hadoop的文件命令采取的形式为: hadoop fs -cmd < ...
-
[Hadoop in Action] 第2章 初识Hadoop
Hadoop的结构组成 安装Hadoop及其3种工作模式:单机.伪分布和全分布 用于监控Hadoop安装的Web工具 1.Hadoop的构造模块 (1)NameNode(名字节点) ...
-
[Hadoop in Action] 第1章 Hadoop简介
编写可扩展.分布式的数据密集型程序和基础知识 理解Hadoop和MapReduce 编写和运行一个基本的MapReduce程序 1.什么是Hadoop Hadoop是一个开源的框架,可编写和运 ...
-
Cloudera Hadoop 5&; Hadoop高阶管理及调优课程(CDH5,Hadoop2.0,HA,安全,管理,调优)
1.课程环境 本课程涉及的技术产品及相关版本: 技术 版本 Linux CentOS 6.5 Java 1.7 Hadoop2.0 2.6.0 Hadoop1.0 1.2.1 Zookeeper 3. ...
-
《JavaScript设计模式与开发实践》——第3章 闭包和高阶函数
闭包 变量的作用域和生存周期密切相关 高阶函数 函数可以作为参数被传递 函数可以作为返回值输出
-
Kotlin——高级篇(二):高阶函数详解与标准的高阶函数使用
在上面一个章节中,详细的讲解了Kotlin中关于Lambda表达式的语法以及运用,如果还您对其还不甚理解,请参见Kotlin--高级篇(一):Lambda表达式详解.在这篇文章中,多次提到了Kotli ...
随机推荐
-
ACCELEROMETER
顾名思义,是加速感应器.有2种应用吧:1,电脑保护,例如当笔记本掉落时,可以被自动检测到,此时会自动关闭硬盘操作以保护数据不在强烈冲击时丢失.
-
SQLServer基本查询
条件查询 --1.比较运算符 --2.确定集合谓词 --3.确定范围谓词 , ) --4.字符匹配谓词 select * from dbo.DepartMent where dName like 'C ...
-
sql server 读取表结构
SELECT 表名 then d.name else '' end, 字段序号=a.colorder, 主键 FROM sysobjects where xtype='PK' and name in ...
-
asp.net中C#对象与方法 属性详解
C#对象与方法 一.相关概念: 1.对象:现实世界中的实体 2. 类:具有相似属性和方法的对象的集合 3.面向对象程序设计的特点:封装 继承 多态 二.类的定义与语法 1.定义类: 修饰符 类名称 ...
-
对PostgreSQL的prepared statement的深入理解
看官方文档: http://www.postgresql.org/docs/current/static/sql-prepare.html PREPARE creates a prepared sta ...
-
快速消除IOS 版本升级带来的警告
开发中我们经常会遇到这样的情况,我们在IOS 6.0开发的程序,当出现IOS 7.0 或者IOS8.0的时候,我们代码中得某些方法苹果已经不推荐使用了,建议我们改用新的方法.如果我们不更新方法,则会出 ...
-
hdu 5997 rausen loves cakes(线段数合并+启发式修改)
题目链接:hdu 5997 rausen loves cakes 题意: 给你n个点,每个点有一个颜色,现在有两个操作,第一个操作,将颜色x改为颜色y,第二个操作,询问[x,y]区间有多少颜色段(颜色 ...
-
ODAC(V9.5.15) 学习笔记(三)TOraSession(4)
4. 数据库信息 名称 类型 说明 GetDatabaseNames 获取对应的数据库对象名称列表 GetSequenceNames GetStoredProcNames GetTableNames ...
-
stable
stable - 必应词典 美['steɪb(ə)l]英['steɪb(ə)l] n.马厩:马房:(养马作特定用途的)养马场 adj.稳定的:稳固的:牢固的:稳重的 v.使(马)入厩:把(马)拴在马厩 ...
-
3-22 Ruby 编码规则(个人整理)
编码规则 https://github.com/thoughtbot/guides/tree/master/style/ruby *Use a trailing comma after each it ...