MapReduce Program Development: Traffic Summation (Part 8)

Date: 2022-03-10 21:57:31

1. Analyze a log file that records mobile phone traffic.

2. For each line of the log, split it into its fields and extract the ones we need: the phone number, the upstream traffic, and the downstream traffic; then wrap them in a key/value pair and emit it.
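
For illustration, here is a hypothetical tab-separated line consistent with the field indices used in the mapper below (phone number at index 1, upstream bytes at index 7, downstream bytes at index 8); the actual layout of HTTP_20130313143750.dat may differ:

1363157985066	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com	24	27	2481	24681	200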

3. Implement the map method in a Mapper subclass:

package hadoop.mr.flownum;

import java.io.IOException;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FlowNumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {

        // Take one line of the log
        String line = value.toString();

        // Split it into fields on the tab separator
        String[] fields = StringUtils.split(line, "\t");

        // Extract the fields we need
        String phoneNB = fields[1];
        long u_flow = Long.parseLong(fields[7]);
        long d_flow = Long.parseLong(fields[8]);

        // Wrap the data in a key/value pair and emit it; using the phone number
        // as the key groups all records of one phone into the same reduce call
        context.write(new Text(phoneNB), new FlowBean(phoneNB, u_flow, d_flow));
    }
}

4. The FlowBean value emitted from the map method is a serializable entity: it implements Hadoop's Writable interface so the framework can ship it between the map and reduce phases.

package hadoop.mr.flownum;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

    private String phoneNB;
    private long up_flow;
    private long d_flow;
    private long s_flow;

    // During deserialization the framework instantiates the bean by reflection,
    // which requires a no-arg constructor, so we define one explicitly
    public FlowBean() {
    }

    // Convenience constructor for initializing the object's data
    public FlowBean(String phoneNB, long up_flow, long d_flow) {
        this.phoneNB = phoneNB;
        this.up_flow = up_flow;
        this.d_flow = d_flow;
        this.s_flow = up_flow + d_flow;
    }

    public String getPhoneNB() {
        return phoneNB;
    }

    public void setPhoneNB(String phoneNB) {
        this.phoneNB = phoneNB;
    }

    public long getUp_flow() {
        return up_flow;
    }

    public void setUp_flow(long up_flow) {
        this.up_flow = up_flow;
    }

    public long getD_flow() {
        return d_flow;
    }

    public void setD_flow(long d_flow) {
        this.d_flow = d_flow;
    }

    public long getS_flow() {
        return s_flow;
    }

    public void setS_flow(long s_flow) {
        this.s_flow = s_flow;
    }

    // Serialize the object's data into the stream
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(phoneNB);
        out.writeLong(up_flow);
        out.writeLong(d_flow);
        out.writeLong(s_flow);
    }

    // Deserialize the object's data from the stream;
    // fields must be read in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        phoneNB = in.readUTF();
        up_flow = in.readLong();
        d_flow = in.readLong();
        s_flow = in.readLong();
    }

    @Override
    public String toString() {
        return "" + up_flow + "\t" + d_flow + "\t" + s_flow;
    }
}
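
To sanity-check the Writable contract without a cluster, here is a minimal round-trip sketch; the class name FlowBeanRoundTrip and the sample values are made up for illustration:

package hadoop.mr.flownum;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class FlowBeanRoundTrip {

    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean("13726230503", 2481, 24681);

        // Serialize the bean into a byte buffer, as the framework would
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buf));

        // Deserialize: the no-arg constructor plus readFields restores the state
        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));

        // toString() prints up, down, and total: 2481  24681  27162
        System.out.println(copy);
    }
}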

5. The framework calls our reduce method once per key, passing that key's whole group of values; the business logic in reduce is simply to iterate over the values, accumulate the sums, and write out the result.

package hadoop.mr.flownum;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FlowNumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context)
            throws IOException, InterruptedException {

        long up_flow_counter = 0;
        long d_flow_counter = 0;

        // Accumulate the upstream and downstream traffic of this phone number
        for (FlowBean bean : values) {
            up_flow_counter += bean.getUp_flow();
            d_flow_counter += bean.getD_flow();
        }

        context.write(key, new FlowBean(key.toString(), up_flow_counter, d_flow_counter));
    }
}
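
For example, if the map phase emitted (13726230503, FlowBean(2481, 24681)) and (13726230503, FlowBean(100, 200)) (made-up numbers), reduce is invoked once for that key and writes a FlowBean with up_flow = 2581, d_flow = 24881, and s_flow = 27462.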

6. Job submission:

package hadoop.mr.flownum;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FlowNumRunner extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // Use the configuration prepared by ToolRunner so generic options take effect
        Configuration conf = getConf();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FlowNumRunner.class);

        job.setMapperClass(FlowNumMapper.class);
        job.setReducerClass(FlowNumReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new FlowNumRunner(), args);
        System.exit(res);
    }
}
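
Because the runner extends Configured and implements Tool, ToolRunner parses Hadoop's generic options before run() is called, so configuration properties can be overridden on the command line, for example hadoop jar flow.jar hadoop.mr.flownum.FlowNumRunner -D mapreduce.job.reduces=1 /flow/data /flow/output (the -D setting here is only an example).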

7. Package the MapReduce program into a jar.
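
Any of the usual packaging routes works; for example, assuming the compiled classes sit under a hypothetical bin/ directory, a plain jar command is enough, since the main class is given on the hadoop jar command line later:

jar -cvf flow.jar -C bin/ .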

8. Upload the packaged jar to the virtual machine, and upload the log to be analyzed into Hadoop:

hadoop fs -put HTTP_20130313143750.dat /flow/data

Execute flow.jar on Hadoop, writing the result to the /flow/output directory:

hadoop jar flow.jar hadoop.mr.flownum.FlowNumRunner /flow/data /flow/output


9. Run hadoop fs -cat /flow/output/part-r-00000 to view the statistics computed from the log.

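
With FlowBean.toString() as defined above, each output line is a phone number followed by its upstream, downstream, and total traffic, for example (made-up numbers):

13726230503	2581	24881	27462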