Hadoop MapReduce编程 API入门系列之wordcount版本1(五)

时间:2022-08-30 09:47:41

  

  这个很简单哈,编程的版本很多种。

Hadoop MapReduce编程 API入门系列之wordcount版本1(五)

Hadoop MapReduce编程 API入门系列之wordcount版本1(五)

Hadoop MapReduce编程 API入门系列之wordcount版本1(五)

Hadoop MapReduce编程 API入门系列之wordcount版本1(五)

代码版本1

 package zhouls.bigdata.myMapReduce.wordcount5;

 import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount
{
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1);
private Text word = new Text(); public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
} public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
} public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// FileInputFormat.addInputPath(job, new Path("hdfs:/HadoopMaster:9000/wc.txt"));
// FileOutputFormat.setOutputPath(job, new Path("hdfs:/HadoopMaster:9000/out/wordcount"));
FileInputFormat.addInputPath(job, new Path("./data/wc.txt"));
FileOutputFormat.setOutputPath(job, new Path("./out/WordCount"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

代码版本3

 package com.dajiangtai.Hadoop.MapReduce;

 import java.io.IOException;
import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; @SuppressWarnings("unused")
public class WordCount {//2017最新详解版 public static class TokenizerMapper extends
Mapper<Object, Text, Text, IntWritable>
// 为什么这里k1要用Object、Text、IntWritable等,而不是java的string啊、int啊类型,当然,你可以用其他的,这样用的好处是,因为它里面实现了序列化和反序列化。
// 可以让在节点间传输和通信效率更高。这就为什么hadoop本身的机制类型的诞生。 //这个Mapper类是一个泛型类型,它有四个形参类型,分别指定map函数的输入键、输入值、输出键、输出值的类型。hadoop没有直接使用Java内嵌的类型,而是自己开发了一套可以优化网络序列化传输的基本类型。这些类型都在org.apache.hadoop.io包中。
//比如这个例子中的Object类型,适用于字段需要使用多种类型的时候,Text类型相当于Java中的String类型,IntWritable类型相当于Java中的Integer类型
{
//定义两个变量或者说是定义两个对象,叫法都可以
private final static IntWritable one = new IntWritable(1);//这个1表示每个单词出现一次,map的输出value就是1.
//因为,v1是单词出现次数,直接对one赋值为1
private Text word = new Text(); public void map(Object key, Text value, Context context)
//context它是mapper的一个内部类,简单的说*接口是为了在map或是reduce任务中跟踪task的状态,很自然的MapContext就是记录了map执行的上下文,在mapper类中,这个context可以存储一些job conf的信息,比如job运行时参数等,我们可以在map函数中处理这个信息,这也是Hadoop中参数传递中一个很经典的例子,同时context作为了map和reduce执行中各个函数的一个桥梁,这个设计和Java web中的session对象、application对象很相似
//简单的说context对象保存了作业运行的上下文信息,比如:作业配置信息、InputSplit信息、任务ID等
//我们这里最直观的就是主要用到context的write方法。
//说白了,context起到的是连接map和reduce的桥梁。起到上下文的作用! throws IOException, InterruptedException {
//The tokenizer uses the default delimiter set, which is " \t\n\r": the space character, the tab character, the newline character, the carriage-return character
StringTokenizer itr = new StringTokenizer(value.toString());//将Text类型的value转化成字符串类型
//StringTokenizer是字符串分隔解析类型,StringTokenizer 用来分割字符串,你可以指定分隔符,比如',',或者空格之类的字符。 //使用StringTokenizer类将字符串“hello,java,delphi,asp,PHP”分解为三个单词
// 程序的运行结果为:
// hello
// java
// delphi
// asp
//
// php while (itr.hasMoreTokens()) {//hasMoreTokens() 方法是用来测试是否有此标记生成器的字符串可用更多的标记。
// 实际上就是java.util.StringTokenizer.hasMoreTokens()
// hasMoreTokens() 方法是用来测试是否有此标记生成器的字符串可用更多的标记。
//java.util.StringTokenizer.hasMoreTokens() word.set(itr.nextToken());//nextToken()这是 StringTokenizer 类下的一个方法,nextToken() 用于返回下一个匹配的字段。
context.write(word, one);
}
}
} public static class IntSumReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
//我们这里最直观的就是主要用到context的write方法。
//说白了,context起到的是连接map和reduce的桥梁。起到上下文的作用! int sum = 0;
for (IntWritable val : values) {//叫做增强的for循环,也叫for星型循环
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
} public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();//程序里,只需写这么一句话,就会加载到hadoop的配置文件了
//Configuration类代表作业的配置,该类会加载mapred-site.xml、hdfs-site.xml、core-site.xml等配置文件。
//删除已经存在的输出目录
Path mypath = new Path("hdfs://djt002:9000/outData/wordcount");//输出路径
FileSystem hdfs = mypath.getFileSystem(conf);//程序里,只需写这么一句话,就可以获取到文件系统了。
//FileSystem里面包括很多系统,不局限于hdfs,是因为,程序读到conf,哦,原来是hadoop集群啊。这时,才认知到是hdfs //如果文件系统中存在这个输出路径,则删除掉,保证输出目录不能提前存在。
if (hdfs.isDirectory(mypath)) {
hdfs.delete(mypath, true);
} //job对象指定了作业执行规范,可以用它来控制整个作业的运行。
Job job = Job.getInstance();// new Job(conf, "word count");
job.setJarByClass(WordCount.class);//我们在hadoop集群上运行作业的时候,要把代码打包成一个jar文件,然后把这个文件
//传到集群上,然后通过命令来执行这个作业,但是命令中不必指定JAR文件的名称,在这条命令中通过job对象的setJarByClass()
//中传递一个主类就行,hadoop会通过这个主类来查找包含它的JAR文件。 job.setMapperClass(TokenizerMapper.class);
//job.setReducerClass(IntSumReducer.class);
job.setCombinerClass(IntSumReducer.class);//Combiner最终不能影响reduce输出的结果
// 这句话要好好理解!!! job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//一般情况下mapper和reducer的输出的数据类型是一样的,所以我们用上面两条命令就行,如果不一样,我们就可以用下面两条命令单独指定mapper的输出key、value的数据类型
//job.setMapOutputKeyClass(Text.class);
//job.setMapOutputValueClass(IntWritable.class);
//hadoop默认的是TextInputFormat和TextOutputFormat,所以说我们这里可以不用配置。
//job.setInputFormatClass(TextInputFormat.class);
//job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(
"hdfs://djt002:9000/inputData/wordcount/wc.txt"));//FileInputFormat.addInputPath()指定的这个路径可以是单个文件、一个目录或符合特定文件模式的一系列文件。
//从方法名称可以看出,可以通过多次调用这个方法来实现多路径的输入。
FileOutputFormat.setOutputPath(job, new Path(
"hdfs://djt002:9000/outData/wordcount"));//只能有一个输出路径,该路径指定的就是reduce函数输出文件的写入目录。
//特别注意:输出目录不能提前存在,否则hadoop会报错并拒绝执行作业,这样做的目的是防止数据丢失,因为长时间运行的作业如果结果被意外覆盖掉,那肯定不是我们想要的
System.exit(job.waitForCompletion(true) ? 0 : 1);
//使用job.waitForCompletion()提交作业并等待执行完成,该方法返回一个boolean值,表示执行成功或者失败,这个布尔值被转换成程序退出代码0或1,该布尔参数还是一个详细标识,所以作业会把进度写到控制台。
//waitForCompletion()提交作业后,每秒会轮询作业的进度,如果发现和上次报告后有改变,就把进度报告到控制台,作业完成后,如果成功就显示作业计数器,如果失败则把导致作业失败的错误输出到控制台
}
} //TextInputFormat是hadoop默认的输入格式,这个类继承自FileInputFormat,使用这种输入格式,每个文件都会单独作为Map的输入,每行数据都会生成一条记录,每条记录会表示成<key,value>的形式。
//key的值是每条数据记录在数据分片中的字节偏移量,数据类型是LongWritable.
//value的值为每行的内容,数据类型为Text。
//
//实际上InputFormat()是用来生成可供Map处理的<key,value>的。
//InputSplit是hadoop中用来把输入数据传送给每个单独的Map(也就是我们常说的一个split对应一个Map),
//InputSplit存储的并非数据本身,而是一个分片长度和一个记录数据位置的数组。
//生成InputSplit的方法可以通过InputFormat()来设置。
//当数据传给Map时,Map会将输入分片传送给InputFormat(),InputFormat()则调用getRecordReader()生成RecordReader,RecordReader则再通过creatKey()和creatValue()创建可供Map处理的<key,value>对。
//
//OutputFormat()
//默认的输出格式为TextOutputFormat。它和默认输入格式类似,会将每条记录以一行的形式存入文本文件。它的键和值可以是任意形式的,因为程序内部会调用toString()将键和值转化为String类型再输出。

 代码版本2

 package zhouls.bigdata.myMapReduce.wordcount5;

 import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; public class WordCount implements Tool
{
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1);
private Text word = new Text(); public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
} public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
} public int run(String[] arg0) throws Exception {
Configuration conf = new Configuration();
//2删除已经存在的输出目录
Path mypath = new Path(arg0[1]);//下标为1,即是输出路径
FileSystem hdfs = mypath.getFileSystem(conf);//获取文件系统
if (hdfs.isDirectory(mypath))
{//如果文件系统中存在这个输出路径,则删除掉
hdfs.delete(mypath, true);
} Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class); FileInputFormat.addInputPath(job, new Path(arg0[0]));// 文件输入路径
FileOutputFormat.setOutputPath(job, new Path(arg0[1]));// 文件输出路径
job.waitForCompletion(true); return 0; } public static void main(String[] args) throws Exception { //集群路径
// String[] args0 = { "hdfs:/HadoopMaster:9000/wc.txt",
// "hdfs:/HadoopMaster:9000/out/wordcount"}; //本地路径
String[] args0 = { "./data/wc.txt",
"./out/WordCount"};
int ec = ToolRunner.run( new Configuration(), new WordCount(), args0);
System. exit(ec);
} @Override
public Configuration getConf() {
// TODO Auto-generated method stub
return null;
} @Override
public void setConf(Configuration arg0) {
// TODO Auto-generated method stub }
}                                                                                             

