1. Analyze a log file that records mobile phone traffic.
2. Take one line of data from the log, split it into its fields, and extract the ones we need: the phone number, the upstream traffic, and the downstream traffic. Then package them as a key/value pair and emit it.
3. Implement this in the map method of a Mapper class:
package hadoop.mr.flownum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowNumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Take one line of input
        String line = value.toString();
        // Split it into its fields
        String[] fields = StringUtils.split(line, "\t");
        String phoneNB = fields[1];
        long u_flow = Long.parseLong(fields[7]);
        long d_flow = Long.parseLong(fields[8]);
        // Wrap the data as a KV pair and emit it
        context.write(new Text(phoneNB), new FlowBean(phoneNB, u_flow, d_flow));
    }
}
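To make the field positions concrete, here is a minimal standalone sketch of the same splitting logic applied to a single sample line. The class name, the line content, and the numeric values are invented purely for illustration; only the indices mirror the mapper above (field 1 = phone number, field 7 = upstream bytes, field 8 = downstream bytes).
// Standalone sketch, not part of the job; the sample record is hypothetical
public class FieldLayoutDemo {
    public static void main(String[] args) {
        String line = "1363157985066\t13726230503\t00-FD-07-A4-72-B8:CMCC\t120.196.100.82"
                + "\ti02.c.aliimg.com\t24\t27\t2481\t24681\t200";
        String[] fields = line.split("\t");
        System.out.println("phone = " + fields[1]);
        System.out.println("up    = " + Long.parseLong(fields[7]));
        System.out.println("down  = " + Long.parseLong(fields[8]));
    }
}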
4. The FlowBean passed as the value in the map method is a serializable entity that implements Hadoop's Writable interface.
package hadoop.mr.flownum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

    private String phoneNB;
    private long up_flow;
    private long d_flow;
    private long s_flow;

    // During deserialization the framework instantiates the bean through reflection,
    // which requires a no-argument constructor, so one is defined explicitly
    public FlowBean() {
    }

    // A constructor with parameters is added for convenient initialization
    public FlowBean(String phoneNB, long up_flow, long d_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
        this.s_flow = up_flow + d_flow;
    }

    public String getPhoneNB() {
        return phoneNB;
    }

    public void setPhoneNB(String phoneNB) {
        this.phoneNB = phoneNB;
    }

    public long getUp_flow() {
        return up_flow;
    }

    public void setUp_flow(long up_flow) {
        this.up_flow = up_flow;
    }

    public long getD_flow() {
        return d_flow;
    }

    public void setD_flow(long d_flow) {
        this.d_flow = d_flow;
    }

    public long getS_flow() {
        return s_flow;
    }

    public void setS_flow(long s_flow) {
        this.s_flow = s_flow;
    }

    // Serialize the object's data into the output stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(d_flow);
        out.writeLong(s_flow);
    }

    // Deserialize the object's data from the input stream.
    // Fields must be read in exactly the same order in which they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNB = in.readUTF();
        up_flow = in.readLong();
        d_flow = in.readLong();
        s_flow = in.readLong();
    }

    @Override
    public String toString() {
        return "" + up_flow + "\t" + d_flow + "\t" + s_flow;
    }
}
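Because write and readFields must stay symmetric, a quick way to convince yourself the bean survives serialization is to round-trip it through an in-memory stream. The sketch below is only for local experimentation and is not part of the job; the class name and the sample values are made up.
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

public class RoundTripDemo {
    public static void main(String[] args) throws Exception {
        FlowBean original = new FlowBean("13726230503", 2481, 24681);

        // Serialize: the same path Hadoop takes when it writes the bean out
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        original.write(new DataOutputStream(bytes));

        // Deserialize into a fresh bean created through the no-arg constructor
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        // Should print the original phone number and up/down/sum values
        System.out.println(copy.getPhoneNB() + "\t" + copy);
    }
}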
5. The reduce method is called once for each group of values sharing a key. The business logic in reduce is to iterate over the values, accumulate the upstream and downstream totals, and output the sums.
package hadoop.mr.flownum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowNumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {
        long up_flow_counter = 0;
        long d_flow_counter = 0;
        // Accumulate the upstream and downstream traffic for this phone number
        for (FlowBean bean : values) {
            up_flow_counter += bean.getUp_flow();
            d_flow_counter += bean.getD_flow();
        }
        context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
    }
}
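The grouping itself is done by the framework's shuffle, but the arithmetic in the loop above is easy to sanity-check outside Hadoop. The sketch below (class name and sample values invented for illustration) simulates grouping a few beans by phone number in a HashMap and summing them the same way the reducer does.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReduceLogicDemo {
    public static void main(String[] args) {
        List<FlowBean> beans = new ArrayList<FlowBean>();
        beans.add(new FlowBean("13726230503", 100, 200));
        beans.add(new FlowBean("13726230503", 50, 60));
        beans.add(new FlowBean("13926435656", 10, 20));

        // Group by phone number (roughly what the shuffle does between map and reduce)
        Map<String, long[]> totals = new HashMap<String, long[]>();
        for (FlowBean bean : beans) {
            long[] sums = totals.get(bean.getPhoneNB());
            if (sums == null) {
                sums = new long[2];
                totals.put(bean.getPhoneNB(), sums);
            }
            sums[0] += bean.getUp_flow();   // same accumulation as the reducer
            sums[1] += bean.getD_flow();
        }
        for (Map.Entry<String, long[]> e : totals.entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue()[0] + "\t" + e.getValue()[1]);
        }
    }
}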
6. Submit the job:
package hadoop.mr.flownum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FlowNumRunner extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration injected by ToolRunner
        Job job = Job.getInstance(getConf());

        job.setJarByClass(FlowNumRunner.class);
        job.setMapperClass(FlowNumMapper.class);
        job.setReducerClass(FlowNumReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new FlowNumRunner(), args);
        System.exit(res);
    }
}
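Because FlowNumRunner extends Configured and implements Tool, ToolRunner parses Hadoop's generic options before run() is called, so configuration can be overridden from the command line without recompiling. For example (illustrative; the property is mapreduce.job.reduces on Hadoop 2.x, mapred.reduce.tasks on older releases):
hadoop jar flow.jar hadoop.mr.flownum.FlowNumRunner -D mapreduce.job.reduces=1 /flow/data /flow/output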
7. Package the MapReduce program into a jar.
8. Upload the packaged jar to the virtual machine, and upload the log file to be analyzed to HDFS:
hadoop fs -put HTTP_20130313143750.dat /flow/data
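If the /flow directory does not exist yet, -put may refuse to create it, depending on the Hadoop version; in that case create it first with something like:
hadoop fs -mkdir -p /flow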
Run flow.jar on Hadoop; the result is written to the /flow/output directory:
hadoop jar flow.jar hadoop.mr.flownum.FlowNumRunner /flow/data /flow/output
9. Run the hadoop fs -cat /flow/output/part-r-00000 command to view the output, i.e. the traffic statistics computed from the log.
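Since the job uses the default TextOutputFormat, each output line is the key, a tab, and the value's toString(): phone number, then total upstream traffic, total downstream traffic, and the combined total, all separated by tabs.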