Hadoop (十二)Hadoop-MR执行环境之---定义多个resuce task

时间:2022-09-24 23:28:53

引言

        之前我们说道,默认情况下reduce task只有一个,当我们需要对大量数据进行统计时,一个reduce task已经捉襟见肘,那么我们就有必要配置多哦reduce task进行并行任务执行。

实例

        例子:我们有一个每年每月温度的统计数据需要进行分析。如下图数据。我们需要统计每年中每个月的最高温度的前三名。

        Hadoop (十二)Hadoop-MR执行环境之---定义多个resuce task

        自定义分区:我们需要 分配多个reduce task,根据年来进行reduce task的分区,每个reduce task统计每一年的数据。这就引入了自定义分区。

        自定义分组:因为我们要统计每年每月气温的前三名。所以我们应该按照月分组,然后找这个组气温前三名。这就引入了自定义分组。

       自定义 排序:统计温度的前三名,则需要对数据进行排序,取前三。这就引入了自定义排序。

        之前我们已经知道分区、分组、排序规则如下

                分区:hashcode模reduce数量

                分组:根据key比较进行分组      

                排序:根据key的hashcode

        所以我们要将需要自定义分组、排序、分区的数据封装进入key中,所以我们需要对key进行多态封装。

编码

一、总述

        这个测试模块的类全在包com.zjt.mapreducer.weather下。

        因为分区、分组、排序都是自定义,所以关于分区、分组和排序以及其依赖的键都需要我们手动编码完成。我们需要一共完成下方七个类的编码:

        1、MyKey.java

                因为分区、分组、排序都需要我们自己完成,而这三个都是依赖于键进行其工作的。所以我们要根据我们时间需求自定义键。

        2、WeatherMapper.java

                hdfs输入数据的mapper处理函数。其将接收到的数据封装为MyKey实体类,传递给下一步进行分区

        3、MyPartitioner.java

                我们自己实现的分区类,给WeatherMapper.java传来的数据 通过 取模 运算,返回其分区号

        4、MySort.java

                自定义排序类。通过比较MyKey.java中年、月、日来比较其实体类是否相等。

        5、MyGroup.java

                自定义分组。通过比较MyKey.java中年、月来比较其实体类是否相等。

        6、WeatherReducer.java

                接收到被排序、分区、分组后的数据,取其最大的三个,则为每年每月气温最高的前三个。

        7、RunJob.java

                运行类,其中对代码项进行配置,启动任务。

二、各个类源码     

        1、MyKey.java

                因为分区、分组、排序都需要我们自己完成,而这三个都是依赖于键进行其工作的。所以我们要根据我们时间需求自定义键。

package com.zjt.mapreducer.weather;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

/**
 *
 * @author ZhangJintao
 */
public class MyKey implements WritableComparable<MyKey>{
    private int year;  //年
    private int month;   //月
    private double hot;   //温度
    public int getYear() {
        return year;
    }
    public void setYear(int year) {
        this.year = year;
    }
    public int getMonth() {
        return month;
    }
    public void setMonth(int month) {
        this.month = month;
    }
    public double getHot() {
        return hot;
    }
    public void setHot(double hot) {
        this.hot = hot;
    }
    /**
     * 反序列化【从对象流中读出】
     */
    public void readFields(DataInput arg0) throws IOException {
        this.year = arg0.readInt();
        this.month = arg0.readInt();
        this.hot = arg0.readDouble();
    }
    /**
     * 序列化
     */
    public void write(DataOutput arg0) throws IOException {
        arg0.writeInt(year);
        arg0.writeInt(month);
        arg0.writeDouble(hot);
    }
    /**
     * 比较两个对象是否相等方法,分组时用到
     */
    public int compareTo(MyKey o) {
        int r1 = Integer.compare(this.year, o.getYear());
        if (r1 == 0) {
            int r2 = Integer.compare(this.month, o.getMonth());
            if (r2 == 0) {
                return Double.compare(this.hot, o.getHot());
            }else {
                return r2;
            }
        }else{
            return r1 ;
        }
    }
    @Override
    public String toString() {
        return "MyKey [year=" + year + ", month=" + month + ", hot=" + hot
                + "]";
    }
}

        2、WeatherMapper.java

                hdfs输入数据的mapper处理函数。其将接收到的数据封装为MyKey实体类,传递给下一步进行分区