Hadoop MapReduce编程 API入门系列之wordcount版本1(五)的更多相关文章

  1. Hadoop MapReduce编程 API入门系列之wordcount版本4(八)

    这篇博客,给大家,体会不一样的版本编程. 是将map.combiner.shuffle.reduce等分开放一个.java里.则需要实现Tool. 代码 package zhouls.bigdata. ...

  2. Hadoop MapReduce编程 API入门系列之wordcount版本5(九)

    这篇博客,给大家,体会不一样的版本编程. 代码 package zhouls.bigdata.myMapReduce.wordcount1; import java.io.IOException; i ...

  3. Hadoop MapReduce编程 API入门系列之wordcount版本3(七)

    这篇博客,给大家,体会不一样的版本编程. 代码 package zhouls.bigdata.myMapReduce.wordcount3; import java.io.IOException; i ...

  4. Hadoop MapReduce编程 API入门系列之wordcount版本2(六)

    这篇博客,给大家,体会不一样的版本编程. 代码 package zhouls.bigdata.myMapReduce.wordcount4; import java.io.IOException; i ...

  5. Hadoop MapReduce编程 API入门系列之最短路径(十五)

    不多说,直接上代码. ======================================= Iteration: 1= Input path: out/shortestpath/input. ...

  6. Hadoop MapReduce编程 API入门系列之压缩和计数器(三十)

    不多说,直接上代码. Hadoop MapReduce编程 API入门系列之小文件合并(二十九) 生成的结果,作为输入源. 代码 package zhouls.bigdata.myMapReduce. ...

  7. Hadoop MapReduce编程 API入门系列之挖掘气象数据版本3(九)

    不多说,直接上干货! 下面,是版本1. Hadoop MapReduce编程 API入门系列之挖掘气象数据版本1(一) 下面是版本2. Hadoop MapReduce编程 API入门系列之挖掘气象数 ...

  8. Hadoop MapReduce编程 API入门系列之挖掘气象数据版本2(十)

    下面,是版本1. Hadoop MapReduce编程 API入门系列之挖掘气象数据版本1(一) 这篇博文,包括了,实际生产开发非常重要的,单元测试和调试代码.这里不多赘述,直接送上代码. MRUni ...

  9. Hadoop MapReduce编程 API入门系列之join(二十六)(未完)

    不多说,直接上代码. 天气记录数据库 Station ID Timestamp Temperature 气象站数据库 Station ID Station Name 气象站和天气记录合并之后的示意图如 ...

