Hadoop(十七)之MapReduce作业配置与Mapper和Reducer类

时间:2022-09-16 07:54:21

前言

  前面一篇博文写的是Combiner优化MapReduce执行,也就是使用Combiner在map端执行减少reduce端的计算量。

一、作业的默认配置

  MapReduce程序的默认配置  

1)概述

  在我们的MapReduce程序中有一些默认的配置。所以说当我们程序如果要使用这些默认配置时,可以不用写。

  Hadoop(十七)之MapReduce作业配置与Mapper和Reducer类

  我们的一个MapReduce程序一定会有Mapper和Reducer,但是我们程序中不写的话,它也有默认的Mapper和Reducer。

  当我们使用默认的Mapper和Reducer的时候,map和reducer的输入和输出都是偏移量和数据文件的一行数据,所以就是相当于原样输出!

2)默认的MapReduce程序

/**
* 没有指定Mapper和Reducer的最小作业配置
*/
public class MinimalMapReduce {
public static void main(String[] args) throws Exception{
// 构建新的作业
Configuration conf=new Configuration();
Job job = Job.getInstance(conf, "MinimalMapReduce");
job.setJarByClass(MinimalMapReduce.class);
// 设置输入输出路径
FileInputFormat.addInputPath(job, new Path(args[]));
FileOutputFormat.setOutputPath(job, new Path(args[]));
// ᨀ交作业运行
System.exit(job.waitForCompletion(true)?:);
  }
}

  输入是:

    Hadoop(十七)之MapReduce作业配置与Mapper和Reducer类

  输出是:

    Hadoop(十七)之MapReduce作业配置与Mapper和Reducer类

二、作业的配置方式

  MapReduce的类型配置

  1)用于配置类型的属性

    Hadoop(十七)之MapReduce作业配置与Mapper和Reducer类

    Hadoop(十七)之MapReduce作业配置与Mapper和Reducer类

    在命令行中,怎么去配置呢?

      比如说mapreduce.job.inputformat.class。首先我们要继承Configured实现Tool工具才能这样去指定:

      -Dmapreduce.job.inputformat.class = 某一个类的类全名(一定要记得加报名)

    Hadoop(十七)之MapReduce作业配置与Mapper和Reducer类这是Map端的输出类型控制

    Hadoop(十七)之MapReduce作业配置与Mapper和Reducer类这是整个MapReduce程序输出类型控制,其实就是reduce的类型格式控制

  2)No Reducer的MapReduce程序--Mapper

    第一步:写一个TokenCounterMapper继承Mapper

/**
* 将输入的文本内容拆分为word,做一个简单输出的Mapper
*/
public class TokenCounterMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
private Text word=new Text();
private static final IntWritable one=new IntWritable();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
StringTokenizer itr=new StringTokenizer(value.toString());
while(itr.hasMoreTokens()){
word.set(itr.nextToken());
context.write(word, one);
}
}
}

TokenCounterMapper

    第二步:写一个NoReducerMRDriver完成作业配置

/**
*没有设置Reducer的MR程序
*/
public class NoReducerMRDriver {
public static void main(String[] args) throws Exception {
// 构建新的作业
Configuration conf=new Configuration();
Job job = Job.getInstance(conf, "NoReducer");
job.setJarByClass(NoReducerMRDriver.class);
// 设置Mapper
job.setMapperClass(TokenCounterMapper.class);
// 设置reducer的数量为0
job.setNumReduceTasks();
// 设置输出格式
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[]));
FileOutputFormat.setOutputPath(job, new Path(args[]));
// ᨀ交运行作业
System.exit(job.waitForCompletion(true)?:);
}
}

NoReducerMRDriver

    输入:

      Hadoop(十七)之MapReduce作业配置与Mapper和Reducer类

    结果:

      Hadoop(十七)之MapReduce作业配置与Mapper和Reducer类

    注意:如果作业拥有0个Reducer,则Mapper结果直接写入OutputFormat而不经key值排序。

  3)No Mapper的MapReduce程序--Reducer

    第一步:写一个TokenCounterReducer继承Reducer

/**
* 将reduce输入的values内容拆分为word,做一个简单输出的Reducer
*/
public class TokenCounterReducer extends Reducer<LongWritable, Text, Text, IntWritable>{
private Text word=new Text();
private static final IntWritable one=new IntWritable();
@Override
protected void reduce(LongWritable key, Iterable<Text> values,Reducer<LongWritable, Text, Text, IntWritable>.Context context)
throws IOException, InterruptedException {
// TODO Auto-generated method stub
for(Text value:values){
StringTokenizer itr=new StringTokenizer(value.toString());
while(itr.hasMoreTokens()){
word.set(itr.nextToken());
context.write(word, one);
}
}
}
}

