Distributed Computing: The MapReduce Batch Processing Engine (Part 1)

Date: 2022-07-30 03:29:05

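Section 1: MapReduce Overview

  1. Advantages
    • Easy to program
    • Scales well
    • Highly fault-tolerant
    • Well suited to offline processing of massive, PB-scale data sets
  2. Disadvantages
    • Poor fit for real-time computation
      It cannot return results within milliseconds
    • Poor fit for stream processing
      MapReduce input data sets are static
    • Poor fit for DAG computation
      Each job writes its output to HDFS, so chaining several jobs means repeated HDFS reads and writes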

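Section 2: The MapReduce Programming Model

MapReduce splits a job into two phases: the Map phase and the Reduce phase.
  1. The Map phase consists of a number of map tasks
    1. Input format parsing: InputFormat
      • Defines how input files are split
      • Parses each split into key/value pairs; the default is TextInputFormat
    2. Input data processing: Mapper
    3. Data partitioning: Partitioner
      • Decides which reduce task handles each record output by a map task
      • Default implementation: hash(key) mod R, where R is the number of reduce tasks
    4. Local aggregation: Combiner
      • Can be seen as a local reducer; its logic is usually the same as the reducer's
      • Reduces map output (disk I/O) and network transfer (network I/O)
      • Only usable when partial results can be merged, e.g. sum; a plain average cannot be combined this way
  2. The Reduce phase consists of a number of reduce tasks
    1. Remote copy of the map output
    2. Sorting of the data by key
    3. Data processing: Reducer
    4. Output format: OutputFormat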
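
The partitioner and combiner rules above can be sketched in plain Java without a cluster. This is only an illustration under stated assumptions: the class name PartitionAndCombineSketch, R = 3 and the sample values are made up for the example, and String.hashCode() stands in for the hash of a Hadoop key type. Masking the sign bit before taking the remainder follows the same idea as Hadoop's default HashPartitioner.

```java
import java.util.Arrays;
import java.util.List;

class PartitionAndCombineSketch {

    // hash(key) mod R. The sign bit is masked so the result is never
    // negative, even when hashCode() is.
    static int partition(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    static double sum(List<Double> xs) {
        double total = 0;
        for (double x : xs) {
            total += x;
        }
        return total;
    }

    static double average(List<Double> xs) {
        return sum(xs) / xs.size();
    }

    public static void main(String[] args) {
        int r = 3; // hypothetical number of reduce tasks
        for (String key : Arrays.asList("my", "country", "china")) {
            System.out.println(key + " -> reduce task " + partition(key, r));
        }

        // Values for one key, produced by two different map tasks.
        List<Double> mapTask1 = Arrays.asList(1.0, 2.0, 3.0);
        List<Double> mapTask2 = Arrays.asList(10.0);
        List<Double> all = Arrays.asList(1.0, 2.0, 3.0, 10.0);

        // Sum: combining locally first gives the same answer (16.0 both ways).
        double combinedSum = sum(Arrays.asList(sum(mapTask1), sum(mapTask2)));
        System.out.println("sum: " + sum(all) + " vs combined " + combinedSum);

        // Average: averaging the local averages weights map tasks equally
        // instead of weighting records, so the answer changes (4.0 vs 6.0).
        double combinedAvg = average(Arrays.asList(average(mapTask1), average(mapTask2)));
        System.out.println("average: " + average(all) + " vs combined " + combinedAvg);
    }
}
```

A common workaround for averages is to have the combiner emit a (sum, count) pair and divide only in the reducer.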

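Section 3: MapReduce 2.0 Architecture and Core Design Mechanisms

  1. Client
    Jobs are submitted through the client, which talks to YARN
  2. MRAppMaster
    Takes over the per-job duties of the old JobTracker: splits the job into tasks, requests resources and hands them on to map tasks and reduce tasks, monitors task status, and so on
  3. ResourceManager
  4. NodeManager
  5. Container
  6. Fault tolerance
    • If the MRAppMaster fails, the ResourceManager restarts it; the default limit is 2 attempts
    • Map tasks and reduce tasks send periodic heartbeats to the MRAppMaster; if a task fails, the MRAppMaster requests resources again and reruns it; the default limit is 4 attempts
  7. Data locality
    • Order of preference: data on the same node as the task > data on a different node in the same rack > data in a different rack.
  8. Speculative execution
    When a task is found to be running slowly, a backup task is launched and the result of whichever finishes first is used
  9. Parallel task execution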
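
The idea behind speculative execution (item 8) can be sketched with standard Java concurrency. This is an analogy, not how the MRAppMaster is implemented: the class name, the attempt labels and the sleep times are all hypothetical, and both attempts start at the same moment here, whereas a real backup task is launched only after the original is judged to be slow.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SpeculativeExecutionSketch {

    // Simulates one attempt of a task: sleeps for the given time, then
    // returns a label identifying which attempt produced the result.
    static Callable<String> attempt(String label, long millis) {
        return () -> {
            Thread.sleep(millis);
            return label;
        };
    }

    // Runs the original attempt and a backup attempt concurrently and returns
    // the result of whichever completes first; the slower one is cancelled.
    static String runWithBackup(Callable<String> original, Callable<String> backup) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Callable<String>> attempts = Arrays.asList(original, backup);
            return pool.invokeAny(attempts);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("no attempt completed", e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Hypothetical timings: the original attempt is a straggler.
        String winner = runWithBackup(attempt("original", 2000), attempt("backup", 100));
        System.out.println("result taken from: " + winner);
    }
}
```

With these timings the backup attempt wins, and the straggler is interrupted rather than left to finish.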

Section 4: MapReduce Programming

  1. Java programming (the most basic approach)
  2. Hadoop Streaming programming (supports multiple languages)
Section 5: Hands-on Projects

Example 1: WordCount

  1. Code
package com.dev4free.hadoop;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MyWordCount {

    // Mapper: emits (word, 1) for every whitespace-separated token in a line.
    public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable ONE = new IntWritable(1);
        private final Text word = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokens = new StringTokenizer(value.toString());
            while (tokens.hasMoreTokens()) {
                word.set(tokens.nextToken());
                context.write(word, ONE);
            }
        }
    }

    // Reducer: sums the counts for each word. Because summing partial sums
    // gives the same total, the same class is also used as the combiner.
    public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private final IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Strip generic Hadoop options and keep only the job's own arguments.
        String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: MyWordCount <input path> <output path>");
            System.exit(2);
        }
        Job job = Job.getInstance(configuration, "MyWordCounter");
        job.setJarByClass(MyWordCount.class);
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
  2. Package with Maven
-bash-3.2$ ls
pom.xml src target
-bash-3.2$ mvn clean package
  3. Copy the jar to the Hadoop cluster and run it
[hadoop@hadoopa ~]$ hadoop jar hadoop-0.0.1-SNAPSHOT.jar com.dev4free.hadoop.MyWordCount /me /wod
  4. Verify the result
[hadoop@hadoopa ~]$ hdfs dfs -cat /wod/part-r-00000
a 1
am 1
boy 1
china 1
country 2
i 2
is 1
love 1
my 2
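
The same tokenize-and-sum logic can be checked locally before submitting the job. The sketch below is plain Java with no Hadoop dependency. The three input lines are an assumption: the contents of /me are not shown in this post, and these lines were chosen only because they are consistent with the counts listed above. The class name LocalWordCountCheck is likewise made up for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

class LocalWordCountCheck {

    // Tokenizes each line on whitespace and sums the occurrences per word,
    // all in a single JVM. A TreeMap keeps the keys sorted, mirroring the
    // sorted key order in which a reduce task writes its output.
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            StringTokenizer tokens = new StringTokenizer(line);
            while (tokens.hasMoreTokens()) {
                counts.merge(tokens.nextToken(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical input; the real contents of /me are not shown.
        List<String> lines = Arrays.asList(
                "i am a boy",
                "i love my country",
                "my country is china");
        for (Map.Entry<String, Integer> entry : count(lines).entrySet()) {
            System.out.println(entry.getKey() + "\t" + entry.getValue());
        }
    }
}
```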

Example 2: Inverted Index

  1. Code
package com.dev4free.hadoop;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class MyInvertedIndex {

    // Mapper: emits (word, file name) for every token, so that the reducer
    // receives every file in which a word occurs.
    public static class InvertedIndexMapper extends Mapper<Object, Text, Text, Text> {
        private final Text word = new Text();
        private final Text file = new Text();

        @Override
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokens = new StringTokenizer(value.toString());
            // Name of the file that the current input split belongs to.
            file.set(((FileSplit) context.getInputSplit()).getPath().getName());
            while (tokens.hasMoreTokens()) {
                word.set(tokens.nextToken());
                context.write(word, file);
            }
        }
    }

    // Reducer: for each word, counts how many times it occurs in each file.
    // Declared next to the mapper rather than nested inside it.
    public static class InvertedIndexReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            Map<String, Integer> counts = new HashMap<String, Integer>();
            for (Text value : values) {
                counts.merge(value.toString(), 1, Integer::sum);
            }
            context.write(key, new Text(counts.toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        // Strip generic Hadoop options and keep only the job's own arguments.
        String[] otherArgs = new GenericOptionsParser(configuration, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: MyInvertedIndex <input path> <output path>");
            System.exit(2);
        }
        Job job = Job.getInstance(configuration, "MyInvertedIndex");
        job.setJarByClass(MyInvertedIndex.class);
        job.setMapperClass(InvertedIndexMapper.class);
        job.setReducerClass(InvertedIndexReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // Use the parsed arguments, not the raw args, for the paths.
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
  2. Package with Maven
-bash-3.2$ mvn clean package
  3. Copy the jar to the Hadoop cluster and run it
[hadoop@hadoopa test]$ hdfs dfs -put /home/hadoop/test  /inverted
[hadoop@hadoopa test]$ hadoop jar /home/hadoop/hadoop-0.0.1-SNAPSHOT.jar com.dev4free.hadoop.MyInvertedIndex /inverted /out/inverted
  4. Verify the result
[hadoop@hadoopa test]$ hdfs dfs -cat /out/inverted/part-r-00000
china {text1.txt=1}
country {text1.txt=2, text3.txt=2, text2.txt=2}
i {text1.txt=1, text2.txt=1, text3.txt=1}
is {text1.txt=1, text3.txt=1, text2.txt=1}
japan {text2.txt=1}
love {text1.txt=1, text3.txt=1, text2.txt=1}
my {text1.txt=2, text2.txt=2, text3.txt=2}
usa {text3.txt=1}
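
In the listing above the file names appear in a different order from line to line, because HashMap makes no promise about iteration order. If a stable order is wanted, one option is to collect the counts in a TreeMap instead. The sketch below shows only that counting step in plain Java; the class name SortedPostingsSketch and the sample values are made up for the example, and it is not part of the job above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class SortedPostingsSketch {

    // Counts how often each file name occurs among the values of one key.
    // TreeMap iterates in key order, so the rendered string is deterministic.
    static String postings(List<String> fileNames) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String fileName : fileNames) {
            counts.merge(fileName, 1, Integer::sum);
        }
        return counts.toString();
    }

    public static void main(String[] args) {
        // Hypothetical values, in an arbitrary arrival order, for one key.
        List<String> values = Arrays.asList(
                "text3.txt", "text1.txt", "text2.txt",
                "text1.txt", "text3.txt", "text2.txt");
        System.out.println("country\t" + postings(values));
    }
}
```

For the sample values this prints the postings as {text1.txt=2, text2.txt=2, text3.txt=2}, whatever order the values arrive in.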