随机推荐

  1. 百度地图-省市县联动加载地图 分类: Demo JavaScript 2015-04-26 13&colon;08 530人阅读 评论&lpar;0&rpar; 收藏

    在平常项目中,我们会遇到这样的业务场景: 客户希望把自己的门店绘制在百度地图上,通过省.市.区的选择,然后加载不同区域下的店铺位置. 先看看效果图吧: 实现思路: 第一步:整理行政区域表: 要实现通过 ...

  2. Visual Studio Professional 2015 key

    Visual Studio Professional 2015 Key : HMGNV-WCYXV-X7G9W-YCX63-B98R2 Visual Studio Enterprise 2015 Ke ...

  3. 数学之路-python计算实战&lpar;20&rpar;-机器视觉-拉普拉斯算子卷积滤波

    拉普拉斯算子进行二维卷积计算,线性锐化滤波 # -*- coding: utf-8 -*- #线性锐化滤波-拉普拉斯算子进行二维卷积计算 #code:myhaspl@myhaspl.com impor ...

  4. 使用jQuery和css3实现了仿淘宝ued博客左边的菜单切换动画

    今天看到淘宝ued博客的左侧导航菜单的动画好,要使用jQuery和css3我做一个简单的示例,主要用途是实现jQuery 事件和css3 transition属性. 个元素来实现鼠标滑动到某个导航的背 ...

  5. Android JNI的使用浅析

    介绍JNI的好文章: http://blog.csdn.net/yuanzeyao/article/details/42418977 JNI技术对于多java开发的朋友相信并不陌生,即(java na ...

  6. 【AC自动机】Lougu P3796

    题目描述 有NNN个由小写字母组成的模式串以及一个文本串TTT.每个模式串可能会在文本串中出现多次.你需要找出哪些模式串在文本串TTT中出现的次数最多. 输入输出格式 输入格式: 输入含多组数据. 每 ...

  7. jquery validate 动态增加删除验证规则

    增加规则示例: $('.class').rules('add',{ required: true, messages:{ required: '这是必填,请填写', } }); 删除规则示例: $(' ...

  8. mysqldump的single-transaction

    先看一下--lock-tables和--lock-all-tables --lock-all-tables 一次性锁定所有数据库的所有表,在整个dump期间一直获取global read lock: ...

  9. cleanCode&lbrack;2&rsqb;:函数编写的几大规则

    函数编写的几大规则 很难一开始就遵循这些规则,但是可以先想什么就写什么,然后再打磨它. 1.短小 函数的第一规则是短小,第二规则是还要更短小. if.else.while语句等,其中的代码块应该只有一 ...

  10. 6&period;Exceptions-异常&lpar;Dart中文文档&rpar;

    异常是用于标识程序发生未知异常.如果异常没有被捕获,If the exception isn't caught, the isolate that raised the exception is su ...