Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

时间:2021-07-19 16:43:11

  不多说,直接上代码。

  假如这里有一份邮箱数据文件,我们期望统计邮箱出现次数并按照邮箱的类别,将这些邮箱分别输出到不同文件路径下。

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

Hadoop MapReduce编程 API入门系列之MapReduce多种输出格式分析(十九)

代码版本1

 package zhouls.bigdata.myMapReduce.Email;

 import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; //通过MultipleOutputs写到多个文件:参考博客http://www.cnblogs.com/codeOfLife/p/5452902.html // MultipleOutputs 类可以将数据写到多个文件,这些文件的名称源于输出的键和值或者任意字符串。
// 这允许每个 reducer(或者只有 map 作业的 mapper)创建多个文件。 采用name-m-nnnnn 形式的文件名用于 map 输出,name-r-nnnnn 形式的文件名用于 reduce 输出,
// 其中 name 是由程序设定的任意名字, nnnnn 是一个指明块号的整数(从 0 开始)。块号保证从不同块(mapper 或 reducer)输出在相同名字情况下不会冲突。 public class Email extends Configured implements Tool {
public static class MailMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1); @Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(value, one);
}
} public static class MailReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
private MultipleOutputs< Text, IntWritable> multipleOutputs; @Override
protected void setup(Context context) throws IOException ,InterruptedException{
multipleOutputs = new MultipleOutputs<Text, IntWritable>(context);
} protected void reduce(Text Key, Iterable<IntWritable> Values,Context context) throws IOException, InterruptedException {
int begin = Key.toString().indexOf("@");//indexOf方法返回一个整数值,指出 String 对象内子字符串的开始位置。
int end = Key.toString().indexOf(".");//indexOf方法返回一个整数值,指出 String 对象内子字符串的开始位置。只不过我们自己写出个end变量而已
// Key.toString().indexOf(ch)
// Key.toString().indexOf(str)
// Key.toString().indexOf(ch, fromIndex)
// Key.toString().indexOf(str, fromIndex)
// Key.toString().intern() // Java中字符串中子串的查找共有四种方法,如下:
// 1、int indexOf(String str) :返回第一次出现的指定子字符串在此字符串中的索引。
// 2、int indexOf(String str, int startIndex):从指定的索引处开始,返回第一次出现的指定子字符串在此字符串中的索引。
// 3、int lastIndexOf(String str) :返回在此字符串中最右边出现的指定子字符串的索引。
// 4、int lastIndexOf(String str, int startIndex) :从指定的索引处开始向后搜索,返回在此字符串中最后一次出现的指定子字符串的索引。 if(begin>=end){
return;
} //获取邮箱类别,比如 qq
String name = Key.toString().substring(begin+1, end);
// String.subString(start,end)截取的字符串包括起点所在的字符串,不包括终点所在的字符串 int sum = 0; for (IntWritable value : Values) {
sum += value.get();
} result.set(sum);
multipleOutputs.write(Key, result, name);
//这里,我们用到的是multipleOutputs.write(Text key, IntWritable value, String baseOutputPath); // multipleOutputs.write默认有3种构造方法:
// multipleOutputs.write(String namedOutput, K key, V value);
// multipleOutputs.write(Text key, IntWritable value, String baseOutputPath);
// multipleOutputs.write(String namedOutput, K key, V value,String baseOutputPath); // MultipleOutputs 类可以将数据写到多个文件,这些文件的名称源于输出的键和值或者任意字符串。
// 这允许每个 reducer(或者只有 map 作业的 mapper)创建多个文件。
// 采用name-m-nnnnn 形式的文件名用于 map 输出,name-r-nnnnn 形式的文件名用于 reduce 输出,
// 其中 name 是由程序设定的任意名字,
// nnnnn 是一个指明块号的整数(从 0 开始)。
// 块号保证从不同块(mapper 或 reducer)写的输出在相同名字情况下不会冲突。 } @Override
protected void cleanup(Context context) throws IOException ,InterruptedException{
multipleOutputs.close();
} } public int run(String[] args) throws Exception {
Configuration conf = new Configuration();// 读取配置文件 Path mypath = new Path(args[1]);
FileSystem hdfs = mypath.getFileSystem(conf);//创建输出路径
if (hdfs.isDirectory(mypath)) {
hdfs.delete(mypath, true);
}
Job job = Job.getInstance();// 新建一个任务
job.setJarByClass(Email.class);// 主类 FileInputFormat.addInputPath(job, new Path(args[0]));// 输入路径
FileOutputFormat.setOutputPath(job, new Path(args[1]));// 输出路径 job.setMapperClass(MailMapper.class);// Mapper
job.setReducerClass(MailReducer.class);// Reducer job.setOutputKeyClass(Text.class);// key输出类型
job.setOutputValueClass(IntWritable.class);// value输出类型 job.waitForCompletion(true);
return 0;
} public static void main(String[] args) throws Exception {
String[] args0 = {
"hdfs://HadoopMaster:9000/inputData/multipleOutputFormats/mail.txt",
"hdfs://HadoopMaster:9000/outData/MultipleOutputFormats/" };
int ec = ToolRunner.run(new Configuration(), new Email(), args0);
System.exit(ec);
}
}

