一、说在前面的话
上一篇,楼主介绍了使用flume集群来模拟网站产生的日志数据收集到hdfs。但我们所采集的日志数据是不规则的,同时也包含了许多无用的日志。当需要分析一些核心指标来满足系统业务决策的时候,对日志的数据清洗在所难免,楼主本篇将介绍如何使用mapreduce程序对日志数据进行清洗,将清洗后的结构化数据存储到hive,并进行相关指标的提取。
先明白几个概念:
1)PV(Page View)。页面浏览量即为PV,是指所有用户浏览页面的总和,一个独立用户每打开一个页面就被记录1 次。计算方式为:记录计数
2)注册用户数。对注册页面访问的次数。计算方式:对访问member.php?mod=register的url,计数
3)IP数。一天之内,访问网站的不同独立IP 个数加和。其中同一IP无论访问了几个页面,独立IP 数均为1。这是我们最熟悉的一个概念,无论同一个IP上有多少台主机,或者其他用户,从某种程度上来说,独立IP的多少,是衡量网站推广活动好坏最直接的数据。计算方式:对不同ip,计数
4)跳出率。只浏览了一个页面便离开了网站的访问次数占总的访问次数的百分比,即只浏览了一个页面的访问次数 / 全部的访问次数汇总。跳出率是非常重要的访客黏性指标,它显示了访客对网站的兴趣程度。跳出率越低说明流量质量越好,访客对网站的内容越感兴趣,这些访客越可能是网站的有效用户、忠实用户。该指标也可以衡量网络营销的效果,指出有多少访客被网络营销吸引到宣传产品页或网站上之后,又流失掉了,可以说就是煮熟的鸭子飞了。比如,网站在某媒体上打广告推广,分析从这个推广来源进入的访客指标,其跳出率可以反映出选择这个媒体是否合适,广告语的撰写是否优秀,以及网站入口页的设计是否用户体验良好。
计算方式:(1)统计一天内只出现一条记录的ip,称为跳出数
(2)跳出数/PV
本次楼主只做以上几项简单指标的分析,各个网站的作用领域不一样,所涉及的分析指标也有很大差别,各位同学可以根据自己的需求尽情拓展。废话不多说,上干货。
二、环境准备
1)hadoop集群。楼主用的6个节点的hadoop2.7.3集群,各位同学可以根据自己的实际情况进行搭建,但至少需要1台伪分布式的。(参考http://www.cnblogs.com/qq503665965/p/6790580.html)
2)hive。用于对各项核心指标进行分析(安装楼主不再介绍了)
3)mysql。存储分析后的数据指标。
4)sqoop。从hive到mysql的数据导入。
三、数据清洗
我们先看看从flume收集到hdfs中的源日志数据格式:
27.19.74.143 - - [30/4/2017:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
211.97.15.179 - - [30/4/2017:17:38:22 +0800] "GET /home.php?mod=misc&ac=sendmail&rand=1369906181 HTTP/1.1" 200 -
上面包含条个静态资源日志和一条正常链接日志(楼主这里不做静态资源日志的分析),需要将以 /static 开头的日志文件过滤掉;时间格式需要转换为时间戳;去掉IP与时间之间的无用符号;过滤掉请求方式;“/”分隔符、http协议、请求状态及当次流量。效果如下:
211.97.15.179 20170430173820 home.php?mod=misc&ac=sendmail&rand=1369906181
先写个日志解析类,测试是否能解析成功,我们再写mapreduce程序:
package mapreduce; import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale; public class LogParser {
public static final SimpleDateFormat FORMAT = new SimpleDateFormat("d/MM/yyyy:HH:mm:ss", Locale.ENGLISH);
public static final SimpleDateFormat dateformat1=new SimpleDateFormat("yyyyMMddHHmmss");
public static void main(String[] args) throws ParseException {
final String S1 = "27.19.74.143 - - [30/04/2017:17:38:20 +0800] \"GET /static/image/common/faq.gif HTTP/1.1\" 200 1127";
LogParser parser = new LogParser();
final String[] array = parser.parse(S1);
System.out.println("源数据: "+S1);
System.out.format("清洗结果数据: ip=%s, time=%s, url=%s, status=%s, traffic=%s", array[0], array[1], array[2], array[3], array[4]);
}
/**
* 解析英文时间字符串
* @param string
* @return
* @throws ParseException
*/
private Date parseDateFormat(String string){
Date parse = null;
try {
parse = FORMAT.parse(string);
} catch (ParseException e) {
e.printStackTrace();
}
return parse;
}
/**
* 解析日志的行记录
* @param line
* @return 数组含有5个元素,分别是ip、时间、url、状态、流量
*/
public String[] parse(String line){
String ip = parseIP(line);
String time = parseTime(line);
String url = parseURL(line);
String status = parseStatus(line);
String traffic = parseTraffic(line); return new String[]{ip, time ,url, status, traffic};
} private String parseTraffic(String line) {
final String trim = line.substring(line.lastIndexOf("\"")+1).trim();
String traffic = trim.split(" ")[1];
return traffic;
}
private String parseStatus(String line) {
final String trim = line.substring(line.lastIndexOf("\"")+1).trim();
String status = trim.split(" ")[0];
return status;
}
private String parseURL(String line) {
final int first = line.indexOf("\"");
final int last = line.lastIndexOf("\"");
String url = line.substring(first+1, last);
return url;
}
private String parseTime(String line) {
final int first = line.indexOf("[");
final int last = line.indexOf("+0800]");
String time = line.substring(first+1,last).trim();
Date date = parseDateFormat(time);
return dateformat1.format(date);
}
private String parseIP(String line) {
String ip = line.split("- -")[0].trim();
return ip;
}
}
输出结果:
源数据: 27.19.74.143 - - [30/04/2017:17:38:20 +0800] "GET /static/image/common/faq.gif HTTP/1.1" 200 1127
清洗结果数据: ip=27.19.74.143, time=20170430173820, url=GET /static/image/common/faq.gif HTTP/1.1, status=200, traffic=1127
再看mapreduce业务逻辑,在map中,我们需要拿出ip、time、url这三个属性的值,同时过滤掉静态资源日志。map的k1用默认的LongWritable就OK,v1不用说Text,k2、v2与k1、v1类型对应就行:
static class MyMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
LogParser logParser = new LogParser();
Text v2 = new Text();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, Text>.Context context)
throws IOException, InterruptedException {
final String[] parsed = logParser.parse(value.toString()); //过滤掉静态信息
if(parsed[2].startsWith("GET /static/") || parsed[2].startsWith("GET /uc_server")){
return;
}
//过掉开头的特定格式字符串
if(parsed[2].startsWith("GET /")){
parsed[2] = parsed[2].substring("GET /".length());
}
else if(parsed[2].startsWith("POST /")){
parsed[2] = parsed[2].substring("POST /".length());
}
//过滤结尾的特定格式字符串
if(parsed[2].endsWith(" HTTP/1.1")){
parsed[2] = parsed[2].substring(0, parsed[2].length()-" HTTP/1.1".length());
}
v2.set(parsed[0]+"\t"+parsed[1]+"\t"+parsed[2]);
context.write(key, v2);
}
reduce相对来说就比较简单了,我们只需再讲map的输出写到一个文件中就OK:
static class MyReducer extends Reducer<LongWritable, Text, Text, NullWritable>{
@Override
protected void reduce(LongWritable arg0, Iterable<Text> arg1,
Reducer<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
for (Text v2 : arg1) {
context.write(v2, NullWritable.get());
}
}
}
最后,组装JOB:
public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(LogParser.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path("/logs/20170430.log"));
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
FileOutputFormat.setOutputPath(job, new Path("/20170430"));
job.waitForCompletion(true);
}
mapreduce完成后就是运行job了:
1)打包,mapreduce程序为loger.jar
2)上传jar包。运行loger.jar hadoop jar loger.jar
运行结果:
hdfs多了20170430目录:
我们下载下来看看清洗后的数据是否符合要求:
日志数据的清洗到此就完成了,接下来我们要在此之上使用hive提取核心指标数据。
四、核心指标分析
1)构建一个外部分区表,sql脚本如下:
CREATE EXTERNAL TABLE sitelog(ip string, atime string, url string) PARTITIONED BY (logdate string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LOCATION '/20170430';
2)增加分区,sql脚本如下:
ALTER TABLE sitelog ADD PARTITION(logdate='') LOCATION '/sitelog_cleaned/20170430';
3)统计每日PV,sql脚本如下:
CREATE TABLE sitelog_pv_20170430 AS SELECT COUNT(1) AS PV FROM sitelog WHERE logdate='';
4)统计每日注册用户数,sql脚本如下:
CREATE TABLE sitelog_reguser_20170430 AS SELECT COUNT(1) AS REGUSER FROM sitelog WHERE logdate=20170430' AND INSTR(url,'member.php?mod=register')>0;
5)统计每日独立IP,sql脚本如下:
CREATE TABLE site_ip_20170430 AS SELECT COUNT(DISTINCT ip) AS IP FROM sitelog WHERE logdate='';
6)统计每日跳出的用户数,sql脚本如下:
CREATE TABLE sitelog_jumper_20170430 AS SELECT COUNT(1) AS jumper FROM (SELECT COUNT(ip) AS times FROM sitelog WHERE logdate='' GROUP BY ip HAVING times=1) e;
7)把每天统计的数据放入一张表中,sql脚本如下:
CREATE TABLE sitelog_20170430 AS SELECT '', a.pv, b.reguser, c.ip, d.jumper FROM sitelog_pv_20170430 a JOIN sitelog_reguser_20170430 b ON 1=1 JOIN sitelog_ip_20170430 c ON 1=1 JOIN sitelog_jumper_20170430 d ON 1=1 ;
8)使用sqoop把数据导出到mysql中:
sqoop export --connect jdbc:mysql://hadoop02:3306/sitelog --username root --password root --table sitelog-result --fields-terminated-by '\001' --export-dir '/user/hive/warehouse/sitelog_20170430'
结果如下:
2017年4月30日日志分析结果:PV数为:169857;当日注册用户数:28;独立IP数:10411;跳出数:3749.
到此,一个简单的网站日志分析楼主就介绍完了,后面可视化的展示楼主就不写了,比较简单。相关代码地址:https://github.com/LJunChina/hadoop