多个MapReduce作业串联实例

时间:2022-11-06 05:10:43

将reduce端连接(reduce-side join)作业的Map/Reduce输出结果作为WordCount作业的map输入源:

package com.mr.multiMapReduce;

import java.io.IOException;

import org.apache.hadoop.examples.WordCount;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.mr.reduceSideJoin.CombineValues;
import com.mr.reduceSideJoin.ReduceSideJoin_LeftOuterJoin;

/**
 * Driver that chains two MapReduce jobs with {@link JobControl}: a reduce-side
 * left outer join ("join1") whose output directory is used as the input of a
 * word-count job ("Join2").
 *
 * <p>Arguments: {@code <join input> <intermediate output> <final output>}.
 * The intermediate directory is removed once both jobs have finished.
 *
 * <p>Note from the original author: this example cannot be run from inside
 * Eclipse (job2 fails to start there), but it works when submitted to a real
 * Hadoop environment.
 */
public class MultiMapReduce {

    /** Pause between two polls of the job controller, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 500L;

    /**
     * Entry point: configures both jobs, runs them in dependency order and
     * cleans up the intermediate output.
     *
     * @param args {@code args[0]} input path of the join job, {@code args[1]}
     *             output path of the join job (also the input of the word-count
     *             job), {@code args[2]} output path of the word-count job
     * @throws IOException if a job cannot be configured or the intermediate
     *                     output cannot be deleted
     */
    public static void main(String[] args) throws IOException {
        // Fail with a readable message instead of ArrayIndexOutOfBoundsException.
        if (args == null || args.length < 3) {
            System.err.println(
                    "Usage: MultiMapReduce <join input> <intermediate output> <final output>");
            System.exit(2);
            return;
        }

        Path joinInput = new Path(args[0]);
        Path intermediate = new Path(args[1]);
        Path finalOutput = new Path(args[2]);

        JobConf conf = new JobConf(MultiMapReduce.class);

        // Configuration of the first job (reduce-side left outer join).
        Job job1 = new Job(conf, "join1");
        job1.setJarByClass(MultiMapReduce.class);

        job1.setMapperClass(ReduceSideJoin_LeftOuterJoin.LeftOutJoinMapper.class);
        job1.setReducerClass(ReduceSideJoin_LeftOuterJoin.LeftOutJoinReducer.class);

        job1.setMapOutputKeyClass(Text.class); // key emitted by the map phase
        job1.setMapOutputValueClass(CombineValues.class); // value emitted by the map phase

        job1.setOutputKeyClass(Text.class); // key emitted by the reduce phase
        job1.setOutputValueClass(Text.class); // value emitted by the reduce phase

        // Input and output paths of job1.
        FileInputFormat.addInputPath(job1, joinInput);
        FileOutputFormat.setOutputPath(job1, intermediate);

        // Configuration of the second job (word count over job1's output).
        Job job2 = new Job(conf, "Join2");
        job2.setJarByClass(MultiMapReduce.class);

        job2.setMapperClass(WordCount.TokenizerMapper.class);
        job2.setReducerClass(WordCount.IntSumReducer.class);

        job2.setMapOutputKeyClass(Text.class); // key emitted by the map phase
        job2.setMapOutputValueClass(IntWritable.class); // value emitted by the map phase

        job2.setOutputKeyClass(Text.class); // key emitted by the reduce phase
        job2.setOutputValueClass(IntWritable.class); // value emitted by the reduce phase

        FileInputFormat.addInputPath(job2, intermediate);
        FileOutputFormat.setOutputPath(job2, finalOutput);

        // Wrap both jobs so that job2 is only submitted after job1 has succeeded.
        ControlledJob jobx = new ControlledJob(conf);
        jobx.setJob(job1);
        ControlledJob joby = new ControlledJob(conf);
        joby.setJob(job2);
        joby.addDependingJob(jobx);

        // Master controller that owns the two child jobs.
        JobControl jobCtrl = new JobControl("myctrl");
        jobCtrl.addJob(jobx);
        jobCtrl.addJob(joby);

        // JobControl is a Runnable: it only makes progress while running in a thread.
        Thread t = new Thread(jobCtrl);
        t.start();

        // Poll with a pause between checks rather than spinning a CPU core.
        boolean interrupted = false;
        try {
            while (!jobCtrl.allFinished()) {
                Thread.sleep(POLL_INTERVAL_MS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            interrupted = true;
        } finally {
            jobCtrl.stop();
        }

        if (interrupted) {
            // Jobs may still be running; leave the intermediate output untouched.
            System.err.println("Interrupted while waiting for the jobs to finish");
            System.exit(1);
            return;
        }

        // Print the jobs that completed successfully.
        System.out.println(jobCtrl.getSuccessfulJobList());

        // allFinished() is also true when jobs failed, so report failures explicitly.
        boolean anyFailed = !jobCtrl.getFailedJobList().isEmpty();
        if (anyFailed) {
            System.err.println("Failed jobs: " + jobCtrl.getFailedJobList());
        }

        // After the jobs are done, delete the reduce output of the first job.
        // Resolve the file system from the path so a fully qualified URI works too.
        FileSystem fs = intermediate.getFileSystem(conf);
        try {
            // Delete right away (recursively) so the message below is truthful;
            // deleteOnExit would merely register the path for later removal.
            if (fs.delete(intermediate, true)) {
                System.out.println("文件删除成功");
            }
        } finally {
            fs.close();
        }

        if (anyFailed) {
            System.exit(1);
        }
    }
}