package com.zjt.mapreducer.weather;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WeatherMapper extends Mapper<Text, Text, MyKey, DoubleWritable>{
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-HH-dd hh:mm:ss");
    
    /**
     * 根据键和值,将我们的数据拿出来封装成MyKey
     *     key/value:每一行第一个隔开符 左边为key,右边为value
     */
//    @Override
    protected void map(Text key, Text value,
            Mapper<Text, Text, MyKey, DoubleWritable>.Context context)
            throws IOException, InterruptedException {
        System.out.println("WeatherMapper.map()");
        try {
            Date date = sdf.parse(key.toString());
            Calendar c = Calendar.getInstance();
            c.setTime(date);
            int year = c.get(Calendar.YEAR);
            int month = c.get(Calendar.MONTH);
            double hot = Double.parseDouble(value.toString().substring(0, value.toString().lastIndexOf("c")));
            MyKey myKey = new MyKey();
            myKey.setHot(hot);
            myKey.setYear(year);
            myKey.setMonth(month);
            System.out.println("===WeatherMapper.map() 开始输出数据");
            System.out.println("===   【"+myKey.toString() + " : " + hot +"】");
            context.write(myKey, new DoubleWritable(hot));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

        3、MyPartitioner.java

                我们自己实现的分区类,给WeatherMapper.java传来的数据 通过 取模 运算,返回其分区号

package com.zjt.mapreducer.weather;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class MyPartitioner extends HashPartitioner<MyKey, DoubleWritable>{
    /**
     * 自定义分区
     * MyTask每输出一个数据调用一次,所以这个方法越短越好
     *     返回分区号
     */
//    @Override
    public int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) {
        System.out.println("MyPartitioner.getPartition()");
        System.out.println("===MyPartitioner.getPartition()  接收到到数据");
        System.out.println("===   【" + key.toString() + "】");
        System.out.println("===MyPartitioner.getPartition()  开始返回分区号");
        System.out.println("===   【" + (key.getYear()-1949) % numReduceTasks + "】");
        //当前年份减去1949年,对ReduceTasks数量取模
        return (key.getYear()-1949) % numReduceTasks;
    }
}

        4、MySort.java

                自定义排序类。通过比较MyKey.java中年、月、日来比较其实体类是否相等。

package com.zjt.mapreducer.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * 自定义排序函数,继承默认的排序
 * @author ZhangJintao
 */
public class MySort extends WritableComparator{
    public MySort() {
        super(MyKey.class,true);
    }
    
//    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        System.out.println("MySort.compare()");
        MyKey k1 = (MyKey)a;
        MyKey k2 = (MyKey)b;
        System.out.println("===MySort.compare()   开始比较数据");
        System.out.println("===   k1=【"+k1.toString()+"】");
        System.out.println("===   k2=【"+k2.toString()+"】");
        int r1 = Integer.compare(k1.getYear(), k2.getYear());
        if (r1 == 0) {
            int r2 = Integer.compare(k1.getMonth(), k2.getMonth());
            if (r2 == 0) {
                System.out.println("===MySort.compare()   比较结果");
                System.out.println("===   【"+-Double.compare(k1.getHot(), k2.getHot())+"】");
                return -Double.compare(k1.getHot(), k2.getHot());
            }else {
                System.out.println("===MySort.compare()   比较结果");
                System.out.println("===   【"+r2+"】");
                return r2;
            }
        }else{
            System.out.println("===MySort.compare()   比较结果");
            System.out.println("===   【"+r1+"】");
            return r1 ;
        }
    }
}

        5、MyGroup.java

                自定义分组。通过比较MyKey.java中年、月来比较其实体类是否相等。

package com.zjt.mapreducer.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * 自定义分组函数
 * @author ZhangJintao
 */
public class MyGroup extends WritableComparator{
    public MyGroup() {
        super(MyKey.class,true);
    }
    
//    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        System.out.println("MyGroup.compare()");
        System.out.println("===MyGroup.compare()   开始比较数据");
        MyKey k1 = (MyKey)a;
        MyKey k2 = (MyKey)b;
        System.out.println("===   k1=【"+k1.toString()+"】");
        System.out.println("===   k2=【"+k2.toString()+"】");
        
        int r1 = Integer.compare(k1.getYear(), k2.getYear());
        if (r1 == 0) {
            System.out.println("===MyGroup.compare()   比较结果");
            System.out.println("===   【"+Integer.compare(k1.getMonth(), k2.getMonth())+"】");
            return Integer.compare(k1.getMonth(), k2.getMonth());
        }else{
            System.out.println("===MyGroup.compare()   比较结果");
            System.out.println("===   【"+r1+"】");
            return r1 ;
        }
    }
}

        6、WeatherReducer.java

                接收到被排序、分区、分组后的数据,取其最大的三个,则为每年每月气温最高的前三个。

