Hbase+MapReduce计算学生成绩平均分

时间:2021-04-25 00:45:09

一、创建HBase的学生成绩表‘test’和学生平均成绩表‘average_score’

  首先需要将学生成绩插入到表‘test’，同时程序会创建学生平均成绩表‘average_score’，供MapReduce程序写入计算结果。

1. 待插入到hbase的数据

Hbase+MapReduce计算学生成绩平均分

2.运行程序的hadoop指令

初始化hbase表的InitData类在工程中的位置:
Hbase+MapReduce计算学生成绩平均分
hadoop运行jar指令格式:
  下面指令中，XXX是具体jar包的名字，KKK是需要运行的主类的完整类名（包名.类名，例如hadoop_averageScore.InitData）
  hadoop jar XXX.jar KKK
具体命令:

 hadoop jar hadoop_averageScore.jar hadoop_averageScore.InitData

3.查看程序执行结果

3.1 学生成绩数据插入完成后,使用hbase shell查看表 ‘test’
代码:

hbase shell
scan 'test'

结果:
Hbase+MapReduce计算学生成绩平均分
3.2 查看程序运行后,hbase共有多少个数据表
代码:

hbase shell
list

结果:
Hbase+MapReduce计算学生成绩平均分


二、运行计算学生成绩平均分的MapReduce程序

1. 运行MapReduce程序的指令

hadoop jar hadoop_averageScore.jar hadoop_averageScore.average_score

2. 使用hbase shell查看表 ‘average_score’

Hbase+MapReduce计算学生成绩平均分


三、完整的Java工程

Hbase+MapReduce计算学生成绩平均分
  InitData.java负责初始化学生成绩表‘test’，并创建学生平均成绩表‘average_score’供MapReduce程序写入数据。
  average_score.java是MapReduce程序，负责从表‘test’读入学生成绩，然后计算每个学生的平均分，最后写入表‘average_score’。
  HBaseUtil.java是程序操作hbase的工具类，里面提供了多种操作hbase的API。

1. InitData.java

package hadoop_averageScore;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.jruby.RubyBoolean.False;


public class InitData {

    /**
     * Sample score records inserted into table 'test'. Each entry is
     * {rowKey, studentName, subject, score}; every record becomes one row holding one cell in each
     * of the families student_Name, subject and score (empty qualifier).
     */
    private static final String[][] SCORE_RECORDS = {
        {"1", "tom", "yuwen", "80"},
        {"2", "tom", "shuxue", "98"},
        {"3", "tom", "yingyu", "89"},
        {"4", "cat", "yuwen", "88"},
        {"5", "cat", "shuxue", "99"},
        {"6", "cat", "yingyu", "90"},
    };

    /**
     * Prepares the average-score example: creates the tables if they do not exist yet and loads the
     * sample scores into table 'test'.
     *
     * <ul>
     *   <li>'average_score' gets the single family 'avg' (filled later by the average_score job);
     *   <li>'test' gets the families student_Name, subject and score.
     * </ul>
     *
     * @param args ignored
     * @throws IOException if an HBase admin or table operation fails
     */
    public static void main(String[] args) throws IOException {
        createTablesIfMissing();
        insertScores();
    }

    /** Creates 'average_score' and 'test' when absent, always releasing the admin afterwards. */
    @SuppressWarnings("deprecation")
    private static void createTablesIfMissing() throws IOException {
        // Use the same configuration as HBaseUtil.getHTable (used for the inserts), so table
        // creation and data loading talk to the same cluster.
        HBaseAdmin admin = HBaseUtil.getHBaseAdmin();
        try {
            if (!admin.tableExists("average_score")) {
                // Single family 'avg' holds each student's average score.
                HTableDescriptor tableDescriptor = new HTableDescriptor("average_score");
                tableDescriptor.addFamily(new HColumnDescriptor("avg"));
                admin.createTable(tableDescriptor);
                System.out.println("**************************成功创建表average_score********************************");
            }

            if (!admin.tableExists("test")) {
                // Families: student name, subject, score.
                HTableDescriptor tableDescriptor = new HTableDescriptor("test");
                tableDescriptor.addFamily(new HColumnDescriptor("student_Name"));
                tableDescriptor.addFamily(new HColumnDescriptor("subject"));
                tableDescriptor.addFamily(new HColumnDescriptor("score"));
                admin.createTable(tableDescriptor);
                System.out.println("**************************成功创建表test********************************");
            }
        } finally {
            admin.close();
        }
    }

