August 3, 2017, 17:03:28
1. Custom objects fall into two cases, depending on whether they travel as the key or as the value
1.1 Bean transmitted as the value
If a custom bean only needs to be transmitted as the value, it implements the Writable interface; the class declaration should be: public class FlowBean implements Writable
1.2 Bean transmitted as the key
If a custom bean needs to be transmitted as the key, it must implement the WritableComparable interface, because the shuffle phase of the MapReduce framework always sorts by key; in that case the class declaration should be: public class FlowBean implements WritableComparable<FlowBean>
WritableComparable source: public interface WritableComparable<T> extends Writable, Comparable<T> {}
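For illustration, here is a minimal sketch of such a key-side bean (getters, setters and toString omitted); the compareTo shown here, sorting by total flow in descending order, is an assumed ordering for the sketch, not something fixed by the framework:
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class FlowBean implements WritableComparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long sumFlow;
    // the no-arg constructor is required: reflection calls it during deserialization
    public FlowBean() {
    }
    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.sumFlow = in.readLong();
    }
    @Override
    public int compareTo(FlowBean other) {
        // assumed ordering: larger total flow sorts first
        return Long.compare(other.sumFlow, this.sumFlow);
    }
}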
2. Example: transmitting a custom bean as the value
2.0 Requirement
For each user (phone number), compute the total upstream traffic, total downstream traffic, and overall total traffic.
2.1 Code
Two classes: FlowBean and FlowCount
2.1.1 FlowBean
package cn.yzx.bigdata.mr.proviceflow;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class FlowBean implements Writable{
private long upFlow;
private long downFlow;
private long sumFlow;
// The no-arg constructor is required: reflection calls it during deserialization
public FlowBean() {
}
public FlowBean(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow=upFlow+downFlow;
}
public long getUpFlow() {
return upFlow;
}
public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}
public long getDownFlow() {
return downFlow;
}
public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}
public long getSumFlow() {
return sumFlow;
}
public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}
/*
* Serialization method
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
/*
* Deserialization method
* Note: fields must be read back in the same order they were written
*/
@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
@Override
public String toString() {
return upFlow+"\t"+downFlow+"\t"+sumFlow;
}
}
2.1.2 FlowCount
package cn.yzx.bigdata.mr.proviceflow;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FlowCount {
static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// convert the input line to a String
String line = value.toString();
String[] fields = line.split("\t");
// extract the phone number
String phoneNbr = fields[1];
// extract the upstream and downstream traffic
long upFlow = Long.parseLong(fields[fields.length - 3]);
long downFlow = Long.parseLong(fields[fields.length - 2]);
context.write(new Text(phoneNbr), new FlowBean(upFlow, downFlow));
}
}
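// Note: the mapper assumes tab-separated input in which field index 1 holds the phone number
// and the upstream/downstream traffic sit in the third- and second-to-last columns.
// The original log format itself is not shown in this note; the indexing above is simply what the code relies on.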
static class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context)
throws IOException, InterruptedException {
long sum_upFlow = 0;
long sum_downFlow = 0;
for (FlowBean bean : values) {
sum_upFlow += bean.getUpFlow();
sum_downFlow += bean.getDownFlow();
}
FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);
context.write(key, resultBean);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
/*
* conf.set("mapreduce.framework.name", "yarn");
* conf.set("yarn.resourcemanager.hostname","hadoop01");
*/
// creates a Job with some default parameters
Job job = Job.getInstance(conf);
// specify the jar containing this program (its local path) so the jar can be submitted to YARN
job.setJarByClass(FlowCount.class);
/*
* Tell the framework which classes to call: specify the Mapper/Reducer business classes this job uses
*/
job.setMapperClass(FlowCountMapper.class);
job.setReducerClass(FlowCountReducer.class);
// specify the custom partitioner (the ProvincePartitioner class is not shown in this listing; see the sketch after it)
job.setPartitionerClass(ProvincePartitioner.class);
// and set a matching number of reduce tasks
job.setNumReduceTasks(5);
/*
* Specify the mapper's output key/value types
*/
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);
// Specify the final output key/value types; some jobs skip the reduce phase, but when there is one these are the reducer's output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);
// Specify the job's original input directory (or directories)
// The files to be processed may be spread across several directories
// The first argument is the job being configured; the remaining arguments are comma-separated HDFS paths
FileInputFormat.setInputPaths(job, new Path(args[0]));
// Specify the directory where the job's output is written
FileOutputFormat.setOutputPath(job, new Path(args[1]));
/*
* Talk to YARN: submit the parameters configured in the job, and the jar containing the Java classes the job uses, to YARN for execution
*/
/* job.submit(); */
// The return value tells us whether the job completed successfully
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}
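Note that the driver above references a ProvincePartitioner class that is not included in this note. A minimal sketch of such a partitioner, assuming a hard-coded mapping from 3-digit phone-number prefixes to partition numbers (which is also why setNumReduceTasks(5) is used), could look like this:
package cn.yzx.bigdata.mr.proviceflow;
import java.util.HashMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
    // illustrative mapping only: phone-number prefix -> partition number
    private static HashMap<String, Integer> provinceDict = new HashMap<String, Integer>();
    static {
        provinceDict.put("136", 0);
        provinceDict.put("137", 1);
        provinceDict.put("138", 2);
        provinceDict.put("139", 3);
    }
    @Override
    public int getPartition(Text key, FlowBean value, int numPartitions) {
        // any prefix not in the dictionary falls into the last partition (4)
        Integer provinceId = provinceDict.get(key.toString().substring(0, 3));
        return provinceId == null ? 4 : provinceId;
    }
}
With five reduce tasks the job produces one output file per partition; it could be submitted with something like hadoop jar flow.jar cn.yzx.bigdata.mr.proviceflow.FlowCount <input dir> <output dir> (the jar name and paths here are placeholders).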
The only thing to watch out for is the deserialization method: when deserializing, the fields must be read from the stream in exactly the same order they were written during serialization.
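As a quick illustration (this is not from the original code), if readFields read the fields in a different order than write wrote them, the values would be silently mixed up:
// WRONG: write() writes upFlow first, but this reads downFlow first,
// so upFlow and downFlow silently swap values after deserialization.
@Override
public void readFields(DataInput in) throws IOException {
    this.downFlow = in.readLong();
    this.upFlow = in.readLong();
    this.sumFlow = in.readLong();
}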