This article shows two ways to find the highest temperature recorded in each year: an AWK script and a MapReduce job. awk is fast and concise, but once the data volume grows huge it hits a performance wall; even if you distribute the awk scripts across machines, individual machines can die, so a fault-tolerance mechanism is needed to keep the distributed run alive. This is exactly the gap the MapReduce computing model and Hadoop were built to fill.
1 Dataset format
++023450FM-+000599999V0202701N015919999999N0000001N9-+99999102001ADDGF108991999999999999999999
++023450FM-+000599999V0202901N008219999999N0000001N9-+99999102001ADDGF104991999999999999999999
++023450FM-+000599999V0209991C000019999999N0000001N9-+99999102001ADDGF108991999999999999999999
For compact storage, records use the packed fixed-width style shown above; the fields we need to extract are the year and the temperature.
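To make the layout concrete, here is a minimal sketch, assuming the field offsets used by the awk and Java code later in this article (year at characters 15-18, signed temperature at 87-91, quality flag at 92, all 0-indexed); the rest of the record is filler invented for the example:

// A hypothetical record with only the fields this article uses filled in.
// Requires Java 11+ for String.repeat.
public class RecordLayout {
    public static void main(String[] args) {
        String line = "0".repeat(15)
                + "1901"            // year: characters 15-18
                + "0".repeat(68)    // fields not used in this article
                + "+0317"           // signed air temperature: characters 87-91
                + "1";              // quality flag: character 92
        System.out.println(line.substring(15, 19)); // 1901
        System.out.println(line.substring(87, 92)); // +0317
        System.out.println(line.substring(92, 93)); // 1 (an acceptable quality code)
    }
}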
2 AWK

awk is a powerful text-analysis tool on Linux: it reads its input line by line and splits each line into whitespace-separated fields. For readers new to awk, section 4 below summarizes the basics.
(1) Find the highest temperature for a single year:
#!/usr/bin/env bash
gunzip -c 1901.gz | \
  awk '{ temp = substr($0, 88, 5) + 0;
         q = substr($0, 93, 1);
         if (temp != 9999 && q ~ /[01459]/ && temp > max) max = temp }
       END { print max }'
The input is a gzipped data file (1901.gz); the output is: 317
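Readings in this dataset are stored in tenths of a degree Celsius, so 317 corresponds to 31.7°C; the temp != 9999 and q ~ /[01459]/ tests discard missing and poor-quality readings.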
(2) Find the highest temperature for each of several years:
#!/usr/bin/env bash
for year in *.gz
do
  echo $year
  gunzip -c $year | \
    awk '{ temp = substr($0, 88, 5) + 0;
           q = substr($0, 93, 1);
           if (temp != 9999 && q ~ /[01459]/ && temp > max) max = temp }
         END { print max }'
done
The input is data for multiple years, here the two files 1901.gz and 1902.gz; the output is the highest temperature for each year: 317 and 244.
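One caveat on both scripts: max is never initialized, so awk starts it at zero/empty. If every valid reading in a file were below zero the script would print a blank value; that does not matter for this dataset, whose yearly maxima are well above zero, but a robust version would seed max from the first valid record.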
3 Computing the highest temperature with the MapReduce model
(1)MaxTemperatureMapper.java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MaxTemperatureMapper extends Mapper<Object, Text, Text, IntWritable> {

    private static final int MISSING = 9999;

    @Override
    protected void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') {
            // skip the leading plus sign before parsing
            airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
            airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
            context.write(new Text(year), new IntWritable(airTemperature));
        }
    }
}
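Why the charAt(87) branch: on JDKs before Java 7, Integer.parseInt rejected a leading plus sign, so the mapper skips past it, while a leading minus parses fine. A throwaway check (hypothetical, not part of the job) illustrates this:

public class SignCheck {
    public static void main(String[] args) {
        // A positive reading with the '+' stripped, as the mapper does:
        System.out.println(Integer.parseInt("0317"));   // 317
        // A negative reading parses directly, sign included:
        System.out.println(Integer.parseInt("-0011"));  // -11
    }
}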
(2)MaxTemperatureReducer.java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Take the maximum over all temperatures recorded for this year
        int max = Integer.MIN_VALUE;
        for (IntWritable temp : values) {
            max = Math.max(temp.get(), max);
        }
        context.write(key, new IntWritable(max));
    }
}
(3)MaxTemperature.java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxTemperature {

    public static void main(String[] args) throws Exception {
        // Hard-coded paths for local testing; on a cluster these would
        // come from the command line instead.
        args = new String[] {
            "/home/hadoop/Develop/hadoop-develop/data-authorized/input-file/file",
            "/home/hadoop/Develop/hadoop-develop/data-authorized/output/maxtemperature"
        };
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperature <input path> <output path>");
            System.exit(-1);
        }
        Job job = new Job();
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max temperature");
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
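A note on setCombinerClass: because taking a maximum is associative and commutative, MaxTemperatureReducer can safely double as a combiner. Each map task then emits only its local maximum per year before the shuffle, which cuts network traffic without changing the final result.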
(4) Run result: the job completes successfully
// :: INFO mapred.Task: Task 'attempt_local184459823_0001_r_000000_0' done.
// :: INFO mapred.LocalJobRunner: Finishing task: attempt_local184459823_0001_r_000000_0
// :: INFO mapred.LocalJobRunner: reduce task executor complete.
// :: INFO mapreduce.Job: map 100% reduce 100%
// :: INFO mapreduce.Job: Job job_local184459823_0001 completed successfully
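The mapred.LocalJobRunner entries show the job ran in Hadoop's local single-JVM mode, which is convenient for debugging; the same code can be packaged into a jar and submitted to a real cluster unchanged.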
4 Appendix: awk basics
Basic syntax: awk -F 'separator' '{commands}' file
(1) Pipe the output of last into awk and print the first field (the login name):
last -n 5 | awk '{print $1}'
(2) -F sets the field separator:
cat /etc/passwd |awk -F ':' '{print $1}'
(3) BEGIN and END blocks specify commands that run once before and once after the input is processed; the middle {} block runs for every line.
cat /etc/passwd |awk -F ':' 'BEGIN {print "name,shell"} {print $1","$7} END {print "blue,/bin/nosh"}'
(4) Regex pattern matching: print lines that begin with root
awk -F: '/^root/' /etc/passwd
The regex + action form:
awk -F: '/root/{print $7}' /etc/passwd
(5) if statements
ls -l |awk 'BEGIN {size=0;print "[start]size is ", size} {if($5!=4096){size=size+$5;}} END{print "[end]size is ", size/1024/1024,"M"}'
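The $5 != 4096 test is a rough heuristic for skipping directory entries, which typically report a size of 4096 bytes on ext-family filesystems; it is not a reliable directory check.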
(6) for loops and arrays
awk -F ':' 'BEGIN {count=0;} {name[count] = $1;count++;}; END{for (i = 0; i < NR; i++) print i, name[i]}' /etc/passwd
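Note that inside the END block NR holds the total number of records read, which is why it can serve as the loop bound over the name array.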
(7) Table of awk built-in variables
ARGC      number of command-line arguments
ARGV      array of command-line arguments
ENVIRON   access to system environment variables
FILENAME  name of the file awk is currently reading
FNR       record number within the current file
FS        input field separator (equivalent to the -F option)
NF        number of fields in the current record
NR        number of records read so far
OFS       output field separator
ORS       output record separator
RS        input record separator
Using the built-in variables in a print format:
awk -F ':' '{print "filename:" FILENAME ",linenumber:" NR ",columns:" NF ",linecontent:"$0}' /etc/passwd
Reference: Hadoop: The Definitive Guide