TokenCounterReducer

    第二步:写一个NoMapperMRDrive完成作业配置

/**
*没有设置Mapper的MR程序
*/
public class NoMapperMRDriver {
public static void main(String[] args) throws Exception {
// 构建新的作业
Configuration conf=new Configuration();
Job job = Job.getInstance(conf, "NoMapper");
job.setJarByClass(NoMapperMRDriver.class);
// 设置Reducer
job.setReducerClass(TokenCounterReducer.class);
// 设置输出格式
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[]));
FileOutputFormat.setOutputPath(job, new Path(args[]));
// ᨀ交运行作业
System.exit(job.waitForCompletion(true)?:);
}
}

NoMapperMRDrive

    输入:

      Hadoop(十七)之MapReduce作业配置与Mapper和Reducer类

    输出:

      Hadoop(十七)之MapReduce作业配置与Mapper和Reducer类

三、Mapper类和Reducer类以及它们的子类(实现类)

3.1、Mapper概述

  Mapper:封装了应用程序Mapper阶段的数据处理逻辑

  Hadoop(十七)之MapReduce作业配置与Mapper和Reducer类 

  1)ChainMapper

    方便用户编写链式Map任务, 即Map阶段包含多个Mapper,即可以别写多个自定义map去参与运算。
  2)InverseMapper

    一个能交换key和value的Mapper
  3)RegexMapper

    检查输入是否匹配某正则表达式, 输出匹配字符串和计数器(用的很少)
  4)TockenCounterMapper

    将输入分解为独立的单词, 输出个单词和计数器(以空格分割单词,value值为1)

3.2、Reducer概述

  Mapper:封装了应用程序Mapper阶段的数据处理逻辑

  Hadoop(十七)之MapReduce作业配置与Mapper和Reducer类

  1)ChainMapper:

    方便用户编写链式Map任务, 即Map阶段只能有一个Reducer,后面还可以用ChainMapper去多加Mapper。

  2)IntSumReducer/LongSumReducer

    对各key的所有整型值求和

3.2、写一个实例去使用

  注意:这里用到了一个输入格式为KeyValueTextInputFormat,我们查看源码注释:

    Hadoop(十七)之MapReduce作业配置与Mapper和Reducer类

    我们需要用mapreduce.input.keyvaluelinerecordreader.key.value.separator去指定key和value的分隔符是什么,它的默认分隔符是"\t"也就是tab键。

    这个需要在配置文件中去指定,但是我们知道在配置文件中能设置的在程序中也是可以设置的。

    conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator",",");

  代码实现: 

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainMapper;
import org.apache.hadoop.mapreduce.lib.chain.ChainReducer;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.map.InverseMapper;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.mapreduce.lib.reduce.IntSumReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; public class PatentReference_0010 extends Configured implements Tool{ static class PatentReferenceMapper extends Mapper<Text,Text,Text,IntWritable>{
private IntWritable one=new IntWritable();
@Override
protected void map(Text key,Text value,Context context) throws IOException, InterruptedException{
context.write(key,one);
}
} @Override
public int run(String[] args) throws Exception{
Configuration conf=getConf();
Path input=new Path(conf.get("input"));
Path output=new Path(conf.get("output"));
conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator",","); Job job=Job.getInstance(conf,this.getClass().getSimpleName());
job.setJarByClass(this.getClass()); ChainMapper.addMapper(job,InverseMapper.class,
// 输入的键值类型由InputFormat决定
Text.class,Text.class,
// 输出的键值类型与输入的键值类型相反
Text.class,Text.class,conf); ChainMapper.addMapper(job,PatentReferenceMapper.class,
// 输入的键值类型由前一个Mapper输出的键值类型决定
Text.class,Text.class,
Text.class,IntWritable.class,conf); ChainReducer.setReducer(job,IntSumReducer.class,
Text.class,IntWritable.class,
Text.class,IntWritable.class,conf); ChainReducer.addMapper(job,InverseMapper.class,
Text.class,IntWritable.class,
IntWritable.class,Text.class,conf); job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class); KeyValueTextInputFormat.addInputPath(job,input);
TextOutputFormat.setOutputPath(job,output); return job.waitForCompletion(true)?:;
} public static void main(String[] args) throws Exception{
System.exit(ToolRunner.run(new P00010_PatentReference_0010(),args));
}
}

  在Job job=Job.getInstance(conf,this.getClass().getSimpleName());设置中,job把conf也就是配置文件做了一个拷贝,因为hadoop要重复利用一个对象,如果是引用的话,发现值得改变就都改变了。        

  

    

    

喜欢就点个“推荐”哦!

