hadoop第一个例子wordcount学习

时间:2023-01-27 13:46:13

昨天在自己的电脑上配置了hadoop,也运行了第一个MapReduce程序WordCount程序。但是对mapreduce的编程还很不清楚,在网上转了一段对wordcount的解释,转载学习下。

Wordcount的输入是文件夹,文件夹内是多个文件,内容是以空格作分隔符的单词序列,输出为单词,以及他们的数量。

首先,在mapreduce程序中,程序会按照setInputFormat中设置的方法为将输入切分成一个个InputSplit。在Map过程中,程序会为每一个InputSplit调用map函数,这里即以空格作分隔符将单词切开。并以单词作为key,1作为value。需要特别指出的是,mapreduce的<key,value>无论是key还是value都是mapreduce预先定义好的格式,因此在wordcount这个程序中,我们要把String转换成text格式,int转换为IntWritable格式。如下:

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

再做

word.set(tokenizer.nextToken());                         

将这些<key,value>对作为Map的结果传递下去

output.collect(word, one);

在Reduce过程中,程序会对每组<key,list of values>调用reduce函数,在我们这个程序中,只需让value相加即可以。最后调用output.collect输出Reduce结果。

以下是程序内容及注释:

package com.felix;  

 

import java.io.IOException;  

import java.util.Iterator;  

import java.util.StringTokenizer;  

 

import org.apache.hadoop.fs.Path;  

import org.apache.hadoop.io.IntWritable;  

import org.apache.hadoop.io.LongWritable;  

import org.apache.hadoop.io.Text;  

import org.apache.hadoop.mapred.FileInputFormat;  

import org.apache.hadoop.mapred.FileOutputFormat;  

import org.apache.hadoop.mapred.JobClient;  

import org.apache.hadoop.mapred.JobConf;  

import org.apache.hadoop.mapred.MapReduceBase;  

import org.apache.hadoop.mapred.Mapper;  

import org.apache.hadoop.mapred.OutputCollector;  

import org.apache.hadoop.mapred.Reducer;  

import org.apache.hadoop.mapred.Reporter;  

import org.apache.hadoop.mapred.TextInputFormat;  

import org.apache.hadoop.mapred.TextOutputFormat;  

/** 

 *  

 * 描述:WordCount explains by Felix 

 * @author Hadoop Dev Group 

 */ 

public class WordCount  

{  

 

    /** 

     * MapReduceBase类:实现了Mapper和Reducer接口的基类(其中的方法只是实现接口,而未作任何事情) 

     * Mapper接口: 

     * WritableComparable接口:实现WritableComparable的类可以相互比较。所有被用作key的类应该实现此接口。 

     * Reporter 则可用于报告整个应用的运行进度,本例中未使用。  

     *  

     */ 

    public static class Map extends MapReduceBase implements 

            Mapper<LongWritable, Text, Text, IntWritable>   //设定了map函数输入的形式为longwritable<key>text<value>输出地形式为text<key>intwritable<value>

    {  

        /** 

         * LongWritable, IntWritable, Text 均是 Hadoop 中实现的用于封装 Java 数据类型的类,这些类实现了WritableComparable接口, 

         * 都能够被串行化从而便于在分布式环境中进行数据交换,你可以将它们分别视为long,int,String 的替代品。 

         */ 

        private final static IntWritable one = new IntWritable(1);   //定义一个intwritable型的常量,用来说明出现过一次

        private Text word = new Text();                        //定义一个text型的变量,用来保存单词

          

        /** 

         * Mapper接口中的map方法: 

         * void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter) 

         * 映射一个单个的输入k/v对到一个中间的k/v对 

         * 输出对不需要和输入对是相同的类型,输入对可以映射到0个或多个输出对。 

         * OutputCollector接口:收集Mapper和Reducer输出的<k,v>对。 

         * OutputCollector接口的collect(k, v)方法:增加一个(k,v)对到output 

         */ 

        public void map(LongWritable key, Text value,  

                OutputCollector<Text, IntWritable> output, Reporter reporter)   //map中的参变量说明map输入时的keyvalue对的形式,以及map输出和reduce接收的keyvalue数据类型

                throws IOException  

        {  

            String line = value.toString();   //将输入中的一行保存到line中

           StringTokenizer tokenizer = new StringTokenizer(line);   //将一行保存到准备切词的工具中

            while (tokenizer.hasMoreTokens())   //判断是否到一行的结束

            {  

                word.set(tokenizer.nextToken());  //设定key即word的值为从每一行切下来的单词  

                output.collect(word, one);       //设定map函数输出的keyvalue对

            }  

        }  

    }  

 

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>        //设定reduce函数中输入对的数据类型是text和intwritable,输出对的数据类型是text和intwritable

    {  

        public void reduce(Text key, Iterator<IntWritable> values,  

                OutputCollector<Text, IntWritable> output, Reporter reporter)   //设定reduce函数中输入对的数据类型是text和intwritable,输出对的数据类型是text和intwritable

                throws IOException  

        {  

            int sum = 0;  

            while (values.hasNext())        //计算同一个key下,所有value的总和

            {  

                sum += values.next().get();   //获取下一个value的值

            }  

            output.collect(key, new IntWritable(sum));   //收集reduce输出结果

        }  

    }  

 

    public static void main(String[] args) throws Exception  

    {  

        /** 

         * JobConf:map/reduce的job配置类,向hadoop框架描述map-reduce执行的工作 

         * 构造方法:JobConf()、JobConf(Class exampleClass)、JobConf(Configuration conf)等 

         */ 

        JobConf conf = new JobConf(WordCount.class);  

        conf.setJobName("wordcount");           //设置一个用户定义的job名称  

 

        conf.setOutputKeyClass(Text.class);    //为job的输出数据设置Key类  

        conf.setOutputValueClass(IntWritable.class);   //为job输出设置value类  

 

        conf.setMapperClass(Map.class);         //为job设置Mapper类  

        conf.setCombinerClass(Reduce.class);      //为job设置Combiner类  

        conf.setReducerClass(Reduce.class);        //为job设置Reduce类  

 

        conf.setInputFormat(TextInputFormat.class);    //为map-reduce任务设置InputFormat实现类  

        conf.setOutputFormat(TextOutputFormat.class);  //为map-reduce任务设置OutputFormat实现类  

 

       /** 

         * InputFormat描述map-reduce中对job的输入定义 

         * setInputPaths():为map-reduce job设置路径数组作为输入列表 

         * setInputPath():为map-reduce job设置路径数组作为输出列表 

         */ 

        FileInputFormat.setInputPaths(conf, new Path(args[0]));  

        FileOutputFormat.setOutputPath(conf, new Path(args[1]));  

 

        JobClient.runJob(conf);         //运行一个job  

    }