package com.zjt.mapreducer.weather;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WeatherReducer extends Reducer<MyKey, DoubleWritable, Text, NullWritable>{
//    @Override
    protected void reduce(MyKey arg0, Iterable<DoubleWritable> arg1,
            Reducer<MyKey, DoubleWritable, Text, NullWritable>.Context arg2)
            throws IOException, InterruptedException {
        System.out.println("WeatherReducer.reduce()");
        System.out.println("===WeatherReducer.reduce()   处理收到的数据");
        System.out.println("===WeatherReducer.reduce()   【"+arg0.toString()+"】");
        System.out.println("===WeatherReducer.reduce()   打印当前组的所有数据");
        for (DoubleWritable v :arg1) {
            String msg = arg0.getYear() + "\t" + arg0.getMonth() + "\t" + v.get();
            System.out.println("===WeatherReducer.reduce()   【"+msg+"】");
        }
        int i = 0 ;
        for (DoubleWritable v :arg1) {
            i ++ ;
            String msg = arg0.getYear() + "\t" + arg0.getMonth() + "\t" + v.get();
            arg2.write(new Text(msg), NullWritable.get());
            if (i == 3) {
                break;
            }
        }
    }
}

        7、RunJob.java

                运行类,其中对代码项进行配置,启动任务。 