    /** Writes SCORE_RECORDS into table 'test' in a single flush, closing the table even on error. */
    @SuppressWarnings("deprecation")
    private static void insertScores() throws IOException {
        List<Put> puts = new ArrayList<Put>();
        for (String[] record : SCORE_RECORDS) {
            String row = record[0];
            puts.add(HBaseUtil.getPut(row, "student_Name", null, record[1]));
            puts.add(HBaseUtil.getPut(row, "subject", null, record[2]));
            puts.add(HBaseUtil.getPut(row, "score", null, record[3]));
        }

        HTable htable = HBaseUtil.getHTable("test");
        try {
            // Auto-flush is disabled so the puts are sent together by flushCommits().
            htable.setAutoFlush(false);
            htable.put(puts);
            htable.flushCommits();
        } finally {
            htable.close();
        }
        System.out.println("**************************向表'test'插入数据成功********************************");
    }
}

2. average_score.java

package hadoop_averageScore;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class average_score {

    /** Family in table 'test' holding the student's name (empty qualifier). */
    private static final byte[] NAME_FAMILY = Bytes.toBytes("student_Name");
    /** Family in table 'test' holding the score as a decimal string (empty qualifier). */
    private static final byte[] SCORE_FAMILY = Bytes.toBytes("score");
    /** Family in table 'average_score' receiving the computed average. */
    private static final byte[] AVG_FAMILY = Bytes.toBytes("avg");

    /**
     * Mapper over table 'test': emits (student name, score) as Text/Text for every row.
     *
     * <p>Rows that lack either the student_Name or the score cell are skipped.
     */
    public static class MyMapper extends TableMapper<Text, Text> {

        // Reusable output objects; instance fields so separate mapper instances never share them.
        private final Text studentName = new Text();
        private final Text score = new Text();

        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context)
                throws IOException, InterruptedException {
            byte[] scoreBytes = value.getValue(SCORE_FAMILY, null);
            byte[] nameBytes = value.getValue(NAME_FAMILY, null);
            if (scoreBytes == null || nameBytes == null) {
                // Incomplete row: nothing to contribute (Text.set(null) would throw an NPE).
                return;
            }
            studentName.set(nameBytes);
            score.set(scoreBytes);
            context.write(studentName, score);
        }
    }

    /**
     * Reducer writing each student's average score to table 'average_score'.
     *
     * <p>Input: key = student name, values = that student's scores as integer strings. Output: one
     * row per student (row key = name) with the average stored under family 'avg', empty qualifier.
     */
    public static class MyReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            int courseCount = 0;
            int sum = 0;
            for (Text val : values) {
                sum += Integer.parseInt(val.toString());
                courseCount++;
            }

            // Divide in floating point: int / int would truncate (e.g. 277 / 3 -> 92 instead of 92.33).
            float average = (float) sum / courseCount;

            byte[] row = Bytes.toBytes(key.toString());
            Put put = new Put(row);
            // Store the average as its decimal string; writing the raw float bytes would show up
            // as binary escapes (\x00...) in the hbase shell.
            put.add(AVG_FAMILY, null, Bytes.toBytes(String.valueOf(average)));
            context.write(new ImmutableBytesWritable(row), put);
        }
    }

    /**
     * Configures and runs the job: reads table 'test', writes table 'average_score'.
     *
     * @param args ignored
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf);
        job.setJarByClass(average_score.class);

        Scan scan = new Scan();
        // The mapper only reads these two families; skip the rest.
        scan.addFamily(NAME_FAMILY);
        scan.addFamily(SCORE_FAMILY);
        // Settings the HBase reference guide recommends for MapReduce scans: fetch rows in larger
        // batches and keep this one-off full scan out of the region servers' block cache.
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        // Mapper reads table 'test'; map output key/value types are Text/Text.
        TableMapReduceUtil.initTableMapperJob("test", scan, MyMapper.class, Text.class, Text.class, job);
        // Reducer writes to table 'average_score'.
        TableMapReduceUtil.initTableReducerJob("average_score", MyReducer.class, job);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

3. HBaseUtil.java

package hadoop_averageScore;



import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseUtil {

    /**
     * Creates the HBase client configuration used by this project.
     *
     * @return a configuration loaded from the HBase resources on the classpath, with the ZooKeeper
     *     quorum set to h1,h2,h3
     */
    public static Configuration getConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        // Must agree with hbase.zookeeper.quorum in hbase-site.xml. The key must be spelled exactly:
        // an unknown key is silently ignored by the client.
        conf.set("hbase.zookeeper.quorum", "h1,h2,h3");
        return conf;
    }

    /**
     * Creates a new HBaseAdmin (table metadata operations) on {@link #getConfiguration()}.
     * The caller is responsible for closing it.
     *
     * @return a new admin instance
     * @throws IOException if the admin cannot be created
     */
    public static HBaseAdmin getHBaseAdmin() throws IOException {
        return new HBaseAdmin(getConfiguration());
    }

    /**
     * Creates a table with the given column families.
     *
     * @param tableName table name
     * @param columnFamilies names of the column families to add
     * @throws IOException if the table cannot be created
     */
    @SuppressWarnings("deprecation")
    public static void createTable(String tableName, String... columnFamilies) throws IOException {
        HTableDescriptor htd = new HTableDescriptor(Bytes.toBytes(tableName));
        for (String family : columnFamilies) {
            htd.addFamily(new HColumnDescriptor(family));
        }
        HBaseAdmin admin = getHBaseAdmin();
        try {
            admin.createTable(htd);
        } finally {
            admin.close();
        }
    }

    /**
     * Fetches the descriptor of an existing table.
     *
     * @param tableName table name as bytes
     * @return the table's descriptor
     * @throws IOException if the lookup fails
     */
    public static HTableDescriptor getHTableDescriptor(byte[] tableName) throws IOException {
        HBaseAdmin admin = getHBaseAdmin();
        try {
            return admin.getTableDescriptor(tableName);
        } finally {
            admin.close();
        }
    }

    /**
     * Opens a table on {@link #getConfiguration()}. The caller is responsible for closing it.
     *
     * @param tableName table name
     * @return the opened table
     * @throws IOException if the table cannot be opened
     */
    public static HTable getHTable(String tableName) throws IOException {
        return new HTable(getConfiguration(), tableName);
    }

    /**
     * Builds a Put that writes one cell.
     *
     * @param row row key
     * @param columnFamily column family
     * @param qualifier column qualifier; {@code null} or empty means the empty qualifier
     * @param value cell value
     * @return the Put describing the single cell
     * @throws IOException declared for compatibility with existing callers
     */
    public static Put getPut(String row, String columnFamily, String qualifier, String value) throws IOException {
        // Bytes.toBytes encodes as UTF-8; String.getBytes() would depend on the platform charset.
        byte[] qualifierBytes =
                (qualifier == null || qualifier.isEmpty()) ? null : Bytes.toBytes(qualifier);
        Put put = new Put(Bytes.toBytes(row));
        put.add(Bytes.toBytes(columnFamily), qualifierBytes, Bytes.toBytes(value));
        return put;
    }

    /**
     * Reads one row.
     *
     * @param tableName table name
     * @param row row key
     * @return the row's Result
     * @throws IOException if the read fails
     */
    public static Result getResult(String tableName, String row) throws IOException {
        Get get = new Get(Bytes.toBytes(row));
        HTable htable = getHTable(tableName);
        try {
            return htable.get(get);
        } finally {
            htable.close();
        }
    }

    /**
     * Scans rows in [begin, end) of one column family, keeping rows whose value in that family
     * (empty qualifier) equals {@code queryCondition}.
     *
     * @param tableName table name
     * @param columnFamily column family to read and filter on
     * @param queryCondition value the cell must equal
     * @param begin start row (inclusive)
     * @param end stop row (exclusive)
     * @return a scanner over the matching rows
     * @throws IOException if the scanner cannot be opened
     */
    public static ResultScanner getResultScanner(String tableName, String columnFamily, String queryCondition, String begin, String end) throws IOException {
        byte[] family = Bytes.toBytes(columnFamily);

        Scan scan = new Scan();
        scan.setStartRow(Bytes.toBytes(begin));
        scan.setStopRow(Bytes.toBytes(end));
        scan.addColumn(family, null);

        Filter valueFilter = new SingleColumnValueFilter(family, null, CompareOp.EQUAL, Bytes.toBytes(queryCondition));
        FilterList filterList = new FilterList(Operator.MUST_PASS_ONE, Arrays.asList(valueFilter));
        scan.setFilter(filterList);

        HTable htable = getHTable(tableName);
        try {
            return htable.getScanner(scan);
        } finally {
            // NOTE(review): the table is closed before the caller iterates the returned scanner.
            // Depending on the HBase version, closing the HTable can release the connection the
            // scanner needs for later batches. Verify, or change the API so the caller owns the table.
            htable.close();
        }
    }

}