Part 1: MapReduce Overview
- Strengths
  - Easy to program
  - Good scalability
  - High fault tolerance
  - Well suited to offline processing of PB-scale data
- Weaknesses
  - Not suited to real-time computation
    It cannot return results at the millisecond level
  - Not suited to stream computation
    MapReduce expects a static data source, not a continuously arriving stream
  - Not suited to DAG computation
    Every job writes its output to HDFS, so chaining jobs into a DAG means repeated HDFS reads and writes
Part 2: The MapReduce Programming Model
MapReduce splits a job's execution into two phases: a Map phase and a Reduce phase.
- The Map phase consists of some number of map tasks
  - Input parsing: InputFormat
    - Splits the input files into input splits
    - Parses each split into key/value pairs; the default is TextInputFormat
  - Record processing: Mapper
  - Data partitioning: Partitioner
    - Decides which reduce task handles each record a map task emits
    - Default implementation: hash(key) mod R, where R is the number of reduce tasks (a custom Partitioner sketch follows this list)
  - Local aggregation: Combiner
    - Can be viewed as a local reducer, and usually shares the Reducer's logic (the WordCount job below reuses its reducer as the combiner)
    - Cuts the map output written to disk (disk I/O) and the data shuffled to reducers (network I/O)
    - Only safe when partial results can be merged, e.g. sum; average cannot be combined this way
- The Reduce phase consists of some number of reduce tasks
  - Shuffle: remote copy of map output
  - Sort: data sorted by key
  - Data processing: Reducer
  - Output formatting: OutputFormat
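As referenced in the Partitioner item above, the default behavior is hash(key) mod R. A minimal custom Partitioner sketch that mirrors that behavior (the class name WordPartitioner is illustrative, not from the original notes):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class WordPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        // hash(key) mod R; mask the sign bit so the result is never negative
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

Plug it into a job with job.setPartitionerClass(WordPartitioner.class).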
Part 3: MapReduce 2.0 Architecture and Core Design Mechanisms
- Client
  The user interacts with YARN through the client to submit MapReduce jobs
- MRAppMaster
  Plays a role similar to the old JobTracker: divides the job into tasks, requests resources from the ResourceManager and re-assigns them to map tasks and reduce tasks, and monitors task status
- ResourceManager
- NodeManager
- Container
- Fault tolerance
  - If the MRAppMaster dies, the ResourceManager restarts it, by default up to 2 times
  - Map tasks and reduce tasks send periodic heartbeats to the MRAppMaster; when a task dies, the MRAppMaster requests new resources and reruns it, by default up to 4 attempts
- Data locality
  - Prefer data and task on the same node >> data and task on the same rack but different nodes >> data and task on different racks
- Speculative execution
  When a task is found to run slowly, a backup copy is launched and the result of whichever finishes first is taken (the configuration properties behind this and the fault-tolerance defaults are sketched after this list)
- Parallel task execution
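The fault-tolerance and speculation behavior above is tunable. A sketch of the relevant Hadoop 2.x properties, set during job setup before the Job is created (values shown are the documented defaults; verify the exact keys against your Hadoop version):

import org.apache.hadoop.conf.Configuration;

// Inside job setup, before Job.getInstance(conf, ...):
Configuration conf = new Configuration();
conf.setInt("mapreduce.am.max-attempts", 2);           // MRAppMaster restart attempts
conf.setInt("mapreduce.map.maxattempts", 4);           // attempts per map task
conf.setInt("mapreduce.reduce.maxattempts", 4);        // attempts per reduce task
conf.setBoolean("mapreduce.map.speculative", true);    // back up slow map tasks
conf.setBoolean("mapreduce.reduce.speculative", true); // back up slow reduce tasks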
Part 4: MapReduce Java Programming
- Java API (the most basic approach)
- Hadoop Streaming (supports programming in other languages); a sample invocation is sketched below
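A minimal Streaming sketch reusing standard Unix tools as mapper and reducer (the streaming jar path varies by installation; /in and /out are placeholder HDFS paths):

[hadoop@hadoopa ~]$ hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input /in \
    -output /out \
    -mapper /bin/cat \
    -reducer /usr/bin/wc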
Part 5: Hands-on Projects
Project 1: WordCount
- Code
package com.dev4free.hadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MyWordCount {

    // Emits (word, 1) for every token in the input line.
    public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private final Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer line = new StringTokenizer(value.toString());
            while (line.hasMoreTokens()) {
                word.set(line.nextToken());
                context.write(word, one);
            }
        }
    }

    // Sums the counts for each word; also reused as the combiner below.
    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: MyWordCount <input path> <output path>");
            System.exit(2);
        }
        Job job = Job.getInstance(configuration, "MyWordCounter");
        job.setJarByClass(MyWordCount.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);
        // Summing is associative, so the reducer can double as a combiner.
        job.setCombinerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
- Package with Maven
-bash-3.2$ ls
pom.xml src target
-bash-3.2$ mvn clean package
- Copy the jar to the Hadoop cluster and run
[hadoop@hadoopa ~]$ hadoop jar hadoop-0.0.1-SNAPSHOT.jar com.dev4free.hadoop.MyWordCount /me /wod
- Verify the result
[hadoop@hadoopa ~]$ hdfs dfs -cat /wod/part-r-00000
a 1
am 1
boy 1
china 1
country 2
i 2
is 1
love 1
my 2
Project 2: Inverted Index
- Code
package com.dev4free.hadoop;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MyInvertedIndex {

    // Emits (word, fileName) for every token, tagging each word with its source file.
    public static class InvertedIndexMapper extends Mapper<Object, Text, Text, Text> {
        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer line = new StringTokenizer(value.toString());
            String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
            while (line.hasMoreTokens()) {
                context.write(new Text(line.nextToken()), new Text(fileName));
            }
        }
    }

    // Counts, per word, how many times it appears in each file.
    public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Map<String, Integer> counts = new HashMap<String, Integer>();
            for (Text value : values) {
                String fileName = value.toString();
                if (counts.containsKey(fileName)) {
                    counts.put(fileName, counts.get(fileName) + 1);
                } else {
                    counts.put(fileName, 1);
                }
            }
            context.write(key, new Text(counts.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: MyInvertedIndex <input path> <output path>");
            System.exit(2);
        }
        Job job = Job.getInstance(configuration, "MyInvertedIndex");
        job.setJarByClass(MyInvertedIndex.class);
        job.setMapperClass(InvertedIndexMapper.class);
        job.setReducerClass(InvertedIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
- Package with Maven
-bash-3.2$ mvn clean package
- Copy the jar to the cluster, upload the test input, and run
[hadoop@hadoopa test]$ hdfs dfs -put /home/hadoop/test /inverted
[hadoop@hadoopa test]$ hadoop jar /home/hadoop/hadoop-0.0.1-SNAPSHOT.jar com.dev4free.hadoop.MyInvertedIndex /inverted /out/inverted
- Verify the result
[hadoop@hadoopa test]$ hdfs dfs -cat /out/inverted/part-r-00000
china {text1.txt=1}
country {text1.txt=2, text3.txt=2, text2.txt=2}
i {text1.txt=1, text2.txt=1, text3.txt=1}
is {text1.txt=1, text3.txt=1, text2.txt=1}
japan {text2.txt=1}
love {text1.txt=1, text3.txt=1, text2.txt=1}
my {text1.txt=2, text2.txt=2, text3.txt=2}
usa {text3.txt=1}