package com.zjt.mapreducer.weather;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {

    public static void main(String[] args) {
        Configuration config =new Configuration();
        config.set("fs.defaultFS", "hdfs://node1:8020");
        config.set("yarn.resourcemanager.hostname", "node1");
//        config.set("mapred.jar", "C:\\Users\\ZhangJintao\\Desktop\\wc.jar");
        try {
            FileSystem fs =FileSystem.get(config);
            
            Job job =Job.getInstance(config);
            job.setJarByClass(RunJob.class);
            
            job.setJobName("weather");
            job.setMapperClass(WeatherMapper.class);
            job.setReducerClass(WeatherReducer.class);
            job.setMapOutputKeyClass(MyKey.class);
            job.setOutputValueClass(DoubleWritable.class);
            
            job.setPartitionerClass(MyPartitioner.class);
            job.setSortComparatorClass(MySort.class);
            job.setGroupingComparatorClass(MyGroup.class);
            
            job.setNumReduceTasks(3);
            
            job.setInputFormatClass(KeyValueTextInputFormat.class);
            
            FileInputFormat.addInputPath(job, new Path("/usr/input/weather"));
            
            Path outpath =new Path("/usr/output/weather");
            
            
            if(fs.exists(outpath)){
                fs.delete(outpath, true);
            }
            FileOutputFormat.setOutputPath(job, outpath);
            
            boolean f= job.waitForCompletion(true);
            if(f){
                System.out.println("JOB 执行成功");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
三、运行项目

        给hdfs上传/usr/input/weather,执行RunJob.java 的 main方法,执行成功后,刷新output得到如下结果

Hadoop (十二)Hadoop-MR执行环境之---定义多个resuce task

四、运行分析

        我们可以给每个类的执行方法内加入System.out.print("类名+方法名");从控制台得到的信息筛选出来,如下所示(即为代码的执行顺序)

       WeatherMapper.map()
       WeatherMapper.map() 开始输出数据
          【MyKey [year=1949, month=0, hot=34.0] : 34.0】
    MyPartitioner.getPartition()
       MyPartitioner.getPartition()  接收到到数据
          【MyKey [year=1949, month=0, hot=34.0]】
       MyPartitioner.getPartition()  开始返回分区号
          【0】
    WeatherMapper.map()
       WeatherMapper.map() 开始输出数据
          【MyKey [year=1949, month=0, hot=36.0] : 36.0】
    MyPartitioner.getPartition()
       MyPartitioner.getPartition()  接收到到数据
          【MyKey [year=1949, month=0, hot=36.0]】
       MyPartitioner.getPartition()  开始返回分区号
          【0】
    WeatherMapper.map()
       WeatherMapper.map() 开始输出数据
          【MyKey [year=1950, month=0, hot=32.0] : 32.0】
    MyPartitioner.getPartition()
       MyPartitioner.getPartition()  接收到到数据
          【MyKey [year=1950, month=0, hot=32.0]】
       MyPartitioner.getPartition()  开始返回分区号
          【1】
    WeatherMapper.map()
       WeatherMapper.map() 开始输出数据
          【MyKey [year=1950, month=0, hot=37.0] : 37.0】
    MyPartitioner.getPartition()
       MyPartitioner.getPartition()  接收到到数据
          【MyKey [year=1950, month=0, hot=37.0]】
       MyPartitioner.getPartition()  开始返回分区号
          【1】
    WeatherMapper.map()
       WeatherMapper.map() 开始输出数据
          【MyKey [year=1951, month=0, hot=23.0] : 23.0】
    MyPartitioner.getPartition()
       MyPartitioner.getPartition()  接收到到数据
          【MyKey [year=1951, month=0, hot=23.0]】
       MyPartitioner.getPartition()  开始返回分区号
          【2】
    WeatherMapper.map()
       WeatherMapper.map() 开始输出数据
          【MyKey [year=1950, month=0, hot=41.0] : 41.0】
    MyPartitioner.getPartition()
       MyPartitioner.getPartition()  接收到到数据
          【MyKey [year=1950, month=0, hot=41.0]】
       MyPartitioner.getPartition()  开始返回分区号
          【1】
    WeatherMapper.map()
       WeatherMapper.map() 开始输出数据
          【MyKey [year=1950, month=0, hot=27.0] : 27.0】
    MyPartitioner.getPartition()
       MyPartitioner.getPartition()  接收到到数据
          【MyKey [year=1950, month=0, hot=27.0]】
       MyPartitioner.getPartition()  开始返回分区号
          【1】
    WeatherMapper.map()
       WeatherMapper.map() 开始输出数据
          【MyKey [year=1951, month=0, hot=45.0] : 45.0】
    MyPartitioner.getPartition()
       MyPartitioner.getPartition()  接收到到数据
          【MyKey [year=1951, month=0, hot=45.0]】
       MyPartitioner.getPartition()  开始返回分区号
          【2】
    WeatherMapper.map()
       WeatherMapper.map() 开始输出数据
          【MyKey [year=1951, month=0, hot=46.0] : 46.0】
    MyPartitioner.getPartition()
       MyPartitioner.getPartition()  接收到到数据
          【MyKey [year=1951, month=0, hot=46.0]】
       MyPartitioner.getPartition()  开始返回分区号
          【2】
    WeatherMapper.map()
       WeatherMapper.map() 开始输出数据
          【MyKey [year=1951, month=0, hot=47.0] : 47.0】
    MyPartitioner.getPartition()
       MyPartitioner.getPartition()  接收到到数据
          【MyKey [year=1951, month=0, hot=47.0]】
       MyPartitioner.getPartition()  开始返回分区号
          【2】
    MySort.compare()
       MySort.compare()   开始比较数据
          k1=【MyKey [year=1951, month=0, hot=47.0]】
          k2=【MyKey [year=1951, month=0, hot=46.0]】
       MySort.compare()   比较结果
          【-1】
    MySort.compare()
       MySort.compare()   开始比较数据
          k1=【MyKey [year=1951, month=0, hot=46.0]】
          k2=【MyKey [year=1951, month=0, hot=45.0]】
       MySort.compare()   比较结果
          【-1】
    MySort.compare()
       MySort.compare()   开始比较数据
          k1=【MyKey [year=1950, month=0, hot=27.0]】
          k2=【MyKey [year=1950, month=0, hot=41.0]】
       MySort.compare()   比较结果
          【1】
    MySort.compare()
       MySort.compare()   开始比较数据
          k1=【MyKey [year=1951, month=0, hot=45.0]】
          k2=【MyKey [year=1951, month=0, hot=23.0]】
       MySort.compare()   比较结果
          【-1】
    MySort.compare()
       MySort.compare()   开始比较数据
          k1=【MyKey [year=1950, month=0, hot=27.0]】
          k2=【MyKey [year=1950, month=0, hot=37.0]】
       MySort.compare()   比较结果
          【1】
    MySort.compare()
       MySort.compare()   开始比较数据
          k1=【MyKey [year=1950, month=0, hot=41.0]】
          k2=【MyKey [year=1950, month=0, hot=37.0]】
       MySort.compare()   比较结果
          【-1】
    MySort.compare()
       MySort.compare()   开始比较数据
          k1=【MyKey [year=1950, month=0, hot=27.0]】
          k2=【MyKey [year=1950, month=0, hot=32.0]】
       MySort.compare()   比较结果
          【1】
    MySort.compare()
       MySort.compare()   开始比较数据
          k1=【MyKey [year=1950, month=0, hot=37.0]】
          k2=【MyKey [year=1950, month=0, hot=32.0]】
       MySort.compare()   比较结果
          【-1】
    MySort.compare()
       MySort.compare()   开始比较数据
          k1=【MyKey [year=1949, month=0, hot=36.0]】
          k2=【MyKey [year=1949, month=0, hot=34.0]】
       MySort.compare()   比较结果
          【-1】
    MyGroup.compare()
       MyGroup.compare()   开始比较数据
          k1=【MyKey [year=1949, month=0, hot=36.0]】
          k2=【MyKey [year=1949, month=0, hot=34.0]】
       MyGroup.compare()   比较结果
          【0】
    WeatherReducer.reduce()
       WeatherReducer.reduce()   处理收到的数据
       WeatherReducer.reduce()   【MyKey [year=1949, month=0, hot=36.0]】
       WeatherReducer.reduce()   打印当前组的所有数据
       WeatherReducer.reduce()   【1949    0    36.0】
       WeatherReducer.reduce()   【1949    0    34.0】
    MyGroup.compare()
       MyGroup.compare()   开始比较数据
          k1=【MyKey [year=1950, month=0, hot=41.0]】
          k2=【MyKey [year=1950, month=0, hot=37.0]】
       MyGroup.compare()   比较结果
          【0】
    WeatherReducer.reduce()
       WeatherReducer.reduce()   处理收到的数据
       WeatherReducer.reduce()   【MyKey [year=1950, month=0, hot=41.0]】
       WeatherReducer.reduce()   打印当前组的所有数据
       WeatherReducer.reduce()   【1950    0    41.0】
    MyGroup.compare()
       MyGroup.compare()   开始比较数据
          k1=【MyKey [year=1950, month=0, hot=37.0]】
          k2=【MyKey [year=1950, month=0, hot=32.0]】
       MyGroup.compare()   比较结果
          【0】
       WeatherReducer.reduce()   【1950    0    37.0】
    MyGroup.compare()
       MyGroup.compare()   开始比较数据
          k1=【MyKey [year=1950, month=0, hot=32.0]】
          k2=【MyKey [year=1950, month=0, hot=27.0]】
       MyGroup.compare()   比较结果
          【0】
       WeatherReducer.reduce()   【1950    0    32.0】
       WeatherReducer.reduce()   【1950    0    27.0】
    MyGroup.compare()
       MyGroup.compare()   开始比较数据
          k1=【MyKey [year=1951, month=0, hot=47.0]】
          k2=【MyKey [year=1951, month=0, hot=46.0]】
       MyGroup.compare()   比较结果
          【0】
    WeatherReducer.reduce()
       WeatherReducer.reduce()   处理收到的数据
       WeatherReducer.reduce()   【MyKey [year=1951, month=0, hot=47.0]】
       WeatherReducer.reduce()   打印当前组的所有数据
       WeatherReducer.reduce()   【1951    0    47.0】
    MyGroup.compare()
       MyGroup.compare()   开始比较数据
          k1=【MyKey [year=1951, month=0, hot=46.0]】
          k2=【MyKey [year=1951, month=0, hot=45.0]】
       MyGroup.compare()   比较结果
          【0】
       WeatherReducer.reduce()   【1951    0    46.0】
    MyGroup.compare()
       MyGroup.compare()   开始比较数据
          k1=【MyKey [year=1951, month=0, hot=45.0]】
          k2=【MyKey [year=1951, month=0, hot=23.0]】
       MyGroup.compare()   比较结果
          【0】
       WeatherReducer.reduce()   【1951    0    45.0】
       WeatherReducer.reduce()   【1951    0    23.0】
        从上方打印的控制台信息可以看出执行步骤如下

        1、WeatherMapper.map将hdfs传来的数据封装为MyKey实体,传递给MyPartitioner.getPartition进行分区

        2、MySort.compare进行排序

        3、MyGroup进行分组

        4、WeatherReducer.reduce接受到分组且拍好顺序的数据后,拿出前三位。