代码版本1

 package zhouls.bigdata.myMapReduce.Email;

 import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; //假如这里有一份邮箱数据文件,我们期望统计邮箱出现次数并按照邮箱的类别,将这些邮箱分别输出到不同文件路径下。
/*wolys@21cn.com
zss1984@126.com
294522652@qq.com
simulateboy@163.com
zhoushigang_123@163.com
sirenxing424@126.com
lixinyu23@qq.com
chenlei1201@gmail.com
370433835@qq.com
cxx0409@126.com
viv093@sina.com
q62148830@163.com
65993266@qq.com
summeredison@sohu.com
zhangbao-autumn@163.com
diduo_007@yahoo.com.cn
fxh852@163.com /out/163-r-00000
/out/126-r-00000
/out/21cn-r-00000
/out/gmail-r-00000
/out/qq-r-00000
/out/sina-r-00000
/out/sohu-r-00000
/out/yahoo-r-00000
/out/part-r-00000
*/ public class Email extends Configured implements Tool{
public static class MailMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);//赋值1给one @Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
context.write(value, one);//将value和one写入到context里。 value是k2,one是v2
// context.write(new Text(value),new IntWritable(one));等价
// key默认是行偏移量,可以自己自定义改 }
} // MultipleOutputs将结果输出到多个文件或文件夹的步骤:
// 见博客http://tydldd.iteye.com/blog/2053867 public static class MailReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
private MultipleOutputs<Text, IntWritable> multipleOutputs;//MultipleOutputs将结果输出到多个文件或文件夹
// 因为,MultipleOutputs是将结果输出到多个文件或文件夹,那么结果是什么,则就是k3,v3啦。即在这里就是MultipleOutputs<Text, IntWritable> multipleOutputs; //创建对象,以下是模板,别怕
protected void setup(Context context) throws IOException ,InterruptedException{
multipleOutputs = new MultipleOutputs<Text, IntWritable>(context);
} protected void reduce(Text Key, Iterable<IntWritable> Values,Context context) throws IOException, InterruptedException{
//294522652@qq.com
int begin = Key.toString().indexOf("@");//indexOf() 方法可返回某个指定的字符串值在字符串中首次出现的位置。 即begin是9
int end = Key.toString().indexOf(".");//indexOf() 方法可返回某个指定的字符串值在字符串中首次出现的位置。 即end是12
if(begin>=end){
return;
} //获取邮箱类别,比如 qq
String name = Key.toString().substring(begin+1, end);//substring()是去除指定字符串的方法,及substring(10,12)
int sum = 0;
for (IntWritable value : Values) {//计数,for星型循环,即将Iterable<IntWritable> Values的值,一一传给IntWritable value
sum += value.get();//就是拿取IntWritable类型的value的值,给value类型的sum
}
result.set(sum);//即求和计数,如wolys@21cn.com出现了几次几次。
multipleOutputs.write(Key, result, name);//将Key和result和name一起写入multipleOutputs /*
* http://www.cnblogs.com/codeOfLife/p/5452902.html
* multipleOutputs.write(key, value, baseOutputPath)方法的第三个函数表明了该输出所在的目录(相对于用户指定的输出目录)。
* 如果baseOutputPath不包含文件分隔符"/",那么输出的文件格式为baseOutputPath-r-nnnnn(name-r-nnnnn);
* 如果包含文件分隔符"/",例如baseOutputPath="029070-99999/1901/part",那么输出文件则为029070-99999/1901/part-r-nnnnn
*/
} //关闭对象,以下是模板,别怕
protected void cleanup(Context context) throws IOException ,InterruptedException{
multipleOutputs.close();
}
} public int run(String[] arg0) throws Exception{
Configuration conf = new Configuration();// 读取配置文件
Path mypath = new Path(arg0[1]);//下标为1,即是输出路径
FileSystem hdfs = mypath.getFileSystem(conf);//FileSystem对象hdfs
if (hdfs.isDirectory(mypath))
{
hdfs.delete(mypath, true);
}
Job job = Job.getInstance();// 新建一个任务
job.setJarByClass(Email.class);// 主类 job.setMapperClass(MailMapper.class);// Mapper
job.setReducerClass(MailReducer.class);// Reducer job.setOutputKeyClass(Text.class);// key输出类型
job.setOutputValueClass(IntWritable.class);// value输出类型 FileInputFormat.addInputPath(job, new Path(arg0[0]));// 文件输入路径
FileOutputFormat.setOutputPath(job, new Path(arg0[1]));// 文件输出路径
job.waitForCompletion(true); return 0;
} public static void main(String[] args) throws Exception{
//集群路径
// String[] args0 = { "hdfs://HadoopMaster:9000/email/email.txt",
// "hdfs://HadoopMaster:9000/out/email"}; //本地路径
String[] args0 = { "./data/email/email.txt",
"out/email/"}; int ec = ToolRunner.run( new Configuration(), new Email(), args0);
System. exit(ec);
}
}