JAVA 工程使用MapReduce
准备两个文件:file1.txt
hello, world
hello, china
hello, hefei
file2.txt
hello, world
hello, china
hello, hefei
统计每个单词出现的次数。
新建Eclipse创建一个Java工程
导入Hadoop的JAR文件
编码:
WordMapper类:
package wordcount;
import ...(导入包省略)
public class WordMapper extends Mapper<Object,Text,Text,IntWritable>{
private final static IntWritable one = new IntWritable(1);
public void map(Object key,Text value,Context context)
throws IOException,InterruptedException{
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()){
word.set(itr.nextToken());
context.write(word,one);
}
}
}
WordReducer类
package wordcount;
import ...
public class WordReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
private IntWritable result=new IntWritable();
public void reduce(Text key,Inerable<IntWritable> values,Context context) throws IOException,InterruptedException{
int sum=0;
for(IntWritable val:values){
sum += val.get();
}
result.set(sum);
context.write(key,result);
}
}
WordMain驱动类
package wordcount;
import ...
public class WordMain{
public static void main (String[] args) throws Exception{
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if(otherArgs.length!=2){
System.err.println("Usage:word count <in> <out>");
System.exit(2);
}
Job job = new Job(conf,"word count");
job.setJarByClass(WordMain.class);
job.setMapperClass(WordMapper.class);
job.setCombinerClass(WordReducer.class);
job.setReducerClass(WordReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)?0:1);
}
}
打包运行,只需要把程序部署到集群中的Master即可。
打包时,Export->JAR file,只需要选择src文件夹,lib下的jar不需要添加到jar里面。不要把classpath和project文件添加到jar文件中。
在”Select the export destination”下面的JAR file选项中,选择jar文件存放的位置和文件名。
1. 把wordcount.jar发送到hadoop集群的Master节点的$HADOOP_HOME下面。
2. 把Master节点的/opt/下面创建两个文件 file1.txt和file2.txt
3. 把file1.txt和file2.txt传到HDFS
$ hadoop fs -put /opt/file* /user/hadoop/input/
$ hadoop fs -ls /user/hadoop/input/
运行
hadoop jar wordcount.jar wordcount.wordMain/user/hadoop/input/file* /user/hadoop/output1
本文学习,参考《Hadoop应用开发技术详解》(刘刚著)
感谢作者。
MapReduce join
reduce side join
要进行大量的数据传输,会造成大量的网络IO效率低下
map side join
在处理多个小表关联大表时非常有用。两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放在内存中。这样可以把小表复制多份,让每个map task内存中存在一份(比如放到hash table中),然后只扫描大表。对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录, 如果有,则连接后输出即可。为了支持文件的复制,hadoop提供了一个类DistributedCache,该类的使用方法如下:
1. 用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URL(如果是HDFS上的文件,则可以:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作业启动之前会获取这个URL列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
2. 用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。
semi join
shiyanlou MapReduce实例实验
准备两个文件:
dept
10,ACCOUNTING,NEW YORK
20,RESEARCH,DALLAS
30,SALES,CHICAGO
40,OPERATIONS,BOSTON
emp
7369,SMITH,CLERK,7902,17-12月-80,800,,20
7499,ALLEN,SALESMAN,7698,20-2月-81,1600,300,30
7521,WARD,SALESMAN,7698,22-2月-81,1250,500,30
7566,JONES,MANAGER,7839,02-4月-81,2975,,20
7654,MARTIN,SALESMAN,7698,28-9月-81,1250,1400,30
7698,BLAKE,MANAGER,7839,01-5月-81,2850,,30
7782,CLARK,MANAGER,7839,09-6月-81,2450,,10
7839,KING,PRESIDENT,,17-11月-81,5000,,10
7844,TURNER,SALESMAN,7698,08-9月-81,1500,0,30
7900,JAMES,CLERK,7698,03-12月-81,950,,30
7902,FORD,ANALYST,7566,03-12月-81,3000,,20
7934,MILLER,CLERK,7782,23-1月-82,1300,,10
执行命令
sudo vi /etc/hosts
//192.168.40.7 19.... hadoop
cd /app/hadoop-1.1.2/bin
./start-all.sh
cd /home/shiyanlou/install-pack/class6
hadoop fs -mkdir -p /class6/input
hadoop fs -copyFromLocal dept /class6/input
hadoop fs -copyFromLocal emp /class6/input
hadoop fs -ls /class6/input
计算工资示例
Q1SumDeptSalary.java
进入/app/hadoop-1.1.2/myclass/class6新建
Q1SumDeptSalary.java。
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Q1SumDeptSalary extends Configured implements Tool {
public static class MapClass extends Mapper<LongWritable, Text, Text, Text> {
// 用于缓存 dept文件中的数据
private Map<String, String> deptMap = new HashMap<String, String>();
private String[] kv;
// 此方法会在Map方法执行之前执行且执行一次
@Override
protected void setup(Context context) throws IOException, InterruptedException {
BufferedReader in = null;
try {
// 从当前作业中获取要缓存的文件
Path[] paths = DistributedCache.getLocalCacheFiles(context.getConfiguration());
String deptIdName = null;
for (Path path : paths) {
// 对部门文件字段进行拆分并缓存到deptMap中
if (path.toString().contains("dept")) {
in = new BufferedReader(new FileReader(path.toString()));
while (null != (deptIdName = in.readLine())) {
// 对部门文件字段进行拆分并缓存到deptMap中
// 其中Map中key为部门编号,value为所在部门名称
deptMap.put(deptIdName.split(",")[0], deptIdName.split(",")[1]);
}
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
if (in != null) {
in.close();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 对员工文件字段进行拆分
kv = value.toString().split(",");
// map join: 在map阶段过滤掉不需要的数据,输出key为部门名称和value为员工工资
if (deptMap.containsKey(kv[7])) {
if (null != kv[5] && !"".equals(kv[5].toString())) {
context.write(new Text(deptMap.get(kv[7].trim())), new Text(kv[5].trim()));
}
}
}
}
public static class Reduce extends Reducer<Text, Text, Text, LongWritable> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// 对同一部门的员工工资进行求和
long sumSalary = 0;
for (Text val : values) {
sumSalary += Long.parseLong(val.toString());
}
// 输出key为部门名称和value为该部门员工工资总和
context.write(key, new LongWritable(sumSalary));
}
}
@Override
public int run(String[] args) throws Exception {
// 实例化作业对象,设置作业名称、Mapper和Reduce类
Job job = new Job(getConf(), "Q1SumDeptSalary");
job.setJobName("Q1SumDeptSalary");
job.setJarByClass(Q1SumDeptSalary.class);
job.setMapperClass(MapClass.class);
job.setReducerClass(Reduce.class);
// 设置输入格式类
job.setInputFormatClass(TextInputFormat.class);
// 设置输出格式
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
// 第1个参数为缓存的部门数据路径、第2个参数为员工数据路径和第3个参数为输出路径
String[] otherArgs = new GenericOptionsParser(job.getConfiguration(), args).getRemainingArgs();
DistributedCache.addCacheFile(new Path(otherArgs[0]).toUri(), job.getConfiguration());
FileInputFormat.addInputPath(job, new Path(otherArgs[1]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[2]));
job.waitForCompletion(true);
return job.isSuccessful() ? 0 : 1;
}
/**
* 主方法,执行入口
* @param args 输入参数
*/
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new Q1SumDeptSalary(), args);
System.exit(res);
}
}
编译:
javac -classpath ../../hadoop-core-1.1.2.jar:../../lib/commons-cli-1.2.jar Q1SumDeptSalary.java
jar cvf ./Q1SumDeptSalary.jar ./Q1SumDept*.class
mv *.jar ../..
rm Q1SumDept*.class
cd /app/hadoop-1.1.2
hadoop jar Q1SumDeptSalary.jar Q1SumDeptSalary hdfs://hadoop:9000/class6/input/dept hdfs://hadoop:9000/class6/input/emp hdfs://hadoop:9000/class6/out1
hadoop fs -ls /class6/out1
hadoop fs -cat/class6/out1/part-r-00000
可以查看到运行结果。