Hadoop(十七)之MapReduce作业配置与Mapper和Reducer类的更多相关文章

  1. hadoop学习&lpar;七&rpar;----mapReduce原理以及操作过程

    前面我们使用HDFS进行了相关的操作,也了解了HDFS的原理和机制,有了分布式文件系统我们如何去处理文件呢,这就的提到hadoop的第二个组成部分-MapReduce. MapReduce充分借鉴了分 ...

  2. 关于Mapper、Reducer的个人总结(转)

    Mapper的处理过程: 1.1. InputFormat 产生 InputSplit,并且调用RecordReader将这些逻辑单元(InputSplit)转化为map task的输入.其中Inpu ...

  3. 使用MRUnit,Mockito和PowerMock进行Hadoop MapReduce作业的单元测试

    0.preliminary 环境搭建 Setup development environment Download the latest version of MRUnit jar from Apac ...

  4. 分布式配置 tachyon 并执行Hadoop样例 MapReduce

    ----------此文章.笔者按着tachyon官网教程进行安装并记录. (本地安装tachyon具体解释:http://blog.csdn.net/u012587561/article/detai ...

  5. 使用IDEA远程向伪分布式搭建的Hadoop提交MapReduce作业

    环境 VirtualBox 6.1 IntelliJ IDEA 2020.1.1 Ubuntu-18.04.4-live-server-amd64 jdk-8u251-linux-x64 hadoop ...

  6. 高可用&comma;完全分布式Hadoop集群HDFS和MapReduce安装配置指南

    原文:http://my.oschina.net/wstone/blog/365010#OSC_h3_13 (WJW)高可用,完全分布式Hadoop集群HDFS和MapReduce安装配置指南 [X] ...

  7. Hadoop学习之路(二十七)MapReduce的API使用(四)

    第一题 下面是三种商品的销售数据 要求:根据以上数据,用 MapReduce 统计出如下数据: 1.每种商品的销售总金额,并降序排序 2.每种商品销售额最多的三周 第二题:MapReduce 题 现有 ...

  8. Hadoop官方文档翻译——MapReduce Tutorial

    MapReduce Tutorial(个人指导) Purpose(目的) Prerequisites(必备条件) Overview(综述) Inputs and Outputs(输入输出) MapRe ...

  9. 剖析MapReduce 作业运行机制

    包含四个独立的实体: ·  Client Node 客户端:编写 MapReduce代码,配置作业,提交MapReduce作业. ·  JobTracker :初始化作业,分配作业,与 TaskTra ...

随机推荐

  1. python基础回顾1

    定义 tuple(元组), list (表) #!/usr/bin/env python # encoding: utf-8 a = 10 #定义一直变量,无需声明 s1 = (2,1.3,'love ...

  2. SharePoint REST api

    http://msdn.microsoft.com/en-us/magazine/dn198245.aspx Understanding and Using the SharePoint 2013 R ...

  3. bzoj3932

    3932: [CQOI2015]任务查询系统 Time Limit: 20 Sec  Memory Limit: 512 MBSubmit: 1326  Solved: 480[Submit][Sta ...

  4. Android设计开发笔记

    1.因为Android的开发是基于框架的开发:往对方指定的位置加代码:其运行的Message\Handler机制也决定了其单步跟踪也不方便,所以建立新代码时要多Log,这样不但便于调试,而且帮助你加深 ...

  5. 说一下syslog日志吧~~~

    # -*- coding:utf-8 -*-from logging.handlers import *import loggingimport logging.handlers class MySo ...

  6. css一些基础效果

    1.旋转 .center>.bj>.div1>ul>li>.img1:hover {transform: rotate(-360deg);transition: 1s}/ ...

  7. 6——ThinkPhp中的请求:

    <?php namespace app\index\controller; use think\console\Input; use think\Controller; use think\Db ...

  8. Android自定义类似ProgressDialog效果的Dialog

    Android自定义类似ProgressDialog效果的Dialog. 方法如下: 1.首先准备两张自己要定义成哪样子的效果的图片和背景图片(也可以不要背景). 如我要的效果: 2.定义loadin ...

  9. bzoj 2434 AC自动机&plus;树状数组

    2434: [Noi2011]阿狸的打字机 Time Limit: 10 Sec  Memory Limit: 256 MBSubmit: 3493  Solved: 1909[Submit][Sta ...

  10. Unity3D 新版粒子系统 &lpar;Shuriken&rpar;

    Shuriken粒子系统是继Unity3.5版本之后推出的新版粒子系统,它采用了模块化管理,个性化的粒子模块配合粒子曲线编辑器使用户更容易创作出各种兵分复杂的粒子效果. 创建一个粒子系统的方式有两种: ...